Skip to content

Commit

Permalink
Add composition of parallel functions in a block
Browse files Browse the repository at this point in the history
  • Loading branch information
versae committed Apr 5, 2016
1 parent b7da5b1 commit a8ecbe6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
19 changes: 11 additions & 8 deletions functional/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from functional.transformations import CACHE_T
from functional.transformations import ExecutionStrategies
from functional.util import parallelize
from functional.util import compose, parallelize


class Lineage(object):
Expand Down Expand Up @@ -46,17 +46,20 @@ def apply(self, transform):
def evaluate(self, sequence):
result = sequence
last_cache_index = self.cache_scan()
staged = []
for transform in self.transformations[last_cache_index:]:
strategies = transform.execution_strategies
if strategies is not None:
if ExecutionStrategies.PRE_COMPUTE in strategies:
result = list(result)
if ExecutionStrategies.PARALLEL in strategies:
result = parallelize(transform.function, result)
else:
result = transform.function(result)
if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
result = list(result)
if strategies and ExecutionStrategies.PARALLEL in strategies:
staged.append(transform.function)
else:
if staged:
result = parallelize(compose(*staged), result)
staged = []
result = transform.function(result)
if staged:
result = parallelize(compose(*staged), result)
return iter(result)

def cache_scan(self):
Expand Down
5 changes: 5 additions & 0 deletions functional/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import
from itertools import chain, count, islice, takewhile
from functools import reduce
from multiprocessing import Pool, cpu_count

import collections
Expand Down Expand Up @@ -145,6 +146,10 @@ def parallelize(func, result):
return chain.from_iterable(results)


def compose(*functions):
return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)


class ReusableFile(object):
"""
Class which emulates the builtin file except that calling iter() on it will return separate
Expand Down

0 comments on commit a8ecbe6

Please sign in to comment.