fix QueueStream default value (#60)
svenkreiss committed May 6, 2017
1 parent dbd4006 commit 67599f9
Showing 4 changed files with 31 additions and 8 deletions.
pysparkling/context.py: 2 additions & 2 deletions
@@ -125,14 +125,14 @@ def parallelize(self, x, numPartitions=None):
         :param x:
             An iterable (e.g. a list) that represents the data.

-        :param numPartitions: (optional)
+        :param int|None numPartitions: (optional)
             The number of partitions the data should be split into.
             A partition is a unit of data that is processed at a time.
             Can be ``None``.

         :rtype: RDD
         """
-        if not numPartitions:
+        if numPartitions is None or numPartitions <= 1:
             return RDD([Partition(x, 0)], self)

         x_len_iter, x = itertools.tee(x, 2)
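The guard change is easiest to see as a predicate: previously numPartitions=1 fell through to the counting path (the itertools.tee pass over the whole dataset) even though the result is still a single partition. A minimal sketch comparing the old and new conditions (plain Python, nothing from pysparkling assumed):

def old_guard(numPartitions):
    # Pre-fix condition: short-circuits None and 0, but not 1.
    return not numPartitions

def new_guard(numPartitions):
    # Post-fix condition: also short-circuits 1 (and negatives),
    # avoiding the tee pass over the data.
    return numPartitions is None or numPartitions <= 1

for n in (None, 0, 1, 2):
    print(n, old_guard(n), new_guard(n))
# Output:
# None True True
# 0 True True
# 1 False True    <- the case whose behavior changes here
# 2 False False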
pysparkling/streaming/context.py: 17 additions & 0 deletions
@@ -113,6 +113,23 @@ def queueStream(self, rdds, oneAtATime=True, default=None):
         [4]
         [2]
         [7]
+
+        Example testing the default value:
+
+        >>> import pysparkling
+        >>> sc = pysparkling.Context()
+        >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
+        >>> (
+        ...     ssc
+        ...     .queueStream([[4], [2]], default=['placeholder'])
+        ...     .foreachRDD(lambda rdd: print(rdd.collect()))
+        ... )
+        >>> ssc.start()
+        >>> ssc.awaitTermination(0.35)
+        [4]
+        [2]
+        ['placeholder']
+
         """
         deserializer = QueueStreamDeserializer(self._context)
         if default is not None:
pysparkling/streaming/queuestream.py: 4 additions & 5 deletions
@@ -15,8 +15,7 @@ def ensure_rdd(self, data):
         return self.context.parallelize(data)

     def __call__(self, data):
-        rdds = [self.ensure_rdd(d) for d in data]
-        return self.context.union(rdds)
+        return self.ensure_rdd(data)


 class QueueStream(object):
@@ -29,9 +28,9 @@ def get(self):
         q_size = self.queue.qsize()

         if q_size == 0:
-            return [self.default]
+            return self.default

         if self.oneAtATime:
-            return [self.queue.get_nowait()]
+            return self.queue.get_nowait()

-        return [self.queue.get_nowait() for _ in range(q_size)]
+        return [e for _ in range(q_size) for e in self.queue.get_nowait()]
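With oneAtATime=False, get() used to return the dequeued items wrapped in a list; it now flattens one level, yielding the elements of every queued batch in a single list. A sketch of that comprehension with plain lists standing in for the queued items (illustrative only; in the real code the queue holds whatever the deserializer produced):

import queue

q = queue.Queue()
for batch in ([4, 5], [2], [7, 8]):
    q.put(batch)

q_size = q.qsize()

# Old: a list of batches -> [[4, 5], [2], [7, 8]]
# New: one flat list across all queued batches.
flat = [e for _ in range(q_size) for e in q.get_nowait()]
print(flat)  # [4, 5, 2, 7, 8]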
scripts/pyspark_streaming.py: 8 additions & 1 deletion
@@ -62,6 +62,12 @@ def stream_log(ssc):
     ssc.textFileStream('/var/log/system.log*').pprint()


+def stream_queue_default(ssc):
+    (ssc
+     .queueStream([[4], [2]], default=['placeholder'])
+     .foreachRDD(lambda rdd: print(rdd.collect())))
+
+
 if __name__ == '__main__':
     sc = pyspark.SparkContext()
     ssc = pyspark.streaming.StreamingContext(sc, 1)
@@ -72,8 +78,9 @@ def stream_log(ssc):
     # save_text(ssc)
     # window(ssc)
     # union(ssc)
-    updateStateByKey(ssc)
+    # updateStateByKey(ssc)
     # stream_log(ssc)
+    stream_queue_default(ssc)

     ssc.start()
     time.sleep(3.0)
