Skip to content

Commit

Permalink
Support dask.bag reductions on no partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed May 10, 2017
1 parent 917bbd4 commit 05481cc
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
3 changes: 3 additions & 0 deletions dask/bag/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,9 @@ def reduction(self, perpartition, aggregate, split_every=None,
b = c
depth += 1

if not self.npartitions:
return Item({b: (aggregate, [])}, b)

if out_type is Item:
dsk[b] = dsk.pop((b, 0))
return Item(merge(self.dask, dsk), b)
Expand Down
8 changes: 8 additions & 0 deletions dask/bag/tests/test_bag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,3 +1142,11 @@ def test_temporary_directory():
b2 = b.groupby(lambda x: x % 2)
b2.compute()
assert any(fn.endswith('.partd') for fn in os.listdir(os.getcwd()))


def test_empty_bag():
b = db.from_sequence([])
assert b.map(inc).all().compute(get=dask.get)
assert not b.map(inc).any().compute(get=dask.get)
assert not b.map(inc).sum().compute(get=dask.get)
assert not b.map(inc).count().compute(get=dask.get)

0 comments on commit 05481cc

Please sign in to comment.