From a613b852668f88069555b7039d4e3c9f536bab93 Mon Sep 17 00:00:00 2001
From: giwa
Date: Sun, 3 Aug 2014 23:27:56 -0700
Subject: [PATCH] clean up dstream.py

---
 python/pyspark/streaming/dstream.py | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index e6cd2eb9a49af..7233ae5249e6d 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -20,9 +20,7 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
 
     def count(self):
         """
-
         """
-        # TODO: make sure count implementation, this different from what pyspark does
         return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()
 
     def _sum(self):
@@ -79,7 +77,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
 
     def reduce(self, func):
         """
-
         """
         return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
 
@@ -107,12 +104,6 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
-
-                #TODO for count operation make sure count implementation
-                # This is different from what pyspark does
-                #if isinstance(x, int):
-                #    x = ("", x)
-
                 (k, v) = x
                 if k not in combiners:
                     combiners[k] = createCombiner(v)
@@ -142,6 +133,7 @@ def partitionBy(self, numPartitions, partitionFunc=None):
 
         if partitionFunc is None:
             partitionFunc = lambda x: 0 if x is None else hash(x)
+
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
@@ -215,7 +207,6 @@ def takeAndPrint(rdd, time):
 
         self.foreachRDD(takeAndPrint)
 
-
    #def transform(self, func):
    #    from utils import RDDFunction
    #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
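
Note for context (not part of the diff): the sketch below restates, in plain standalone Python, the three composition patterns the cleaned-up methods rely on: count() as a per-partition count followed by a sum, reduce() as a reduceByKey on a single dummy key, and the partitionBy() bucketing described in the comment above, which ships O(numPartitions) (splitNumber, [objects]) pairs to Java instead of O(n) individual records. The helper names (count_elements, reduce_via_dummy_key, hash_buckets) and the use of plain lists in place of RDD partitions are illustrative assumptions, not part of the pyspark API.

# Illustrative sketch only -- plain Python lists stand in for RDD partitions;
# none of these helpers exist in pyspark.

def count_elements(partitions):
    # DStream.count() pattern: count each partition, then sum the per-partition counts.
    return sum(sum(1 for _ in part) for part in partitions)

def reduce_via_dummy_key(partitions, func):
    # DStream.reduce() pattern: tag every element with the same key (None),
    # combine by key (there is only one key), then strip the key off the result.
    combined = {}
    for part in partitions:
        for x in part:
            k, v = None, x
            combined[k] = v if k not in combined else func(combined[k], v)
    return combined[None]

def hash_buckets(iterator, numPartitions, partitionFunc=None):
    # partitionBy() pattern from the comment above: group (k, v) records into
    # numPartitions buckets in Python, so only (splitNumber, [objects]) pairs
    # cross the Python/Java boundary instead of one object per record.
    if partitionFunc is None:
        partitionFunc = lambda x: 0 if x is None else hash(x)
    buckets = {}
    for (k, v) in iterator:
        split = partitionFunc(k) % numPartitions
        buckets.setdefault(split, []).append((k, v))
    return list(buckets.items())

if __name__ == "__main__":
    parts = [[1, 2, 3], [4, 5]]
    print(count_elements(parts))                              # 5
    print(reduce_via_dummy_key(parts, lambda a, b: a + b))    # 15
    print(hash_buckets([("a", 1), ("b", 2), ("a", 3)], numPartitions=2))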