Skip to content

Commit

Permalink
Refactor to add execution engines as classes and split seq and pseq
Browse files Browse the repository at this point in the history
  • Loading branch information
versae committed Apr 5, 2016
1 parent a8ecbe6 commit 9cd0ae2
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 36 deletions.
2 changes: 1 addition & 1 deletion functional/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from __future__ import absolute_import

from functional.streams import seq
from functional.streams import seq, pseq

__author__ = "Pedro Rodriguez"
__copyright__ = "Copyright 2015, Pedro Rodriguez"
Expand Down
44 changes: 44 additions & 0 deletions functional/execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from functional.util import compose, parallelize


class ExecutionStrategies(object):
"""
Enum like object listing the types of execution strategies
"""
PRE_COMPUTE = 0
PARALLEL = 1


class ExecutionEngine(object):

def evaluate(self, sequence, transformations):
result = sequence
for transform in transformations:
strategies = transform.execution_strategies
if (strategies is not None
and ExecutionStrategies.PRE_COMPUTE in strategies):
result = transform.function(list(result))
else:
result = transform.function(result)
return iter(result)


class ParallelExecutionEngine(ExecutionEngine):

def evaluate(self, sequence, transformations):
result = sequence
staged = []
for transform in transformations:
strategies = transform.execution_strategies
if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
result = list(result)
if strategies and ExecutionStrategies.PARALLEL in strategies:
staged.insert(0, transform.function)
else:
if staged:
result = parallelize(compose(*staged), result)
staged = []
result = transform.function(result)
if staged:
result = parallelize(compose(*staged), result)
return iter(result)
25 changes: 6 additions & 19 deletions functional/lineage.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from __future__ import absolute_import

from functional.execution import ExecutionEngine
from functional.transformations import CACHE_T
from functional.transformations import ExecutionStrategies
from functional.util import compose, parallelize


class Lineage(object):
"""
Class for tracking the lineage of transformations, and applying them to a given sequence.
"""
def __init__(self, prior_lineage=None):
def __init__(self, prior_lineage=None, engine=None):
"""
Construct an empty lineage if prior_lineage is None or if its not use it as the list of
current transformations
Expand All @@ -18,6 +17,8 @@ def __init__(self, prior_lineage=None):
:return: new Lineage object
"""
self.transformations = [] if prior_lineage is None else list(prior_lineage.transformations)
self.engine = ((engine or ExecutionEngine()) if prior_lineage is None
else prior_lineage.engine)

def __repr__(self):
"""
Expand All @@ -44,23 +45,9 @@ def apply(self, transform):
self.transformations.append(transform)

def evaluate(self, sequence):
result = sequence
last_cache_index = self.cache_scan()
staged = []
for transform in self.transformations[last_cache_index:]:
strategies = transform.execution_strategies
if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
result = list(result)
if strategies and ExecutionStrategies.PARALLEL in strategies:
staged.append(transform.function)
else:
if staged:
result = parallelize(compose(*staged), result)
staged = []
result = transform.function(result)
if staged:
result = parallelize(compose(*staged), result)
return iter(result)
transformations = self.transformations[last_cache_index:]
return self.engine.evaluate(sequence, transformations)

def cache_scan(self):
try:
Expand Down
11 changes: 7 additions & 4 deletions functional/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import six
import future.builtins as builtins

from functional.execution import ExecutionEngine
from functional.lineage import Lineage
from functional.util import is_iterable, is_primitive, is_namedtuple, identity, CSV_WRITE_MODE
from functional import transformations
Expand All @@ -26,7 +27,7 @@ class Sequence(object):
Sequence is a wrapper around any type of sequence which provides access to common
functional transformations and reductions in a data pipelining style
"""
def __init__(self, sequence, transform=None):
def __init__(self, sequence, transform=None, engine=None):
# pylint: disable=protected-access
"""
Takes a sequence and wraps it around a Sequence object.
Expand All @@ -44,12 +45,14 @@ def __init__(self, sequence, transform=None):
:param sequence: sequence of items to wrap in a Sequence
:return: sequence wrapped in a Sequence
"""
self.engine = engine or ExecutionEngine()
if isinstance(sequence, Sequence):
self._base_sequence = sequence._base_sequence
self._lineage = Lineage(prior_lineage=sequence._lineage)
self._lineage = Lineage(prior_lineage=sequence._lineage,
engine=engine)
elif isinstance(sequence, list) or isinstance(sequence, tuple) or is_iterable(sequence):
self._base_sequence = sequence
self._lineage = Lineage()
self._lineage = Lineage(engine=engine)
else:
raise TypeError("Given sequence must be an iterable value")
if transform is not None:
Expand Down Expand Up @@ -209,7 +212,7 @@ def cache(self, delete_lineage=False):
self._base_sequence = list(self._evaluate())
self._lineage.apply(transformations.CACHE_T)
if delete_lineage:
self._lineage = Lineage()
self._lineage = Lineage(engine=self.engine)
return self

def head(self):
Expand Down
21 changes: 18 additions & 3 deletions functional/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import future.builtins as builtins
import six

from functional.execution import ExecutionEngine, ParallelExecutionEngine
from functional.pipeline import Sequence
from functional.util import is_primitive, ReusableFile

Expand Down Expand Up @@ -44,14 +45,28 @@ def seq(*args):
:return: wrapped sequence
"""
return _seq(ExecutionEngine(), *args)


def pseq(*args):
"""
Same as functional.seq but with parallel support for maps/where and
filter/select. Returns a functional.pipeline.Sequence wrapping
the original sequence and passing
functional.execution.ParallelExecutionEngine as the execution engine.
"""
return _seq(ParallelExecutionEngine(), *args)


def _seq(engine, *args):
if len(args) == 0:
raise TypeError("seq() takes at least 1 argument ({0} given)".format(len(args)))
elif len(args) > 1:
return Sequence(list(args))
return Sequence(list(args), engine=engine)
elif is_primitive(args[0]):
return Sequence([args[0]])
return Sequence([args[0]], engine=engine)
else:
return Sequence(args[0])
return Sequence(args[0], engine=engine)


def open(path, delimiter=None, mode='r', buffering=-1, encoding=None,
Expand Down
10 changes: 1 addition & 9 deletions functional/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,14 @@
from future.builtins import map, filter, zip, range

import six
from functional.execution import ExecutionStrategies


#: Defines a Transformation from a name, function, and execution_strategies
Transformation = collections.namedtuple(
'Transformation', ['name', 'function', 'execution_strategies']
)


class ExecutionStrategies(object):
"""
Enum like object listing the types of execution strategies
"""
PRE_COMPUTE = 0
PARALLEL = 1


#: Cache transformation
CACHE_T = Transformation('cache', None, None)

Expand Down

0 comments on commit 9cd0ae2

Please sign in to comment.