From 8dcda84d2d1de47b00e0209b269513ff0bdb1671 Mon Sep 17 00:00:00 2001
From: giwa
Date: Thu, 14 Aug 2014 18:07:10 -0700
Subject: [PATCH] all tests pass if numSlices is 2 and the number of elements
 in each input is over 4

---
 python/pyspark/streaming/context.py                | 46 ----------------------
 .../streaming/api/python/PythonDStream.scala       | 19 +++++++-
 2 files changed, 18 insertions(+), 47 deletions(-)

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 96d717cfcc75c..1668bfcd41a57 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -142,49 +142,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
-<<<<<<< HEAD
         This function is only for test.
         This implementation is inspired by QueStream implementation.
         Give list of RDD to generate DStream which contains the RDD.
-=======
-        Generate multiple files to make "stream" in Scala side for test.
-        Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile.
-
-        QueStream maybe good way to implement this function
-        """
-        numSlices = numSlices or self._sc.defaultParallelism
-        # Calling the Java parallelize() method with an ArrayList is too slow,
-        # because it sends O(n) Py4J commands. As an alternative, serialized
-        # objects are written to a file and loaded through textFile().
-
-        tempFiles = list()
-        for test_input in test_inputs:
-            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
-
-            # Make sure we distribute data evenly if it's smaller than self.batchSize
-            if "__len__" not in dir(test_input):
-                test_input = list(test_input)  # Make it a list so we can compute its length
-            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
-            if batchSize > 1:
-                serializer = BatchedSerializer(self._sc._unbatched_serializer,
-                                               batchSize)
-            else:
-                serializer = self._sc._unbatched_serializer
-            serializer.dump_stream(test_input, tempFile)
-            tempFile.close()
-            tempFiles.append(tempFile.name)
-
-        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
-        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
-                                                        jtempFiles,
-                                                        numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
-
-    def _testInputStream2(self, test_inputs, numSlices=None):
-        """
-        This is inpired by QueStream implementation. Give list of RDD and generate DStream
-        which contain the RDD.
->>>>>>> broke something
         """
         test_rdds = list()
         test_rdd_deserializers = list()
@@ -196,10 +156,4 @@ def _testInputStream2(self, test_inputs, numSlices=None):
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
-<<<<<<< HEAD
         return DStream(jinput_stream, self, test_rdd_deserializers[0])
-=======
-        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
-        dstream._test_switch_dserializer(test_rdd_deserializers)
-        return dstream
->>>>>>> broke something
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 63b2f709df7e4..0bafe3f846793 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -280,4 +280,21 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
->>>>>>> broke something
+
+
+class PythonTestInputStream3(ssc_ : JavaStreamingContext)
+  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Any]] = {
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
+    val rdd: RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
+    Some(rdd)
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
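
Note (reviewer sketch, not part of the patch): a minimal example of how the
patched helper might be driven, assuming a StreamingContext `ssc` built the
way PySpark's streaming tests build one; the variable names below are
hypothetical. Per the subject line, each input carries more than 4 elements
and the stream is created with numSlices=2.

    # Hypothetical usage sketch: `ssc` is assumed to be a
    # pyspark.streaming.context.StreamingContext created by the test harness.
    test_inputs = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]  # >4 elements per batch

    # _testInputStream wraps each input list as an RDD and returns a DStream
    # that replays those RDDs, in the spirit of Spark's QueueInputDStream.
    dstream = ssc._testInputStream(test_inputs, numSlices=2)

On the Scala side, PythonTestInputStream3.compute computes `index` from
validTime but does not use it yet: every batch materializes
ArrayBuffer(1, 2, 3) with two partitions via makeRDD(selectedInput, 2).
Selecting the per-batch input by `index` appears to be the intended follow-up.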