Skip to content

Commit

Permalink
Merge 9fce2f8 into d2ab5ed
Browse files Browse the repository at this point in the history
  • Loading branch information
dmichalowicz committed Apr 12, 2016
2 parents d2ab5ed + 9fce2f8 commit 7a45fa0
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 11 deletions.
7 changes: 7 additions & 0 deletions docs/source/whatsnew/0.9.1.txt
Expand Up @@ -22,6 +22,13 @@ Enhancements
factor to only compute over stocks for which the filter returns True, rather
than always computing over the entire universe of stocks. (:issue:`1095`)

* Implemented :class:`zipline.pipeline.factors.RecarrayFactor`, a new pipeline
term designed to be the output type of a CustomFactor with multiple outputs.

* Added optional `outputs` parameter to :class:`zipline.pipeline.CustomFactor`.
Custom factors are now capable of computing and returning multiple outputs,
each of which are themselves a Factor.

Experimental Features
~~~~~~~~~~~~~~~~~~~~~

Expand Down
46 changes: 46 additions & 0 deletions tests/pipeline/test_engine.py
Expand Up @@ -112,6 +112,16 @@ def compute(self, today, assets, out, open):
out[:] = open


class MultipleOutputs(CustomFactor):
window_length = 1
inputs = [USEquityPricing.open, USEquityPricing.close]
outputs = ['double_open', 'double_close']

def compute(self, today, assets, out, open, close):
out.double_open[:] = open * 2
out.double_close[:] = close * 2


def assert_multi_index_is_product(testcase, index, *levels):
"""Assert that a MultiIndex contains the product of `*levels`."""
testcase.assertIsInstance(
Expand Down Expand Up @@ -510,6 +520,42 @@ def test_rolling_and_nonrolling(self):
),
)

def test_factor_with_multiple_outputs(self):
dates = self.dates[5:10]
assets = self.assets
num_dates = len(dates)
open = USEquityPricing.open
close = USEquityPricing.close
open_values = array([self.constants[open]] * num_dates, dtype=float)
close_values = array([self.constants[close]] * num_dates, dtype=float)
engine = SimplePipelineEngine(
lambda column: self.loader, self.dates, self.asset_finder,
)

double_open, double_close = MultipleOutputs()
pipeline = Pipeline(
columns={
'double_open': double_open,
'double_close': double_close,
},
)

results = engine.run_pipeline(pipeline, dates[0], dates[-1])
first_output_results = results['double_open'].unstack()
second_output_results = results['double_close'].unstack()

first_output_expected = {asset: open_values * 2 for asset in assets}
second_output_expected = {asset: close_values * 2 for asset in assets}

assert_frame_equal(
first_output_results,
DataFrame(first_output_expected, index=dates, columns=assets),
)
assert_frame_equal(
second_output_results,
DataFrame(second_output_expected, index=dates, columns=assets),
)

def test_loader_given_multiple_columns(self):

class Loader1DataSet1(DataSet):
Expand Down
14 changes: 12 additions & 2 deletions zipline/errors.py
Expand Up @@ -433,10 +433,20 @@ class TermInputsNotSpecified(ZiplineError):
msg = "{termname} requires inputs, but no inputs list was passed."


class TermOutputsNotSpecified(ZiplineError):
"""
Raised if a user attempts to construct a term without specifying outputs
and that term does not have class-level default outputs.
"""
msg = (
"{termname} requires outputs, but no outputs list was passed."
)


