
Commit

Merge github.com:apache/incubator-spark into streaming-typos
andrewor14 committed Feb 11, 2014
2 parents d5515b4 + d6a9bdc commit bc2e4bc
Showing 230 changed files with 3,719 additions and 1,504 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

56 changes: 33 additions & 23 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -28,21 +28,23 @@ object Bagel extends Logging {
/**
* Runs a Bagel program.
* @param sc [[org.apache.spark.SparkContext]] to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
* the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
* empty array, i.e. sc.parallelize(Array[K, Message]()).
* @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
* message before sending (which often involves network I/O).
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
* and provides the result to each vertex in the next superstep.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
* Key will be the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
* this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
* @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
* given vertex into one message before sending (which often involves network
* I/O).
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
* @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
* @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
* Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
* optional Aggregator and the current superstep,
* @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
* intermediate RDDs in each superstep. Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to
* the Vertex, optional Aggregator and the current superstep,
* and returns a set of (Vertex, outgoing Messages) pairs
* @tparam K key
* @tparam V vertex type
@@ -71,7 +73,7 @@ object Bagel extends Logging {
var msgs = messages
var noActivity = false
do {
logInfo("Starting superstep "+superstep+".")
logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis

val aggregated = agg(verts, aggregator)
@@ -97,7 +99,8 @@ object Bagel extends Logging {
verts
}

/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
* storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -106,8 +109,8 @@ object Bagel extends Logging {
partitioner: Partitioner,
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -127,8 +130,8 @@ object Bagel extends Logging {
}

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
* and default storage level
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
* [[org.apache.spark.HashPartitioner]] and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -138,9 +141,13 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
DEFAULT_STORAGE_LEVEL)(compute)

/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
* default [[org.apache.spark.HashPartitioner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -158,7 +165,8 @@ object Bagel extends Logging {
}

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
* default [[org.apache.spark.HashPartitioner]],
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -171,7 +179,8 @@ object Bagel extends Logging {
): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
* the default [[org.apache.spark.HashPartitioner]]
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -227,8 +236,9 @@ object Bagel extends Logging {
})

numMsgs += newMsgs.size
if (newVert.active)
if (newVert.active) {
numActiveVerts += 1
}

Some((newVert, newMsgs))
}.persist(storageLevel)
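
The scaladoc above describes the Bagel parameters in the abstract; a minimal sketch of driving the simplest overload (no Aggregator, the default HashPartitioner and DefaultCombiner) is shown below. The PageVertex and RankMessage classes, the sample data, and the superstep-10 cutoff are hypothetical, invented only for illustration.

    import org.apache.spark.SparkContext
    import org.apache.spark.bagel.{Bagel, Message, Vertex}

    // Hypothetical vertex and message types for the sketch.
    class PageVertex(val id: String, val rank: Double, val outEdges: Array[String],
                     val active: Boolean) extends Vertex with Serializable
    class RankMessage(val targetId: String, val rankShare: Double)
      extends Message[String] with Serializable

    def runBagelSketch(sc: SparkContext): Unit = {
      val vertices = sc.parallelize(Seq(
        ("a", new PageVertex("a", 1.0, Array("b"), true)),
        ("b", new PageVertex("b", 1.0, Array("a"), true))))
      // Start with no messages, as the scaladoc above suggests.
      val messages = sc.parallelize(Array[(String, RankMessage)]())

      // compute receives the vertex, the (possibly combined) inbound messages and the
      // current superstep, and returns the updated vertex plus its outgoing messages.
      val result = Bagel.run(sc, vertices, messages, 2) {
        (v: PageVertex, inbox: Option[Array[RankMessage]], superstep: Int) =>
          val newRank = inbox.map(_.map(_.rankShare).sum).getOrElse(v.rank)
          val done = superstep >= 10
          val outbox =
            if (done) Array[RankMessage]()
            else v.outEdges.map(dst => new RankMessage(dst, newRank / v.outEdges.length))
          (new PageVertex(v.id, newRank, v.outEdges, !done), outbox)
      }
      result.collect().foreach { case (id, v) => println(id + " has rank " + v.rank) }
    }
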
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -31,8 +31,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[RDDBlockId]()

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
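
getOrCompute is internal, but its effect is visible from user code: once an RDD is persisted, the first action materializes and caches each partition through the CacheManager, and later actions read the cached blocks. A small sketch, with a hypothetical driver function:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    def cacheDemo(sc: SparkContext): Unit = {
      val squares = sc.parallelize(1 to 1000).map(x => x * x)
        .persist(StorageLevel.MEMORY_ONLY)   // mark for caching; nothing is computed yet
      val total = squares.reduce(_ + _)      // computes the partitions and stores them
      val count = squares.count()            // served from the cached blocks, no recomputation
      println(s"sum=$total count=$count")
    }
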
core/src/main/scala/org/apache/spark/FetchFailedException.scala
@@ -25,7 +25,8 @@ private[spark] class FetchFailedException(
cause: Throwable)
extends Exception {

def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, cause: Throwable) =
def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
cause: Throwable) =
this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
cause)
core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}

private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
26 changes: 15 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -63,9 +63,9 @@ import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, Metada
*/
class SparkContext(
config: SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
// a map from hostname to a list of input format splits on the host.
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
extends Logging {

@@ -552,10 +552,11 @@ class SparkContext(

/**
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
* BytesWritable values that contain a serialized partition. This is still an experimental storage
* format and may not be supported exactly as is in future Spark releases. It will also be pretty
* slow if you use the default serializer (Java serialization), though the nice thing about it is
* that there's very little effort required to save arbitrary objects.
* BytesWritable values that contain a serialized partition. This is still an experimental
* storage format and may not be supported exactly as is in future Spark releases. It will also
* be pretty slow if you use the default serializer (Java serialization),
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
*/
def objectFile[T: ClassTag](
path: String,
@@ -1043,7 +1044,7 @@ object SparkContext {

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0l
def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
@@ -1109,7 +1110,8 @@ object SparkContext {

implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)

implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
implicit def booleanWritableConverter() =
simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit def bytesWritableConverter() = {
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
@@ -1258,7 +1260,8 @@ object SparkContext {

case "yarn-client" =>
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

@@ -1269,7 +1272,8 @@ object SparkContext {
}

val backend = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
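
The objectFile scaladoc reworded above pairs with RDD.saveAsObjectFile; a small round-trip sketch follows. The output path and function name are hypothetical.

    import org.apache.spark.SparkContext

    def objectFileRoundTrip(sc: SparkContext): Unit = {
      val path = "/tmp/ints-object-file"                   // hypothetical location
      sc.parallelize(1 to 100, 4).saveAsObjectFile(path)   // NullWritable keys, BytesWritable values
      val restored = sc.objectFile[Int](path)              // slow (Java serialization) but effortless
      println(s"${restored.count()} values restored")
    }
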
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -96,7 +96,7 @@ object SparkEnv extends Logging {
@volatile private var lastSetSparkEnv : SparkEnv = _

def set(e: SparkEnv) {
lastSetSparkEnv = e
lastSetSparkEnv = e
env.set(e)
}

@@ -112,7 +112,7 @@ object SparkEnv extends Logging {
* Returns the ThreadLocal SparkEnv.
*/
def getThreadLocal: SparkEnv = {
env.get()
env.get()
}

private[spark] def create(
@@ -168,7 +168,8 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf)

val connectionManager = blockManager.connectionManager

13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -53,3 +53,16 @@ private[spark] case class ExceptionFailure(
private[spark] case object TaskResultLost extends TaskEndReason

private[spark] case object TaskKilled extends TaskEndReason

/**
* The task failed because the executor that it was running on was lost. This may happen because
* the task crashed the JVM.
*/
private[spark] case object ExecutorLostFailure extends TaskEndReason

/**
* We don't know why the task ended -- for example, because of a ClassNotFound exception when
* deserializing the task result.
*/
private[spark] case object UnknownReason extends TaskEndReason
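
For orientation, a hypothetical helper shows where the two new case objects sit in the TaskEndReason hierarchy. These types are private[spark], so the sketch assumes it is compiled inside the org.apache.spark package; the helper itself is not part of this change.

    package org.apache.spark

    // Illustration only: real handling of these reasons lives in the scheduler.
    private[spark] object TaskEndReasonDemo {
      def describe(reason: TaskEndReason): String = reason match {
        case Success             => "task completed"
        case TaskResultLost      => "task result was lost in transit"
        case TaskKilled          => "task was intentionally killed"
        case ExecutorLostFailure => "executor running the task was lost (e.g. its JVM crashed)"
        case UnknownReason       => "task ended for a reason we could not determine"
        case other               => "other failure: " + other
      }
    }
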

core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -148,8 +148,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def sum(): Double = srdd.sum()

/**
* Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
* of the RDD's elements in one operation.
* Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
* count of the RDD's elements in one operation.
*/
def stats(): StatCounter = srdd.stats()

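
JavaDoubleRDD.stats() simply delegates to the underlying Scala RDD, so the one-pass StatCounter behavior described above can be sketched from the Scala API (the sample data is made up, and SparkContext._ is imported for the implicit double-RDD conversion):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // implicit conversion to DoubleRDDFunctions

    def statsDemo(sc: SparkContext): Unit = {
      val latenciesMs = sc.parallelize(Seq(12.0, 15.5, 9.8, 30.2, 11.1))  // hypothetical sample
      val st = latenciesMs.stats()            // count, mean and variance in a single pass
      println(s"count=${st.count} mean=${st.mean} stdev=${st.stdev}")
    }
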
40 changes: 22 additions & 18 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -88,7 +88,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions))
def distinct(numPartitions: Int): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.distinct(numPartitions))

/**
* Return a new RDD containing only the elements that satisfy a predicate.
@@ -210,25 +211,25 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)

/**
* Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication.).
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V])
: JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))

/**
* Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication.).
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))

/**
* Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication.).
* Merge the values for each key using an associative function and a neutral "zero value"
* which may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue)(func))
Expand Down Expand Up @@ -375,7 +376,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@@ -397,7 +399,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@@ -439,8 +442,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

/**
@@ -462,8 +465,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
= fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (JList[V], JList[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
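
The foldByKey contract described above (an associative function plus a neutral "zero value" that may be applied any number of times) is easiest to see on the Scala pair-RDD API that JavaPairRDD wraps. A sketch with made-up data, also showing the Option-wrapping behavior of leftOuterJoin:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

    def pairRddDemo(sc: SparkContext): Unit = {
      val sales = sc.parallelize(Seq(("apples", 3), ("pears", 2), ("apples", 4)))
      val totals = sales.foldByKey(0)(_ + _)   // 0 is neutral for addition: apples -> 7, pears -> 2
      totals.collect().foreach(println)

      // leftOuterJoin keeps every key of the left side, wrapping the right side in Option.
      val prices = sc.parallelize(Seq(("apples", 1.2)))
      sales.leftOuterJoin(prices).collect().foreach(println)  // pears pairs with None
    }
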
(Diffs for the remaining changed files are not shown here.)
