Permalink
Browse files

Merge branches 'subtract' and 'bettersplits' into bizo

* subtract:
  Add RDD.subtract.

* bettersplits:
  Update more javadocs.
  Tweak test names.
  Remove fileServerSuite.txt.
  Update default.parallelism docs, have StandaloneSchedulerBackend use it.
  Change defaultPartitioner to use upstream split size.

Conflicts:
	core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
	core/src/test/scala/spark/ShuffleSuite.scala
  • Loading branch information...
3 parents 9d979fb + 924f47d + 4281e57 commit 02b4a0d96861fbf608f866940a65aef4408e8c61 @stephenh stephenh committed Feb 16, 2013
@@ -23,6 +23,7 @@ import spark.partial.BoundedDouble
import spark.partial.PartialResult
import spark.rdd._
import spark.SparkContext._
+import spark.Partitioner._
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -248,8 +249,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
- * Simplified version of combineByKey that hash-partitions the resulting RDD using the default
- * parallelism level.
+ * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+ * existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
: RDD[(K, C)] = {
@@ -259,15 +260,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
- * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
+ * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+ * parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with the default parallelism level.
+ * resulting RDD with the existing partitioner/parallelism level.
*/
def groupByKey(): RDD[(K, Seq[V])] = {
groupByKey(defaultPartitioner(self))
@@ -295,7 +297,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
- * using the default level of parallelism.
+ * using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
leftOuterJoin(other, defaultPartitioner(self, other))
@@ -315,7 +317,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
- * RDD using the default parallelism level.
+ * RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
rightOuterJoin(other, defaultPartitioner(self, other))
@@ -438,17 +440,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
- /**
- * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
- * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
- */
- def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
- for (r <- rdds if r.partitioner != None) {
- return r.partitioner.get
- }
- return new HashPartitioner(self.context.defaultParallelism)
- }
-
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
@@ -9,6 +9,25 @@ abstract class Partitioner extends Serializable {
def getPartition(key: Any): Int
}
+object Partitioner {
+ /**
+ * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+ * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+ *
+ * The number of partitions will be the same as the number of partitions in the largest upstream
+ * RDD, as this should be least likely to cause out-of-memory errors.
+ *
+ * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
+ */
+ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+ val bySize = (Seq(rdd) ++ others).sortBy(_.splits.size).reverse
+ for (r <- bySize if r.partitioner != None) {
+ return r.partitioner.get
+ }
+ return new HashPartitioner(bySize.head.splits.size)
+ }
+}
+
/**
* A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
*
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import spark.Partitioner._
import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
@@ -30,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithSplitRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
+import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
import spark.storage.StorageLevel
@@ -299,19 +301,26 @@ abstract class RDD[T: ClassManifest](
*/
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
+ /**
+ * Return an RDD of grouped items.
+ */
+ def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+ groupBy[K](f, defaultPartitioner(this))
+
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
- val cleanF = sc.clean(f)
- this.map(t => (cleanF(t), t)).groupByKey(numSplits)
- }
-
+ def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+ groupBy(f, new HashPartitioner(numSplits))
+
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
+ def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+ val cleanF = sc.clean(f)
+ this.map(t => (cleanF(t), t)).groupByKey(p)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
@@ -383,6 +392,26 @@ abstract class RDD[T: ClassManifest](
filter(f.isDefinedAt).map(f)
}
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/split size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: RDD[T]): RDD[T] =
+ subtract(other, partitioner.getOrElse(new HashPartitioner(splits.size)))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], numSplits: Int): RDD[T] =
+ subtract(other, new HashPartitioner(numSplits))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p)
+
/**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
@@ -693,7 +693,7 @@ class SparkContext(
checkpointDir = Some(dir)
}
- /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
+ /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = taskScheduler.defaultParallelism
/** Default min number of splits for Hadoop RDDs when not given by user */
@@ -19,6 +19,7 @@ import spark.OrderedRDDFunctions
import spark.storage.StorageLevel
import spark.HashPartitioner
import spark.Partitioner
+import spark.Partitioner._
import spark.RDD
import spark.SparkContext.rddToPairRDDFunctions
@@ -220,30 +221,30 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(rdd.rightOuterJoin(other, partitioner))
/**
- * Simplified version of combineByKey that hash-partitions the resulting RDD using the default
- * parallelism level.
+ * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
+ * partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
implicit val cm: ClassManifest[C] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
- fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners))
+ fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
}
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
- * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
+ * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+ * parallelism level.
*/
def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
- val partitioner = rdd.defaultPartitioner(rdd)
- fromRDD(reduceByKey(partitioner, func))
+ fromRDD(reduceByKey(defaultPartitioner(rdd), func))
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with the default parallelism level.
+ * resulting RDD with the existing partitioner/parallelism level.
*/
def groupByKey(): JavaPairRDD[K, JList[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey()))
@@ -268,7 +269,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
- * using the default level of parallelism.
+ * using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
fromRDD(rdd.leftOuterJoin(other))
@@ -286,7 +287,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
- * RDD using the default parallelism level.
+ * RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other))
@@ -0,0 +1,108 @@
+package spark.rdd
+
+import java.util.{HashSet => JHashSet}
+import scala.collection.JavaConversions._
+import spark.RDD
+import spark.Partitioner
+import spark.Dependency
+import spark.TaskContext
+import spark.Split
+import spark.SparkEnv
+import spark.ShuffleDependency
+import spark.OneToOneDependency
+
+/**
+ * An optimized version of cogroup for set difference/subtraction.
+ *
+ * It is possible to implement this operation with just `cogroup`, but
+ * that is less efficient because all of the entries from `rdd2`, for
+ * both matching and non-matching values in `rdd1`, are kept in the
+ * JHashMap until the end.
+ *
+ * With this implementation, only the entries from `rdd1` are kept in-memory,
+ * and the entries from `rdd2` are essentially streamed, as we only need to
+ * touch each once to decide if the value needs to be removed.
+ *
+ * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
+ * you can use `rdd1`'s partitioner/split size and not worry about running
+ * out of memory because of the size of `rdd2`.
+ */
+private[spark] class SubtractedRDD[T: ClassManifest](
+ @transient var rdd1: RDD[T],
+ @transient var rdd2: RDD[T],
+ part: Partitioner) extends RDD[T](rdd1.context, Nil) {
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(rdd1, rdd2).map { rdd =>
+ if (rdd.partitioner == Some(part)) {
+ logInfo("Adding one-to-one dependency with " + rdd)
+ new OneToOneDependency(rdd)
+ } else {
+ logInfo("Adding shuffle dependency with " + rdd)
+ val mapSideCombinedRDD = rdd.mapPartitions(i => {
+ val set = new JHashSet[T]()
+ while (i.hasNext) {
+ set.add(i.next)
+ }
+ set.iterator
+ }, true)
+ // ShuffleDependency requires a tuple (k, v), which it will partition by k.
+ // We need this to partition to map to the same place as the k for
+ // OneToOneDependency, which means:
+ // - for already-tupled RDD[(A, B)], into getPartition(a)
+ // - for non-tupled RDD[C], into getPartition(c)
+ val part2 = new Partitioner() {
+ def numPartitions = part.numPartitions
+ def getPartition(key: Any) = key match {
+ case (k, v) => part.getPartition(k)
+ case k => part.getPartition(k)
+ }
+ }
+ new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2)
+ }
+ }
+ }
+
+ override def getSplits: Array[Split] = {
+ val array = new Array[Split](part.numPartitions)
+ for (i <- 0 until array.size) {
+ // Each CoGroupSplit will dependend on rdd1 and rdd2
+ array(i) = new CoGroupSplit(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+ dependencies(j) match {
+ case s: ShuffleDependency[_, _] =>
+ new ShuffleCoGroupSplitDep(s.shuffleId)
+ case _ =>
+ new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
+ }
+ }.toList)
+ }
+ array
+ }
+
+ override val partitioner = Some(part)
+
+ override def compute(s: Split, context: TaskContext): Iterator[T] = {
+ val split = s.asInstanceOf[CoGroupSplit]
+ val set = new JHashSet[T]
+ def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
+ case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+ for (k <- rdd.iterator(itsSplit, context))
+ op(k.asInstanceOf[T])
+ case ShuffleCoGroupSplitDep(shuffleId) =>
+ for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, split.index))
+ op(k.asInstanceOf[T])
+ }
+ // the first dep is rdd1; add all keys to the set
+ integrate(split.deps(0), set.add)
+ // the second dep is rdd2; remove all of its keys from the set
+ integrate(split.deps(1), set.remove)
+ set.iterator
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ }
+
+}
@@ -153,8 +153,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
driverActor ! ReviveOffers
}
- override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
-
// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
try {
@@ -166,6 +164,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
}
}
+
+ override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+ .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
}
private[spark] object StandaloneSchedulerBackend {
@@ -84,10 +84,10 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
- assert(grouped2.join(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
Oops, something went wrong.

0 comments on commit 02b4a0d

Please sign in to comment.