diff --git a/tests/pipeline/test_alias.py b/tests/pipeline/test_alias.py
new file mode 100644
index 0000000000..bdc68675ac
--- /dev/null
+++ b/tests/pipeline/test_alias.py
@@ -0,0 +1,65 @@
+from nose.tools import nottest
+import numpy as np
+
+from zipline.testing.predicates import assert_equal
+from zipline.pipeline import Classifier, Factor, Filter
+from zipline.utils.numpy_utils import float64_dtype, int64_dtype
+
+from .base import BasePipelineTestCase
+
+
+@nottest
+class BaseAliasTestCase(BasePipelineTestCase):
+
+    def test_alias(self):
+        f = self.Term()
+        alias = f.alias('ayy lmao')
+
+        f_values = np.random.RandomState(5).randn(5, 5)
+
+        self.check_terms(
+            terms={
+                'f_alias': alias,
+            },
+            expected={
+                'f_alias': f_values,
+            },
+            initial_workspace={f: f_values},
+            mask=self.build_mask(np.ones((5, 5))),
+        )
+
+    def test_repr(self):
+        assert_equal(
+            repr(self.Term().alias('ayy lmao')),
+            "Aliased%s(Term(...), name='ayy lmao')" % (
+                self.Term.__base__.__name__,
+            ),
+        )
+
+    def test_short_repr(self):
+        for name in ('a', 'b'):
+            assert_equal(
+                self.Term().alias(name).short_repr(),
+                name,
+            )
+
+
+class TestFactorAlias(BaseAliasTestCase):
+    class Term(Factor):
+        dtype = float64_dtype
+        inputs = ()
+        window_length = 0
+
+
+class TestFilterAlias(BaseAliasTestCase):
+    class Term(Filter):
+        inputs = ()
+        window_length = 0
+
+
+class TestClassifierAlias(BaseAliasTestCase):
+    class Term(Classifier):
+        dtype = int64_dtype
+        inputs = ()
+        window_length = 0
+        missing_value = -1
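Reviewer note (illustrative, not part of the patch): outside these fixtures, the new API reads naturally in pipeline construction. A minimal sketch using the existing zipline built-ins ``USEquityPricing`` and ``SimpleMovingAverage``:

    from zipline.pipeline import Pipeline
    from zipline.pipeline.data import USEquityPricing
    from zipline.pipeline.factors import SimpleMovingAverage

    # Name an intermediate expression; the alias shows up in reprs and graph
    # renderings but computes exactly what its wrapped term computes.
    close_rank = USEquityPricing.close.latest.rank().alias('close_rank')

    pipe = Pipeline(columns={
        'close_rank': close_rank,
        # Aliasing preserves window-safety, so the aliased rank can feed a
        # windowed computation just like the bare rank.
        'avg_close_rank': SimpleMovingAverage(
            inputs=[close_rank],
            window_length=5,
        ),
    })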
diff --git a/tests/pipeline/test_classifier.py b/tests/pipeline/test_classifier.py
index aba522259b..a716c19dca 100644
--- a/tests/pipeline/test_classifier.py
+++ b/tests/pipeline/test_classifier.py
@@ -2,10 +2,13 @@
 from operator import or_
 
 import numpy as np
+import pandas as pd
 
 from zipline.lib.labelarray import LabelArray
 from zipline.pipeline import Classifier
 from zipline.testing import parameter_space
+from zipline.testing.fixtures import ZiplineTestCase
+from zipline.testing.predicates import assert_equal
 from zipline.utils.numpy_utils import (
     categorical_dtype,
     int64_dtype,
@@ -464,3 +467,81 @@ class C(Classifier):
                 "TypeError(\"unhashable type: 'dict'\",)."
             )
             self.assertEqual(errmsg, expected)
+
+
+class TestPostProcessAndToWorkspaceValue(ZiplineTestCase):
+    def test_reversibility_categorical(self):
+        class F(Classifier):
+            inputs = ()
+            window_length = 0
+            dtype = categorical_dtype
+            missing_value = ''
+
+        f = F()
+        column_data = LabelArray(
+            np.array(
+                [['a', f.missing_value],
+                 ['b', f.missing_value],
+                 ['c', 'd']],
+            ),
+            missing_value=f.missing_value,
+        )
+
+        assert_equal(
+            f.postprocess(column_data.ravel()),
+            pd.Categorical(
+                ['a', f.missing_value, 'b', f.missing_value, 'c', 'd'],
+            ),
+        )
+
+        # Only include the non-missing data.
+        pipeline_output = pd.Series(
+            data=['a', 'b', 'c', 'd'],
+            index=pd.MultiIndex.from_arrays([
+                [pd.Timestamp('2014-01-01'),
+                 pd.Timestamp('2014-01-02'),
+                 pd.Timestamp('2014-01-03'),
+                 pd.Timestamp('2014-01-03')],
+                [0, 0, 0, 1],
+            ]),
+            dtype='category',
+        )
+
+        assert_equal(
+            f.to_workspace_value(pipeline_output, pd.Index([0, 1])),
+            column_data,
+        )
+
+    def test_reversibility_int64(self):
+        class F(Classifier):
+            inputs = ()
+            window_length = 0
+            dtype = int64_dtype
+            missing_value = -1
+
+        f = F()
+        column_data = np.array(
+            [[0, f.missing_value],
+             [1, f.missing_value],
+             [2, 3]],
+        )
+
+        assert_equal(f.postprocess(column_data.ravel()), column_data.ravel())
+
+        # Only include the non-missing data.
+        pipeline_output = pd.Series(
+            data=[0, 1, 2, 3],
+            index=pd.MultiIndex.from_arrays([
+                [pd.Timestamp('2014-01-01'),
+                 pd.Timestamp('2014-01-02'),
+                 pd.Timestamp('2014-01-03'),
+                 pd.Timestamp('2014-01-03')],
+                [0, 0, 0, 1],
+            ]),
+            dtype=int64_dtype,
+        )
+
+        assert_equal(
+            f.to_workspace_value(pipeline_output, pd.Index([0, 1])),
+            column_data,
+        )
diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py
index 6a7037eb18..568b544248 100644
--- a/tests/pipeline/test_engine.py
+++ b/tests/pipeline/test_engine.py
@@ -14,6 +14,7 @@
     float32,
     float64,
     full,
+    full_like,
     log,
     nan,
     tile,
@@ -66,10 +67,12 @@
 from zipline.testing import (
     AssetID,
     AssetIDPlusDay,
+    ExplodingObject,
     check_arrays,
     make_alternating_boolean_array,
     make_cascading_boolean_array,
     OpenPrice,
+    parameter_space,
     product_upper_triangle,
 )
 from zipline.testing.fixtures import (
@@ -78,6 +81,7 @@
     WithTradingEnvironment,
     ZiplineTestCase,
 )
+from zipline.testing.predicates import assert_equal
 from zipline.utils.memoize import lazyval
 from zipline.utils.numpy_utils import bool_dtype, datetime64ns_dtype
 
@@ -163,14 +167,14 @@ def compute(self, today, assets, out, *inputs):
         out[:] = sum(inputs).sum(axis=0)
 
 
-class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase):
+class WithConstantInputs(WithTradingEnvironment):
     asset_ids = ASSET_FINDER_EQUITY_SIDS = 1, 2, 3, 4
     START_DATE = Timestamp('2014-01-01', tz='utc')
     END_DATE = Timestamp('2014-03-01', tz='utc')
 
     @classmethod
     def init_class_fixtures(cls):
-        super(ConstantInputTestCase, cls).init_class_fixtures()
+        super(WithConstantInputs, cls).init_class_fixtures()
         cls.constants = {
             # Every day, assume every stock starts at 2, goes down to 1,
             # goes up to 4, and finishes at 3.
@@ -192,6 +196,8 @@ def init_class_fixtures(cls):
         )
         cls.assets = cls.asset_finder.retrieve_all(cls.asset_ids)
 
+
+class ConstantInputTestCase(WithConstantInputs, ZiplineTestCase):
     def test_bad_dates(self):
         loader = self.loader
         engine = SimplePipelineEngine(
@@ -1315,3 +1321,179 @@ def test_string_classifiers_produce_categoricals(self):
             columns=self.asset_finder.retrieve_all(self.asset_finder.sids),
         )
         assert_frame_equal(result.c.unstack(), expected_final_result)
+
+
+class WindowSafetyPropagationTestCase(WithSeededRandomPipelineEngine,
+                                      ZiplineTestCase):
+
+    SEEDED_RANDOM_PIPELINE_SEED = 5
+
+    def test_window_safety_propagation(self):
+        dates = self.trading_days[-30:]
+        start_date, end_date = dates[[-10, -1]]
+
+        col = TestingDataSet.float_col
+        pipe = Pipeline(
+            columns={
+                'average_of_rank_plus_one': SimpleMovingAverage(
+                    inputs=[col.latest.rank() + 1],
+                    window_length=10,
+                ),
+                'average_of_aliased_rank_plus_one': SimpleMovingAverage(
+                    inputs=[col.latest.rank().alias('some_alias') + 1],
+                    window_length=10,
+                ),
+                'average_of_rank_plus_one_aliased': SimpleMovingAverage(
+                    inputs=[(col.latest.rank() + 1).alias('some_alias')],
+                    window_length=10,
+                ),
+            }
+        )
+        results = self.run_pipeline(pipe, start_date, end_date).unstack()
+
+        expected_ranks = DataFrame(
+            self.raw_expected_values(
+                col,
+                dates[-19],
+                dates[-1],
+            ),
+            index=dates[-19:],
+            columns=self.asset_finder.retrieve_all(
+                self.ASSET_FINDER_EQUITY_SIDS,
+            )
+        ).rank(axis='columns')
+
+        # All three expressions should be equivalent and evaluate to this.
+        expected_result = (
+            (expected_ranks + 1)
+            .rolling(10)
+            .mean()
+            .dropna(how='any')
+        )
+
+        for colname in results.columns.levels[0]:
+            assert_equal(expected_result, results[colname])
+
+
+class PopulateInitialWorkspaceTestCase(WithConstantInputs, ZiplineTestCase):
+
+    @parameter_space(window_length=[3, 5], pipeline_length=[5, 10])
+    def test_populate_initial_workspace(self, window_length, pipeline_length):
+        column = USEquityPricing.low
+        base_term = column.latest
+
+        # Take a z-score here so that the precomputed term is window-safe.
+        # The z-score will never actually get computed because we swap it out.
+        precomputed_term = (base_term.zscore()).alias('precomputed_term')
+
+        # A term that has `precomputed_term` as an input.
+        depends_on_precomputed_term = precomputed_term + 1
+        # A term that requires a window of `precomputed_term`.
+        depends_on_window_of_precomputed_term = SimpleMovingAverage(
+            inputs=[precomputed_term],
+            window_length=window_length,
+        )
+
+        precomputed_term_with_window = SimpleMovingAverage(
+            inputs=(column,),
+            window_length=window_length,
+        ).alias('precomputed_term_with_window')
+        depends_on_precomputed_term_with_window = (
+            precomputed_term_with_window + 1
+        )
+
+        column_value = self.constants[column]
+        precomputed_term_value = -column_value
+        precomputed_term_with_window_value = -(column_value + 1)
+
+        def populate_initial_workspace(initial_workspace,
+                                       root_mask_term,
+                                       execution_plan,
+                                       dates,
+                                       assets):
+            def shape_for_term(term):
+                ndates = len(execution_plan.mask_and_dates_for_term(
+                    term,
+                    root_mask_term,
+                    initial_workspace,
+                    dates,
+                )[1])
+                nassets = len(assets)
+                return (ndates, nassets)
+
+            ws = initial_workspace.copy()
+            ws[precomputed_term] = full(
+                shape_for_term(precomputed_term),
+                precomputed_term_value,
+                dtype=float64,
+            )
+            ws[precomputed_term_with_window] = full(
+                shape_for_term(precomputed_term_with_window),
+                precomputed_term_with_window_value,
+                dtype=float64,
+            )
+            return ws
+
+        def dispatcher(c):
+            if c is column:
+                # The base term should never be loaded; its initial refcount
+                # should be zero.
+                return ExplodingObject()
+            return self.loader
+
+        engine = SimplePipelineEngine(
+            dispatcher,
+            self.dates,
+            self.asset_finder,
+            populate_initial_workspace=populate_initial_workspace,
+        )
+
+        results = engine.run_pipeline(
+            Pipeline({
+                'precomputed_term': precomputed_term,
+                'precomputed_term_with_window': precomputed_term_with_window,
+                'depends_on_precomputed_term': depends_on_precomputed_term,
+                'depends_on_precomputed_term_with_window':
+                    depends_on_precomputed_term_with_window,
+                'depends_on_window_of_precomputed_term':
+                    depends_on_window_of_precomputed_term,
+            }),
+            self.dates[-pipeline_length],
+            self.dates[-1],
+        )
+
+        assert_equal(
+            results['precomputed_term'].values,
+            full_like(
+                results['precomputed_term'],
+                precomputed_term_value,
+            ),
+        )
+        assert_equal(
+            results['precomputed_term_with_window'].values,
+            full_like(
+                results['precomputed_term_with_window'],
+                precomputed_term_with_window_value,
+            ),
+        )
+        assert_equal(
+            results['depends_on_precomputed_term'].values,
+            full_like(
+                results['depends_on_precomputed_term'],
+                precomputed_term_value + 1,
+            ),
+        )
+        assert_equal(
+            results['depends_on_precomputed_term_with_window'].values,
+            full_like(
+                results['depends_on_precomputed_term_with_window'],
+                precomputed_term_with_window_value + 1,
+            ),
+        )
+        assert_equal(
+            results['depends_on_window_of_precomputed_term'].values,
+            full_like(
+                results['depends_on_window_of_precomputed_term'],
+                precomputed_term_value,
+            ),
+        )
diff --git a/tests/pipeline/test_factor.py b/tests/pipeline/test_factor.py
index f020ae8b88..2d81dc18a2 100644
--- a/tests/pipeline/test_factor.py
+++ b/tests/pipeline/test_factor.py
@@ -21,6 +21,7 @@
     where,
 )
 from numpy.random import randn, seed
+import pandas as pd
 
 from zipline.errors import UnknownRankMethod
 from zipline.lib.labelarray import LabelArray
@@ -37,6 +38,8 @@
     parameter_space,
     permute_rows,
 )
+from zipline.testing.fixtures import ZiplineTestCase
+from zipline.testing.predicates import assert_equal
 from zipline.utils.numpy_utils import (
     categorical_dtype,
     datetime64ns_dtype,
@@ -1058,3 +1061,39 @@ def test_demean_is_window_safe_if_input_is_window_safe(self):
         self.assertFalse(F().demean().window_safe)
         self.assertFalse(F(window_safe=False).demean().window_safe)
         self.assertTrue(F(window_safe=True).demean().window_safe)
+
+
+class TestPostProcessAndToWorkspaceValue(ZiplineTestCase):
+    @parameter_space(dtype_=(float64_dtype, datetime64ns_dtype))
+    def test_reversibility(self, dtype_):
+        class F(Factor):
+            inputs = ()
+            dtype = dtype_
+            window_length = 0
+
+        f = F()
+        column_data = array(
+            [[0, f.missing_value],
+             [1, f.missing_value],
+             [2, 3]],
+            dtype=dtype_,
+        )
+
+        assert_equal(f.postprocess(column_data.ravel()), column_data.ravel())
+
+        # Only include the non-missing data.
+        pipeline_output = pd.Series(
+            data=array([0, 1, 2, 3], dtype=dtype_),
+            index=pd.MultiIndex.from_arrays([
+                [pd.Timestamp('2014-01-01'),
+                 pd.Timestamp('2014-01-02'),
+                 pd.Timestamp('2014-01-03'),
+                 pd.Timestamp('2014-01-03')],
+                [0, 0, 0, 1],
+            ]),
+        )
+
+        assert_equal(
+            f.to_workspace_value(pipeline_output, pd.Index([0, 1])),
+            column_data,
+        )
diff --git a/tests/pipeline/test_filter.py b/tests/pipeline/test_filter.py
index 1051cee2d5..aa578a5a80 100644
--- a/tests/pipeline/test_filter.py
+++ b/tests/pipeline/test_filter.py
@@ -24,6 +24,7 @@
     sum as np_sum
 )
 from numpy.random import randn, seed as random_seed
+import pandas as pd
 
 from zipline.errors import BadPercentileBounds
 from zipline.pipeline import Filter, Factor, Pipeline
@@ -859,3 +860,38 @@ def compute(self, today, sids, out):
         assert_equal(results.odds, (sids % 2).astype(bool))
         assert_equal(results.first_five, sids < 5)
         assert_equal(results.last_three, sids >= 7)
+
+
+class TestPostProcessAndToWorkspaceValue(ZiplineTestCase):
+    def test_reversibility(self):
+        class F(Filter):
+            inputs = ()
+            window_length = 0
+            missing_value = False
+
+        f = F()
+        column_data = array(
+            [[True, f.missing_value],
+             [True, f.missing_value],
+             [True, True]],
+            dtype=bool,
+        )
+
+        assert_equal(f.postprocess(column_data.ravel()), column_data.ravel())
+
+        # Only include the non-missing data.
+        pipeline_output = pd.Series(
+            data=True,
+            index=pd.MultiIndex.from_arrays([
+                [pd.Timestamp('2014-01-01'),
+                 pd.Timestamp('2014-01-02'),
+                 pd.Timestamp('2014-01-03'),
+                 pd.Timestamp('2014-01-03')],
+                [0, 0, 0, 1],
+            ]),
+        )
+
+        assert_equal(
+            f.to_workspace_value(pipeline_output, pd.Index([0, 1])),
+            column_data,
+        )
diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py
index c58dd4e068..39d507ca81 100644
--- a/tests/pipeline/test_term.py
+++ b/tests/pipeline/test_term.py
@@ -195,8 +195,14 @@ def check_output(graph):
             self.assertIn(SomeDataSet.bar, resolution_order)
             self.assertIn(SomeFactor(), resolution_order)
 
-            self.assertEqual(graph.node[SomeDataSet.foo]['extra_rows'], 4)
-            self.assertEqual(graph.node[SomeDataSet.bar]['extra_rows'], 4)
+            self.assertEqual(
+                graph.graph.node[SomeDataSet.foo]['extra_rows'],
+                4,
+            )
+            self.assertEqual(
+                graph.graph.node[SomeDataSet.bar]['extra_rows'],
+                4,
+            )
 
         for foobar in gen_equivalent_factors():
             check_output(self.make_execution_plan(to_dict([foobar])))
diff --git a/zipline/lib/labelarray.py b/zipline/lib/labelarray.py
index b92faf06c2..a3e8e1992e 100644
--- a/zipline/lib/labelarray.py
+++ b/zipline/lib/labelarray.py
@@ -197,6 +197,29 @@ def _from_codes_and_metadata(cls,
         ret._missing_value = missing_value
         return ret
 
+    @classmethod
+    def from_categorical(cls, categorical, missing_value=None):
+        """
+        Create a LabelArray from a pandas categorical.
+
+        Parameters
+        ----------
+        categorical : pd.Categorical
+            The categorical object to convert.
+        missing_value : bytes, unicode, or None, optional
+            The missing value to use for this LabelArray.
+
+        Returns
+        -------
+        la : LabelArray
+            The LabelArray representation of this categorical.
+        """
+        return LabelArray(
+            categorical,
+            missing_value,
+            categorical.categories,
+        )
+
     @property
     def categories(self):
         # This is a property because it should be immutable.
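Reviewer note (illustrative, not part of the patch): a minimal sketch of the new constructor, with arbitrary values and the empty string as the missing sentinel:

    import pandas as pd
    from zipline.lib.labelarray import LabelArray

    cat = pd.Categorical(['a', 'b', 'a', ''])
    # Wrap the categorical as a LabelArray, reusing its categories and
    # treating '' as the missing value.
    la = LabelArray.from_categorical(cat, missing_value='')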
diff --git a/zipline/pipeline/classifiers/classifier.py b/zipline/pipeline/classifiers/classifier.py
index 73bf5a31b6..86a91a0e5b 100644
--- a/zipline/pipeline/classifiers/classifier.py
+++ b/zipline/pipeline/classifiers/classifier.py
@@ -6,6 +6,7 @@
 import re
 
 from numpy import where, isnan, nan, zeros
+import pandas as pd
 
 from zipline.lib.labelarray import LabelArray
 from zipline.lib.quantiles import quantiles
@@ -23,6 +24,7 @@
 from ..filters import ArrayPredicate, NotNullFilter, NullFilter, NumExprFilter
 from ..mixins import (
+    AliasedMixin,
     CustomTermMixin,
     DownsampledMixin,
     LatestMixin,
@@ -303,10 +305,42 @@ def postprocess(self, data):
             raise AssertionError("Expected a LabelArray, got %s." % type(data))
         return data.as_categorical()
 
+    def to_workspace_value(self, result, assets):
+        """
+        Called with the result of a pipeline. This needs to return an object
+        which can be put into the workspace to continue doing computations.
+
+        This is the inverse of :func:`~zipline.pipeline.term.Term.postprocess`.
+        """
+        if self.dtype == int64_dtype:
+            return super(Classifier, self).to_workspace_value(result, assets)
+
+        assert isinstance(result.values, pd.Categorical), (
+            'Expected a Categorical, got %r.' % type(result.values)
+        )
+        with_missing = pd.Series(
+            data=pd.Categorical(
+                result.values,
+                result.values.categories.union([self.missing_value]),
+            ),
+            index=result.index,
+        )
+        return LabelArray(
+            super(Classifier, self).to_workspace_value(
+                with_missing,
+                assets,
+            ),
+            self.missing_value,
+        )
+
     @classlazyval
     def _downsampled_type(self):
         return DownsampledMixin.make_downsampled_type(Classifier)
 
+    @classlazyval
+    def _aliased_type(self):
+        return AliasedMixin.make_aliased_type(Classifier)
+
 
 class Everything(Classifier):
     """
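Reviewer note (illustrative, not part of the patch): together with ``postprocess``, this gives classifiers a lossless round trip between pipeline output and workspace values, which is what makes precomputation useful. A hedged sketch of the intended workflow; ``engine``, ``pipe``, ``my_classifier``, and the column name are hypothetical:

    # Run a pipeline once, then pack a categorical column back into the
    # LabelArray form the engine can consume as a precomputed workspace value.
    prior = engine.run_pipeline(pipe, start_date, end_date)
    workspace_value = my_classifier.to_workspace_value(prior['my_column'], assets)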
diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py
index 4bb6f6b075..1d28a8349e 100644
--- a/zipline/pipeline/engine.py
+++ b/zipline/pipeline/engine.py
@@ -81,6 +81,39 @@ def run_pipeline(self, pipeline, start_date, end_date):
         )
 
 
+def default_populate_initial_workspace(initial_workspace,
+                                       root_mask_term,
+                                       execution_plan,
+                                       dates,
+                                       assets):
+    """The default implementation for ``populate_initial_workspace``. This
+    function returns the ``initial_workspace`` argument without making any
+    modifications.
+
+    Parameters
+    ----------
+    initial_workspace : dict[Term, array-like]
+        The initial workspace before we have populated it with any cached
+        terms.
+    root_mask_term : Term
+        The root mask term, normally ``AssetExists()``. This is needed to
+        compute the dates for individual terms.
+    execution_plan : ExecutionPlan
+        The execution plan for the pipeline being run.
+    dates : pd.DatetimeIndex
+        All of the dates being requested in this pipeline run, including
+        the extra dates for lookback windows.
+    assets : pd.Int64Index
+        All of the assets that exist for the window being computed.
+
+    Returns
+    -------
+    populated_initial_workspace : dict[Term, array-like]
+        The workspace to begin computations with.
+    """
+    return initial_workspace
+
+
 class SimplePipelineEngine(object):
     """
     PipelineEngine class that computes each term independently.
@@ -96,6 +129,15 @@ class SimplePipelineEngine(object):
     asset_finder : zipline.assets.AssetFinder
         An AssetFinder instance.  We depend on the AssetFinder to determine
         which assets are in the top-level universe at any point in time.
+    populate_initial_workspace : callable, optional
+        A function which will be used to populate the initial workspace when
+        computing a pipeline. See
+        :func:`zipline.pipeline.engine.default_populate_initial_workspace`
+        for more info.
+
+    See Also
+    --------
+    :func:`zipline.pipeline.engine.default_populate_initial_workspace`
     """
     __slots__ = (
         '_get_loader',
@@ -103,10 +145,15 @@ class SimplePipelineEngine(object):
         '_finder',
         '_root_mask_term',
         '_root_mask_dates_term',
+        '_populate_initial_workspace',
         '__weakref__',
     )
 
-    def __init__(self, get_loader, calendar, asset_finder):
+    def __init__(self,
+                 get_loader,
+                 calendar,
+                 asset_finder,
+                 populate_initial_workspace=None):
         self._get_loader = get_loader
         self._calendar = calendar
         self._finder = asset_finder
@@ -114,6 +161,10 @@ def __init__(self, get_loader, calendar, asset_finder):
         self._root_mask_term = AssetExists()
         self._root_mask_dates_term = InputDates()
 
+        self._populate_initial_workspace = (
+            populate_initial_workspace or default_populate_initial_workspace
+        )
+
     def run_pipeline(self, pipeline, start_date, end_date):
         """
         Compute a pipeline.
@@ -179,14 +230,22 @@ def run_pipeline(self, pipeline, start_date, end_date):
         root_mask = self._compute_root_mask(start_date, end_date, extra_rows)
         dates, assets, root_mask_values = explode(root_mask)
 
+        initial_workspace = self._populate_initial_workspace(
+            {
+                self._root_mask_term: root_mask_values,
+                self._root_mask_dates_term: as_column(dates.values)
+            },
+            self._root_mask_term,
+            graph,
+            dates,
+            assets,
+        )
+
         results = self.compute_chunk(
             graph,
             dates,
             assets,
-            initial_workspace={
-                self._root_mask_term: root_mask_values,
-                self._root_mask_dates_term: as_column(dates.values)
-            },
+            initial_workspace,
         )
 
         return self._to_narrow(
@@ -255,21 +314,6 @@ def _compute_root_mask(self, start_date, end_date, extra_rows):
         assert shape[0] * shape[1] != 0, 'root mask cannot be empty'
         return ret
 
-    def _mask_and_dates_for_term(self, term, workspace, graph, all_dates):
-        """
-        Load mask and mask row labels for term.
-        """
-        mask = term.mask
-        mask_offset = graph.extra_rows[mask] - graph.extra_rows[term]
-
-        # This offset is computed against _root_mask_term because that is what
-        # determines the shape of the top-level dates array.
-        dates_offset = (
-            graph.extra_rows[self._root_mask_term] - graph.extra_rows[term]
-        )
-
-        return workspace[mask][mask_offset:], all_dates[dates_offset:]
-
     @staticmethod
     def _inputs_for_term(term, workspace, graph):
         """
@@ -346,7 +390,7 @@ def compute_chunk(self, graph, dates, assets, initial_workspace):
 
         refcounts = graph.initial_refcounts(workspace)
 
-        for term in graph.ordered():
+        for term in graph.execution_order(refcounts):
             # `term` may have been supplied in `initial_workspace`, and in the
             # future we may pre-compute loadable terms coming from the same
             # dataset.  In either case, we will already have an entry for this
@@ -356,8 +400,11 @@ def compute_chunk(self, graph, dates, assets, initial_workspace):
 
             # Asset labels are always the same, but date labels vary by how
             # many extra rows are needed.
-            mask, mask_dates = self._mask_and_dates_for_term(
-                term, workspace, graph, dates
+            mask, mask_dates = graph.mask_and_dates_for_term(
+                term,
+                self._root_mask_term,
+                workspace,
+                dates,
             )
 
             if isinstance(term, LoadableTerm):
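Reviewer note (illustrative, not part of the patch): a hedged sketch of how a caller might use the new hook, mirroring PopulateInitialWorkspaceTestCase above; ``my_term``, ``my_dispatcher``, ``my_calendar``, and ``my_finder`` are hypothetical placeholders:

    import numpy as np
    from zipline.pipeline.engine import SimplePipelineEngine

    def populate(initial_workspace, root_mask_term, execution_plan,
                 dates, assets):
        ws = initial_workspace.copy()
        # Size the precomputed block to the dates/assets the plan expects.
        _, term_dates = execution_plan.mask_and_dates_for_term(
            my_term, root_mask_term, ws, dates,
        )
        ws[my_term] = np.zeros((len(term_dates), len(assets)), dtype=np.float64)
        return ws

    engine = SimplePipelineEngine(
        my_dispatcher,
        my_calendar,
        my_finder,
        populate_initial_workspace=populate,
    )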
diff --git a/zipline/pipeline/expression.py b/zipline/pipeline/expression.py
index d99ab341db..941f058e47 100644
--- a/zipline/pipeline/expression.py
+++ b/zipline/pipeline/expression.py
@@ -187,6 +187,7 @@ def __new__(cls, expr, binds, dtype):
             inputs=binds,
             expr=expr,
             dtype=dtype,
+            window_safe=all(t.window_safe for t in binds),
         )
 
     def _init(self, expr, *args, **kwargs):
diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py
index 3566abdc94..793282d10a 100644
--- a/zipline/pipeline/factors/factor.py
+++ b/zipline/pipeline/factors/factor.py
@@ -32,6 +32,7 @@
     NullFilter,
 )
 from zipline.pipeline.mixins import (
+    AliasedMixin,
     CustomTermMixin,
     DownsampledMixin,
     LatestMixin,
@@ -1078,6 +1079,10 @@ def isfinite(self):
     def _downsampled_type(self):
         return DownsampledMixin.make_downsampled_type(Factor)
 
+    @classlazyval
+    def _aliased_type(self):
+        return AliasedMixin.make_aliased_type(Factor)
+
 
 class NumExprFactor(NumericalExpression, Factor):
     """
diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py
index 89cd790a33..7f17588bf1 100644
--- a/zipline/pipeline/filters/filter.py
+++ b/zipline/pipeline/filters/filter.py
@@ -24,6 +24,7 @@
     NumericalExpression,
 )
 from zipline.pipeline.mixins import (
+    AliasedMixin,
     CustomTermMixin,
     DownsampledMixin,
     LatestMixin,
@@ -207,6 +208,10 @@ def _validate(self):
     def _downsampled_type(self):
         return DownsampledMixin.make_downsampled_type(Filter)
 
+    @classlazyval
+    def _aliased_type(self):
+        return AliasedMixin.make_aliased_type(Filter)
+
 
 class NumExprFilter(NumericalExpression, Filter):
     """
diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py
index 667c3be9b0..89107f0394 100644
--- a/zipline/pipeline/graph.py
+++ b/zipline/pipeline/graph.py
@@ -16,7 +16,7 @@ class CyclicDependency(Exception):
     pass
 
 
-class TermGraph(DiGraph):
+class TermGraph(object):
     """
     An abstract representation of Pipeline Term dependencies.
 
@@ -44,7 +44,7 @@ class TermGraph(DiGraph):
     ExecutionPlan
     """
     def __init__(self, terms):
-        super(TermGraph, self).__init__()
+        self.graph = DiGraph()
         self._frozen = False
 
         parents = set()
@@ -54,7 +54,6 @@ def __init__(self, terms):
         assert not parents
 
         self._outputs = terms
-        self._ordered = topological_sort(self)
 
         # Mark that no more terms should be added to the graph.
         self._frozen = True
@@ -79,11 +78,11 @@ def _add_to_graph(self, term, parents):
 
         parents.add(term)
 
-        self.add_node(term)
+        self.graph.add_node(term)
 
         for dependency in term.dependencies:
             self._add_to_graph(dependency, parents)
-            self.add_edge(dependency, term)
+            self.graph.add_edge(dependency, term)
 
         parents.remove(term)
 
@@ -94,15 +93,25 @@ def outputs(self):
         """
         return self._outputs
 
-    def ordered(self):
+    def execution_order(self, refcounts):
         """
-        Return a topologically-sorted iterator over the terms in `self`.
+        Return a topologically-sorted iterator over the terms in ``self``
+        which need to be computed.
         """
-        return iter(self._ordered)
+        return iter(topological_sort(
+            self.graph.subgraph(
+                {term for term, refcount in refcounts.items() if refcount > 0},
+            ),
+        ))
+
+    def ordered(self):
+        return iter(topological_sort(self.graph))
 
     @lazyval
     def loadable_terms(self):
-        return tuple(term for term in self if isinstance(term, LoadableTerm))
+        return tuple(
+            term for term in self.graph if isinstance(term, LoadableTerm)
+        )
 
     @lazyval
     def jpeg(self):
@@ -132,15 +141,34 @@ def initial_refcounts(self, initial_terms):
         nodes get one extra reference to ensure that they're still in the
         graph at the end of execution.
         """
-        refcounts = self.out_degree()
+        refcounts = self.graph.out_degree()
         for t in self.outputs.values():
             refcounts[t] += 1
 
         for t in initial_terms:
-            self.decref_dependencies(t, refcounts)
+            self._decref_dependencies_recursive(t, refcounts, set())
 
         return refcounts
 
+    def _decref_dependencies_recursive(self, term, refcounts, garbage):
+        """
+        Decrement the refcounts of all of ``term``'s dependencies,
+        recursively.
+
+        Notes
+        -----
+        This should only be used to build the initial workspace. After that,
+        use:
+        :meth:`~zipline.pipeline.graph.TermGraph.decref_dependencies`
+        """
+        # Edges are tuples of (from, to).
+        for parent, _ in self.graph.in_edges([term]):
+            refcounts[parent] -= 1
+            # No one else depends on this term. Remove it from the
+            # workspace to conserve memory.
+            if refcounts[parent] == 0:
+                garbage.add(parent)
+                self._decref_dependencies_recursive(parent, refcounts, garbage)
+
     def decref_dependencies(self, term, refcounts):
         """
         Decrement in-edges for ``term`` after computation.
@@ -159,7 +187,7 @@ def decref_dependencies(self, term, refcounts):
         """
         garbage = set()
         # Edges are tuple of (from, to).
-        for parent, _ in self.in_edges([term]):
+        for parent, _ in self.graph.in_edges([term]):
             refcounts[parent] -= 1
             # No one else depends on this term. Remove it from the
             # workspace to conserve memory.
@@ -326,7 +354,7 @@ def offset(self):
             # How much bigger is the array for ``dep`` compared to ``term``?
             # How much of that difference did I ask for.
             (term, dep): (extra[dep] - extra[term]) - requested_extra_rows
-            for term in self
+            for term in self.graph
             for dep, requested_extra_rows in term.dependencies.items()
         }
 
@@ -366,12 +394,49 @@ def extra_rows(self):
         """
         return {
             term: attrs['extra_rows']
-            for term, attrs in iteritems(self.node)
+            for term, attrs in iteritems(self.graph.node)
        }
 
     def _ensure_extra_rows(self, term, N):
         """
         Ensure that we're going to compute at least N extra rows of `term`.
         """
-        attrs = self.node[term]
+        attrs = self.graph.node[term]
         attrs['extra_rows'] = max(N, attrs.get('extra_rows', 0))
+
+    def mask_and_dates_for_term(self,
+                                term,
+                                root_mask_term,
+                                workspace,
+                                all_dates):
+        """
+        Load mask and mask row labels for term.
+
+        Parameters
+        ----------
+        term : Term
+            The term to load the mask and labels for.
+        root_mask_term : Term
+            The term that represents the root asset exists mask.
+        workspace : dict[Term, any]
+            The values that have been computed for each term.
+        all_dates : pd.DatetimeIndex
+            All of the dates for which the pipeline is being computed.
+
+        Returns
+        -------
+        mask : np.ndarray
+            The correct mask for this term.
+        dates : np.ndarray
+            The slice of dates for this term.
+        """
+        mask = term.mask
+        mask_offset = self.extra_rows[mask] - self.extra_rows[term]
+
+        # This offset is computed against root_mask_term because that is what
+        # determines the shape of the top-level dates array.
+        dates_offset = (
+            self.extra_rows[root_mask_term] - self.extra_rows[term]
+        )
+
+        return workspace[mask][mask_offset:], all_dates[dates_offset:]
diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py
index a18251011b..6271abcc34 100644
--- a/zipline/pipeline/mixins.py
+++ b/zipline/pipeline/mixins.py
@@ -20,6 +20,7 @@
 from zipline.utils.input_validation import expect_types
 from zipline.utils.sharedoc import (
     format_docstring,
+    PIPELINE_ALIAS_NAME_DOC,
     PIPELINE_DOWNSAMPLING_FREQUENCY_DOC,
 )
 from zipline.utils.pandas_utils import nearest_unequal_elements
@@ -240,6 +241,77 @@ def _validate(self):
             )
 
 
+class AliasedMixin(SingleInputMixin):
+    """
+    Mixin for aliased terms.
+    """
+    def __new__(cls, term, name):
+        return super(AliasedMixin, cls).__new__(
+            cls,
+            inputs=(term,),
+            outputs=term.outputs,
+            window_length=0,
+            name=name,
+            dtype=term.dtype,
+            missing_value=term.missing_value,
+            ndim=term.ndim,
+            window_safe=term.window_safe,
+        )
+
+    def _init(self, name, *args, **kwargs):
+        self.name = name
+        return super(AliasedMixin, self)._init(*args, **kwargs)
+
+    @classmethod
+    def _static_identity(cls, name, *args, **kwargs):
+        return (
+            super(AliasedMixin, cls)._static_identity(*args, **kwargs),
+            name,
+        )
+
+    def _compute(self, inputs, dates, assets, mask):
+        return inputs[0]
+
+    def __repr__(self):
+        return '{type}({inner_type}(...), name={name!r})'.format(
+            type=type(self).__name__,
+            inner_type=type(self.inputs[0]).__name__,
+            name=self.name,
+        )
+
+    def short_repr(self):
+        return self.name
+
+    @classmethod
+    def make_aliased_type(cls, other_base):
+        """
+        Factory for making Aliased{Filter,Factor,Classifier}.
+        """
+        docstring = dedent(
+            """
+            A {t} that names another {t}.
+
+            Parameters
+            ----------
+            term : {t}
+            {{name}}
+            """
+        ).format(t=other_base.__name__)
+
+        doc = format_docstring(
+            owner_name=other_base.__name__,
+            docstring=docstring,
+            formatters={'name': PIPELINE_ALIAS_NAME_DOC},
+        )
+
+        return type(
+            'Aliased' + other_base.__name__,
+            (cls, other_base),
+            {'__doc__': doc,
+             '__module__': other_base.__module__},
+        )
+
+
 class DownsampledMixin(StandardOutputs):
     """
     Mixin for behavior shared by Downsampled{Factor,Filter,Classifier}
diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py
index 6cecd1475e..284f536dc4 100644
--- a/zipline/pipeline/term.py
+++ b/zipline/pipeline/term.py
@@ -39,6 +39,7 @@
 )
 from zipline.utils.sharedoc import (
     templated_docstring,
+    PIPELINE_ALIAS_NAME_DOC,
     PIPELINE_DOWNSAMPLING_FREQUENCY_DOC,
 )
 
@@ -590,7 +591,32 @@ def postprocess(self, data):
         """
         return data
 
-    def _downsampled_type(self):
+    def to_workspace_value(self, result, assets):
+        """
+        Called with a column of the result of a pipeline. This needs to put
+        the data into a format that can be used in a workspace to continue
+        doing computations.
+
+        Parameters
+        ----------
+        result : pd.Series
+            A multi-indexed series with (date, asset) pairs whose values are
+            the results of running this pipeline term over the dates.
+        assets : pd.Index
+            All of the assets being requested. This allows us to correctly
+            shape the workspace value.
+
+        Returns
+        -------
+        workspace_value : array-like
+            An array-like value that the engine can consume.
+        """
+        return result.unstack().fillna(self.missing_value).reindex(
+            columns=assets,
+            fill_value=self.missing_value,
+        ).values
+
+    def _downsampled_type(self, *args, **kwargs):
         """
         The expression type to return from self.downsample().
         """
@@ -611,6 +637,35 @@ def downsample(self, frequency):
         """
         return self._downsampled_type(term=self, frequency=frequency)
 
+    def _aliased_type(self, *args, **kwargs):
+        """
+        The expression type to return from self.alias().
+        """
+        raise NotImplementedError(
+            "alias is not yet implemented "
+            "for instances of %s." % type(self).__name__
+        )
+
+    @templated_docstring(name=PIPELINE_ALIAS_NAME_DOC)
+    def alias(self, name):
+        """
+        Make a term from ``self`` that names the expression.
+
+        Parameters
+        ----------
+        {name}
+
+        Returns
+        -------
+        aliased : Aliased
+            ``self`` with a name.
+
+        Notes
+        -----
+        This is useful for giving a name to a numerical or boolean expression.
+        """
+        return self._aliased_type(term=self, name=name)
+
     def __repr__(self):
         return (
             "{type}({inputs}, window_length={window_length})"
diff --git a/zipline/pipeline/visualize.py b/zipline/pipeline/visualize.py
index f0356b4375..fdc34b70af 100644
--- a/zipline/pipeline/visualize.py
+++ b/zipline/pipeline/visualize.py
@@ -115,13 +115,14 @@ def _render(g, out, format_, include_asset_exists=False):
             add_term_node(f, term)
 
         # Write intermediate results.
-        for term in filter_nodes(include_asset_exists, topological_sort(g)):
+        for term in filter_nodes(include_asset_exists,
+                                 topological_sort(g.graph)):
             if term in in_nodes or term in out_nodes:
                 continue
             add_term_node(f, term)
 
         # Write edges
-        for source, dest in g.edges():
+        for source, dest in g.graph.edges():
             if source is AssetExists() and not include_asset_exists:
                 continue
             add_edge(f, id(source), id(dest))
diff --git a/zipline/testing/predicates.py b/zipline/testing/predicates.py
index da62bdb639..cf4f042e5e 100644
--- a/zipline/testing/predicates.py
+++ b/zipline/testing/predicates.py
@@ -39,6 +39,7 @@
     assert_frame_equal,
     assert_panel_equal,
     assert_series_equal,
+    assert_index_equal,
 )
 from six import iteritems, viewkeys, PY2
 from toolz import dissoc, keyfilter
@@ -47,6 +48,7 @@
 from zipline.testing.core import ensure_doctest
 from zipline.dispatch import dispatch
 from zipline.lib.adjustment import Adjustment
+from zipline.lib.labelarray import LabelArray
 from zipline.utils.functional import dzip_exact, instance
 from zipline.utils.math_utils import tolerant_equals
 
@@ -425,7 +427,23 @@ def assert_array_equal(result,
         raise AssertionError('\n'.join((str(e), _fmt_path(path))))
 
 
-def _register_assert_ndframe_equal(type_, assert_eq):
+@assert_equal.register(LabelArray, LabelArray)
+def assert_labelarray_equal(result, expected, path=(), **kwargs):
+    assert_equal(
+        result.categories,
+        expected.categories,
+        path=path + ('.categories',),
+        **kwargs
+    )
+    assert_equal(
+        result.as_int_array(),
+        expected.as_int_array(),
+        path=path + ('.as_int_array()',),
+        **kwargs
+    )
+
+
+def _register_assert_equal_wrapper(type_, assert_eq):
     """Register a new check for an ndframe object.
 
     Parameters
@@ -456,18 +474,40 @@ def assert_ndframe_equal(result, expected, path=(), msg='', **kwargs):
     return assert_ndframe_equal
 
 
-assert_frame_equal = _register_assert_ndframe_equal(
+assert_frame_equal = _register_assert_equal_wrapper(
     pd.DataFrame,
     assert_frame_equal,
 )
-assert_panel_equal = _register_assert_ndframe_equal(
+assert_panel_equal = _register_assert_equal_wrapper(
     pd.Panel,
     assert_panel_equal,
 )
-assert_series_equal = _register_assert_ndframe_equal(
+assert_series_equal = _register_assert_equal_wrapper(
     pd.Series,
     assert_series_equal,
 )
+assert_index_equal = _register_assert_equal_wrapper(
+    pd.Index,
+    assert_index_equal,
+)
+
+
+@assert_equal.register(pd.Categorical, pd.Categorical)
+def assert_categorical_equal(result, expected, path=(), msg='', **kwargs):
+    assert_equal(
+        result.categories,
+        expected.categories,
+        path=path + ('.categories',),
+        msg=msg,
+        **kwargs
+    )
+    assert_equal(
+        result.codes,
+        expected.codes,
+        path=path + ('.codes',),
+        msg=msg,
+        **kwargs
+    )
 
 
 @assert_equal.register(Adjustment, Adjustment)
diff --git a/zipline/utils/sharedoc.py b/zipline/utils/sharedoc.py
index 9669366c82..d0d7248c2d 100644
--- a/zipline/utils/sharedoc.py
+++ b/zipline/utils/sharedoc.py
@@ -18,6 +18,13 @@
     """
 )
 
+PIPELINE_ALIAS_NAME_DOC = dedent(
+    """\
+    name : str
+        The name to alias this term as.
+    """,
+)
+
 
 def pad_lines_after_first(prefix, s):
     """Apply a prefix to each line in s after the first."""
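Reviewer note (illustrative, not part of the patch): with these registrations, the generic predicate now handles the new types directly, e.g.:

    import pandas as pd
    from zipline.testing.predicates import assert_equal

    # Categorical comparison checks categories and codes separately; Index
    # comparison dispatches to pandas' assert_index_equal.
    assert_equal(pd.Categorical(['a', 'b']), pd.Categorical(['a', 'b']))
    assert_equal(pd.Index([0, 1]), pd.Index([0, 1]))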