
Commit 2d65967

Merge def9ec3 into db4e060
dmichalowicz committed Jul 13, 2016
2 parents db4e060 + def9ec3 commit 2d65967
Showing 5 changed files with 893 additions and 379 deletions.
295 changes: 2 additions & 293 deletions tests/pipeline/test_engine.py
@@ -14,7 +14,6 @@
float32,
float64,
full,
full_like,
log,
nan,
tile,
@@ -37,13 +36,10 @@
)
from pandas.compat.chainmap import ChainMap
from pandas.util.testing import assert_frame_equal
from scipy.stats.stats import linregress, pearsonr, spearmanr
from six import iteritems, itervalues
from toolz import merge

from zipline.assets import Equity
from zipline.assets.synthetic import make_rotating_equity_info
from zipline.errors import NonExistentAssetInTimeFrame
from zipline.lib.adjustment import MULTIPLY
from zipline.lib.labelarray import LabelArray
from zipline.pipeline import CustomFactor, Pipeline
@@ -57,10 +53,6 @@
ExponentialWeightedMovingAverage,
ExponentialWeightedMovingStdDev,
MaxDrawdown,
Returns,
RollingLinearRegressionOfReturns,
RollingPearsonOfReturns,
RollingSpearmanOfReturns,
SimpleMovingAverage,
)
from zipline.pipeline.loaders.equity_pricing_loader import (
@@ -80,7 +72,6 @@
make_alternating_boolean_array,
make_cascading_boolean_array,
OpenPrice,
parameter_space,
product_upper_triangle,
)
from zipline.testing.fixtures import (
@@ -90,6 +81,7 @@
ZiplineTestCase,
)
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import bool_dtype


class RollingSumDifference(CustomFactor):
@@ -584,7 +576,7 @@ def create_expected_results(expected_value, mask):
)

expected_no_mask_result = full(
shape=(num_dates, num_assets), fill_value=True, dtype=bool,
shape=(num_dates, num_assets), fill_value=True, dtype=bool_dtype,
)

masks = cascading_mask, alternating_mask, NotSpecified
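
(Aside, not part of this commit's diff: the change in this hunk swaps the builtin bool for zipline's bool_dtype constant when building the all-True expected mask. A minimal sketch of why the two spellings build the same array, assuming bool_dtype is numpy's dtype('bool') as exposed by zipline.utils.numpy_utils:)

import numpy as np

# Illustrative sketch only, not part of this commit. Assumes bool_dtype is
# numpy's dtype('bool'), the constant exposed by zipline.utils.numpy_utils.
bool_dtype = np.dtype('bool')

a = np.full(shape=(3, 4), fill_value=True, dtype=bool)        # old spelling
b = np.full(shape=(3, 4), fill_value=True, dtype=bool_dtype)  # new spelling
assert a.dtype == b.dtype and (a == b).all()
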
@@ -1233,289 +1225,6 @@ def test_dollar_volume(self):
window=5)[5:]
assert_frame_equal(results['dv5_nan'].unstack(), expected_5_nan)

@parameter_space(returns_length=[2, 3], correlation_length=[3, 4])
def test_correlation_factors(self, returns_length, correlation_length):
"""
Tests for the built-in factors `RollingPearsonOfReturns` and
`RollingSpearmanOfReturns`.
"""
my_asset_column = 0
start_date_index = 14
end_date_index = 18

sids = self.sids
dates = self.dates
assets = self.asset_finder.retrieve_all(sids)
my_asset = assets[my_asset_column]
num_days = end_date_index - start_date_index + 1
num_assets = len(assets)

cascading_mask = \
AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day)
expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_days, num_assets),
)

alternating_mask = (AssetIDPlusDay() % 2).eq(0)
expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_days, num_assets),
)

expected_no_mask_result = full(
shape=(num_days, num_assets), fill_value=True, dtype=bool,
)

masks = cascading_mask, alternating_mask, NotSpecified
expected_mask_results = (
expected_cascading_mask_result,
expected_alternating_mask_result,
expected_no_mask_result,
)

for mask, expected_mask in zip(masks, expected_mask_results):
pearson_factor = RollingPearsonOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
mask=mask,
)
spearman_factor = RollingSpearmanOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
mask=mask,
)

pipeline = Pipeline(
columns={
'pearson_factor': pearson_factor,
'spearman_factor': spearman_factor,
},
)
if mask is not NotSpecified:
pipeline.add(mask, 'mask')

results = self.engine.run_pipeline(
pipeline, dates[start_date_index], dates[end_date_index],
)
pearson_results = results['pearson_factor'].unstack()
spearman_results = results['spearman_factor'].unstack()
if mask is not NotSpecified:
mask_results = results['mask'].unstack()
check_arrays(mask_results.values, expected_mask)

# Run a separate pipeline that calculates returns starting
# (correlation_length - 1) days prior to our start date. This is
# because we need (correlation_length - 1) extra days of returns to
# compute our expected correlations.
returns = Returns(window_length=returns_length)
results = self.engine.run_pipeline(
Pipeline(columns={'returns': returns}),
dates[start_date_index - (correlation_length - 1)],
dates[end_date_index],
)
returns_results = results['returns'].unstack()

# On each day, calculate the expected correlation coefficients
# between the asset we are interested in and each other asset. Each
# correlation is calculated over `correlation_length` days.
expected_pearson_results = full_like(pearson_results, nan)
expected_spearman_results = full_like(spearman_results, nan)
for day in range(num_days):
todays_returns = returns_results.iloc[
day:day + correlation_length
]
my_asset_returns = todays_returns.iloc[:, my_asset_column]
for asset, other_asset_returns in todays_returns.iteritems():
asset_column = int(asset) - 1
expected_pearson_results[day, asset_column] = pearsonr(
my_asset_returns, other_asset_returns,
)[0]
expected_spearman_results[day, asset_column] = spearmanr(
my_asset_returns, other_asset_returns,
)[0]

expected_pearson_results = DataFrame(
data=where(expected_mask, expected_pearson_results, nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(pearson_results, expected_pearson_results)

expected_spearman_results = DataFrame(
data=where(expected_mask, expected_spearman_results, nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(spearman_results, expected_spearman_results)
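
(Aside, not part of this commit's diff: the expectation loop in the test above recomputes what the rolling factors should produce by slicing a correlation_length-day window of returns each day and calling scipy directly. A standalone sketch of that idea, using made-up returns and plain column indices instead of the pipeline fixtures, assuming only numpy and scipy:)

import numpy as np
from scipy.stats import pearsonr, spearmanr

# Illustrative sketch only, not part of this commit. For each day, correlate a
# target column's returns with every column over a fixed-length trailing window.
rng = np.random.RandomState(0)
returns = rng.randn(10, 4) * 0.01      # 10 days of made-up returns, 4 assets
correlation_length = 3
target_column = 0

num_days = returns.shape[0] - correlation_length + 1
expected_pearson = np.full((num_days, returns.shape[1]), np.nan)
expected_spearman = np.full((num_days, returns.shape[1]), np.nan)

for day in range(num_days):
    window = returns[day:day + correlation_length]
    target = window[:, target_column]
    for col in range(returns.shape[1]):
        expected_pearson[day, col] = pearsonr(target, window[:, col])[0]
        expected_spearman[day, col] = spearmanr(target, window[:, col])[0]
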

@parameter_space(returns_length=[2, 3], regression_length=[3, 4])
def test_regression_of_returns_factor(self,
returns_length,
regression_length):
"""
Tests for the built-in factor `RollingLinearRegressionOfReturns`.
"""
my_asset_column = 0
start_date_index = 14
end_date_index = 18

sids = self.sids
dates = self.dates
assets = self.asset_finder.retrieve_all(sids)
my_asset = assets[my_asset_column]
num_days = end_date_index - start_date_index + 1
num_assets = len(assets)

cascading_mask = \
AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day)
expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_days, num_assets),
)

alternating_mask = (AssetIDPlusDay() % 2).eq(0)
expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_days, num_assets),
)

expected_no_mask_result = full(
shape=(num_days, num_assets), fill_value=True, dtype=bool,
)

masks = cascading_mask, alternating_mask, NotSpecified
expected_mask_results = (
expected_cascading_mask_result,
expected_alternating_mask_result,
expected_no_mask_result,
)

# The order of these is meant to align with the output of `linregress`.
outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr']

for mask, expected_mask in zip(masks, expected_mask_results):
regression_factor = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=returns_length,
regression_length=regression_length,
mask=mask,
)

pipeline = Pipeline(
columns={
output: getattr(regression_factor, output)
for output in outputs
},
)
if mask is not NotSpecified:
pipeline.add(mask, 'mask')

results = self.engine.run_pipeline(
pipeline, dates[start_date_index], dates[end_date_index],
)
if mask is not NotSpecified:
mask_results = results['mask'].unstack()
check_arrays(mask_results.values, expected_mask)

output_results = {}
expected_output_results = {}
for output in outputs:
output_results[output] = results[output].unstack()
expected_output_results[output] = full_like(
output_results[output], nan,
)

# Run a separate pipeline that calculates returns starting
# (regression_length - 1) days prior to our start date. This is
# because we need (regression_length - 1) extra days of returns to
# compute our expected regressions.
returns = Returns(window_length=returns_length)
results = self.engine.run_pipeline(
Pipeline(columns={'returns': returns}),
dates[start_date_index - (regression_length - 1)],
dates[end_date_index],
)
returns_results = results['returns'].unstack()

# On each day, calculate the expected regression results for Y ~ X
# where Y is the asset we are interested in and X is each other
# asset. Each regression is calculated over `regression_length`
# days of data.
for day in range(num_days):
todays_returns = returns_results.iloc[
day:day + regression_length
]
my_asset_returns = todays_returns.iloc[:, my_asset_column]
for asset, other_asset_returns in todays_returns.iteritems():
asset_column = int(asset) - 1
expected_regression_results = linregress(
y=other_asset_returns, x=my_asset_returns,
)
for i, output in enumerate(outputs):
expected_output_results[output][day, asset_column] = \
expected_regression_results[i]

for output in outputs:
output_result = output_results[output]
expected_output_result = DataFrame(
where(expected_mask, expected_output_results[output], nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(output_result, expected_output_result)
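
(Aside, not part of this commit's diff: the `outputs` list in the test above is ordered to match scipy's linregress return value, which unpacks as (slope, intercept, rvalue, pvalue, stderr) and is relabeled as (beta, alpha, r_value, p_value, stderr). A minimal sketch with made-up return series showing the regression of one series on another:)

import numpy as np
from scipy.stats import linregress

# Illustrative sketch only, not part of this commit. The 5-tuple returned by
# linregress lines up positionally with the `outputs` list used in the test.
x = np.array([0.01, -0.02, 0.005, 0.015, -0.01])     # made-up independent returns
y = np.array([0.012, -0.018, 0.004, 0.02, -0.008])   # made-up dependent returns

result = linregress(x=x, y=y)
beta, alpha, r_value, p_value, stderr = result        # tuple order matches `outputs`
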

def test_correlation_and_regression_with_bad_asset(self):
"""
Test that `RollingPearsonOfReturns`, `RollingSpearmanOfReturns` and
`RollingLinearRegressionOfReturns` raise the proper exception when
given a nonexistent target asset.
"""
start_date_index = 14
end_date_index = 18
my_asset = Equity(0)

# This filter is arbitrary; the important thing is that we test each
# factor both with and without a specified mask.
my_asset_filter = AssetID().eq(1)

for mask in (NotSpecified, my_asset_filter):
pearson_factor = RollingPearsonOfReturns(
target=my_asset,
returns_length=3,
correlation_length=3,
mask=mask,
)
spearman_factor = RollingSpearmanOfReturns(
target=my_asset,
returns_length=3,
correlation_length=3,
mask=mask,
)
regression_factor = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=3,
regression_length=3,
mask=mask,
)

with self.assertRaises(NonExistentAssetInTimeFrame):
self.engine.run_pipeline(
Pipeline(columns={'pearson_factor': pearson_factor}),
self.dates[start_date_index],
self.dates[end_date_index],
)
with self.assertRaises(NonExistentAssetInTimeFrame):
self.engine.run_pipeline(
Pipeline(columns={'spearman_factor': spearman_factor}),
self.dates[start_date_index],
self.dates[end_date_index],
)
with self.assertRaises(NonExistentAssetInTimeFrame):
self.engine.run_pipeline(
Pipeline(columns={'regression_factor': regression_factor}),
self.dates[start_date_index],
self.dates[end_date_index],
)


class StringColumnTestCase(WithSeededRandomPipelineEngine,
ZiplineTestCase):
