Skip to content

Commit

Permalink
Merge d21ece4 into 3f2a574
Browse files Browse the repository at this point in the history
  • Loading branch information
dmichalowicz committed Apr 12, 2016
2 parents 3f2a574 + d21ece4 commit 62cfc78
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 12 deletions.
8 changes: 7 additions & 1 deletion docs/source/whatsnew/0.9.1.txt
Expand Up @@ -17,7 +17,13 @@ None
Enhancements
~~~~~~~~~~~~

None
* Implemented :class:`zipline.pipeline.factors.RecarrayFactor`, a new pipeline
term designed to compute multiple factors at once while also keeping track of
which factors it has already computed.

* Added optional `outputs` parameter to :class:`zipline.pipeline.CustomFactor`.
Custom factors are now capable of computing and returning multiple outputs,
each of which are themselves a Factor.

Experimental Features
~~~~~~~~~~~~~~~~~~~~~
Expand Down
46 changes: 46 additions & 0 deletions tests/pipeline/test_engine.py
Expand Up @@ -112,6 +112,16 @@ def compute(self, today, assets, out, open):
out[:] = open


class MultipleOutputs(CustomFactor):
window_length = 1
inputs = [USEquityPricing.open, USEquityPricing.close]
outputs = ['double_open', 'double_close']

def compute(self, today, assets, out, open, close):
out.double_open[:] = open * 2
out.double_close[:] = close * 2


def assert_multi_index_is_product(testcase, index, *levels):
"""Assert that a MultiIndex contains the product of `*levels`."""
testcase.assertIsInstance(
Expand Down Expand Up @@ -510,6 +520,42 @@ def test_rolling_and_nonrolling(self):
),
)

def test_factor_with_multiple_outputs(self):
dates = self.dates[5:10]
assets = self.assets
num_dates = len(dates)
open = USEquityPricing.open
close = USEquityPricing.close
open_values = array([self.constants[open]] * num_dates, dtype=float)
close_values = array([self.constants[close]] * num_dates, dtype=float)
engine = SimplePipelineEngine(
lambda column: self.loader, self.dates, self.asset_finder,
)

double_open, double_close = MultipleOutputs()
pipeline = Pipeline(
columns={
'double_open': double_open,
'double_close': double_close,
},
)

results = engine.run_pipeline(pipeline, dates[0], dates[-1])
first_output_results = results['double_open'].unstack()
second_output_results = results['double_close'].unstack()

first_output_expected = {asset: open_values * 2 for asset in assets}
second_output_expected = {asset: close_values * 2 for asset in assets}

assert_frame_equal(
first_output_results,
DataFrame(first_output_expected, index=dates, columns=assets),
)
assert_frame_equal(
second_output_results,
DataFrame(second_output_expected, index=dates, columns=assets),
)

def test_loader_given_multiple_columns(self):

class Loader1DataSet1(DataSet):
Expand Down
15 changes: 13 additions & 2 deletions zipline/errors.py
Expand Up @@ -433,10 +433,21 @@ class TermInputsNotSpecified(ZiplineError):
msg = "{termname} requires inputs, but no inputs list was passed."


class TermOutputsNotSpecified(ZiplineError):
"""
Raised if a user attempts to construct a CustomFactor without specifying
enough outputs and that factor does not have class-level default outputs.
"""
msg = (
"{termname} requires at least two outputs, but was given "
"{num_outputs}."
)


