add zip pyspark archives in build or sparksubmit
lianhuiwang committed Apr 26, 2015
1 parent 31e8e06 commit e0179be
Showing 4 changed files with 84 additions and 15 deletions.
40 changes: 34 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
import org.apache.spark.util.{Utils, ChildFirstURLClassLoader, MutableURLClassLoader}

/**
 * Whether to submit, kill, or request the status of an application.
@@ -328,12 +328,40 @@ object SparkSubmit {
      }
    }

    // In yarn mode for a python app, if PYSPARK_ARCHIVES_PATH is in the user environment
    // add pyspark archives to files that can be distributed with the job
    if (args.isPython && clusterManager == YARN){
      sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives =>
        args.files = mergeFileLists(args.files, Utils.resolveURIs(archives))
    // In yarn mode for a python app, add pyspark archives to files
    // that can be distributed with the job
    if (args.isPython && clusterManager == YARN) {
      var pyArchives: String = null
      if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) {
        pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get
      } else {
        if (!sys.env.contains("SPARK_HOME")) {
          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
        }
        val pythonPath = new ArrayBuffer[String]
        for (sparkHome <- sys.env.get("SPARK_HOME")) {
          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
          if (!pyArchivesFile.exists()) {
            val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
            Utils.zipRecursive(pySrc, pyArchivesFile)
          }
          pythonPath += pyArchivesFile.getAbsolutePath
          pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
        }
        pyArchives = pythonPath.mkString(",")
      }

      pyArchives = pyArchives.split(",").map( localPath=> {
        val localURI = Utils.resolveURI(localPath)
        if (localURI.getScheme != "local") {
          args.files = mergeFileLists(args.files, localURI.toString)
          (new Path(localPath)).getName
        } else {
          localURI.getPath.toString
        }
      }).mkString(File.pathSeparator)
      sysProps("spark.submit.pyArchives") = pyArchives
    }

    // If we're running a R app, set the main class to our specific R runner
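The mapping above decides, per archive, whether the file has to be shipped with the job: any path whose URI scheme is not local is merged into args.files so YARN distributes it to each container's working directory, and only its base name lands on the Python path, while local: paths are assumed to already exist on every node and keep their full path. Below is a minimal, self-contained sketch of that decision using plain JDK calls (java.net.URI standing in for Utils.resolveURI, java.io.File for Hadoop's Path); the object name and paths are illustrative, not part of the patch.

import java.io.File
import java.net.URI

object PyArchivesMappingSketch {
  // Roughly what Utils.resolveURI does for well-formed input: keep an explicit
  // scheme, otherwise fall back to an absolute file: URI.
  private def resolve(raw: String): URI = {
    val uri = new URI(raw)
    if (uri.getScheme != null) uri else new File(raw).getAbsoluteFile.toURI
  }

  def main(args: Array[String]): Unit = {
    // One archive to ship with the job, one already present on every node.
    val pyArchives = Seq(
      "/opt/spark/python/lib/pyspark.zip",
      "local:/opt/spark/python/lib/py4j-0.8.2.1-src.zip"
    ).mkString(",")

    val pythonPath = pyArchives.split(",").map { localPath =>
      val uri = resolve(localPath)
      if (uri.getScheme != "local") {
        // SparkSubmit would also merge this path into args.files here.
        new File(uri.getPath).getName
      } else {
        uri.getPath
      }
    }.mkString(File.pathSeparator)

    // On Unix: pyspark.zip:/opt/spark/python/lib/py4j-0.8.2.1-src.zip
    println(pythonPath)
  }
}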
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util

import java.io._
import java.util.zip._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
@@ -1000,6 +1001,40 @@ private[spark] object Utils extends Logging {
    !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
  }

  /**
   * recursively add files to the zip file
   */
  def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = {
    if (source.isDirectory()) {
      output.putNextEntry(new ZipEntry(parent + source.getName()))
      for (file <- source.listFiles()) {
        addFilesToZip(parent + source.getName + File.separator, file, output)
      }
    } else {
      val in = new FileInputStream(source)
      output.putNextEntry(new ZipEntry(parent + source.getName()))
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          output.write(buf, 0, n)
        }
      }
      in.close()
    }
  }

  /**
   * zip source file to dest ZipFile
   */
  def zipRecursive(source: File, destZipFile: File) = {
    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
    addFilesToZip("", source, destOutput)
    destOutput.flush()
    destOutput.close()
  }

  /**
   * Determines if a directory contains any files newer than cutoff seconds.
   *
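A short usage sketch for the new helper. Note that Utils is private[spark], so a caller has to live under the org.apache.spark package; the example object, the source directory, and the output path below are made up for illustration.

package org.apache.spark.util

import java.io.File
import java.util.zip.ZipFile

import scala.collection.JavaConverters._

object ZipRecursiveExample {
  def main(args: Array[String]): Unit = {
    val source = new File("/opt/spark/python/pyspark") // directory to archive
    val dest = new File("/tmp/pyspark.zip")            // archive to create

    Utils.zipRecursive(source, dest)

    // Entry names start with the source directory name, e.g. "pyspark/context.py".
    val zip = new ZipFile(dest)
    try {
      zip.entries().asScala.take(5).foreach(entry => println(entry.getName))
    } finally {
      zip.close()
    }
  }
}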
12 changes: 10 additions & 2 deletions project/SparkBuild.scala
@@ -361,12 +361,20 @@ object PySparkAssembly {
    // to be included in the assembly. We can't just add "python/" to the assembly's resource dir
    // list since that will copy unneeded / unwanted files.
    resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
      val src = new File(BuildCommons.sparkHome, "python/pyspark")

      val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
      IO.delete(zipFile)
      def entries(f: File):List[File] =
        f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil)
      IO.zip(entries(src).map(
        d => (d, d.getAbsolutePath.substring(src.getParent.length +1))),
        zipFile)

      val dst = new File(outDir, "pyspark")
      if (!dst.isDirectory()) {
        require(dst.mkdirs())
      }

      val src = new File(BuildCommons.sparkHome, "python/pyspark")
      copy(src, dst)
    }
  )
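The IO.zip call above names each entry relative to src's parent, so the archive keeps the top-level pyspark/ directory and can be dropped straight onto a PYTHONPATH. A quick check of that substring arithmetic, with made-up paths:

object ZipEntryNameSketch {
  def main(args: Array[String]): Unit = {
    val srcParent = "/workspace/spark/python"               // src.getParent
    val file = "/workspace/spark/python/pyspark/context.py" // one file under src

    // Drop "<parent>" plus the separator, leaving a path relative to python/.
    val entryName = file.substring(srcParent.length + 1)
    println(entryName) // pyspark/context.py
  }
}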
12 changes: 5 additions & 7 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -326,14 +326,12 @@ private[spark] class Client(
    distCacheMgr.setDistFilesEnv(env)
    distCacheMgr.setDistArchivesEnv(env)

    // If PYSPARK_ARCHIVES_PATH is in the user environment, set PYTHONPATH to be passed
    // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
    // on to the ApplicationMaster and the executors.
    sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives =>
      // archives will be distributed to each machine's working directory, so strip the
      // path prefix
      val pythonPath = archives.split(",").map(p => (new Path(p)).getName).mkString(":")
      env("PYTHONPATH") = pythonPath
      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
    if (sparkConf.contains("spark.submit.pyArchives")) {
      val archives = sparkConf.get("spark.submit.pyArchives")
      env("PYTHONPATH") = archives
      sparkConf.setExecutorEnv("PYTHONPATH", archives)
    }

    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
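Taken together with the SparkSubmit change, the handoff is: SparkSubmit stores the computed archive list under spark.submit.pyArchives, and the YARN Client copies it verbatim into PYTHONPATH for the ApplicationMaster and into the executor environment. A minimal sketch of that step, with an illustrative value and a plain HashMap standing in for the container launch environment:

import scala.collection.mutable.HashMap

import org.apache.spark.SparkConf

object PyArchivesHandoffSketch {
  def main(args: Array[String]): Unit = {
    // What SparkSubmit would have set before the YARN client is created.
    val sparkConf = new SparkConf(false)
      .set("spark.submit.pyArchives", "pyspark.zip:py4j-0.8.2.1-src.zip")

    // What the Client does while building the container launch environment.
    val env = new HashMap[String, String]()
    if (sparkConf.contains("spark.submit.pyArchives")) {
      val archives = sparkConf.get("spark.submit.pyArchives")
      env("PYTHONPATH") = archives
      sparkConf.setExecutorEnv("PYTHONPATH", archives)
    }

    println(env("PYTHONPATH")) // pyspark.zip:py4j-0.8.2.1-src.zip
  }
}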
