merge master
witgo committed Jun 6, 2014
2 parents 51fb3d6 + b45c13e commit b627a6a
Showing 38 changed files with 167 additions and 44 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(conf)
conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
import org.apache.spark.util.Utils
import org.apache.spark.util.{SystemClock, Clock, Utils}

/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -61,7 +61,8 @@ class DAGScheduler(
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv)
env: SparkEnv,
clock: Clock = SystemClock)
extends Logging {

import DAGScheduler._
@@ -781,7 +782,7 @@ class DAGScheduler(
logDebug("New pending tasks: " + myPending)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
stageToInfos(stage).submissionTime = Some(clock.getTime())
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -807,11 +808,11 @@

def markStageAsFinished(stage: Stage) = {
val serviceTime = stageToInfos(stage).submissionTime match {
case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
case _ => "Unknown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
stageToInfos(stage).completionTime = Some(clock.getTime())
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
runningStages -= stage
}
@@ -1015,7 +1016,7 @@ class DAGScheduler(
return
}
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
stageToInfos(failedStage).completionTime = Some(clock.getTime())
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
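The Clock and SystemClock types pulled in by the new import are not shown in this diff; presumably they amount to a one-method trait plus a System.currentTimeMillis-backed singleton, roughly as sketched below, and threading a Clock through the constructor presumably exists so tests can inject a fake clock and assert on deterministic submissionTime and completionTime values. Apart from Clock and SystemClock, the names below are illustrative.

// Sketch of the Clock abstraction assumed by the new constructor parameter;
// the real definitions live elsewhere in Spark and may differ in detail.
trait Clock {
  def getTime(): Long
}

object SystemClock extends Clock {
  override def getTime(): Long = System.currentTimeMillis()
}

// A fake clock a test could construct and pass to DAGScheduler to make
// stage timing deterministic.
class FakeClock(private var time: Long) extends Clock {
  override def getTime(): Long = time
  def advance(millis: Long): Unit = { time += millis }
}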
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
import java.util.Comparator

import scala.collection.BufferedIterator
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
private val inputStreams = Seq(sortedMap) ++ spilledMaps
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
*/
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = kc._1.hashCode()
while (it.hasNext && kc._1.hashCode() == minHash) {
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
}
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
private class StreamBuffer(
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {

def isEmpty = pairs.length == 0
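The switch from Iterator to BufferedIterator is the heart of this fix: the old getMorePairs loop tested the hash of the pair it had already consumed, so it could read one pair past a collision group and file a key with a different hash into the wrong StreamBuffer (the behaviour exercised as SPARK-2043 in the test suite below). BufferedIterator.head lets the merge peek at the next key's hash before deciding to consume it. A self-contained sketch of that pattern, with illustrative names only:

import scala.collection.BufferedIterator

object BufferedPeekSketch {
  // Collect the leading run of pairs whose keys share the first key's hash code,
  // peeking with `head` so a pair with a different hash is left for the next group.
  def takeSameHashGroup[K, V](it: BufferedIterator[(K, V)]): Vector[(K, V)] = {
    if (!it.hasNext) Vector.empty
    else {
      val first = it.next()
      val minHash = first._1.hashCode()
      var group = Vector(first)
      while (it.hasNext && it.head._1.hashCode() == minHash) {
        group :+= it.next()
      }
      group
    }
  }

  def main(args: Array[String]): Unit = {
    // "Aa" and "BB" share the same String.hashCode, so they form one group.
    val pairs = Iterator(("Aa", 1), ("BB", 2), ("zz", 3)).buffered
    println(takeSameHashGroup(pairs))  // Vector((Aa,1), (BB,2))
    println(pairs.next())              // (zz,3) was peeked at but never consumed
  }
}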
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
}
}

test ("allow user to disable the output directory existence checking (old Hadoop API") {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(sf)
val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
randomRDD.saveAsTextFile(tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
randomRDD.saveAsTextFile(tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
}

test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
}
}

test ("allow user to disable the output directory existence checking (new Hadoop API") {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(sf)
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
}

test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
("pomatoes", "eructation") // 568647356
)

collisionPairs.foreach { case (w1, w2) =>
// String.hashCode is documented to use a specific algorithm, but check just in case
assert(w1.hashCode === w2.hashCode)
}

(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(kv._2.equals(expectedValue))
count += 1
}
assert(count == 100000 + collisionPairs.size * 2)
assert(count === 100000 + collisionPairs.size * 2)
}

test("spilling with many hash collisions") {
val conf = new SparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)

// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
for (i <- 1 to 10) {
for (j <- 1 to 10000) {
map.insert(FixedHashObject(j, j % 2), 1)
}
}

val it = map.iterator
var count = 0
while (it.hasNext) {
val kv = it.next()
assert(kv._2 === 10)
count += 1
}
assert(count === 10000)
}

test("spilling with hash collisions using the Int.MaxValue key") {
Expand All @@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
}
}

/**
* A dummy class that always returns the same hash code, to easily test hash collisions
*/
case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
override def hashCode(): Int = h
}
1 change: 1 addition & 0 deletions dev/mima
@@ -31,4 +31,5 @@ if [ $ret_val != 0 ]; then
echo "NOTE: Exceptions to binary compatibility can be added in project/MimaExcludes.scala"
fi

rm -f .generated-mima-excludes
exit $ret_val
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
this duration will be cleared as well.
</td>
</tr>
<tr>
<td>spark.hadoop.validateOutputSpecs</td>
<td>true</td>
<td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
output directories. We recommend that users do not disable this except if trying to achieve compatibility with
previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
</tr>
</table>

#### Networking
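A minimal user-side sketch of the new setting, mirroring what the FileSuite tests above exercise; the app name and output path are illustrative. With the flag left at its default of true, the second save below would be rejected during output-spec validation because the directory already exists; with it set to false the job proceeds.

import org.apache.spark.{SparkConf, SparkContext}

object ValidateOutputSpecsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("validate-output-specs-example")      // illustrative
      .setMaster("local")
      .set("spark.hadoop.validateOutputSpecs", "false") // opt out of the check
    val sc = new SparkContext(conf)

    val data = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), 1)
    val out = "/tmp/validate-output-specs-example"      // illustrative path

    data.saveAsTextFile(out)
    // Saving again into the same directory would normally fail output-spec
    // validation; with the flag disabled the save is allowed to proceed.
    data.saveAsTextFile(out)

    sc.stop()
  }
}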
2 changes: 1 addition & 1 deletion examples/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion external/flume/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion external/kafka/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion external/mqtt/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion external/twitter/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion external/zeromq/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion extras/java8-tests/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion extras/spark-ganglia-lgpl/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion graphx/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

1 change: 1 addition & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -150,6 +150,7 @@ object Pregel extends Logging {
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)
prevG.unpersistVertices(blocking=false)
prevG.edges.unpersist(blocking=false)
// count the iteration
i += 1
}
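The added line completes the per-iteration cleanup in Pregel's loop: the previous graph's vertices were already being unpersisted, but its edge RDD was not, so cached edges accumulated across iterations. A standalone sketch of the pattern (a hypothetical helper, not GraphX's actual Pregel implementation):

import scala.reflect.ClassTag
import org.apache.spark.graphx.Graph

object IterateWithCleanup {
  // Materialize the new graph, then unpersist the previous iteration's vertices
  // AND edges so stale cached RDDs do not pile up in the block store.
  def iterate[VD: ClassTag, ED: ClassTag](
      initial: Graph[VD, ED],
      maxIterations: Int)(step: Graph[VD, ED] => Graph[VD, ED]): Graph[VD, ED] = {
    var g = initial.cache()
    var i = 0
    while (i < maxIterations) {
      val prevG = g
      g = step(prevG).cache()
      g.vertices.count()                      // force materialization first
      prevG.unpersistVertices(blocking = false)
      prevG.edges.unpersist(blocking = false) // the step this commit adds
      i += 1
    }
    g
  }
}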
4 changes: 3 additions & 1 deletion mllib/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -51,6 +51,8 @@
<artifactId>breeze_${scala.binary.version}</artifactId>
<version>0.7</version>
<exclusions>
<!-- This is included as a compile-scoped dependency by jtransforms, which is
a dependency of breeze. -->
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
2 changes: 1 addition & 1 deletion pom.xml
@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
<url>http://spark.apache.org/</url>
5 changes: 2 additions & 3 deletions project/SparkBuild.scala
@@ -322,9 +322,8 @@ object SparkBuild extends Build {
val excludeFastutil = ExclusionRule(organization = "it.unimi.dsi")
val excludeJruby = ExclusionRule(organization = "org.jruby")
val excludeThrift = ExclusionRule(organization = "org.apache.thrift")
val excludeJunit = ExclusionRule(organization = "junit")
val excludeCommonsLang = ExclusionRule(organization = "commons-lang")
val excludeServletApi = ExclusionRule(organization = "javax.servlet", artifact = "servlet-api")
val excludeJUnit = ExclusionRule(organization = "junit")

def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark",
version: String = "1.0.0", crossVersion: String = "2.10"): Option[sbt.ModuleID] = {
@@ -469,7 +468,7 @@
previousArtifact := sparkPreviousArtifact("spark-mllib"),
libraryDependencies ++= Seq(
"org.jblas" % "jblas" % jblasVersion,
"org.scalanlp" %% "breeze" % "0.7" excludeAll(excludeJunit)
"org.scalanlp" %% "breeze" % "0.7" excludeAll(excludeJUnit)
)
)

2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -4,7 +4,7 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
