From 393f82e81eeea89bb982e408f65c050a451c8c9e Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Thu, 2 Jun 2016 12:43:32 -0400 Subject: [PATCH] ENH: Add single-column input/output capabilities to pipeline terms --- docs/source/whatsnew/1.0.1.txt | 4 + tests/pipeline/test_engine.py | 262 +++++------ tests/pipeline/test_slice.py | 516 +++++++++++++++++++++ tests/pipeline/test_term.py | 33 +- tests/test_testing.py | 73 ++- zipline/errors.py | 19 + zipline/pipeline/classifiers/classifier.py | 3 +- zipline/pipeline/data/dataset.py | 2 +- zipline/pipeline/engine.py | 5 +- zipline/pipeline/factors/__init__.py | 8 +- zipline/pipeline/factors/factor.py | 213 ++++++++- zipline/pipeline/factors/statistical.py | 435 +++++++++++++++++ zipline/pipeline/factors/technical.py | 303 ------------ zipline/pipeline/filters/filter.py | 14 +- zipline/pipeline/loaders/blaze/core.py | 2 +- zipline/pipeline/mixins.py | 49 +- zipline/pipeline/pipeline.py | 14 +- zipline/pipeline/sentinels.py | 10 + zipline/pipeline/term.py | 98 +++- zipline/testing/__init__.py | 5 + zipline/testing/core.py | 102 ++++ 21 files changed, 1681 insertions(+), 489 deletions(-) create mode 100644 tests/pipeline/test_slice.py create mode 100644 zipline/pipeline/factors/statistical.py create mode 100644 zipline/pipeline/sentinels.py diff --git a/docs/source/whatsnew/1.0.1.txt b/docs/source/whatsnew/1.0.1.txt index 371416627c..682e170c16 100644 --- a/docs/source/whatsnew/1.0.1.txt +++ b/docs/source/whatsnew/1.0.1.txt @@ -17,6 +17,10 @@ Enhancements - Added support for non-float columns to Blaze-backed Pipeline datasets (:issue:`1201`). +- Added :class:`zipline.pipeline.slice.Slice`, a new pipeline term designed to + extract a single column from another term. Slices can be created by indexing + into a term, keyed by asset. 
(:issue:`1267`) + Bug Fixes ~~~~~~~~~ diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index a26d20c402..d3a8501ea6 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -72,9 +72,14 @@ make_bar_data, expected_bar_values_2d, ) -from zipline.pipeline.term import NotSpecified +from zipline.pipeline.sentinels import NotSpecified from zipline.testing import ( + AssetID, + AssetIDPlusDay, check_arrays, + make_alternating_boolean_array, + make_cascading_boolean_array, + OpenPrice, parameter_space, product_upper_triangle, ) @@ -95,38 +100,6 @@ def compute(self, today, assets, out, open, close): out[:] = (open - close).sum(axis=0) -class AssetID(CustomFactor): - """ - CustomFactor that returns the AssetID of each asset. - - Useful for providing a Factor that produces a different value for each - asset. - """ - window_length = 1 - # HACK: We currently decide whether to load or compute a Term based on the - # length of its inputs. This means we have to provide a dummy input. 
- inputs = [USEquityPricing.close] - - def compute(self, today, assets, out, close): - out[:] = assets - - -class AssetIDPlusDay(CustomFactor): - window_length = 1 - inputs = [USEquityPricing.close] - - def compute(self, today, assets, out, close): - out[:] = assets + today.day - - -class OpenPrice(CustomFactor): - window_length = 1 - inputs = [USEquityPricing.open] - - def compute(self, today, assets, out, open): - out[:] = open - - class MultipleOutputs(CustomFactor): window_length = 1 inputs = [USEquityPricing.open, USEquityPricing.close] @@ -421,6 +394,8 @@ def test_masked_factor(self): assets = self.assets asset_ids = self.asset_ids constants = self.constants + num_dates = len(dates) + num_assets = len(assets) open = USEquityPricing.open close = USEquityPricing.close engine = SimplePipelineEngine( @@ -435,19 +410,13 @@ def create_expected_results(expected_value, mask): return DataFrame(expected_values, index=dates, columns=assets) cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day) - expected_cascading_mask_result = array( - [[True, True, True, False], - [True, True, False, False], - [True, False, False, False]], - dtype=bool, + expected_cascading_mask_result = make_cascading_boolean_array( + shape=(num_dates, num_assets), ) alternating_mask = (AssetIDPlusDay() % 2).eq(0) - expected_alternating_mask_result = array( - [[False, True, False, True], - [True, False, True, False], - [False, True, False, True]], - dtype=bool, + expected_alternating_mask_result = make_alternating_boolean_array( + shape=(num_dates, num_assets), first_value=False, ) masks = cascading_mask, alternating_mask @@ -592,6 +561,8 @@ def test_factor_with_multiple_outputs(self): assets = self.assets asset_ids = self.asset_ids constants = self.constants + num_dates = len(dates) + num_assets = len(assets) open = USEquityPricing.open close = USEquityPricing.close engine = SimplePipelineEngine( @@ -603,32 +574,17 @@ def create_expected_results(expected_value, mask): return 
DataFrame(expected_values, index=dates, columns=assets) cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day) - expected_cascading_mask_result = array( - [[True, True, True, False], - [True, True, False, False], - [True, False, False, False], - [False, False, False, False], - [False, False, False, False]], - dtype=bool, + expected_cascading_mask_result = make_cascading_boolean_array( + shape=(num_dates, num_assets), ) alternating_mask = (AssetIDPlusDay() % 2).eq(0) - expected_alternating_mask_result = array( - [[False, True, False, True], - [True, False, True, False], - [False, True, False, True], - [True, False, True, False], - [False, True, False, True]], - dtype=bool, + expected_alternating_mask_result = make_alternating_boolean_array( + shape=(num_dates, num_assets), first_value=False, ) - expected_no_mask_result = array( - [[True, True, True, True], - [True, True, True, True], - [True, True, True, True], - [True, True, True, True], - [True, True, True, True]], - dtype=bool, + expected_no_mask_result = full( + shape=(num_dates, num_assets), fill_value=True, dtype=bool, ) masks = cascading_mask, alternating_mask, NotSpecified @@ -1258,19 +1214,39 @@ def test_correlation_factors(self, returns_length, correlation_length): `RollingSpearmanOfReturns`. """ my_asset_column = 0 - start_date_index = 6 - end_date_index = 10 + start_date_index = 14 + end_date_index = 18 - assets = self.asset_finder.retrieve_all(self.sids) + sids = self.sids + dates = self.dates + assets = self.asset_finder.retrieve_all(sids) my_asset = assets[my_asset_column] - my_asset_filter = (AssetID() != (my_asset_column + 1)) num_days = end_date_index - start_date_index + 1 + num_assets = len(assets) - # Our correlation factors require that their target asset is not - # filtered out, so make sure that masking out our target asset does not - # take effect. That is, a filter which filters out only our target - # asset should produce the same result as if no mask was passed at all. 
- for mask in (NotSpecified, my_asset_filter): + cascading_mask = \ + AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day) + expected_cascading_mask_result = make_cascading_boolean_array( + shape=(num_days, num_assets), + ) + + alternating_mask = (AssetIDPlusDay() % 2).eq(0) + expected_alternating_mask_result = make_alternating_boolean_array( + shape=(num_days, num_assets), + ) + + expected_no_mask_result = full( + shape=(num_days, num_assets), fill_value=True, dtype=bool, + ) + + masks = cascading_mask, alternating_mask, NotSpecified + expected_mask_results = ( + expected_cascading_mask_result, + expected_alternating_mask_result, + expected_no_mask_result, + ) + + for mask, expected_mask in zip(masks, expected_mask_results): pearson_factor = RollingPearsonOfReturns( target=my_asset, returns_length=returns_length, @@ -1284,18 +1260,23 @@ def test_correlation_factors(self, returns_length, correlation_length): mask=mask, ) + pipeline = Pipeline( + columns={ + 'pearson_factor': pearson_factor, + 'spearman_factor': spearman_factor, + }, + ) + if mask is not NotSpecified: + pipeline.add(mask, 'mask') + results = self.engine.run_pipeline( - Pipeline( - columns={ - 'pearson_factor': pearson_factor, - 'spearman_factor': spearman_factor, - }, - ), - self.dates[start_date_index], - self.dates[end_date_index], + pipeline, dates[start_date_index], dates[end_date_index], ) pearson_results = results['pearson_factor'].unstack() spearman_results = results['spearman_factor'].unstack() + if mask is not NotSpecified: + mask_results = results['mask'].unstack() + check_arrays(mask_results.values, expected_mask) # Run a separate pipeline that calculates returns starting # (correlation_length - 1) days prior to our start date. 
This is @@ -1304,8 +1285,8 @@ def test_correlation_factors(self, returns_length, correlation_length): returns = Returns(window_length=returns_length) results = self.engine.run_pipeline( Pipeline(columns={'returns': returns}), - self.dates[start_date_index - (correlation_length - 1)], - self.dates[end_date_index], + dates[start_date_index - (correlation_length - 1)], + dates[end_date_index], ) returns_results = results['returns'].unstack() @@ -1328,22 +1309,19 @@ def test_correlation_factors(self, returns_length, correlation_length): my_asset_returns, other_asset_returns, )[0] - assert_frame_equal( - pearson_results, - DataFrame( - expected_pearson_results, - index=self.dates[start_date_index:end_date_index + 1], - columns=assets, - ), + expected_pearson_results = DataFrame( + data=where(expected_mask, expected_pearson_results, nan), + index=dates[start_date_index:end_date_index + 1], + columns=assets, ) - assert_frame_equal( - spearman_results, - DataFrame( - expected_spearman_results, - index=self.dates[start_date_index:end_date_index + 1], - columns=assets, - ), + assert_frame_equal(pearson_results, expected_pearson_results) + + expected_spearman_results = DataFrame( + data=where(expected_mask, expected_spearman_results, nan), + index=dates[start_date_index:end_date_index + 1], + columns=assets, ) + assert_frame_equal(spearman_results, expected_spearman_results) @parameter_space(returns_length=[2, 3], regression_length=[3, 4]) def test_regression_of_returns_factor(self, @@ -1353,38 +1331,65 @@ def test_regression_of_returns_factor(self, Tests for the built-in factor `RollingLinearRegressionOfReturns`. 
""" my_asset_column = 0 - start_date_index = 6 - end_date_index = 10 + start_date_index = 14 + end_date_index = 18 - assets = self.asset_finder.retrieve_all(self.sids) + sids = self.sids + dates = self.dates + assets = self.asset_finder.retrieve_all(sids) my_asset = assets[my_asset_column] - my_asset_filter = (AssetID() != (my_asset_column + 1)) num_days = end_date_index - start_date_index + 1 + num_assets = len(assets) + + cascading_mask = \ + AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day) + expected_cascading_mask_result = make_cascading_boolean_array( + shape=(num_days, num_assets), + ) + + alternating_mask = (AssetIDPlusDay() % 2).eq(0) + expected_alternating_mask_result = make_alternating_boolean_array( + shape=(num_days, num_assets), + ) + + expected_no_mask_result = full( + shape=(num_days, num_assets), fill_value=True, dtype=bool, + ) + + masks = cascading_mask, alternating_mask, NotSpecified + expected_mask_results = ( + expected_cascading_mask_result, + expected_alternating_mask_result, + expected_no_mask_result, + ) # The order of these is meant to align with the output of `linregress`. outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr'] - # Our regression factor requires that its target asset is not filtered - # out, so make sure that masking out our target asset does not take - # effect. That is, a filter which filters out only our target asset - # should produce the same result as if no mask was passed at all. 
- for mask in (NotSpecified, my_asset_filter): + for mask, expected_mask in zip(masks, expected_mask_results): regression_factor = RollingLinearRegressionOfReturns( target=my_asset, returns_length=returns_length, regression_length=regression_length, mask=mask, ) + + pipeline = Pipeline( + columns={ + output: getattr(regression_factor, output) + for output in outputs + }, + ) + if mask is not NotSpecified: + pipeline.add(mask, 'mask') + results = self.engine.run_pipeline( - Pipeline( - columns={ - output: getattr(regression_factor, output) - for output in outputs - }, - ), - self.dates[start_date_index], - self.dates[end_date_index], + pipeline, dates[start_date_index], dates[end_date_index], ) + if mask is not NotSpecified: + mask_results = results['mask'].unstack() + check_arrays(mask_results.values, expected_mask) + output_results = {} expected_output_results = {} for output in outputs: @@ -1393,15 +1398,15 @@ def test_regression_of_returns_factor(self, output_results[output], nan, ) - # Run a separate pipeline that calculates returns starting 2 days - # prior to our start date. This is because we need - # (regression_length - 1) extra days of returns to compute our - # expected regressions. + # Run a separate pipeline that calculates returns starting + # (regression_length - 1) days prior to our start date. This is + # because we need (regression_length - 1) extra days of returns to + # compute our expected regressions. 
returns = Returns(window_length=returns_length) results = self.engine.run_pipeline( Pipeline(columns={'returns': returns}), - self.dates[start_date_index - (regression_length - 1)], - self.dates[end_date_index], + dates[start_date_index - (regression_length - 1)], + dates[end_date_index], ) returns_results = results['returns'].unstack() @@ -1424,14 +1429,13 @@ def test_regression_of_returns_factor(self, expected_regression_results[i] for output in outputs: - assert_frame_equal( - output_results[output], - DataFrame( - expected_output_results[output], - index=self.dates[start_date_index:end_date_index + 1], - columns=assets, - ), + output_result = output_results[output] + expected_output_result = DataFrame( + where(expected_mask, expected_output_results[output], nan), + index=dates[start_date_index:end_date_index + 1], + columns=assets, ) + assert_frame_equal(output_result, expected_output_result) def test_correlation_and_regression_with_bad_asset(self): """ @@ -1439,8 +1443,8 @@ def test_correlation_and_regression_with_bad_asset(self): `RollingLinearRegressionOfReturns` raise the proper exception when given a nonexistent target asset. """ - start_date_index = 6 - end_date_index = 10 + start_date_index = 14 + end_date_index = 18 my_asset = Equity(0) # This filter is arbitrary; the important thing is that we test each diff --git a/tests/pipeline/test_slice.py b/tests/pipeline/test_slice.py new file mode 100644 index 0000000000..b778ff13b2 --- /dev/null +++ b/tests/pipeline/test_slice.py @@ -0,0 +1,516 @@ +""" +Tests for slicing pipeline terms. 
+""" +from numpy import where +from pandas import Int64Index, Timestamp +from pandas.util.testing import assert_frame_equal + +from zipline.assets import Asset +from zipline.errors import ( + NonExistentAssetInTimeFrame, + NonSliceableTerm, + NonWindowSafeInput, + UnsupportedPipelineOutput, +) +from zipline.pipeline import CustomFactor, Pipeline +from zipline.pipeline.data import USEquityPricing +from zipline.pipeline.data.testing import TestingDataSet +from zipline.pipeline.factors import ( + Returns, + RollingLinearRegressionOfReturns, + RollingPearsonOfReturns, + RollingSpearmanOfReturns, + SimpleMovingAverage, +) +from zipline.testing import ( + AssetID, + AssetIDPlusDay, + check_arrays, + OpenPrice, + parameter_space, +) +from zipline.testing.fixtures import ( + WithSeededRandomPipelineEngine, + ZiplineTestCase, +) +from zipline.utils.numpy_utils import datetime64ns_dtype + + +class SliceTestCase(WithSeededRandomPipelineEngine, ZiplineTestCase): + sids = ASSET_FINDER_EQUITY_SIDS = Int64Index([1, 2, 3]) + START_DATE = Timestamp('2015-01-31', tz='UTC') + END_DATE = Timestamp('2015-03-01', tz='UTC') + + @classmethod + def init_class_fixtures(cls): + super(SliceTestCase, cls).init_class_fixtures() + + # Using the date at index 14 as the start date because when running + # pipelines, especially those involving correlations or regressions, we + # want to make sure there are enough days to look back on. The end date + # at index 18 is chosen for convenience, as it makes for a contiguous + # five day span. + cls.pipeline_start_date = cls.trading_days[14] + cls.pipeline_end_date = cls.trading_days[18] + + # Random input for factors. + cls.col = TestingDataSet.float_col + + @parameter_space(my_asset_column=[0, 1, 2], window_length_=[1, 2, 3]) + def test_slice(self, my_asset_column, window_length_): + """ + Test that slices can be created by indexing into a term, and that they + have the correct shape when used as inputs. 
+ """ + sids = self.sids + my_asset = self.asset_finder.retrieve_asset(self.sids[my_asset_column]) + + returns = Returns(window_length=2, inputs=[self.col]) + returns_slice = returns[my_asset] + + class UsesSlicedInput(CustomFactor): + window_length = window_length_ + inputs = [returns, returns_slice] + + def compute(self, today, assets, out, returns, returns_slice): + # Make sure that our slice is the correct shape (i.e. has only + # one column) and that it has the same values as the original + # returns factor from which it is derived. + assert returns_slice.shape == (self.window_length, 1) + assert returns.shape == (self.window_length, len(sids)) + check_arrays(returns_slice[:, 0], returns[:, my_asset_column]) + + # Assertions about the expected slice data are made in the `compute` + # function of our custom factor above. + self.run_pipeline( + Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}), + self.pipeline_start_date, + self.pipeline_end_date, + ) + + @parameter_space(unmasked_column=[0, 1, 2], slice_column=[0, 1, 2]) + def test_slice_with_masking(self, unmasked_column, slice_column): + """ + Test that masking a factor that uses slices as inputs does not mask the + slice data. + """ + sids = self.sids + asset_finder = self.asset_finder + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + + # Create a filter that masks out all but a single asset. + unmasked_asset = asset_finder.retrieve_asset(sids[unmasked_column]) + unmasked_asset_only = (AssetID().eq(unmasked_asset.sid)) + + # Asset used to create our slice. In the cases where this is different + # than `unmasked_asset`, our slice should still have non-missing data + # when used as an input to our custom factor. That is, it should not be + # masked out. 
+ slice_asset = asset_finder.retrieve_asset(sids[slice_column]) + + returns = Returns(window_length=2, inputs=[self.col]) + returns_slice = returns[slice_asset] + + returns_results = self.run_pipeline( + Pipeline(columns={'returns': returns}), start_date, end_date, + ) + returns_results = returns_results['returns'].unstack() + + class UsesSlicedInput(CustomFactor): + window_length = 1 + inputs = [returns, returns_slice] + + def compute(self, today, assets, out, returns, returns_slice): + # Ensure that our mask correctly affects the `returns` input + # and does not affect the `returns_slice` input. + assert returns.shape == (1, 1) + assert returns_slice.shape == (1, 1) + assert returns[0, 0] == \ + returns_results.loc[today, unmasked_asset] + assert returns_slice[0, 0] == \ + returns_results.loc[today, slice_asset] + + columns = {'masked': UsesSlicedInput(mask=unmasked_asset_only)} + + # Assertions about the expected data are made in the `compute` function + # of our custom factor above. + self.run_pipeline(Pipeline(columns=columns), start_date, end_date) + + def test_adding_slice_column(self): + """ + Test that slices cannot be added as a pipeline column. + """ + my_asset = self.asset_finder.retrieve_asset(self.sids[0]) + open_slice = OpenPrice()[my_asset] + + with self.assertRaises(UnsupportedPipelineOutput): + Pipeline(columns={'open_slice': open_slice}) + + pipe = Pipeline(columns={}) + with self.assertRaises(UnsupportedPipelineOutput): + pipe.add(open_slice, 'open_slice') + + def test_loadable_term_slices(self): + """ + Test that slicing loadable terms raises the proper error. + """ + my_asset = self.asset_finder.retrieve_asset(self.sids[0]) + + with self.assertRaises(NonSliceableTerm): + USEquityPricing.close[my_asset] + + def test_non_existent_asset(self): + """ + Test that indexing into a term with a non-existent asset raises the + proper exception. 
+ """ + my_asset = Asset(0) + returns = Returns(window_length=2, inputs=[self.col]) + returns_slice = returns[my_asset] + + class UsesSlicedInput(CustomFactor): + window_length = 1 + inputs = [returns_slice] + + def compute(self, today, assets, out, returns_slice): + pass + + with self.assertRaises(NonExistentAssetInTimeFrame): + self.run_pipeline( + Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}), + self.pipeline_start_date, + self.pipeline_end_date, + ) + + def test_window_safety_of_slices(self): + """ + Test that slices correctly inherit the `window_safe` property of the + term from which they are derived. + """ + col = self.col + my_asset = self.asset_finder.retrieve_asset(self.sids[0]) + + # SimpleMovingAverage is not window safe. + sma = SimpleMovingAverage(inputs=[self.col], window_length=10) + sma_slice = sma[my_asset] + + class UsesSlicedInput(CustomFactor): + window_length = 1 + inputs = [sma_slice] + + def compute(self, today, assets, out, sma_slice): + pass + + with self.assertRaises(NonWindowSafeInput): + self.run_pipeline( + Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}), + self.pipeline_start_date, + self.pipeline_end_date, + ) + + # Make sure that slices of custom factors are not window safe. + class MyUnsafeFactor(CustomFactor): + window_length = 1 + inputs = [col] + + def compute(self, today, assets, out, col): + pass + + my_unsafe_factor = MyUnsafeFactor() + my_unsafe_factor_slice = my_unsafe_factor[my_asset] + + class UsesSlicedInput(CustomFactor): + window_length = 1 + inputs = [my_unsafe_factor_slice] + + def compute(self, today, assets, out, my_unsafe_factor_slice): + pass + + with self.assertRaises(NonWindowSafeInput): + self.run_pipeline( + Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}), + self.pipeline_start_date, + self.pipeline_end_date, + ) + + # Create a window safe factor. 
+ class MySafeFactor(CustomFactor): + window_length = 1 + inputs = [col] + window_safe = True + + def compute(self, today, assets, out, col): + pass + + my_safe_factor = MySafeFactor() + my_safe_factor_slice = my_safe_factor[my_asset] + + # Make sure that correlations are not safe if either the factor *or* + # the target slice are not window safe. + with self.assertRaises(NonWindowSafeInput): + my_unsafe_factor.pearsonr( + target=my_safe_factor_slice, correlation_length=10, + ) + + with self.assertRaises(NonWindowSafeInput): + my_safe_factor.pearsonr( + target=my_unsafe_factor_slice, correlation_length=10, + ) + + def test_single_column_output(self): + """ + Tests for custom factors that compute a 1D out. + """ + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + + alternating_mask = (AssetIDPlusDay() % 2).eq(0) + cascading_mask = AssetIDPlusDay() < (self.sids[-1] + start_date.day) + + class SingleColumnOutput(CustomFactor): + window_length = 1 + inputs = [self.col] + window_safe = True + ndim = 1 + + def compute(self, today, assets, out, col): + # Because we specified ndim as 1, `out` should be a singleton + # array but `close` should be a regular sized input. + assert out.shape == (1,) + assert col.shape == (1, 3) + out[:] = col.sum() + + # Since we cannot add single column output factors as pipeline + # columns, we have to test its output through another factor. + class UsesSingleColumnOutput(CustomFactor): + window_length = 1 + inputs = [SingleColumnOutput()] + + def compute(self, today, assets, out, single_column_output): + # Make sure that `single_column` has the correct shape. That + # is, it should always have one column regardless of any mask + # passed to `UsesSingleColumnInput`. 
+ assert single_column_output.shape == (1, 1) + + for mask in (alternating_mask, cascading_mask): + columns = { + 'uses_single_column_output': UsesSingleColumnOutput(), + 'uses_single_column_output_masked': UsesSingleColumnOutput( + mask=mask, + ), + } + + # Assertions about the expected shapes of our data are made in the + # `compute` function of our custom factors above. + self.run_pipeline(Pipeline(columns=columns), start_date, end_date) + + def test_masked_single_column_output(self): + """ + Tests for masking custom factors that compute a 1D out. + """ + start_date = self.pipeline_start_date + end_date = self.pipeline_end_date + + alternating_mask = (AssetIDPlusDay() % 2).eq(0) + cascading_mask = AssetIDPlusDay() < (self.sids[-1] + start_date.day) + alternating_mask.window_safe = True + cascading_mask.window_safe = True + + for mask in (alternating_mask, cascading_mask): + class SingleColumnOutput(CustomFactor): + window_length = 1 + inputs = [self.col, mask] + window_safe = True + ndim = 1 + + def compute(self, today, assets, out, col, mask): + # Because we specified ndim as 1, `out` should always be a + # singleton array but `close` should be a sized based on + # the mask we passed. + assert out.shape == (1,) + assert col.shape == (1, mask.sum()) + out[:] = col.sum() + + # Since we cannot add single column output factors as pipeline + # columns, we have to test its output through another factor. + class UsesSingleColumnInput(CustomFactor): + window_length = 1 + inputs = [self.col, mask, SingleColumnOutput(mask=mask)] + + def compute(self, + today, + assets, + out, + col, + mask, + single_column_output): + # Make sure that `single_column` has the correct value + # based on the masked it used. 
+ assert single_column_output.shape == (1, 1) + single_column_output_value = single_column_output[0][0] + expected_value = where(mask, col, 0).sum() + assert single_column_output_value == expected_value + + columns = {'uses_single_column_input': UsesSingleColumnInput()} + + # Assertions about the expected shapes of our data are made in the + # `compute` function of our custom factors above. + self.run_pipeline(Pipeline(columns=columns), start_date, end_date) + + @parameter_space(returns_length=[2, 3], correlation_length=[3, 4]) + def test_factor_correlation_methods(self, + returns_length, + correlation_length): + """ + Ensure that `Factor.pearsonr` and `Factor.spearmanr` are consistent + with the built-in factors `RollingPearsonOfReturns` and + `RollingSpearmanOfReturns`. + """ + my_asset = self.asset_finder.retrieve_asset(self.sids[0]) + + returns = Returns(window_length=returns_length, inputs=[self.col]) + returns_slice = returns[my_asset] + + pearson = returns.pearsonr( + target=returns_slice, correlation_length=correlation_length, + ) + spearman = returns.spearmanr( + target=returns_slice, correlation_length=correlation_length, + ) + expected_pearson = RollingPearsonOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + ) + expected_spearman = RollingSpearmanOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + ) + + # These built-ins construct their own Returns factor to use as inputs, + # so the only way to set our own inputs is to do so after the fact. + # This should not be done in practice. It is necessary here because we + # want Returns to use our random data as an input, but by default it is + # using USEquityPricing.close. 
+ expected_pearson.inputs = [returns, returns_slice] + expected_spearman.inputs = [returns, returns_slice] + + columns = { + 'pearson': pearson, + 'spearman': spearman, + 'expected_pearson': expected_pearson, + 'expected_spearman': expected_spearman, + } + + results = self.run_pipeline( + Pipeline(columns=columns), + self.pipeline_start_date, + self.pipeline_end_date, + ) + pearson_results = results['pearson'].unstack() + spearman_results = results['spearman'].unstack() + expected_pearson_results = results['expected_pearson'].unstack() + expected_spearman_results = results['expected_spearman'].unstack() + + assert_frame_equal(pearson_results, expected_pearson_results) + assert_frame_equal(spearman_results, expected_spearman_results) + + # Make sure we cannot call the correlation methods on factors or slices + # of dtype `datetime64[ns]`. + class DateFactor(CustomFactor): + window_length = 1 + inputs = [] + dtype = datetime64ns_dtype + window_safe = True + + def compute(self, today, assets, out): + pass + + date_factor = DateFactor() + date_factor_slice = date_factor[my_asset] + + with self.assertRaises(TypeError): + date_factor.pearsonr( + target=returns_slice, correlation_length=correlation_length, + ) + with self.assertRaises(TypeError): + date_factor.spearmanr( + target=returns_slice, correlation_length=correlation_length, + ) + with self.assertRaises(TypeError): + returns.pearsonr( + target=date_factor_slice, + correlation_length=correlation_length, + ) + with self.assertRaises(TypeError): + returns.pearsonr( + target=date_factor_slice, + correlation_length=correlation_length, + ) + + @parameter_space(returns_length=[2, 3], regression_length=[3, 4]) + def test_factor_regression_method(self, returns_length, regression_length): + """ + Ensure that `Factor.linear_regression` is consistent with the built-in + factor `RollingLinearRegressionOfReturns`. 
+ """ + my_asset = self.asset_finder.retrieve_asset(self.sids[0]) + + returns = Returns(window_length=returns_length, inputs=[self.col]) + returns_slice = returns[my_asset] + + regression = returns.linear_regression( + target=returns_slice, regression_length=regression_length, + ) + expected_regression = RollingLinearRegressionOfReturns( + target=my_asset, + returns_length=returns_length, + regression_length=regression_length, + ) + + # These built-ins construct their own Returns factor to use as inputs, + # so the only way to set our own inputs is to do so after the fact. + # This should not be done in practice. It is necessary here because we + # want Returns to use our random data as an input, but by default it is + # using USEquityPricing.close. + expected_regression.inputs = [returns, returns_slice] + + columns = { + 'regression': regression, + 'expected_regression': expected_regression, + } + + results = self.run_pipeline( + Pipeline(columns=columns), + self.pipeline_start_date, + self.pipeline_end_date, + ) + regression_results = results['regression'].unstack() + expected_regression_results = results['expected_regression'].unstack() + + assert_frame_equal(regression_results, expected_regression_results) + + # Make sure we cannot call the linear regression method on factors or + # slices of dtype `datetime64[ns]`. 
+ class DateFactor(CustomFactor): + window_length = 1 + inputs = [] + dtype = datetime64ns_dtype + window_safe = True + + def compute(self, today, assets, out): + pass + + date_factor = DateFactor() + date_factor_slice = date_factor[my_asset] + + with self.assertRaises(TypeError): + date_factor.linear_regression( + target=returns_slice, regression_length=regression_length, + ) + with self.assertRaises(TypeError): + returns.linear_regression( + target=date_factor_slice, regression_length=regression_length, + ) diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py index ca88776e45..eb97964e80 100644 --- a/tests/pipeline/test_term.py +++ b/tests/pipeline/test_term.py @@ -5,6 +5,7 @@ from itertools import product from unittest import TestCase +from zipline.assets import Asset from zipline.errors import ( DTypeNotSpecified, InvalidOutputName, @@ -25,9 +26,10 @@ ) from zipline.pipeline.data import Column, DataSet from zipline.pipeline.data.testing import TestingDataSet -from zipline.pipeline.factors import RecarrayField -from zipline.pipeline.term import AssetExists, NotSpecified from zipline.pipeline.expression import NUMEXPR_MATH_FUNCS +from zipline.pipeline.factors import RecarrayField +from zipline.pipeline.sentinels import NotSpecified +from zipline.pipeline.term import AssetExists, Slice from zipline.testing import parameter_space from zipline.testing.predicates import assert_equal, assert_raises from zipline.utils.numpy_utils import ( @@ -96,6 +98,18 @@ def some_method(self): return +class GenericFilter(Filter): + dtype = bool_dtype + window_length = 0 + inputs = [] + + +class GenericClassifier(Classifier): + dtype = categorical_dtype + window_length = 0 + inputs = [] + + def gen_equivalent_factors(): """ Return an iterator of SomeFactor instances that should all be the same @@ -268,6 +282,21 @@ def test_instance_caching_multiple_outputs(self): self.assertIs(alpha, multiple_outputs.alpha) self.assertIs(beta, multiple_outputs.beta) + def 
test_instance_caching_of_slices(self): + my_asset = Asset(1) + + f = GenericCustomFactor() + f_slice = f[my_asset] + self.assertIs(f_slice, Slice(GenericCustomFactor(), my_asset)) + + f = GenericFilter() + f_slice = f[my_asset] + self.assertIs(f_slice, Slice(GenericFilter(), my_asset)) + + c = GenericClassifier() + c_slice = c[my_asset] + self.assertIs(c_slice, Slice(GenericClassifier(), my_asset)) + def test_instance_non_caching(self): f = SomeFactor() diff --git a/tests/test_testing.py b/tests/test_testing.py index b72e152e1d..bde9501a62 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -4,7 +4,15 @@ from itertools import product from unittest import TestCase -from zipline.testing import parameter_space +from numpy import array, empty + +from zipline.testing import ( + check_arrays, + make_alternating_boolean_array, + make_cascading_boolean_array, + parameter_space, +) +from zipline.utils.numpy_utils import bool_dtype class TestParameterSpace(TestCase): @@ -38,3 +46,66 @@ def test_nothing(self): # our {setUp,tearDown}Class won't be called if, for example, # `parameter_space` returns None. 
pass + + +class TestMakeBooleanArray(TestCase): + + def test_make_alternating_boolean_array(self): + check_arrays( + make_alternating_boolean_array((3, 3)), + array( + [[True, False, True], + [False, True, False], + [True, False, True]] + ), + ) + check_arrays( + make_alternating_boolean_array((3, 3), first_value=False), + array( + [[False, True, False], + [True, False, True], + [False, True, False]] + ), + ) + check_arrays( + make_alternating_boolean_array((1, 3)), + array([[True, False, True]]), + ) + check_arrays( + make_alternating_boolean_array((3, 1)), + array([[True], [False], [True]]), + ) + check_arrays( + make_alternating_boolean_array((3, 0)), + empty((3, 0), dtype=bool_dtype), + ) + + def test_make_cascading_boolean_array(self): + check_arrays( + make_cascading_boolean_array((3, 3)), + array( + [[True, True, False], + [True, False, False], + [False, False, False]] + ), + ) + check_arrays( + make_cascading_boolean_array((3, 3), first_value=False), + array( + [[False, False, True], + [False, True, True], + [True, True, True]] + ), + ) + check_arrays( + make_cascading_boolean_array((1, 3)), + array([[True, True, False]]), + ) + check_arrays( + make_cascading_boolean_array((3, 1)), + array([[False], [False], [False]]), + ) + check_arrays( + make_cascading_boolean_array((3, 0)), + empty((3, 0), dtype=bool_dtype), + ) diff --git a/zipline/errors.py b/zipline/errors.py index 48c920decf..06d4a69441 100644 --- a/zipline/errors.py +++ b/zipline/errors.py @@ -666,3 +666,22 @@ class ScheduleFunctionWithoutCalendar(ZiplineError): "To use schedule_function, the TradingAlgorithm must be running on an " "ExchangeTradingSchedule, rather than {schedule}." ) + + +class UnsupportedPipelineOutput(ZiplineError): + """ + Raised when a 1D term is added as a column to a pipeline. + """ + msg = ( + "Cannot add column {column_name!r} with term {term}. Adding slices or " + "single-column-output terms as pipeline columns is not currently " + "supported." 
+ ) + + +class NonSliceableTerm(ZiplineError): + """ + Raised when attempting to index into a non-sliceable term, e.g. instances + of `zipline.pipeline.term.LoadableTerm`. + """ + msg = "Taking slices of {term} is not currently supported." diff --git a/zipline/pipeline/classifiers/classifier.py b/zipline/pipeline/classifiers/classifier.py index e5d0422ec0..81f08e7d3a 100644 --- a/zipline/pipeline/classifiers/classifier.py +++ b/zipline/pipeline/classifiers/classifier.py @@ -10,7 +10,8 @@ from zipline.lib.labelarray import LabelArray from zipline.lib.quantiles import quantiles from zipline.pipeline.api_utils import restrict_to_dtype -from zipline.pipeline.term import ComputableTerm, NotSpecified +from zipline.pipeline.sentinels import NotSpecified +from zipline.pipeline.term import ComputableTerm from zipline.utils.compat import unicode from zipline.utils.input_validation import expect_types from zipline.utils.numpy_utils import ( diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index d77f21b69e..bca7310e2b 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -10,10 +10,10 @@ from zipline.pipeline.classifiers import Classifier, Latest as LatestClassifier from zipline.pipeline.factors import Factor, Latest as LatestFactor from zipline.pipeline.filters import Filter, Latest as LatestFilter +from zipline.pipeline.sentinels import NotSpecified from zipline.pipeline.term import ( AssetExists, LoadableTerm, - NotSpecified, validate_dtype, ) from zipline.utils.input_validation import ensure_dtype diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 4baa88bc82..8be01f4e6d 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -361,7 +361,10 @@ def compute_chunk(self, graph, dates, assets, initial_workspace): assets, mask, ) - assert(workspace[term].shape == mask.shape) + if term.ndim == 2: + assert workspace[term].shape == mask.shape + else: + assert 
workspace[term].shape == (mask.shape[0], 1) out = {} graph_extra_rows = graph.extra_rows diff --git a/zipline/pipeline/factors/__init__.py b/zipline/pipeline/factors/__init__.py index f4604711fa..481219b316 100644 --- a/zipline/pipeline/factors/__init__.py +++ b/zipline/pipeline/factors/__init__.py @@ -8,6 +8,11 @@ BusinessDaysSincePreviousEvent, BusinessDaysUntilNextEvent, ) +from .statistical import ( + RollingLinearRegressionOfReturns, + RollingPearsonOfReturns, + RollingSpearmanOfReturns, +) from .technical import ( Aroon, AverageDollarVolume, @@ -19,9 +24,6 @@ FastStochasticOscillator, MaxDrawdown, Returns, - RollingLinearRegressionOfReturns, - RollingPearsonOfReturns, - RollingSpearmanOfReturns, RSI, SimpleMovingAverage, VWAP, diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index e9484ce9c8..6c840ba85d 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -12,19 +12,6 @@ from zipline.lib.rank import masked_rankdata_2d from zipline.pipeline.api_utils import restrict_to_dtype from zipline.pipeline.classifiers import Classifier, Everything, Quantiles -from zipline.pipeline.mixins import ( - CustomTermMixin, - LatestMixin, - PositiveWindowLengthMixin, - RestrictedDTypeMixin, - SingleInputMixin, -) -from zipline.pipeline.term import ( - ComputableTerm, - NotSpecified, - NotSpecifiedType, - Term, -) from zipline.pipeline.expression import ( BadBinaryOperator, COMPARISONS, @@ -42,6 +29,19 @@ PercentileFilter, NullFilter, ) +from zipline.pipeline.mixins import ( + CustomTermMixin, + LatestMixin, + PositiveWindowLengthMixin, + RestrictedDTypeMixin, + SingleInputMixin, +) +from zipline.pipeline.sentinels import NotSpecified, NotSpecifiedType +from zipline.pipeline.term import ( + ComputableTerm, + Slice, + Term, +) from zipline.utils.functional import with_doc, with_name from zipline.utils.input_validation import expect_types from zipline.utils.math_utils import nanmean, nanstd @@ -625,6 
+625,193 @@ def rank(self, method='ordinal', ascending=True, mask=NotSpecified): """ return Rank(self, method=method, ascending=ascending, mask=mask) + @expect_types( + target=Slice, correlation_length=int, mask=(Filter, NotSpecifiedType), + ) + def pearsonr(self, target, correlation_length, mask=NotSpecified): + """ + Construct a new Factor that computes rolling pearson correlation + coefficients between `target` and the columns of `self`. + + This method can only be called on factors which are deemed safe for use + as inputs to other factors. This includes `Returns` and any factors + created from `Factor.rank` or `Factor.zscore`. + + Parameters + ---------- + target : zipline.pipeline.slice.Slice + The column of data with which to compute correlations against each + column of data produced by `self`. + correlation_length : int + Length of the lookback window over which to compute each + correlation coefficient. + mask : zipline.pipeline.Filter, optional + A Filter describing which assets should have their correlation with + the target slice computed each day. + + Returns + ------- + correlations : zipline.pipeline.factors.RollingPearson + A new Factor that will compute correlations between `target` and + the columns of `self`. + + Example + ------- + Suppose we want to create a factor that computes the correlation + between AAPL's 10-day returns and the 10-day returns of all other + assets, computing each correlation over 30 days. 
This can be achieved + by doing the following:: + + returns = Returns(window_length=10) + returns_slice = returns[Asset(24)] + aapl_correlations = returns.pearsonr( + target=returns_slice, correlation_length=30, + ) + + This is equivalent to doing:: + + aapl_correlations = RollingPearsonOfReturns( + target=Asset(24), returns_length=10, correlation_length=30, + ) + + See Also + -------- + :func:`scipy.stats.pearsonr` + :class:`zipline.pipeline.factors.RollingPearsonOfReturns` + :meth:`Factor.spearmanr` + """ + from .statistical import RollingPearson + return RollingPearson( + target_factor=self, + target_slice=target, + correlation_length=correlation_length, + mask=mask, + ) + + @expect_types( + target=Slice, correlation_length=int, mask=(Filter, NotSpecifiedType), + ) + def spearmanr(self, target, correlation_length, mask=NotSpecified): + """ + Construct a new Factor that computes rolling spearman rank correlation + coefficients between `target` and the columns of `self`. + + This method can only be called on factors which are deemed safe for use + as inputs to other factors. This includes `Returns` and any factors + created from `Factor.rank` or `Factor.zscore`. + + Parameters + ---------- + target : zipline.pipeline.slice.Slice + The column of data with which to compute correlations against each + column of data produced by `self`. + correlation_length : int + Length of the lookback window over which to compute each + correlation coefficient. + mask : zipline.pipeline.Filter, optional + A Filter describing which assets should have their correlation with + the target slice computed each day. + + Returns + ------- + correlations : zipline.pipeline.factors.RollingSpearman + A new Factor that will compute correlations between `target` and + the columns of `self`. + + Example + ------- + Suppose we want to create a factor that computes the correlation + between AAPL's 10-day returns and the 10-day returns of all other + assets, computing each correlation over 30 days. 
This can be achieved
+        by doing the following::
+
+            returns = Returns(window_length=10)
+            returns_slice = returns[Asset(24)]
+            aapl_correlations = returns.spearmanr(
+                target=returns_slice, correlation_length=30,
+            )
+
+        This is equivalent to doing::
+
+            aapl_correlations = RollingSpearmanOfReturns(
+                target=Asset(24), returns_length=10, correlation_length=30,
+            )
+
+        See Also
+        --------
+        :func:`scipy.stats.spearmanr`
+        :class:`zipline.pipeline.factors.RollingSpearmanOfReturns`
+        :meth:`Factor.pearsonr`
+        """
+        from .statistical import RollingSpearman
+        return RollingSpearman(
+            target_factor=self,
+            target_slice=target,
+            correlation_length=correlation_length,
+            mask=mask,
+        )
+
+    @expect_types(
+        target=Slice, regression_length=int, mask=(Filter, NotSpecifiedType),
+    )
+    def linear_regression(self, target, regression_length, mask=NotSpecified):
+        """
+        Construct a new Factor that performs an ordinary least-squares
+        regression predicting the columns of `self` from `target`.
+
+        This method can only be called on factors which are deemed safe for use
+        as inputs to other factors. This includes `Returns` and any factors
+        created from `Factor.rank` or `Factor.zscore`.
+
+        Parameters
+        ----------
+        target : zipline.pipeline.slice.Slice
+            The column of data to use as the predictor/independent variable in
+            each regression.
+        regression_length : int
+            Length of the lookback window over which to compute each
+            regression.
+        mask : zipline.pipeline.Filter, optional
+            A Filter describing which assets should be regressed with the
+            target slice each day.
+
+        Returns
+        -------
+        regressions : zipline.pipeline.factors.RollingLinearRegression
+            A new Factor that will compute linear regressions of `target`
+            against the columns of `self`.
+
+        Example
+        -------
+        Suppose we want to create a factor that regresses AAPL's 10-day returns
+        against the 10-day returns of all other assets, computing each
+        regression over 30 days.
This can be achieved by doing the following:: + + returns = Returns(window_length=10) + returns_slice = returns[Asset(24)] + aapl_regressions = returns.linear_regression( + target=returns_slice, regression_length=30, + ) + + This is equivalent to doing:: + + aapl_regressions = RollingLinearRegressionOfReturns( + target=Asset(24), returns_length=10, regression_length=30, + ) + + See Also + -------- + :func:`scipy.stats.linregress` + :class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns` + """ + from .statistical import RollingLinearRegression + return RollingLinearRegression( + target_factor=self, + target_slice=target, + regression_length=regression_length, + mask=mask, + ) + @expect_types(bins=int, mask=(Filter, NotSpecifiedType)) def quantiles(self, bins, mask=NotSpecified): """ diff --git a/zipline/pipeline/factors/statistical.py b/zipline/pipeline/factors/statistical.py new file mode 100644 index 0000000000..9e1cf5ca57 --- /dev/null +++ b/zipline/pipeline/factors/statistical.py @@ -0,0 +1,435 @@ + +from scipy.stats import ( + linregress, + pearsonr, + spearmanr, +) + +from zipline.pipeline.factors import CustomFactor +from zipline.pipeline.filters import SingleAsset +from zipline.pipeline.mixins import SingleInputMixin +from zipline.pipeline.sentinels import NotSpecified +from zipline.pipeline.term import AssetExists +from zipline.utils.input_validation import expect_dtypes +from zipline.utils.numpy_utils import float64_dtype, int64_dtype + +from .technical import Returns + + +ALLOWED_DTYPES = (float64_dtype, int64_dtype) + + +class _RollingCorrelation(CustomFactor, SingleInputMixin): + + @expect_dtypes(target_factor=ALLOWED_DTYPES, target_slice=ALLOWED_DTYPES) + def __new__(cls, + target_factor, + target_slice, + correlation_length, + mask=NotSpecified): + return super(_RollingCorrelation, cls).__new__( + cls, + inputs=[target_factor, target_slice], + window_length=correlation_length, + mask=mask, + ) + + +class RollingPearson(_RollingCorrelation): 
+ """ + A Factor that computes pearson correlation coefficients between a single + column of data and the columns of another Factor. + + Parameters + ---------- + target_factor : zipline.pipeline.factors.Factor + The factor for which to compute correlations of each of its columns + with `target_slice`. + target_slice : zipline.pipeline.slice.Slice + The column of data with which to compute correlations against each + column of data produced by `target_factor`. + correlation_length : int + Length of the lookback window over which to compute each + correlation coefficient. + mask : zipline.pipeline.Filter, optional + A Filter describing which assets (columns) of `target_factor` should + have their correlation with `target_slice` computed each day. + + See Also + -------- + :func:`scipy.stats.pearsonr` + :meth:`Factor.pearsonr` + :class:`zipline.pipeline.factors.RollingPearsonOfReturns` + + Notes + ----- + Most users should call Factor.pearsonr rather than directly construct an + instance of this class. + """ + def compute(self, today, assets, out, factor_data, slice_data): + slice_data_column = slice_data[:, 0] + for i in range(len(out)): + # pearsonr returns the R-value and the P-value. + out[i] = pearsonr(factor_data[:, i], slice_data_column)[0] + + +class RollingSpearman(_RollingCorrelation): + """ + A Factor that computes spearman rank correlation coefficients between a + single column of data and the columns of another Factor. + + Parameters + ---------- + target_factor : zipline.pipeline.factors.Factor + The factor for which to compute correlations of each of its columns + with `target_slice`. + target_slice : zipline.pipeline.slice.Slice + The column of data with which to compute correlations against each + column of data produced by `target_factor`. + correlation_length : int + Length of the lookback window over which to compute each + correlation coefficient. 
+    mask : zipline.pipeline.Filter, optional
+        A Filter describing which assets (columns) of `target_factor` should
+        have their correlation with `target_slice` computed each day.
+
+    See Also
+    --------
+    :func:`scipy.stats.spearmanr`
+    :meth:`Factor.spearmanr`
+    :class:`zipline.pipeline.factors.RollingSpearmanOfReturns`
+
+    Notes
+    -----
+    Most users should call Factor.spearmanr rather than directly construct an
+    instance of this class.
+    """
+    def compute(self, today, assets, out, factor_data, slice_data):
+        slice_data_column = slice_data[:, 0]
+        for i in range(len(out)):
+            # spearmanr returns the R-value and the P-value.
+            out[i] = spearmanr(factor_data[:, i], slice_data_column)[0]
+
+
+class RollingLinearRegression(CustomFactor, SingleInputMixin):
+    """
+    A Factor that performs an ordinary least-squares regression predicting the
+    columns of another Factor from a single column of data.
+
+    Parameters
+    ----------
+    target_factor : zipline.pipeline.factors.Factor
+        The factor whose columns are the predicted/dependent variable of each
+        regression with `target_slice`.
+    target_slice : zipline.pipeline.slice.Slice
+        The column of data to use as the predictor/independent variable in
+        each regression with the columns of `target_factor`.
+    regression_length : int
+        Length of the lookback window over which to compute each regression.
+    mask : zipline.pipeline.Filter, optional
+        A Filter describing which assets (columns) of `target_factor` should be
+        regressed against `target_slice` each day.
+
+    See Also
+    --------
+    :func:`scipy.stats.linregress`
+    :meth:`Factor.linear_regression`
+    :class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns`
+
+    Notes
+    -----
+    Most users should call Factor.linear_regression rather than directly
+    construct an instance of this class.
+ """ + outputs = ['alpha', 'beta', 'r_value', 'p_value', 'stderr'] + + @expect_dtypes(target_factor=ALLOWED_DTYPES, target_slice=ALLOWED_DTYPES) + def __new__(cls, + target_factor, + target_slice, + regression_length, + mask=NotSpecified): + return super(RollingLinearRegression, cls).__new__( + cls, + inputs=[target_factor, target_slice], + window_length=regression_length, + mask=mask, + ) + + def compute(self, today, assets, out, factor_data, slice_data): + slice_data_column = slice_data[:, 0] + + alpha = out.alpha + beta = out.beta + r_value = out.r_value + p_value = out.p_value + stderr = out.stderr + for i in range(len(out)): + regr_results = linregress(y=factor_data[:, i], x=slice_data_column) + # `linregress` returns its results in the following order: + # slope, intercept, r-value, p-value, stderr + alpha[i] = regr_results[1] + beta[i] = regr_results[0] + r_value[i] = regr_results[2] + p_value[i] = regr_results[3] + stderr[i] = regr_results[4] + + +class RollingPearsonOfReturns(RollingPearson): + """ + Calculates the Pearson product-moment correlation coefficient of the + returns of the given asset with the returns of all other assets. + + Pearson correlation is what most people mean when they say "correlation + coefficient" or "R-value". + + Parameters + ---------- + target : zipline.assets.Asset + The asset to correlate with all other assets. + returns_length : int >= 2 + Length of the lookback window over which to compute returns. Daily + returns require a window length of 2. + correlation_length : int >= 1 + Length of the lookback window over which to compute each correlation + coefficient. + mask : zipline.pipeline.Filter, optional + A Filter describing which assets should have their correlation with the + target asset computed each day. + + Note + ---- + Computing this factor over many assets can be time consuming. It is + recommended that a mask be used in order to limit the number of assets over + which correlations are computed. 
+ + Example + ------- + Let the following be example 10-day returns for three different assets:: + + SPY MSFT FB + 2017-03-13 -.03 .03 .04 + 2017-03-14 -.02 -.03 .02 + 2017-03-15 -.01 .02 .01 + 2017-03-16 0 -.02 .01 + 2017-03-17 .01 .04 -.01 + 2017-03-20 .02 -.03 -.02 + 2017-03-21 .03 .01 -.02 + 2017-03-22 .04 -.02 -.02 + + Suppose we are interested in SPY's rolling returns correlation with each + stock from 2017-03-17 to 2017-03-22, using a 5-day look back window (that + is, we calculate each correlation coefficient over 5 days of data). We can + achieve this by doing:: + + rolling_correlations = RollingPearsonOfReturns( + target=Equity(8554), + returns_length=10, + correlation_length=5, + ) + + The result of computing ``rolling_correlations`` from 2017-03-17 to + 2017-03-22 gives:: + + SPY MSFT FB + 2017-03-17 1 .15 -.96 + 2017-03-20 1 .10 -.96 + 2017-03-21 1 -.16 -.94 + 2017-03-22 1 -.16 -.85 + + Note that the column for SPY is all 1's, as the correlation of any data + series with itself is always 1. To understand how each of the other values + were calculated, take for example the .15 in MSFT's column. This is the + correlation coefficient between SPY's returns looking back from 2017-03-17 + (-.03, -.02, -.01, 0, .01) and MSFT's returns (.03, -.03, .02, -.02, .04). + + See Also + -------- + :class:`zipline.pipeline.factors.RollingSpearmanOfReturns` + :class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns` + """ + def __new__(cls, + target, + returns_length, + correlation_length, + mask=NotSpecified): + # Use the `SingleAsset` filter here because it protects against + # inputting a non-existent target asset. 
+ returns = Returns( + window_length=returns_length, + mask=(AssetExists() | SingleAsset(asset=target)), + ) + return super(RollingPearsonOfReturns, cls).__new__( + cls, + target_factor=returns, + target_slice=returns[target], + correlation_length=correlation_length, + mask=mask, + ) + + +class RollingSpearmanOfReturns(RollingSpearman): + """ + Calculates the Spearman rank correlation coefficient of the returns of the + given asset with the returns of all other assets. + + Parameters + ---------- + target : zipline.assets.Asset + The asset to correlate with all other assets. + returns_length : int >= 2 + Length of the lookback window over which to compute returns. Daily + returns require a window length of 2. + correlation_length : int >= 1 + Length of the lookback window over which to compute each correlation + coefficient. + mask : zipline.pipeline.Filter, optional + A Filter describing which assets should have their correlation with the + target asset computed each day. + + Note + ---- + Computing this factor over many assets can be time consuming. It is + recommended that a mask be used in order to limit the number of assets over + which correlations are computed. + + See Also + -------- + :class:`zipline.pipeline.factors.RollingPearsonOfReturns` + :class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns` + """ + def __new__(cls, + target, + returns_length, + correlation_length, + mask=NotSpecified): + # Use the `SingleAsset` filter here because it protects against + # inputting a non-existent target asset. 
+ returns = Returns( + window_length=returns_length, + mask=(AssetExists() | SingleAsset(asset=target)), + ) + return super(RollingSpearmanOfReturns, cls).__new__( + cls, + target_factor=returns, + target_slice=returns[target], + correlation_length=correlation_length, + mask=mask, + ) + + +class RollingLinearRegressionOfReturns(RollingLinearRegression): + """ + Perform an ordinary least-squares regression predicting the returns of all + other assets on the given asset. + + Parameters + ---------- + target : zipline.assets.Asset + The asset to regress against all other assets. + returns_length : int >= 2 + Length of the lookback window over which to compute returns. Daily + returns require a window length of 2. + regression_length : int >= 1 + Length of the lookback window over which to compute each regression. + mask : zipline.pipeline.Filter, optional + A Filter describing which assets should be regressed against the target + asset each day. + + Notes + ----- + Computing this factor over many assets can be time consuming. It is + recommended that a mask be used in order to limit the number of assets over + which regressions are computed. + + This factor is designed to return five outputs: + + - alpha, a factor that computes the intercepts of each regression. + - beta, a factor that computes the slopes of each regression. + - r_value, a factor that computes the correlation coefficient of each + regression. + - p_value, a factor that computes, for each regression, the two-sided + p-value for a hypothesis test whose null hypothesis is that the slope is + zero. + - stderr, a factor that computes the standard error of the estimate of each + regression. + + For more help on factors with multiple outputs, see + :class:`zipline.pipeline.factors.CustomFactor`. 
+
+    Example
+    -------
+    Let the following be example 10-day returns for three different assets::
+
+                       SPY   MSFT  FB
+        2017-03-13    -.03   .03  .04
+        2017-03-14    -.02  -.03  .02
+        2017-03-15    -.01   .02  .01
+        2017-03-16      0   -.02  .01
+        2017-03-17     .01   .04 -.01
+        2017-03-20     .02  -.03 -.02
+        2017-03-21     .03   .01 -.02
+        2017-03-22     .04  -.02 -.02
+
+    Suppose we are interested in predicting each stock's returns from SPY's
+    over rolling 5-day look back windows. We can compute rolling regression
+    coefficients (alpha and beta) from 2017-03-17 to 2017-03-22 by doing::
+
+        regression_factor = RollingLinearRegressionOfReturns(
+            target=Equity(8554),
+            returns_length=10,
+            regression_length=5,
+        )
+        alpha = regression_factor.alpha
+        beta = regression_factor.beta
+
+    The result of computing ``alpha`` from 2017-03-17 to 2017-03-22 gives::
+
+                       SPY    MSFT   FB
+        2017-03-17      0    .011   .003
+        2017-03-20      0   -.004   .004
+        2017-03-21      0    .007   .006
+        2017-03-22      0    .002   .008
+
+    And the result of computing ``beta`` from 2017-03-17 to 2017-03-22 gives::
+
+                       SPY  MSFT  FB
+        2017-03-17      1    .3  -1.1
+        2017-03-20      1    .2  -1
+        2017-03-21      1   -.3  -1
+        2017-03-22      1   -.3  -.9
+
+    Note that SPY's column for alpha is all 0's and for beta is all 1's, as the
+    regression line of SPY with itself is simply the function y = x.
+
+    To understand how each of the other values were calculated, take for
+    example MSFT's ``alpha`` and ``beta`` values on 2017-03-17 (.011 and .3,
+    respectively). These values are the result of running a linear regression
+    predicting MSFT's returns from SPY's returns, using values starting at
+    2017-03-17 and looking back 5 days. That is, the regression was run with
+    x = [-.03, -.02, -.01, 0, .01] and y = [.03, -.03, .02, -.02, .04], and it
+    produced a slope of .3 and an intercept of .011.
+ + See Also + -------- + :class:`zipline.pipeline.factors.RollingPearsonOfReturns` + :class:`zipline.pipeline.factors.RollingSpearmanOfReturns` + """ + def __new__(cls, + target, + returns_length, + regression_length, + mask=NotSpecified): + # Use the `SingleAsset` filter here because it protects against + # inputting a non-existent target asset. + returns = Returns( + window_length=returns_length, + mask=(AssetExists() | SingleAsset(asset=target)), + ) + return super(RollingLinearRegressionOfReturns, cls).__new__( + cls, + target_factor=returns, + target_slice=returns[target], + regression_length=regression_length, + mask=mask, + ) diff --git a/zipline/pipeline/factors/technical.py b/zipline/pipeline/factors/technical.py index c54a6f1def..10666a8a52 100644 --- a/zipline/pipeline/factors/technical.py +++ b/zipline/pipeline/factors/technical.py @@ -18,17 +18,13 @@ isnan, log, NINF, - searchsorted, sqrt, sum as np_sum, ) from numexpr import evaluate -from scipy.stats import linregress, pearsonr, spearmanr from zipline.pipeline.data import USEquityPricing -from zipline.pipeline.filters import SingleAsset from zipline.pipeline.mixins import SingleInputMixin -from zipline.pipeline.term import AssetExists, NotSpecified from zipline.utils.numpy_utils import ignore_nanwarnings from zipline.utils.input_validation import expect_types from zipline.utils.math_utils import ( @@ -163,305 +159,6 @@ def compute(self, today, assets, out, close, volume): out[:] = nanmean(close * volume, axis=0) -class _RollingCorrelationOfReturns(CustomFactor, SingleInputMixin): - """ - Base class for factors computing a rolling correlation over a window of - Returns. - - Parameters - ---------- - target : zipline.assets.Asset - The asset to correlate with all other assets. - returns_length : int >= 2 - Length of the lookback window over which to compute returns. Daily - returns require a window length of 2. 
- correlation_length : int >= 1 - Length of the lookback window over which to compute each correlation - coefficient. - """ - params = ['target'] - - def __new__(cls, - target, - returns_length, - correlation_length, - mask=NotSpecified, - **kwargs): - if mask is NotSpecified: - mask = AssetExists() - - # Make sure we do not filter out the asset of interest. - mask = mask | SingleAsset(asset=target) - - return super(_RollingCorrelationOfReturns, cls).__new__( - cls, - target=target, - inputs=[Returns(window_length=returns_length)], - window_length=correlation_length, - mask=mask, - **kwargs - ) - - -class RollingPearsonOfReturns(_RollingCorrelationOfReturns): - """ - Calculates the Pearson product-moment correlation coefficient of the - returns of the given asset with the returns of all other assets. - - Pearson correlation is what most people mean when they say "correlation - coefficient" or "R-value". - - Parameters - ---------- - target : zipline.assets.Asset - The asset to correlate with all other assets. - returns_length : int >= 2 - Length of the lookback window over which to compute returns. Daily - returns require a window length of 2. - correlation_length : int >= 1 - Length of the lookback window over which to compute each correlation - coefficient. - mask : zipline.pipeline.Filter, optional - A Filter describing which assets should have their correlation with the - target asset computed each day. - - Note - ---- - Computing this factor over many assets can be time consuming. It is - recommended that a mask be used in order to limit the number of assets over - which correlations are computed. 
- - Example - ------- - Let the following be example 10-day returns for three different assets:: - - SPY MSFT FB - 2017-03-13 -.03 .03 .04 - 2017-03-14 -.02 -.03 .02 - 2017-03-15 -.01 .02 .01 - 2017-03-16 0 -.02 .01 - 2017-03-17 .01 .04 -.01 - 2017-03-20 .02 -.03 -.02 - 2017-03-21 .03 .01 -.02 - 2017-03-22 .04 -.02 -.02 - - Suppose we are interested in SPY's rolling returns correlation with each - stock from 2017-03-17 to 2017-03-22, using a 5-day look back window (that - is, we calculate each correlation coefficient over 5 days of data). We can - achieve this by doing:: - - rolling_correlations = RollingPearsonOfReturns( - target=Equity(8554), - returns_length=10, - correlation_length=5, - ) - - The result of computing ``rolling_correlations`` from 2017-03-17 to - 2017-03-22 gives:: - - SPY MSFT FB - 2017-03-17 1 .15 -.96 - 2017-03-20 1 .10 -.96 - 2017-03-21 1 -.16 -.94 - 2017-03-22 1 -.16 -.85 - - Note that the column for SPY is all 1's, as the correlation of any data - series with itself is always 1. To understand how each of the other values - were calculated, take for example the .15 in MSFT's column. This is the - correlation coefficient between SPY's returns looking back from 2017-03-17 - (-.03, -.02, -.01, 0, .01) and MSFT's returns (.03, -.03, .02, -.02, .04). - - See Also - -------- - :class:`zipline.pipeline.factors.technical.RollingSpearmanOfReturns` - :class:`zipline.pipeline.factors.technical.RollingLinearRegressionOfReturns` - """ - def compute(self, today, assets, out, data, target): - target_col = data[:, searchsorted(assets.values, target.sid)] - for i in range(len(out)): - # pearsonr returns the R-value and the P-value. - out[i] = pearsonr(data[:, i], target_col)[0] - - -class RollingSpearmanOfReturns(_RollingCorrelationOfReturns): - """ - Calculates the Spearman rank correlation coefficient of the returns of the - given asset with the returns of all other assets. 
- - Parameters - ---------- - target : zipline.assets.Asset - The asset to correlate with all other assets. - returns_length : int >= 2 - Length of the lookback window over which to compute returns. Daily - returns require a window length of 2. - correlation_length : int >= 1 - Length of the lookback window over which to compute each correlation - coefficient. - mask : zipline.pipeline.Filter, optional - A Filter describing which assets should have their correlation with the - target asset computed each day. - - Note - ---- - Computing this factor over many assets can be time consuming. It is - recommended that a mask be used in order to limit the number of assets over - which correlations are computed. - - See Also - -------- - :class:`zipline.pipeline.factors.technical.RollingPearsonOfReturns` - :class:`zipline.pipeline.factors.technical.RollingLinearRegressionOfReturns` - """ - def compute(self, today, assets, out, data, target): - target_col = data[:, searchsorted(assets.values, target.sid)] - for i in range(len(out)): - # spearmanr returns the R-value and the P-value. - out[i] = spearmanr(data[:, i], target_col)[0] - - -class RollingLinearRegressionOfReturns(CustomFactor, SingleInputMixin): - """ - Perform an ordinary least-squares regression predicting the returns of all - other assets on the given asset. - - Parameters - ---------- - target : zipline.assets.Asset - The asset to regress against all other assets. - returns_length : int >= 2 - Length of the lookback window over which to compute returns. Daily - returns require a window length of 2. - regression_length : int >= 1 - Length of the lookback window over which to compute each regression. - mask : zipline.pipeline.Filter, optional - A Filter describing which assets should be regressed against the target - asset each day. - - Notes - ----- - Computing this factor over many assets can be time consuming. 
It is - recommended that a mask be used in order to limit the number of assets over - which regressions are computed. - - This factor is designed to return five outputs: - - - alpha, a factor that computes the intercepts of each regression. - - beta, a factor that computes the slopes of each regression. - - r_value, a factor that computes the correlation coefficient of each - regression. - - p_value, a factor that computes, for each regression, the two-sided - p-value for a hypothesis test whose null hypothesis is that the slope is - zero. - - stderr, a factor that computes the standard error of the estimate of each - regression. - - For more help on factors with multiple outputs, see - :class:`zipline.pipeline.factors.CustomFactor`. - - Example - ------- - Let the following be example 10-day returns for three different assets:: - - SPY MSFT FB - 2017-03-13 -.03 .03 .04 - 2017-03-14 -.02 -.03 .02 - 2017-03-15 -.01 .02 .01 - 2017-03-16 0 -.02 .01 - 2017-03-17 .01 .04 -.01 - 2017-03-20 .02 -.03 -.02 - 2017-03-21 .03 .01 -.02 - 2017-03-22 .04 -.02 -.02 - - Suppose we are interested in predicting each stock's returns from SPY's - over rolling 5-day look back windows. 
We can compute rolling regression - coefficients (alpha and beta) from 2017-03-17 to 2017-03-22 by doing:: - - regression_factor = RollingRegressionOfReturns( - target=Equity(8554), - returns_length=10, - regression_length=5, - ) - alpha = regression_factor.alpha - beta = regression_factor.beta - - The result of computing ``alpha`` from 2017-03-17 to 2017-03-22 gives:: - - SPY MSFT FB - 2017-03-17 0 .011 .003 - 2017-03-20 0 -.004 .004 - 2017-03-21 0 .007 .006 - 2017-03-22 0 .002 .008 - - And the result of computing ``beta`` from 2017-03-17 to 2017-03-22 gives:: - - SPY MSFT FB - 2017-03-17 1 .3 -1.1 - 2017-03-20 1 .2 -1 - 2017-03-21 1 -.3 -1 - 2017-03-22 1 -.3 -.9 - - Note that SPY's column for alpha is all 0's and for beta is all 1's, as the - regression line of SPY with itself is simply the function y = x. - - To understand how each of the other values were calculated, take for - example MSFT's ``alpha`` and ``beta`` values on 2017-03-17 (.011 and .3, - respectively). These values are the result of running a linear regression - predicting MSFT's returns from SPY's returns, using values starting at - 2017-03-17 and looking back 5 days. That is, the regression was run with - x = [-.03, -.02, -.01, 0, .01] and y = [.03, -.03, .02, -.02, .04], and it - produced a slope of .3 and an intercept of .011. - - See Also - -------- - :class:`zipline.pipeline.factors.technical.RollingPearsonOfReturns` - :class:`zipline.pipeline.factors.technical.RollingSpearmanOfReturns` - """ - outputs = ['alpha', 'beta', 'r_value', 'p_value', 'stderr'] - params = ['target'] - - def __new__(cls, - target, - returns_length, - regression_length, - mask=NotSpecified, - **kwargs): - if mask is NotSpecified: - mask = AssetExists() - - # Make sure we do not filter out the asset of interest. 
- mask = mask | SingleAsset(asset=target) - - return super(RollingLinearRegressionOfReturns, cls).__new__( - cls, - target=target, - inputs=[Returns(window_length=returns_length)], - window_length=regression_length, - mask=mask, - **kwargs - ) - - def compute(self, today, assets, out, returns, target): - asset_col = searchsorted(assets.values, target.sid) - my_asset = returns[:, asset_col] - - alpha = out.alpha - beta = out.beta - r_value = out.r_value - p_value = out.p_value - stderr = out.stderr - for i in range(len(out)): - other_asset = returns[:, i] - regr_results = linregress(y=other_asset, x=my_asset) - # `linregress` returns its results in the following order: - # slope, intercept, r-value, p-value, stderr - alpha[i] = regr_results[1] - beta[i] = regr_results[0] - r_value[i] = regr_results[2] - p_value[i] = regr_results[3] - stderr[i] = regr_results[4] - - class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor): """ Base class for factors implementing exponential-weighted operations. 
diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py index 5fcad60f70..c184e0d6b8 100644 --- a/zipline/pipeline/filters/filter.py +++ b/zipline/pipeline/filters/filter.py @@ -4,12 +4,12 @@ from itertools import chain from operator import attrgetter - from numpy import ( float64, nan, nanpercentile, ) + from zipline.errors import ( BadPercentileBounds, NonExistentAssetInTimeFrame, @@ -17,6 +17,12 @@ ) from zipline.lib.labelarray import LabelArray from zipline.lib.rank import is_missing +from zipline.pipeline.expression import ( + BadBinaryOperator, + FILTER_BINOPS, + method_name_for_op, + NumericalExpression, +) from zipline.pipeline.mixins import ( CustomTermMixin, LatestMixin, @@ -25,12 +31,6 @@ SingleInputMixin, ) from zipline.pipeline.term import ComputableTerm, Term -from zipline.pipeline.expression import ( - BadBinaryOperator, - FILTER_BINOPS, - method_name_for_op, - NumericalExpression, -) from zipline.utils.input_validation import expect_types from zipline.utils.numpy_utils import bool_dtype, repeat_first_axis diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 128326161e..89011ad32c 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -179,7 +179,7 @@ normalize_data_query_bounds, normalize_timestamp_to_query_time, ) -from zipline.pipeline.term import NotSpecified +from zipline.pipeline.sentinels import NotSpecified from zipline.lib.adjusted_array import AdjustedArray, can_represent_dtype from zipline.lib.adjustment import Float64Overwrite from zipline.utils.input_validation import ( diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index 1e87256c47..a2896267b5 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -1,12 +1,16 @@ """ Mixins classes for use with Filters and Factors. 
""" -from numpy import full, recarray +from numpy import ( + array, + full, + recarray, +) from zipline.utils.control_flow import nullctx from zipline.errors import WindowLengthNotPositive, UnsupportedDataType -from .term import NotSpecified +from .sentinels import NotSpecified class PositiveWindowLengthMixin(object): @@ -148,29 +152,44 @@ def _allocate_output(self, windows, shape): out = full(shape, missing_value, dtype=self.dtype) return out + def _format_inputs(self, windows, column_mask): + inputs = [] + for input_ in windows: + window = next(input_) + if window.shape[1] == 1: + # Do not mask single-column inputs. + inputs.append(window) + else: + inputs.append(window[:, column_mask]) + return inputs + def _compute(self, windows, dates, assets, mask): """ Call the user's `compute` function on each window with a pre-built output array. """ + format_inputs = self._format_inputs compute = self.compute params = self.params - out = self._allocate_output(windows, mask.shape) + ndim = self.ndim + + shape = (len(mask), 1) if ndim == 1 else mask.shape + out = self._allocate_output(windows, shape) with self.ctx: for idx, date in enumerate(dates): - col_mask = mask[idx] - masked_out = out[idx][col_mask] - masked_assets = assets[col_mask] - - compute( - date, - masked_assets, - masked_out, - *(next(w)[:, col_mask] for w in windows), - **params - ) - out[idx][col_mask] = masked_out + # Never apply a mask to 1D outputs. + out_mask = array([True]) if ndim == 1 else mask[idx] + + # Mask our inputs as usual. 
+ inputs_mask = mask[idx] + + masked_assets = assets[inputs_mask] + out_row = out[idx][out_mask] + inputs = format_inputs(windows, inputs_mask) + + compute(date, masked_assets, out_row, *inputs, **params) + out[idx][out_mask] = out_row return out def short_repr(self): diff --git a/zipline/pipeline/pipeline.py b/zipline/pipeline/pipeline.py index 1967f727a9..53276b82e2 100644 --- a/zipline/pipeline/pipeline.py +++ b/zipline/pipeline/pipeline.py @@ -1,3 +1,5 @@ + +from zipline.errors import UnsupportedPipelineOutput from zipline.utils.input_validation import expect_types, optional from .term import AssetExists, ComputableTerm, Term @@ -34,10 +36,12 @@ class Pipeline(object): screen=optional(Filter), ) def __init__(self, columns=None, screen=None): - if columns is None: columns = {} + + validate_column = self.validate_column for column_name, term in columns.items(): + validate_column(column_name, term) if not isinstance(term, ComputableTerm): raise TypeError( "Column {column_name!r} contains an invalid pipeline term " @@ -45,6 +49,7 @@ def __init__(self, columns=None, screen=None): column_name=column_name, term=term, ) ) + self._columns = columns self._screen = screen @@ -80,6 +85,8 @@ def add(self, term, name, overwrite=False): Whether to overwrite the existing entry if we already have a column named `name`. """ + self.validate_column(name, term) + columns = self.columns if name in columns: if overwrite: @@ -178,3 +185,8 @@ def show_graph(self, format='svg'): return g.jpeg else: raise ValueError("Unknown graph format %r." 
% format) + + @staticmethod + def validate_column(column_name, term): + if term.ndim == 1: + raise UnsupportedPipelineOutput(column_name=column_name, term=term) diff --git a/zipline/pipeline/sentinels.py b/zipline/pipeline/sentinels.py new file mode 100644 index 0000000000..b0c3a5cb16 --- /dev/null +++ b/zipline/pipeline/sentinels.py @@ -0,0 +1,10 @@ + +from zipline.utils.sentinel import sentinel + + +NotSpecified = sentinel( + 'NotSpecified', + 'Singleton sentinel value used for Term defaults.', +) + +NotSpecifiedType = type(NotSpecified) diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index cd29cec077..563a0c818c 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -5,11 +5,20 @@ from bisect import insort from weakref import WeakValueDictionary -from numpy import array, dtype as dtype_class, ndarray +from numpy import ( + array, + dtype as dtype_class, + ndarray, + searchsorted, +) from six import with_metaclass + +from zipline.assets import Asset from zipline.errors import ( DTypeNotSpecified, InvalidOutputName, + NonExistentAssetInTimeFrame, + NonSliceableTerm, NonWindowSafeInput, NotDType, TermInputsNotSpecified, @@ -26,15 +35,9 @@ categorical_dtype, default_missing_value_for_dtype, ) -from zipline.utils.sentinel import sentinel - - -NotSpecified = sentinel( - 'NotSpecified', - 'Singleton sentinel value used for Term defaults.', -) -NotSpecifiedType = type(NotSpecified) +from .mixins import SingleInputMixin +from .sentinels import NotSpecified class Term(with_metaclass(ABCMeta, object)): @@ -53,6 +56,9 @@ class Term(with_metaclass(ABCMeta, object)): # Determines if a term is safe to be used as a windowed input. window_safe = False + # The dimensions of the term's output (1D or 2D). 
+ ndim = 2 + _term_cache = WeakValueDictionary() def __new__(cls, @@ -60,6 +66,7 @@ def __new__(cls, dtype=dtype, missing_value=missing_value, window_safe=NotSpecified, + ndim=NotSpecified, # params is explicitly not allowed to be passed to an instance. *args, **kwargs): @@ -81,6 +88,8 @@ def __new__(cls, dtype = cls.dtype if missing_value is NotSpecified: missing_value = cls.missing_value + if ndim is NotSpecified: + ndim = cls.ndim if window_safe is NotSpecified: window_safe = cls.window_safe @@ -96,6 +105,7 @@ def __new__(cls, dtype=dtype, missing_value=missing_value, window_safe=window_safe, + ndim=ndim, params=params, *args, **kwargs ) @@ -109,6 +119,7 @@ def __new__(cls, dtype=dtype, missing_value=missing_value, window_safe=window_safe, + ndim=ndim, params=params, *args, **kwargs ) @@ -179,12 +190,19 @@ def __init__(self, *args, **kwargs): """ pass + @expect_types(key=Asset) + def __getitem__(self, key): + if isinstance(self, LoadableTerm): + raise NonSliceableTerm(term=self) + return Slice(self, key) + @classmethod def _static_identity(cls, domain, dtype, missing_value, window_safe, + ndim, params): """ Return the identity of the Term that would be constructed from the @@ -197,9 +215,9 @@ def _static_identity(cls, This is a classmethod so that it can be called from Term.__new__ to determine whether to produce a new instance. 
""" - return (cls, domain, dtype, missing_value, window_safe, params) + return (cls, domain, dtype, missing_value, window_safe, ndim, params) - def _init(self, domain, dtype, missing_value, window_safe, params): + def _init(self, domain, dtype, missing_value, window_safe, ndim, params): """ Parameters ---------- @@ -214,6 +232,7 @@ def _init(self, domain, dtype, missing_value, window_safe, params): self.dtype = dtype self.missing_value = missing_value self.window_safe = window_safe + self.ndim = ndim for name, value in params: if hasattr(self, name): @@ -500,6 +519,63 @@ def __repr__(self): ) +class Slice(ComputableTerm, SingleInputMixin): + """ + Term for extracting a single column of a another term's output. + + Parameters + ---------- + term : zipline.pipeline.term.Term + The term from which to extract a column of data. + asset : zipline.assets.Asset + The asset corresponding to the column of `term` to be extracted. + + Notes + ----- + Users should rarely construct instances of `Slice` directly. Instead, they + should construct instances via indexing, e.g. `MyFactor()[Asset(24)]`. 
+ """ + def __new__(cls, term, asset): + return super(Slice, cls).__new__( + cls, + asset=asset, + inputs=[term], + window_length=0, + mask=term.mask, + dtype=term.dtype, + missing_value=term.missing_value, + window_safe=term.window_safe, + ndim=1, + ) + + def __repr__(self): + return "{type}({parent_term}, column={asset})".format( + type=type(self).__name__, + parent_term=type(self.inputs[0]).__name__, + asset=self._asset, + ) + + def _init(self, asset, *args, **kwargs): + self._asset = asset + return super(Slice, self)._init(*args, **kwargs) + + @classmethod + def _static_identity(cls, asset, *args, **kwargs): + return (super(Slice, cls)._static_identity(*args, **kwargs), asset) + + def _compute(self, windows, dates, assets, mask): + asset = self._asset + asset_column = searchsorted(assets.values, asset.sid) + if assets[asset_column] != asset.sid: + raise NonExistentAssetInTimeFrame( + asset=asset, start_date=dates[0], end_date=dates[-1], + ) + + # Return a 2D array with one column rather than a 1D array of the + # column. + return windows[0][:, [asset_column]] + + def validate_dtype(termname, dtype, missing_value): """ Validate a `dtype` and `missing_value` passed to Term.__new__. 
diff --git a/zipline/testing/__init__.py b/zipline/testing/__init__.py index bc9cff4657..28957049fd 100644 --- a/zipline/testing/__init__.py +++ b/zipline/testing/__init__.py @@ -1,9 +1,12 @@ from .core import ( # noqa + AssetID, + AssetIDPlusDay, EPOCH, ExplodingObject, FakeDataPortal, FetcherDataPortal, MockDailyBarReader, + OpenPrice, add_security_data, all_pairs_matching_predicate, all_subindices, @@ -22,6 +25,8 @@ empty_asset_finder, empty_assets_db, empty_trading_env, + make_alternating_boolean_array, + make_cascading_boolean_array, make_test_handler, make_trade_data_for_asset_info, parameter_space, diff --git a/zipline/testing/core.py b/zipline/testing/core.py index 1456d7cd24..3b464583b9 100644 --- a/zipline/testing/core.py +++ b/zipline/testing/core.py @@ -42,7 +42,9 @@ from zipline.finance.trading import TradingEnvironment from zipline.finance.order import ORDER_STATUS from zipline.lib.labelarray import LabelArray +from zipline.pipeline.data import USEquityPricing from zipline.pipeline.engine import SimplePipelineEngine +from zipline.pipeline.factors import CustomFactor from zipline.pipeline.loaders.testing import make_seeded_random_loader from zipline.utils import security_list from zipline.utils.input_validation import expect_dimensions @@ -1151,6 +1153,72 @@ def create_empty_splits_mergers_frame(): ) +def make_alternating_boolean_array(shape, first_value=True): + """ + Create a 2D numpy array with the given shape containing alternating values + of False, True, False, True,... along each row and each column. 
+ + Examples + -------- + >>> make_alternating_boolean_array((4,4)) + array([[ True, False, True, False], + [False, True, False, True], + [ True, False, True, False], + [False, True, False, True]], dtype=bool) + >>> make_alternating_boolean_array((4,3), first_value=False) + array([[False, True, False], + [ True, False, True], + [False, True, False], + [ True, False, True]], dtype=bool) + """ + if len(shape) != 2: + raise ValueError( + 'Shape must be 2-dimensional. Given shape was {}'.format(shape) + ) + alternating = np.empty(shape, dtype=np.bool) + for row in alternating: + row[::2] = first_value + row[1::2] = not(first_value) + first_value = not(first_value) + return alternating + + +def make_cascading_boolean_array(shape, first_value=True): + """ + Create a numpy array with the given shape containing cascading boolean + values, with `first_value` being the top-left value. + + Examples + -------- + >>> make_cascading_boolean_array((4,4)) + array([[ True, True, True, False], + [ True, True, False, False], + [ True, False, False, False], + [False, False, False, False]], dtype=bool) + >>> make_cascading_boolean_array((4,2)) + array([[ True, False], + [False, False], + [False, False], + [False, False]], dtype=bool) + >>> make_cascading_boolean_array((2,4)) + array([[ True, True, True, False], + [ True, True, False, False]], dtype=bool) + """ + if len(shape) != 2: + raise ValueError( + 'Shape must be 2-dimensional. Given shape was {}'.format(shape) + ) + cascading = np.full(shape, not(first_value), dtype=np.bool) + ending_col = shape[1] - 1 + for row in cascading: + if ending_col > 0: + row[:ending_col] = first_value + ending_col -= 1 + else: + break + return cascading + + @expect_dimensions(array=2) def permute_rows(seed, array): """ @@ -1400,3 +1468,37 @@ def ensure_doctest(f, name=None): f.__name__ if name is None else name ] = f return f + + +#################################### +# Shared factors for pipeline tests. 
+#################################### + +class AssetID(CustomFactor): + """ + CustomFactor that returns the AssetID of each asset. + + Useful for providing a Factor that produces a different value for each + asset. + """ + window_length = 1 + inputs = () + + def compute(self, today, assets, out): + out[:] = assets + + +class AssetIDPlusDay(CustomFactor): + window_length = 1 + inputs = () + + def compute(self, today, assets, out): + out[:] = assets + today.day + + +class OpenPrice(CustomFactor): + window_length = 1 + inputs = [USEquityPricing.open] + + def compute(self, today, assets, out, open): + out[:] = open