From a1226faa3031636338e9240f29edc1ea3bcbf1fe Mon Sep 17 00:00:00 2001
From: Joe Jevnik
Date: Thu, 1 Dec 2016 12:38:35 -0500
Subject: [PATCH] BUG: fix blaze pipeline queries for asof_date

---
 tests/pipeline/test_blaze.py           | 78 +++++++++++++++++++-------
 zipline/pipeline/loaders/blaze/core.py | 10 ++--
 2 files changed, 64 insertions(+), 24 deletions(-)

diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py
index 4e71769412..68843a3353 100644
--- a/tests/pipeline/test_blaze.py
+++ b/tests/pipeline/test_blaze.py
@@ -857,28 +857,28 @@ def test_custom_query_time_tz(self):
     def test_id(self):
         """
         input (self.df):
-           asof_date  sid  timestamp  value
-        0 2014-01-01   65 2014-01-01      0
-        1 2014-01-01   66 2014-01-01      1
-        2 2014-01-01   67 2014-01-01      2
-        3 2014-01-02   65 2014-01-02      1
-        4 2014-01-02   66 2014-01-02      2
-        5 2014-01-02   67 2014-01-02      3
-        6 2014-01-03   65 2014-01-03      2
-        7 2014-01-03   66 2014-01-03      3
-        8 2014-01-03   67 2014-01-03      4
+           asof_date  sid  timestamp  int_value  value
+        0 2014-01-01   65 2014-01-01          0      0
+        1 2014-01-01   66 2014-01-01          1      1
+        2 2014-01-01   67 2014-01-01          2      2
+        3 2014-01-02   65 2014-01-02          1      1
+        4 2014-01-02   66 2014-01-02          2      2
+        5 2014-01-02   67 2014-01-02          3      3
+        6 2014-01-03   65 2014-01-03          2      2
+        7 2014-01-03   66 2014-01-03          3      3
+        8 2014-01-03   67 2014-01-03          4      4
 
         output (expected)
-                                   value
-        2014-01-01 Equity(65 [A])      0
-                   Equity(66 [B])      1
-                   Equity(67 [C])      2
-        2014-01-02 Equity(65 [A])      1
-                   Equity(66 [B])      2
-                   Equity(67 [C])      3
-        2014-01-03 Equity(65 [A])      2
-                   Equity(66 [B])      3
-                   Equity(67 [C])      4
+                                   int_value  value
+        2014-01-01 Equity(65 [A])          0      0
+                   Equity(66 [B])          1      1
+                   Equity(67 [C])          2      2
+        2014-01-02 Equity(65 [A])          1      1
+                   Equity(66 [B])          2      2
+                   Equity(67 [C])          3      3
+        2014-01-03 Equity(65 [A])          2      2
+                   Equity(66 [B])          3      3
+                   Equity(67 [C])          4      4
         """
         expected = self.df.drop('asof_date', axis=1).set_index(
             ['timestamp', 'sid'],
@@ -892,6 +892,44 @@ def test_id(self):
             ('int_value', 'value',)
         )
 
+    def test_id_with_asof_date(self):
+        """
+        input (self.df):
+           asof_date  sid  timestamp  int_value  value
+        0 2014-01-01   65 2014-01-01          0      0
+        1 2014-01-01   66 2014-01-01          1      1
+        2 2014-01-01   67 2014-01-01          2      2
+        3 2014-01-02   65 2014-01-02          1      1
+        4 2014-01-02   66 2014-01-02          2      2
+        5 2014-01-02   67 2014-01-02          3      3
+        6 2014-01-03   65 2014-01-03          2      2
+        7 2014-01-03   66 2014-01-03          3      3
+        8 2014-01-03   67 2014-01-03          4      4
+
+        output (expected)
+                                    asof_date
+        2014-01-01 Equity(65 [A]) 2014-01-01
+                   Equity(66 [B]) 2014-01-01
+                   Equity(67 [C]) 2014-01-01
+        2014-01-02 Equity(65 [A]) 2014-01-02
+                   Equity(66 [B]) 2014-01-02
+                   Equity(67 [C]) 2014-01-02
+        2014-01-03 Equity(65 [A]) 2014-01-03
+                   Equity(66 [B]) 2014-01-03
+                   Equity(67 [C]) 2014-01-03
+        """
+        expected = self.df.drop(['value', 'int_value'], axis=1).set_index(
+            ['timestamp', 'sid'],
+        )
+        expected.index = pd.MultiIndex.from_product((
+            expected.index.levels[0],
+            self.asset_finder.retrieve_all(expected.index.levels[1]),
+        ))
+        self._test_id(
+            self.df, self.dshape, expected, self.asset_finder,
+            ('asof_date',)
+        )
+
     def test_id_ffill_out_of_window(self):
         """
         input (df):
diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py
index 145c503d14..007550789f 100644
--- a/zipline/pipeline/loaders/blaze/core.py
+++ b/zipline/pipeline/loaders/blaze/core.py
@@ -988,10 +988,11 @@ def _load_dataset(self, dates, assets, mask, columns):
         have_sids = (dataset.ndim == 2)
         asset_idx = pd.Series(index=assets, data=np.arange(len(assets)))
         assets = list(map(int, assets))  # coerce from numpy.int64
-        added_query_fields = [AD_FIELD_NAME, TS_FIELD_NAME] + (
-            [SID_FIELD_NAME] if have_sids else []
+        added_query_fields = {AD_FIELD_NAME, TS_FIELD_NAME} | (
+            {SID_FIELD_NAME} if have_sids else set()
         )
-        colnames = added_query_fields + list(map(getname, columns))
+        requested_columns = set(map(getname, columns))
+        colnames = sorted(added_query_fields | requested_columns)
 
         data_query_time = self._data_query_time
         data_query_tz = self._data_query_tz
@@ -1078,7 +1079,8 @@ def collect_expr(e, lower):
                 materialized_deltas,
                 dates,
             )
-            sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)
+            if AD_FIELD_NAME not in requested_columns:
+                sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)
 
             sparse_deltas = last_in_date_group(non_novel_deltas,
                                                dates,
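
Reviewer note, not part of the patch: the core.py hunks change the loader to
drop the synthetic ``asof_date`` column only when the pipeline did not request
it as a real column. The sketch below restates that rule in isolation; the
helper name ``drop_unrequested_asof_date`` and the toy frame are hypothetical,
and only ``AD_FIELD_NAME = 'asof_date'`` mirrors the loader's constant.

    import pandas as pd

    AD_FIELD_NAME = 'asof_date'  # mirrors the constant used in blaze/core.py

    def drop_unrequested_asof_date(frame, requested_columns):
        # ``requested_columns`` plays the role of the set this patch builds
        # from ``map(getname, columns)`` in ``_load_dataset``.
        if AD_FIELD_NAME not in requested_columns:
            frame = frame.drop(AD_FIELD_NAME, axis=1)
        return frame

    frame = pd.DataFrame({
        'asof_date': pd.to_datetime(['2014-01-01', '2014-01-02']),
        'value': [0, 1],
    })

    # The old code dropped the column unconditionally, so a pipeline that
    # requested ``asof_date`` (as test_id_with_asof_date now does) could
    # never receive it.
    assert 'asof_date' in drop_unrequested_asof_date(frame, {'asof_date'})
    assert 'asof_date' not in drop_unrequested_asof_date(frame, {'value'})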