class WindowLengthNotSpecified(ZiplineError):
"""
Raised if a user attempts to construct a term without specifying inputs and
that term does not have class-level default inputs.
Raised if a user attempts to construct a term without specifying window
length and that term does not have a class-level default window length.
"""
msg = (
"{termname} requires a window_length, but no window_length was passed."
Expand Down
4 changes: 3 additions & 1 deletion zipline/pipeline/factors/__init__.py
@@ -1,7 +1,8 @@
from .factor import (
CustomFactor,
Factor,
Latest
Latest,
RecarrayFactor,
)
from .events import (
BusinessDaysSinceCashBuybackAuth,
Expand Down Expand Up @@ -44,6 +45,7 @@
'Latest',
'MaxDrawdown',
'RSI',
'RecarrayFactor',
'Returns',
'SimpleMovingAverage',
'VWAP',
Expand Down
94 changes: 90 additions & 4 deletions zipline/pipeline/factors/factor.py
@@ -1,14 +1,14 @@
"""
factor.py
"""
from functools import wraps
from functools import partial, wraps
from operator import attrgetter
from numbers import Number

from numpy import inf, where
from toolz import curry

from zipline.errors import UnknownRankMethod
from zipline.errors import TermOutputsNotSpecified, UnknownRankMethod
from zipline.lib.normalize import naive_grouped_rowwise_apply
from zipline.lib.rank import masked_rankdata_2d
from zipline.pipeline.classifiers import Classifier, Everything, Quantiles
Expand Down Expand Up @@ -1130,13 +1130,27 @@ class CustomFactor(PositiveWindowLengthMixin, CustomTermMixin, Factor):
inputs : iterable, optional
An iterable of `BoundColumn` instances (e.g. USEquityPricing.close),
describing the data to load and pass to `self.compute`. If this
argument is passed to the CustomFactor constructor, we look for a
argument is not passed to the CustomFactor constructor, we look for a
class-level attribute named `inputs`.
outputs : iterable, optional
An iterable of strings which represent the names of each output this
factor should compute and return. If this argument is not passed to the
CustomFactor constructor, we look for a class-level attribute named
`outputs`.
window_length : int, optional
Number of rows to pass for each input. If this argument is not passed
to the CustomFactor constructor, we look for a class-level attribute
named `window_length`.
Returns
-------
output or outputs : zipline.pipeline.factors.CustomFactor or iterable of
zipline.pipeline.factors.RecarrayFactor
The Factor instance(s) to be returned upon instantiation.
If passed an `outputs` argument, CustomFactor will return an iterable
of RecarrayFactors representing each output. If `outputs` is not
specified, it returns an instance of CustomFactor.
Notes
-----
Users implementing their own Factors should subclass CustomFactor and
Expand All @@ -1159,7 +1173,9 @@ def compute(self, today, assets, out, *inputs):
Column labels for `out` and`inputs`.
out : np.array[self.dtype, ndim=1]
Output array of the same shape as `assets`. `compute` should write
its desired return values into `out`.
its desired return values into `out`. If multiple outputs are
specified, `compute` should write its desired return values into
`out.<output_name>` for each output name in `self.outputs`.
*inputs : tuple of np.array
Raw data arrays corresponding to the values of `self.inputs`.
Expand Down Expand Up @@ -1224,9 +1240,79 @@ def compute(self, today, assets, out, data):
# MedianValue.
median_close10 = MedianValue([USEquityPricing.close], window_length=10)
median_low15 = MedianValue([USEquityPricing.low], window_length=15)
A CustomFactor with multiple outputs:
.. code-block:: python
class MultipleOutputs(CustomFactor):
inputs = [USEquityPricing.close]
outputs = ['alpha', 'beta']
window_length = N
def compute(self, today, assets, out, close):
computed_alpha, computed_beta = some_function(close)
out.alpha = computed_alpha
out.beta = computed_beta
# Each output is returned as its own Factor upon instantiation.
alpha, beta = MultipleOutputs()
'''
dtype = float64_dtype

def __getattr__(self, name):
if name in self.outputs:
return RecarrayFactor(factor=self, attribute=name)
else:
raise AttributeError(
"This factor has no output called '{}'.".format(name)
)

def __iter__(self):
if len(self.outputs) < 2:
raise ValueError('This factor does not have multiple outputs.')
RecarrayFactor_ = partial(RecarrayFactor, self)
return iter(map(RecarrayFactor_, self.outputs))


class RecarrayFactor(SingleInputMixin, Factor):

def __new__(cls, factor, attribute):
return super(RecarrayFactor, cls).__new__(
cls,
factor=factor,
attribute=attribute,
inputs=[factor],
outputs=factor.outputs,
window_length=0,
dtype=factor.dtype,
missing_value=factor.missing_value,
)

def _init(self, factor, attribute, *args, **kwargs):
self.parent = factor
self.attribute = attribute
return super(RecarrayFactor, self)._init(*args, **kwargs)

@classmethod
def static_identity(cls, factor, attribute, *args, **kwargs):
return (
super(RecarrayFactor, cls).static_identity(*args, **kwargs),
factor,
attribute,
)

def _validate(self):
super(RecarrayFactor, self)._validate()
num_outputs = len(self.outputs)
if num_outputs < 2:
raise TermOutputsNotSpecified(
termname=type(self).__name__, num_outputs=num_outputs,
)

def _compute(self, windows, dates, assets, mask):
return windows[0][self.attribute]


class Latest(LatestMixin, CustomFactor):
"""
Expand Down
13 changes: 11 additions & 2 deletions zipline/pipeline/mixins.py
@@ -1,7 +1,7 @@
"""
Mixins classes for use with Filters and Factors.
"""
from numpy import full_like
from numpy import array, full_like, recarray

from zipline.utils.control_flow import nullctx
from zipline.errors import WindowLengthNotPositive, UnsupportedDataType
Expand Down Expand Up @@ -69,6 +69,7 @@ class CustomTermMixin(object):

def __new__(cls,
inputs=NotSpecified,
outputs=NotSpecified,
window_length=NotSpecified,
mask=NotSpecified,
dtype=NotSpecified,
Expand All @@ -88,6 +89,7 @@ def __new__(cls,
return super(CustomTermMixin, cls).__new__(
cls,
inputs=inputs,
outputs=outputs,
window_length=window_length,
mask=mask,
dtype=dtype,
Expand All @@ -109,7 +111,14 @@ def _compute(self, windows, dates, assets, mask):
compute = self.compute
missing_value = self.missing_value
params = self.params
out = full_like(mask, missing_value, dtype=self.dtype)
outputs = self.outputs
if outputs is not NotSpecified and len(outputs) > 1:
out = recarray(
mask.shape, dtype=zip(outputs, [self.dtype] * len(outputs)),
)
out[:] = array([missing_value])
else:
out = full_like(mask, missing_value, dtype=self.dtype)
with self.ctx:
# TODO: Consider pre-filtering columns that are all-nan at each
# time-step?
Expand Down
22 changes: 20 additions & 2 deletions zipline/pipeline/term.py
Expand Up @@ -349,11 +349,13 @@ class ComputableTerm(Term):
:class:`zipline.pipeline.Filter`, and :class:`zipline.pipeline.Factor`.
"""
inputs = NotSpecified
outputs = NotSpecified
window_length = NotSpecified
mask = NotSpecified

def __new__(cls,
inputs=inputs,
outputs=outputs,
window_length=window_length,
mask=mask,
*args, **kwargs):
Expand All @@ -368,6 +370,13 @@ def __new__(cls,
# normalize to a tuple so that inputs is hashable.
inputs = tuple(inputs)

if outputs is NotSpecified:
outputs = cls.outputs
if outputs is NotSpecified:
outputs = tuple()
else:
outputs = tuple(outputs)

if mask is NotSpecified:
mask = cls.mask
if mask is NotSpecified:
Expand All @@ -379,22 +388,31 @@ def __new__(cls,
return super(ComputableTerm, cls).__new__(
cls,
inputs=inputs,
outputs=outputs,
mask=mask,
window_length=window_length,
*args, **kwargs
)

def _init(self, inputs, window_length, mask, *args, **kwargs):
def _init(self, inputs, outputs, window_length, mask, *args, **kwargs):
self.inputs = inputs
self.outputs = outputs
self.window_length = window_length
self.mask = mask
return super(ComputableTerm, self)._init(*args, **kwargs)

@classmethod
def static_identity(cls, inputs, window_length, mask, *args, **kwargs):
def static_identity(cls,
inputs,
outputs,
window_length,
mask,
*args,
**kwargs):
return (
super(ComputableTerm, cls).static_identity(*args, **kwargs),
inputs,
outputs,
window_length,
mask,
)
Expand Down

0 comments on commit 62cfc78

Please sign in to comment.