address tgravescs's comments

lianhuiwang committed Apr 27, 2015
1 parent 9396346 commit 3b1e4c8
Showing 3 changed files with 19 additions and 46 deletions.
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala: 7 additions, 3 deletions
@@ -343,10 +343,14 @@ object SparkSubmit {
         val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
         val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
         if (!pyArchivesFile.exists()) {
-          val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
-          Utils.zipRecursive(pySrc, pyArchivesFile)
+          printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
         }
-        pythonPath += pyArchivesFile.getAbsolutePath
+        val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+        if (!py4jFile.exists()) {
+          printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
+            "in yarn mode.")
+        }
+        pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator)
         pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
       }
       pyArchives = pythonPath.mkString(",")
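For reference, here is a minimal, self-contained sketch of the fail-fast check the new code performs: both archives must already be present under $SPARK_HOME/python/lib, and their absolute paths are joined into the comma-separated pyArchives value. PySparkArchiveCheck and exitWithError are hypothetical stand-ins for SparkSubmit's surrounding plumbing and its printErrorAndExit helper.

import java.io.File

object PySparkArchiveCheck {
  def main(args: Array[String]): Unit = {
    val sparkHome = sys.env.getOrElse("SPARK_HOME", ".")
    val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)

    // Stand-in for SparkSubmit's printErrorAndExit.
    def exitWithError(msg: String): Nothing = {
      System.err.println(msg)
      sys.exit(1)
    }

    // Both archives must already exist; nothing is zipped at submit time.
    val pythonPath = Seq("pyspark.zip", "py4j-0.8.2.1-src.zip").map { name =>
      val archive = new File(pyLibPath, name)
      if (!archive.exists()) {
        exitWithError(s"$name does not exist for python application in yarn mode.")
      }
      archive.getAbsolutePath
    }

    // Mirrors `pyArchives = pythonPath.mkString(",")` in the hunk above.
    println(pythonPath.mkString(","))
  }
}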
core/src/main/scala/org/apache/spark/util/Utils.scala: 0 additions, 35 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.util
 
 import java.io._
-import java.util.zip.{ZipOutputStream, ZipEntry}
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
@@ -1001,40 +1000,6 @@ private[spark] object Utils extends Logging {
     !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
   }
 
-  /**
-   * recursively add files to the zip file
-   */
-  def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = {
-    if (source.isDirectory()) {
-      output.putNextEntry(new ZipEntry(parent + source.getName()))
-      for (file <- source.listFiles()) {
-        addFilesToZip(parent + source.getName + File.separator, file, output)
-      }
-    } else {
-      val in = new FileInputStream(source)
-      output.putNextEntry(new ZipEntry(parent + source.getName()))
-      val buf = new Array[Byte](8192)
-      var n = 0
-      while (n != -1) {
-        n = in.read(buf)
-        if (n != -1) {
-          output.write(buf, 0, n)
-        }
-      }
-      in.close()
-    }
-  }
-
-  /**
-   * zip source file to dest ZipFile
-   */
-  def zipRecursive(source: File, destZipFile: File) = {
-    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
-    addFilesToZip("", source, destOutput)
-    destOutput.flush()
-    destOutput.close()
-  }
-
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
    *
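The zip helpers are deleted outright: SparkSubmit no longer builds pyspark.zip at submit time and instead fails fast when it is missing, as the SparkSubmit.scala hunk above shows. Purely for illustration, an equivalent of the removed helper written with try/finally resource handling, which the deleted code lacked (its streams were never closed if an I/O error was thrown), might look like the sketch below; ZipSketch is a hypothetical container, not Spark code.

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

object ZipSketch {
  // Zip entry names use '/' regardless of platform, per the zip spec.
  private def addFilesToZip(parent: String, source: File, out: ZipOutputStream): Unit = {
    if (source.isDirectory) {
      out.putNextEntry(new ZipEntry(parent + source.getName + "/"))
      for (file <- source.listFiles()) {
        addFilesToZip(parent + source.getName + "/", file, out)
      }
    } else {
      val in = new FileInputStream(source)
      try {
        out.putNextEntry(new ZipEntry(parent + source.getName))
        val buf = new Array[Byte](8192)
        var n = in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          n = in.read(buf)
        }
      } finally {
        in.close()
      }
    }
  }

  def zipRecursive(source: File, destZipFile: File): Unit = {
    val out = new ZipOutputStream(new FileOutputStream(destZipFile))
    try {
      addFilesToZip("", source, out)
    } finally {
      out.close() // close() flushes, so no separate flush() is needed
    }
  }
}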
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala: 12 additions, 8 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
@@ -326,14 +327,6 @@ private[spark] class Client(
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
-    // on to the ApplicationMaster and the executors.
-    if (sparkConf.contains("spark.submit.pyArchives")) {
-      val archives = sparkConf.get("spark.submit.pyArchives")
-      env("PYTHONPATH") = archives
-      sparkConf.setExecutorEnv("PYTHONPATH", archives)
-    }
-
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
     val amEnvPrefix = "spark.yarn.appMasterEnv."
    sparkConf.getAll
@@ -349,6 +342,17 @@ private[spark] class Client(
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
+    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
+    // that can be passed on to the ApplicationMaster and the executors.
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      var pythonPath = sparkConf.get("spark.submit.pyArchives")
+      if (env.contains("PYTHONPATH")) {
+        pythonPath = Seq(env("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+      }
+      env("PYTHONPATH") = pythonPath
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    }
+
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
     // SparkContext will not let that set spark* system properties, which is expected behavior for
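The relocated block now runs after the SPARK_YARN_USER_ENV handling and appends the archives to whatever PYTHONPATH is already in the launch environment, joining with File.pathSeparator instead of overwriting it as the old code did. Below is a minimal sketch of that merge semantics, using a plain mutable map in place of Client's launch environment; PythonPathMerge and mergePythonPath are hypothetical names.

import java.io.File
import scala.collection.mutable

object PythonPathMerge {
  // Append `archives` to any PYTHONPATH already in `env` rather than
  // overwriting it; env.get returns an Option, so match on it explicitly.
  def mergePythonPath(env: mutable.HashMap[String, String], archives: String): Unit = {
    val pythonPath = env.get("PYTHONPATH") match {
      case Some(existing) => Seq(existing, archives).mkString(File.pathSeparator)
      case None => archives
    }
    env("PYTHONPATH") = pythonPath
  }

  def main(args: Array[String]): Unit = {
    val env = mutable.HashMap("PYTHONPATH" -> "/opt/custom")
    mergePythonPath(env, "/tmp/pyspark.zip")
    println(env("PYTHONPATH")) // "/opt/custom:/tmp/pyspark.zip" on Unix
  }
}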
