
Code format.

1 parent 445e0bb commit 194c42ab790aa04573319709cae19833716a077b @haoyuan committed Feb 10, 2012
Showing with 362 additions and 292 deletions.
  1. +2 −2 core/src/main/scala/spark/Accumulators.scala
  2. +4 −4 core/src/main/scala/spark/Aggregator.scala
  3. +13 −12 core/src/main/scala/spark/BoundedMemoryCache.scala
  4. +11 −13 core/src/main/scala/spark/Cache.scala
  5. +6 −5 core/src/main/scala/spark/CacheTracker.scala
  6. +9 −5 core/src/main/scala/spark/CartesianRDD.scala
  7. +16 −11 core/src/main/scala/spark/ClosureCleaner.scala
  8. +9 −8 core/src/main/scala/spark/CoGroupedRDD.scala
  9. +2 −1 core/src/main/scala/spark/DAGScheduler.scala
  10. +11 −10 core/src/main/scala/spark/Dependency.scala
  11. +0 −1 core/src/main/scala/spark/DiskSpillingCache.scala
  12. +2 −1 core/src/main/scala/spark/Executor.scala
  13. +9 −5 core/src/main/scala/spark/FetchFailedException.scala
  14. +9 −9 core/src/main/scala/spark/HadoopRDD.scala
  15. +6 −7 core/src/main/scala/spark/HadoopWriter.scala
  16. +7 −5 core/src/main/scala/spark/Logging.scala
  17. +8 −2 core/src/main/scala/spark/MapOutputTracker.scala
  18. +13 −6 core/src/main/scala/spark/MesosScheduler.scala
  19. +14 −9 core/src/main/scala/spark/NewHadoopRDD.scala
  20. +46 −39 core/src/main/scala/spark/PairRDDFunctions.scala
  21. +4 −4 core/src/main/scala/spark/ParallelCollection.scala
  22. +7 −4 core/src/main/scala/spark/ParallelShuffleFetcher.scala
  23. +7 −2 core/src/main/scala/spark/Partitioner.scala
  24. +5 −4 core/src/main/scala/spark/PipedRDD.scala
  25. +11 −13 core/src/main/scala/spark/RDD.scala
  26. +9 −3 core/src/main/scala/spark/ResultTask.scala
  27. +5 −5 core/src/main/scala/spark/SampledRDD.scala
  28. +8 −5 core/src/main/scala/spark/SequenceFileRDDFunctions.scala
  29. +2 −3 core/src/main/scala/spark/Serializer.scala
  30. +2 −2 core/src/main/scala/spark/SerializingCache.scala
  31. +9 −3 core/src/main/scala/spark/ShuffleMapTask.scala
  32. +6 −7 core/src/main/scala/spark/ShuffledRDD.scala
  33. +14 −14 core/src/main/scala/spark/SimpleJob.scala
  34. +0 −1 core/src/main/scala/spark/SimpleShuffleFetcher.scala
  35. +5 −6 core/src/main/scala/spark/SizeEstimator.scala
  36. +24 −25 core/src/main/scala/spark/SparkContext.scala
  37. +11 −4 core/src/main/scala/spark/Stage.scala
  38. +16 −7 core/src/main/scala/spark/UnionRDD.scala
  39. +5 −8 core/src/main/scala/spark/Utils.scala
  40. +15 −17 core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -6,8 +6,8 @@ import scala.collection.mutable.Map
class Accumulator[T] (
@transient initialValue: T,
- param: AccumulatorParam[T]
- ) extends Serializable {
+ param: AccumulatorParam[T])
+ extends Serializable {
val id = Accumulators.newId
@transient
@@ -1,7 +1,7 @@
package spark
class Aggregator[K, V, C] (
- val createCombiner: V => C,
- val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C
-) extends Serializable
+ val createCombiner: V => C,
+ val mergeValue: (C, V) => C,
+ val mergeCombiners: (C, C) => C
+ ) extends Serializable
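
The Aggregator reindented above is the standard combiner interface: createCombiner turns the first value seen for a key into a combiner, mergeValue folds further values into that combiner within a partition, and mergeCombiners merges partial results across partitions. As an illustration only (this word-count aggregator is not part of the commit):

  // Illustrative only: count occurrences per key, with V = Int and C = Int.
  val countAggregator = new Aggregator[String, Int, Int](
    createCombiner = (v: Int) => v,                  // first value for a key starts the running total
    mergeValue = (c: Int, v: Int) => c + v,          // fold another value into the per-partition total
    mergeCombiners = (c1: Int, c2: Int) => c1 + c2)  // merge totals from different partitions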
@@ -3,13 +3,11 @@ package spark
import java.util.LinkedHashMap
/**
- * An implementation of Cache that estimates the sizes of its entries and
- * attempts to limit its total memory usage to a fraction of the JVM heap.
- * Objects' sizes are estimated using SizeEstimator, which has limitations;
- * most notably, we will overestimate total memory used if some cache
- * entries have pointers to a shared object. Nonetheless, this Cache should
- * work well when most of the space is used by arrays of primitives or of
- * simple classes.
+ * An implementation of Cache that estimates the sizes of its entries and attempts to limit its
+ * total memory usage to a fraction of the JVM heap. Objects' sizes are estimated using
+ * SizeEstimator, which has limitations; most notably, we will overestimate total memory used if
+ * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
+ * when most of the space is used by arrays of primitives or of simple classes.
*/
class BoundedMemoryCache extends Cache with Logging {
private val maxBytes: Long = getMaxBytes()
@@ -24,7 +22,11 @@ class BoundedMemoryCache extends Cache with Logging {
override def get(key: Any): Any = {
synchronized {
val entry = map.get(key)
- if (entry != null) entry.value else null
+ if (entry != null) {
+ entry.value
+ } else {
+ null
+ }
}
}
@@ -51,8 +53,8 @@ class BoundedMemoryCache extends Cache with Logging {
}
/**
- * Remove least recently used entries from the map until at least space
- * bytes are free. Assumes that a lock is held on the BoundedMemoryCache.
+ * Remove least recently used entries from the map until at least space bytes are free. Assumes
+ * that a lock is held on the BoundedMemoryCache.
*/
private def ensureFreeSpace(space: Long) {
logInfo("ensureFreeSpace(%d) called with curBytes=%d, maxBytes=%d".format(
@@ -67,7 +69,6 @@ class BoundedMemoryCache extends Cache with Logging {
}
protected def dropEntry(key: Any, entry: Entry) {
- logInfo("Dropping key %s of size %d to make space".format(
- key, entry.size))
+ logInfo("Dropping key %s of size %d to make space".format(key, entry.size))
}
}
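
The rewrapped doc comment above describes the whole BoundedMemoryCache policy: estimate each entry's size, keep entries in a LinkedHashMap in access order, and evict from the least recently used end until a new entry fits under the byte budget. A minimal standalone sketch of that policy, using a fixed budget and a caller-supplied size estimate instead of SizeEstimator (class and member names here are illustrative, not Spark's):

  import java.util.LinkedHashMap

  // Illustrative LRU cache bounded by an estimated byte budget (not the Spark class itself).
  class SimpleBoundedCache(maxBytes: Long) {
    private case class Entry(value: Any, size: Long)
    // accessOrder = true makes iteration order "least recently used first".
    private val map = new LinkedHashMap[Any, Entry](32, 0.75f, true)
    private var currentBytes = 0L

    def get(key: Any): Any = synchronized {
      val entry = map.get(key)
      if (entry != null) entry.value else null
    }

    def put(key: Any, value: Any, estimatedSize: Long): Unit = synchronized {
      ensureFreeSpace(estimatedSize)
      val old = map.put(key, Entry(value, estimatedSize))
      if (old != null) currentBytes -= old.size
      currentBytes += estimatedSize
    }

    // Drop least recently used entries until `space` bytes fit under maxBytes.
    private def ensureFreeSpace(space: Long): Unit = {
      val iter = map.entrySet.iterator
      while (maxBytes - currentBytes < space && iter.hasNext) {
        val mapEntry = iter.next()
        currentBytes -= mapEntry.getValue.size
        iter.remove()
      }
    }
  }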
@@ -3,20 +3,18 @@ package spark
import java.util.concurrent.atomic.AtomicLong
/**
- * An interface for caches in Spark, to allow for multiple implementations.
- * Caches are used to store both partitions of cached RDDs and broadcast
- * variables on Spark executors.
+ * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
+ * both partitions of cached RDDs and broadcast variables on Spark executors.
*
- * A single Cache instance gets created on each machine and is shared by all
- * caches (i.e. both the RDD split cache and the broadcast variable cache),
- * to enable global replacement policies. However, because these several
- * independent modules all perform caching, it is important to give them
- * separate key namespaces, so that an RDD and a broadcast variable (for
- * example) do not use the same key. For this purpose, Cache has the
- * notion of KeySpaces. Each client module must first ask for a KeySpace,
- * and then call get() and put() on that space using its own keys.
- * This abstract class handles the creation of key spaces, so that subclasses
- * need only deal with keys that are unique across modules.
+ * A single Cache instance gets created on each machine and is shared by all caches (i.e. both the
+ * RDD split cache and the broadcast variable cache), to enable global replacement policies.
+ * However, because these several independent modules all perform caching, it is important to give
+ * them separate key namespaces, so that an RDD and a broadcast variable (for example) do not use
+ * the same key. For this purpose, Cache has the notion of KeySpaces. Each client module must first
+ * ask for a KeySpace, and then call get() and put() on that space using its own keys.
+ *
+ * This abstract class handles the creation of key spaces, so that subclasses need only deal with
+ * keys that are unique across modules.
*/
abstract class Cache {
private val nextKeySpaceId = new AtomicLong(0)
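
The expanded Cache doc comment above boils down to key namespacing: the shared per-machine cache hands each client module a KeySpace backed by an id from the AtomicLong, and every get() or put() through that KeySpace pairs the module's own key with the id, so an RDD split and a broadcast variable can never collide. A minimal sketch of the pattern (illustrative names, not the actual Spark classes):

  import java.util.concurrent.atomic.AtomicLong

  // Illustrative key-namespacing wrapper over a shared cache (not the Spark Cache class).
  abstract class NamespacedCache {
    private val nextKeySpaceId = new AtomicLong(0)

    // Subclasses only ever see keys that are already unique across modules.
    def get(key: Any): Any
    def put(key: Any, value: Any): Unit

    def newKeySpace(): KeySpace = new KeySpace(this, nextKeySpaceId.getAndIncrement())
  }

  // Each client module holds its own KeySpace and uses its own keys freely.
  class KeySpace(cache: NamespacedCache, id: Long) {
    def get(key: Any): Any = cache.get((id, key))
    def put(key: Any, value: Any): Unit = cache.put((id, key), value)
  }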
@@ -96,15 +96,16 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Get a snapshot of the currently known locations
def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = {
(trackerActor !? GetCacheLocations) match {
- case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]]
- case _ => throw new SparkException(
- "Internal error: CacheTrackerActor did not reply with a HashMap")
+ case h: HashMap[_, _] =>
+ h.asInstanceOf[HashMap[Int, Array[List[String]]]]
+
+ case _ =>
+ throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
}
}
// Gets or computes an RDD split
- def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
- : Iterator[T] = {
+ def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T]): Iterator[T] = {
val key = (rdd.id, split.index)
logInfo("CachedRDD partition key is " + key)
val cachedVal = cache.get(key)
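
getOrCompute, whose reformatted signature appears above, is a get-or-compute cache around RDD partitions: the key is the pair (rdd.id, split.index), a hit replays the cached elements, and a miss runs the computation, materializes the iterator into the cache, and returns it. A simplified sketch of that flow (a plain HashMap stands in for the real cache, and it ignores the tracker registration and concurrent-loader handling the real method performs):

  import scala.collection.mutable.{ArrayBuffer, HashMap}

  // Illustrative get-or-compute over a local HashMap, keyed by (rddId, splitIndex).
  class SimpleRddCache {
    private val cache = new HashMap[(Int, Int), Array[Any]]

    def getOrCompute[T](rddId: Int, splitIndex: Int)(compute: => Iterator[T]): Iterator[T] =
      synchronized {
        val key = (rddId, splitIndex)
        cache.get(key) match {
          case Some(values) =>
            values.iterator.asInstanceOf[Iterator[T]]  // cache hit: replay stored elements
          case None =>
            val elements = new ArrayBuffer[Any]
            elements ++= compute                       // cache miss: run the computation
            cache(key) = elements.toArray
            elements.iterator.asInstanceOf[Iterator[T]]
        }
      }
  }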
@@ -1,16 +1,20 @@
package spark
-class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
-extends Split with Serializable {
+class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
override val index = idx
}
class CartesianRDD[T: ClassManifest, U:ClassManifest](
- sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
-extends RDD[Pair[T, U]](sc) with Serializable {
+ sc: SparkContext,
+ rdd1: RDD[T],
+ rdd2: RDD[U])
+ extends RDD[Pair[T, U]](sc)
+ with Serializable {
+
val numSplitsInRdd2 = rdd2.splits.size
- @transient val splits_ = {
+ @transient
+ val splits_ = {
// create the cross product split
val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
@@ -69,10 +69,11 @@ object ClosureCleaner extends Logging {
}
private def createNullValue(cls: Class[_]): AnyRef = {
- if (cls.isPrimitive)
+ if (cls.isPrimitive) {
new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
- else
+ } else {
null
+ }
}
def clean(func: AnyRef): Unit = {
@@ -157,26 +158,28 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
return new EmptyVisitor {
- override def visitFieldInsn(op: Int, owner: String, name: String,
- desc: String) {
- if (op == GETFIELD)
- for (cl <- output.keys if cl.getName == owner.replace('/', '.'))
+ override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
+ if (op == GETFIELD) {
+ for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
output(cl) += name
+ }
+ }
}
override def visitMethodInsn(op: Int, owner: String, name: String,
desc: String) {
// Check for calls a getter method for a variable in an interpreter wrapper object.
// This means that the corresponding field will be accessed, so we should save it.
- if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer"))
- for (cl <- output.keys if cl.getName == owner.replace('/', '.'))
+ if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
+ for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
output(cl) += name
+ }
+ }
}
}
}
}
-
class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor {
var myName: String = null
@@ -194,8 +197,10 @@ class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor {
if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
&& argTypes(0).toString.startsWith("L") // is it an object?
&& argTypes(0).getInternalName == myName)
- output += Class.forName(owner.replace('/', '.'), false,
- Thread.currentThread.getContextClassLoader)
+ output += Class.forName(
+ owner.replace('/', '.'),
+ false,
+ Thread.currentThread.getContextClassLoader)
}
}
}
@@ -10,20 +10,20 @@ sealed trait CoGroupSplitDep extends Serializable
case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep])
-extends Split with Serializable {
+class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
override val index = idx
override def hashCode(): Int = idx
}
class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
- { x => ArrayBuffer(x) },
- { (b, x) => b += x },
- { (b1, b2) => b1 ++ b2 }
-) with Serializable
+ { x => ArrayBuffer(x) },
+ { (b, x) => b += x },
+ { (b1, b2) => b1 ++ b2 }
+ ) with Serializable
class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
-extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {
+ extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {
+
val aggr = new CoGroupAggregator
override val dependencies = {
@@ -41,7 +41,8 @@ extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {
deps.toList
}
- @transient val splits_ : Array[Split] = {
+ @transient
+ val splits_ : Array[Split] = {
val firstRdd = rdds.head
val array = new Array[Split](part.numPartitions)
for (i <- 0 until array.size) {
@@ -144,7 +144,8 @@ private trait DAGScheduler extends Scheduler with Logging {
if (!stage.isAvailable) {
missing += stage
}
- case narrowDep: NarrowDependency[_] => visit(narrowDep.rdd)
+ case narrowDep: NarrowDependency[_] =>
+ visit(narrowDep.rdd)
}
}
}
@@ -2,28 +2,29 @@ package spark
abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable
-abstract class NarrowDependency[T](rdd: RDD[T])
-extends Dependency(rdd, false) {
+abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) {
def getParents(outputPartition: Int): Seq[Int]
}
class ShuffleDependency[K, V, C](
- val shuffleId: Int,
- rdd: RDD[(K, V)],
- val aggregator: Aggregator[K, V, C],
- val partitioner: Partitioner
-) extends Dependency(rdd, true)
+ val shuffleId: Int,
+ rdd: RDD[(K, V)],
+ val aggregator: Aggregator[K, V, C],
+ val partitioner: Partitioner)
+ extends Dependency(rdd, true)
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
}
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
-extends NarrowDependency[T](rdd) {
+ extends NarrowDependency[T](rdd) {
+
override def getParents(partitionId: Int) = {
- if (partitionId >= outStart && partitionId < outStart + length)
+ if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
- else
+ } else {
Nil
+ }
}
}
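
The RangeDependency shown above maps a contiguous block of child partitions back to the parent: child partition p has parent p - outStart + inStart when outStart <= p < outStart + length, and no parent in this dependency otherwise. A small illustrative check (someRdd is a placeholder for any RDD[T], and the offsets are made up):

  // someRdd: a placeholder RDD; parent partitions 0..4 appear as child partitions 3..7 here.
  val dep = new RangeDependency(someRdd, inStart = 0, outStart = 3, length = 5)
  dep.getParents(3)  // List(0): start of the range
  dep.getParents(7)  // List(4): end of the range
  dep.getParents(2)  // Nil: outside the range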
@@ -8,7 +8,6 @@ import java.util.UUID
// TODO: cache into a separate directory using Utils.createTempDir
// TODO: clean up disk cache afterwards
-
class DiskSpillingCache extends BoundedMemoryCache {
private val diskMap = new LinkedHashMap[Any, File](32, 0.75f, true)
@@ -67,8 +67,9 @@ class Executor extends org.apache.mesos.Executor with Logging {
Thread.currentThread.setContextClassLoader(classLoader)
Accumulators.clear
val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
- for (gen <- task.generation) // Update generation if any is set
+ for (gen <- task.generation) {// Update generation if any is set
env.mapOutputTracker.updateGeneration(gen)
+ }
val value = task.run(tid.toInt)
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
@@ -1,13 +1,17 @@
package spark
-class FetchFailedException(val serverUri: String, val shuffleId: Int,
- val mapId: Int, val reduceId: Int, cause: Throwable)
-extends Exception {
+class FetchFailedException(
+ val serverUri: String,
+ val shuffleId: Int,
+ val mapId: Int,
+ val reduceId: Int,
+ cause: Throwable)
+ extends Exception {
+
override def getMessage(): String =
"Fetch failed: %s %d %d %d".format(serverUri, shuffleId, mapId, reduceId)
override def getCause(): Throwable = cause
- def toTaskEndReason: TaskEndReason =
- FetchFailed(serverUri, shuffleId, mapId, reduceId)
+ def toTaskEndReason: TaskEndReason = FetchFailed(serverUri, shuffleId, mapId, reduceId)
}
@@ -18,8 +18,8 @@ import org.apache.hadoop.util.ReflectionUtils
class HadoopSplit(
rddId: Int,
idx: Int,
- @transient s: InputSplit
- ) extends Split with Serializable {
+ @transient s: InputSplit)
+ extends Split with Serializable {
val inputSplit = new SerializableWritable[InputSplit](s)
@@ -33,13 +33,13 @@ class HadoopSplit(
* system, or S3, tables in HBase, etc).
*/
class HadoopRDD[K, V](
- sc: SparkContext,
- @transient conf: JobConf,
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int
- ) extends RDD[(K, V)](sc) {
+ sc: SparkContext,
+ @transient conf: JobConf,
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int)
+ extends RDD[(K, V)](sc) {
val serializableConf = new SerializableWritable(conf)