basic function test cases are passed
giwa committed Sep 20, 2014
1 parent 8dcda84 commit c5ecfc1
Showing 2 changed files with 1 addition and 71 deletions.
10 changes: 0 additions & 10 deletions python/pyspark/worker.py
@@ -86,16 +86,6 @@ def main(infile, outfile):
         (func, deserializer, serializer) = command
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
-        print "deserializer in worker: %s" % str(deserializer)
-        iterator, walk = itertools.tee(iterator)
-        if isinstance(walk, int):
-            print "this is int"
-            print walk
-        else:
-            try:
-                print list(walk)
-            except:
-                print list(walk)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
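For context, the lines deleted from worker.py appear to be debugging aids: they used itertools.tee to duplicate the deserialized record iterator so its contents could be printed without consuming the copy handed to the user function. A minimal standalone sketch of that pattern, not part of the commit; the load_stream stand-in below is invented for illustration:

import itertools

def load_stream():
    # stand-in for deserializer.load_stream(infile): yields deserialized records
    for record in [b"a", b"b", b"c"]:
        yield record

iterator = load_stream()
# tee() returns two independent iterators over the same underlying stream,
# so one copy can be inspected while the other remains fully consumable.
iterator, walk = itertools.tee(iterator)
print(list(walk))      # debug view of the records
print(list(iterator))  # the original stream is still intact for real work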
@@ -206,56 +206,14 @@ class PythonTransformedDStream(
 }
 */
 
-<<<<<<< HEAD
-=======
-/**
- * This is a input stream just for the unitest. This is equivalent to a checkpointable,
- * replayable, reliable message queue like Kafka. It requires a sequence as input, and
- * returns the i_th element at the i_th batch under manual clock.
- */
-class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int)
-  extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    logInfo("Computing RDD for time " + validTime)
-    inputFiles.foreach(logInfo(_))
-    // make a temporary file
-    // make empty RDD
-    val prefix = "spark"
-    val suffix = ".tmp"
-    val tempFile = File.createTempFile(prefix, suffix)
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    logInfo("Index: " + index)
-
-    val selectedInputFile: String = {
-      if (inputFiles.isEmpty){
-        tempFile.getAbsolutePath
-      }else if (index < inputFiles.size()) {
-        inputFiles.get(index)
-      } else {
-        tempFile.getAbsolutePath
-      }
-    }
-    val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
-    logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}
-
 /**
  * This is an input stream just for the unit test. It is equivalent to a checkpointable,
  * replayable, reliable message queue like Kafka. It requires a sequence as input, and
  * returns the i_th element at the i_th batch under manual clock.
  * This implementation is close to QueueStream.
  */
 
-class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
+class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
   extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {
 
   def start() {}
@@ -280,21 +238,3 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
-
-
-class PythonTestInputStream3(ssc_ : JavaStreamingContext)
-  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Any]] = {
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
-    val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}>>>>>>> broke something
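For orientation, the surviving PythonTestInputStream hands out the i-th pre-built batch at the i-th batch interval under a manual clock, using the same index arithmetic that appears in the deleted compute() bodies: (validTime - zeroTime) / slideDuration - 1. A minimal Python sketch of that selection logic; zero_time, slide_duration, and input_batches are invented names for illustration (the real class works with Spark's Time/Duration and JavaRDDs):

# Hypothetical stand-ins for Spark's manual-clock bookkeeping.
zero_time = 1000          # ms: when the stream was started
slide_duration = 500      # ms: batch interval

input_batches = [["a"], ["b", "b"], ["c"]]   # the "queue" of pre-built batches

def compute(valid_time):
    """Return the batch that belongs to this tick of the manual clock."""
    index = (valid_time - zero_time) // slide_duration - 1
    if 0 <= index < len(input_batches):
        return input_batches[index]
    return []                                # out of range: empty batch

# first three ticks of a manual clock advancing by one batch interval
for tick in (1500, 2000, 2500):
    print(tick, compute(tick))   # -> ["a"], ["b", "b"], ["c"]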
