Merge branch 'master' of github.com:apache/spark into fix-drop-events
andrewor14 committed Jul 31, 2014
2 parents 8e91921 + e966284 commit da8e322
Showing 77 changed files with 3,227 additions and 478 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -262,6 +262,11 @@
<artifactId>asm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
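
The new junit artifact complements the existing junit-interface runner shown above, so JUnit-style suites under core/ can be executed from Maven and sbt. A minimal sketch of such a test, written in Scala against the JUnit 4 API (the suite name is hypothetical, not a file in this commit):

import org.junit.Assert.assertEquals
import org.junit.Test

// A trivial JUnit 4 suite: junit provides the annotations and assertions,
// junit-interface lets sbt discover and run it.
class ExampleJUnitSuite {
  @Test
  def addition(): Unit = {
    assertEquals(4, 2 + 2)
  }
}
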
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -56,18 +56,23 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
}
combiners.iterator
}
}

@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
{
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
@@ -85,9 +90,12 @@ case class Aggregator[K, V, C] (
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
}
combiners.iterator
}
}
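
The Aggregator change above switches the spill metrics from assignment (=) to accumulation (+=) and folds both updates into one null-guarded block, so a task that spills in more than one aggregation step adds to its totals instead of overwriting them. A minimal sketch of the pattern, using hypothetical stand-ins rather than Spark's real TaskContext and TaskMetrics:

// Hypothetical stand-ins for TaskContext/TaskMetrics, for illustration only.
class Metrics { var memoryBytesSpilled: Long = 0L; var diskBytesSpilled: Long = 0L }
class Context(val taskMetrics: Metrics)

object SpillMetricsDemo extends App {
  def reportSpill(context: Context, memSpilled: Long, diskSpilled: Long): Unit = {
    // Option(null) is None, so a null context is skipped instead of throwing,
    // and += accumulates across repeated spills rather than overwriting them.
    Option(context).foreach { c =>
      c.taskMetrics.memoryBytesSpilled += memSpilled
      c.taskMetrics.diskBytesSpilled += diskSpilled
    }
  }

  val ctx = new Context(new Metrics)
  reportSpill(ctx, 100L, 10L)
  reportSpill(ctx, 50L, 5L)
  assert(ctx.taskMetrics.memoryBytesSpilled == 150L) // accumulated, not overwritten
  reportSpill(null, 1L, 1L)                          // null context is safely ignored
}
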
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -289,7 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
executorEnvs("SPARK_PREPEND_CLASSES") = v
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
@@ -1205,10 +1205,10 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Clean a closure to make it ready to be serialized and sent to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* if not.
*
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws <tt>SparkException</tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
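
The clean() scaladoc above describes an optional proactive serializability check. A generic sketch of that kind of check using plain Java serialization; this is illustrative only (ensureSerializable is a made-up helper, and Spark's real check goes through its own closure serializer):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializabilityCheck {
  // Probe an object by serializing it into a throwaway buffer and fail fast
  // with a clearer error if it cannot be serialized.
  def ensureSerializable(f: AnyRef): Unit = {
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
    } catch {
      case e: NotSerializableException =>
        throw new RuntimeException(s"Closure is not serializable: ${e.getMessage}", e)
    }
  }
}
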
@@ -122,7 +122,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] =
sample(withReplacement, fraction, Utils.random.nextLong)

/**
* Return a sampled subset of this RDD.
*/
@@ -17,8 +17,9 @@

package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
* transformation code by overriding the convert method.
*/
@Experimental
trait Converter[T, U] extends Serializable {
trait Converter[T, + U] extends Serializable {
def convert(obj: T): U
}

private[python] object Converter extends Logging {

def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
def getInstance(converterClass: Option[String],
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,15 +51,17 @@ private[python] object Converter extends Logging {
logError(s"Failed to load converter: $cc")
throw err
}
}.getOrElse { new DefaultConverter }
}.getOrElse { defaultConverter }
}
}

/**
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
private[python] class DefaultConverter extends Converter[Any, Any] {
private[python] class WritableToJavaConverter(
conf: Broadcast[SerializableWritable[Configuration]],
batchSize: Int) extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
case fw: FloatWritable => fw.get()
case t: Text => t.toString
case bw: BooleanWritable => bw.get()
case byw: BytesWritable => byw.getBytes
case byw: BytesWritable =>
val bytes = new Array[Byte](byw.getLength)
System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
bytes
case n: NullWritable => null
case aw: ArrayWritable => aw.get().map(convertWritable(_))
case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
(convertWritable(k), convertWritable(v))
}.toMap)
case aw: ArrayWritable =>
// Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
// Since we can't determine element types for empty arrays, we will not attempt to
// convert to primitive arrays (which get pickled to Python arrays). Users may want to
// write custom converters for arrays if they know the element types a priori.
aw.get().map(convertWritable(_))
case mw: MapWritable =>
val map = new java.util.HashMap[Any, Any]()
mw.foreach { case (k, v) =>
map.put(convertWritable(k), convertWritable(v))
}
map
case w: Writable =>
if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
case other => other
}
}

def convert(obj: Any): Any = {
override def convert(obj: Any): Any = {
obj match {
case writable: Writable =>
convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
}
}
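
Converter.getInstance above loads a user-supplied converter reflectively with Class.forName(cc).newInstance(), so a custom converter only needs to extend the Converter trait and provide a no-arg constructor. A hypothetical example, not part of this commit:

import org.apache.hadoop.io.Text

// Hypothetical user-defined converter: upper-cases Text values and passes
// everything else through unchanged. Its no-arg constructor is what allows
// Converter.getInstance to build it from a class name.
class UpperCaseTextConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.toUpperCase
    case other   => other
  }
}
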

/**
* A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
* types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
* to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
* [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
* PySpark RDD `saveAsNewAPIHadoopFile` doctest.
*/
private[python] class JavaToWritableConverter extends Converter[Any, Writable] {

/**
* Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
* supported out-of-the-box.
*/
private def convertToWritable(obj: Any): Writable = {
import collection.JavaConversions._
obj match {
case i: java.lang.Integer => new IntWritable(i)
case d: java.lang.Double => new DoubleWritable(d)
case l: java.lang.Long => new LongWritable(l)
case f: java.lang.Float => new FloatWritable(f)
case s: java.lang.String => new Text(s)
case b: java.lang.Boolean => new BooleanWritable(b)
case aob: Array[Byte] => new BytesWritable(aob)
case null => NullWritable.get()
case map: java.util.Map[_, _] =>
val mapWritable = new MapWritable()
map.foreach { case (k, v) =>
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
case other => throw new SparkException(
s"Data of type ${other.getClass.getName} cannot be used")
}
}

override def convert(obj: Any): Writable = obj match {
case writable: Writable => writable
case other => convertToWritable(other)
}
}
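
The JavaToWritableConverter comment above notes that arrays need an ArrayWritable subclass so the element type is known. A sketch of that pattern for Array[Double]; the names are illustrative and not necessarily identical to the DoubleArrayWritable / DoubleArrayToWritableConverter the comment refers to:

import org.apache.hadoop.io.{ArrayWritable, DoubleWritable, Writable}

// Illustrative only: an ArrayWritable subclass that pins the element type...
class ExampleDoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])

// ...and a converter that wraps a Scala Array[Double] in it.
class ExampleDoubleArrayToWritableConverter extends Converter[Any, Writable] {
  override def convert(obj: Any): Writable = obj match {
    case arr: Array[Double] =>
      val writable = new ExampleDoubleArrayWritable()
      writable.set(arr.map(d => new DoubleWritable(d): Writable))
      writable
    case other =>
      throw new IllegalArgumentException(s"Unsupported type: ${other.getClass.getName}")
  }
}
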

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {

/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of
* [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
* [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
keyConverter: Converter[Any, Any],
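
convertRDD (its signature is truncated above) applies the key and value converters element-wise, which covers both the Writable-to-Java and Java-to-Writable directions depending on the converters passed in. Conceptually it is just a map over the pair RDD, roughly as in this sketch (convertPairs is an illustrative stand-in, not the real method):

import org.apache.spark.rdd.RDD

object ConvertSketch {
  // Element-wise conversion over a pair RDD: the same shape as convertRDD,
  // without the real method's exact type plumbing.
  def convertPairs[K, V](rdd: RDD[(K, V)],
                         keyConverter: Converter[Any, Any],
                         valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }
}
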
(Diffs for the remaining changed files did not load and are not shown.)