Merge branch 'master' of github.com:nchammas/spark
nchammas committed Sep 10, 2014
2 parents d4c5f43 + 0d1cc4a commit 03180a4
Showing 81 changed files with 1,455 additions and 317 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.SPARK_VERSION
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

@@ -825,7 +826,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

/** The version of Spark on which this application is running. */
def version = SparkContext.SPARK_VERSION
def version = SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
@@ -1297,8 +1298,6 @@
*/
object SparkContext extends Logging {

private[spark] val SPARK_VERSION = "1.2.0-SNAPSHOT"

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -217,7 +217,7 @@ object SparkEnv extends Logging {
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

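As a hedged illustration of the default change above (not part of the commit): with no explicit setting, `conf.get("spark.shuffle.manager", "sort")` now falls back to the short name `sort`, which the lookup table resolves to the sort-based manager.

```scala
import org.apache.spark.SparkConf

// Sketch of the short-name resolution performed in SparkEnv above; the map is
// copied from the diff, the surrounding driver code is illustrative only.
object ShuffleManagerLookupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    println(shuffleMgrClass)  // org.apache.spark.shuffle.sort.SortShuffleManager
  }
}
```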
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/package.scala
@@ -44,4 +44,5 @@ package org.apache

package object spark {
// For package docs only
val SPARK_VERSION = "1.2.0-SNAPSHOT"
}
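A minimal usage sketch (not part of the commit, and assuming a local build of this 1.2.0-SNAPSHOT branch): after the constant moves into the package object, the version string is reachable both as `org.apache.spark.SPARK_VERSION` and through `SparkContext.version`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: prints the same version string via both paths.
object VersionCheckSketch {
  def main(args: Array[String]): Unit = {
    println(org.apache.spark.SPARK_VERSION)
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("version-check"))
    println(sc.version)  // delegates to SPARK_VERSION after this change
    sc.stop()
  }
}
```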
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag](
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
if (num == 0) {
Array.empty
} else {
mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}

/**
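A short usage sketch of the guarded path (illustrative, assuming an active `SparkContext` named `sc`): `takeOrdered(0)` now returns an empty array immediately instead of reducing over the per-partition priority queues.

```scala
// Illustrative only; sc is assumed to be an existing SparkContext.
val rdd = sc.parallelize(Seq(5, 1, 4, 2, 3), numSlices = 2)
println(rdd.takeOrdered(3).mkString(", "))  // 1, 2, 3  (smallest three, ascending)
println(rdd.takeOrdered(0).length)          // 0        (new early return)
```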
@@ -241,9 +241,9 @@ class DAGScheduler(
callSite: CallSite)
: Stage =
{
val parentStages = getParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage =
new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
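A hedged sketch of the behavioral effect of the reordering above (illustrative, assuming an active `SparkContext` named `sc`): because `parentStages` is now computed before `nextStageId` is incremented, the shuffle map stage of a simple two-stage job receives a lower id than its result stage, which the new DAGSchedulerSuite test further below asserts.

```scala
import org.apache.spark.SparkContext._  // PairRDDFunctions implicits for reduceByKey

// Illustrative only: a two-stage job (shuffle map stage + result stage).
val counts = sc.parallelize(1 to 10)
  .map(x => (x, x))
  .reduceByKey(_ + _, numPartitions = 4)
counts.count()  // the map stage should now be assigned the lower stage id
```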
@@ -29,6 +29,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.SPARK_VERSION
import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}

/**
@@ -86,7 +87,7 @@ private[spark] class EventLoggingListener(
sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
logger.newFile(SPARK_VERSION_PREFIX + SPARK_VERSION)
logger.newFile(LOG_PREFIX + logger.fileIndex)
}

33 changes: 33 additions & 0 deletions core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.scalatest.BeforeAndAfterAll

class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {

// This test suite should run all tests in ShuffleSuite with hash-based shuffle.

override def beforeAll() {
System.setProperty("spark.shuffle.manager", "hash")
}

override def afterAll() {
System.clearProperty("spark.shuffle.manager")
}
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair

class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {

val conf = new SparkConf(loadDefaults = false)

3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -24,8 +24,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with sort-based shuffle.

override def beforeAll() {
System.setProperty("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.SortShuffleManager")
System.setProperty("spark.shuffle.manager", "sort")
}

override def afterAll() {
7 changes: 7 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -521,6 +521,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sortedLowerK === Array(1, 2, 3, 4, 5))
}

test("takeOrdered with limit 0") {
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.makeRDD(nums, 2)
val sortedLowerK = rdd.takeOrdered(0)
assert(sortedLowerK.size === 0)
}

test("takeOrdered with custom ordering") {
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
implicit val ord = implicitly[Ordering[Int]].reverse
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -97,10 +98,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
val sparkListener = new SparkListener() {
val successfulStages = new HashSet[Int]()
val failedStages = new ArrayBuffer[Int]()
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val stageInfo = stageCompleted.stageInfo
stageByOrderOfExecution += stageInfo.stageId
if (stageInfo.failureReason.isEmpty) {
successfulStages += stageInfo.stageId
} else {
@@ -231,6 +234,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
runEvent(JobCancelled(jobId))
}

test("[SPARK-3353] parent stage should have lower stage id") {
sparkListener.stageByOrderOfExecution.clear()
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
assert(sparkListener.stageByOrderOfExecution.length === 2)
assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
}

test("zero split job") {
var numResults = 0
val fakeListener = new JobListener() {
@@ -457,7 +467,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
null,
null))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.contains(1))

// The second ResultTask fails, with a fetch failure for the output from the second mapper.
runEvent(CompletionEvent(
@@ -515,8 +525,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// Listener bus should get told about the map stage failing, but not the reduce stage
// (since the reduce stage hasn't been started yet).
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(1))
assert(sparkListener.failedStages.size === 1)
assert(sparkListener.failedStages.toSet === Set(0))

assertDataStructuresEmpty
}
@@ -563,14 +572,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
val stageFailureMessage = "Exception failure in map stage"
failed(taskSets(0), stageFailureMessage)

assert(cancelledStages.contains(1))
assert(cancelledStages.toSet === Set(0, 2))

// Make sure the listeners got told about both failed stages.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.successfulStages.isEmpty)
assert(sparkListener.failedStages.contains(1))
assert(sparkListener.failedStages.contains(3))
assert(sparkListener.failedStages.size === 2)
assert(sparkListener.failedStages.toSet === Set(0, 2))

assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
@@ -28,6 +28,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.SPARK_VERSION
import org.apache.spark.util.{JsonProtocol, Utils}

import java.io.File
@@ -196,7 +197,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
assert(info.logPaths.size > 0)
assert(info.sparkVersion === SparkContext.SPARK_VERSION)
assert(info.sparkVersion === SPARK_VERSION)
assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
info.compressionCodec.foreach { codec =>
assert(compressionCodec.isDefined)
@@ -381,7 +382,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
assert(file.isDefined)
assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
assert(EventLoggingListener.parseSparkVersion(file.get) === SPARK_VERSION)
}

private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
@@ -180,7 +180,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
rdd3.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
stageInfo3.rddInfos.size should be {1} // ShuffledRDD
stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true}
stageInfo3.rddInfos.exists(_.name == "Trois") should be {true}
@@ -42,6 +42,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
conf.set("spark.shuffle.compress", codec.isDefined.toString)
codec.foreach { c => conf.set("spark.io.compression.codec", c) }
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
4 changes: 2 additions & 2 deletions dev/check-license
@@ -32,9 +32,9 @@ acquire_rat_jar () {
printf "Attempting to fetch rat\n"
JAR_DL=${JAR}.part
if hash curl 2>/dev/null; then
(curl --progress-bar ${URL1} > "$JAR_DL" || curl --progress-bar ${URL2} > "$JAR_DL") && mv "$JAR_DL" "$JAR"
(curl --silent ${URL1} > "$JAR_DL" || curl --silent ${URL2} > "$JAR_DL") && mv "$JAR_DL" "$JAR"
elif hash wget 2>/dev/null; then
(wget --progress=bar ${URL1} -O "$JAR_DL" || wget --progress=bar ${URL2} -O "$JAR_DL") && mv "$JAR_DL" "$JAR"
(wget --quiet ${URL1} -O "$JAR_DL" || wget --quiet ${URL2} -O "$JAR_DL") && mv "$JAR_DL" "$JAR"
else
printf "You do not have curl or wget installed, please install rat manually.\n"
exit -1
5 changes: 3 additions & 2 deletions dev/lint-python
@@ -30,6 +30,7 @@ cd $SPARK_ROOT_DIR
#+ - Download this from a more reliable source. (GitHub raw can be flaky, apparently. (?))
PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8.py"
PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"
PEP8_PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/"

curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH"
curl_status=$?
@@ -44,7 +45,7 @@ fi
#+ first, but we do so so that the check status can
#+ be output before the report, like with the
#+ scalastyle and RAT checks.
python $PEP8_SCRIPT_PATH ./python/pyspark > "$PEP8_REPORT_PATH"
python $PEP8_SCRIPT_PATH $PEP8_PATHS_TO_CHECK > "$PEP8_REPORT_PATH"
pep8_status=${PIPESTATUS[0]} #$?

if [ $pep8_status -ne 0 ]; then
@@ -54,7 +55,7 @@ else
echo "PEP 8 checks passed."
fi

rm -f "$PEP8_REPORT_PATH"
rm "$PEP8_REPORT_PATH"
rm "$PEP8_SCRIPT_PATH"

exit $pep8_status
9 changes: 4 additions & 5 deletions docs/configuration.md
@@ -293,12 +293,11 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.shuffle.manager</code></td>
<td>HASH</td>
<td>sort</td>
<td>
Implementation to use for shuffling data. A hash-based shuffle manager is the default, but
starting in Spark 1.1 there is an experimental sort-based shuffle manager that is more
memory-efficient in environments with small executors, such as YARN. To use that, change
this value to <code>SORT</code>.
Implementation to use for shuffling data. There are two implementations available:
<code>sort</code> and <code>hash</code>. Sort-based shuffle is more memory-efficient and is
the default option starting in 1.2.
</td>
</tr>
<tr>
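As a hedged, illustrative companion to the documentation change above (not part of the commit): the property can still be set explicitly when the previous hash-based shuffle is preferred, for example through `SparkConf`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative configuration sketch: opt back into hash-based shuffle.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("hash-shuffle-example")
  .set("spark.shuffle.manager", "hash")  // default is "sort" as of this change
val sc = new SparkContext(conf)
```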
Binary file modified docs/img/streaming-arch.png
Binary file modified docs/img/streaming-figures.pptx
Binary file added docs/img/streaming-kinesis-arch.png
2 changes: 2 additions & 0 deletions docs/index.md
@@ -103,6 +103,8 @@ options for deployment:
* [Security](security.html): Spark security support
* [Hardware Provisioning](hardware-provisioning.html): recommendations for cluster hardware
* [3<sup>rd</sup> Party Hadoop Distributions](hadoop-third-party-distributions.html): using common Hadoop distributions
* Integration with other storage systems:
* [OpenStack Swift](storage-openstack-swift.html)
* [Building Spark with Maven](building-with-maven.html): build Spark using the Maven system
* [Contributing to Spark](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark)
