diff --git a/.gitignore b/.gitignore
index 6e6f4b7d..cfef783b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,5 @@ dist/
 *.egg-info/
 bench/shakespeare.txt
 .coverage
+
+\.tox/
diff --git a/toolz/sandbox/parallel.py b/toolz/sandbox/parallel.py
index 96bef753..ef8ed39d 100644
--- a/toolz/sandbox/parallel.py
+++ b/toolz/sandbox/parallel.py
@@ -1,8 +1,26 @@
+import functools
 from toolz.itertoolz import partition_all
 from toolz.compatibility import reduce, map
 from toolz.utils import no_default
 
 
+def _reduce(func, seq, initial=no_default):
+    """
+    Reduce ``seq`` with ``func``, starting from ``initial`` when it is given.
+
+    Lives at module level so that ``functools.partial(_reduce, ...)`` can be
+    pickled for maps like ``multiprocessing.Pool.map``; a lambda cannot.
+
+    ``no_default`` (not ``None``) marks a missing ``initial`` so ``None``
+    remains a usable starting value.  The check uses ``==`` rather than
+    ``is`` because an unpickled copy of the sentinel is a different object.
+    """
+    if initial == no_default:
+        return functools.reduce(func, seq)
+    else:
+        return functools.reduce(func, seq, initial)
+
+
 def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
     """
     Reduce without guarantee of ordered reduction.
@@ -43,6 +61,9 @@ def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
     >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
     10
     """
+    if chunksize <= 1:
+        raise ValueError("chunksize must be greater than 1")
+
     if combine is None:
         combine = binop
 
@@ -50,9 +71,13 @@ def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
 
     # Evaluate sequence in chunks via map
     if default == no_default:
-        results = map(lambda chunk: reduce(binop, chunk), chunks)
+        results = map(
+            functools.partial(_reduce, binop),
+            chunks)
     else:
-        results = map(lambda chunk: reduce(binop, chunk, default), chunks)
+        results = map(
+            functools.partial(_reduce, binop, initial=default),
+            chunks)
     results = list(results)  # TODO: Support complete laziness
 
 
diff --git a/toolz/sandbox/tests/test_parallel.py b/toolz/sandbox/tests/test_parallel.py
index e22c3de2..7a455937 100644
--- a/toolz/sandbox/tests/test_parallel.py
+++ b/toolz/sandbox/tests/test_parallel.py
@@ -2,6 +2,8 @@
 from toolz import reduce
 from operator import add
 from pickle import dumps, loads
+from multiprocessing import Pool
+
 
 # is comparison will fail between this and no_default
 no_default2 = loads(dumps('__no__default__'))
@@ -9,6 +11,12 @@
 
 def test_fold():
     assert fold(add, range(10), 0) == reduce(add, range(10), 0)
+    pool = Pool()
+    try:
+        assert (fold(add, range(10), 0, map=pool.map)
+                == reduce(add, range(10), 0))
+    finally:
+        pool.terminate()
     assert fold(add, range(10), 0, chunksize=2) == reduce(add, range(10), 0)
     assert fold(add, range(10)) == fold(add, range(10), 0)
 