diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
index f44cd696894ba..3996991109d60 100644
--- a/examples/src/main/python/streaming/wordcount.py
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -1,6 +1,7 @@
 import sys
 from operator import add
 
+from pyspark.conf import SparkConf
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.duration import *
 
@@ -8,15 +9,22 @@
     if len(sys.argv) != 2:
         print >> sys.stderr, "Usage: wordcount <directory>"
         exit(-1)
-    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    conf = SparkConf()
+    conf.setAppName("PythonStreamingWordCount")
+    conf.set("spark.default.parallelism", 1)
+
+#    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    ssc = StreamingContext(conf=conf, duration=Seconds(1))
 
     lines = ssc.textFileStream(sys.argv[1])
     fm_lines = lines.flatMap(lambda x: x.split(" "))
     filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
+    reduced_lines = mapped_lines.reduce(add)
 
     fm_lines.pyprint()
     filtered_lines.pyprint()
     mapped_lines.pyprint()
+    reduced_lines.pyprint()
     ssc.start()
     ssc.awaitTermination()
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index a512517f6e437..e144f8bc1cc09 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -29,6 +29,7 @@
 
 __all__ = ["DStream"]
 
+
 class DStream(object):
     def __init__(self, jdstream, ssc, jrdd_deserializer):
         self._jdstream = jdstream
@@ -149,7 +150,7 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         """
        """
         if numPartitions is None:
-            numPartitions = self.ctx._defaultParallelism()
+            numPartitions = self._defaultReducePartitions()
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -211,7 +212,6 @@ def add_shuffle_key(split, iterator):
 
         return dstream
 
-
     def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc):
         """
         """
@@ -254,8 +254,31 @@ def wrapRDD(self, rdd):
         raise NotImplementedError
 
     def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+
+        """
         return PipelinedDStream(self, f, preservesPartitioning)
 
+    def _defaultReducePartitions(self):
+        """
+
+        """
+        # hard code to avoid the error
+        return 2
+        if self.ctx._conf.contains("spark.default.parallelism"):
+            return self.ctx.defaultParallelism
+        else:
+            return self.getNumPartitions()
+
+    def getNumPartitions(self):
+        """
+        Returns the number of partitions in RDD
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+        >>> rdd.getNumPartitions()
+        2
+        """
+        return self._jdstream.partitions().size()
+
 
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 389136f9e21a0..719dd0a6a53c2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -129,7 +129,7 @@ class PythonDStream[T: ClassTag](
   }
 }
 
-/*
+
 private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
 DStream[(Long, Array[Byte])](prev.ssc){
   override def dependencies = List(prev)
@@ -144,9 +144,9 @@ DStream[(Long, Array[Byte])](prev.ssc){
       case None => None
     }
   }
-  val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this)
+  val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
 }
-*/
+
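
For context on how the two Python-side changes fit together: the `spark.default.parallelism` value set on the example's `SparkConf` is what `DStream._defaultReducePartitions()` is meant to consult once the temporary `return 2` workaround is removed. Below is a minimal stand-alone sketch of that intended lookup, with a plain dict standing in for `SparkConf` so it runs without a JVM; all names in it are illustrative and not part of the patch.

```python
# Illustrative sketch only -- not part of the patch. It mirrors the lookup
# DStream._defaultReducePartitions() is intended to perform once the
# hard-coded "return 2" workaround is dropped: prefer an explicitly set
# spark.default.parallelism, otherwise keep the stream's partition count.
def intended_reduce_partitions(conf, current_partitions):
    if "spark.default.parallelism" in conf:
        return int(conf["spark.default.parallelism"])
    return current_partitions

conf = {"spark.default.parallelism": "1"}    # mirrors conf.set(...) in the example
print(intended_reduce_partitions(conf, 2))   # -> 1
print(intended_reduce_partitions({}, 2))     # -> 2
```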