Skip to content

Commit

Permalink
Make results of parallel execution a real iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
versae committed Apr 10, 2016
1 parent ebe41f4 commit 2009d36
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions functional/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,23 @@ def is_serializable(func):
def parallelize(func, result, processes=None):
if not is_serializable(func):
return func(result)
return chain.from_iterable(lazy_parallelize(func, result, processes))


def lazy_parallelize(func, result, processes=None):
if processes is None or processes < 1:
processes = CPU_COUNT
else:
processes = min(processes, CPU_COUNT)
try:
chunk_size = len(result) // processes
except TypeError:
chunk_size = processes
with Pool(processes=processes) as pool:
chunks = split_every(processes, iter(result))
chunks = split_every(chunk_size, iter(result))
packed_chunks = (pack(func, (chunk, )) for chunk in chunks)
results = pool.map(unpack, packed_chunks)
return chain.from_iterable(results)
for pool_result in pool.imap(unpack, packed_chunks):
yield pool_result


def compose(*functions):
Expand Down

0 comments on commit 2009d36

Please sign in to comment.