From 0a8bbbbaba94ca67a345f59b88b765476bbd8c3a Mon Sep 17 00:00:00 2001
From: Ken Takagiwa
Date: Sun, 20 Jul 2014 15:32:20 -0700
Subject: [PATCH] clean up codes

---
 .../src/main/python/streaming/network_wordcount.py |  7 +------
 examples/src/main/python/streaming/wordcount.py    |  2 +-
 python/pyspark/streaming/dstream.py                | 14 +++++++-------
 .../apache/spark/streaming/dstream/DStream.scala   |  3 ++-
 4 files changed, 11 insertions(+), 15 deletions(-)

diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index a1458e06f13d2..c6ededc24db21 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -11,20 +11,15 @@
         exit(-1)
     conf = SparkConf()
     conf.setAppName("PythonStreamingNetworkWordCount")
-    conf.set("spark.default.parallelism", 1)
     ssc = StreamingContext(conf=conf, duration=Seconds(1))
 
     lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
     fm_lines = lines.flatMap(lambda x: x.split(" "))
-    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
-    reduced_lines = mapped_lines.reduce(add)
-    counted_lines = reduced_lines.count()
+    reduced_lines = mapped_lines.reduceByKey(add)
 
     fm_lines.pyprint()
-    filtered_lines.pyprint()
     mapped_lines.pyprint()
     reduced_lines.pyprint()
-    counted_lines.pyprint()
     ssc.start()
     ssc.awaitTermination()
diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
index 9ff8bc5ac9ab2..ee52c4e178142 100644
--- a/examples/src/main/python/streaming/wordcount.py
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -21,7 +21,7 @@
     fm_lines = lines.flatMap(lambda x: x.split(" "))
     filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
-    reduced_lines = mapped_lines.reduce(add)
+    reduced_lines = mapped_lines.reduceByKey(add)
 
     fm_lines.pyprint()
     filtered_lines.pyprint()
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 0c37cb7164288..3e617cafbaa93 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -22,13 +22,15 @@ def count(self):
         """
 
         """
-        #TODO make sure count implementation, thiis different from what pyspark does
-        return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
+        pass
+        #TODO: make sure count implementation, thiis different from what pyspark does
+        #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
 
     def _sum(self):
         """
         """
-        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        pass
+        #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
     def print_(self):
         """
@@ -85,7 +87,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
         """
         return PipelinedDStream(self, f, preservesPartitioning)
 
-
     def reduceByKey(self, func, numPartitions=None):
         """
         Merge the value for each key using an associative reduce function.
@@ -121,7 +122,7 @@ def combineLocally(iterator):
                 else:
                     combiners[k] = mergeValue(combiners[k], v)
             return combiners.iteritems()
-        locally_combined = self.mapPartitions(combineLocally)
+        locally_combined = self._mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
         def _mergeCombiners(iterator):
             combiners = {}
@@ -131,12 +132,11 @@ def _mergeCombiners(iterator):
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
-        return shuffled.mapPartitions(_mergeCombiners)
+        return shuffled._mapPartitions(_mergeCombiners)
 
     def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the DStream partitioned using the specified partitioner.
-
         """
         if numPartitions is None:
             numPartitions = self.ctx._defaultReducePartitions()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index fc7a2055025c1..f539bc9aa147d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -623,7 +623,7 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
-//TODO move pyprint to PythonDStream and executed by py4j call back function
+//TODO: move pyprint to PythonDStream and executed by py4j call back function
   /**
   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
   * operator, so this PythonDStream will be registered as an output stream and there materialized.
@@ -647,6 +647,7 @@ abstract class DStream[T: ClassTag] (
 
     // pythonExec should be passed from python. Move pyprint to PythonDStream
     val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
+    val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
 
     // Call python script to deserialize and print result in stdout
     val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
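
Note (illustrative, not part of this patch): as background for the reduce(add) -> reduceByKey(add) change in the two examples above, the sketch below shows the same network word count written against the released PySpark Streaming API. The host/port values are placeholders, the StreamingContext is built from a SparkContext with a 1-second batch interval rather than the conf/duration constructor used in this branch, and pprint() stands in for the in-progress pyprint(). reduceByKey merges the (word, 1) pairs per key within each batch, which is what a word count needs, whereas reduce would collapse the whole batch to a single value.

    # Illustrative sketch only: network word count on the released PySpark
    # Streaming API. Host and port are placeholder values.
    from operator import add

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        sc = SparkContext(appName="PythonStreamingNetworkWordCount")
        ssc = StreamingContext(sc, 1)  # 1-second batch interval

        # One DStream of text lines per batch, read from a TCP socket.
        lines = ssc.socketTextStream("localhost", 9999)

        # Split into words, pair each word with 1, then merge counts per key
        # within each batch -- the reduceByKey(add) pattern this patch adopts.
        counts = lines.flatMap(lambda line: line.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(add)

        counts.pprint()  # released API; the examples in this patch use pyprint()
        ssc.start()
        ssc.awaitTermination()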