From 54f434ea8eb27b0b8c68b6f20d7cda395f8ad8a9 Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Tue, 28 Jun 2016 10:43:46 -0400 Subject: [PATCH] ENH: Factor-to-factor correlations/regressions --- tests/pipeline/test_engine.py | 295 +-------- tests/pipeline/test_statistical.py | 759 ++++++++++++++++++++++++ zipline/errors.py | 11 + zipline/pipeline/factors/factor.py | 33 +- zipline/pipeline/factors/statistical.py | 148 +++-- 5 files changed, 883 insertions(+), 363 deletions(-) create mode 100644 tests/pipeline/test_statistical.py diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index d3a8501ea6..249465d31a 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -14,7 +14,6 @@ float32, float64, full, - full_like, log, nan, tile, @@ -37,13 +36,10 @@ ) from pandas.compat.chainmap import ChainMap from pandas.util.testing import assert_frame_equal -from scipy.stats.stats import linregress, pearsonr, spearmanr from six import iteritems, itervalues from toolz import merge -from zipline.assets import Equity from zipline.assets.synthetic import make_rotating_equity_info -from zipline.errors import NonExistentAssetInTimeFrame from zipline.lib.adjustment import MULTIPLY from zipline.lib.labelarray import LabelArray from zipline.pipeline import CustomFactor, Pipeline @@ -57,10 +53,6 @@ ExponentialWeightedMovingAverage, ExponentialWeightedMovingStdDev, MaxDrawdown, - Returns, - RollingLinearRegressionOfReturns, - RollingPearsonOfReturns, - RollingSpearmanOfReturns, SimpleMovingAverage, ) from zipline.pipeline.loaders.equity_pricing_loader import ( @@ -80,7 +72,6 @@ make_alternating_boolean_array, make_cascading_boolean_array, OpenPrice, - parameter_space, product_upper_triangle, ) from zipline.testing.fixtures import ( @@ -90,6 +81,7 @@ ZiplineTestCase, ) from zipline.utils.memoize import lazyval +from zipline.utils.numpy_utils import bool_dtype class RollingSumDifference(CustomFactor): @@ -584,7 +576,7 @@ def 
create_expected_results(expected_value, mask): ) expected_no_mask_result = full( - shape=(num_dates, num_assets), fill_value=True, dtype=bool, + shape=(num_dates, num_assets), fill_value=True, dtype=bool_dtype, ) masks = cascading_mask, alternating_mask, NotSpecified @@ -1207,289 +1199,6 @@ def test_dollar_volume(self): expected_5 = rolling_mean((self.raw_data ** 2) * 2, window=5)[5:] assert_frame_equal(results['dv5'].unstack(), expected_5) - @parameter_space(returns_length=[2, 3], correlation_length=[3, 4]) - def test_correlation_factors(self, returns_length, correlation_length): - """ - Tests for the built-in factors `RollingPearsonOfReturns` and - `RollingSpearmanOfReturns`. - """ - my_asset_column = 0 - start_date_index = 14 - end_date_index = 18 - - sids = self.sids - dates = self.dates - assets = self.asset_finder.retrieve_all(sids) - my_asset = assets[my_asset_column] - num_days = end_date_index - start_date_index + 1 - num_assets = len(assets) - - cascading_mask = \ - AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day) - expected_cascading_mask_result = make_cascading_boolean_array( - shape=(num_days, num_assets), - ) - - alternating_mask = (AssetIDPlusDay() % 2).eq(0) - expected_alternating_mask_result = make_alternating_boolean_array( - shape=(num_days, num_assets), - ) - - expected_no_mask_result = full( - shape=(num_days, num_assets), fill_value=True, dtype=bool, - ) - - masks = cascading_mask, alternating_mask, NotSpecified - expected_mask_results = ( - expected_cascading_mask_result, - expected_alternating_mask_result, - expected_no_mask_result, - ) - - for mask, expected_mask in zip(masks, expected_mask_results): - pearson_factor = RollingPearsonOfReturns( - target=my_asset, - returns_length=returns_length, - correlation_length=correlation_length, - mask=mask, - ) - spearman_factor = RollingSpearmanOfReturns( - target=my_asset, - returns_length=returns_length, - correlation_length=correlation_length, - mask=mask, - ) - - pipeline = Pipeline( 
- columns={ - 'pearson_factor': pearson_factor, - 'spearman_factor': spearman_factor, - }, - ) - if mask is not NotSpecified: - pipeline.add(mask, 'mask') - - results = self.engine.run_pipeline( - pipeline, dates[start_date_index], dates[end_date_index], - ) - pearson_results = results['pearson_factor'].unstack() - spearman_results = results['spearman_factor'].unstack() - if mask is not NotSpecified: - mask_results = results['mask'].unstack() - check_arrays(mask_results.values, expected_mask) - - # Run a separate pipeline that calculates returns starting - # (correlation_length - 1) days prior to our start date. This is - # because we need (correlation_length - 1) extra days of returns to - # compute our expected correlations. - returns = Returns(window_length=returns_length) - results = self.engine.run_pipeline( - Pipeline(columns={'returns': returns}), - dates[start_date_index - (correlation_length - 1)], - dates[end_date_index], - ) - returns_results = results['returns'].unstack() - - # On each day, calculate the expected correlation coefficients - # between the asset we are interested in and each other asset. Each - # correlation is calculated over `correlation_length` days. 
- expected_pearson_results = full_like(pearson_results, nan) - expected_spearman_results = full_like(spearman_results, nan) - for day in range(num_days): - todays_returns = returns_results.iloc[ - day:day + correlation_length - ] - my_asset_returns = todays_returns.iloc[:, my_asset_column] - for asset, other_asset_returns in todays_returns.iteritems(): - asset_column = int(asset) - 1 - expected_pearson_results[day, asset_column] = pearsonr( - my_asset_returns, other_asset_returns, - )[0] - expected_spearman_results[day, asset_column] = spearmanr( - my_asset_returns, other_asset_returns, - )[0] - - expected_pearson_results = DataFrame( - data=where(expected_mask, expected_pearson_results, nan), - index=dates[start_date_index:end_date_index + 1], - columns=assets, - ) - assert_frame_equal(pearson_results, expected_pearson_results) - - expected_spearman_results = DataFrame( - data=where(expected_mask, expected_spearman_results, nan), - index=dates[start_date_index:end_date_index + 1], - columns=assets, - ) - assert_frame_equal(spearman_results, expected_spearman_results) - - @parameter_space(returns_length=[2, 3], regression_length=[3, 4]) - def test_regression_of_returns_factor(self, - returns_length, - regression_length): - """ - Tests for the built-in factor `RollingLinearRegressionOfReturns`. 
- """ - my_asset_column = 0 - start_date_index = 14 - end_date_index = 18 - - sids = self.sids - dates = self.dates - assets = self.asset_finder.retrieve_all(sids) - my_asset = assets[my_asset_column] - num_days = end_date_index - start_date_index + 1 - num_assets = len(assets) - - cascading_mask = \ - AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day) - expected_cascading_mask_result = make_cascading_boolean_array( - shape=(num_days, num_assets), - ) - - alternating_mask = (AssetIDPlusDay() % 2).eq(0) - expected_alternating_mask_result = make_alternating_boolean_array( - shape=(num_days, num_assets), - ) - - expected_no_mask_result = full( - shape=(num_days, num_assets), fill_value=True, dtype=bool, - ) - - masks = cascading_mask, alternating_mask, NotSpecified - expected_mask_results = ( - expected_cascading_mask_result, - expected_alternating_mask_result, - expected_no_mask_result, - ) - - # The order of these is meant to align with the output of `linregress`. - outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr'] - - for mask, expected_mask in zip(masks, expected_mask_results): - regression_factor = RollingLinearRegressionOfReturns( - target=my_asset, - returns_length=returns_length, - regression_length=regression_length, - mask=mask, - ) - - pipeline = Pipeline( - columns={ - output: getattr(regression_factor, output) - for output in outputs - }, - ) - if mask is not NotSpecified: - pipeline.add(mask, 'mask') - - results = self.engine.run_pipeline( - pipeline, dates[start_date_index], dates[end_date_index], - ) - if mask is not NotSpecified: - mask_results = results['mask'].unstack() - check_arrays(mask_results.values, expected_mask) - - output_results = {} - expected_output_results = {} - for output in outputs: - output_results[output] = results[output].unstack() - expected_output_results[output] = full_like( - output_results[output], nan, - ) - - # Run a separate pipeline that calculates returns starting - # (regression_length - 1) days prior 
to our start date. This is - # because we need (regression_length - 1) extra days of returns to - # compute our expected regressions. - returns = Returns(window_length=returns_length) - results = self.engine.run_pipeline( - Pipeline(columns={'returns': returns}), - dates[start_date_index - (regression_length - 1)], - dates[end_date_index], - ) - returns_results = results['returns'].unstack() - - # On each day, calculate the expected regression results for Y ~ X - # where Y is the asset we are interested in and X is each other - # asset. Each regression is calculated over `regression_length` - # days of data. - for day in range(num_days): - todays_returns = returns_results.iloc[ - day:day + regression_length - ] - my_asset_returns = todays_returns.iloc[:, my_asset_column] - for asset, other_asset_returns in todays_returns.iteritems(): - asset_column = int(asset) - 1 - expected_regression_results = linregress( - y=other_asset_returns, x=my_asset_returns, - ) - for i, output in enumerate(outputs): - expected_output_results[output][day, asset_column] = \ - expected_regression_results[i] - - for output in outputs: - output_result = output_results[output] - expected_output_result = DataFrame( - where(expected_mask, expected_output_results[output], nan), - index=dates[start_date_index:end_date_index + 1], - columns=assets, - ) - assert_frame_equal(output_result, expected_output_result) - - def test_correlation_and_regression_with_bad_asset(self): - """ - Test that `RollingPearsonOfReturns`, `RollingSpearmanOfReturns` and - `RollingLinearRegressionOfReturns` raise the proper exception when - given a nonexistent target asset. - """ - start_date_index = 14 - end_date_index = 18 - my_asset = Equity(0) - - # This filter is arbitrary; the important thing is that we test each - # factor both with and without a specified mask. 
- my_asset_filter = AssetID().eq(1) - - for mask in (NotSpecified, my_asset_filter): - pearson_factor = RollingPearsonOfReturns( - target=my_asset, - returns_length=3, - correlation_length=3, - mask=mask, - ) - spearman_factor = RollingSpearmanOfReturns( - target=my_asset, - returns_length=3, - correlation_length=3, - mask=mask, - ) - regression_factor = RollingLinearRegressionOfReturns( - target=my_asset, - returns_length=3, - regression_length=3, - mask=mask, - ) - - with self.assertRaises(NonExistentAssetInTimeFrame): - self.engine.run_pipeline( - Pipeline(columns={'pearson_factor': pearson_factor}), - self.dates[start_date_index], - self.dates[end_date_index], - ) - with self.assertRaises(NonExistentAssetInTimeFrame): - self.engine.run_pipeline( - Pipeline(columns={'spearman_factor': spearman_factor}), - self.dates[start_date_index], - self.dates[end_date_index], - ) - with self.assertRaises(NonExistentAssetInTimeFrame): - self.engine.run_pipeline( - Pipeline(columns={'regression_factor': regression_factor}), - self.dates[start_date_index], - self.dates[end_date_index], - ) - class StringColumnTestCase(WithSeededRandomPipelineEngine, ZiplineTestCase): diff --git a/tests/pipeline/test_statistical.py b/tests/pipeline/test_statistical.py new file mode 100644 index 0000000000..589d47ea83 --- /dev/null +++ b/tests/pipeline/test_statistical.py @@ -0,0 +1,759 @@ +""" +Tests for statistical pipeline terms. 
+""" +from numpy import ( + arange, + full, + full_like, + nan, + where, +) +from pandas import ( + DataFrame, + date_range, + Int64Index, + Timestamp, +) +from pandas.util.testing import assert_frame_equal +from scipy.stats import linregress, pearsonr, spearmanr + +from zipline.assets import Equity +from zipline.errors import IncompatibleTerms, NonExistentAssetInTimeFrame +from zipline.pipeline import CustomFactor, Pipeline +from zipline.pipeline.data import USEquityPricing +from zipline.pipeline.data.testing import TestingDataSet +from zipline.pipeline.engine import SimplePipelineEngine +from zipline.pipeline.factors import ( + Returns, + RollingLinearRegressionOfReturns, + RollingPearsonOfReturns, + RollingSpearmanOfReturns, +) +from zipline.pipeline.loaders.frame import DataFrameLoader +from zipline.pipeline.sentinels import NotSpecified +from zipline.testing import ( + AssetID, + AssetIDPlusDay, + check_arrays, + make_alternating_boolean_array, + make_cascading_boolean_array, + parameter_space, +) +from zipline.testing.fixtures import ( + WithSeededRandomPipelineEngine, + WithTradingEnvironment, + ZiplineTestCase, +) +from zipline.utils.numpy_utils import ( + bool_dtype, + datetime64ns_dtype, + float64_dtype, +) + + +class StatisticalBuiltInsTestCase(WithTradingEnvironment, ZiplineTestCase): + sids = ASSET_FINDER_EQUITY_SIDS = Int64Index([1, 2, 3]) + START_DATE = Timestamp('2015-01-31', tz='UTC') + END_DATE = Timestamp('2015-03-01', tz='UTC') + + @classmethod + def init_class_fixtures(cls): + super(StatisticalBuiltInsTestCase, cls).init_class_fixtures() + + day = cls.trading_schedule.day + cls.dates = dates = date_range( + '2015-02-01', '2015-02-28', freq=day, tz='UTC', + ) + + # Using these start and end dates because they are a contigous span of + # 5 days (Monday - Friday) and they allow for plenty of days to look + # back on when computing correlations and regressions. 
+ cls.start_date_index = start_date_index = 14 + cls.end_date_index = end_date_index = 18 + cls.pipeline_start_date = dates[start_date_index] + cls.pipeline_end_date = dates[end_date_index] + cls.num_days = num_days = end_date_index - start_date_index + 1 + + sids = cls.sids + cls.assets = assets = cls.asset_finder.retrieve_all(sids) + cls.my_asset_column = my_asset_column = 0 + cls.my_asset = assets[my_asset_column] + cls.num_assets = num_assets = len(assets) + + cls.raw_data = raw_data = DataFrame( + data=arange(len(dates) * len(sids), dtype=float64_dtype).reshape( + len(dates), len(sids), + ), + index=dates, + columns=assets, + ) + + # Using mock 'close' data here because the correlation and regression + # built-ins use USEquityPricing.close as the input to their `Returns` + # factors. Since there is no way to change that when constructing an + # instance of these built-ins, we need to test with mock 'close' data + # to most accurately reflect their true behavior and results. + close_loader = DataFrameLoader(USEquityPricing.close, raw_data) + + cls.run_pipeline = SimplePipelineEngine( + {USEquityPricing.close: close_loader}.__getitem__, + dates, + cls.asset_finder, + ).run_pipeline + + cls.cascading_mask = \ + AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day) + cls.expected_cascading_mask_result = make_cascading_boolean_array( + shape=(num_days, num_assets), + ) + cls.alternating_mask = (AssetIDPlusDay() % 2).eq(0) + cls.expected_alternating_mask_result = make_alternating_boolean_array( + shape=(num_days, num_assets), + ) + cls.expected_no_mask_result = full( + shape=(num_days, num_assets), fill_value=True, dtype=bool_dtype, + ) + + @parameter_space(returns_length=[2, 3], correlation_length=[3, 4]) + def test_correlation_factors(self, returns_length, correlation_length): + """ + Tests for the built-in factors `RollingPearsonOfReturns` and + `RollingSpearmanOfReturns`. 
+ """ + assets = self.assets + my_asset = self.my_asset + my_asset_column = self.my_asset_column + dates = self.dates + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + start_date_index = self.start_date_index + end_date_index = self.end_date_index + num_days = self.num_days + run_pipeline = self.run_pipeline + + returns = Returns(window_length=returns_length) + masks = (self.cascading_mask, self.alternating_mask, NotSpecified) + expected_mask_results = ( + self.expected_cascading_mask_result, + self.expected_alternating_mask_result, + self.expected_no_mask_result, + ) + + for mask, expected_mask in zip(masks, expected_mask_results): + pearson_factor = RollingPearsonOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + mask=mask, + ) + spearman_factor = RollingSpearmanOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + mask=mask, + ) + + columns = { + 'pearson_factor': pearson_factor, + 'spearman_factor': spearman_factor, + } + pipeline = Pipeline(columns=columns) + if mask is not NotSpecified: + pipeline.add(mask, 'mask') + + results = run_pipeline(pipeline, start_date, end_date) + pearson_results = results['pearson_factor'].unstack() + spearman_results = results['spearman_factor'].unstack() + if mask is not NotSpecified: + mask_results = results['mask'].unstack() + check_arrays(mask_results.values, expected_mask) + + # Run a separate pipeline that calculates returns starting + # (correlation_length - 1) days prior to our start date. This is + # because we need (correlation_length - 1) extra days of returns to + # compute our expected correlations. 
+ results = run_pipeline( + Pipeline(columns={'returns': returns}), + dates[start_date_index - (correlation_length - 1)], + dates[end_date_index], + ) + returns_results = results['returns'].unstack() + + # On each day, calculate the expected correlation coefficients + # between the asset we are interested in and each other asset. Each + # correlation is calculated over `correlation_length` days. + expected_pearson_results = full_like(pearson_results, nan) + expected_spearman_results = full_like(spearman_results, nan) + for day in range(num_days): + todays_returns = returns_results.iloc[ + day:day + correlation_length + ] + my_asset_returns = todays_returns.iloc[:, my_asset_column] + for asset, other_asset_returns in todays_returns.iteritems(): + asset_column = int(asset) - 1 + expected_pearson_results[day, asset_column] = pearsonr( + my_asset_returns, other_asset_returns, + )[0] + expected_spearman_results[day, asset_column] = spearmanr( + my_asset_returns, other_asset_returns, + )[0] + + expected_pearson_results = DataFrame( + data=where(expected_mask, expected_pearson_results, nan), + index=dates[start_date_index:end_date_index + 1], + columns=assets, + ) + assert_frame_equal(pearson_results, expected_pearson_results) + + expected_spearman_results = DataFrame( + data=where(expected_mask, expected_spearman_results, nan), + index=dates[start_date_index:end_date_index + 1], + columns=assets, + ) + assert_frame_equal(spearman_results, expected_spearman_results) + + @parameter_space(returns_length=[2, 3], regression_length=[3, 4]) + def test_regression_of_returns_factor(self, + returns_length, + regression_length): + """ + Tests for the built-in factor `RollingLinearRegressionOfReturns`. 
+ """ + assets = self.assets + my_asset = self.my_asset + my_asset_column = self.my_asset_column + dates = self.dates + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + start_date_index = self.start_date_index + end_date_index = self.end_date_index + num_days = self.num_days + run_pipeline = self.run_pipeline + + # The order of these is meant to align with the output of `linregress`. + outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr'] + + returns = Returns(window_length=returns_length) + masks = self.cascading_mask, self.alternating_mask, NotSpecified + expected_mask_results = ( + self.expected_cascading_mask_result, + self.expected_alternating_mask_result, + self.expected_no_mask_result, + ) + + for mask, expected_mask in zip(masks, expected_mask_results): + regression_factor = RollingLinearRegressionOfReturns( + target=my_asset, + returns_length=returns_length, + regression_length=regression_length, + mask=mask, + ) + + columns = { + output: getattr(regression_factor, output) + for output in outputs + } + pipeline = Pipeline(columns=columns) + if mask is not NotSpecified: + pipeline.add(mask, 'mask') + + results = run_pipeline(pipeline, start_date, end_date) + if mask is not NotSpecified: + mask_results = results['mask'].unstack() + check_arrays(mask_results.values, expected_mask) + + output_results = {} + expected_output_results = {} + for output in outputs: + output_results[output] = results[output].unstack() + expected_output_results[output] = full_like( + output_results[output], nan, + ) + + # Run a separate pipeline that calculates returns starting + # (regression_length - 1) days prior to our start date. This is + # because we need (regression_length - 1) extra days of returns to + # compute our expected regressions. 
+ results = run_pipeline( + Pipeline(columns={'returns': returns}), + dates[start_date_index - (regression_length - 1)], + dates[end_date_index], + ) + returns_results = results['returns'].unstack() + + # On each day, calculate the expected regression results for Y ~ X + # where Y is the asset we are interested in and X is each other + # asset. Each regression is calculated over `regression_length` + # days of data. + for day in range(num_days): + todays_returns = returns_results.iloc[ + day:day + regression_length + ] + my_asset_returns = todays_returns.iloc[:, my_asset_column] + for asset, other_asset_returns in todays_returns.iteritems(): + asset_column = int(asset) - 1 + expected_regression_results = linregress( + y=other_asset_returns, x=my_asset_returns, + ) + for i, output in enumerate(outputs): + expected_output_results[output][day, asset_column] = \ + expected_regression_results[i] + + for output in outputs: + output_result = output_results[output] + expected_output_result = DataFrame( + where(expected_mask, expected_output_results[output], nan), + index=dates[start_date_index:end_date_index + 1], + columns=assets, + ) + assert_frame_equal(output_result, expected_output_result) + + def test_correlation_and_regression_with_bad_asset(self): + """ + Test that `RollingPearsonOfReturns`, `RollingSpearmanOfReturns` and + `RollingLinearRegressionOfReturns` raise the proper exception when + given a nonexistent target asset. + """ + my_asset = Equity(0) + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + run_pipeline = self.run_pipeline + + # This filter is arbitrary; the important thing is that we test each + # factor both with and without a specified mask. 
+        my_asset_filter = AssetID().eq(1)
+
+        for mask in (NotSpecified, my_asset_filter):
+            pearson_factor = RollingPearsonOfReturns(
+                target=my_asset,
+                returns_length=3,
+                correlation_length=3,
+                mask=mask,
+            )
+            spearman_factor = RollingSpearmanOfReturns(
+                target=my_asset,
+                returns_length=3,
+                correlation_length=3,
+                mask=mask,
+            )
+            regression_factor = RollingLinearRegressionOfReturns(
+                target=my_asset,
+                returns_length=3,
+                regression_length=3,
+                mask=mask,
+            )
+
+            with self.assertRaises(NonExistentAssetInTimeFrame):
+                run_pipeline(
+                    Pipeline(columns={'pearson_factor': pearson_factor}),
+                    start_date,
+                    end_date,
+                )
+            with self.assertRaises(NonExistentAssetInTimeFrame):
+                run_pipeline(
+                    Pipeline(columns={'spearman_factor': spearman_factor}),
+                    start_date,
+                    end_date,
+                )
+            with self.assertRaises(NonExistentAssetInTimeFrame):
+                run_pipeline(
+                    Pipeline(columns={'regression_factor': regression_factor}),
+                    start_date,
+                    end_date,
+                )
+
+
+class StatisticalMethodsTestCase(WithSeededRandomPipelineEngine,
+                                 ZiplineTestCase):
+    sids = ASSET_FINDER_EQUITY_SIDS = Int64Index([1, 2, 3])
+    START_DATE = Timestamp('2015-01-31', tz='UTC')
+    END_DATE = Timestamp('2015-03-01', tz='UTC')
+
+    @classmethod
+    def init_class_fixtures(cls):
+        super(StatisticalMethodsTestCase, cls).init_class_fixtures()
+
+        # Using these start and end dates because they are a contiguous span of
+        # 5 days (Monday - Friday) and they allow for plenty of days to look
+        # back on when computing correlations and regressions.
+ cls.dates = dates = cls.trading_days + cls.start_date_index = start_date_index = 14 + cls.end_date_index = end_date_index = 18 + cls.pipeline_start_date = cls.trading_days[start_date_index] + cls.pipeline_end_date = cls.trading_days[end_date_index] + + sids = cls.sids + cls.assets = assets = cls.asset_finder.retrieve_all(sids) + cls.my_asset_column = my_asset_column = 0 + cls.my_asset = assets[my_asset_column] + cls.num_days = num_days = end_date_index - start_date_index + 1 + cls.num_assets = num_assets = len(assets) + + cls.cascading_mask = \ + AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day) + cls.expected_cascading_mask_result = make_cascading_boolean_array( + shape=(num_days, num_assets), + ) + cls.alternating_mask = (AssetIDPlusDay() % 2).eq(0) + cls.expected_alternating_mask_result = make_alternating_boolean_array( + shape=(num_days, num_assets), + ) + cls.expected_no_mask_result = full( + shape=(num_days, num_assets), fill_value=True, dtype=bool_dtype, + ) + + # Random input for factors. + cls.col = TestingDataSet.float_col + + @parameter_space(returns_length=[2, 3], correlation_length=[3, 4]) + def test_factor_correlation_methods(self, + returns_length, + correlation_length): + """ + Ensure that `Factor.pearsonr` and `Factor.spearmanr` are consistent + with the built-in factors `RollingPearsonOfReturns` and + `RollingSpearmanOfReturns`. 
+ """ + my_asset = self.my_asset + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + run_pipeline = self.run_pipeline + + returns = Returns(window_length=returns_length, inputs=[self.col]) + returns_slice = returns[my_asset] + + pearson = returns.pearsonr( + target=returns_slice, correlation_length=correlation_length, + ) + spearman = returns.spearmanr( + target=returns_slice, correlation_length=correlation_length, + ) + expected_pearson = RollingPearsonOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + ) + expected_spearman = RollingSpearmanOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + ) + + # These built-ins construct their own Returns factor to use as inputs, + # so the only way to set our own inputs is to do so after the fact. + # This should not be done in practice. It is necessary here because we + # want Returns to use our random data as an input, but by default it is + # using USEquityPricing.close. + expected_pearson.inputs = [returns, returns_slice] + expected_spearman.inputs = [returns, returns_slice] + + columns = { + 'pearson': pearson, + 'spearman': spearman, + 'expected_pearson': expected_pearson, + 'expected_spearman': expected_spearman, + } + + results = run_pipeline(Pipeline(columns=columns), start_date, end_date) + pearson_results = results['pearson'].unstack() + spearman_results = results['spearman'].unstack() + expected_pearson_results = results['expected_pearson'].unstack() + expected_spearman_results = results['expected_spearman'].unstack() + + assert_frame_equal(pearson_results, expected_pearson_results) + assert_frame_equal(spearman_results, expected_spearman_results) + + # Make sure we cannot call the correlation methods on factors or slices + # of dtype `datetime64[ns]`. 
+ class DateFactor(CustomFactor): + inputs = [] + window_length = 1 + dtype = datetime64ns_dtype + window_safe = True + + def compute(self, today, assets, out): + pass + + date_factor = DateFactor() + date_factor_slice = date_factor[my_asset] + + with self.assertRaises(TypeError): + date_factor.pearsonr( + target=returns_slice, correlation_length=correlation_length, + ) + with self.assertRaises(TypeError): + date_factor.spearmanr( + target=returns_slice, correlation_length=correlation_length, + ) + with self.assertRaises(TypeError): + returns.pearsonr( + target=date_factor_slice, + correlation_length=correlation_length, + ) + with self.assertRaises(TypeError): + returns.spearmanr( + target=date_factor_slice, + correlation_length=correlation_length, + ) + + @parameter_space(returns_length=[2, 3], regression_length=[3, 4]) + def test_factor_regression_method(self, returns_length, regression_length): + """ + Ensure that `Factor.linear_regression` is consistent with the built-in + factor `RollingLinearRegressionOfReturns`. + """ + my_asset = self.my_asset + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + run_pipeline = self.run_pipeline + + returns = Returns(window_length=returns_length, inputs=[self.col]) + returns_slice = returns[my_asset] + + regression = returns.linear_regression( + target=returns_slice, regression_length=regression_length, + ) + expected_regression = RollingLinearRegressionOfReturns( + target=my_asset, + returns_length=returns_length, + regression_length=regression_length, + ) + + # This built-in constructs its own Returns factor to use as an input, + # so the only way to set our own input is to do so after the fact. This + # should not be done in practice. It is necessary here because we want + # Returns to use our random data as an input, but by default it is + # using USEquityPricing.close. 
+ expected_regression.inputs = [returns, returns_slice] + + columns = { + 'regression': regression, + 'expected_regression': expected_regression, + } + + results = run_pipeline(Pipeline(columns=columns), start_date, end_date) + regression_results = results['regression'].unstack() + expected_regression_results = results['expected_regression'].unstack() + + assert_frame_equal(regression_results, expected_regression_results) + + # Make sure we cannot call the linear regression method on factors or + # slices of dtype `datetime64[ns]`. + class DateFactor(CustomFactor): + window_length = 1 + inputs = [] + dtype = datetime64ns_dtype + window_safe = True + + def compute(self, today, assets, out): + pass + + date_factor = DateFactor() + date_factor_slice = date_factor[my_asset] + + with self.assertRaises(TypeError): + date_factor.linear_regression( + target=returns_slice, regression_length=regression_length, + ) + with self.assertRaises(TypeError): + returns.linear_regression( + target=date_factor_slice, regression_length=regression_length, + ) + + @parameter_space(correlation_length=[1, 2, 3, 4]) + def test_factor_correlation_methods_two_factors(self, correlation_length): + """ + Tests for `Factor.pearsonr` and `Factor.spearmanr` when passed another + 2D factor instead of a Slice. + """ + assets = self.assets + dates = self.dates + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + start_date_index = self.start_date_index + end_date_index = self.end_date_index + num_days = self.num_days + run_pipeline = self.run_pipeline + + # Ensure that the correlation methods cannot be called with two 2D + # factors which have different masks. 
+ returns_masked_1 = Returns( + window_length=5, inputs=[self.col], mask=AssetID().eq(1), + ) + returns_masked_2 = Returns( + window_length=5, inputs=[self.col], mask=AssetID().eq(2), + ) + with self.assertRaises(IncompatibleTerms): + returns_masked_1.pearsonr( + target=returns_masked_2, correlation_length=correlation_length, + ) + with self.assertRaises(IncompatibleTerms): + returns_masked_1.spearmanr( + target=returns_masked_2, correlation_length=correlation_length, + ) + + returns_5 = Returns(window_length=5, inputs=[self.col]) + returns_10 = Returns(window_length=10, inputs=[self.col]) + + pearson_factor = returns_5.pearsonr( + target=returns_10, correlation_length=correlation_length, + ) + spearman_factor = returns_5.spearmanr( + target=returns_10, correlation_length=correlation_length, + ) + + columns = { + 'pearson_factor': pearson_factor, + 'spearman_factor': spearman_factor, + } + pipeline = Pipeline(columns=columns) + + results = run_pipeline(pipeline, start_date, end_date) + pearson_results = results['pearson_factor'].unstack() + spearman_results = results['spearman_factor'].unstack() + + # Run a separate pipeline that calculates returns starting + # (correlation_length - 1) days prior to our start date. This is + # because we need (correlation_length - 1) extra days of returns to + # compute our expected correlations. + columns = {'returns_5': returns_5, 'returns_10': returns_10} + results = run_pipeline( + Pipeline(columns=columns), + dates[start_date_index - (correlation_length - 1)], + dates[end_date_index], + ) + returns_5_results = results['returns_5'].unstack() + returns_10_results = results['returns_10'].unstack() + + # On each day, calculate the expected correlation coefficients + # between each asset's 5 and 10 day rolling returns. Each correlation + # is calculated over `correlation_length` days. 
+ expected_pearson_results = full_like(pearson_results, nan) + expected_spearman_results = full_like(spearman_results, nan) + for day in range(num_days): + todays_returns_5 = returns_5_results.iloc[ + day:day + correlation_length + ] + todays_returns_10 = returns_10_results.iloc[ + day:day + correlation_length + ] + for asset, asset_returns_5 in todays_returns_5.iteritems(): + asset_column = int(asset) - 1 + asset_returns_10 = todays_returns_10[asset] + expected_pearson_results[day, asset_column] = pearsonr( + asset_returns_5, asset_returns_10, + )[0] + expected_spearman_results[day, asset_column] = spearmanr( + asset_returns_5, asset_returns_10, + )[0] + + expected_pearson_results = DataFrame( + data=expected_pearson_results, + index=dates[start_date_index:end_date_index + 1], + columns=assets, + ) + assert_frame_equal(pearson_results, expected_pearson_results) + + expected_spearman_results = DataFrame( + data=expected_spearman_results, + index=dates[start_date_index:end_date_index + 1], + columns=assets, + ) + assert_frame_equal(spearman_results, expected_spearman_results) + + @parameter_space(regression_length=[1, 2, 3, 4]) + def test_factor_regression_method_two_factors(self, regression_length): + """ + Tests for `Factor.linear_regression` when passed another 2D factor + instead of a Slice. + """ + assets = self.assets + dates = self.dates + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + start_date_index = self.start_date_index + end_date_index = self.end_date_index + num_days = self.num_days + run_pipeline = self.run_pipeline + + # The order of these is meant to align with the output of `linregress`. + outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr'] + + # Ensure that the `linear_regression` method cannot be called with two + # 2D factors which have different masks. 
+ returns_masked_1 = Returns( + window_length=5, inputs=[self.col], mask=AssetID().eq(1), + ) + returns_masked_2 = Returns( + window_length=5, inputs=[self.col], mask=AssetID().eq(2), + ) + with self.assertRaises(IncompatibleTerms): + returns_masked_1.linear_regression( + target=returns_masked_2, regression_length=regression_length, + ) + + returns_5 = Returns(window_length=5, inputs=[self.col]) + returns_10 = Returns(window_length=10, inputs=[self.col]) + + regression_factor = returns_5.linear_regression( + target=returns_10, regression_length=regression_length, + ) + + columns = { + output: getattr(regression_factor, output) + for output in outputs + } + pipeline = Pipeline(columns=columns) + + results = run_pipeline(pipeline, start_date, end_date) + + output_results = {} + expected_output_results = {} + for output in outputs: + output_results[output] = results[output].unstack() + expected_output_results[output] = full_like( + output_results[output], nan, + ) + + # Run a separate pipeline that calculates returns starting + # (regression_length - 1) days prior to our start date. This is because + # we need (regression_length - 1) extra days of returns to compute our + # expected regressions. + columns = {'returns_5': returns_5, 'returns_10': returns_10} + results = run_pipeline( + Pipeline(columns=columns), + dates[start_date_index - (regression_length - 1)], + dates[end_date_index], + ) + returns_5_results = results['returns_5'].unstack() + returns_10_results = results['returns_10'].unstack() + + # On each day, for each asset, calculate the expected regression + # results of Y ~ X where Y is the asset's rolling 5 day returns and X + # is the asset's rolling 10 day returns. Each regression is calculated + # over `regression_length` days of data. 
+ for day in range(num_days): + todays_returns_5 = returns_5_results.iloc[ + day:day + regression_length + ] + todays_returns_10 = returns_10_results.iloc[ + day:day + regression_length + ] + for asset, asset_returns_5 in todays_returns_5.iteritems(): + asset_column = int(asset) - 1 + asset_returns_10 = todays_returns_10[asset] + expected_regression_results = linregress( + y=asset_returns_5, x=asset_returns_10, + ) + for i, output in enumerate(outputs): + expected_output_results[output][day, asset_column] = \ + expected_regression_results[i] + + for output in outputs: + output_result = output_results[output] + expected_output_result = DataFrame( + expected_output_results[output], + index=dates[start_date_index:end_date_index + 1], + columns=assets, + ) + assert_frame_equal(output_result, expected_output_result) diff --git a/zipline/errors.py b/zipline/errors.py index 06d4a69441..e6e1978108 100644 --- a/zipline/errors.py +++ b/zipline/errors.py @@ -685,3 +685,14 @@ class NonSliceableTerm(ZiplineError): of `zipline.pipeline.term.LoadableTerm`. """ msg = "Taking slices of {term} is not currently supported." + + +class IncompatibleTerms(ZiplineError): + """ + Raised when trying to compute correlations/regressions between two 2D + factors with different masks. + """ + msg = ( + "{term_1} and {term_2} must have the same mask in order to compute " + "correlations and regressions asset-wise." 
+ ) diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index 6c840ba85d..118f73e54d 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -1,11 +1,13 @@ """ factor.py """ +from abc import ABCMeta from functools import wraps from operator import attrgetter from numbers import Number from numpy import inf, where +from six import with_metaclass from zipline.errors import UnknownRankMethod from zipline.lib.normalize import naive_grouped_rowwise_apply @@ -316,10 +318,13 @@ def mathfunc(self): ) ) - FACTOR_DTYPES = frozenset([datetime64ns_dtype, float64_dtype, int64_dtype]) +class FactorProxy(with_metaclass(ABCMeta, object)): + pass + + class Factor(RestrictedDTypeMixin, ComputableTerm): """ Pipeline API expression producing a numerical or date-valued output. @@ -626,7 +631,9 @@ def rank(self, method='ordinal', ascending=True, mask=NotSpecified): return Rank(self, method=method, ascending=ascending, mask=mask) @expect_types( - target=Slice, correlation_length=int, mask=(Filter, NotSpecifiedType), + target=(FactorProxy, Slice), + correlation_length=int, + mask=(Filter, NotSpecifiedType), ) def pearsonr(self, target, correlation_length, mask=NotSpecified): """ @@ -682,14 +689,16 @@ def pearsonr(self, target, correlation_length, mask=NotSpecified): """ from .statistical import RollingPearson return RollingPearson( - target_factor=self, - target_slice=target, + base_factor=self, + target=target, correlation_length=correlation_length, mask=mask, ) @expect_types( - target=Slice, correlation_length=int, mask=(Filter, NotSpecifiedType), + target=(FactorProxy, Slice), + correlation_length=int, + mask=(Filter, NotSpecifiedType), ) def spearmanr(self, target, correlation_length, mask=NotSpecified): """ @@ -745,14 +754,16 @@ def spearmanr(self, target, correlation_length, mask=NotSpecified): """ from .statistical import RollingSpearman return RollingSpearman( - target_factor=self, - target_slice=target, + 
base_factor=self, + target=target, correlation_length=correlation_length, mask=mask, ) @expect_types( - target=Slice, regression_length=int, mask=(Filter, NotSpecifiedType), + target=(FactorProxy, Slice), + regression_length=int, + mask=(Filter, NotSpecifiedType), ) def linear_regression(self, target, regression_length, mask=NotSpecified): """ @@ -806,8 +817,8 @@ def linear_regression(self, target, regression_length, mask=NotSpecified): """ from .statistical import RollingLinearRegression return RollingLinearRegression( - target_factor=self, - target_slice=target, + dependent=self, + independent=target, regression_length=regression_length, mask=mask, ) @@ -1044,6 +1055,8 @@ def isfinite(self): """ return (-inf < self) & (self < inf) +FactorProxy.register(Factor) + class NumExprFactor(NumericalExpression, Factor): """ diff --git a/zipline/pipeline/factors/statistical.py b/zipline/pipeline/factors/statistical.py index 9e1cf5ca57..b3cc514418 100644 --- a/zipline/pipeline/factors/statistical.py +++ b/zipline/pipeline/factors/statistical.py @@ -5,6 +5,7 @@ spearmanr, ) +from zipline.errors import IncompatibleTerms from zipline.pipeline.factors import CustomFactor from zipline.pipeline.filters import SingleAsset from zipline.pipeline.mixins import SingleInputMixin @@ -21,15 +22,17 @@ class _RollingCorrelation(CustomFactor, SingleInputMixin): - @expect_dtypes(target_factor=ALLOWED_DTYPES, target_slice=ALLOWED_DTYPES) + @expect_dtypes(base_factor=ALLOWED_DTYPES, target=ALLOWED_DTYPES) def __new__(cls, - target_factor, - target_slice, + base_factor, + target, correlation_length, mask=NotSpecified): + if target.ndim == 2 and base_factor.mask is not target.mask: + raise IncompatibleTerms(term_1=base_factor, term_2=target) return super(_RollingCorrelation, cls).__new__( cls, - inputs=[target_factor, target_slice], + inputs=[base_factor, target], window_length=correlation_length, mask=mask, ) @@ -37,23 +40,25 @@ def __new__(cls, class RollingPearson(_RollingCorrelation): """ - 
A Factor that computes pearson correlation coefficients between a single - column of data and the columns of another Factor. + A Factor that computes pearson correlation coefficients between the columns + of a given Factor and either the columns of another Factor or a single + column (slice) of data. Parameters ---------- - target_factor : zipline.pipeline.factors.Factor + base_factor : zipline.pipeline.factors.Factor The factor for which to compute correlations of each of its columns - with `target_slice`. - target_slice : zipline.pipeline.slice.Slice - The column of data with which to compute correlations against each - column of data produced by `target_factor`. + with `target`. + target : zipline.pipeline.slice.Slice or zipline.pipeline.Factor + The data with which to compute correlations against each column of data + produced by `base_factor`. If `target` is a Factor, correlations are + computed asset-wise. correlation_length : int - Length of the lookback window over which to compute each - correlation coefficient. + Length of the lookback window over which to compute each correlation + coefficient. mask : zipline.pipeline.Filter, optional - A Filter describing which assets (columns) of `target_factor` should - have their correlation with `target_slice` computed each day. + A Filter describing which assets (columns) of `base_factor` should have + their correlation with `target` computed each day. See Also -------- @@ -66,32 +71,38 @@ class RollingPearson(_RollingCorrelation): Most users should call Factor.pearsonr rather than directly construct an instance of this class. """ - def compute(self, today, assets, out, factor_data, slice_data): - slice_data_column = slice_data[:, 0] - for i in range(len(out)): - # pearsonr returns the R-value and the P-value. - out[i] = pearsonr(factor_data[:, i], slice_data_column)[0] + def compute(self, today, assets, out, base_data, target_data): + if target_data.shape[1] > 1: + # Both inputs are 2D, so compute sid-by-sid. 
+ for i in range(len(out)): + out[i] = pearsonr(base_data[:, i], target_data[:, i])[0] + else: + # Second input is a slice, so always compute with its only column. + for i in range(len(out)): + out[i] = pearsonr(base_data[:, i], target_data[:, 0])[0] class RollingSpearman(_RollingCorrelation): """ - A Factor that computes spearman rank correlation coefficients between a - single column of data and the columns of another Factor. + A Factor that computes spearman rank correlation coefficients between the + columns of a given Factor and either the columns of another Factor or a + single column (slice) of data. Parameters ---------- - target_factor : zipline.pipeline.factors.Factor + base_factor : zipline.pipeline.factors.Factor The factor for which to compute correlations of each of its columns - with `target_slice`. - target_slice : zipline.pipeline.slice.Slice - The column of data with which to compute correlations against each - column of data produced by `target_factor`. + with `target`. + target : zipline.pipeline.slice.Slice or zipline.pipeline.Factor + The data with which to compute correlations against each column of data + produced by `base_factor`. If `target` is a Factor, correlations are + computed asset-wise. correlation_length : int - Length of the lookback window over which to compute each - correlation coefficient. + Length of the lookback window over which to compute each correlation + coefficient. mask : zipline.pipeline.Filter, optional - A Filter describing which assets (columns) of `target_factor` should - have their correlation with `target_slice` computed each day. + A Filter describing which assets (columns) of `base_factor` should have + their correlation with `target` computed each day. See Also -------- @@ -104,31 +115,37 @@ class RollingSpearman(_RollingCorrelation): Most users should call Factor.spearmanr rather than directly construct an instance of this class. 
""" - def compute(self, today, assets, out, factor_data, slice_data): - slice_data_column = slice_data[:, 0] - for i in range(len(out)): - # spearmanr returns the R-value and the P-value. - out[i] = spearmanr(factor_data[:, i], slice_data_column)[0] + def compute(self, today, assets, out, base_data, target_data): + if target_data.shape[1] > 1: + # Both inputs are 2D, so compute sid-by-sid. + for i in range(len(out)): + out[i] = spearmanr(base_data[:, i], target_data[:, i])[0] + else: + # Second input is a slice, so always compute with its only column. + for i in range(len(out)): + out[i] = spearmanr(base_data[:, i], target_data[:, 0])[0] class RollingLinearRegression(CustomFactor, SingleInputMixin): """ A Factor that performs an ordinary least-squares regression predicting the - columns of another Factor from a single column of data. + columns of a given Factor from either the columns of another Factor or a + single column (slice) of data. Parameters ---------- - target_factor : zipline.pipeline.factors.Factor + dependent : zipline.pipeline.factors.Factor The factor whose columns are the predicted/dependent variable of each - regression with `target_slice`. - target_slice : zipline.pipeline.slice.Slice - The column of data to use as the predictor/independent variable in - each regression with the columns of `target_factor`. - correlation_length : int + regression with `independent`. + independent : zipline.pipeline.slice.Slice or zipline.pipeline.Factor + The factor/slice whose columns are the predictor/independent variable + of each regression with `dependent`. If `independent` is a Factor, + regressions are computed asset-wise. + regression_length : int Length of the lookback window over which to compute each regression. mask : zipline.pipeline.Filter, optional - A Filter describing which assets (columns) of `target_factor` should be - regressed against `target_slice` each day. 
+ A Filter describing which assets (columns) of `dependent` should be + regressed against `independent` each day. See Also -------- @@ -143,29 +160,30 @@ class RollingLinearRegression(CustomFactor, SingleInputMixin): """ outputs = ['alpha', 'beta', 'r_value', 'p_value', 'stderr'] - @expect_dtypes(target_factor=ALLOWED_DTYPES, target_slice=ALLOWED_DTYPES) + @expect_dtypes(dependent=ALLOWED_DTYPES, independent=ALLOWED_DTYPES) def __new__(cls, - target_factor, - target_slice, + dependent, + independent, regression_length, mask=NotSpecified): + if independent.ndim == 2 and dependent.mask is not independent.mask: + raise IncompatibleTerms(term_1=dependent, term_2=independent) return super(RollingLinearRegression, cls).__new__( cls, - inputs=[target_factor, target_slice], + inputs=[dependent, independent], window_length=regression_length, mask=mask, ) - def compute(self, today, assets, out, factor_data, slice_data): - slice_data_column = slice_data[:, 0] - + def compute(self, today, assets, out, dependent, independent): alpha = out.alpha beta = out.beta r_value = out.r_value p_value = out.p_value stderr = out.stderr - for i in range(len(out)): - regr_results = linregress(y=factor_data[:, i], x=slice_data_column) + + def regress(y, x): + regr_results = linregress(y=y, x=x) # `linregress` returns its results in the following order: # slope, intercept, r-value, p-value, stderr alpha[i] = regr_results[1] @@ -174,6 +192,16 @@ def compute(self, today, assets, out, factor_data, slice_data): p_value[i] = regr_results[3] stderr[i] = regr_results[4] + if independent.shape[1] > 1: + # Both inputs are 2D, so compute sid-by-sid. + for i in range(len(out)): + regress(y=dependent[:, i], x=independent[:, i]) + else: + # Second input is a slice, so always compute with its only column. 
+ slice_data = independent[:, 0] + for i in range(len(out)): + regress(y=dependent[:, i], x=slice_data) + class RollingPearsonOfReturns(RollingPearson): """ @@ -261,8 +289,8 @@ def __new__(cls, ) return super(RollingPearsonOfReturns, cls).__new__( cls, - target_factor=returns, - target_slice=returns[target], + base_factor=returns, + target=returns[target], correlation_length=correlation_length, mask=mask, ) @@ -311,8 +339,8 @@ def __new__(cls, ) return super(RollingSpearmanOfReturns, cls).__new__( cls, - target_factor=returns, - target_slice=returns[target], + base_factor=returns, + target=returns[target], correlation_length=correlation_length, mask=mask, ) @@ -428,8 +456,8 @@ def __new__(cls, ) return super(RollingLinearRegressionOfReturns, cls).__new__( cls, - target_factor=returns, - target_slice=returns[target], + dependent=returns, + independent=returns[target], regression_length=regression_length, mask=mask, )