Skip to content

Commit

Permalink
fix RDD.reduce when rdd contains empty partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
tools4origins committed Apr 7, 2019
1 parent 596d0ef commit 0631648
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions pysparkling/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,15 +1033,39 @@ def reduce(self, f):
Example:
>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).reduce(lambda a, b: a+b)
>>> Context().parallelize([0, 4, 7, 4, 10], 2).reduce(lambda a, b: a+b)
25
"""
return self.context.runJob(
>>> Context().parallelize([0, 4, 7, 4, 10], 10).reduce(lambda a, b: a+b)
25
>>> Context().parallelize([0], 10).reduce(lambda a, b: a+b)
0
>>> Context().parallelize([], 10).reduce(lambda a, b: a+b)
Traceback (most recent call last):
...
ValueError: Can not reduce() empty RDD
"""
_empty = object()

def reducer(values):
try:
return functools.reduce(f, (v for v in values if v is not _empty))
except TypeError as e:
if e.args[0] == "reduce() of empty sequence with no initial value":
return _empty
else:
raise e

result = self.context.runJob(
self,
lambda tc, x: functools.reduce(f, x),
resultHandler=lambda x: functools.reduce(f, x),
lambda tc, x: reducer(x),
resultHandler=reducer
)

if result is _empty:
raise ValueError("Can not reduce() empty RDD")

return result

def reduceByKey(self, f, numPartitions=None):
"""reduce by key
Expand Down

0 comments on commit 0631648

Please sign in to comment.