From e06c7dfbc2331db2d1c365959c12aaac640a610a Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Thu, 5 Mar 2015 16:35:17 +0800 Subject: [PATCH 01/22] [SPARK-6153] [SQL] promote guava dep for hive-thriftserver For package thriftserver, guava is used at runtime. /cc pwendell Author: Daoyuan Wang Closes #4884 from adrian-wang/test and squashes the following commits: 4600ae7 [Daoyuan Wang] only promote for thriftserver 44dda18 [Daoyuan Wang] promote guava dep for hive --- sql/hive-thriftserver/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 123a1f629ab1c..279987f58313b 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -41,6 +41,11 @@ spark-hive_${scala.binary.version} ${project.version} + + com.google.guava + guava + runtime + ${hive.group} hive-cli From c9cfba0cebe3eb546e3e96f3e5b9b89a74c5b7de Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 5 Mar 2015 11:31:48 -0800 Subject: [PATCH 02/22] SPARK-6182 [BUILD] spark-parent pom needs to be published for both 2.10 and 2.11 Option 1 of 2: Convert spark-parent module name to spark-parent_2.10 / spark-parent_2.11 Author: Sean Owen Closes #4912 from srowen/SPARK-6182.1 and squashes the following commits: eff60de [Sean Owen] Convert spark-parent module name to spark-parent_2.10 / spark-parent_2.11 --- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 28 files changed, 28 insertions(+), 28 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 3d1ed0dd8a7bd..cbf5b6c4aa8df 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 510e92640eff8..1fe61062b4606 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index c993781c0e0d6..fab776d142ef7 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 8caad2bc2e27a..994071d94d0ad 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 0706f1ebf66e2..96c2787e35cd0 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 1f2681394c583..172d447b77cda 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index 8daa7ed608f6a..5109b8ed87524 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index af96138d79405..369856187a244 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 560c8b9d18276..a344f000c5002 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index da6ffe7662f63..e95853f005ce2 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index e919c2c9b19ea..9b3475d7c3dc2 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index 0fb431808bacd..bc2f8be10c9ce 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -19,7 +19,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index 216661b8bc73a..7e49a71907336 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -19,7 +19,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index f2f0aa78b0a4b..6eb29af03f833 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -19,7 +19,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 8fac24b6ed86d..57e338c03ecf9 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index 4c8f34417ca65..b5c949e155cfd 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/network/common/pom.xml b/network/common/pom.xml index 8f7c924d6b3a3..74437f37c47e4 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index c2d0300ecd904..a2bcca26d8344 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index 39b99f54f6dbc..cea7a20c223e2 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/pom.xml b/pom.xml index 54fe784fe566f..f99a83b9994ed 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ 14 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT pom Spark Project Parent POM diff --git a/repl/pom.xml b/repl/pom.xml index b883344bf0ceb..295f88ea3ecf9 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index a1947fb022e54..8ad026dbdf8ff 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index d4c8c687b67bd..3640104e497d4 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 279987f58313b..f466a3c0b5dc2 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 72c474d66055c..0e3f4eb98cbf7 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -21,7 +21,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 1e92ba686a57d..0370b0e9e1aa3 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index e7419ed2c607a..181236d1bcbf6 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -19,7 +19,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 65344aa8738e0..c13534f0410a1 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -19,7 +19,7 @@ 4.0.0 org.apache.spark - spark-parent + spark-parent_2.10 1.3.0-SNAPSHOT ../pom.xml From 0bfacd5c5dd7d10a69bcbcbda630f0843d1cf285 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 5 Mar 2015 11:50:09 -0800 Subject: [PATCH 03/22] [SPARK-6090][MLLIB] add a basic BinaryClassificationMetrics to PySpark/MLlib A simple wrapper around the Scala implementation. `DataFrame` is used for serialization/deserialization. Methods that return `RDD`s are not supported in this PR. davies If we recognize Scala's `Product`s in Py4J, we can easily add wrappers for Scala methods that returns `RDD[(Double, Double)]`. Is it easy to register serializer for `Product` in PySpark? Author: Xiangrui Meng Closes #4863 from mengxr/SPARK-6090 and squashes the following commits: 009a3a3 [Xiangrui Meng] provide schema dcddab5 [Xiangrui Meng] add a basic BinaryClassificationMetrics to PySpark/MLlib --- .../BinaryClassificationMetrics.scala | 8 ++ python/docs/pyspark.mllib.rst | 7 ++ python/pyspark/mllib/evaluation.py | 83 +++++++++++++++++++ python/run-tests | 1 + 4 files changed, 99 insertions(+) create mode 100644 python/pyspark/mllib/evaluation.py diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index ced042e2f96ca..c1d1a224817e8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -22,6 +22,7 @@ import org.apache.spark.Logging import org.apache.spark.SparkContext._ import org.apache.spark.mllib.evaluation.binary._ import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.sql.DataFrame /** * :: Experimental :: @@ -53,6 +54,13 @@ class BinaryClassificationMetrics( */ def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0) + /** + * An auxiliary constructor taking a DataFrame. + * @param scoreAndLabels a DataFrame with two double columns: score and label + */ + private[mllib] def this(scoreAndLabels: DataFrame) = + this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1)))) + /** Unpersist intermediate RDDs used in the computation. */ def unpersist() { cumulativeCounts.unpersist() diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst index b706c5e376ef4..15101470afc07 100644 --- a/python/docs/pyspark.mllib.rst +++ b/python/docs/pyspark.mllib.rst @@ -16,6 +16,13 @@ pyspark.mllib.clustering module :members: :undoc-members: +pyspark.mllib.evaluation module +------------------------------- + +.. automodule:: pyspark.mllib.evaluation + :members: + :undoc-members: + pyspark.mllib.feature module ------------------------------- diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py new file mode 100644 index 0000000000000..16cb49cc0cfff --- /dev/null +++ b/python/pyspark/mllib/evaluation.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.mllib.common import JavaModelWrapper +from pyspark.sql import SQLContext +from pyspark.sql.types import StructField, StructType, DoubleType + + +class BinaryClassificationMetrics(JavaModelWrapper): + """ + Evaluator for binary classification. + + >>> scoreAndLabels = sc.parallelize([ + ... (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2) + >>> metrics = BinaryClassificationMetrics(scoreAndLabels) + >>> metrics.areaUnderROC() + 0.70... + >>> metrics.areaUnderPR() + 0.83... + >>> metrics.unpersist() + """ + + def __init__(self, scoreAndLabels): + """ + :param scoreAndLabels: an RDD of (score, label) pairs + """ + sc = scoreAndLabels.ctx + sql_ctx = SQLContext(sc) + df = sql_ctx.createDataFrame(scoreAndLabels, schema=StructType([ + StructField("score", DoubleType(), nullable=False), + StructField("label", DoubleType(), nullable=False)])) + java_class = sc._jvm.org.apache.spark.mllib.evaluation.BinaryClassificationMetrics + java_model = java_class(df._jdf) + super(BinaryClassificationMetrics, self).__init__(java_model) + + def areaUnderROC(self): + """ + Computes the area under the receiver operating characteristic + (ROC) curve. + """ + return self.call("areaUnderROC") + + def areaUnderPR(self): + """ + Computes the area under the precision-recall curve. + """ + return self.call("areaUnderPR") + + def unpersist(self): + """ + Unpersists intermediate RDDs used in the computation. + """ + self.call("unpersist") + + +def _test(): + import doctest + from pyspark import SparkContext + import pyspark.mllib.evaluation + globs = pyspark.mllib.evaluation.__dict__.copy() + globs['sc'] = SparkContext('local[4]', 'PythonTest') + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/run-tests b/python/run-tests index a2c2f37a54eda..b7630c356cfae 100755 --- a/python/run-tests +++ b/python/run-tests @@ -75,6 +75,7 @@ function run_mllib_tests() { echo "Run mllib tests ..." run_test "pyspark/mllib/classification.py" run_test "pyspark/mllib/clustering.py" + run_test "pyspark/mllib/evaluation.py" run_test "pyspark/mllib/feature.py" run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/rand.py" From 424a86a1ed2a3e6dd54cf8b09fe2f13a1311b7e6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Mar 2015 12:04:00 -0800 Subject: [PATCH 04/22] [SPARK-6175] Fix standalone executor log links when ephemeral ports or SPARK_PUBLIC_DNS are used This patch fixes two issues with the executor log viewing links added in Spark 1.3. In standalone mode, the log URLs might include a port value of 0 rather than the actual bound port of the UI, which broke the ability to view logs from workers whose web UIs had been configured to bind to ephemeral ports. In addition, the URLs used workers' local hostnames instead of respecting SPARK_PUBLIC_DNS, which prevented this feature from working properly on Spark EC2 clusters because the links would point to internal DNS names instead of external ones. I included tests for both of these bugs: - We now browse to the URLs and verify that they point to the expected pages. - To test SPARK_PUBLIC_DNS, I changed the code that reads the environment variable to do so via `SparkConf.getenv`, then used a custom SparkConf subclass to mock the environment variable (this pattern is used elsewhere in Spark's tests). Author: Josh Rosen Closes #4903 from JoshRosen/SPARK-6175 and squashes the following commits: 5577f41 [Josh Rosen] Remove println cfec135 [Josh Rosen] Use webUi.boundPort and publicAddress in log links 27918c7 [Josh Rosen] Add failing unit tests for standalone log URL viewing c250fbe [Josh Rosen] Respect SparkConf in local-cluster Workers. 422a2ef [Josh Rosen] Use conf.getenv to read SPARK_PUBLIC_DNS --- .../spark/deploy/LocalSparkCluster.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 2 +- .../spark/deploy/worker/ExecutorRunner.scala | 4 +- .../apache/spark/deploy/worker/Worker.scala | 9 ++-- .../scala/org/apache/spark/ui/WebUI.scala | 2 +- .../spark/deploy/JsonProtocolSuite.scala | 2 +- .../spark/deploy/LogUrlsStandaloneSuite.scala | 54 +++++++++++++++---- .../deploy/worker/ExecutorRunnerTest.scala | 2 +- 8 files changed, 57 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 0401b15446a7b..3ab425aab84c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -59,7 +59,7 @@ class LocalSparkCluster( /* Start the Workers */ for (workerNum <- 1 to numWorkers) { val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker, - memoryPerWorker, masters, null, Some(workerNum)) + memoryPerWorker, masters, null, Some(workerNum), _conf) workerActorSystems += workerSystem } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4584b730e3420..15814293227ab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -96,7 +96,7 @@ private[spark] class Master( val webUi = new MasterWebUI(this, webUiPort) val masterPublicAddress = { - val envVar = System.getenv("SPARK_PUBLIC_DNS") + val envVar = conf.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 066d46c4473eb..023f3c6269062 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -44,6 +44,7 @@ private[spark] class ExecutorRunner( val workerId: String, val host: String, val webUiPort: Int, + val publicAddress: String, val sparkHome: File, val executorDir: File, val workerUrl: String, @@ -140,7 +141,8 @@ private[spark] class ExecutorRunner( builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") // Add webUI log urls - val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + val baseUrl = + s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr") builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 2473a90aa9309..f2e7418f4bf15 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -121,7 +121,7 @@ private[spark] class Worker( val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr) val publicAddress = { - val envVar = System.getenv("SPARK_PUBLIC_DNS") + val envVar = conf.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host } var webUi: WorkerWebUI = null @@ -362,7 +362,8 @@ private[spark] class Worker( self, workerId, host, - webUiPort, + webUi.boundPort, + publicAddress, sparkHome, executorDir, akkaUrl, @@ -538,10 +539,10 @@ private[spark] object Worker extends Logging { memory: Int, masterUrls: Array[String], workDir: String, - workerNumber: Option[Int] = None): (ActorSystem, Int) = { + workerNumber: Option[Int] = None, + conf: SparkConf = new SparkConf): (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems - val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val actorName = "Worker" val securityMgr = new SecurityManager(conf) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 9be65a4a39a09..ec68837a1516c 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -47,7 +47,7 @@ private[spark] abstract class WebUI( protected val handlers = ArrayBuffer[ServletContextHandler]() protected var serverInfo: Option[ServerInfo] = None protected val localHostName = Utils.localHostName() - protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) + protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) private val className = Utils.getFormattedClassName(this) def getBasePath: String = basePath diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index e955636cf5b59..68b5776fc6515 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite { def createExecutorRunner(): ExecutorRunner = { new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123, - new File("sparkHome"), new File("workDir"), "akka://worker", + "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) } diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index f33bdc73e40ac..54dd7c9c45c61 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -17,35 +17,69 @@ package org.apache.spark.deploy +import java.net.URL + import scala.collection.mutable +import scala.io.Source -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.FunSuite import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener} -import org.apache.spark.{SparkContext, LocalSparkContext} +import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} -class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter { +class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext { /** Length of time to wait while draining listener events. */ - val WAIT_TIMEOUT_MILLIS = 10000 + private val WAIT_TIMEOUT_MILLIS = 10000 - before { + test("verify that correct log urls get propagated from workers") { sc = new SparkContext("local-cluster[2,1,512]", "test") + + val listener = new SaveExecutorInfo + sc.addSparkListener(listener) + + // Trigger a job so that executors get added + sc.parallelize(1 to 100, 4).map(_.toString).count() + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + // Browse to each URL to check that it's valid + info.logUrlMap.foreach { case (logType, logUrl) => + val html = Source.fromURL(logUrl).mkString + assert(html.contains(s"$logType log page")) + } + } } - test("verify log urls get propagated from workers") { + test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") { + val SPARK_PUBLIC_DNS = "public_dns" + class MySparkConf extends SparkConf(false) { + override def getenv(name: String) = { + if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS + else super.getenv(name) + } + + override def clone: SparkConf = { + new MySparkConf().setAll(getAll) + } + } + val conf = new MySparkConf() + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) + val listener = new SaveExecutorInfo sc.addSparkListener(listener) - val rdd1 = sc.parallelize(1 to 100, 4) - val rdd2 = rdd1.map(_.toString) - rdd2.setName("Target RDD") - rdd2.count() + // Trigger a job so that executors get added + sc.parallelize(1 to 100, 4).map(_.toString).count() assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) listener.addedExecutorInfos.values.foreach { info => assert(info.logUrlMap.nonEmpty) + info.logUrlMap.values.foreach { logUrl => + assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS) + } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 76511699e5ac5..6fca6321e5a1b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite { val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123, - new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), + "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables) assert(builder.command().last === appId) From 5873c713cc47af311f517ea33a6110993a410377 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Mar 2015 14:49:01 -0800 Subject: [PATCH 05/22] [SPARK-6145][SQL] fix ORDER BY on nested fields Based on #4904 with style errors fixed. `LogicalPlan#resolve` will not only produce `Attribute`, but also "`GetField` chain". So in `ResolveSortReferences`, after resolve the ordering expressions, we should not just collect the `Attribute` results, but also `Attribute` at the bottom of "`GetField` chain". Author: Wenchen Fan Author: Michael Armbrust Closes #4918 from marmbrus/pr/4904 and squashes the following commits: 997f84e [Michael Armbrust] fix style 3eedbfc [Wenchen Fan] fix 6145 --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 2 +- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 5 +++-- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 10 ++++++++++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index c363a5efacde8..54ab13ca352d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -385,7 +385,7 @@ class SqlParser extends AbstractSparkSQLParser { protected lazy val dotExpressionHeader: Parser[Expression] = (ident <~ ".") ~ ident ~ rep("." ~> ident) ^^ { - case i1 ~ i2 ~ rest => UnresolvedAttribute(i1 + "." + i2 + rest.mkString(".", ".", "")) + case i1 ~ i2 ~ rest => UnresolvedAttribute((Seq(i1, i2) ++ rest).mkString(".")) } protected lazy val dataType: Parser[DataType] = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e4e542562f22d..7753331748d7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -310,7 +310,7 @@ class Analyzer(catalog: Catalog, } /** - * In many dialects of SQL is it valid to sort by attributes that are not present in the SELECT + * In many dialects of SQL it is valid to sort by attributes that are not present in the SELECT * clause. This rule detects such queries and adds the required attributes to the original * projection, so that they will be available during sorting. Another projection is added to * remove these attributes after sorting. @@ -321,7 +321,8 @@ class Analyzer(catalog: Catalog, if !s.resolved && p.resolved => val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name }) val resolved = unresolved.flatMap(child.resolve(_, resolver)) - val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a }) + val requiredAttributes = + AttributeSet(resolved.flatMap(_.collect { case a: Attribute => a })) val missingInProject = requiredAttributes -- p.output if (missingInProject.nonEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 097bf0dd23c89..4dedcd365f6cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1049,4 +1049,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { rdd.toDF().registerTempTable("distinctData") checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2)) } + + test("SPARK-6145: ORDER BY test for nested fields") { + jsonRDD(sparkContext.makeRDD( + """{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil)).registerTempTable("nestedOrder") + // These should be successfully analyzed + sql("SELECT 1 FROM nestedOrder ORDER BY a.b").queryExecution.analyzed + sql("SELECT a.b FROM nestedOrder ORDER BY a.b").queryExecution.analyzed + sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a").queryExecution.analyzed + sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d").queryExecution.analyzed + } } From 1b4bb25c10d72132d7f4f3835ef9a3b94b2349e0 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 5 Mar 2015 14:49:44 -0800 Subject: [PATCH 06/22] [SPARK-6163][SQL] jsonFile should be backed by the data source API jira: https://issues.apache.org/jira/browse/SPARK-6163 Author: Yin Huai Closes #4896 from yhuai/SPARK-6163 and squashes the following commits: 45e023e [Yin Huai] Address @chenghao-intel's comment. 2e8734e [Yin Huai] Use JSON data source for jsonFile. 92a4a33 [Yin Huai] Test. --- .../org/apache/spark/sql/SQLContext.scala | 12 +++----- .../org/apache/spark/sql/json/JsonSuite.scala | 28 +++++++++++++++++++ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ce800e0754559..9c49e84bf9680 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -542,20 +542,16 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group specificdata */ @Experimental - def jsonFile(path: String, schema: StructType): DataFrame = { - val json = sparkContext.textFile(path) - jsonRDD(json, schema) - } + def jsonFile(path: String, schema: StructType): DataFrame = + load("json", schema, Map("path" -> path)) /** * :: Experimental :: * @group specificdata */ @Experimental - def jsonFile(path: String, samplingRatio: Double): DataFrame = { - val json = sparkContext.textFile(path) - jsonRDD(json, samplingRatio) - } + def jsonFile(path: String, samplingRatio: Double): DataFrame = + load("json", Map("path" -> path, "samplingRatio" -> samplingRatio.toString)) /** * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 9d94d3406acfb..0c21f725f0b49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.json import java.sql.{Date, Timestamp} +import org.scalactic.Tolerance._ + import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.functions._ @@ -551,6 +553,32 @@ class JsonSuite extends QueryTest { jsonDF.registerTempTable("jsonTable") } + test("jsonFile should be based on JSONRelation") { + val file = getTempFilePath("json") + val path = file.toString + sparkContext.parallelize(1 to 100).map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path) + val jsonDF = jsonFile(path, 0.49) + + val analyzed = jsonDF.queryExecution.analyzed + assert( + analyzed.isInstanceOf[LogicalRelation], + "The DataFrame returned by jsonFile should be based on JSONRelation.") + val relation = analyzed.asInstanceOf[LogicalRelation].relation + assert( + relation.isInstanceOf[JSONRelation], + "The DataFrame returned by jsonFile should be based on JSONRelation.") + assert(relation.asInstanceOf[JSONRelation].path === path) + assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001)) + + val schema = StructType(StructField("a", LongType, true) :: Nil) + val logicalRelation = + jsonFile(path, schema).queryExecution.analyzed.asInstanceOf[LogicalRelation] + val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation] + assert(relationWithSchema.path === path) + assert(relationWithSchema.schema === schema) + assert(relationWithSchema.samplingRatio > 0.99) + } + test("Loading a JSON dataset from a text file") { val file = getTempFilePath("json") val path = file.toString From eb48fd6e9d55fb034c00e61374bb9c2a86a82fb8 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 5 Mar 2015 14:50:25 -0800 Subject: [PATCH 07/22] [SQL] Make Strategies a public developer API Author: Michael Armbrust Closes #4920 from marmbrus/openStrategies and squashes the following commits: cbc35c0 [Michael Armbrust] [SQL] Make Strategies a public developer API --- sql/core/src/main/scala/org/apache/spark/sql/package.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 02e5b015e8ec2..3f97a11ceb97d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -34,10 +34,13 @@ import org.apache.spark.sql.execution.SparkPlan package object sql { /** - * Converts a logical plan into zero or more SparkPlans. + * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting + * with the query planner and is not designed to be stable across spark releases. Developers + * writing libraries should instead consider using the stable APIs provided in + * [[org.apache.spark.sql.sources]] */ @DeveloperApi - protected[sql] type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan] + type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan] /** * Type alias for [[DataFrame]]. Kept here for backward source compatibility for Scala. From d8b3da9ddfe44a2886f3841ceef4ebf9fc318640 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 6 Mar 2015 09:34:07 +0000 Subject: [PATCH 08/22] [CORE, DEPLOY][minor] align arguments order with docs of worker The help message for starting `worker` is `Usage: Worker [options] `. While in `start-slaves.sh`, the format is not align with that, it is confusing for the fist glance. Author: Zhang, Liye Closes #4924 from liyezhang556520/startSlaves and squashes the following commits: 7fd5deb [Zhang, Liye] align arguments order with docs of worker --- sbin/start-slaves.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sbin/start-slaves.sh b/sbin/start-slaves.sh index ba1a84abc1fef..76316a3067c93 100755 --- a/sbin/start-slaves.sh +++ b/sbin/start-slaves.sh @@ -64,6 +64,6 @@ else SPARK_WORKER_WEBUI_PORT=8081 fi for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do - "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" $(( $i + 1 )) "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT" --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i )) + "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" $(( $i + 1 )) --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i )) "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT" done fi From cd7594ca6acf1226bf91f8a783606bf5c116f7df Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 6 Mar 2015 09:43:24 +0000 Subject: [PATCH 09/22] [core] [minor] Don't pollute source directory when running UtilsSuite. Author: Marcelo Vanzin Closes #4921 from vanzin/utils-suite and squashes the following commits: 7795dd4 [Marcelo Vanzin] [core] [minor] Don't pollute source directory when running UtilsSuite. --- core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index fd77753c0d362..b91428efadfd0 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -386,10 +386,11 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { } test("fetch hcfs dir") { - val sourceDir = Utils.createTempDir() + val tempDir = Utils.createTempDir() + val sourceDir = new File(tempDir, "source-dir") val innerSourceDir = Utils.createTempDir(root=sourceDir.getPath) val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir) - val targetDir = new File(Utils.createTempDir(), "target-dir") + val targetDir = new File(tempDir, "target-dir") Files.write("some text", sourceFile, UTF_8) val path = new Path("file://" + sourceDir.getAbsolutePath) @@ -413,7 +414,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { assert(destInnerFile.isFile()) val filePath = new Path("file://" + sourceFile.getAbsolutePath) - val testFileDir = new File("test-filename") + val testFileDir = new File(tempDir, "test-filename") val testFileName = "testFName" val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf) Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(), From 05cb6b34d8fc25114f3dd3e2bd156386c00eb391 Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Fri, 6 Mar 2015 13:20:20 +0000 Subject: [PATCH 10/22] [Minor] Resolve sbt warnings: postfix operator second should be enabled Resolve sbt warnings: ``` [warn] spark/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala:155: postfix operator second should be enabled [warn] by making the implicit value scala.language.postfixOps visible. [warn] This can be achieved by adding the import clause 'import scala.language.postfixOps' [warn] or by setting the compiler option -language:postfixOps. [warn] See the Scala docs for value scala.language.postfixOps for a discussion [warn] why the feature should be explicitly enabled. [warn] Await.ready(f, 1 second) [warn] ^ ``` Author: GuoQiang Li Closes #4908 from witgo/sbt_warnings and squashes the following commits: 0629af4 [GuoQiang Li] Resolve sbt warnings: postfix operator second should be enabled --- .../scala/org/apache/spark/scheduler/local/LocalBackend.scala | 1 + .../org/apache/spark/streaming/util/WriteAheadLogManager.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 4676b828d3d89..d95426d918e19 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer import scala.concurrent.duration._ +import scala.language.postfixOps import akka.actor.{Actor, ActorRef, Props} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 985ded9111f74..6bdfe45dc7f83 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -20,6 +20,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.{Await, ExecutionContext, Future} +import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path From dba0b2eadb441f41ded0f0b9706b720bcfa84881 Mon Sep 17 00:00:00 2001 From: Vinod K C Date: Fri, 6 Mar 2015 14:43:09 +0000 Subject: [PATCH 11/22] [SPARK-6178][Shuffle] Removed unused imports Author: Vinod K C Author: Vinod K C Closes #4900 from vinodkc/unused_imports and squashes the following commits: 5373456 [Vinod K C] Removed empty lines 9da7438 [Vinod K C] Changed order of import 594d471 [Vinod K C] Removed unused imports --- .../org/apache/spark/network/TransportContext.java | 1 - .../spark/network/protocol/ChunkFetchFailure.java | 1 - .../org/apache/spark/network/protocol/Encoders.java | 1 - .../org/apache/spark/network/protocol/RpcFailure.java | 1 - .../apache/spark/network/server/TransportServer.java | 1 - .../java/org/apache/spark/network/util/JavaUtils.java | 10 ++-------- .../java/org/apache/spark/network/util/NettyUtils.java | 1 - .../org/apache/spark/network/sasl/SaslRpcHandler.java | 3 --- .../spark/network/shuffle/OneForOneBlockFetcher.java | 1 - .../spark/network/shuffle/protocol/OpenBlocks.java | 1 - .../network/shuffle/protocol/RegisterExecutor.java | 1 - .../spark/network/shuffle/protocol/StreamHandle.java | 2 -- .../spark/network/shuffle/protocol/UploadBlock.java | 1 - .../org/apache/spark/network/sasl/SparkSaslSuite.java | 6 +++--- .../network/shuffle/OneForOneBlockFetcherSuite.java | 9 +++++++-- 15 files changed, 12 insertions(+), 28 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java index 5bc6e5a2418a9..f0a89c9d9116c 100644 --- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java @@ -35,7 +35,6 @@ import org.apache.spark.network.server.TransportChannelHandler; import org.apache.spark.network.server.TransportRequestHandler; import org.apache.spark.network.server.TransportServer; -import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.util.NettyUtils; import org.apache.spark.network.util.TransportConf; diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java index 986957c1509fd..f76bb49e874fc 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java @@ -17,7 +17,6 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Charsets; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java b/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java index 873c694250942..9162d0b977f83 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java @@ -20,7 +20,6 @@ import com.google.common.base.Charsets; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; /** Provides a canonical set of Encoders for simple types. */ public class Encoders { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java index ebd764eb5eb5f..6b991375fc486 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java @@ -17,7 +17,6 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Charsets; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index ef209991804b4..b7ce8541e565e 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -28,7 +28,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.util.internal.PlatformDependent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java index bf8a1fc42fc6d..73da9b7346f4d 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -17,19 +17,13 @@ package org.apache.spark.network.util; -import java.nio.ByteBuffer; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; -import com.google.common.base.Preconditions; -import com.google.common.io.Closeables; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 2a4b88b64cdc9..dabd6261d2aa0 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -25,7 +25,6 @@ import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; -import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.epoll.EpollSocketChannel; diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java index 3777a18e33f78..026cbd260d16c 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java @@ -19,16 +19,13 @@ import java.util.concurrent.ConcurrentMap; -import com.google.common.base.Charsets; import com.google.common.collect.Maps; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.protocol.Encodable; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.StreamManager; diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java index 8ed2e0b39ad23..e653f5cb147ee 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -29,7 +29,6 @@ import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; import org.apache.spark.network.shuffle.protocol.OpenBlocks; import org.apache.spark.network.shuffle.protocol.StreamHandle; -import org.apache.spark.network.util.JavaUtils; /** * Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java index 62fce9b0d16cd..60485bace643c 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** Request to read a set of blocks. Returns {@link StreamHandle}. */ public class OpenBlocks extends BlockTransferMessage { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java index 7eb4385044077..38acae3b31d64 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java @@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** * Initial registration message between an executor and its local shuffle server. diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java index bc9daa6158ba3..9a9220211a50c 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java @@ -20,8 +20,6 @@ import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; - /** * Identifier for a fixed number of chunks to read from a stream created by an "open blocks" * message. This is used by {@link org.apache.spark.network.shuffle.OneForOneBlockFetcher}. diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java index 0b23e112bd512..2ff9aaa650f92 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */ diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index 67a07f38eb5a0..23b4e06f064e1 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -17,12 +17,12 @@ package org.apache.spark.network.sasl; -import java.util.Map; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import com.google.common.collect.ImmutableMap; import org.junit.Test; -import static org.junit.Assert.*; /** * Jointly tests SparkSaslClient and SparkSaslServer, as both are black boxes. diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java index 842741e3d354f..b35a6d685dd02 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java @@ -28,11 +28,16 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NettyManagedBuffer; From 48a723c98684c5bb3d185cada4888cae952791bd Mon Sep 17 00:00:00 2001 From: RobertZK Date: Sat, 7 Mar 2015 00:16:50 +0000 Subject: [PATCH 12/22] Fix python typo (+ Scala, Java typos) Author: RobertZK Author: Robert Krzyzanowski Closes #4840 from robertzk/patch-1 and squashes the following commits: d286215 [RobertZK] lambda fix per @laserson 5937989 [Robert Krzyzanowski] Fix python typo --- docs/programming-guide.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 7b0701828878e..b5e04bd0c610d 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1336,25 +1336,28 @@ Accumulators do not change the lazy evaluation model of Spark. If they are being
{% highlight scala %} -val acc = sc.accumulator(0) -data.map(x => acc += x; f(x)) -// Here, acc is still 0 because no actions have cause the `map` to be computed. +val accum = sc.accumulator(0) +data.map { x => accum += x; f(x) } +// Here, accum is still 0 because no actions have caused the `map` to be computed. {% endhighlight %}
{% highlight java %} Accumulator accum = sc.accumulator(0); -data.map(x -> accum.add(x); f(x);); -// Here, accum is still 0 because no actions have cause the `map` to be computed. +data.map(x -> { accum.add(x); return f(x); }); +// Here, accum is still 0 because no actions have caused the `map` to be computed. {% endhighlight %}
{% highlight python %} accum = sc.accumulator(0) -data.map(lambda x => acc.add(x); f(x)) -# Here, acc is still 0 because no actions have cause the `map` to be computed. +def g(x): + accum.add(x) + return f(x) +data.map(g) +# Here, accum is still 0 because no actions have caused the `map` to be computed. {% endhighlight %}
From 2646794ffb2970618087e2e964d9f4c953e17e6a Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sat, 7 Mar 2015 12:33:41 +0000 Subject: [PATCH 13/22] [EC2] Reorder print statements on termination The PR reorders some print statements slightly on cluster termination so that they read better. For example, from this: ``` Are you sure you want to destroy the cluster spark-cluster-test? The following instances will be terminated: Searching for existing cluster spark-cluster-test in region us-west-2... Found 1 master(s), 2 slaves > ... ALL DATA ON ALL NODES WILL BE LOST!! Destroy cluster spark-cluster-test (y/N): ``` To this: ``` Searching for existing cluster spark-cluster-test in region us-west-2... Found 1 master(s), 2 slaves The following instances will be terminated: > ... ALL DATA ON ALL NODES WILL BE LOST!! Are you sure you want to destroy the cluster spark-cluster-test? (y/N) ``` Author: Nicholas Chammas Closes #4932 from nchammas/termination-print-order and squashes the following commits: c23711d [Nicholas Chammas] reorder prints on termination --- ec2/spark_ec2.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index c59ab565c6862..dabb9fce40d01 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -1126,14 +1126,16 @@ def real_main(): setup_cluster(conn, master_nodes, slave_nodes, opts, True) elif action == "destroy": - print "Are you sure you want to destroy the cluster %s?" % cluster_name - print "The following instances will be terminated:" (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - for inst in master_nodes + slave_nodes: - print "> %s" % inst.public_dns_name - msg = "ALL DATA ON ALL NODES WILL BE LOST!!\nDestroy cluster %s (y/N): " % cluster_name + if any(master_nodes + slave_nodes): + print "The following instances will be terminated:" + for inst in master_nodes + slave_nodes: + print "> %s" % inst.public_dns_name + print "ALL DATA ON ALL NODES WILL BE LOST!!" + + msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) response = raw_input(msg) if response == "y": print "Terminating master..." @@ -1145,7 +1147,6 @@ def real_main(): # Delete security groups as well if opts.delete_groups: - print "Deleting security groups (this will take some time)..." group_names = [cluster_name + "-master", cluster_name + "-slaves"] wait_for_cluster_state( conn=conn, @@ -1153,6 +1154,7 @@ def real_main(): cluster_instances=(master_nodes + slave_nodes), cluster_state='terminated' ) + print "Deleting security groups (this will take some time)..." attempt = 1 while attempt <= 3: print "Attempt %d" % attempt From 729c05bda87c2383d1c54b31850ed10814617cde Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Sat, 7 Mar 2015 12:35:26 +0000 Subject: [PATCH 14/22] [Minor]fix the wrong description Found it by accident. I'm not gonna file jira for this as it is a very tiny fix. Author: WangTaoTheTonic Closes #4936 from WangTaoTheTonic/wrongdesc and squashes the following commits: fb8a8ec [WangTaoTheTonic] fix the wrong description aca5596 [WangTaoTheTonic] fix the wrong description --- sbin/stop-all.sh | 4 ++-- sbin/stop-master.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sbin/stop-all.sh b/sbin/stop-all.sh index 971d5d49da664..1a9abe07db844 100755 --- a/sbin/stop-all.sh +++ b/sbin/stop-all.sh @@ -17,8 +17,8 @@ # limitations under the License. # -# Start all spark daemons. -# Run this on the master nde +# Stop all spark daemons. +# Run this on the master node. sbin="`dirname "$0"`" diff --git a/sbin/stop-master.sh b/sbin/stop-master.sh index b6bdaa4db373c..729702d92191e 100755 --- a/sbin/stop-master.sh +++ b/sbin/stop-master.sh @@ -17,7 +17,7 @@ # limitations under the License. # -# Starts the master on the machine this script is executed on. +# Stops the master on the machine this script is executed on. sbin=`dirname "$0"` sbin=`cd "$sbin"; pwd` From 334c5bd1ae50ac76770e545cab302361673f45de Mon Sep 17 00:00:00 2001 From: Florian Verhein Date: Sat, 7 Mar 2015 12:56:59 +0000 Subject: [PATCH 15/22] [SPARK-5641] [EC2] Allow spark_ec2.py to copy arbitrary files to cluster Give users an easy way to rcp a directory structure to the master's / as part of the cluster launch, at a useful point in the workflow (before setup.sh is called on the master). This is an alternative approach to meeting requirements discussed in https://github.com/apache/spark/pull/4487 Author: Florian Verhein Closes #4583 from florianverhein/master and squashes the following commits: 49dee88 [Florian Verhein] removed addition of trailing / in rsync to give user this option, added documentation in help 7b8e3d8 [Florian Verhein] remove unused args 87d922c [Florian Verhein] [SPARK-5641] [EC2] implement --deploy-root-dir --- ec2/spark_ec2.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index dabb9fce40d01..b6e7c4c2af39b 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -159,6 +159,15 @@ def parse_args(): "--spark-ec2-git-branch", default=DEFAULT_SPARK_EC2_BRANCH, help="Github repo branch of spark-ec2 to use (default: %default)") + parser.add_option( + "--deploy-root-dir", + default=None, + help="A directory to copy into / on the first master. " + + "Must be absolute. Note that a trailing slash is handled as per rsync: " + + "If you omit it, the last directory of the --deploy-root-dir path will be created " + + "in / before copying its contents. If you append the trailing slash, " + + "the directory is not created and its contents are copied directly into /. " + + "(default: %default).") parser.add_option( "--hadoop-major-version", default="1", help="Major version of Hadoop (default: %default)") @@ -694,6 +703,14 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): modules=modules ) + if opts.deploy_root_dir is not None: + print "Deploying {s} to master...".format(s=opts.deploy_root_dir) + deploy_user_files( + root_dir=opts.deploy_root_dir, + opts=opts, + master_nodes=master_nodes + ) + print "Running setup on master..." setup_spark_cluster(master, opts) print "Done!" @@ -931,6 +948,23 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): shutil.rmtree(tmp_dir) +# Deploy a given local directory to a cluster, WITHOUT parameter substitution. +# Note that unlike deploy_files, this works for binary files. +# Also, it is up to the user to add (or not) the trailing slash in root_dir. +# Files are only deployed to the first master instance in the cluster. +# +# root_dir should be an absolute path. +def deploy_user_files(root_dir, opts, master_nodes): + active_master = master_nodes[0].public_dns_name + command = [ + 'rsync', '-rv', + '-e', stringify_command(ssh_command(opts)), + "%s" % root_dir, + "%s@%s:/" % (opts.user, active_master) + ] + subprocess.check_call(command) + + def stringify_command(parts): if isinstance(parts, str): return parts @@ -1099,6 +1133,14 @@ def real_main(): "Furthermore, we currently only support forks named spark-ec2." sys.exit(1) + if not (opts.deploy_root_dir is None or + (os.path.isabs(opts.deploy_root_dir) and + os.path.isdir(opts.deploy_root_dir) and + os.path.exists(opts.deploy_root_dir))): + print >> stderr, "--deploy-root-dir must be an absolute path to a directory that exists " \ + "on the local file system" + sys.exit(1) + try: conn = ec2.connect_to_region(opts.region) except Exception as e: From 52ed7da12e26c45734ce53a1be19ef148b2b953e Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 8 Mar 2015 14:01:26 +0000 Subject: [PATCH 16/22] [SPARK-6193] [EC2] Push group filter up to EC2 When looking for a cluster, spark-ec2 currently pulls down [info for all instances](https://github.com/apache/spark/blob/eb48fd6e9d55fb034c00e61374bb9c2a86a82fb8/ec2/spark_ec2.py#L620) and filters locally. When working on an AWS account with hundreds of active instances, this step alone can take over 10 seconds. This PR improves how spark-ec2 searches for clusters by pushing the filter up to EC2. Basically, the problem (and solution) look like this: ```python >>> timeit.timeit('blah = conn.get_all_reservations()', setup='from __main__ import conn', number=10) 116.96390509605408 >>> timeit.timeit('blah = conn.get_all_reservations(filters={"instance.group-name": ["my-cluster-master"]})', setup='from __main__ import conn', number=10) 4.629754066467285 ``` Translated to a user-visible action, this looks like (against an AWS account with ~200 active instances): ```shell # master $ python -m timeit -n 3 --setup 'import subprocess' 'subprocess.call("./spark-ec2 get-master my-cluster --region us-west-2", shell=True)' ... 3 loops, best of 3: 9.83 sec per loop # this PR $ python -m timeit -n 3 --setup 'import subprocess' 'subprocess.call("./spark-ec2 get-master my-cluster --region us-west-2", shell=True)' ... 3 loops, best of 3: 1.47 sec per loop ``` This PR also refactors `get_existing_cluster()` to make it, I hope, simpler. Finally, this PR fixes some minor grammar issues related to printing status to the user. :tophat: :clap: Author: Nicholas Chammas Closes #4922 from nchammas/get-existing-cluster-faster and squashes the following commits: 18802f1 [Nicholas Chammas] ignore shutting-down f2a5b9f [Nicholas Chammas] fix grammar d96a489 [Nicholas Chammas] push group filter up to EC2 --- ec2/spark_ec2.py | 78 +++++++++++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index b6e7c4c2af39b..5e636ddd17e94 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -22,6 +22,7 @@ from __future__ import with_statement import hashlib +import itertools import logging import os import os.path @@ -299,13 +300,6 @@ def get_validate_spark_version(version, repo): return version -# Check whether a given EC2 instance object is in a state we consider active, -# i.e. not terminating or terminated. We count both stopping and stopped as -# active since we can restart stopped clusters. -def is_active(instance): - return (instance.state in ['pending', 'running', 'stopping', 'stopped']) - - # Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/ # Last Updated: 2014-06-20 # For easy maintainability, please keep this manually-inputted dictionary sorted by key. @@ -573,8 +567,11 @@ def launch_cluster(conn, opts, cluster_name): placement_group=opts.placement_group, user_data=user_data_content) slave_nodes += slave_res.instances - print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone, - zone, slave_res.id) + print "Launched {s} slave{plural_s} in {z}, regid = {r}".format( + s=num_slaves_this_zone, + plural_s=('' if num_slaves_this_zone == 1 else 's'), + z=zone, + r=slave_res.id) i += 1 # Launch or resume masters @@ -621,40 +618,47 @@ def launch_cluster(conn, opts, cluster_name): return (master_nodes, slave_nodes) -# Get the EC2 instances in an existing cluster if available. -# Returns a tuple of lists of EC2 instance objects for the masters and slaves def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): - print "Searching for existing cluster " + cluster_name + " in region " \ - + opts.region + "..." - reservations = conn.get_all_reservations() - master_nodes = [] - slave_nodes = [] - for res in reservations: - active = [i for i in res.instances if is_active(i)] - for inst in active: - group_names = [g.name for g in inst.groups] - if (cluster_name + "-master") in group_names: - master_nodes.append(inst) - elif (cluster_name + "-slaves") in group_names: - slave_nodes.append(inst) - if any((master_nodes, slave_nodes)): - print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes)) - if master_nodes != [] or not die_on_error: - return (master_nodes, slave_nodes) - else: - if master_nodes == [] and slave_nodes != []: - print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name \ - + "-master" + " in region " + opts.region - else: - print >> sys.stderr, "ERROR: Could not find any existing cluster" \ - + " in region " + opts.region + """ + Get the EC2 instances in an existing cluster if available. + Returns a tuple of lists of EC2 instance objects for the masters and slaves. + """ + print "Searching for existing cluster {c} in region {r}...".format( + c=cluster_name, r=opts.region) + + def get_instances(group_names): + """ + Get all non-terminated instances that belong to any of the provided security groups. + + EC2 reservation filters and instance states are documented here: + http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options + """ + reservations = conn.get_all_reservations( + filters={"instance.group-name": group_names}) + instances = itertools.chain.from_iterable(r.instances for r in reservations) + return [i for i in instances if i.state not in ["shutting-down", "terminated"]] + + master_instances = get_instances([cluster_name + "-master"]) + slave_instances = get_instances([cluster_name + "-slaves"]) + + if any((master_instances, slave_instances)): + print "Found {m} master{plural_m}, {s} slave{plural_s}.".format( + m=len(master_instances), + plural_m=('' if len(master_instances) == 1 else 's'), + s=len(slave_instances), + plural_s=('' if len(slave_instances) == 1 else 's')) + + if not master_instances and die_on_error: + print >> sys.stderr, \ + "ERROR: Could not find a master for cluster {c} in region {r}.".format( + c=cluster_name, r=opts.region) sys.exit(1) + return (master_instances, slave_instances) + # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. - - def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): master = master_nodes[0].public_dns_name if deploy_ssh_key: From f16b7b031feeb13ec9c17608bd99566f56431869 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sun, 8 Mar 2015 14:09:40 +0000 Subject: [PATCH 17/22] SPARK-6205 [CORE] UISeleniumSuite fails for Hadoop 2.x test with NoClassDefFoundError Add xml-apis to core test deps to work aroudn UISeleniumSuite classpath issue Author: Sean Owen Closes #4933 from srowen/SPARK-6205 and squashes the following commits: ddd4d32 [Sean Owen] Add xml-apis to core test deps to work aroudn UISeleniumSuite classpath issue --- core/pom.xml | 6 ++++++ pom.xml | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/core/pom.xml b/core/pom.xml index fab776d142ef7..dc0d07d806635 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -319,6 +319,12 @@ selenium-java test
+ + + xml-apis + xml-apis + test + org.mockito mockito-all diff --git a/pom.xml b/pom.xml index f99a83b9994ed..51bef30f9ca8f 100644 --- a/pom.xml +++ b/pom.xml @@ -422,6 +422,13 @@ 2.42.2 test + + + xml-apis + xml-apis + 1.4.01 + test + org.slf4j slf4j-api From 55b1b32dc8b9b25deea8e5864b53fe802bb92741 Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Sun, 8 Mar 2015 19:47:35 +0000 Subject: [PATCH 18/22] [GraphX] Improve LiveJournalPageRank example 1. Removed unnecessary import 2. Modified usage print since user must specify the --numEPart parameter as it is required in Analytics.main Author: Jacky Li Closes #4917 from jackylk/import and squashes the following commits: 6c07682 [Jacky Li] fix comment c0df8f2 [Jacky Li] fix scalastyle b6235e6 [Jacky Li] fix for comment 87be83b [Jacky Li] remove default value description 5caae76 [Jacky Li] remove import and modify usage --- .../spark/examples/graphx/LiveJournalPageRank.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala index e809a65b79975..f6f8d9f90c275 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala @@ -17,11 +17,6 @@ package org.apache.spark.examples.graphx -import org.apache.spark.SparkContext._ -import org.apache.spark._ -import org.apache.spark.graphx._ - - /** * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from * http://snap.stanford.edu/data/soc-LiveJournal1.html. @@ -31,13 +26,13 @@ object LiveJournalPageRank { if (args.length < 1) { System.err.println( "Usage: LiveJournalPageRank \n" + + " --numEPart=\n" + + " The number of partitions for the graph's edge RDD.\n" + " [--tol=]\n" + " The tolerance allowed at convergence (smaller => more accurate). Default is " + "0.001.\n" + " [--output=]\n" + " If specified, the file to write the ranks to.\n" + - " [--numEPart=]\n" + - " The number of partitions for the graph's edge RDD. Default is 4.\n" + " [--partStrategy=RandomVertexCut | EdgePartition1D | EdgePartition2D | " + "CanonicalRandomVertexCut]\n" + " The way edges are assigned to edge partitions. Default is RandomVertexCut.") From f7c799204358bcc38c5972a29e5994b78b25b515 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Mon, 9 Mar 2015 14:16:07 +0000 Subject: [PATCH 19/22] [EC2] [SPARK-6188] Instance types can be mislabeled when re-starting cluster with default arguments As described in https://issues.apache.org/jira/browse/SPARK-6188 and discovered in https://issues.apache.org/jira/browse/SPARK-5838. When re-starting a cluster, if the user does not provide the instance types, which is the recommended behavior in the docs currently, the instance will be assigned the default type m1.large. This then affects the setup of the machines. This solves this by getting the instance types from the existing instances, and overwriting the default options. EDIT: Further clarification of the issue: In short, while the instances themselves are the same as launched, their setup is done assuming the default instance type, m1.large. This means that the machines are assumed to have 2 disks, and that leads to problems that are described in in issue [5838](https://issues.apache.org/jira/browse/SPARK-5838), where machines that have one disk end up having shuffle spills in the in the small (8GB) snapshot partitions that quickly fills up and results in failing jobs due to "No space left on device" errors. Other instance specific settings that are set in the spark_ec2.py script are likely to be wrong as well. Author: Theodore Vasiloudis Author: Theodore Vasiloudis Closes #4916 from thvasilo/SPARK-6188]-Instance-types-can-be-mislabeled-when-re-starting-cluster-with-default-arguments and squashes the following commits: 6705b98 [Theodore Vasiloudis] Added comment to clarify setting master instance type to the empty string. a3d29fe [Theodore Vasiloudis] More trailing whitespace 7b32429 [Theodore Vasiloudis] Removed trailing whitespace 3ebd52a [Theodore Vasiloudis] Make sure that the instance type is correct when relaunching a cluster. --- ec2/spark_ec2.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 5e636ddd17e94..b50b3816ff890 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -1307,6 +1307,17 @@ def real_main(): cluster_instances=(master_nodes + slave_nodes), cluster_state='ssh-ready' ) + + # Determine types of running instances + existing_master_type = master_nodes[0].instance_type + existing_slave_type = slave_nodes[0].instance_type + # Setting opts.master_instance_type to the empty string indicates we + # have the same instance type for the master and the slaves + if existing_master_type == existing_slave_type: + existing_master_type = "" + opts.master_instance_type = existing_master_type + opts.instance_type = existing_slave_type + setup_cluster(conn, master_nodes, slave_nodes, opts, False) else: From 70f88148bb04161a1a4968230d8e3fc7e3f8321a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 9 Mar 2015 13:29:19 -0700 Subject: [PATCH 20/22] [Docs] Replace references to SchemaRDD with DataFrame Author: Reynold Xin Closes #4952 from rxin/schemardd-df-reference and squashes the following commits: b2b1dbe [Reynold Xin] [Docs] Replace references to SchemaRDD with DataFrame --- python/pyspark/ml/pipeline.py | 4 ++-- python/pyspark/ml/wrapper.py | 2 +- .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py index 5233c5801e2e6..83880a5afcd1d 100644 --- a/python/pyspark/ml/pipeline.py +++ b/python/pyspark/ml/pipeline.py @@ -39,7 +39,7 @@ def fit(self, dataset, params={}): Fits a model to the input dataset with optional parameters. :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` + :py:class:`pyspark.sql.DataFrame` :param params: an optional param map that overwrites embedded params :returns: fitted model @@ -62,7 +62,7 @@ def transform(self, dataset, params={}): Transforms the input dataset with optional parameters. :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` + :py:class:`pyspark.sql.DataFrame` :param params: an optional param map that overwrites embedded params :returns: transformed dataset diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 4bae96f678388..31a66b3d2f730 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -102,7 +102,7 @@ def _fit_java(self, dataset, params={}): """ Fits a Java model to the input dataset. :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` + :py:class:`pyspark.sql.DataFrame` :param params: additional params (overwriting embedded values) :return: fitted Java model """ diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index f966f25c5a14c..ed9b207a86a0b 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -263,14 +263,14 @@ class ReplSuite extends FunSuite { assertDoesNotContain("Exception", output) } - test("SPARK-2576 importing SQLContext.createSchemaRDD.") { + test("SPARK-2576 importing SQLContext.createDataFrame.") { // We need to use local-cluster to test this case. val output = runInterpreter("local-cluster[1,1,512]", """ |val sqlContext = new org.apache.spark.sql.SQLContext(sc) - |import sqlContext.createSchemaRDD + |import sqlContext.implicits._ |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) From 3cac1991a1def0adaf42face2c578d3ab8c27025 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 9 Mar 2015 16:16:16 -0700 Subject: [PATCH 21/22] [SPARK-5310][Doc] Update SQL Programming Guide to include DataFrames. Author: Reynold Xin Closes #4954 from rxin/df-docs and squashes the following commits: c592c70 [Reynold Xin] [SPARK-5310][Doc] Update SQL Programming Guide to include DataFrames. --- docs/_layouts/global.html | 2 +- docs/index.md | 2 +- docs/sql-programming-guide.md | 404 ++++++++++++++++++++++++---------- 3 files changed, 286 insertions(+), 122 deletions(-) diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index efc4c612937df..2e88b3093652d 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -71,7 +71,7 @@
  • Spark Programming Guide
  • Spark Streaming
  • -
  • Spark SQL
  • +
  • DataFrames and SQL
  • MLlib (Machine Learning)
  • GraphX (Graph Processing)
  • Bagel (Pregel on Spark)
  • diff --git a/docs/index.md b/docs/index.md index 0986398e6f744..b5b016e34795e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -74,7 +74,7 @@ options for deployment: in all supported languages (Scala, Java, Python) * Modules built on Spark: * [Spark Streaming](streaming-programming-guide.html): processing real-time data streams - * [Spark SQL](sql-programming-guide.html): support for structured data and relational queries + * [Spark SQL and DataFrames](sql-programming-guide.html): support for structured data and relational queries * [MLlib](mllib-guide.html): built-in machine learning library * [GraphX](graphx-programming-guide.html): Spark's new API for graph processing * [Bagel (Pregel on Spark)](bagel-programming-guide.html): older, simple graph processing model diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 0146a4ed1b745..4fbdca7397951 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1,7 +1,7 @@ --- layout: global -displayTitle: Spark SQL Programming Guide -title: Spark SQL +displayTitle: Spark SQL and DataFrame Guide +title: Spark SQL and DataFrames --- * This will become a table of contents (this text will be scraped). @@ -9,55 +9,24 @@ title: Spark SQL # Overview -
    -
    - -Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using -Spark. At the core of this component is a new type of RDD, -[DataFrame](api/scala/index.html#org.apache.spark.sql.DataFrame). DataFrames are composed of -[Row](api/scala/index.html#org.apache.spark.sql.package@Row:org.apache.spark.sql.catalyst.expressions.Row.type) objects, along with -a schema that describes the data types of each column in the row. A DataFrame is similar to a table -in a traditional relational database. A DataFrame can be created from an existing RDD, a [Parquet](http://parquet.io) -file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). - -All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`. - -
    +Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed query engine. -
    -Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using -Spark. At the core of this component is a new type of RDD, -[DataFrame](api/scala/index.html#org.apache.spark.sql.DataFrame). DataFrames are composed of -[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects, along with -a schema that describes the data types of each column in the row. A DataFrame is similar to a table -in a traditional relational database. A DataFrame can be created from an existing RDD, a [Parquet](http://parquet.io) -file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). -
    -
    +# DataFrames -Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using -Spark. At the core of this component is a new type of RDD, -[DataFrame](api/python/pyspark.sql.html#pyspark.sql.DataFrame). DataFrames are composed of -[Row](api/python/pyspark.sql.Row-class.html) objects, along with -a schema that describes the data types of each column in the row. A DataFrame is similar to a table -in a traditional relational database. A DataFrame can be created from an existing RDD, a [Parquet](http://parquet.io) -file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). +A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. -All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell. -
    -
    +The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame), [Java](api/java/index.html?org/apache/spark/sql/DataFrame.html), and [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame). -**Spark SQL is currently an alpha component. While we will minimize API changes, some APIs may change in future releases.** +All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell` or the `pyspark` shell. -*************************************************************************************************** -# Getting Started +## Starting Point: SQLContext
    -The entry point into all relational functionality in Spark is the +The entry point into all functionality in Spark SQL is the [SQLContext](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its descendants. To create a basic SQLContext, all you need is a SparkContext. @@ -69,39 +38,19 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ {% endhighlight %} -In addition to the basic SQLContext, you can also create a HiveContext, which provides a -superset of the functionality provided by the basic SQLContext. Additional features include -the ability to write queries using the more complete HiveQL parser, access to HiveUDFs, and the -ability to read data from Hive tables. To use a HiveContext, you do not need to have an -existing Hive setup, and all of the data sources available to a SQLContext are still available. -HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default -Spark build. If these dependencies are not a problem for your application then using HiveContext -is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to -feature parity with a HiveContext. -
    -The entry point into all relational functionality in Spark is the -[SQLContext](api/scala/index.html#org.apache.spark.sql.api.SQLContext) class, or one -of its descendants. To create a basic SQLContext, all you need is a JavaSparkContext. +The entry point into all functionality in Spark SQL is the +[SQLContext](api/java/index.html#org.apache.spark.sql.SQLContext) class, or one of its +descendants. To create a basic SQLContext, all you need is a SparkContext. {% highlight java %} JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); {% endhighlight %} -In addition to the basic SQLContext, you can also create a HiveContext, which provides a strict -super set of the functionality provided by the basic SQLContext. Additional features include -the ability to write queries using the more complete HiveQL parser, access to HiveUDFs, and the -ability to read data from Hive tables. To use a HiveContext, you do not need to have an -existing Hive setup, and all of the data sources available to a SQLContext are still available. -HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default -Spark build. If these dependencies are not a problem for your application then using HiveContext -is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to -feature parity with a HiveContext. -
    @@ -115,35 +64,266 @@ from pyspark.sql import SQLContext sqlContext = SQLContext(sc) {% endhighlight %} -In addition to the basic SQLContext, you can also create a HiveContext, which provides a strict -super set of the functionality provided by the basic SQLContext. Additional features include -the ability to write queries using the more complete HiveQL parser, access to HiveUDFs, and the +
    +
    + +In addition to the basic SQLContext, you can also create a HiveContext, which provides a +superset of the functionality provided by the basic SQLContext. Additional features include +the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. To use a HiveContext, you do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available. HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default Spark build. If these dependencies are not a problem for your application then using HiveContext -is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to -feature parity with a HiveContext. - - - - +is recommended for the 1.3 release of Spark. Future releases will focus on bringing SQLContext up +to feature parity with a HiveContext. The specific variant of SQL that is used to parse queries can also be selected using the `spark.sql.dialect` option. This parameter can be changed using either the `setConf` method on a SQLContext or by using a `SET key=value` command in SQL. For a SQLContext, the only dialect available is "sql" which uses a simple SQL parser provided by Spark SQL. In a HiveContext, the default is "hiveql", though "sql" is also available. Since the HiveQL parser is much more complete, - this is recommended for most use cases. +this is recommended for most use cases. -# Data Sources -Spark SQL supports operating on a variety of data sources through the `DataFrame` interface. -A DataFrame can be operated on as normal RDDs and can also be registered as a temporary table. -Registering a DataFrame as a table allows you to run SQL queries over its data. This section -describes the various methods for loading data into a DataFrame. +## Creating DataFrames + +With a `SQLContext`, applications can create `DataFrame`s from an existing `RDD`, from a Hive table, or from data sources. + +As an example, the following creates a `DataFrame` based on the content of a JSON file: + +
    +
    +{% highlight scala %} +val sc: SparkContext // An existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +val df = sqlContext.jsonFile("examples/src/main/resources/people.json") + +// Displays the content of the DataFrame to stdout +df.show() +{% endhighlight %} + +
    + +
    +{% highlight java %} +JavaSparkContext sc = ...; // An existing JavaSparkContext. +SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); + +DataFrame df = sqlContext.jsonFile("examples/src/main/resources/people.json"); + +// Displays the content of the DataFrame to stdout +df.show(); +{% endhighlight %} + +
    + +
    +{% highlight python %} +from pyspark.sql import SQLContext +sqlContext = SQLContext(sc) + +df = sqlContext.jsonFile("examples/src/main/resources/people.json") + +# Displays the content of the DataFrame to stdout +df.show() +{% endhighlight %} + +
    +
    + + +## DataFrame Operations + +DataFrames provide a domain-specific language for structured data manipulation in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame), [Java](api/java/index.html?org/apache/spark/sql/DataFrame.html), and [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame). + +Here we include some basic examples of structured data processing using DataFrames: + + +
    +
    +{% highlight scala %} +val sc: SparkContext // An existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +// Create the DataFrame +val df = sqlContext.jsonFile("examples/src/main/resources/people.json") + +// Show the content of the DataFrame +df.show() +// age name +// null Michael +// 30 Andy +// 19 Justin + +// Print the schema in a tree format +df.printSchema() +// root +// |-- age: long (nullable = true) +// |-- name: string (nullable = true) + +// Select only the "name" column +df.select("name").show() +// name +// Michael +// Andy +// Justin + +// Select everybody, but increment the age by 1 +df.select("name", df("age") + 1).show() +// name (age + 1) +// Michael null +// Andy 31 +// Justin 20 + +// Select people older than 21 +df.filter(df("name") > 21).show() +// age name +// 30 Andy + +// Count people by age +df.groupBy("age").count().show() +// age count +// null 1 +// 19 1 +// 30 1 +{% endhighlight %} + +
    + +
    +{% highlight java %} +val sc: JavaSparkContext // An existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +// Create the DataFrame +DataFrame df = sqlContext.jsonFile("examples/src/main/resources/people.json"); + +// Show the content of the DataFrame +df.show(); +// age name +// null Michael +// 30 Andy +// 19 Justin + +// Print the schema in a tree format +df.printSchema(); +// root +// |-- age: long (nullable = true) +// |-- name: string (nullable = true) + +// Select only the "name" column +df.select("name").show(); +// name +// Michael +// Andy +// Justin + +// Select everybody, but increment the age by 1 +df.select("name", df.col("age").plus(1)).show(); +// name (age + 1) +// Michael null +// Andy 31 +// Justin 20 + +// Select people older than 21 +df.filter(df("name") > 21).show(); +// age name +// 30 Andy + +// Count people by age +df.groupBy("age").count().show(); +// age count +// null 1 +// 19 1 +// 30 1 +{% endhighlight %} + +
    + +
    +{% highlight python %} +from pyspark.sql import SQLContext +sqlContext = SQLContext(sc) + +# Create the DataFrame +df = sqlContext.jsonFile("examples/src/main/resources/people.json") + +# Show the content of the DataFrame +df.show() +## age name +## null Michael +## 30 Andy +## 19 Justin + +# Print the schema in a tree format +df.printSchema() +## root +## |-- age: long (nullable = true) +## |-- name: string (nullable = true) + +# Select only the "name" column +df.select("name").show() +## name +## Michael +## Andy +## Justin + +# Select everybody, but increment the age by 1 +df.select("name", df.age + 1).show() +## name (age + 1) +## Michael null +## Andy 31 +## Justin 20 + +# Select people older than 21 +df.filter(df.name > 21).show() +## age name +## 30 Andy + +# Count people by age +df.groupBy("age").count().show() +## age count +## null 1 +## 19 1 +## 30 1 + +{% endhighlight %} + +
    +
    + + +## Running SQL Queries Programmatically + +The `sql` function on a `SQLContext` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`. + +
    +
    +{% highlight scala %} +val sqlContext = ... // An existing SQLContext +val df = sqlContext.sql("SELECT * FROM table") +{% endhighlight %} +
    + +
    +{% highlight java %} +val sqlContext = ... // An existing SQLContext +val df = sqlContext.sql("SELECT * FROM table") +{% endhighlight %} +
    + +
    +{% highlight python %} +from pyspark.sql import SQLContext +sqlContext = SQLContext(sc) +df = sqlContext.sql("SELECT * FROM table") +{% endhighlight %} +
    +
    + -## RDDs +## Interoperating with RDDs Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This @@ -373,12 +553,12 @@ by `SQLContext`. For example: {% highlight java %} // Import factory methods provided by DataType. -import org.apache.spark.sql.api.java.DataType +import org.apache.spark.sql.types.DataType; // Import StructType and StructField -import org.apache.spark.sql.api.java.StructType -import org.apache.spark.sql.api.java.StructField +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.StructField; // Import Row. -import org.apache.spark.sql.api.java.Row +import org.apache.spark.sql.Row; // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); @@ -472,11 +652,19 @@ for name in names.collect(): print name {% endhighlight %} - + +# Data Sources + +Spark SQL supports operating on a variety of data sources through the `DataFrame` interface. +A DataFrame can be operated on as normal RDDs and can also be registered as a temporary table. +Registering a DataFrame as a table allows you to run SQL queries over its data. This section +describes the various methods for loading data into a DataFrame. + + ## Parquet Files [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems. @@ -904,15 +1092,14 @@ that these options will be deprecated in future release as more optimizations ar -# Other SQL Interfaces +# Distributed Query Engine -Spark SQL also supports interfaces for running SQL queries directly without the need to write any -code. +Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. In this mode, end-users or applications can interact with Spark SQL directly to run SQL queries, without the need to write any code. ## Running the Thrift JDBC/ODBC server The Thrift JDBC/ODBC server implemented here corresponds to the [`HiveServer2`](https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) -in Hive 0.12. You can test the JDBC server with the beeline script that comes with either Spark or Hive 0.12. +in Hive 0.13. You can test the JDBC server with the beeline script that comes with either Spark or Hive 0.13. To start the JDBC/ODBC server, run the following in the Spark directory: @@ -982,7 +1169,7 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. You may run `./bin/spark-sql --help` for a complete list of all available options. -# Compatibility with Other Systems +# Migration Guide ## Migration Guide for Shark User @@ -1139,33 +1326,10 @@ releases of Spark SQL. Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that. -# Writing Language-Integrated Relational Queries - -**Language-Integrated queries are experimental and currently only supported in Scala.** - -Spark SQL also supports a domain specific language for writing queries. Once again, -using the data from the above examples: - -{% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) -// Importing the SQL context gives access to all the public SQL functions and implicit conversions. -import sqlContext._ -val people: RDD[Person] = ... // An RDD of case class objects, from the first example. - -// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' -val teenagers = people.where('age >= 10).where('age <= 19).select('name) -teenagers.map(t => "Name: " + t(0)).collect().foreach(println) -{% endhighlight %} - -The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers -prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are -evaluated by the SQL execution engine. A full list of the functions supported can be found in the -[ScalaDoc](api/scala/index.html#org.apache.spark.sql.DataFrame). - +# Data Types -# Spark SQL DataType Reference +Spark SQL and DataFrames support the following data types: * Numeric types - `ByteType`: Represents 1-byte signed integer numbers. @@ -1208,10 +1372,10 @@ evaluated by the SQL execution engine. A full list of the functions supported c
    -All data types of Spark SQL are located in the package `org.apache.spark.sql`. +All data types of Spark SQL are located in the package `org.apache.spark.sql.types`. You can access them by doing {% highlight scala %} -import org.apache.spark.sql._ +import org.apache.spark.sql.types._ {% endhighlight %} @@ -1263,7 +1427,7 @@ import org.apache.spark.sql._ - + @@ -1457,7 +1621,7 @@ please use factory methods provided in - +
    DecimalType scala.math.BigDecimal java.math.BigDecimal DecimalType
    StructType org.apache.spark.sql.api.java.Row org.apache.spark.sql.Row DataTypes.createStructType(fields)
    Note: fields is a List or an array of StructFields. @@ -1478,10 +1642,10 @@ please use factory methods provided in
    -All data types of Spark SQL are located in the package of `pyspark.sql`. +All data types of Spark SQL are located in the package of `pyspark.sql.types`. You can access them by doing {% highlight python %} -from pyspark.sql import * +from pyspark.sql.types import * {% endhighlight %} From 8767565cef01d847f57b7293d8b63b2422009b90 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 9 Mar 2015 16:24:06 -0700 Subject: [PATCH 22/22] [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect() Because circular reference between JavaObject and JavaMember, an Java object can not be released until Python GC kick in, then it will cause memory leak in collect(), which may consume lots of memory in JVM. This PR change the way we sending collected data back into Python from local file to socket, which could avoid any disk IO during collect, also avoid any referrers of Java object in Python. cc JoshRosen Author: Davies Liu Closes #4923 from davies/fix_collect and squashes the following commits: d730286 [Davies Liu] address comments 24c92a4 [Davies Liu] fix style ba54614 [Davies Liu] use socket to transfer data from JVM 9517c8f [Davies Liu] fix memory leak in collect() --- .../apache/spark/api/python/PythonRDD.scala | 76 ++++++++++++++----- python/pyspark/context.py | 13 ++-- python/pyspark/rdd.py | 30 ++++---- python/pyspark/sql/dataframe.py | 14 +--- 4 files changed, 82 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index b1cec0f6472b0..8d4a53b4ca9b0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -19,26 +19,27 @@ package org.apache.spark.api.python import java.io._ import java.net._ -import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections} - -import org.apache.spark.input.PortableDataStream +import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap} import scala.collection.JavaConversions._ import scala.collection.mutable import scala.language.existentials import com.google.common.base.Charsets.UTF_8 - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf} +import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat} + import org.apache.spark._ -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils +import scala.util.control.NonFatal + private[spark] class PythonRDD( @transient parent: RDD[_], command: Array[Byte], @@ -341,21 +342,33 @@ private[spark] object PythonRDD extends Logging { /** * Adapter for calling SparkContext#runJob from Python. * - * This method will return an iterator of an array that contains all elements in the RDD + * This method will serve an iterator of an array that contains all elements in the RDD * (effectively a collect()), but allows you to run on a certain subset of partitions, * or to enable local execution. + * + * @return the port number of a local socket which serves the data collected from this job. */ def runJob( sc: SparkContext, rdd: JavaRDD[Array[Byte]], partitions: JArrayList[Int], - allowLocal: Boolean): Iterator[Array[Byte]] = { + allowLocal: Boolean): Int = { type ByteArray = Array[Byte] type UnrolledPartition = Array[ByteArray] val allPartitions: Array[UnrolledPartition] = sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal) val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*) - flattenedPartition.iterator + serveIterator(flattenedPartition.iterator, + s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}") + } + + /** + * A helper function to collect an RDD as an iterator, then serve it via socket. + * + * @return the port number of a local socket which serves the data collected from this job. + */ + def collectAndServe[T](rdd: RDD[T]): Int = { + serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}") } def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int): @@ -575,15 +588,44 @@ private[spark] object PythonRDD extends Logging { dataOut.write(bytes) } - def writeToFile[T](items: java.util.Iterator[T], filename: String) { - import scala.collection.JavaConverters._ - writeToFile(items.asScala, filename) - } + /** + * Create a socket server and a background thread to serve the data in `items`, + * + * The socket server can only accept one connection, or close if no connection + * in 3 seconds. + * + * Once a connection comes in, it tries to serialize all the data in `items` + * and send them into this connection. + * + * The thread will terminate after all the data are sent or any exceptions happen. + */ + private def serveIterator[T](items: Iterator[T], threadName: String): Int = { + val serverSocket = new ServerSocket(0, 1) + serverSocket.setReuseAddress(true) + // Close the socket if no connection in 3 seconds + serverSocket.setSoTimeout(3000) + + new Thread(threadName) { + setDaemon(true) + override def run() { + try { + val sock = serverSocket.accept() + val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream)) + try { + writeIteratorToStream(items, out) + } finally { + out.close() + } + } catch { + case NonFatal(e) => + logError(s"Error while sending iterator", e) + } finally { + serverSocket.close() + } + } + }.start() - def writeToFile[T](items: Iterator[T], filename: String) { - val file = new DataOutputStream(new FileOutputStream(filename)) - writeIteratorToStream(items, file) - file.close() + serverSocket.getLocalPort } private def getMergedConf(confAsMap: java.util.HashMap[String, String], diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 6011caf9f1c5a..78dccc40470e3 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -21,6 +21,8 @@ from threading import Lock from tempfile import NamedTemporaryFile +from py4j.java_collections import ListConverter + from pyspark import accumulators from pyspark.accumulators import Accumulator from pyspark.broadcast import Broadcast @@ -30,13 +32,11 @@ from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \ PairDeserializer, AutoBatchedSerializer, NoOpSerializer from pyspark.storagelevel import StorageLevel -from pyspark.rdd import RDD +from pyspark.rdd import RDD, _load_from_socket from pyspark.traceback_utils import CallSite, first_spark_call from pyspark.status import StatusTracker from pyspark.profiler import ProfilerCollector, BasicProfiler -from py4j.java_collections import ListConverter - __all__ = ['SparkContext'] @@ -59,7 +59,6 @@ class SparkContext(object): _gateway = None _jvm = None - _writeToFile = None _next_accum_id = 0 _active_spark_context = None _lock = Lock() @@ -221,7 +220,6 @@ def _ensure_initialized(cls, instance=None, gateway=None): if not SparkContext._gateway: SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm - SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile if instance: if (SparkContext._active_spark_context and @@ -840,8 +838,9 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): # by runJob() in order to avoid having to pass a Python lambda into # SparkContext#runJob. mappedRDD = rdd.mapPartitions(partitionFunc) - it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) - return list(mappedRDD._collect_iterator_through_file(it)) + port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, + allowLocal) + return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) def show_profiles(self): """ Print the profile stats to stdout """ diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index cb12fed98c53d..bf17f513c0bc3 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -19,7 +19,6 @@ from collections import defaultdict from itertools import chain, ifilter, imap import operator -import os import sys import shlex from subprocess import Popen, PIPE @@ -29,6 +28,7 @@ import heapq import bisect import random +import socket from math import sqrt, log, isinf, isnan, pow, ceil from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ @@ -111,6 +111,17 @@ def _parse_memory(s): return int(float(s[:-1]) * units[s[-1].lower()]) +def _load_from_socket(port, serializer): + sock = socket.socket() + try: + sock.connect(("localhost", port)) + rf = sock.makefile("rb", 65536) + for item in serializer.load_stream(rf): + yield item + finally: + sock.close() + + class Partitioner(object): def __init__(self, numPartitions, partitionFunc): self.numPartitions = numPartitions @@ -698,21 +709,8 @@ def collect(self): Return a list that contains all of the elements in this RDD. """ with SCCallSiteSync(self.context) as css: - bytesInJava = self._jrdd.collect().iterator() - return list(self._collect_iterator_through_file(bytesInJava)) - - def _collect_iterator_through_file(self, iterator): - # Transferring lots of data through Py4J can be slow because - # socket.readline() is inefficient. Instead, we'll dump the data to a - # file and read it back. - tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) - tempFile.close() - self.ctx._writeToFile(iterator, tempFile.name) - # Read the data into Python and deserialize it: - with open(tempFile.name, 'rb') as tempFile: - for item in self._jrdd_deserializer.load_stream(tempFile): - yield item - os.unlink(tempFile.name) + port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) + return list(_load_from_socket(port, self._jrdd_deserializer)) def reduce(self, f): """ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5c3b7377c33b5..e8ce4547455a5 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -19,13 +19,11 @@ import itertools import warnings import random -import os -from tempfile import NamedTemporaryFile from py4j.java_collections import ListConverter, MapConverter from pyspark.context import SparkContext -from pyspark.rdd import RDD +from pyspark.rdd import RDD, _load_from_socket from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync @@ -310,14 +308,8 @@ def collect(self): [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] """ with SCCallSiteSync(self._sc) as css: - bytesInJava = self._jdf.javaToPython().collect().iterator() - tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir) - tempFile.close() - self._sc._writeToFile(bytesInJava, tempFile.name) - # Read the data into Python and deserialize it: - with open(tempFile.name, 'rb') as tempFile: - rs = list(BatchedSerializer(PickleSerializer()).load_stream(tempFile)) - os.unlink(tempFile.name) + port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd()) + rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer()))) cls = _create_cls(self.schema) return [cls(r) for r in rs]