Skip to content

Commit

Permalink
simplify solution
Browse files Browse the repository at this point in the history
Suggestion by jcrist
  • Loading branch information
mrocklin committed May 12, 2017
1 parent 67f3972 commit e7ba963
Showing 1 changed file with 7 additions and 15 deletions.
22 changes: 7 additions & 15 deletions dask/bag/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,33 +733,25 @@ def reduction(self, perpartition, aggregate, split_every=None,
fmt = '%s-aggregate-%s' % (name or funcname(aggregate), token)
depth = 0

while k > 1:
while k > split_every:
c = fmt + str(depth)
is_last = k <= split_every
dsk2 = dict(((c, i), (empty_safe_aggregate, aggregate,
[(b, j) for j in inds], is_last))
[(b, j) for j in inds], False))
for i, inds in enumerate(partition_all(split_every,
range(k))))
dsk.update(dsk2)
k = len(dsk2)
b = c
depth += 1

if self.npartitions == 1:
dsk[(a, 0)] = (aggregate, [dsk[(a, 0)]])

if not self.npartitions:
task = (aggregate, [])
if out_type is Item:
return Item({b: task}, b)
else:
return Bag({(b, 0): task}, b, 1)
dsk[(fmt, 0)] = (empty_safe_aggregate, aggregate,
[(b, j) for j in range(k)], True)

if out_type is Item:
dsk[b] = dsk.pop((b, 0))
return Item(merge(self.dask, dsk), b)
dsk[fmt] = dsk.pop((fmt, 0))
return Item(merge(self.dask, dsk), fmt)
else:
return Bag(merge(self.dask, dsk), b, 1)
return Bag(merge(self.dask, dsk), fmt, 1)

def sum(self, split_every=None):
""" Sum all elements """
Expand Down

0 comments on commit e7ba963

Please sign in to comment.