From bff47ad6d51ea20db00f7aba47b7c381562e712c Mon Sep 17 00:00:00 2001
From: Jongyoul Lee
Date: Wed, 24 Jun 2015 15:08:12 +0900
Subject: [PATCH] [ZEPPELIN-18] Running pyspark without deploying python
 libraries to every yarn node

- Followed Spark's approach to supporting pyspark
  - https://issues.apache.org/jira/browse/SPARK-6869
  - https://github.com/apache/spark/pull/5580
  - https://github.com/apache/spark/pull/5478/files
---
 bin/interpreter.sh                          | 10 ++++++
 spark/pom.xml                               | 33 ++++++++++++++----
 .../zeppelin/spark/PySparkInterpreter.java  | 12 -------
 .../zeppelin/spark/SparkInterpreter.java    | 19 ++++++++---
 4 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index c214c3081d0..524e784b044 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -73,6 +73,16 @@ if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
   $(mkdir -p "${ZEPPELIN_LOG_DIR}")
 fi
 
+if [[ x"" == x${PYTHONPATH} ]]; then
+  export PYTHONPATH="${ZEPPELIN_HOME}/python/lib/pyspark.zip:${ZEPPELIN_HOME}/python/lib/py4j-0.8.2.1-src.zip"
+else
+  export PYTHONPATH="${PYTHONPATH}:${ZEPPELIN_HOME}/python/lib/pyspark.zip:${ZEPPELIN_HOME}/python/lib/py4j-0.8.2.1-src.zip"
+fi
+
+if [[ x"" == x${SPARK_HOME} ]]; then
+  export SPARK_HOME=${ZEPPELIN_HOME}
+fi
+
 ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
 pid=$!

diff --git a/spark/pom.xml b/spark/pom.xml
index 22aa7ab0f32..59ee5e806f5 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -915,7 +915,7 @@
         <executions>
           <execution>
             <id>download-pyspark-files</id>
-            <phase>prepare-package</phase>
+            <phase>validate</phase>
             <goals>
               <goal>wget</goal>
             </goals>
+      <plugin>
+        <artifactId>maven-clean-plugin</artifactId>
+        <configuration>
+          <filesets>
+            <fileset>
+              <directory>${basedir}/../python/build</directory>
+            </fileset>
+            <fileset>
+              <directory>${project.build.directory}/spark-dist</directory>
+            </fileset>
+          </filesets>
+          <followSymLinks>true</followSymLinks>
+        </configuration>
+      </plugin>
@@ -934,18 +948,21 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
         <executions>
           <execution>
             <id>download-and-zip-pyspark-files</id>
-            <phase>package</phase>
+            <phase>generate-resources</phase>
             <goals>
               <goal>run</goal>
             </goals>
             <configuration>
               <target>
-
-
-
+
+
+
+
+
+
               </target>
             </configuration>

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 95eefd809c8..4fe9dc61e7b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -158,18 +158,6 @@ public void open() {
     try {
       Map env = EnvironmentUtils.getProcEnvironment();
 
-      String pythonPath = (String) env.get("PYTHONPATH");
-      if (pythonPath == null) {
-        pythonPath = "";
-      } else {
-        pythonPath += ":";
-      }
-
-      pythonPath += getSparkHome() + "/python/lib/py4j-0.8.2.1-src.zip:"
-          + getSparkHome() + "/python";
-
-      env.put("PYTHONPATH", pythonPath);
-
       executor.execute(cmd, env, this);
       pythonscriptRunning = true;
     } catch (IOException e) {

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index ab3609ab422..57703b0eb3f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -26,12 +26,9 @@
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 
+import com.google.common.base.Joiner;
 import org.apache.spark.HttpServer;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
@@ -272,6 +269,18 @@ public SparkContext createSparkContext() {
         conf.set(key, val);
       }
     }
+
+    // TODO(jongyoul): Move this code into PySparkInterpreter.java
+    String zeppelinHome = getSystemDefault("ZEPPELIN_HOME", "zeppelin.home", "../");
+    File zeppelinPythonLibPath = new File(zeppelinHome, "python/lib");
+    String[] pythonLibs = new String[] {"pyspark.zip", "py4j-0.8.2.1-src.zip"};
+    ArrayList<String> pythonLibUris = new ArrayList<>();
+    for (String lib : pythonLibs) {
+      pythonLibUris.add(new File(zeppelinPythonLibPath, lib).toURI().toString());
+    }
+    conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
+    conf.set("spark.files", conf.get("spark.yarn.dist.files"));
+    conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
 
     SparkContext sparkContext = new SparkContext(conf);
     return sparkContext;
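
Illustration (not part of the patch): assuming a hypothetical install at ZEPPELIN_HOME=/opt/zeppelin with PYTHONPATH initially unset, the interpreter.sh block and the createSparkContext() change above resolve to roughly the following environment and Spark properties:

    # exported by bin/interpreter.sh before launching the interpreter process
    export PYTHONPATH="/opt/zeppelin/python/lib/pyspark.zip:/opt/zeppelin/python/lib/py4j-0.8.2.1-src.zip"
    export SPARK_HOME=/opt/zeppelin

    # set by SparkInterpreter.createSparkContext()
    spark.yarn.dist.files    file:/opt/zeppelin/python/lib/pyspark.zip,file:/opt/zeppelin/python/lib/py4j-0.8.2.1-src.zip
    spark.files              file:/opt/zeppelin/python/lib/pyspark.zip,file:/opt/zeppelin/python/lib/py4j-0.8.2.1-src.zip
    spark.submit.pyArchives  pyspark.zip:py4j-0.8.2.1-src.zip

On YARN, spark.yarn.dist.files ships the two archives from the Zeppelin host to each container's working directory, and spark.submit.pyArchives (introduced by SPARK-6869, linked above) adds them to the executors' PYTHONPATH, so pyspark and py4j no longer need to be pre-installed on every node.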