From e0179be997988e9c7b784e9516fddfc8bc7f8973 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Sun, 26 Apr 2015 19:43:28 +0800
Subject: [PATCH] add zip pyspark archives in build or sparksubmit

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 40 ++++++++++++++++---
 .../scala/org/apache/spark/util/Utils.scala   | 35 ++++++++++++++++
 project/SparkBuild.scala                      | 12 +++++-
 .../org/apache/spark/deploy/yarn/Client.scala | 12 +++---
 4 files changed, 84 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c668380432de1..cd55861e00ccb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.util.{Utils, ChildFirstURLClassLoader, MutableURLClassLoader}
 
 /**
  * Whether to submit, kill, or request the status of an application.
@@ -328,12 +328,40 @@ object SparkSubmit {
       }
     }
 
-    // In yarn mode for a python app, if PYSPARK_ARCHIVES_PATH is in the user environment
-    // add pyspark archives to files that can be distributed with the job
-    if (args.isPython && clusterManager == YARN){
-      sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives =>
-        args.files = mergeFileLists(args.files, Utils.resolveURIs(archives))
+    // In yarn mode for a python app, add pyspark archives to files
+    // that can be distributed with the job
+    if (args.isPython && clusterManager == YARN) {
+      var pyArchives: String = null
+      if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) {
+        pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get
+      } else {
+        if (!sys.env.contains("SPARK_HOME")) {
+          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
+        }
+        val pythonPath = new ArrayBuffer[String]
+        for (sparkHome <- sys.env.get("SPARK_HOME")) {
+          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+          if (!pyArchivesFile.exists()) {
+            val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
+            Utils.zipRecursive(pySrc, pyArchivesFile)
+          }
+          pythonPath += pyArchivesFile.getAbsolutePath
+          pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
+        }
+        pyArchives = pythonPath.mkString(",")
       }
+
+      pyArchives = pyArchives.split(",").map( localPath=> {
+        val localURI = Utils.resolveURI(localPath)
+        if (localURI.getScheme != "local") {
+          args.files = mergeFileLists(args.files, localURI.toString)
+          (new Path(localPath)).getName
+        } else {
+          localURI.getPath.toString
+        }
+      }).mkString(File.pathSeparator)
+      sysProps("spark.submit.pyArchives") = pyArchives
     }
 
     // If we're running a R app, set the main class to our specific R runner
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1029b0f9fce1e..6d12f85b87762 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
+import java.util.zip._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
@@ -1000,6 +1001,40 @@ private[spark] object Utils extends Logging {
     !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
   }
 
+  /**
+   * recursively add files to the zip file
+   */
+  def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = {
+    if (source.isDirectory()) {
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      for (file <- source.listFiles()) {
+        addFilesToZip(parent + source.getName + File.separator, file, output)
+      }
+    } else {
+      val in = new FileInputStream(source)
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      val buf = new Array[Byte](8192)
+      var n = 0
+      while (n != -1) {
+        n = in.read(buf)
+        if (n != -1) {
+          output.write(buf, 0, n)
+        }
+      }
+      in.close()
+    }
+  }
+
+  /**
+   * zip source file to dest ZipFile
+   */
+  def zipRecursive(source: File, destZipFile: File) = {
+    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
+    addFilesToZip("", source, destOutput)
+    destOutput.flush()
+    destOutput.close()
+  }
+
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
    *
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 09b4976d10c26..3bd70dc0f6af1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -361,12 +361,20 @@ object PySparkAssembly {
     // to be included in the assembly. We can't just add "python/" to the assembly's resource dir
     // list since that will copy unneeded / unwanted files.
     resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
+      val src = new File(BuildCommons.sparkHome, "python/pyspark")
+
+      val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
+      IO.delete(zipFile)
+      def entries(f: File):List[File] =
+        f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil)
+      IO.zip(entries(src).map(
+        d => (d, d.getAbsolutePath.substring(src.getParent.length +1))),
+        zipFile)
+
       val dst = new File(outDir, "pyspark")
       if (!dst.isDirectory()) {
         require(dst.mkdirs())
       }
-
-      val src = new File(BuildCommons.sparkHome, "python/pyspark")
       copy(src, dst)
     }
   )
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 508eac37d6a90..76f6edf69b46f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -326,14 +326,12 @@ private[spark] class Client(
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    // If PYSPARK_ARCHIVES_PATH is in the user environment, set PYTHONPATH to be passed
+    // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
     // on to the ApplicationMaster and the executors.
-    sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives =>
-      // archives will be distributed to each machine's working directory, so strip the
-      // path prefix
-      val pythonPath = archives.split(",").map(p => (new Path(p)).getName).mkString(":")
-      env("PYTHONPATH") = pythonPath
-      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      val archives = sparkConf.get("spark.submit.pyArchives")
+      env("PYTHONPATH") = archives
+      sparkConf.setExecutorEnv("PYTHONPATH", archives)
     }
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
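
Note (not part of the commit): a minimal standalone sketch of how the pyspark.zip produced by the new build step / Utils.zipRecursive helper can be sanity-checked. It assumes SPARK_HOME points at a Spark checkout where python/lib/pyspark.zip has already been generated; the object name PySparkZipCheck and the check itself are illustrative only. The property the patch relies on is that entries are rooted at "pyspark/", so executors can simply add the archive to PYTHONPATH and "import pyspark" resolves inside the zip.

import java.io.File
import java.util.zip.ZipFile
import scala.collection.JavaConverters._

object PySparkZipCheck {
  def main(args: Array[String]): Unit = {
    // Assumption: SPARK_HOME is set and the build (or spark-submit) has already
    // produced python/lib/pyspark.zip.
    val sparkHome = sys.env.getOrElse("SPARK_HOME", ".")
    val archive = new File(Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator))
    require(archive.exists(), s"$archive not found; run the build or spark-submit first")

    val zip = new ZipFile(archive)
    try {
      val names = zip.entries().asScala.map(_.getName).toList
      // The Python package must sit at the root of the archive for PYTHONPATH-based
      // imports to work, e.g. "pyspark/__init__.py".
      require(names.exists(_.replace('\\', '/') == "pyspark/__init__.py"),
        "pyspark.zip is missing pyspark/__init__.py at its root")
      names.take(5).foreach(println)
    } finally {
      zip.close()
    }
  }
}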