class WindowLengthNotSpecified(ZiplineError):
"""
Raised if a user attempts to construct a term without specifying inputs and
that term does not have class-level default inputs.
Raised if a user attempts to construct a term without specifying window
length and that term does not have a class-level default window length.
"""
msg = (
"{termname} requires a window_length, but no window_length was passed."
Expand Down
4 changes: 3 additions & 1 deletion zipline/pipeline/factors/__init__.py
@@ -1,7 +1,8 @@
from .factor import (
CustomFactor,
Factor,
Latest
Latest,
RecarrayFactor,
)
from .events import (
BusinessDaysSinceCashBuybackAuth,
Expand Down Expand Up @@ -44,6 +45,7 @@
'Latest',
'MaxDrawdown',
'RSI',
'RecarrayFactor',
'Returns',
'SimpleMovingAverage',
'VWAP',
Expand Down
79 changes: 75 additions & 4 deletions zipline/pipeline/factors/factor.py
@@ -1,14 +1,14 @@
"""
factor.py
"""
from functools import wraps
from functools import partial, wraps
from operator import attrgetter
from numbers import Number

from numpy import inf, where
from toolz import curry

from zipline.errors import UnknownRankMethod
from zipline.errors import TermOutputsNotSpecified, UnknownRankMethod
from zipline.lib.normalize import naive_grouped_rowwise_apply
from zipline.lib.rank import masked_rankdata_2d
from zipline.pipeline.classifiers import Classifier, Everything, Quantiles
Expand Down Expand Up @@ -1130,8 +1130,13 @@ class CustomFactor(PositiveWindowLengthMixin, CustomTermMixin, Factor):
inputs : iterable, optional
An iterable of `BoundColumn` instances (e.g. USEquityPricing.close),
describing the data to load and pass to `self.compute`. If this
argument is passed to the CustomFactor constructor, we look for a
argument is not passed to the CustomFactor constructor, we look for a
class-level attribute named `inputs`.
outputs : iterable, optional
An iterable of strings which represent the names of each output this
factor should compute and return. If this argument is not passed to the
CustomFactor constructor, we look for a class-level attribute named
`outputs`.
window_length : int, optional
Number of rows to pass for each input. If this argument is not passed
to the CustomFactor constructor, we look for a class-level attribute
Expand Down Expand Up @@ -1164,7 +1169,9 @@ def compute(self, today, assets, out, *inputs):
Column labels for `out` and`inputs`.
out : np.array[self.dtype, ndim=1]
Output array of the same shape as `assets`. `compute` should write
its desired return values into `out`.
its desired return values into `out`. If multiple outputs are
specified, `compute` should write its desired return values into
`out.<output_name>` for each output name in `self.outputs`.
*inputs : tuple of np.array
Raw data arrays corresponding to the values of `self.inputs`.
Expand Down Expand Up @@ -1229,9 +1236,73 @@ def compute(self, today, assets, out, data):
# MedianValue.
median_close10 = MedianValue([USEquityPricing.close], window_length=10)
median_low15 = MedianValue([USEquityPricing.low], window_length=15)
A CustomFactor with multiple outputs:
.. code-block:: python
class MultipleOutputs(CustomFactor):
inputs = [USEquityPricing.close]
outputs = ['alpha', 'beta']
window_length = N
def compute(self, today, assets, out, close):
computed_alpha, computed_beta = some_function(close)
out.alpha = computed_alpha
out.beta = computed_beta
# Each output is returned as its own Factor upon instantiation.
alpha, beta = MultipleOutputs()
'''
dtype = float64_dtype

def __getattr__(self, name):
if name in self.outputs:
return RecarrayFactor(factor=self, attribute=name)
else:
raise AttributeError(
"This factor has no output called '{}'.".format(name)
)

def __iter__(self):
if len(self.outputs) < 2:
raise ValueError('This factor does not have multiple outputs.')
RecarrayFactor_ = partial(RecarrayFactor, self)
return iter(map(RecarrayFactor_, self.outputs))


class RecarrayFactor(SingleInputMixin, Factor):

def __new__(cls, factor, attribute):
return super(RecarrayFactor, cls).__new__(
cls,
attribute=attribute,
inputs=[factor],
outputs=factor.outputs,
window_length=0,
dtype=factor.dtype,
missing_value=factor.missing_value,
)

def _init(self, attribute, *args, **kwargs):
self.attribute = attribute
return super(RecarrayFactor, self)._init(*args, **kwargs)

@classmethod
def static_identity(cls, attribute, *args, **kwargs):
return (
super(RecarrayFactor, cls).static_identity(*args, **kwargs),
attribute,
)

def _validate(self):
super(RecarrayFactor, self)._validate()
if not self.outputs:
raise TermOutputsNotSpecified(termname=type(self).__name__)

def _compute(self, windows, dates, assets, mask):
return windows[0][self.attribute]


class Latest(LatestMixin, CustomFactor):
"""
Expand Down
13 changes: 11 additions & 2 deletions zipline/pipeline/mixins.py
@@ -1,7 +1,7 @@
"""
Mixins classes for use with Filters and Factors.
"""
from numpy import full_like
from numpy import array, full_like, recarray

from zipline.utils.control_flow import nullctx
from zipline.errors import WindowLengthNotPositive, UnsupportedDataType
Expand Down Expand Up @@ -69,6 +69,7 @@ class CustomTermMixin(object):

def __new__(cls,
inputs=NotSpecified,
outputs=NotSpecified,
window_length=NotSpecified,
mask=NotSpecified,
dtype=NotSpecified,
Expand All @@ -88,6 +89,7 @@ def __new__(cls,
return super(CustomTermMixin, cls).__new__(
cls,
inputs=inputs,
outputs=outputs,
window_length=window_length,
mask=mask,
dtype=dtype,
Expand All @@ -109,7 +111,14 @@ def _compute(self, windows, dates, assets, mask):
compute = self.compute
missing_value = self.missing_value
params = self.params
out = full_like(mask, missing_value, dtype=self.dtype)
outputs = self.outputs
if outputs:
out = recarray(
mask.shape, dtype=zip(outputs, [self.dtype] * len(outputs)),
)
out[:] = array([missing_value])
else:
out = full_like(mask, missing_value, dtype=self.dtype)
with self.ctx:
# TODO: Consider pre-filtering columns that are all-nan at each
# time-step?
Expand Down
22 changes: 20 additions & 2 deletions zipline/pipeline/term.py
Expand Up @@ -349,11 +349,13 @@ class ComputableTerm(Term):
:class:`zipline.pipeline.Filter`, and :class:`zipline.pipeline.Factor`.
"""
inputs = NotSpecified
outputs = NotSpecified
window_length = NotSpecified
mask = NotSpecified

def __new__(cls,
inputs=inputs,
outputs=outputs,
window_length=window_length,
mask=mask,
*args, **kwargs):
Expand All @@ -368,6 +370,13 @@ def __new__(cls,
# normalize to a tuple so that inputs is hashable.
inputs = tuple(inputs)

if outputs is NotSpecified:
outputs = cls.outputs
if outputs is NotSpecified:
outputs = ()
else:
outputs = tuple(outputs)

if mask is NotSpecified:
mask = cls.mask
if mask is NotSpecified:
Expand All @@ -379,22 +388,31 @@ def __new__(cls,
return super(ComputableTerm, cls).__new__(
cls,
inputs=inputs,
outputs=outputs,
mask=mask,
window_length=window_length,
*args, **kwargs
)

def _init(self, inputs, window_length, mask, *args, **kwargs):
def _init(self, inputs, outputs, window_length, mask, *args, **kwargs):
self.inputs = inputs
self.outputs = outputs
self.window_length = window_length
self.mask = mask
return super(ComputableTerm, self)._init(*args, **kwargs)

@classmethod
def static_identity(cls, inputs, window_length, mask, *args, **kwargs):
def static_identity(cls,
inputs,
outputs,
window_length,
mask,
*args,
**kwargs):
return (
super(ComputableTerm, cls).static_identity(*args, **kwargs),
inputs,
outputs,
window_length,
mask,
)
Expand Down

0 comments on commit 7a45fa0

Please sign in to comment.