Skip to content

Commit

Permalink
Merge ba1a47f into 2c819a6
Browse files Browse the repository at this point in the history
  • Loading branch information
llllllllll committed Oct 28, 2016
2 parents 2c819a6 + ba1a47f commit 3033a52
Show file tree
Hide file tree
Showing 18 changed files with 813 additions and 49 deletions.
65 changes: 65 additions & 0 deletions tests/pipeline/test_alias.py
@@ -0,0 +1,65 @@
from nose.tools import nottest
import numpy as np

from zipline.testing.predicates import assert_equal
from zipline.pipeline import Classifier, Factor, Filter
from zipline.utils.numpy_utils import float64_dtype, int64_dtype

from .base import BasePipelineTestCase


@nottest
class BaseAliasTestCase(BasePipelineTestCase):

def test_alias(self):
f = self.Term()
alias = f.alias('ayy lmao')

f_values = np.random.RandomState(5).randn(5, 5)

self.check_terms(
terms={
'f_alias': alias,
},
expected={
'f_alias': f_values,
},
initial_workspace={f: f_values},
mask=self.build_mask(np.ones((5, 5))),
)

def test_repr(self):
assert_equal(
repr(self.Term().alias('ayy lmao')),
"Aliased%s(Term(...), name='ayy lmao')" % (
self.Term.__base__.__name__,
),
)

def test_short_repr(self):
for name in ('a', 'b'):
assert_equal(
self.Term().alias(name).short_repr(),
name,
)


class TestFactorAlias(BaseAliasTestCase):
class Term(Factor):
dtype = float64_dtype
inputs = ()
window_length = 0


class TestFilterAlias(BaseAliasTestCase):
class Term(Filter):
inputs = ()
window_length = 0


class TestClassifierAlias(BaseAliasTestCase):
class Term(Classifier):
dtype = int64_dtype
inputs = ()
window_length = 0
missing_value = -1
81 changes: 81 additions & 0 deletions tests/pipeline/test_classifier.py
Expand Up @@ -2,10 +2,13 @@
from operator import or_

import numpy as np
import pandas as pd

from zipline.lib.labelarray import LabelArray
from zipline.pipeline import Classifier
from zipline.testing import parameter_space
from zipline.testing.fixtures import ZiplineTestCase
from zipline.testing.predicates import assert_equal
from zipline.utils.numpy_utils import (
categorical_dtype,
int64_dtype,
Expand Down Expand Up @@ -464,3 +467,81 @@ class C(Classifier):
"TypeError(\"unhashable type: 'dict'\",)."
)
self.assertEqual(errmsg, expected)


class TestPostProcessAndToWorkSpaceValue(ZiplineTestCase):
def test_reversability_categorical(self):
class F(Classifier):
inputs = ()
window_length = 0
dtype = categorical_dtype
missing_value = '<missing>'

f = F()
column_data = LabelArray(
np.array(
[['a', f.missing_value],
['b', f.missing_value],
['c', 'd']],
),
missing_value=f.missing_value,
)

assert_equal(
f.postprocess(column_data.ravel()),
pd.Categorical(
['a', f.missing_value, 'b', f.missing_value, 'c', 'd'],
),
)

# only include the non-missing data
pipeline_output = pd.Series(
data=['a', 'b', 'c', 'd'],
index=pd.MultiIndex.from_arrays([
[pd.Timestamp('2014-01-01'),
pd.Timestamp('2014-01-02'),
pd.Timestamp('2014-01-03'),
pd.Timestamp('2014-01-03')],
[0, 0, 0, 1],
]),
dtype='category',
)

assert_equal(
f.to_workspace_value(pipeline_output, pd.Index([0, 1])),
column_data,
)

def test_reversability_int64(self):
class F(Classifier):
inputs = ()
window_length = 0
dtype = int64_dtype
missing_value = -1

f = F()
column_data = np.array(
[[0, f.missing_value],
[1, f.missing_value],
[2, 3]],
)

assert_equal(f.postprocess(column_data.ravel()), column_data.ravel())

# only include the non-missing data
pipeline_output = pd.Series(
data=[0, 1, 2, 3],
index=pd.MultiIndex.from_arrays([
[pd.Timestamp('2014-01-01'),
pd.Timestamp('2014-01-02'),
pd.Timestamp('2014-01-03'),
pd.Timestamp('2014-01-03')],
[0, 0, 0, 1],
]),
dtype=int64_dtype,
)

assert_equal(
f.to_workspace_value(pipeline_output, pd.Index([0, 1])),
column_data,
)
186 changes: 184 additions & 2 deletions tests/pipeline/test_engine.py
Expand Up @@ -14,6 +14,7 @@
float32,
float64,
full,
full_like,
log,
nan,
tile,
Expand Down Expand Up @@ -66,10 +67,12 @@
from zipline.testing import (
AssetID,
AssetIDPlusDay,
ExplodingObject,
check_arrays,
make_alternating_boolean_array,
make_cascading_boolean_array,
OpenPrice,
parameter_space,
product_upper_triangle,
)
from zipline.testing.fixtures import (
Expand All @@ -78,6 +81,7 @@
WithTradingEnvironment,
ZiplineTestCase,
)
from zipline.testing.predicates import assert_equal
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import bool_dtype, datetime64ns_dtype

Expand Down Expand Up @@ -163,14 +167,14 @@ def compute(self, today, assets, out, *inputs):
out[:] = sum(inputs).sum(axis=0)


class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase):
class WithConstantInputs(WithTradingEnvironment):
asset_ids = ASSET_FINDER_EQUITY_SIDS = 1, 2, 3, 4
START_DATE = Timestamp('2014-01-01', tz='utc')
END_DATE = Timestamp('2014-03-01', tz='utc')

