From f200b9d9599fa4a1e52dc36313c57c49e7506bc3 Mon Sep 17 00:00:00 2001 From: Pedro Rodrigues Date: Fri, 7 Dec 2018 22:00:38 +0100 Subject: [PATCH 1/6] adds failing tests fold cannot use lambdas if using multiprocessing.Pool().map() chunksize should not be less than 2 --- toolz/sandbox/tests/test_parallel.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/toolz/sandbox/tests/test_parallel.py b/toolz/sandbox/tests/test_parallel.py index e22c3de2..aae033fa 100644 --- a/toolz/sandbox/tests/test_parallel.py +++ b/toolz/sandbox/tests/test_parallel.py @@ -2,6 +2,8 @@ from toolz import reduce from operator import add from pickle import dumps, loads +from multiprocessing import Pool + # is comparison will fail between this and no_default no_default2 = loads(dumps('__no__default__')) @@ -9,7 +11,9 @@ def test_fold(): assert fold(add, range(10), 0) == reduce(add, range(10), 0) + assert fold(add, range(10), 0, map=Pool().map) == reduce(add, range(10), 0) assert fold(add, range(10), 0, chunksize=2) == reduce(add, range(10), 0) + assert fold(add, range(10), 0, chunksize=1) == reduce(add, range(10), 0) assert fold(add, range(10)) == fold(add, range(10), 0) def setadd(s, item): From 05947bb8759be523d1c0ba2dd0bfc5c6d9bf6b3d Mon Sep 17 00:00:00 2001 From: Pedro Rodrigues Date: Sun, 9 Dec 2018 18:36:36 +0100 Subject: [PATCH 2/6] refactor to allow use with multiprocessing.Pool.map() --- toolz/sandbox/parallel.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/toolz/sandbox/parallel.py b/toolz/sandbox/parallel.py index 96bef753..c9e197d5 100644 --- a/toolz/sandbox/parallel.py +++ b/toolz/sandbox/parallel.py @@ -1,8 +1,15 @@ +import functools from toolz.itertoolz import partition_all from toolz.compatibility import reduce, map from toolz.utils import no_default +def _reduce(func, seq, initial=None): + if initial is None: + return functools.reduce(func, seq) + else: + return functools.reduce(func, seq, initial) + def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None): """ Reduce without guarantee of ordered reduction. @@ -50,9 +57,9 @@ def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None): # Evaluate sequence in chunks via map if default == no_default: - results = map(lambda chunk: reduce(binop, chunk), chunks) + results = map(functools.partial(_reduce, binop), chunks) else: - results = map(lambda chunk: reduce(binop, chunk, default), chunks) + results = map(functools.partial(_reduce, binop, initial=default), chunks) results = list(results) # TODO: Support complete laziness From e9c2724108a1487a1e781c86999ccd9d73471653 Mon Sep 17 00:00:00 2001 From: Pedro Rodrigues Date: Sun, 9 Dec 2018 18:52:31 +0100 Subject: [PATCH 3/6] raise if chunksize less than two --- toolz/sandbox/parallel.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/toolz/sandbox/parallel.py b/toolz/sandbox/parallel.py index c9e197d5..76b77c1a 100644 --- a/toolz/sandbox/parallel.py +++ b/toolz/sandbox/parallel.py @@ -50,6 +50,8 @@ def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None): >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map) 10 """ + assert chunksize > 1 + if combine is None: combine = binop From 2955e28b510eab648bca0f12a001a8a68c07e2f4 Mon Sep 17 00:00:00 2001 From: Pedro Rodrigues Date: Sun, 9 Dec 2018 19:03:35 +0100 Subject: [PATCH 4/6] remove failing test Previously I planned to force chunksize value to be larger than 2 and wrote a test to allow chunksize equal to two. Fold takes a combination function which I am not sure how to interpret and prefer not to mess with so if the user choses a chunksize of 1 than we raise an exception instead of trapping ourselves in an infinite loop. --- toolz/sandbox/tests/test_parallel.py | 1 - 1 file changed, 1 deletion(-) diff --git a/toolz/sandbox/tests/test_parallel.py b/toolz/sandbox/tests/test_parallel.py index aae033fa..7a455937 100644 --- a/toolz/sandbox/tests/test_parallel.py +++ b/toolz/sandbox/tests/test_parallel.py @@ -13,7 +13,6 @@ def test_fold(): assert fold(add, range(10), 0) == reduce(add, range(10), 0) assert fold(add, range(10), 0, map=Pool().map) == reduce(add, range(10), 0) assert fold(add, range(10), 0, chunksize=2) == reduce(add, range(10), 0) - assert fold(add, range(10), 0, chunksize=1) == reduce(add, range(10), 0) assert fold(add, range(10)) == fold(add, range(10), 0) def setadd(s, item): From 1735c7125953ecb4a7b50bde252a04f4b224a8ea Mon Sep 17 00:00:00 2001 From: Pedro Rodrigues Date: Sun, 9 Dec 2018 20:29:39 +0100 Subject: [PATCH 5/6] fix codestyle issues --- toolz/sandbox/parallel.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/toolz/sandbox/parallel.py b/toolz/sandbox/parallel.py index 76b77c1a..ef8ed39d 100644 --- a/toolz/sandbox/parallel.py +++ b/toolz/sandbox/parallel.py @@ -10,6 +10,7 @@ def _reduce(func, seq, initial=None): else: return functools.reduce(func, seq, initial) + def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None): """ Reduce without guarantee of ordered reduction. @@ -59,9 +60,13 @@ def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None): # Evaluate sequence in chunks via map if default == no_default: - results = map(functools.partial(_reduce, binop), chunks) + results = map( + functools.partial(_reduce, binop), + chunks) else: - results = map(functools.partial(_reduce, binop, initial=default), chunks) + results = map( + functools.partial(_reduce, binop, initial=default), + chunks) results = list(results) # TODO: Support complete laziness From ac68944cc6a68a569fb73c0556007b930086e104 Mon Sep 17 00:00:00 2001 From: Pedro Rodrigues Date: Wed, 12 Dec 2018 08:33:40 +0100 Subject: [PATCH 6/6] stop tracking .tox dir --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 6e6f4b7d..cfef783b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ dist/ *.egg-info/ bench/shakespeare.txt .coverage + +\.tox/