First-cut implementation of RDD checksums.

1 parent 2142500 · commit d754cf41e8414d4973d9b3d688d5b84b627c275b · @tdas committed Jun 21, 2011
Showing 1 changed file with 75 additions and 2 deletions.
+75 −2 core/src/main/scala/spark/RDD.scala
@@ -15,6 +15,66 @@ import SparkContext._
 import mesos._
 
+@serializable
+class RDDChecksum {
+  private var isSet = false
+  private var checksum: Int = 0
+
+  def reset() {
+    checksum = 0
+    isSet = false
+  }
+
+  def add(obj: Any) {
+    checksum = checksum + obj.hashCode
+  }
+
+  def set() {
+    isSet = true
+  }
+
+  def value: Option[Int] = {
+    if (isSet)
+      Some(checksum)
+    else None
+  }
+
+  override def toString = value.getOrElse("None").toString
+}
+
+@serializable
+class RDDIterator[T](rdd: RDD[T], iter: Iterator[T], split: Split) extends Iterator[T] with Logging {
+  private var started = false
+
+  def hasNext(): Boolean = {
+    val res = iter.hasNext
+    if (res == false) {
+      rdd.checksums.get(split) match {
+        case Some(checksum) => {
+          checksum.set
+          logInfo("Checksum for %s(%d.%d) is %s".format(rdd.getClass.getSimpleName, rdd.id, split.index, checksum))
+        }
+        case None => throw new Exception("Checksum not found")
+      }
+    }
+    res
+  }
+
+  def next(): T = {
+    val res = iter.next
+    if (!started) {
+      rdd.checksums -= split
+      rdd.checksums += ((split, new RDDChecksum))
+      started = true
+    }
+    rdd.checksums.get(split) match {
+      case Some(checksum) => checksum.add(res)
+      case None => throw new Exception("Checksum not found")
+    }
+    res
+  }
+}
+
 @serializable
 abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   // Methods that must be implemented by subclasses
@@ -32,6 +92,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
 
   // Get a unique ID for this RDD
   val id = sc.newRddId()
+
+  // Checksums of every split, calculated when the RDD is generated
+  val checksums = new HashMap[Split, RDDChecksum]
+
+  private def shouldChecksum: Boolean = {
+    true
+  }
 
   // Variables relating to caching
   private var shouldCache = false
@@ -44,10 +111,16 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
 
   // Read this RDD; will read from cache if applicable, or otherwise compute
   final def iterator(split: Split): Iterator[T] = {
+    var iter: Iterator[T] = null
     if (shouldCache) {
-      SparkEnv.get.cacheTracker.getOrCompute[T](this, split)
+      iter = SparkEnv.get.cacheTracker.getOrCompute[T](this, split)
+    } else {
+      iter = compute(split)
+    }
+    if (shouldChecksum) {
+      new RDDIterator(this, iter, split)
     } else {
-      compute(split)
+      iter
     }
   }
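For readers skimming the patch, here is a minimal standalone sketch of the pattern it introduces: a pass-through iterator that folds each element's hashCode into a running checksum and seals the checksum once the underlying iterator is exhausted. The names Checksum, ChecksummingIterator, and ChecksumDemo are illustrative only; this simplification drops the RDD, Split, per-split HashMap bookkeeping, and Logging pieces of the real RDDChecksum and RDDIterator.

// Simplified accumulator mirroring RDDChecksum: sums element hashCodes
// and only exposes a value once explicitly marked complete.
class Checksum {
  private var isSet = false
  private var sum: Int = 0

  def add(obj: Any) { sum += obj.hashCode }
  def set() { isSet = true }
  def value: Option[Int] = if (isSet) Some(sum) else None
}

// Pass-through iterator mirroring RDDIterator: every element returned by
// next() is folded into the checksum; when hasNext first reports false,
// the checksum is marked final.
class ChecksummingIterator[T](iter: Iterator[T], checksum: Checksum) extends Iterator[T] {
  def hasNext: Boolean = {
    val res = iter.hasNext
    if (!res) checksum.set()  // exhausted: the checksum is now final
    res
  }

  def next(): T = {
    val res = iter.next()
    checksum.add(res)         // fold this element into the running sum
    res
  }
}

object ChecksumDemo {
  def main(args: Array[String]) {
    val checksum = new Checksum
    val wrapped = new ChecksummingIterator(List(1, 2, 3).iterator, checksum)
    while (wrapped.hasNext) wrapped.next()
    println(checksum.value)   // prints Some(6): an Int's hashCode is the Int itself
  }
}

Note that combining hashCodes with plain integer addition makes the checksum insensitive to element order, so two computations of a split that produce the same multiset of elements yield the same value.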
