From 7078f6028bf012235c664b02ec3541cbb0a248a7 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Mon, 27 Apr 2015 13:38:25 -0700 Subject: [PATCH 1/8] [SPARK-6856] [R] Make RDD information more useful in SparkR Author: Jeff Harrison Closes #5667 from His-name-is-Joof/joofspark and squashes the following commits: f8814a6 [Jeff Harrison] newline added after RDD show() output 4d9d972 [Jeff Harrison] Merge branch 'master' into joofspark 9d2295e [Jeff Harrison] parallelize with 1:10 878b830 [Jeff Harrison] Merge branch 'master' into joofspark c8c0b80 [Jeff Harrison] add test for RDD function show() 123be65 [Jeff Harrison] SPARK-6856 --- R/pkg/R/RDD.R | 5 +++++ R/pkg/inst/tests/test_rdd.R | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 1662d6bb3b1ac..f90c26b253455 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode, .Object }) +setMethod("show", "RDD", + function(.Object) { + cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep="")) + }) + setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) { .Object@env <- new.env() .Object@env$isCached <- FALSE diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R index d55af93e3e50a..03207353c31c6 100644 --- a/R/pkg/inst/tests/test_rdd.R +++ b/R/pkg/inst/tests/test_rdd.R @@ -759,6 +759,11 @@ test_that("collectAsMap() on a pairwise RDD", { expect_equal(vals, list(`1` = "a", `2` = "b")) }) +test_that("show()", { + rdd <- parallelize(sc, list(1:10)) + expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+") +}) + test_that("sampleByKey() on pairwise RDDs", { rdd <- parallelize(sc, 1:2000) pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) }) From ef82bddc11d1aea42e22d2f85613a869cbe9a990 Mon Sep 17 00:00:00 2001 From: tedyu Date: Mon, 27 Apr 2015 14:42:40 -0700 Subject: [PATCH 2/8] SPARK-7107 Add parameter for zookeeper.znode.parent to hbase_inputformat... ....py Author: tedyu Closes #5673 from tedyu/master and squashes the following commits: ab7c72b [tedyu] SPARK-7107 Adjust indentation to pass Python style tests 6e25939 [tedyu] Adjust line length to be shorter than 100 characters 18d172a [tedyu] SPARK-7107 Add parameter for zookeeper.znode.parent to hbase_inputformat.py --- examples/src/main/python/hbase_inputformat.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py index e17819d5feb76..5b82a14fba413 100644 --- a/examples/src/main/python/hbase_inputformat.py +++ b/examples/src/main/python/hbase_inputformat.py @@ -54,8 +54,9 @@ Run with example jar: ./bin/spark-submit --driver-class-path /path/to/example/jar \ - /path/to/examples/hbase_inputformat.py + /path/to/examples/hbase_inputformat.py
[] Assumes you have some data in HBase already, running on , in
+ optionally, you can specify parent znode for your hbase cluster - """, file=sys.stderr) exit(-1) @@ -64,6 +65,9 @@ sc = SparkContext(appName="HBaseInputFormat") conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + if len(sys.argv) > 3: + conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3], + "hbase.mapreduce.inputtable": table} keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" From ca9f4ebb8e510e521bf4df0331375ddb385fb9d2 Mon Sep 17 00:00:00 2001 From: hlin09 Date: Mon, 27 Apr 2015 15:04:37 -0700 Subject: [PATCH 3/8] [SPARK-6991] [SPARKR] Adds support for zipPartitions. Author: hlin09 Closes #5568 from hlin09/zipPartitions and squashes the following commits: 12c08a5 [hlin09] Fix comments d2d32db [hlin09] Merge branch 'master' into zipPartitions ec56d2f [hlin09] Fix test. 27655d3 [hlin09] Adds support for zipPartitions. --- R/pkg/NAMESPACE | 1 + R/pkg/R/RDD.R | 46 +++++++++++++++++++++++++ R/pkg/R/generics.R | 5 +++ R/pkg/inst/tests/test_binary_function.R | 33 ++++++++++++++++++ 4 files changed, 85 insertions(+) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 80283643861ac..e077eace74375 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -71,6 +71,7 @@ exportMethods( "unpersist", "value", "values", + "zipPartitions", "zipRDD", "zipWithIndex", "zipWithUniqueId" diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index f90c26b253455..a3a0421a0746d 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -1595,3 +1595,49 @@ setMethod("intersection", keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction)) }) + +#' Zips an RDD's partitions with one (or more) RDD(s). +#' Same as zipPartitions in Spark. +#' +#' @param ... RDDs to be zipped. +#' @param func A function to transform zipped partitions. +#' @return A new RDD by applying a function to the zipped partitions. +#' Assumes that all the RDDs have the *same number of partitions*, but +#' does *not* require them to have the same number of elements in each partition. +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2 +#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4 +#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6 +#' collect(zipPartitions(rdd1, rdd2, rdd3, +#' func = function(x, y, z) { list(list(x, y, z))} )) +#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))) +#'} +#' @rdname zipRDD +#' @aliases zipPartitions,RDD +setMethod("zipPartitions", + "RDD", + function(..., func) { + rrdds <- list(...) + if (length(rrdds) == 1) { + return(rrdds[[1]]) + } + nPart <- sapply(rrdds, numPartitions) + if (length(unique(nPart)) != 1) { + stop("Can only zipPartitions RDDs which have the same number of partitions.") + } + + rrdds <- lapply(rrdds, function(rdd) { + mapPartitionsWithIndex(rdd, function(partIndex, part) { + print(length(part)) + list(list(partIndex, part)) + }) + }) + union.rdd <- Reduce(unionRDD, rrdds) + zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1])) + res <- mapPartitions(zipped.rdd, function(plist) { + do.call(func, plist[[1]]) + }) + res + }) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 34dbe84051c50..e88729387ef95 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") }) #' @export setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") }) +#' @rdname zipRDD +#' @export +setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") }, + signature = "...") + #' @rdname zipWithIndex #' @seealso zipWithUniqueId #' @export diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R index c15553ba28517..6785a7bdae8cb 100644 --- a/R/pkg/inst/tests/test_binary_function.R +++ b/R/pkg/inst/tests/test_binary_function.R @@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", { expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) }) + +test_that("zipPartitions() on RDDs", { + rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2 + rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4 + rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6 + actual <- collect(zipPartitions(rdd1, rdd2, rdd3, + func = function(x, y, z) { list(list(x, y, z))} )) + expect_equal(actual, + list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))) + + mockFile = c("Spark is pretty.", "Spark is awesome.") + fileName <- tempfile(pattern="spark-test", fileext=".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName, 1) + actual <- collect(zipPartitions(rdd, rdd, + func = function(x, y) { list(paste(x, y, sep = "\n")) })) + expected <- list(paste(mockFile, mockFile, sep = "\n")) + expect_equal(actual, expected) + + rdd1 <- parallelize(sc, 0:1, 1) + actual <- collect(zipPartitions(rdd1, rdd, + func = function(x, y) { list(x + nchar(y)) })) + expected <- list(0:1 + nchar(mockFile)) + expect_equal(actual, expected) + + rdd <- map(rdd, function(x) { x }) + actual <- collect(zipPartitions(rdd, rdd1, + func = function(x, y) { list(y + nchar(x)) })) + expect_equal(actual, expected) + + unlink(fileName) +}) From b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56 Mon Sep 17 00:00:00 2001 From: Steven She Date: Mon, 27 Apr 2015 18:55:02 -0400 Subject: [PATCH 4/8] [SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner Added a check to the SparkContext.union method to check that a partitioner is defined on all RDDs when instantiating a PartitionerAwareUnionRDD. Author: Steven She Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits: 5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner --- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/rdd/PartitionerAwareUnionRDD.scala | 1 + .../scala/org/apache/spark/rdd/RDDSuite.scala | 21 +++++++++++++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 86269eac52db0..ea4ddcc2e265d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1055,7 +1055,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Build the union of a list of RDDs. */ def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = { val partitioners = rdds.flatMap(_.partitioner).toSet - if (partitioners.size == 1) { + if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { new PartitionerAwareUnionRDD(this, rdds) } else { new UnionRDD(this, rdds) diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index 92b0641d0fb6e..7598ff617b399 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag]( var rdds: Seq[RDD[T]] ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) { require(rdds.length > 0) + require(rdds.forall(_.partitioner.isDefined)) require(rdds.flatMap(_.partitioner).toSet.size == 1, "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner)) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index df42faab64505..ef8c36a28655b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) } + test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") { + val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1)) + val rddWithNoPartitioner = sc.parallelize(Seq(2->true)) + val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner) + assert(unionRdd.isInstanceOf[UnionRDD[_]]) + } + + test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") { + val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1)) + val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner) + assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]]) + } + + test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") { + val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1)) + val rddWithNoPartitioner = sc.parallelize(Seq(2->true)) + intercept[IllegalArgumentException] { + new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, rddWithPartitioner)) + } + } + test("partitioner aware union") { def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = { sc.makeRDD(seq, 1) From 8e1c00dbf4b60962908626dead744e5d73c8085e Mon Sep 17 00:00:00 2001 From: Hong Shen Date: Mon, 27 Apr 2015 18:57:31 -0400 Subject: [PATCH 5/8] [SPARK-6738] [CORE] Improve estimate the size of a large array Currently, SizeEstimator.visitArray is not correct in the follow case, ``` array size > 200, elem has the share object ``` when I add a debug log in SizeTracker.scala: ``` System.err.println(s"numUpdates:$numUpdates, size:$ts, bytesPerUpdate:$bytesPerUpdate, cost time:$b") ``` I get the following log: ``` numUpdates:1, size:262448, bytesPerUpdate:0.0, cost time:35 numUpdates:2, size:420698, bytesPerUpdate:158250.0, cost time:35 numUpdates:4, size:420754, bytesPerUpdate:28.0, cost time:32 numUpdates:7, size:420754, bytesPerUpdate:0.0, cost time:27 numUpdates:12, size:420754, bytesPerUpdate:0.0, cost time:28 numUpdates:20, size:420754, bytesPerUpdate:0.0, cost time:25 numUpdates:32, size:420754, bytesPerUpdate:0.0, cost time:21 numUpdates:52, size:420754, bytesPerUpdate:0.0, cost time:20 numUpdates:84, size:420754, bytesPerUpdate:0.0, cost time:20 numUpdates:135, size:420754, bytesPerUpdate:0.0, cost time:20 numUpdates:216, size:420754, bytesPerUpdate:0.0, cost time:11 numUpdates:346, size:420754, bytesPerUpdate:0.0, cost time:6 numUpdates:554, size:488911, bytesPerUpdate:327.67788461538464, cost time:8 numUpdates:887, size:2312259426, bytesPerUpdate:6942253.798798799, cost time:198 15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 3.0 GB to disk (1 time so far) 15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc ``` But in fact the file size is only 162K: ``` $ ll -h /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc -rw-r----- 1 spark users 162K Apr 21 14:27 /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc ``` In order to test case, I change visitArray to: ``` var size = 0l for (i <- 0 until length) { val obj = JArray.get(array, i) size += SizeEstimator.estimate(obj, state.visited).toLong } state.size += size ``` I get the following log: ``` ... 14895 277016088 566.9046118590662 time:8470 23832 281840544 552.3308270676691 time:8031 38132 289891824 539.8294729775092 time:7897 61012 302803640 563.0265734265735 time:13044 97620 322904416 564.3276223776223 time:13554 15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 314.5 MB to disk (1 time so far) 15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0 ``` the file size is 85M. ``` $ ll -h /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark- local-20150414114020-2fcb/14/ total 85M -rw-r----- 1 spark users 85M Apr 14 11:46 temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0 ``` The following log is when I use this patch, ``` .... numUpdates:32, size:365484, bytesPerUpdate:0.0, cost time:7 numUpdates:52, size:365484, bytesPerUpdate:0.0, cost time:5 numUpdates:84, size:365484, bytesPerUpdate:0.0, cost time:5 numUpdates:135, size:372208, bytesPerUpdate:131.84313725490196, cost time:86 numUpdates:216, size:379020, bytesPerUpdate:84.09876543209876, cost time:21 numUpdates:346, size:1865208, bytesPerUpdate:11432.215384615385, cost time:23 numUpdates:554, size:2052380, bytesPerUpdate:899.8653846153846, cost time:16 numUpdates:887, size:2142820, bytesPerUpdate:271.59159159159157, cost time:15 .. numUpdates:14895, size:251675500, bytesPerUpdate:438.5263157894737, cost time:13 numUpdates:23832, size:257010268, bytesPerUpdate:596.9305135951662, cost time:14 numUpdates:38132, size:263922396, bytesPerUpdate:483.3655944055944, cost time:15 numUpdates:61012, size:268962596, bytesPerUpdate:220.28846153846155, cost time:24 numUpdates:97620, size:286980644, bytesPerUpdate:492.1888111888112, cost time:22 15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: Thread 53 spilling in-memory map of 328.7 MB to disk (1 time so far) 15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/temp_local_9c109510-af16-4468-8f23-48cad04da88f ``` the file size is 88M. ``` $ ll -h /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/ total 88M -rw-r----- 1 spark users 88M Apr 21 14:45 temp_local_9c109510-af16-4468-8f23-48cad04da88f ``` Author: Hong Shen Closes #5608 from shenh062326/my_change5 and squashes the following commits: 5506bae [Hong Shen] Fix compile error c275dd3 [Hong Shen] Alter code style fe202a2 [Hong Shen] Change the code style and add documentation. a9fca84 [Hong Shen] Add test case for SizeEstimator 4877eee [Hong Shen] Improve estimate the size of a large array a2ea7ac [Hong Shen] Alter code style 4c28e36 [Hong Shen] Improve estimate the size of a large array --- .../org/apache/spark/util/SizeEstimator.scala | 45 ++++++++++++------- .../spark/util/SizeEstimatorSuite.scala | 18 ++++++++ 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 26ffbf9350388..4dd7ab9e0767b 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -179,7 +179,7 @@ private[spark] object SizeEstimator extends Logging { } // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling. - private val ARRAY_SIZE_FOR_SAMPLING = 200 + private val ARRAY_SIZE_FOR_SAMPLING = 400 private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING private def visitArray(array: AnyRef, arrayClass: Class[_], state: SearchState) { @@ -204,25 +204,40 @@ private[spark] object SizeEstimator extends Logging { } } else { // Estimate the size of a large array by sampling elements without replacement. - var size = 0.0 + // To exclude the shared objects that the array elements may link, sample twice + // and use the min one to caculate array size. val rand = new Random(42) - val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE) - var numElementsDrawn = 0 - while (numElementsDrawn < ARRAY_SAMPLE_SIZE) { - var index = 0 - do { - index = rand.nextInt(length) - } while (drawn.contains(index)) - drawn.add(index) - val elem = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef] - size += SizeEstimator.estimate(elem, state.visited) - numElementsDrawn += 1 - } - state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong + val drawn = new OpenHashSet[Int](2 * ARRAY_SAMPLE_SIZE) + val s1 = sampleArray(array, state, rand, drawn, length) + val s2 = sampleArray(array, state, rand, drawn, length) + val size = math.min(s1, s2) + state.size += math.max(s1, s2) + + (size * ((length - ARRAY_SAMPLE_SIZE) / (ARRAY_SAMPLE_SIZE))).toLong } } } + private def sampleArray( + array: AnyRef, + state: SearchState, + rand: Random, + drawn: OpenHashSet[Int], + length: Int): Long = { + var size = 0L + for (i <- 0 until ARRAY_SAMPLE_SIZE) { + var index = 0 + do { + index = rand.nextInt(length) + } while (drawn.contains(index)) + drawn.add(index) + val obj = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef] + if (obj != null) { + size += SizeEstimator.estimate(obj, state.visited).toLong + } + } + size + } + private def primitiveSize(cls: Class[_]): Long = { if (cls == classOf[Byte]) { BYTE_SIZE diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index 67a9f75ff2187..28915bd53354e 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.util +import scala.collection.mutable.ArrayBuffer + import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite, PrivateMethodTester} class DummyClass1 {} @@ -96,6 +98,22 @@ class SizeEstimatorSuite // Past size 100, our samples 100 elements, but we should still get the right size. assertResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3))) + + val arr = new Array[Char](100000) + assertResult(200016)(SizeEstimator.estimate(arr)) + assertResult(480032)(SizeEstimator.estimate(Array.fill(10000)(new DummyString(arr)))) + + val buf = new ArrayBuffer[DummyString]() + for (i <- 0 until 5000) { + buf.append(new DummyString(new Array[Char](10))) + } + assertResult(340016)(SizeEstimator.estimate(buf.toArray)) + + for (i <- 0 until 5000) { + buf.append(new DummyString(arr)) + } + assertResult(683912)(SizeEstimator.estimate(buf.toArray)) + // If an array contains the *same* element many times, we should only count it once. val d1 = new DummyClass1 // 10 pointers plus 8-byte object From 5d45e1f60059e2f2fc8ad64778b9ddcc8887c570 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 27 Apr 2015 19:46:17 -0400 Subject: [PATCH 6/8] [SPARK-3090] [CORE] Stop SparkContext if user forgets to. Set up a shutdown hook to try to stop the Spark context in case the user forgets to do it. The main effect is that any open logs files are flushed and closed, which is particularly interesting for event logs. Author: Marcelo Vanzin Closes #5696 from vanzin/SPARK-3090 and squashes the following commits: 3b554b5 [Marcelo Vanzin] [SPARK-3090] [core] Stop SparkContext if user forgets to. --- .../scala/org/apache/spark/SparkContext.scala | 38 ++++++++++++------- .../scala/org/apache/spark/util/Utils.scala | 10 ++++- .../spark/deploy/yarn/ApplicationMaster.scala | 10 +---- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ea4ddcc2e265d..65b903a55d5bd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private var _listenerBusStarted: Boolean = false private var _jars: Seq[String] = _ private var _files: Seq[String] = _ + private var _shutdownHookRef: AnyRef = _ /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _taskScheduler.postStartHook() _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler)) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) + + // Make sure the context is stopped if the user forgets about it. This avoids leaving + // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM + // is killed, though. + _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => + logInfo("Invoking stop() from shutdown hook") + stop() + } } catch { case NonFatal(e) => logError("Error initializing SparkContext.", e) @@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli logInfo("SparkContext already stopped.") return } + if (_shutdownHookRef != null) { + Utils.removeShutdownHook(_shutdownHookRef) + } postApplicationEnd() _ui.foreach(_.stop()) @@ -1891,7 +1903,7 @@ object SparkContext extends Logging { * * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK. */ - private val activeContext: AtomicReference[SparkContext] = + private val activeContext: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null) /** @@ -1944,11 +1956,11 @@ object SparkContext extends Logging { } /** - * This function may be used to get or instantiate a SparkContext and register it as a - * singleton object. Because we can only have one active SparkContext per JVM, - * this is useful when applications may wish to share a SparkContext. + * This function may be used to get or instantiate a SparkContext and register it as a + * singleton object. Because we can only have one active SparkContext per JVM, + * this is useful when applications may wish to share a SparkContext. * - * Note: This function cannot be used to create multiple SparkContext instances + * Note: This function cannot be used to create multiple SparkContext instances * even if multiple contexts are allowed. */ def getOrCreate(config: SparkConf): SparkContext = { @@ -1961,17 +1973,17 @@ object SparkContext extends Logging { activeContext.get() } } - + /** - * This function may be used to get or instantiate a SparkContext and register it as a - * singleton object. Because we can only have one active SparkContext per JVM, + * This function may be used to get or instantiate a SparkContext and register it as a + * singleton object. Because we can only have one active SparkContext per JVM, * this is useful when applications may wish to share a SparkContext. - * + * * This method allows not passing a SparkConf (useful if just retrieving). - * - * Note: This function cannot be used to create multiple SparkContext instances - * even if multiple contexts are allowed. - */ + * + * Note: This function cannot be used to create multiple SparkContext instances + * even if multiple contexts are allowed. + */ def getOrCreate(): SparkContext = { getOrCreate(new SparkConf()) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c6c6df7cfa56e..342bc9a06db47 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -67,6 +67,12 @@ private[spark] object Utils extends Logging { val DEFAULT_SHUTDOWN_PRIORITY = 100 + /** + * The shutdown priority of the SparkContext instance. This is lower than the default + * priority, so that by default hooks are run before the context is shut down. + */ + val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50 + private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @volatile private var localRootDirs: Array[String] = null @@ -2116,7 +2122,7 @@ private[spark] object Utils extends Logging { * @return A handle that can be used to unregister the shutdown hook. */ def addShutdownHook(hook: () => Unit): AnyRef = { - addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook) + addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook) } /** @@ -2126,7 +2132,7 @@ private[spark] object Utils extends Logging { * @param hook The code to run during shutdown. * @return A handle that can be used to unregister the shutdown hook. */ - def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = { + def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { shutdownHooks.add(priority, hook) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 93ae45133ce24..70cb57ffd8c69 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -95,14 +95,8 @@ private[spark] class ApplicationMaster( val fs = FileSystem.get(yarnConf) - Utils.addShutdownHook { () => - // If the SparkContext is still registered, shut it down as a best case effort in case - // users do not call sc.stop or do System.exit(). - val sc = sparkContextRef.get() - if (sc != null) { - logInfo("Invoking sc stop from shutdown hook") - sc.stop() - } + // This shutdown hook should run *after* the SparkContext is shut down. + Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () => val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts From ab5adb7a973eec9d95c7575c864cba9f8d83a0fd Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 27 Apr 2015 19:50:55 -0400 Subject: [PATCH 7/8] [SPARK-7145] [CORE] commons-lang (2.x) classes used instead of commons-lang3 (3.x); commons-io used without dependency Remove use of commons-lang in favor of commons-lang3 classes; remove commons-io use in favor of Guava Author: Sean Owen Closes #5703 from srowen/SPARK-7145 and squashes the following commits: 21fbe03 [Sean Owen] Remove use of commons-lang in favor of commons-lang3 classes; remove commons-io use in favor of Guava --- .../test/scala/org/apache/spark/FileServerSuite.scala | 7 +++---- .../apache/spark/metrics/InputOutputMetricsSuite.scala | 4 ++-- .../netty/NettyBlockTransferSecuritySuite.scala | 10 +++++++--- external/flume-sink/pom.xml | 4 ++++ .../flume/sink/SparkAvroCallbackHandler.scala | 4 ++-- .../main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 6 +++++- .../sql/hive/thriftserver/AbstractSparkSQLDriver.scala | 4 ++-- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 8 +++----- .../apache/spark/sql/hive/execution/UDFListString.java | 6 +++--- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 9 ++++----- 10 files changed, 35 insertions(+), 27 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index a69e9b761f9a7..c0439f934813e 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -22,8 +22,7 @@ import java.net.URI import java.util.jar.{JarEntry, JarOutputStream} import javax.net.ssl.SSLException -import com.google.common.io.ByteStreams -import org.apache.commons.io.{FileUtils, IOUtils} +import com.google.common.io.{ByteStreams, Files} import org.apache.commons.lang3.RandomUtils import org.scalatest.FunSuite @@ -239,7 +238,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = { val randomContent = RandomUtils.nextBytes(100) val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir) - FileUtils.writeByteArrayToFile(file, randomContent) + Files.write(randomContent, file) server.addFile(file) val uri = new URI(server.serverUri + "/files/" + file.getName) @@ -254,7 +253,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { Utils.setupSecureURLConnection(connection, sm) } - val buf = IOUtils.toByteArray(connection.getInputStream) + val buf = ByteStreams.toByteArray(connection.getInputStream) assert(buf === randomContent) } diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 190b08d950a02..ef3e213f1fcce 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -21,7 +21,7 @@ import java.io.{File, FileWriter, PrintWriter} import scala.collection.mutable.ArrayBuffer -import org.apache.commons.lang.math.RandomUtils +import org.apache.commons.lang3.RandomUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{LongWritable, Text} @@ -60,7 +60,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt") val pw = new PrintWriter(new FileWriter(tmpFile)) for (x <- 1 to numRecords) { - pw.println(RandomUtils.nextInt(numBuckets)) + pw.println(RandomUtils.nextInt(0, numBuckets)) } pw.close() diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 94bfa67451892..46d2e5173acae 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.network.netty +import java.io.InputStreamReader import java.nio._ +import java.nio.charset.Charset import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} import scala.util.{Failure, Success, Try} -import org.apache.commons.io.IOUtils +import com.google.common.io.CharStreams import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.BlockFetchingListener import org.apache.spark.network.{BlockDataManager, BlockTransferService} @@ -32,7 +34,7 @@ import org.apache.spark.storage.{BlockId, ShuffleBlockId} import org.apache.spark.{SecurityManager, SparkConf} import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, ShouldMatchers} +import org.scalatest.{FunSuite, ShouldMatchers} class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with ShouldMatchers { test("security default off") { @@ -113,7 +115,9 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh val result = fetchBlock(exec0, exec1, "1", blockId) match { case Success(buf) => - IOUtils.toString(buf.createInputStream()) should equal(blockString) + val actualString = CharStreams.toString( + new InputStreamReader(buf.createInputStream(), Charset.forName("UTF-8"))) + actualString should equal(blockString) buf.release() Success() case Failure(t) => diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 67907bbfb6d1b..1f3e619d97a24 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -35,6 +35,10 @@ http://spark.apache.org/ + + org.apache.commons + commons-lang3 + org.apache.flume flume-ng-sdk diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala index 4373be443e67d..fd01807fc3ac4 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala @@ -21,9 +21,9 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import org.apache.flume.Channel -import org.apache.commons.lang.RandomStringUtils import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.flume.Channel +import org.apache.commons.lang3.RandomStringUtils /** * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index f326510042122..f3b5455574d1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.jdbc import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException} import java.util.Properties -import org.apache.commons.lang.StringEscapeUtils.escapeSql +import org.apache.commons.lang3.StringUtils + import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow} @@ -239,6 +240,9 @@ private[sql] class JDBCRDD( case _ => value } + private def escapeSql(value: String): String = + if (value == null) null else StringUtils.replace(value, "'", "''") + /** * Turns a single Filter into a String representing a SQL expression. * Returns null for an unhandled filter. diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala index 59f3a75768082..48ac9062af96a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import scala.collection.JavaConversions._ -import org.apache.commons.lang.exception.ExceptionUtils +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse @@ -61,7 +61,7 @@ private[hive] abstract class AbstractSparkSQLDriver( } catch { case cause: Throwable => logError(s"Failed in [$command]", cause) - new CommandProcessorResponse(1, ExceptionUtils.getFullStackTrace(cause), null) + new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7e307bb4ad1e8..b7b6925aa87f7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -24,18 +24,16 @@ import java.util.{ArrayList => JArrayList} import jline.{ConsoleReader, History} -import org.apache.commons.lang.StringUtils +import org.apache.commons.lang3.StringUtils import org.apache.commons.logging.LogFactory import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor} -import org.apache.hadoop.hive.common.LogUtils.LogInitializationException -import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils, LogUtils} +import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.exec.Utilities -import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor, CommandProcessorFactory} +import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor} import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.shims.ShimLoader import org.apache.thrift.transport.TSocket import org.apache.spark.Logging diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java index efd34df293c88..f33210ebdae1b 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java @@ -17,10 +17,10 @@ package org.apache.spark.sql.hive.execution; -import org.apache.hadoop.hive.ql.exec.UDF; - import java.util.List; -import org.apache.commons.lang.StringUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.ql.exec.UDF; public class UDFListString extends UDF { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index e09c702c8969e..0538aa203c5a0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.BeforeAndAfterEach -import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.metastore.TableType import org.apache.hadoop.hive.ql.metadata.Table @@ -174,7 +173,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT * FROM jsonTable"), Row("a", "b")) - FileUtils.deleteDirectory(tempDir) + Utils.deleteRecursively(tempDir) sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) @@ -190,7 +189,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT * FROM jsonTable"), Row("a1", "b1", "c1")) - FileUtils.deleteDirectory(tempDir) + Utils.deleteRecursively(tempDir) } test("drop, change, recreate") { @@ -212,7 +211,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT * FROM jsonTable"), Row("a", "b")) - FileUtils.deleteDirectory(tempDir) + Utils.deleteRecursively(tempDir) sparkContext.parallelize(("a", "b", "c") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) @@ -231,7 +230,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT * FROM jsonTable"), Row("a", "b", "c")) - FileUtils.deleteDirectory(tempDir) + Utils.deleteRecursively(tempDir) } test("invalidate cache and reload") { From 62888a4ded91b3c2cbb05936c374c7ebfc10799e Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Mon, 27 Apr 2015 19:52:41 -0400 Subject: [PATCH 8/8] [SPARK-7162] [YARN] Launcher error in yarn-client jira: https://issues.apache.org/jira/browse/SPARK-7162 Author: GuoQiang Li Closes #5716 from witgo/SPARK-7162 and squashes the following commits: b64564c [GuoQiang Li] Launcher error in yarn-client --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 019afbd1a1743..741239c953794 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -354,7 +354,7 @@ private[spark] class Client( val dir = new File(path) if (dir.isDirectory()) { dir.listFiles().foreach { file => - if (!hadoopConfFiles.contains(file.getName())) { + if (file.isFile && !hadoopConfFiles.contains(file.getName())) { hadoopConfFiles(file.getName()) = file } }