[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
Based on apache#5478, which provides a PYSPARK_ARCHIVES_PATH env variable. With this PR, we only need to export PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip in conf/spark-env.sh when PySpark is not installed on each node of the YARN cluster. I ran Python applications successfully in both yarn-client and yarn-cluster mode with this PR.
andrewor14 sryza Sephiroth-Lin Can you take a look at this? Thanks.
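For orientation, here is a minimal, simplified sketch (not code from this PR) of the lookup order the change implements: an explicit PYSPARK_ARCHIVES_PATH wins; otherwise the archives are expected under $SPARK_HOME/python/lib, with the py4j file name matching the version bundled at the time.

```scala
import java.io.File

// Condensed sketch of the resolution order implemented in the SparkSubmit
// change further below; error handling is simplified for illustration only.
object PySparkArchiveLookup {
  def resolve(env: Map[String, String]): Seq[String] =
    env.get("PYSPARK_ARCHIVES_PATH") match {
      // Explicit override, e.g. exported in conf/spark-env.sh as a comma-separated list.
      case Some(paths) => paths.split(",").toSeq
      case None =>
        val sparkHome = env.getOrElse("SPARK_HOME",
          sys.error("SPARK_HOME is required for Python apps on YARN"))
        val pyLib = Seq(sparkHome, "python", "lib").mkString(File.separator)
        Seq(new File(pyLib, "pyspark.zip"), new File(pyLib, "py4j-0.8.2.1-src.zip"))
          .map(_.getAbsolutePath)
    }
}
```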

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes apache#5580 from lianhuiwang/SPARK-6869 and squashes the following commits:

66ffa43 [Lianhui Wang] Update Client.scala
c2ad0f9 [Lianhui Wang] Update Client.scala
1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
20402cd [Lianhui Wang] use ZipEntry
9d87c3f [Lianhui Wang] update scala style
e7bd971 [Lianhui Wang] address vanzin's comments
4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt
e6b573b [Lianhui Wang] address vanzin's comments
f11f84a [Lianhui Wang] zip pyspark archives
5192cca [Lianhui Wang] update import path
3b1e4c8 [Lianhui Wang] address tgravescs's comments
9396346 [Lianhui Wang] put zip to make-distribution.sh
0d2baf7 [Lianhui Wang] update import paths
e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit
31e8e06 [Lianhui Wang] update code style
9f31dac [Lianhui Wang] update code and add comments
f72987c [Lianhui Wang] add archives path to PYTHONPATH
lianhuiwang authored and tgravescs committed May 8, 2015
1 parent c2f0821 commit ebff732
Showing 4 changed files with 114 additions and 8 deletions.
21 changes: 21 additions & 0 deletions assembly/pom.xml
@@ -92,6 +92,27 @@
<skip>true</skip>
</configuration>
</plugin>
<!-- zip pyspark archives to run python application on yarn mode -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
<configuration>
<target>
<delete dir="${basedir}/../python/lib/pyspark.zip"/>
<zip destfile="${basedir}/../python/lib/pyspark.zip">
<fileset dir="${basedir}/../python/" includes="pyspark/**/*"/>
</zip>
</target>
</configuration>
</plugin>
<!-- Use the shade plugin to create a big JAR with all the dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
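The antrun target above rebuilds python/lib/pyspark.zip during the package phase. A hypothetical sanity check (not part of this commit), assuming it is run from the Spark source root after a build, would confirm that every entry in the archive is rooted at pyspark/ so it can be placed on PYTHONPATH as-is:

```scala
import java.util.zip.ZipFile
import scala.collection.JavaConverters._

// Hypothetical check of the zip produced by the antrun target above.
object CheckPySparkZip {
  def main(args: Array[String]): Unit = {
    val zip = new ZipFile("python/lib/pyspark.zip")
    try {
      val entries = zip.entries().asScala.map(_.getName).toList
      require(entries.nonEmpty && entries.forall(_.startsWith("pyspark")),
        "every entry in pyspark.zip should live under pyspark/")
      entries.take(5).foreach(println) // e.g. pyspark/__init__.py, pyspark/context.py, ...
    } finally {
      zip.close()
    }
  }
}
```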
41 changes: 41 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -332,6 +332,47 @@ object SparkSubmit {
}
}

// In yarn mode for a python app, add pyspark archives to files
// that can be distributed with the job
if (args.isPython && clusterManager == YARN) {
var pyArchives: String = null
val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
if (pyArchivesEnvOpt.isDefined) {
pyArchives = pyArchivesEnvOpt.get
} else {
if (!sys.env.contains("SPARK_HOME")) {
printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
}
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
if (!pyArchivesFile.exists()) {
printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
}
val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
if (!py4jFile.exists()) {
printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
"in yarn mode.")
}
pythonPath += pyArchivesFile.getAbsolutePath()
pythonPath += py4jFile.getAbsolutePath()
}
pyArchives = pythonPath.mkString(",")
}

pyArchives = pyArchives.split(",").map { localPath =>
val localURI = Utils.resolveURI(localPath)
if (localURI.getScheme != "local") {
args.files = mergeFileLists(args.files, localURI.toString)
new Path(localPath).getName
} else {
localURI.getPath
}
}.mkString(File.pathSeparator)
sysProps("spark.submit.pyArchives") = pyArchives
}

// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
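To illustrate what the mapping in the block above computes, here is a self-contained sketch of the local-vs-distributed split, using java.net.URI and java.io.File as simplified stand-ins for Spark's Utils.resolveURI and Hadoop's Path (the paths are made up for the example): archives without a local: scheme are added to the job's distributed files and referenced by base name, while local: archives keep their node-local path.

```scala
import java.io.File
import java.net.URI

// Simplified stand-in for the mapping in the SparkSubmit change above; not the actual Spark code.
object PyArchivesSplit {
  def main(args: Array[String]): Unit = {
    val archives = Seq(
      "/opt/spark/python/lib/pyspark.zip",  // no scheme: shipped with the job
      "local:/opt/py4j-0.8.2.1-src.zip")    // local: assumed present on every node
    val pythonPath = archives.map { p =>
      val uri = new URI(p)
      if (uri.getScheme != "local") new File(p).getName // only the base name is visible in the container
      else uri.getPath                                   // keep the absolute, node-local path
    }.mkString(File.pathSeparator)
    println(pythonPath) // pyspark.zip:/opt/py4j-0.8.2.1-src.zip (on Unix)
  }
}
```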
37 changes: 35 additions & 2 deletions project/SparkBuild.scala
@@ -370,23 +370,56 @@ object Assembly {
object PySparkAssembly {
import sbtassembly.Plugin._
import AssemblyKeys._
import java.util.zip.{ZipOutputStream, ZipEntry}

lazy val settings = Seq(
unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" },
// Use a resource generator to copy all .py files from python/pyspark into a managed directory
// to be included in the assembly. We can't just add "python/" to the assembly's resource dir
// list since that will copy unneeded / unwanted files.
resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
val src = new File(BuildCommons.sparkHome, "python/pyspark")

val zipFile = new File(BuildCommons.sparkHome, "python/lib/pyspark.zip")
zipFile.delete()
zipRecursive(src, zipFile)

val dst = new File(outDir, "pyspark")
if (!dst.isDirectory()) {
require(dst.mkdirs())
}

- val src = new File(BuildCommons.sparkHome, "python/pyspark")
copy(src, dst)
}
)

private def zipRecursive(source: File, destZipFile: File) = {
val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
addFilesToZipStream("", source, destOutput)
destOutput.flush()
destOutput.close()
}

private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
if (source.isDirectory()) {
output.putNextEntry(new ZipEntry(parent + source.getName()))
for (file <- source.listFiles()) {
addFilesToZipStream(parent + source.getName() + File.separator, file, output)
}
} else {
val in = new FileInputStream(source)
output.putNextEntry(new ZipEntry(parent + source.getName()))
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
output.write(buf, 0, n)
}
}
in.close()
}
}

private def copy(src: File, dst: File): Seq[File] = {
src.listFiles().flatMap { f =>
val child = new File(dst, f.getName())
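The byte-copy loop in addFilesToZipStream above is deliberately plain. For comparison only, a sketch of the same file-entry write expressed with Iterator.continually; behaviour is the same, only the style differs:

```scala
import java.io.{File, FileInputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

// Same effect as the while-loop above, written in a more Scala-idiomatic form.
object ZipWriteSketch {
  def writeFileEntry(parent: String, source: File, output: ZipOutputStream): Unit = {
    val in = new FileInputStream(source)
    try {
      output.putNextEntry(new ZipEntry(parent + source.getName))
      val buf = new Array[Byte](8192)
      Iterator.continually(in.read(buf)).takeWhile(_ != -1)
        .foreach(n => output.write(buf, 0, n))
      output.closeEntry()
    } finally {
      in.close()
    }
  }
}
```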
23 changes: 17 additions & 6 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -468,6 +468,17 @@ private[spark] class Client(
env("SPARK_YARN_USER_ENV") = userEnvs
}

// if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
// that can be passed on to the ApplicationMaster and the executors.
if (sparkConf.contains("spark.submit.pyArchives")) {
var pythonPath = sparkConf.get("spark.submit.pyArchives")
if (env.contains("PYTHONPATH")) {
pythonPath = Seq(env("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
}
env("PYTHONPATH") = pythonPath
sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
}

// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -1074,7 +1085,7 @@ object Client extends Logging {
val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")

- val hiveConfGet = (param:String) => Option(hiveConfClass
+ val hiveConfGet = (param: String) => Option(hiveConfClass
.getMethod("get", classOf[java.lang.String])
.invoke(hiveConf, param))

@@ -1096,7 +1107,7 @@

val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
logDebug("Added hive.Server2.delegation.token to conf.")
hiveClass.getMethod("closeCurrent").invoke(null)
} else {
@@ -1141,13 +1152,13 @@ object Client extends Logging {

logInfo("Added HBase security token to credentials.")
} catch {
- case e:java.lang.NoSuchMethodException =>
+ case e: java.lang.NoSuchMethodException =>
logInfo("HBase Method not found: " + e)
- case e:java.lang.ClassNotFoundException =>
+ case e: java.lang.ClassNotFoundException =>
logDebug("HBase Class not found: " + e)
- case e:java.lang.NoClassDefFoundError =>
+ case e: java.lang.NoClassDefFoundError =>
logDebug("HBase Class not found: " + e)
- case e:Exception =>
+ case e: Exception =>
logError("Exception when obtaining HBase security token: " + e)
}
}
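As a small illustration of the PYTHONPATH handling added in the Client change above (a sketch, not the Client code itself): any pre-existing PYTHONPATH is kept in front of the value of spark.submit.pyArchives, joined with the platform path separator, and the result is applied to both the ApplicationMaster and executor environments.

```scala
import java.io.File

// Sketch of the merge performed in the Client change above; values are examples only.
object PythonPathMerge {
  def merge(existing: Option[String], pyArchives: String): String =
    (existing.toSeq :+ pyArchives).mkString(File.pathSeparator)

  def main(args: Array[String]): Unit = {
    println(merge(None, "pyspark.zip:py4j-0.8.2.1-src.zip"))
    // pyspark.zip:py4j-0.8.2.1-src.zip
    println(merge(Some("/opt/extra/python"), "pyspark.zip:py4j-0.8.2.1-src.zip"))
    // /opt/extra/python:pyspark.zip:py4j-0.8.2.1-src.zip   (on Unix, where the separator is ':')
  }
}
```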
