From b77c19be053125fde99b098ec1e1162f25b5433c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 4 Jun 2014 22:56:49 -0700 Subject: [PATCH 01/11] Fix issue in ReplSuite with hadoop-provided profile. When building the assembly with the maven "hadoop-provided" profile, the executors were failing to come up because Hadoop classes were not found in the classpath anymore; so add them explicitly to the classpath using spark.executor.extraClassPath. This is only needed for the local-cluster mode, but doesn't affect other tests, so it's added for all of them to keep the code simpler. Author: Marcelo Vanzin Closes #781 from vanzin/repl-test-fix and squashes the following commits: 4f0a3b0 [Marcelo Vanzin] Fix issue in ReplSuite with hadoop-provided profile. --- .../scala/org/apache/spark/repl/ReplSuite.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 98cdfd0054713..7c765edd55027 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -32,6 +32,8 @@ import org.apache.spark.util.Utils class ReplSuite extends FunSuite { def runInterpreter(master: String, input: String): String = { + val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath" + val in = new BufferedReader(new StringReader(input + "\n")) val out = new StringWriter() val cl = getClass.getClassLoader @@ -44,13 +46,23 @@ class ReplSuite extends FunSuite { } } } + val classpath = paths.mkString(File.pathSeparator) + + val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH) + System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath) + val interp = new SparkILoop(in, new PrintWriter(out), master) org.apache.spark.repl.Main.interp = interp - interp.process(Array("-classpath", paths.mkString(File.pathSeparator))) + interp.process(Array("-classpath", classpath)) org.apache.spark.repl.Main.interp = null if (interp.sparkContext != null) { interp.sparkContext.stop() } + if (oldExecutorClasspath != null) { + System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath) + } else { + System.clearProperty(CONF_EXECUTOR_CLASSPATH) + } return out.toString } From 7c160293d6d708718d566e700cfb407a31280b89 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Jun 2014 11:27:33 -0700 Subject: [PATCH 02/11] [SPARK-2029] Bump pom.xml version number of master branch to 1.1.0-SNAPSHOT. Author: Takuya UESHIN Closes #974 from ueshin/issues/SPARK-2029 and squashes the following commits: e19e8f4 [Takuya UESHIN] Bump version number to 1.1.0-SNAPSHOT. 
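The ReplSuite fix in PATCH 01 hinges on a save/set/restore pattern for a JVM system property: capture the old value of spark.executor.extraClassPath, set the test classpath, run the interpreter, then either restore the old value or clear the key. A minimal, self-contained Scala sketch of that pattern follows; the helper object and method names are hypothetical illustrations, not code from the patch.

object SysPropFixture {
  /** Runs `body` with `key` temporarily set to `value`, then restores the previous state. */
  def withSystemProperty[T](key: String, value: String)(body: => T): T = {
    val old = System.getProperty(key)  // null when the property was not set before
    System.setProperty(key, value)
    try body finally {
      // Restore the original value, or clear the key if it was previously unset.
      if (old != null) System.setProperty(key, old) else System.clearProperty(key)
    }
  }
}

// Hypothetical usage, equivalent in spirit to what runInterpreter does inline:
// SysPropFixture.withSystemProperty("spark.executor.extraClassPath", classpath) { runReplTest() }

The try/finally guards against leaking the property into later tests even if the body throws; the patch itself restores the property inline after the interpreter run.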
--- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/alpha/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml | 2 +- 23 files changed, 23 insertions(+), 23 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 963357b9ab167..0c60b66c3daca 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 355f437c5b16a..c8e39a415af28 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 0777c5b1f03d4..0c746175afa73 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 874bcd7916f35..4f6d7fdb87d47 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 6aec215687fe0..c1f581967777b 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 979eb0ca624bd..d014a7aad0fca 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 7b2dc5ba1d7f9..4980208cba3b0 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index 5766d3a0d44ec..7073bd4404d9c 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 4ed4196bd8662..cf306e0dca8bd 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index 602f66f9c5cf1..955ec1a8c3033 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index 11ac827ed54a0..22ea330b4374d 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index dc108d2fe7fbd..7d5d83e7f3bb9 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ 
org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index cdd33dbb7970d..4aae2026dcaf2 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index fcd6f66b4414a..87c8e29ad1069 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index bcdb24b040cc8..4a66408ef3d2d 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 8d2e4baf69e30..6c78c34486010 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index fb3b190b4ec5a..e65ca6be485e3 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 9254b70e64a08..5ede76e5c3904 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 6435224a14674..f506d6ce34a6f 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 1875c497bc61c..79cd8551d0722 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml index e076ca1d44b97..b8a631dd0bb3b 100644 --- a/yarn/alpha/pom.xml +++ b/yarn/alpha/pom.xml @@ -20,7 +20,7 @@ org.apache.spark yarn-parent_2.10 - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 2811ffffbdfa2..ef7066ef1fdfc 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml index 0780f251b595c..0931beb505508 100644 --- a/yarn/stable/pom.xml +++ b/yarn/stable/pom.xml @@ -20,7 +20,7 @@ org.apache.spark yarn-parent_2.10 - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml From 89cdbb087cb2f0d03be2dd77440300c6bd61c792 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 5 Jun 2014 11:39:35 -0700 Subject: [PATCH 03/11] SPARK-1677: allow user to disable output dir existence checking https://issues.apache.org/jira/browse/SPARK-1677 For compatibility with older versions of Spark it would be nice to have an option `spark.hadoop.validateOutputSpecs` (default true) for the user to disable the output directory existence checking Author: CodingCat Closes #947 from CodingCat/SPARK-1677 and squashes the following commits: 7930f83 [CodingCat] miao c0c0e03 [CodingCat] bug fix and doc update 5318562 [CodingCat] bug fix 13219b5 [CodingCat] allow user to disable output dir existence checking --- .../apache/spark/rdd/PairRDDFunctions.scala | 6 +++-- .../scala/org/apache/spark/FileSuite.scala | 22 +++++++++++++++++++ docs/configuration.md | 8 +++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index f2ce3cbd47f93..8909980957058 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val outfmt = job.getOutputFormatClass val jobFormat = outfmt.newInstance - if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) { + if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) && + jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) { // FileOutputFormat ignores the filesystem parameter jobFormat.checkOutputSpecs(job) } @@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { + if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) && + outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(conf) conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 1f2206b1f0379..070e974657860 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext { } } + test ("allow user to disable the output directory existence checking (old Hadoop API)") { + val sf = new SparkConf() + sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") + sc = new SparkContext(sf) + val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) + randomRDD.saveAsTextFile(tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-00000").exists() === true) + randomRDD.saveAsTextFile(tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-00000").exists() === true) + } + test ("prevent user from overwriting the empty directory (new Hadoop API)") { sc = new SparkContext("local", "test") val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) @@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext { } } + test ("allow user to disable the output directory existence checking (new Hadoop API)") { + val sf = new SparkConf() + sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false") + sc = new SparkContext(sf) + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output") + assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true) + } + test ("save Hadoop Dataset through old Hadoop API") { sc = new SparkContext("local", "test") val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) diff --git a/docs/configuration.md b/docs/configuration.md index 0697f7fc2fd91..71fafa573467f 100644 --- a/docs/configuration.md +++
b/docs/configuration.md @@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful this duration will be cleared as well. + + spark.hadoop.validateOutputSpecs + true + If set to true, validates the output specification (e.g. checking if the output directory already exists) + used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing + output directories. We recommend that users do not disable this except if trying to achieve compatibility with + previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand. + #### Networking From e4c11eef2f64df0b6a432f40b669486d91ca6352 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Jun 2014 12:00:31 -0700 Subject: [PATCH 04/11] [SPARK-2036] [SQL] CaseConversionExpression should check if the evaluated value is null. `CaseConversionExpression` should check if the evaluated value is `null`. Author: Takuya UESHIN Closes #982 from ueshin/issues/SPARK-2036 and squashes the following commits: 61e1c54 [Takuya UESHIN] Add check if the evaluated value is null. --- .../catalyst/expressions/stringOperations.scala | 8 ++++++-- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++++ .../test/scala/org/apache/spark/sql/TestData.scala | 8 ++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index dcded0774180e..420303408451f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala @@ -81,8 +81,12 @@ trait CaseConversionExpression { def dataType: DataType = StringType override def eval(input: Row): Any = { - val converted = child.eval(input) - convert(converted.toString) + val evaluated = child.eval(input) + if (evaluated == null) { + null + } else { + convert(evaluated.toString) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 95860e6683f67..e2ad3915d3134 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -322,6 +322,13 @@ class SQLQuerySuite extends QueryTest { (2, "B"), (3, "C"), (4, "D"))) + + checkAnswer( + sql("SELECT n, UPPER(s) FROM nullStrings"), + Seq( + (1, "ABC"), + (2, "ABC"), + (3, null))) } test("system function lower()") { @@ -334,6 +341,13 @@ class SQLQuerySuite extends QueryTest { (4, "d"), (5, "e"), (6, "f"))) + + checkAnswer( + sql("SELECT n, LOWER(s) FROM nullStrings"), + Seq( + (1, "abc"), + (2, "abc"), + (3, null))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 944f520e43515..876bd1636aab3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -106,4 +106,12 @@ object TestData { NullInts(null) :: Nil ) nullInts.registerAsTable("nullInts") + + case class NullStrings(n: Int, s: String) + val nullStrings = + TestSQLContext.sparkContext.parallelize( + NullStrings(1, "abc") :: + NullStrings(2, "ABC") :: + NullStrings(3, null) :: Nil) + 
nullStrings.registerAsTable("nullStrings") } From f6143f127db59e7f5a00fd70605f85248869347d Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 5 Jun 2014 13:06:46 -0700 Subject: [PATCH 05/11] HOTFIX: Remove generated-mima-excludes file after running MIMA. This has been causing some false failures on PRs that don't merge correctly. Author: Patrick Wendell Closes #971 from pwendell/mima and squashes the following commits: 1dc80aa [Patrick Wendell] HOTFIX: Remove generated-mima-excludes file after running MIMA. --- dev/mima | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/mima b/dev/mima index d4099990254cc..ab6bd4469b0e8 100755 --- a/dev/mima +++ b/dev/mima @@ -31,4 +31,5 @@ if [ $ret_val != 0 ]; then echo "NOTE: Exceptions to binary compatibility can be added in project/MimaExcludes.scala" fi +rm -f .generated-mima-excludes exit $ret_val From 5473aa7c02916022430493637b1492554b48c30b Mon Sep 17 00:00:00 2001 From: Kalpit Shah Date: Thu, 5 Jun 2014 13:07:26 -0700 Subject: [PATCH 06/11] sbt 0.13.X should be using sbt-assembly 0.11.X https://github.com/sbt/sbt-assembly/blob/master/README.md Author: Kalpit Shah Closes #555 from kalpit/upgrade/sbtassembly and squashes the following commits: 1fa7324 [Kalpit Shah] sbt 0.13.X should be using sbt-assembly 0.11.X --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 0cd16fd5bedd4..472819b9fb8ba 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,7 +4,7 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline. resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") From 668cb1defe735add91f4a5b7b8ebe7cfd5640b25 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 5 Jun 2014 13:13:33 -0700 Subject: [PATCH 07/11] Remove compile-scoped junit dependency. This avoids having junit classes showing up in the assembly jar. I verified that only test classes in the jtransforms package use junit. Author: Marcelo Vanzin Closes #794 from vanzin/junit-dep-exclusion and squashes the following commits: 274e1c2 [Marcelo Vanzin] Remove junit from assembly in sbt build also. ad950be [Marcelo Vanzin] Remove compile-scoped junit dependency.
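A build.sbt-style sketch of the exclusion technique this patch applies in project/SparkBuild.scala (the actual diff follows below); sbt 0.13.x is assumed, and while the coordinates mirror the breeze dependency, this is not the real Spark build definition.

// Keep the library, but drop its transitive junit dependency so junit classes
// never end up in the assembly jar.
val excludeJUnit = ExclusionRule(organization = "junit")

libraryDependencies += "org.scalanlp" %% "breeze" % "0.7" excludeAll(excludeJUnit)

The Maven half of the same change accomplishes this with an exclusions block on the breeze dependency in mllib/pom.xml, as the diff shows.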
--- mllib/pom.xml | 8 ++++++++ project/SparkBuild.scala | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/mllib/pom.xml b/mllib/pom.xml index 4aae2026dcaf2..878cb83dbf783 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -50,6 +50,14 @@ org.scalanlp breeze_${scala.binary.version} 0.7 + + + + junit + junit + + org.scalatest diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index efb0b9319be13..d0049a8ac43aa 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -322,6 +322,7 @@ object SparkBuild extends Build { val excludeJruby = ExclusionRule(organization = "org.jruby") val excludeThrift = ExclusionRule(organization = "org.apache.thrift") val excludeServletApi = ExclusionRule(organization = "javax.servlet", artifact = "servlet-api") + val excludeJUnit = ExclusionRule(organization = "junit") def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark", version: String = "1.0.0", crossVersion: String = "2.10"): Option[sbt.ModuleID] = { @@ -466,7 +467,7 @@ object SparkBuild extends Build { previousArtifact := sparkPreviousArtifact("spark-mllib"), libraryDependencies ++= Seq( "org.jblas" % "jblas" % jblasVersion, - "org.scalanlp" %% "breeze" % "0.7" + "org.scalanlp" %% "breeze" % "0.7" excludeAll(excludeJUnit) ) ) From c7a183b2c2bca13565496495b4ae3a3a9f63f9ab Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 5 Jun 2014 17:42:08 -0700 Subject: [PATCH 08/11] [SPARK-2041][SQL] Correctly analyze queries where columnName == tableName. Author: Michael Armbrust Closes #985 from marmbrus/tableName and squashes the following commits: 3caaa27 [Michael Armbrust] Correctly analyze queries where columnName == tableName. --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 3 ++- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ++++++ sql/core/src/test/scala/org/apache/spark/sql/TestData.scala | 3 +++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 5eb52d5350f55..2b8fbdcde9d37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -64,7 +64,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { // struct fields. val options = children.flatMap(_.output).flatMap { option => // If the first part of the desired name matches a qualifier for this possible match, drop it. - val remainingParts = if (option.qualifiers contains parts.head) parts.drop(1) else parts + val remainingParts = + if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e2ad3915d3134..aa0c426f6fcb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -28,6 +28,12 @@ class SQLQuerySuite extends QueryTest { // Make sure the tables are loaded. 
TestData + test("SPARK-2041 column name equals tablename") { + checkAnswer( + sql("SELECT tableName FROM tableName"), + "test") + } + test("index into array") { checkAnswer( sql("SELECT data, data[0], data[0] + data[1], data[0 + 1] FROM arrayData"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 876bd1636aab3..05de736bbce1b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -114,4 +114,7 @@ object TestData { NullStrings(2, "ABC") :: NullStrings(3, null) :: Nil) nullStrings.registerAsTable("nullStrings") + + case class TableName(tableName: String) + TestSQLContext.sparkContext.parallelize(TableName("test") :: Nil).registerAsTable("tableName") } From 3d3f8c8004da110ca97973119e9d9f04f878ee81 Mon Sep 17 00:00:00 2001 From: CrazyJvm Date: Thu, 5 Jun 2014 17:44:46 -0700 Subject: [PATCH 09/11] Use pluggable clock in DAGScheduler #SPARK-2031 DAGScheduler now supports a pluggable clock, just as TaskSetManager does. Author: CrazyJvm Closes #976 from CrazyJvm/clock and squashes the following commits: 6779a4c [CrazyJvm] Use pluggable clock in DAGScheduler --- .../org/apache/spark/scheduler/DAGScheduler.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ccff6a3d1aebc..e09a4221e8315 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId} -import org.apache.spark.util.Utils +import org.apache.spark.util.{SystemClock, Clock, Utils} /** * The high-level scheduling layer that implements stage-oriented scheduling.
It computes a DAG of @@ -61,7 +61,8 @@ class DAGScheduler( listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, - env: SparkEnv) + env: SparkEnv, + clock: Clock = SystemClock) extends Logging { import DAGScheduler._ @@ -781,7 +782,7 @@ class DAGScheduler( logDebug("New pending tasks: " + myPending) taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) - stageToInfos(stage).submissionTime = Some(System.currentTimeMillis()) + stageToInfos(stage).submissionTime = Some(clock.getTime()) } else { logDebug("Stage " + stage + " is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) @@ -807,11 +808,11 @@ class DAGScheduler( def markStageAsFinished(stage: Stage) = { val serviceTime = stageToInfos(stage).submissionTime match { - case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) + case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0) case _ => "Unknown" } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) - stageToInfos(stage).completionTime = Some(System.currentTimeMillis()) + stageToInfos(stage).completionTime = Some(clock.getTime()) listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) runningStages -= stage } @@ -1015,7 +1016,7 @@ class DAGScheduler( return } val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq - stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis()) + stageToInfos(failedStage).completionTime = Some(clock.getTime()) for (resultStage <- dependentStages) { val job = resultStageToJob(resultStage) failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", From 9bad0b73722fb359f14db864e69aa7efde3588c5 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Thu, 5 Jun 2014 17:45:38 -0700 Subject: [PATCH 10/11] [SPARK-2025] Unpersist edges of previous graph in Pregel Due to a bug introduced by apache/spark#497, Pregel does not unpersist replicated vertices from previous iterations. As a result, they stay cached until memory is full, wasting GC time. This PR corrects the problem by unpersisting both the edges and the replicated vertices of previous iterations. This is safe because the edges and replicated vertices of the current iteration are cached by the call to `g.cache()` and then materialized by the call to `messages.count()`. Therefore no unmaterialized RDDs depend on `prevG.edges`. I verified that no recomputation occurs by running PageRank with a custom patch to Spark that warns when a partition is recomputed. Thanks to Tim Weninger for reporting this bug. 
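The same materialize-then-unpersist ordering can be illustrated with plain RDDs; the sketch below is a simplified stand-in for the GraphX code, not the actual Pregel implementation, and the app name is illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("unpersist-sketch").setMaster("local"))
var current: RDD[Int] = sc.parallelize(1 to 1000).cache()
current.count()                        // materialize the first iteration

for (i <- 1 to 3) {
  val prev = current
  current = prev.map(_ + 1).cache()    // the next iteration depends on prev
  current.count()                      // materialize it before dropping prev
  prev.unpersist(blocking = false)     // safe: nothing unmaterialized still needs prev
}
sc.stop()

Because each new iteration is cached and counted before the previous one is unpersisted, no RDD that still needs the old data is left unmaterialized, which is the invariant the Pregel change relies on.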
Author: Ankur Dave Closes #972 from ankurdave/SPARK-2025 and squashes the following commits: 13d5b07 [Ankur Dave] Unpersist edges of previous graph in Pregel --- graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 4572eab2875bb..5e55620147df8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -150,6 +150,7 @@ object Pregel extends Logging { oldMessages.unpersist(blocking=false) newVerts.unpersist(blocking=false) prevG.unpersistVertices(blocking=false) + prevG.edges.unpersist(blocking=false) // count the iteration i += 1 } From b45c13e7d798f97b92f1a6329528191b8d779c4f Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 5 Jun 2014 23:01:48 -0700 Subject: [PATCH 11/11] SPARK-2043: ExternalAppendOnlyMap doesn't always find matching keys The current implementation reads one key with the next hash code as it finishes reading the keys with the current hash code, which may cause it to miss some matches of the next key. This can cause operations like join to give the wrong result when reduce tasks spill to disk and there are hash collisions, as values won't be matched together. This PR fixes it by not reading in that next key, using a peeking iterator instead. Author: Matei Zaharia Closes #986 from mateiz/spark-2043 and squashes the following commits: 0959514 [Matei Zaharia] Added unit test for having many hash collisions 892debb [Matei Zaharia] SPARK-2043: don't read a key with the next hash code in ExternalAppendOnlyMap, instead use a buffered iterator to only read values with the current hash code. --- .../collection/ExternalAppendOnlyMap.scala | 10 +++-- .../ExternalAppendOnlyMapSuite.scala | 39 ++++++++++++++++++- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 170f09be21534..288badd3160f8 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -20,6 +20,7 @@ package org.apache.spark.util.collection import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException} import java.util.Comparator +import scala.collection.BufferedIterator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C]( // Input streams are derived both from the in-memory map and spilled maps on disk // The in-memory map is sorted in place, while the spilled maps are already in sorted order private val sortedMap = currentMap.destructiveSortedIterator(comparator) - private val inputStreams = Seq(sortedMap) ++ spilledMaps + private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered) inputStreams.foreach { it => val kcPairs = getMorePairs(it) @@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C]( * In the event of key hash collisions, this ensures no pairs are hidden from being merged. * Assume the given iterator is in sorted order. 
*/ - private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = { + private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = { val kcPairs = new ArrayBuffer[(K, C)] if (it.hasNext) { var kc = it.next() kcPairs += kc val minHash = kc._1.hashCode() - while (it.hasNext && kc._1.hashCode() == minHash) { + while (it.hasNext && it.head._1.hashCode() == minHash) { kc = it.next() kcPairs += kc } @@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C]( * * StreamBuffers are ordered by the minimum key hash found across all of their own pairs. */ - private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)]) + private class StreamBuffer( + val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)]) extends Comparable[StreamBuffer] { def isEmpty = pairs.length == 0 diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index cdebefb67510c..deb780953579d 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { ("pomatoes", "eructation") // 568647356 ) + collisionPairs.foreach { case (w1, w2) => + // String.hashCode is documented to use a specific algorithm, but check just in case + assert(w1.hashCode === w2.hashCode) + } + (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) } collisionPairs.foreach { case (w1, w2) => map.insert(w1, w2) @@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { assert(kv._2.equals(expectedValue)) count += 1 } - assert(count == 100000 + collisionPairs.size * 2) + assert(count === 100000 + collisionPairs.size * 2) + } + + test("spilling with many hash collisions") { + val conf = new SparkConf(true) + conf.set("spark.shuffle.memoryFraction", "0.0001") + sc = new SparkContext("local-cluster[1,1,512]", "test", conf) + + val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) + + // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes + // problems if the map fails to group together the objects with the same code (SPARK-2043). + for (i <- 1 to 10) { + for (j <- 1 to 10000) { + map.insert(FixedHashObject(j, j % 2), 1) + } + } + + val it = map.iterator + var count = 0 + while (it.hasNext) { + val kv = it.next() + assert(kv._2 === 10) + count += 1 + } + assert(count === 10000) } test("spilling with hash collisions using the Int.MaxValue key") { @@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } } } + +/** + * A dummy class that always returns the same hash code, to easily test hash collisions + */ +case class FixedHashObject(val v: Int, val h: Int) extends Serializable { + override def hashCode(): Int = h +}
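To distill the peeking pattern this last patch introduces: with a plain Iterator the loop could only check a pair's hash code after consuming it, so the final pair it read (belonging to the next hash code) was grouped with the wrong batch and could miss its matches; a BufferedIterator exposes head, so the next pair can be examined without being consumed. The following standalone sketch illustrates that idea only; it is not the actual getMorePairs method, and the names in it are illustrative.

import scala.collection.BufferedIterator
import scala.collection.mutable.ArrayBuffer

/** Collects all consecutive pairs whose key shares the hash code of the first pair. */
def takeSameHash[K, C](it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
  val pairs = new ArrayBuffer[(K, C)]
  if (it.hasNext) {
    val first = it.next()
    pairs += first
    val minHash = first._1.hashCode()
    while (it.hasNext && it.head._1.hashCode() == minHash) {  // peek, don't consume
      pairs += it.next()
    }
  }
  pairs
}

// "Aa" and "BB" collide (both hash to 2112), so they must come back in one batch,
// while "pomatoes" starts the next batch untouched:
val sorted = Seq(("Aa", 1), ("BB", 2), ("pomatoes", 3)).iterator.buffered
assert(takeSameHash(sorted).map(_._1) == ArrayBuffer("Aa", "BB"))
assert(takeSameHash(sorted).map(_._1) == ArrayBuffer("pomatoes"))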