From 3892899d6619abf5b8a1595f90acc2a5dd468055 Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Fri, 8 Apr 2016 09:30:59 -0400 Subject: [PATCH 1/7] ENH: Support multiple outputs for custom factors --- tests/pipeline/test_engine.py | 46 +++++++++++++++++++++++++ zipline/pipeline/engine.py | 9 ++++- zipline/pipeline/factors/__init__.py | 4 ++- zipline/pipeline/factors/factor.py | 50 +++++++++++++++++++++++++++- zipline/pipeline/mixins.py | 12 +++++-- zipline/pipeline/term.py | 22 ++++++++++-- 6 files changed, 136 insertions(+), 7 deletions(-) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 0f3be3979b..f1efa18687 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -112,6 +112,16 @@ def compute(self, today, assets, out, open): out[:] = open +class MultipleOutputs(CustomFactor): + window_length = 1 + inputs = [USEquityPricing.open, USEquityPricing.close] + outputs = ['double_open', 'double_close'] + + def compute(self, today, assets, out, open, close): + out.double_open[:] = open * 2 + out.double_close[:] = close * 2 + + def assert_multi_index_is_product(testcase, index, *levels): """Assert that a MultiIndex contains the product of `*levels`.""" testcase.assertIsInstance( @@ -510,6 +520,42 @@ def test_rolling_and_nonrolling(self): ), ) + def test_factor_with_multiple_outputs(self): + dates = self.dates[5:10] + assets = self.assets + num_dates = len(dates) + open = USEquityPricing.open + close = USEquityPricing.close + open_values = array([self.constants[open]] * num_dates, dtype=float) + close_values = array([self.constants[close]] * num_dates, dtype=float) + engine = SimplePipelineEngine( + lambda column: self.loader, self.dates, self.asset_finder, + ) + + double_open, double_close = MultipleOutputs() + pipeline = Pipeline( + columns={ + 'double_open': double_open, + 'double_close': double_close, + }, + ) + + results = engine.run_pipeline(pipeline, dates[0], dates[-1]) + first_output_results = 
results['double_open'].unstack() + second_output_results = results['double_close'].unstack() + + first_output_expected = {asset: open_values * 2 for asset in assets} + second_output_expected = {asset: close_values * 2 for asset in assets} + + assert_frame_equal( + first_output_results, + DataFrame(first_output_expected, index=dates, columns=assets), + ) + assert_frame_equal( + second_output_results, + DataFrame(second_output_expected, index=dates, columns=assets), + ) + def test_loader_given_multiple_columns(self): class Loader1DataSet1(DataSet): diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 8a751bc520..29873af784 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -348,12 +348,19 @@ def compute_chunk(self, graph, dates, assets, initial_workspace): ) workspace.update(loaded) else: - workspace[term] = term._compute( + term_output = term._compute( self._inputs_for_term(term, workspace, graph), mask_dates, assets, mask, ) + if hasattr(term, 'siblings'): + siblings = term.siblings[term.factor] + while siblings: + sibling = siblings.pop() + workspace[sibling] = term_output[sibling.attribute] + else: + workspace[term] = term_output assert(workspace[term].shape == mask.shape) out = {} diff --git a/zipline/pipeline/factors/__init__.py b/zipline/pipeline/factors/__init__.py index a29fbcf63c..e8f030a669 100644 --- a/zipline/pipeline/factors/__init__.py +++ b/zipline/pipeline/factors/__init__.py @@ -1,7 +1,8 @@ from .factor import ( CustomFactor, Factor, - Latest + Latest, + RecarrayFactor, ) from .events import ( BusinessDaysSinceCashBuybackAuth, @@ -44,6 +45,7 @@ 'Latest', 'MaxDrawdown', 'RSI', + 'RecarrayFactor', 'Returns', 'SimpleMovingAverage', 'VWAP', diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index 4edf66d4b2..802d30b88c 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -1,7 +1,7 @@ """ factor.py """ -from functools import wraps +from 
functools import partial, wraps from operator import attrgetter from numbers import Number @@ -1227,6 +1227,54 @@ def compute(self, today, assets, out, data): ''' dtype = float64_dtype + def __getattr__(self, name): + if name in self.outputs: + return RecarrayFactor(factor=self, attribute=name) + else: + raise AttributeError( + "This factor has no output called '{}'.".format(name) + ) + + def __iter__(self): + if self.outputs is NotSpecified: + raise ValueError('This factor does not have multiple outputs.') + RecarrayFactor_ = partial(RecarrayFactor, self) + return iter(map(RecarrayFactor_, self.outputs)) + + +class RecarrayFactor(Factor): + + siblings = {} + + def __new__(cls, factor, attribute): + new_instance = super(RecarrayFactor, cls).__new__( + cls, + factor=factor, + attribute=attribute, + inputs=factor.inputs, + outputs=factor.outputs, + window_length=factor.window_length, + dtype=factor.dtype, + missing_value=factor.missing_value, + ) + cls.siblings.setdefault(factor, set()).add(new_instance) + return new_instance + + def _init(self, factor, attribute, *args, **kwargs): + self.factor = factor + self.attribute = attribute + self.compute = factor.compute + self._compute = factor._compute + return super(RecarrayFactor, self)._init(*args, **kwargs) + + @classmethod + def static_identity(cls, factor, attribute, *args, **kwargs): + return ( + super(RecarrayFactor, cls).static_identity(*args, **kwargs), + factor, + attribute, + ) + class Latest(LatestMixin, CustomFactor): """ diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index da9297b88d..be37c14443 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -1,7 +1,7 @@ """ Mixins classes for use with Filters and Factors. 
""" -from numpy import full_like +from numpy import full_like, recarray from zipline.utils.control_flow import nullctx from zipline.errors import WindowLengthNotPositive, UnsupportedDataType @@ -69,6 +69,7 @@ class CustomTermMixin(object): def __new__(cls, inputs=NotSpecified, + outputs=NotSpecified, window_length=NotSpecified, mask=NotSpecified, dtype=NotSpecified, @@ -88,6 +89,7 @@ def __new__(cls, return super(CustomTermMixin, cls).__new__( cls, inputs=inputs, + outputs=outputs, window_length=window_length, mask=mask, dtype=dtype, @@ -109,7 +111,13 @@ def _compute(self, windows, dates, assets, mask): compute = self.compute missing_value = self.missing_value params = self.params - out = full_like(mask, missing_value, dtype=self.dtype) + outputs = self.outputs + if outputs is not NotSpecified and len(outputs) > 1: + out = recarray( + mask.shape, dtype=zip(outputs, [self.dtype] * len(outputs)), + ) + else: + out = full_like(mask, missing_value, dtype=self.dtype) with self.ctx: # TODO: Consider pre-filtering columns that are all-nan at each # time-step? diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 16faf2275e..e7b7a8061d 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -349,11 +349,13 @@ class ComputableTerm(Term): :class:`zipline.pipeline.Filter`, and :class:`zipline.pipeline.Factor`. """ inputs = NotSpecified + outputs = NotSpecified window_length = NotSpecified mask = NotSpecified def __new__(cls, inputs=inputs, + outputs=outputs, window_length=window_length, mask=mask, *args, **kwargs): @@ -368,6 +370,13 @@ def __new__(cls, # normalize to a tuple so that inputs is hashable. 
inputs = tuple(inputs) + if outputs is NotSpecified: + outputs = cls.outputs + if outputs is NotSpecified: + outputs = tuple() + else: + outputs = tuple(outputs) + if mask is NotSpecified: mask = cls.mask if mask is NotSpecified: @@ -379,22 +388,31 @@ def __new__(cls, return super(ComputableTerm, cls).__new__( cls, inputs=inputs, + outputs=outputs, mask=mask, window_length=window_length, *args, **kwargs ) - def _init(self, inputs, window_length, mask, *args, **kwargs): + def _init(self, inputs, outputs, window_length, mask, *args, **kwargs): self.inputs = inputs + self.outputs = outputs self.window_length = window_length self.mask = mask return super(ComputableTerm, self)._init(*args, **kwargs) @classmethod - def static_identity(cls, inputs, window_length, mask, *args, **kwargs): + def static_identity(cls, + inputs, + outputs, + window_length, + mask, + *args, + **kwargs): return ( super(ComputableTerm, cls).static_identity(*args, **kwargs), inputs, + outputs, window_length, mask, ) From a86a149edb1f236f4ec7b6f9e384ec733fecbb4f Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Fri, 8 Apr 2016 12:48:40 -0400 Subject: [PATCH 2/7] Fill in missing values for masked factors --- zipline/pipeline/mixins.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index be37c14443..121910c9d1 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -1,7 +1,7 @@ """ Mixins classes for use with Filters and Factors. 
""" -from numpy import full_like, recarray +from numpy import array, full_like, recarray from zipline.utils.control_flow import nullctx from zipline.errors import WindowLengthNotPositive, UnsupportedDataType @@ -134,6 +134,7 @@ def _compute(self, windows, dates, assets, mask): **params ) out[idx][col_mask] = masked_out + out[idx][~col_mask] = array([missing_value]) return out def short_repr(self): From 901fe8b1408b08f5809ae23c739170268732eefb Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Fri, 8 Apr 2016 14:53:47 -0400 Subject: [PATCH 3/7] Fix error message --- zipline/pipeline/factors/factor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index 802d30b88c..be67eb8fa8 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -1236,7 +1236,7 @@ def __getattr__(self, name): ) def __iter__(self): - if self.outputs is NotSpecified: + if not self.outputs: raise ValueError('This factor does not have multiple outputs.') RecarrayFactor_ = partial(RecarrayFactor, self) return iter(map(RecarrayFactor_, self.outputs)) From 41cc8e27df624e119651174ffb01fcba40c0b7aa Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Fri, 8 Apr 2016 16:04:35 -0400 Subject: [PATCH 4/7] Cleanup, add _validate, add error --- zipline/errors.py | 15 +++++++++++++-- zipline/pipeline/factors/factor.py | 12 ++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/zipline/errors.py b/zipline/errors.py index dde41feac8..dc630b34fb 100644 --- a/zipline/errors.py +++ b/zipline/errors.py @@ -433,10 +433,21 @@ class TermInputsNotSpecified(ZiplineError): msg = "{termname} requires inputs, but no inputs list was passed." +class TermOutputsNotSpecified(ZiplineError): + """ + Raised if a user attempts to construct a CustomFactor without specifying + enough outputs and that factor does not have class-level default outputs. 
+ """ + msg = ( + "{termname} requires at least two outputs, but was given " + "{num_outputs}." + ) + + class WindowLengthNotSpecified(ZiplineError): """ - Raised if a user attempts to construct a term without specifying inputs and - that term does not have class-level default inputs. + Raised if a user attempts to construct a term without specifying window + length and that term does not have a class-level default window length. """ msg = ( "{termname} requires a window_length, but no window_length was passed." diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index be67eb8fa8..ac53ab343d 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -8,7 +8,7 @@ from numpy import inf, where from toolz import curry -from zipline.errors import UnknownRankMethod +from zipline.errors import TermOutputsNotSpecified, UnknownRankMethod from zipline.lib.normalize import naive_grouped_rowwise_apply from zipline.lib.rank import masked_rankdata_2d from zipline.pipeline.classifiers import Classifier, Everything, Quantiles @@ -1236,7 +1236,7 @@ def __getattr__(self, name): ) def __iter__(self): - if not self.outputs: + if len(self.outputs) < 2: raise ValueError('This factor does not have multiple outputs.') RecarrayFactor_ = partial(RecarrayFactor, self) return iter(map(RecarrayFactor_, self.outputs)) @@ -1275,6 +1275,14 @@ def static_identity(cls, factor, attribute, *args, **kwargs): attribute, ) + def _validate(self): + super(RecarrayFactor, self)._validate() + num_outputs = len(self.outputs) + if num_outputs < 2: + raise TermOutputsNotSpecified( + termname=type(self).__name__, num_outputs=num_outputs, + ) + class Latest(LatestMixin, CustomFactor): """ From f235659aa93efd02cbfd4c775ed543d65a4a7aef Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Mon, 11 Apr 2016 15:54:39 -0400 Subject: [PATCH 5/7] Fill in missing values in 'out' --- zipline/pipeline/mixins.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index 121910c9d1..76c3474eaa 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -116,6 +116,7 @@ def _compute(self, windows, dates, assets, mask): out = recarray( mask.shape, dtype=zip(outputs, [self.dtype] * len(outputs)), ) + out[:] = array([missing_value]) else: out = full_like(mask, missing_value, dtype=self.dtype) with self.ctx: @@ -134,7 +135,6 @@ def _compute(self, windows, dates, assets, mask): **params ) out[idx][col_mask] = masked_out - out[idx][~col_mask] = array([missing_value]) return out def short_repr(self): From 3fc69c1c2512b841de134382e958c49eb7a1d9f2 Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Tue, 12 Apr 2016 11:10:36 -0400 Subject: [PATCH 6/7] DOC: Docstring and whatsnew updates --- docs/source/whatsnew/0.9.1.txt | 8 ++++++- zipline/pipeline/factors/factor.py | 37 ++++++++++++++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/docs/source/whatsnew/0.9.1.txt b/docs/source/whatsnew/0.9.1.txt index fab9949759..fec34e47ef 100644 --- a/docs/source/whatsnew/0.9.1.txt +++ b/docs/source/whatsnew/0.9.1.txt @@ -17,7 +17,13 @@ None Enhancements ~~~~~~~~~~~~ -None +* Implemented :class:`zipline.pipeline.factors.RecarrayFactor`, a new pipeline + term designed to compute multiple factors at once while also keeping track of + which factors it has already computed. + +* Added optional `outputs` parameter to :class:`zipline.pipeline.CustomFactor`. + Custom factors are now capable of computing and returning multiple outputs, + each of which is itself a Factor.
Experimental Features ~~~~~~~~~~~~~~~~~~~~~ diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index ac53ab343d..d819d256a8 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -1130,13 +1130,27 @@ class CustomFactor(PositiveWindowLengthMixin, CustomTermMixin, Factor): inputs : iterable, optional An iterable of `BoundColumn` instances (e.g. USEquityPricing.close), describing the data to load and pass to `self.compute`. If this - argument is passed to the CustomFactor constructor, we look for a + argument is not passed to the CustomFactor constructor, we look for a class-level attribute named `inputs`. + outputs : iterable, optional + An iterable of strings which represent the names of each output this + factor should compute and return. If this argument is not passed to the + CustomFactor constructor, we look for a class-level attribute named + `outputs`. window_length : int, optional Number of rows to pass for each input. If this argument is not passed to the CustomFactor constructor, we look for a class-level attribute named `window_length`. + Returns + ------- + output or outputs : zipline.pipeline.factors.CustomFactor or iterable of + zipline.pipeline.factors.RecarrayFactor + The Factor instance(s) to be returned upon instantiation. + If passed an `outputs` argument, CustomFactor will return an iterable + of RecarrayFactors representing each output. If `outputs` is not + specified, it returns an instance of CustomFactor. + Notes ----- Users implementing their own Factors should subclass CustomFactor and @@ -1159,7 +1173,9 @@ def compute(self, today, assets, out, *inputs): Column labels for `out` and`inputs`. out : np.array[self.dtype, ndim=1] Output array of the same shape as `assets`. `compute` should write - its desired return values into `out`. + its desired return values into `out`. 
If multiple outputs are + specified, `compute` should write its desired return values into + `out.<output_name>` for each output name in `self.outputs`. *inputs : tuple of np.array Raw data arrays corresponding to the values of `self.inputs`. @@ -1224,6 +1240,23 @@ def compute(self, today, assets, out, data): # MedianValue. median_close10 = MedianValue([USEquityPricing.close], window_length=10) median_low15 = MedianValue([USEquityPricing.low], window_length=15) + + A CustomFactor with multiple outputs: + + .. code-block:: python + + class MultipleOutputs(CustomFactor): + inputs = [USEquityPricing.close] + outputs = ['alpha', 'beta'] + window_length = N + + def compute(self, today, assets, out, close): + computed_alpha, computed_beta = some_function(close) + out.alpha = computed_alpha + out.beta = computed_beta + + # Each output is returned as its own Factor upon instantiation. + alpha, beta = MultipleOutputs() ''' dtype = float64_dtype From d21ece4888ecdecac35c9dc395eb305e8222a49f Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Tue, 12 Apr 2016 11:53:29 -0400 Subject: [PATCH 7/7] RecarrayFactor should compute using its parent's output --- zipline/pipeline/engine.py | 9 +-------- zipline/pipeline/factors/factor.py | 19 ++++++++----------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 29873af784..8a751bc520 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -348,19 +348,12 @@ def compute_chunk(self, graph, dates, assets, initial_workspace): ) workspace.update(loaded) else: - term_output = term._compute( + workspace[term] = term._compute( self._inputs_for_term(term, workspace, graph), mask_dates, assets, mask, ) - if hasattr(term, 'siblings'): - siblings = term.siblings[term.factor] - while siblings: - sibling = siblings.pop() - workspace[sibling] = term_output[sibling.attribute] - else: - workspace[term] = term_output assert(workspace[term].shape == mask.shape) out = {} diff
--git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index d819d256a8..1501ac7f11 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -1275,29 +1275,23 @@ def __iter__(self): return iter(map(RecarrayFactor_, self.outputs)) -class RecarrayFactor(Factor): - - siblings = {} +class RecarrayFactor(SingleInputMixin, Factor): def __new__(cls, factor, attribute): - new_instance = super(RecarrayFactor, cls).__new__( + return super(RecarrayFactor, cls).__new__( cls, factor=factor, attribute=attribute, - inputs=factor.inputs, + inputs=[factor], outputs=factor.outputs, - window_length=factor.window_length, + window_length=0, dtype=factor.dtype, missing_value=factor.missing_value, ) - cls.siblings.setdefault(factor, set()).add(new_instance) - return new_instance def _init(self, factor, attribute, *args, **kwargs): - self.factor = factor + self.parent = factor self.attribute = attribute - self.compute = factor.compute - self._compute = factor._compute return super(RecarrayFactor, self)._init(*args, **kwargs) @classmethod @@ -1316,6 +1310,9 @@ def _validate(self): termname=type(self).__name__, num_outputs=num_outputs, ) + def _compute(self, windows, dates, assets, mask): + return windows[0][self.attribute] + class Latest(LatestMixin, CustomFactor): """