Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ dist/
*.egg-info/
bench/shakespeare.txt
.coverage

\.tox/
18 changes: 16 additions & 2 deletions toolz/sandbox/parallel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import functools
from toolz.itertoolz import partition_all
from toolz.compatibility import reduce, map
from toolz.utils import no_default


def _reduce(func, seq, initial=None):
if initial is None:
return functools.reduce(func, seq)
else:
return functools.reduce(func, seq, initial)


def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
"""
Reduce without guarantee of ordered reduction.
Expand Down Expand Up @@ -43,16 +51,22 @@ def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
>>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
10
"""
assert chunksize > 1

if combine is None:
combine = binop

chunks = partition_all(chunksize, seq)

# Evaluate sequence in chunks via map
if default == no_default:
results = map(lambda chunk: reduce(binop, chunk), chunks)
results = map(
functools.partial(_reduce, binop),
chunks)
else:
results = map(lambda chunk: reduce(binop, chunk, default), chunks)
results = map(
functools.partial(_reduce, binop, initial=default),
chunks)

results = list(results) # TODO: Support complete laziness

Expand Down
3 changes: 3 additions & 0 deletions toolz/sandbox/tests/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from toolz import reduce
from operator import add
from pickle import dumps, loads
from multiprocessing import Pool


# is comparison will fail between this and no_default
no_default2 = loads(dumps('__no__default__'))


def test_fold():
assert fold(add, range(10), 0) == reduce(add, range(10), 0)
assert fold(add, range(10), 0, map=Pool().map) == reduce(add, range(10), 0)
assert fold(add, range(10), 0, chunksize=2) == reduce(add, range(10), 0)
assert fold(add, range(10)) == fold(add, range(10), 0)

Expand Down