Skip to content

Commit

Permalink
Merge branch 'master' into auto-broadcast-hash-join
Browse files Browse the repository at this point in the history
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
  • Loading branch information
concretevitamin committed Jun 20, 2014
2 parents 7c7158b + 0ac71d1 commit 91461c2
Show file tree
Hide file tree
Showing 63 changed files with 3,168 additions and 720 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ class SparkContext(config: SparkConf) extends Logging {

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
val env = SparkEnv.get
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3],
partitioner: Partitioner)
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
Expand All @@ -558,6 +570,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
Expand All @@ -574,6 +597,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3],
numPartitions: Int)
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))

/** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
Expand All @@ -583,6 +618,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))

/** Alias for cogroup. */
def groupWith[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))

/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
Expand Down Expand Up @@ -786,6 +828,15 @@ object JavaPairRDD {
.mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
}

private[spark]
def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
: RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
rddToPairRDDFunctions(rdd)
.mapValues(x =>
(asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
}

def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
new JavaPairRDD[K, V](rdd)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import scala.collection.mutable.ListBuffer

import org.apache.log4j.Level

import org.apache.spark.util.MemoryParam

/**
* Command-line parser for the driver client.
*/
Expand Down Expand Up @@ -51,8 +53,8 @@ private[spark] class ClientArguments(args: Array[String]) {
cores = value.toInt
parse(tail)

case ("--memory" | "-m") :: value :: tail =>
memory = value.toInt
case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
memory = value
parse(tail)

case ("--supervise" | "-s") :: tail =>
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,28 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
new FlatMappedValuesRDD(self, cleanF)
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]],
w3s.asInstanceOf[Seq[W3]])
}
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
Expand Down Expand Up @@ -599,6 +621,16 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
Expand Down Expand Up @@ -633,6 +665,19 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}

/** Alias for cogroup. */
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
Expand All @@ -644,6 +689,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/** Alias for cogroup. */
def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
* Return an RDD with the pairs from `this` whose keys are not in `other`.
*
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ abstract class RDD[T: ClassTag](
* Return this RDD sorted by the given key function.
*/
def sortBy[K](
f: (T) K,
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,13 @@ private[spark] object MetadataCleaner {
conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}

/**
* Set the default delay time (in seconds).
* @param conf SparkConf instance
* @param delay default delay time to set
* @param resetAll whether to reset all to default
*/
def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) {
// override for all ?
conf.set("spark.cleaner.ttl", delay.toString)
if (resetAll) {
for (cleanerType <- MetadataCleanerType.values) {
Expand Down
63 changes: 63 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.*;

import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;


import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -304,6 +307,66 @@ public void cogroup() {
cogrouped.collect();
}

@SuppressWarnings("unchecked")
@Test
public void cogroup3() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, String>("Apples", "Fruit"),
new Tuple2<String, String>("Oranges", "Fruit"),
new Tuple2<String, String>("Oranges", "Citrus")
));
JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, Integer>("Oranges", 2),
new Tuple2<String, Integer>("Apples", 3)
));
JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, Integer>("Oranges", 21),
new Tuple2<String, Integer>("Apples", 42)
));

JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped =
categories.cogroup(prices, quantities);
Assert.assertEquals("[Fruit, Citrus]",
Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));


cogrouped.collect();
}

@SuppressWarnings("unchecked")
@Test
public void cogroup4() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, String>("Apples", "Fruit"),
new Tuple2<String, String>("Oranges", "Fruit"),
new Tuple2<String, String>("Oranges", "Citrus")
));
JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, Integer>("Oranges", 2),
new Tuple2<String, Integer>("Apples", 3)
));
JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, Integer>("Oranges", 21),
new Tuple2<String, Integer>("Apples", 42)
));
JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, String>("Oranges", "BR"),
new Tuple2<String, String>("Apples", "US")
));

JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>, Iterable<String>>> cogrouped =
categories.cogroup(prices, quantities, countries);
Assert.assertEquals("[Fruit, Citrus]",
Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
Assert.assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4()));

cogrouped.collect();
}

@SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}

test("groupWith3") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
val joined = rdd1.groupWith(rdd2, rdd3).collect()
assert(joined.size === 4)
val joinedSet = joined.map(x => (x._1,
(x._2._1.toList, x._2._2.toList, x._2._3.toList))).toSet
assert(joinedSet === Set(
(1, (List(1, 2), List('x'), List('a'))),
(2, (List(1), List('y', 'z'), List())),
(3, (List(1), List(), List('b'))),
(4, (List(), List('w'), List('c', 'd')))
))
}

test("groupWith4") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
val rdd4 = sc.parallelize(Array((2, '@')))
val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect()
assert(joined.size === 4)
val joinedSet = joined.map(x => (x._1,
(x._2._1.toList, x._2._2.toList, x._2._3.toList, x._2._4.toList))).toSet
assert(joinedSet === Set(
(1, (List(1, 2), List('x'), List('a'), List())),
(2, (List(1), List('y', 'z'), List(), List('@'))),
(3, (List(1), List(), List('b'), List())),
(4, (List(), List('w'), List('c', 'd'), List()))
))
}

test("zero-partition RDD") {
val emptyDir = Files.createTempDir()
emptyDir.deleteOnExit()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,14 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
checkNonZeroAvg(
taskInfoMetrics.map(_._2.executorDeserializeTime),
stageInfo + " executorDeserializeTime")

/* Test is disabled (SEE SPARK-2208)
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
checkNonZeroAvg(
taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
stageInfo + " fetchWaitTime")
}
*/

taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
// on SparkConf settings.

def testAppenderSelection[ExpectedAppender: ClassTag, ExpectedRollingPolicy](
properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): FileAppender = {
properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): Unit = {

// Set spark conf properties
val conf = new SparkConf
Expand All @@ -129,8 +129,9 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
}

// Create and test file appender
val inputStream = new PipedInputStream(new PipedOutputStream())
val appender = FileAppender(inputStream, new File("stdout"), conf)
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream)
val appender = FileAppender(testInputStream, testFile, conf)
assert(appender.isInstanceOf[ExpectedAppender])
assert(appender.getClass.getSimpleName ===
classTag[ExpectedAppender].runtimeClass.getSimpleName)
Expand All @@ -144,7 +145,8 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
}
assert(policyParam === expectedRollingPolicyParam)
}
appender
testOutputStream.close()
appender.awaitTermination()
}

import RollingFileAppender._
Expand Down
Loading

0 comments on commit 91461c2

Please sign in to comment.