
Commit

Add support for specifying the number of processes required for parallel execution
versae committed Apr 5, 2016
1 parent 9cd0ae2 commit ebe41f4
Showing 3 changed files with 24 additions and 9 deletions.
9 changes: 7 additions & 2 deletions functional/execution.py
@@ -25,7 +25,12 @@ def evaluate(self, sequence, transformations):
 
 class ParallelExecutionEngine(ExecutionEngine):
 
+    def __init__(self, processes=None, *args, **kwargs):
+        super(ParallelExecutionEngine, self).__init__(*args, **kwargs)
+        self._processes = processes
+
     def evaluate(self, sequence, transformations):
+        processes = self._processes
         result = sequence
         staged = []
         for transform in transformations:
@@ -36,9 +41,9 @@ def evaluate(self, sequence, transformations):
                 staged.insert(0, transform.function)
             else:
                 if staged:
-                    result = parallelize(compose(*staged), result)
+                    result = parallelize(compose(*staged), result, processes)
                     staged = []
                 result = transform.function(result)
         if staged:
-            result = parallelize(compose(*staged), result)
+            result = parallelize(compose(*staged), result, processes)
         return iter(result)
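
The staging loop above batches consecutive parallelizable transforms into a single pool dispatch: each parallel-capable function is pushed onto the front of staged, and when a non-parallel transform (or the end of the pipeline) is reached, the whole batch is composed and handed to parallelize together with the configured process count. A minimal, self-contained sketch of that composition order (the stage functions are hypothetical; compose is written in the same shape as functional.util.compose):

from functools import reduce

def compose(*functions):
    # Same shape as functional.util.compose: compose(f, g)(x) == f(g(x)).
    return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)

def map_double(chunk):
    # Hypothetical parallelizable "map" stage.
    return [x * 2 for x in chunk]

def keep_big(chunk):
    # Hypothetical parallelizable "filter" stage.
    return [x for x in chunk if x > 4]

staged = []
staged.insert(0, map_double)   # staged == [map_double]
staged.insert(0, keep_big)     # staged == [keep_big, map_double]; newest first

composed = compose(*staged)    # applies right to left: map_double, then keep_big
print(composed([1, 2, 3, 4]))  # [6, 8]

Because staged.insert(0, ...) puts the newest stage first and compose applies its arguments right to left, the stages still run in pipeline order once the batch is flushed.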
13 changes: 9 additions & 4 deletions functional/streams.py
@@ -13,7 +13,7 @@
 from functional.util import is_primitive, ReusableFile
 
 
-def seq(*args):
+def seq(*args, **kwargs):
     """
     Primary entrypoint for the functional package. Returns a functional.pipeline.Sequence wrapping
     the original sequence.
@@ -45,17 +45,22 @@ def seq(*args):
     :return: wrapped sequence
     """
-    return _seq(ExecutionEngine(), *args)
+    processes = kwargs.get("processes")
+    if processes:
+        return _seq(ParallelExecutionEngine(processes=processes), *args)
+    else:
+        return _seq(ExecutionEngine(), *args)
 
 
-def pseq(*args):
+def pseq(*args, **kwargs):
     """
     Same as functional.seq but with parallel support for maps/where and
     filter/select. Returns a functional.pipeline.Sequence wrapping
     the original sequence and passing
     functional.execution.ParallelExecutionEngine as the execution engine.
     """
-    return _seq(ParallelExecutionEngine(), *args)
+    processes = kwargs.get("processes")
+    return _seq(ParallelExecutionEngine(processes=processes), *args)
 
 
 def _seq(engine, *args):
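
With this change the worker count flows straight through the public entry points: seq picks the parallel engine only when a truthy processes keyword is given, while pseq always uses it. A minimal usage sketch (assuming seq and pseq are importable from the functional package, as in the released library; the calls and results are illustrative):

from functional import seq, pseq

# No processes keyword (or a falsy value): the plain serial ExecutionEngine.
seq(1, 2, 3, 4).map(lambda x: x * 2).to_list()                        # [2, 4, 6, 8]

# Truthy processes keyword: ParallelExecutionEngine with that many workers.
seq(range(100), processes=2).map(lambda x: x * x).sum()               # 328350
pseq(range(100), processes=2).filter(lambda x: x % 2 == 0).to_list()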
11 changes: 8 additions & 3 deletions functional/util.py
@@ -136,17 +136,22 @@ def is_serializable(func):
         return False
 
 
-def parallelize(func, result):
+def parallelize(func, result, processes=None):
     if not is_serializable(func):
         return func(result)
-    with Pool(processes=CPU_COUNT) as pool:
-        chunks = split_every(CPU_COUNT, iter(result))
+    if processes is None or processes < 1:
+        processes = CPU_COUNT
+    else:
+        processes = min(processes, CPU_COUNT)
+    with Pool(processes=processes) as pool:
+        chunks = split_every(processes, iter(result))
         packed_chunks = (pack(func, (chunk, )) for chunk in chunks)
         results = pool.map(unpack, packed_chunks)
         return chain.from_iterable(results)
 
 
 def compose(*functions):
     # pylint: disable=undefined-variable
     return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)
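
The new clamping keeps the request sane: processes=None or anything below 1 falls back to CPU_COUNT, and larger requests are capped at CPU_COUNT. The surrounding pool pattern can be illustrated with a self-contained, standard-library-only analogue (the chunking and worker functions below are stand-ins for the library's split_every/pack/unpack helpers, not the real ones):

from itertools import chain, islice
from multiprocessing import Pool, cpu_count

def chunked(iterable, size):
    # Stand-in for split_every: yield successive lists of at most `size` items.
    iterator = iter(iterable)
    while True:
        block = list(islice(iterator, size))
        if not block:
            return
        yield block

def square_chunk(chunk):
    # Module-level so the default Pool serializer can pickle it.
    return [x * x for x in chunk]

if __name__ == "__main__":
    requested = 100
    # Mirror parallelize(): fall back to the CPU count, cap oversized requests.
    processes = min(requested, cpu_count()) if requested >= 1 else cpu_count()
    with Pool(processes=processes) as pool:
        pieces = pool.map(square_chunk, chunked(range(10), processes))
    print(list(chain.from_iterable(pieces)))  # [0, 1, 4, ..., 81]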