@classmethod
def init_class_fixtures(cls):
super(ConstantInputTestCase, cls).init_class_fixtures()
super(WithConstantInputs, cls).init_class_fixtures()
cls.constants = {
# Every day, assume every stock starts at 2, goes down to 1,
# goes up to 4, and finishes at 3.
Expand All @@ -192,6 +196,8 @@ def init_class_fixtures(cls):
)
cls.assets = cls.asset_finder.retrieve_all(cls.asset_ids)


class ConstantInputTestCase(WithConstantInputs, ZiplineTestCase):
def test_bad_dates(self):
loader = self.loader
engine = SimplePipelineEngine(
Expand Down Expand Up @@ -1315,3 +1321,179 @@ def test_string_classifiers_produce_categoricals(self):
columns=self.asset_finder.retrieve_all(self.asset_finder.sids),
)
assert_frame_equal(result.c.unstack(), expected_final_result)


class WindowSafetyPropagationTestCase(WithSeededRandomPipelineEngine,
ZiplineTestCase):

SEEDED_RANDOM_PIPELINE_SEED = 5

def test_window_safety_propagation(self):
dates = self.trading_days[-30:]
start_date, end_date = dates[[-10, -1]]

col = TestingDataSet.float_col
pipe = Pipeline(
columns={
'average_of_rank_plus_one': SimpleMovingAverage(
inputs=[col.latest.rank() + 1],
window_length=10,
),
'average_of_aliased_rank_plus_one': SimpleMovingAverage(
inputs=[col.latest.rank().alias('some_alias') + 1],
window_length=10,
),
'average_of_rank_plus_one_aliased': SimpleMovingAverage(
inputs=[(col.latest.rank() + 1).alias('some_alias')],
window_length=10,
),
}
)
results = self.run_pipeline(pipe, start_date, end_date).unstack()

expected_ranks = DataFrame(
self.raw_expected_values(
col,
dates[-19],
dates[-1],
),
index=dates[-19:],
columns=self.asset_finder.retrieve_all(
self.ASSET_FINDER_EQUITY_SIDS,
)
).rank(axis='columns')

# All three expressions should be equivalent and evaluate to this.
expected_result = (
(expected_ranks + 1)
.rolling(10)
.mean()
.dropna(how='any')
)

for colname in results.columns.levels[0]:
assert_equal(expected_result, results[colname])


class PopulateInitialWorkspaceTestCase(WithConstantInputs, ZiplineTestCase):

@parameter_space(window_length=[3, 5], pipeline_length=[5, 10])
def test_populate_initial_workspace(self, window_length, pipeline_length):
column = USEquityPricing.low
base_term = column.latest

# Take a Z-Score here so that the precomputed term is window-safe. The
# z-score will never actually get computed because we swap it out.
precomputed_term = (base_term.zscore()).alias('precomputed_term')

# A term that has `precomputed_term` as an input.
depends_on_precomputed_term = precomputed_term + 1
# A term that requires a window of `precomputed_term`.
depends_on_window_of_precomputed_term = SimpleMovingAverage(
inputs=[precomputed_term],
window_length=window_length,
)

precomputed_term_with_window = SimpleMovingAverage(
inputs=(column,),
window_length=window_length,
).alias('precomputed_term_with_window')
depends_on_precomputed_term_with_window = (
precomputed_term_with_window + 1
)

column_value = self.constants[column]
precomputed_term_value = -column_value
precomputed_term_with_window_value = -(column_value + 1)

def populate_initial_workspace(initial_workspace,
root_mask_term,
execution_plan,
dates,
assets):
def shape_for_term(term):
ndates = len(execution_plan.mask_and_dates_for_term(
term,
root_mask_term,
initial_workspace,
dates,
)[1])
nassets = len(assets)
return (ndates, nassets)

ws = initial_workspace.copy()
ws[precomputed_term] = full(
shape_for_term(precomputed_term),
precomputed_term_value,
dtype=float64,
)
ws[precomputed_term_with_window] = full(
shape_for_term(precomputed_term_with_window),
precomputed_term_with_window_value,
dtype=float64,
)
return ws

def dispatcher(c):
if c is column:
# the base_term should never be loaded, its initial refcount
# should be zero
return ExplodingObject()
return self.loader

engine = SimplePipelineEngine(
dispatcher,
self.dates,
self.asset_finder,
populate_initial_workspace=populate_initial_workspace,
)

results = engine.run_pipeline(
Pipeline({
'precomputed_term': precomputed_term,
'precomputed_term_with_window': precomputed_term_with_window,
'depends_on_precomputed_term': depends_on_precomputed_term,
'depends_on_precomputed_term_with_window':
depends_on_precomputed_term_with_window,
'depends_on_window_of_precomputed_term':
depends_on_window_of_precomputed_term,
}),
self.dates[-pipeline_length],
self.dates[-1],
)

assert_equal(
results['precomputed_term'].values,
full_like(
results['precomputed_term'],
precomputed_term_value,
),
),
assert_equal(
results['precomputed_term_with_window'].values,
full_like(
results['precomputed_term_with_window'],
precomputed_term_with_window_value,
),
),
assert_equal(
results['depends_on_precomputed_term'].values,
full_like(
results['depends_on_precomputed_term'],
precomputed_term_value + 1,
),
)
assert_equal(
results['depends_on_precomputed_term_with_window'].values,
full_like(
results['depends_on_precomputed_term_with_window'],
precomputed_term_with_window_value + 1,
),
)
assert_equal(
results['depends_on_window_of_precomputed_term'].values,
full_like(
results['depends_on_window_of_precomputed_term'],
precomputed_term_value,
),
)

0 comments on commit 3033a52

Please sign in to comment.