diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index de55ede19c516..b6075c87bae73 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -56,9 +56,9 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
 import org.apache.spark.util.Utils

 private[spark] class Client(
-  val args: ClientArguments,
-  val hadoopConf: Configuration,
-  val sparkConf: SparkConf)
+    val args: ClientArguments,
+    val hadoopConf: Configuration,
+    val sparkConf: SparkConf)
   extends Logging {

   import Client._
@@ -122,8 +122,8 @@ private[spark] class Client(
    * This uses the YarnClientApplication not available in the Yarn alpha API.
    */
   def createApplicationSubmissionContext(
-    newApp: YarnClientApplication,
-    containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
+      newApp: YarnClientApplication,
+      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
     val appContext = newApp.getApplicationSubmissionContext
     appContext.setApplicationName(args.appName)
     appContext.setQueue(args.amQueue)
@@ -190,9 +190,9 @@ private[spark] class Client(
    * for preparing resources for launching the ApplicationMaster container. Exposed for testing.
    */
   private[yarn] def copyFileToRemote(
-    destDir: Path,
-    srcPath: Path,
-    replication: Short): Path = {
+      destDir: Path,
+      srcPath: Path,
+      replication: Short): Path = {
     val destFs = destDir.getFileSystem(hadoopConf)
     val srcFs = srcPath.getFileSystem(hadoopConf)
     var destPath = srcPath
@@ -462,7 +462,7 @@ private[spark] class Client(

     // Keep this for backwards compatibility but users should move to the config
     sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
-    // Allow users to specify some environment variables.
+      // Allow users to specify some environment variables.
       YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
       // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
       env("SPARK_YARN_USER_ENV") = userEnvs
@@ -522,7 +522,7 @@ private[spark] class Client(
    * This sets up the launch environment, java options, and the command for launching the AM.
    */
   private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
-      : ContainerLaunchContext = {
+    : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
@@ -661,14 +661,14 @@ private[spark] class Client(
     val amArgs =
       Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++
         userArgs ++ Seq(
-        "--executor-memory", args.executorMemory.toString + "m",
-        "--executor-cores", args.executorCores.toString,
-        "--num-executors ", args.numExecutors.toString)
+          "--executor-memory", args.executorMemory.toString + "m",
+          "--executor-cores", args.executorCores.toString,
+          "--num-executors ", args.numExecutors.toString)

     // Command for the ApplicationMaster
     val commands = prefixEnv ++ Seq(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
-    ) ++
+        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
+      ) ++
       javaOpts ++ amArgs ++
       Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -728,9 +728,9 @@ private[spark] class Client(
    * @return A pair of the yarn application state and the final application state.
    */
   def monitorApplication(
-    appId: ApplicationId,
-    returnOnRunning: Boolean = false,
-    logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
+      appId: ApplicationId,
+      returnOnRunning: Boolean = false,
+      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
     var lastState: YarnApplicationState = null
     while (true) {
@@ -1085,7 +1085,7 @@ object Client extends Logging {
       val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
       val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")

-      val hiveConfGet = (param:String) => Option(hiveConfClass
+      val hiveConfGet = (param: String) => Option(hiveConfClass
         .getMethod("get", classOf[java.lang.String])
         .invoke(hiveConf, param))

@@ -1107,7 +1107,7 @@ object Client extends Logging {

       val hive2Token = new Token[DelegationTokenIdentifier]()
       hive2Token.decodeFromUrlString(tokenStr)
-      credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+      credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
       logDebug("Added hive.Server2.delegation.token to conf.")
       hiveClass.getMethod("closeCurrent").invoke(null)
     } else {
@@ -1152,13 +1152,13 @@ object Client extends Logging {

       logInfo("Added HBase security token to credentials.")
     } catch {
-      case e:java.lang.NoSuchMethodException =>
+      case e: java.lang.NoSuchMethodException =>
         logInfo("HBase Method not found: " + e)
-      case e:java.lang.ClassNotFoundException =>
+      case e: java.lang.ClassNotFoundException =>
         logDebug("HBase Class not found: " + e)
-      case e:java.lang.NoClassDefFoundError =>
+      case e: java.lang.NoClassDefFoundError =>
         logDebug("HBase Class not found: " + e)
-      case e:Exception =>
+      case e: Exception =>
         logError("Exception when obtaining HBase security token: " + e)
     }
   }