diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index cd55861e00ccb..ac23bc9912170 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -343,10 +343,14 @@ object SparkSubmit {
           val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
           val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
           if (!pyArchivesFile.exists()) {
-            val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
-            Utils.zipRecursive(pySrc, pyArchivesFile)
+            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
           }
-          pythonPath += pyArchivesFile.getAbsolutePath
+          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+          if (!py4jFile.exists()) {
+            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
+              "in yarn mode.")
+          }
+          pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator)
           pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
         }
         pyArchives = pythonPath.mkString(",")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 14ece5debc886..1029b0f9fce1e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.util
 
 import java.io._
-import java.util.zip.{ZipOutputStream, ZipEntry}
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
@@ -1001,40 +1000,6 @@ private[spark] object Utils extends Logging {
     !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
   }
 
-  /**
-   * recursively add files to the zip file
-   */
-  def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = {
-    if (source.isDirectory()) {
-      output.putNextEntry(new ZipEntry(parent + source.getName()))
-      for (file <- source.listFiles()) {
-        addFilesToZip(parent + source.getName + File.separator, file, output)
-      }
-    } else {
-      val in = new FileInputStream(source)
-      output.putNextEntry(new ZipEntry(parent + source.getName()))
-      val buf = new Array[Byte](8192)
-      var n = 0
-      while (n != -1) {
-        n = in.read(buf)
-        if (n != -1) {
-          output.write(buf, 0, n)
-        }
-      }
-      in.close()
-    }
-  }
-
-  /**
-   * zip source file to dest ZipFile
-   */
-  def zipRecursive(source: File, destZipFile: File) = {
-    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
-    addFilesToZip("", source, destOutput)
-    destOutput.flush()
-    destOutput.close()
-  }
-
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
    *
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 76f6edf69b46f..c27453b4703c7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
@@ -326,14 +327,6 @@ private[spark] class Client(
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
-    // on to the ApplicationMaster and the executors.
-    if (sparkConf.contains("spark.submit.pyArchives")) {
-      val archives = sparkConf.get("spark.submit.pyArchives")
-      env("PYTHONPATH") = archives
-      sparkConf.setExecutorEnv("PYTHONPATH", archives)
-    }
-
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
     val amEnvPrefix = "spark.yarn.appMasterEnv."
     sparkConf.getAll
@@ -349,6 +342,17 @@ private[spark] class Client(
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
+    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
+    // that can be passed on to the ApplicationMaster and the executors.
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      var pythonPath = sparkConf.get("spark.submit.pyArchives")
+      if (env.contains("PYTHONPATH")) {
+        pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+      }
+      env("PYTHONPATH") = pythonPath
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    }
+
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
     // SparkContext will not let that set spark* system properties, which is expected behavior for
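
The Client.scala hunks above change the PYTHONPATH handling from overwrite to append: the spark.submit.pyArchives value is joined onto any PYTHONPATH already present in the launch environment using the platform path separator. Below is a minimal standalone sketch of that append behaviour, not part of the patch; the object and method names are made up for illustration, and the example values mirror the comma-joined archive list built in SparkSubmit.scala.

import java.io.File

object PyArchivesPythonPathSketch {
  // Append the pyArchives entries to an existing PYTHONPATH value, if one is set;
  // otherwise use the archives alone (same shape as the env.contains("PYTHONPATH") branch above).
  def mergePythonPath(existing: Option[String], pyArchives: String): String =
    existing match {
      case Some(current) => Seq(current, pyArchives).mkString(File.pathSeparator)
      case None => pyArchives
    }

  def main(args: Array[String]): Unit = {
    // On Linux this prints "/opt/extra-python-libs:pyspark.zip,py4j-0.8.2.1-src.zip"
    println(mergePythonPath(Some("/opt/extra-python-libs"), "pyspark.zip,py4j-0.8.2.1-src.zip"))
  }
}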