Skip to content

Commit

Permalink
Add support passing processors and raise_errors when serialization fa…
Browse files Browse the repository at this point in the history
…ils in parallel mode by both, setting pseq attrobutes or by passing them as parameters to pseq(). seq now does not support kwargs.
  • Loading branch information
versae committed Apr 11, 2016
1 parent 2009d36 commit 72b3178
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 222 deletions.
15 changes: 10 additions & 5 deletions functional/execution.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import partial
from functional.util import compose, parallelize


Expand All @@ -12,6 +13,7 @@ class ExecutionStrategies(object):
class ExecutionEngine(object):

def evaluate(self, sequence, transformations):
# pylint: disable=no-self-use
result = sequence
for transform in transformations:
strategies = transform.execution_strategies
Expand All @@ -25,13 +27,16 @@ def evaluate(self, sequence, transformations):

class ParallelExecutionEngine(ExecutionEngine):

def __init__(self, processes=None, *args, **kwargs):
def __init__(self, processes=None, raise_errors=True,
*args, **kwargs):
super(ParallelExecutionEngine, self).__init__(*args, **kwargs)
self._processes = processes
self.processes = processes
self.raise_errors = raise_errors

def evaluate(self, sequence, transformations):
processes = self._processes
result = sequence
parallel = partial(parallelize, processes=self.processes,
raise_errors=self.raise_errors)
staged = []
for transform in transformations:
strategies = transform.execution_strategies
Expand All @@ -41,9 +46,9 @@ def evaluate(self, sequence, transformations):
staged.insert(0, transform.function)
else:
if staged:
result = parallelize(compose(*staged), result, processes)
result = parallel(compose(*staged), result)
staged = []
result = transform.function(result)
if staged:
result = parallelize(compose(*staged), result, processes)
result = parallel(compose(*staged), result)
return iter(result)

0 comments on commit 72b3178

Please sign in to comment.