From d2dff0ad01d5d3464bd0e1c773b3180a85d30452 Mon Sep 17 00:00:00 2001 From: Ana Ruelas Date: Fri, 19 May 2017 13:59:41 -0400 Subject: [PATCH 1/6] ENH: Add function to concatenate list of dataframes with categoricals --- tests/utils/test_pandas_utils.py | 100 ++++++++++++++++++++++++++++++- zipline/utils/pandas_utils.py | 43 +++++++++++++ 2 files changed, 142 insertions(+), 1 deletion(-) diff --git a/tests/utils/test_pandas_utils.py b/tests/utils/test_pandas_utils.py index e84ed3fece..870a99ebc1 100644 --- a/tests/utils/test_pandas_utils.py +++ b/tests/utils/test_pandas_utils.py @@ -4,7 +4,11 @@ import pandas as pd from zipline.testing import parameter_space, ZiplineTestCase -from zipline.utils.pandas_utils import nearest_unequal_elements +from zipline.testing.predicates import assert_equal +from zipline.utils.pandas_utils import ( + categorical_df_concat, + nearest_unequal_elements +) class TestNearestUnequalElements(ZiplineTestCase): @@ -80,3 +84,97 @@ def test_nearest_unequal_bad_input(self): str(e.exception), 'dts must be sorted in increasing order', ) + + +class TestCatDFConcat(ZiplineTestCase): + + def test_categorical_df_concat(self): + + inp = [ + pd.DataFrame( + { + 'A': pd.Series(['a', 'b', 'c'], dtype='category'), + 'B': pd.Series([100, 102, 103], dtype='int64'), + 'C': pd.Series(['x', 'x', 'x'], dtype='category'), + } + ), + pd.DataFrame( + { + 'A': pd.Series(['c', 'b', 'd'], dtype='category'), + 'B': pd.Series([103, 102, 104], dtype='int64'), + 'C': pd.Series(['y', 'y', 'y'], dtype='category'), + } + ), + pd.DataFrame( + { + 'A': pd.Series(['a', 'b', 'd'], dtype='category'), + 'B': pd.Series([101, 102, 104], dtype='int64'), + 'C': pd.Series(['z', 'z', 'z'], dtype='category'), + } + ), + ] + result = categorical_df_concat(inp) + + expected = pd.DataFrame( + { + 'A': pd.Series( + ['a', 'b', 'c', 'c', 'b', 'd', 'a', 'b', 'd'], + dtype='category' + ), + 'B': pd.Series( + [100, 102, 103, 103, 102, 104, 101, 102, 104], + dtype='int64' + ), + 'C': pd.Series( + ['x', 'x', 'x', 'y', 'y', 'y', 'z', 'z', 'z'], + dtype='category' + ), + }, + ) + expected.index = pd.Int64Index([0, 1, 2, 0, 1, 2, 0, 1, 2]) + assert_equal(expected, result) + assert_equal( + expected['A'].cat.categories, + result['A'].cat.categories + ) + assert_equal( + expected['C'].cat.categories, + result['C'].cat.categories + ) + + def test_categorical_df_concat_value_error(self): + + mismatched_dtypes = [ + pd.DataFrame( + { + 'A': pd.Series(['a', 'b', 'c'], dtype='category'), + 'B': pd.Series([100, 102, 103], dtype='int64'), + } + ), + pd.DataFrame( + { + 'A': pd.Series(['c', 'b', 'd'], dtype='category'), + 'B': pd.Series([103, 102, 104], dtype='float64'), + } + ), + ] + mismatched_column_names = [ + pd.DataFrame( + { + 'A': pd.Series(['a', 'b', 'c'], dtype='category'), + 'B': pd.Series([100, 102, 103], dtype='int64'), + } + ), + pd.DataFrame( + { + 'A': pd.Series(['c', 'b', 'd'], dtype='category'), + 'X': pd.Series([103, 102, 104], dtype='int64'), + } + ), + ] + + with self.assertRaises(ValueError): + categorical_df_concat(mismatched_dtypes) + + with self.assertRaises(ValueError): + categorical_df_concat(mismatched_column_names) diff --git a/zipline/utils/pandas_utils.py b/zipline/utils/pandas_utils.py index ccac273a95..39e4040b8f 100644 --- a/zipline/utils/pandas_utils.py +++ b/zipline/utils/pandas_utils.py @@ -2,6 +2,7 @@ Utilities for working with pandas objects. 
""" from contextlib import contextmanager +from copy import deepcopy from itertools import product import operator as op import warnings @@ -222,3 +223,45 @@ def clear_dataframe_indexer_caches(df): delattr(df, attr) except AttributeError: pass + + +def categorical_df_concat(df_list, inplace=False): + """ + Prepare list of pandas DataFrames to be used as input to pd.concat. + Ensure any columns of type 'category' have the same categories across each + dataframe. + + Parameters + ---------- + df_list : list + List of dataframes with same columns. + inplace : bool + True if input list can be modified. Default is False. + + Returns + ------- + concatenated : df + Dataframe of concatenated list. + """ + + if not inplace: + df_list = deepcopy(df_list) + + # Assert each dataframe has the same columns/dtypes + df = df_list[0] + if not all([(df.dtypes.equals(df_i.dtypes)) for df_i in df_list[1:]]): + raise ValueError("Input DataFrames must have the same columns/dtypes.") + + categorical_columns = df.columns[df.dtypes == 'category'] + + for col in categorical_columns: + new_categories = sorted( + set().union( + *(frame[col].cat.categories for frame in df_list) + ) + ) + + for df in df_list: + df[col].cat.set_categories(new_categories, inplace=True) + + return pd.concat(df_list) From 08072ac5952d800ade67a906c55b6af08fe068f7 Mon Sep 17 00:00:00 2001 From: Ana Ruelas Date: Fri, 19 May 2017 14:13:57 -0400 Subject: [PATCH 2/6] STY: Alphabetized import list --- zipline/utils/calendars/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/zipline/utils/calendars/__init__.py b/zipline/utils/calendars/__init__.py index d3f3ecf009..61c22e6ba7 100644 --- a/zipline/utils/calendars/__init__.py +++ b/zipline/utils/calendars/__init__.py @@ -15,20 +15,20 @@ from .trading_calendar import TradingCalendar from .calendar_utils import ( + clear_calendars, + deregister_calendar, get_calendar, - register_calendar_alias, register_calendar, + register_calendar_alias, register_calendar_type, - deregister_calendar, - clear_calendars ) __all__ = [ - 'TradingCalendar', 'clear_calendars', 'deregister_calendar', 'get_calendar', 'register_calendar', 'register_calendar_alias', 'register_calendar_type', + 'TradingCalendar', ] From 181b93785e3f4692eb46af8b8811d91522414395 Mon Sep 17 00:00:00 2001 From: Ana Ruelas Date: Fri, 19 May 2017 15:18:08 -0400 Subject: [PATCH 3/6] ENH: Include function to roll dates to last trading date, inclusive --- tests/utils/test_date_utils.py | 29 +++++++++++++++++++++++++++++ zipline/utils/date_utils.py | 21 +++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 tests/utils/test_date_utils.py create mode 100644 zipline/utils/date_utils.py diff --git a/tests/utils/test_date_utils.py b/tests/utils/test_date_utils.py new file mode 100644 index 0000000000..ab43e11e2d --- /dev/null +++ b/tests/utils/test_date_utils.py @@ -0,0 +1,29 @@ +from pandas import Timestamp + +from nose_parameterized import parameterized + +from zipline.testing import ZiplineTestCase +from zipline.utils.calendars import get_calendar +from zipline.utils.date_utils import roll_dates_to_previous_session + + +class TestRollDatesToPreviousSession(ZiplineTestCase): + + @parameterized.expand([ + ( + Timestamp('05-19-2017', tz='UTC'), # actual trading date + Timestamp('05-19-2017', tz='UTC'), + ), + ( + Timestamp('07-04-2015', tz='UTC'), # weekend nyse holiday + Timestamp('07-02-2015', tz='UTC'), + ), + ( + Timestamp('01-16-2017', tz='UTC'), # weeknight nyse holiday + Timestamp('01-13-2017', 
tz='UTC'), + ), + ]) + def test_roll_dates_to_previous_session(self, date, expected_rolled_date): + calendar = get_calendar('NYSE') + result = roll_dates_to_previous_session(calendar, date) + self.assertEqual(result[0], expected_rolled_date) diff --git a/zipline/utils/date_utils.py b/zipline/utils/date_utils.py new file mode 100644 index 0000000000..07ceeda910 --- /dev/null +++ b/zipline/utils/date_utils.py @@ -0,0 +1,21 @@ +def roll_dates_to_previous_session(calendar, *dates): + """ + Roll ``dates`` to the next session of ``calendar``. + + Parameters + ---------- + calendar : zipline.utils.calendars.trading_calendar.TradingCalendar + The calendar to use as a reference. + *dates : pd.Timestamp + The dates for which the last trading date is needed. + + Returns + ------- + rolled_dates: pandas.tseries.index.DatetimeIndex + The last trading date of the input dates, inclusive. + + """ + all_sessions = calendar.all_sessions + + locs = [all_sessions.get_loc(dt, method='ffill') for dt in dates] + return all_sessions[locs] From 591d55c42d559d631a29bfb373f74252e16127aa Mon Sep 17 00:00:00 2001 From: Ana Ruelas Date: Fri, 19 May 2017 16:43:31 -0400 Subject: [PATCH 4/6] ENH: Add function to calculate chunks within a date range --- tests/pipeline/test_caching.py | 271 +++++++++++++++++++++++++++++++++ tests/utils/test_date_utils.py | 70 ++++++++- zipline/pipeline/__init__.py | 2 + zipline/pipeline/caching.py | 204 +++++++++++++++++++++++++ zipline/utils/date_utils.py | 41 ++++- 5 files changed, 581 insertions(+), 7 deletions(-) create mode 100644 tests/pipeline/test_caching.py create mode 100644 zipline/pipeline/caching.py diff --git a/tests/pipeline/test_caching.py b/tests/pipeline/test_caching.py new file mode 100644 index 0000000000..bd68415b59 --- /dev/null +++ b/tests/pipeline/test_caching.py @@ -0,0 +1,271 @@ +import os + +import bcolz +import numpy as np +import pandas as pd + +from zipline.pipeline.caching import PipelineResult +from zipline.testing import ZiplineTestCase +from zipline.testing.core import tmp_dir +from zipline.testing.predicates import assert_equal + + +class PipelineResultTestCase(ZiplineTestCase): + + @classmethod + def init_class_fixtures(cls): + super(PipelineResultTestCase, cls).init_class_fixtures() + cls.dates = np.array( + [ + '2017-01-03', '2017-01-03', '2017-01-04', '2017-01-04', + '2017-01-05', '2017-01-05', '2017-01-06', '2017-01-06', + '2017-01-09', '2017-01-09', '2017-01-10', '2017-01-10', + ], + dtype='datetime64' + ) + cls.sid = np.tile([100, 8554], 6) + cls.returns = np.random.normal(.0015, .015, 12) + cls.close = np.random.normal(100, 10, 12) + cls.open = np.random.normal(100, 10, 12) + cls.version = 1 + cls.attrs = { + 'start_date': pd.Timestamp(cls.dates[0]).value, + 'end_date': pd.Timestamp(cls.dates[-1]).value, + 'version': cls.version + } + cls.columns = (cls.dates, cls.sid, cls.returns, cls.close, cls.open) + cls.data_columns = ['returns', 'close', 'open'] + cls.names = ['dates', 'sid', 'returns', 'close', 'open'] + cls.data_root_dir = cls.enter_class_context(tmp_dir()) + cls.caching_dir = cls.data_root_dir.makedir('caching') + + def make_test_ctable(self, + columns=None, + names=None, + attrs=None): + columns = self.columns if columns is None else columns + names = self.names if names is None else names + attrs = self.attrs if attrs is None else attrs + + ct = bcolz.ctable(columns=columns, names=names) + for key, value in attrs.items(): + ct.attrs[key] = value + return ct + + def test_pipeline_result_bad_init(self): + """ The columns 'dates' and 'sid' 
are required to create a + PipelineResult object. + """ + + table_missing_sid_column = self.make_test_ctable( + columns=(self.dates, self.returns), + names=['dates', 'returns'] + ) + with self.assertRaises(ValueError): + PipelineResult(table_missing_sid_column) + + table_missing_dates_column = self.make_test_ctable( + columns=(self.sid, self.returns), + names=['sid', 'returns'] + ) + with self.assertRaises(ValueError): + PipelineResult(table_missing_dates_column) + + table_missing_attrs_start_date = self.make_test_ctable( + attrs={'end_date': self.dates[-1], 'version': self.version} + ) + with self.assertRaises(ValueError): + PipelineResult(table_missing_attrs_start_date) + + table_missing_attrs_end_date = self.make_test_ctable( + attrs={'start_date': self.dates[0], 'version': self.version} + ) + with self.assertRaises(ValueError): + PipelineResult(table_missing_attrs_end_date) + + table_missing_attrs_version = self.make_test_ctable( + attrs={'start_date': self.dates[0], 'end_date': self.dates[-1]} + ) + with self.assertRaises(ValueError): + PipelineResult(table_missing_attrs_version) + + def test_pipeline_result_properties(self): + pr = PipelineResult(self.make_test_ctable()) + + self.assertEqual( + pr.start_date, + pd.Timestamp(self.dates[0], tz='UTC') + ) + self.assertEqual( + pr.end_date, + pd.Timestamp(self.dates[-1], tz='UTC') + ) + assert_equal(set(pr.term_names), set(self.data_columns)) + self.assertIsNone(pr.path) + assert_equal(pr.dates[:], self.dates) + assert_equal(pr.sids[:], self.sid) + + def test_pipeline_result_from_dataframe(self): + index = pd.MultiIndex.from_tuples( + tuple(zip(self.dates, self.sid)), + names=['dates', 'sid'] + ) + df = pd.DataFrame( + { + 'returns': self.returns, + 'open': self.open, + 'close': self.close, + }, + index=index + ) + pr = PipelineResult.from_dataframe(df) + + self.assertEqual( + pr.start_date, + pd.Timestamp(self.dates[0], tz='UTC') + ) + self.assertEqual( + pr.end_date, + pd.Timestamp(self.dates[-1], tz='UTC') + ) + assert_equal(set(pr.term_names), set(self.data_columns)) + self.assertIsNone(pr.path) + assert_equal(pr.dates[:], self.dates) + assert_equal(pr.sids[:], self.sid) + + def test_pipeline_result_from_data_frame_value_error(self): + df = pd.DataFrame({'returns': self.returns}, index=self.dates) + with self.assertRaises(ValueError): + PipelineResult.from_dataframe(df) + + def test_pipeline_result_to_dataframe(self): + pr = PipelineResult(self.make_test_ctable()) + df = pr.to_dataframe(PipelineResult.ALL_COLUMNS) + df_flat = df.reset_index(level=[0, 1]) + + assert_equal(np.array(df_flat['dates']), self.dates) + assert_equal(np.array(df_flat['sid']), self.sid) + assert_equal(np.array(df_flat['returns']), self.returns) + assert_equal(np.array(df_flat['close']), self.close) + assert_equal(np.array(df_flat['open']), self.open) + + def test_pipeline_result_to_dateframe_specific_column(self): + pr = PipelineResult(self.make_test_ctable()) + df = pr.to_dataframe(['returns']) + assert_equal(df.columns.tolist(), ['returns']) + + def test_pipeline_result_to_dataframe_value_error(self): + pr = PipelineResult(self.make_test_ctable()) + with self.assertRaises(ValueError): + pr.to_dataframe(['sid']) + with self.assertRaises(ValueError): + pr.to_dataframe(['dates']) + with self.assertRaises(ValueError): + pr.to_dataframe(['some other col']) + + def test_pipeline_result_write(self): + + pr = PipelineResult(self.make_test_ctable()) + full_path = os.path.join(self.caching_dir, 'write_test_1') + pr.write(full_path) + + expected = pd.DataFrame( + { + 
'dates': self.dates, + 'sid': self.sid, + 'returns': self.returns, + 'close': self.close, + 'open': self.open, + } + ) + result = bcolz.open(full_path).todataframe() + + self.assertEqual( + set(expected.columns), + set(result.columns) + ) + self.assertTrue(result[self.names].equals(expected[self.names])) + + def test_pipeline_result_write_specific_column(self): + column = 'returns' + result_columns = ['dates', 'sid', 'returns'] + + pr = PipelineResult(self.make_test_ctable()) + full_path = os.path.join(self.caching_dir, 'write_test_2') + pr.write(full_path, [column]) + expected = pd.DataFrame( + { + 'dates': self.dates, + 'sid': self.sid, + 'returns': self.returns, + } + ) + result = bcolz.open(full_path).todataframe() + self.assertEqual( + set(expected.columns), + set(result.columns) + ) + self.assertTrue( + result[result_columns].equals(expected[result_columns]) + ) + + def test_pipeline_result_write_specific_column_key_error(self): + + pr = PipelineResult(self.make_test_ctable()) + full_path = os.path.join(self.caching_dir, 'write_test_3') + + with self.assertRaises(ValueError): + pr.write(full_path, ['some_other_column']) + + def test_pipeline_result_dates_indexer(self): + pr = PipelineResult(self.make_test_ctable()) + indexer = pr.dates_indexer( + pd.Timestamp('2017-01-04', tz='UTC'), + pd.Timestamp('2017-01-09', tz='UTC') + ) + result = pd.DataFrame(pr[indexer])[self.names] + expected = pd.DataFrame( + { + 'dates': self.dates[2:-2], + 'sid': self.sid[2:-2], + 'returns': self.returns[2:-2], + 'close': self.close[2:-2], + 'open': self.open[2:-2], + } + )[self.names] + + self.assertTrue(result.equals(expected)) + + def test_pipeline_result_dates_indexer_bad_dates(self): + pr = PipelineResult(self.make_test_ctable()) + # start date is before the PipelineResult start date + with self.assertRaises(IndexError): + pr.dates_indexer( + pd.Timestamp('2016-12-31', tz='UTC'), + pd.Timestamp('2017-01-09', tz='UTC') + ) + # end date is after the PipelineResult end date + with self.assertRaises(IndexError): + pr.dates_indexer( + pd.Timestamp('2017-01-04', tz='UTC'), + pd.Timestamp('2017-05-23', tz='UTC') + ) + + def test_pipeline_result_open(self): + start_pr = PipelineResult(self.make_test_ctable()) + full_path = os.path.join(self.caching_dir, 'open_test_1') + start_pr.write(full_path) + + pr = PipelineResult.open(full_path) + self.assertEqual( + pr.start_date, + pd.Timestamp(self.dates[0], tz='UTC') + ) + self.assertEqual( + pr.end_date, + pd.Timestamp(self.dates[-1], tz='UTC') + ) + assert_equal(set(pr.term_names), set(self.data_columns)) + self.assertEqual(full_path, pr.path) + assert_equal(pr.dates[:], self.dates) + assert_equal(pr.sids[:], self.sid) diff --git a/tests/utils/test_date_utils.py b/tests/utils/test_date_utils.py index ab43e11e2d..9d0bc8e13c 100644 --- a/tests/utils/test_date_utils.py +++ b/tests/utils/test_date_utils.py @@ -4,10 +4,18 @@ from zipline.testing import ZiplineTestCase from zipline.utils.calendars import get_calendar -from zipline.utils.date_utils import roll_dates_to_previous_session +from zipline.utils.date_utils import ( + compute_date_range_chunks, + roll_dates_to_previous_session +) -class TestRollDatesToPreviousSession(ZiplineTestCase): +class TestDateUtils(ZiplineTestCase): + + @classmethod + def init_class_fixtures(cls): + super(TestDateUtils, cls).init_class_fixtures() + cls.calendar = get_calendar('NYSE') @parameterized.expand([ ( @@ -24,6 +32,58 @@ class TestRollDatesToPreviousSession(ZiplineTestCase): ), ]) def test_roll_dates_to_previous_session(self, 
date, expected_rolled_date): - calendar = get_calendar('NYSE') - result = roll_dates_to_previous_session(calendar, date) - self.assertEqual(result[0], expected_rolled_date) + self.assertEqual( + roll_dates_to_previous_session(self.calendar, date)[0], + expected_rolled_date + ) + + @parameterized.expand([ + ( + None, + [ + ( + Timestamp('01-03-2017', tz='UTC'), + Timestamp('01-31-2017', tz='UTC') + ) + ] + ), + ( + 10, + [ + ( + Timestamp('01-03-2017', tz='UTC'), + Timestamp('01-17-2017', tz='UTC') + ), + ( + Timestamp('01-18-2017', tz='UTC'), + Timestamp('01-31-2017', tz='UTC') + ) + ] + ), + ( + 15, + [ + ( + Timestamp('01-03-2017', tz='UTC'), + Timestamp('01-24-2017', tz='UTC') + ), + ( + Timestamp('01-25-2017', tz='UTC'), + Timestamp('01-31-2017', tz='UTC') + ) + ] + ), + ]) + def test_compute_date_range_chunks(self, chunksize, expected): + # These date ranges result in 20 business days + start_date = Timestamp('01-03-2017') + end_date = Timestamp('01-31-2017') + + date_ranges = compute_date_range_chunks( + self.calendar, + start_date, + end_date, + chunksize + ) + + self.assertListEqual(list(date_ranges), expected) diff --git a/zipline/pipeline/__init__.py b/zipline/pipeline/__init__.py index a169256bb9..131257530c 100644 --- a/zipline/pipeline/__init__.py +++ b/zipline/pipeline/__init__.py @@ -1,6 +1,7 @@ from __future__ import print_function from zipline.assets import AssetFinder +from .caching import PipelineResult from .classifiers import Classifier, CustomClassifier from .engine import SimplePipelineEngine from .factors import Factor, CustomFactor @@ -57,6 +58,7 @@ def engine_from_files(daily_bar_path, 'Factor', 'Filter', 'Pipeline', + 'PipelineResult', 'SimplePipelineEngine', 'Term', 'TermGraph', diff --git a/zipline/pipeline/caching.py b/zipline/pipeline/caching.py new file mode 100644 index 0000000000..9eae37190f --- /dev/null +++ b/zipline/pipeline/caching.py @@ -0,0 +1,204 @@ +import os + +import bcolz +import numpy as np +import pandas as pd + +from zipline.utils.sentinel import sentinel + + +class PipelineResult(object): + """An object for caching and reading cached pipeline results. + + Parameters + ---------- + ctable : bcolz.ctable + The ctable that backs this pipeline result. + + See Also + -------- + PipelineResult.from_dataframe + PipelineResult.open + """ + ALL_COLUMNS = sentinel( + 'ALL_COLUMNS', + "Sentinel indicating that all columns should be read/written.", + ) + + # The result file format version. If the result of running a pipeline + # change or we change the data format we need to increment this. 
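+    # __init__ raises ValueError when a stored table's 'version' attr does not
+    # match this value, so results written under an older format are rejected
+    # rather than silently read back.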
+ version = 1 + + _dates_column_name = 'dates' + _sid_column_name = 'sid' + _metadata_columns = frozenset({_dates_column_name, _sid_column_name}) + _metadata_attributes = frozenset({'start_date', 'end_date', 'version'}) + + def __init__(self, ctable): + if not self._metadata_columns <= set(ctable.names): + raise ValueError( + 'missing expected metadata columns: %r' % ( + self._metadata_columns - set(ctable.names) + ), + ) + if not self._metadata_attributes <= set(ctable.attrs.attrs): + raise ValueError( + 'missing expected attributes: %r' % ( + self._metadata_attributes - set(ctable.attrs.attrs) + ), + ) + if ctable.attrs['version'] != self.version: + raise ValueError( + 'mismatched result version, found %r expected %r' % ( + ctable.attrs['version'], + self.version, + ), + ) + + self._ctable = ctable + + def __getitem__(self, key): + return self._ctable[key] + + @property + def start_date(self): + return pd.Timestamp(self._ctable.attrs['start_date'], tz='utc') + + @property + def end_date(self): + return pd.Timestamp(self._ctable.attrs['end_date'], tz='utc') + + @property + def term_names(self): + return sorted(set(self._ctable.names) - self._metadata_columns) + + @property + def path(self): + return self._ctable.rootdir + + @property + def dates(self): + return self[self._dates_column_name] + + @property + def sids(self): + return self[self._sid_column_name] + + @classmethod + def from_dataframe(cls, df): + if (not isinstance(df.index, pd.MultiIndex) or + len(df.index.levels) != 2): + raise ValueError('expected a two level multi-indexed dataframe') + + df = df.reset_index(level=[0, 1]) + df.rename( + columns={ + 'level_0': cls._dates_column_name, + 'level_1': cls._sid_column_name, + }, + inplace=True, + ) + df[cls._sid_column_name] = df[cls._sid_column_name].astype(np.int64) + ctable = bcolz.ctable.fromdataframe(df) + dates = df[cls._dates_column_name] + ctable.attrs['start_date'] = dates.iloc[0].value + ctable.attrs['end_date'] = dates.iloc[-1].value + ctable.attrs['version'] = cls.version + return cls(ctable) + + def to_dataframe(self, columns): + """ + Convert the PipelineResult into a DataFrame. + + Parameters + ---------- + columns : list[str] or PipelineResult.ALL_COLUMNS + A list of strings indicating which columns to read into memory. + Passing ALL_COLUMNS indicates that all columns should be read. + """ + ctable = self._ctable + index_cols = [self._dates_column_name, self._sid_column_name] + + if columns is not self.ALL_COLUMNS: + bad_columns = set(columns) - set(self.term_names) + if bad_columns: + raise ValueError( + "Invalid columns: {bad}. Use result.term_names for " + " available columns.".format(bad=bad_columns) + ) + + ctable = ctable[columns + index_cols] + + return pd.DataFrame(ctable[:]).set_index(index_cols) + + def write(self, path, write_cols=ALL_COLUMNS): + """Write the result to a given location. + + Parameters + ---------- + path : str + The file path to write this result to. + write_cols : list + The names of the columns in the written file. If None, all columns + are used. + + """ + if write_cols is not self.ALL_COLUMNS: + invalid_columns = set(write_cols) - set(self.term_names) + if invalid_columns: + raise ValueError( + "Invalid columns: {0}. 
Use result.term_names for" + " list of available columns.".format(invalid_columns) + ) + index_cols = [self._dates_column_name, self._sid_column_name] + ctable = self._ctable[write_cols + index_cols] + for k, v in self._ctable.attrs: + ctable.attrs[k] = v + else: + ctable = self._ctable + + if (ctable.rootdir is not None and + os.path.abspath(ctable.rootdir) == os.path.abspath(path)): + ctable.flush() + else: + copy = ctable.copy(rootdir=path, mode='w') + for k, v in ctable.attrs: + copy.attrs[k] = v + copy.flush() + + @classmethod + def open(cls, path): + """Constructor from a filepath. + + Parameters + ---------- + path : str + The path to open. + """ + return cls(bcolz.open(path)) + + def dates_indexer(self, start_date, end_date): + """Create an indexer into the results for the given date range. + + Parameters + ---------- + start_date : pd.Timestamp + The starting date of the slice. + end_date : pd.Timestamp + The ending date of the slice. + + Returns + ------- + indexer : slice[int, int] + The slice into the other columns to get the needed data. + """ + if not (self.start_date <= start_date and + self.end_date >= end_date): + raise IndexError( + 'cannot build indexer for %s:%s' % (start_date, end_date), + ) + + dates = self.dates[:] + start_idx = np.searchsorted(dates, start_date.to_datetime64()) + end_idx = np.searchsorted(dates, end_date.to_datetime64(), 'right') + return np.s_[start_idx:end_idx] diff --git a/zipline/utils/date_utils.py b/zipline/utils/date_utils.py index 07ceeda910..3b3b94d00e 100644 --- a/zipline/utils/date_utils.py +++ b/zipline/utils/date_utils.py @@ -1,3 +1,6 @@ +from toolz import partition_all + + def roll_dates_to_previous_session(calendar, *dates): """ Roll ``dates`` to the next session of ``calendar``. @@ -11,11 +14,45 @@ def roll_dates_to_previous_session(calendar, *dates): Returns ------- - rolled_dates: pandas.tseries.index.DatetimeIndex + rolled_dates: (np.datetime64, np.datetime64) The last trading date of the input dates, inclusive. """ all_sessions = calendar.all_sessions locs = [all_sessions.get_loc(dt, method='ffill') for dt in dates] - return all_sessions[locs] + return all_sessions[locs].tolist() + + +def compute_date_range_chunks(calendar, start_date, end_date, chunksize): + """Compute the start and end dates to run a pipeline for. + + Parameters + ---------- + calendar : TradingCalendar + The trading calendar to align the dates with. + start_date : pd.Timestamp + The first date in the pipeline. + end_date : pd.Timestamp + The last date in the pipeline. + chunksize : int or None + The size of the chunks to run. + + Returns + ------- + ranges : iterable[(np.datetime64, np.datetime64)] + A sequence of start and end dates to run the pipeline for. 
+ """ + if chunksize is None: + dates = roll_dates_to_previous_session(calendar, start_date, end_date) + + return [(dates[0], dates[1])] + + all_sessions = calendar.all_sessions + all_sessions.offset = None + start_ix, end_ix = all_sessions.slice_locs(start_date, end_date) + return ( + (r[0], r[-1]) for r in partition_all( + chunksize, all_sessions[start_ix:end_ix] + ) + ) From 18deaa033a5b32442cbfb5ff567b857c4c06c628 Mon Sep 17 00:00:00 2001 From: Ana Ruelas Date: Tue, 23 May 2017 16:56:05 -0400 Subject: [PATCH 5/6] ENH: Add function for running pipelines in chunks --- tests/pipeline/test_run_chunked_pipeline.py | 37 +++++++++++++++++++++ zipline/pipeline/__init__.py | 37 +++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 tests/pipeline/test_run_chunked_pipeline.py diff --git a/tests/pipeline/test_run_chunked_pipeline.py b/tests/pipeline/test_run_chunked_pipeline.py new file mode 100644 index 0000000000..2294b39759 --- /dev/null +++ b/tests/pipeline/test_run_chunked_pipeline.py @@ -0,0 +1,37 @@ +from zipline.pipeline import Pipeline, run_chunked_pipeline +from zipline.pipeline.data import USEquityPricing +from zipline.pipeline.factors import Returns +from zipline.testing import ZiplineTestCase +from zipline.testing.fixtures import WithEquityPricingPipelineEngine + + +class ChunkedPipelineTestCase(WithEquityPricingPipelineEngine, + ZiplineTestCase): + + def test_run_chunked_pipeline(self): + """ + Test that running a pipeline in chunks produces the same result as if + it were run all at once + """ + pipe = Pipeline( + columns={ + 'close': USEquityPricing.close.latest, + 'returns': Returns(window_length=2), + }, + ) + sessions = self.nyse_calendar.all_sessions + start_date = sessions[sessions.get_loc(self.START_DATE) + 2] + + pipeline_result = self.pipeline_engine.run_pipeline( + pipe, + start_date=start_date, + end_date=self.END_DATE, + ) + chunked_result = run_chunked_pipeline( + engine=self.pipeline_engine, + pipeline=pipe, + start_date=start_date, + end_date=self.END_DATE, + chunksize=22 + ) + self.assertTrue(chunked_result.equals(pipeline_result)) diff --git a/zipline/pipeline/__init__.py b/zipline/pipeline/__init__.py index 131257530c..7cbd486b0d 100644 --- a/zipline/pipeline/__init__.py +++ b/zipline/pipeline/__init__.py @@ -1,5 +1,8 @@ from __future__ import print_function from zipline.assets import AssetFinder +from zipline.utils.calendars import get_calendar +from zipline.utils.date_utils import compute_date_range_chunks +from zipline.utils.pandas_utils import categorical_df_concat from .caching import PipelineResult from .classifiers import Classifier, CustomClassifier @@ -48,6 +51,39 @@ def engine_from_files(daily_bar_path, ) +def run_chunked_pipeline(engine, pipeline, start_date, end_date, chunksize): + """Run a pipeline to collect the results. + + Parameters + ---------- + engine : Engine + The pipeline engine. + pipeline : Pipeline + The pipeline to run. + start_date : pd.Timestamp + The start date to run the pipeline for. + end_date : pd.Timestamp + The end date to run the pipeline for. + chunksize : int or None + The number of days to execute at a time. If this is None, all the days + will be run at once. + + Returns + ------- + results : dict[str, PipelineResult] + The results for each output term in the pipeline. 
+ """ + ranges = compute_date_range_chunks( + get_calendar('NYSE'), + start_date, + end_date, + chunksize, + ) + chunks = [engine.run_pipeline(pipeline, s, e) for s, e in ranges] + + return categorical_df_concat(chunks, inplace=True) + + __all__ = ( 'Classifier', 'CustomFactor', @@ -60,6 +96,7 @@ def engine_from_files(daily_bar_path, 'Pipeline', 'PipelineResult', 'SimplePipelineEngine', + 'run_chunked_pipeline', 'Term', 'TermGraph', ) From 90ed74b9330966f4d0de1323575dd0bd78a9b8cc Mon Sep 17 00:00:00 2001 From: Ana Ruelas Date: Wed, 24 May 2017 17:44:45 -0400 Subject: [PATCH 6/6] BUG: Dont modify fixture object --- tests/pipeline/test_caching.py | 271 -------------------- tests/pipeline/test_run_chunked_pipeline.py | 2 +- zipline/pipeline/__init__.py | 3 +- zipline/pipeline/caching.py | 204 --------------- zipline/utils/date_utils.py | 1 - 5 files changed, 2 insertions(+), 479 deletions(-) delete mode 100644 tests/pipeline/test_caching.py delete mode 100644 zipline/pipeline/caching.py diff --git a/tests/pipeline/test_caching.py b/tests/pipeline/test_caching.py deleted file mode 100644 index bd68415b59..0000000000 --- a/tests/pipeline/test_caching.py +++ /dev/null @@ -1,271 +0,0 @@ -import os - -import bcolz -import numpy as np -import pandas as pd - -from zipline.pipeline.caching import PipelineResult -from zipline.testing import ZiplineTestCase -from zipline.testing.core import tmp_dir -from zipline.testing.predicates import assert_equal - - -class PipelineResultTestCase(ZiplineTestCase): - - @classmethod - def init_class_fixtures(cls): - super(PipelineResultTestCase, cls).init_class_fixtures() - cls.dates = np.array( - [ - '2017-01-03', '2017-01-03', '2017-01-04', '2017-01-04', - '2017-01-05', '2017-01-05', '2017-01-06', '2017-01-06', - '2017-01-09', '2017-01-09', '2017-01-10', '2017-01-10', - ], - dtype='datetime64' - ) - cls.sid = np.tile([100, 8554], 6) - cls.returns = np.random.normal(.0015, .015, 12) - cls.close = np.random.normal(100, 10, 12) - cls.open = np.random.normal(100, 10, 12) - cls.version = 1 - cls.attrs = { - 'start_date': pd.Timestamp(cls.dates[0]).value, - 'end_date': pd.Timestamp(cls.dates[-1]).value, - 'version': cls.version - } - cls.columns = (cls.dates, cls.sid, cls.returns, cls.close, cls.open) - cls.data_columns = ['returns', 'close', 'open'] - cls.names = ['dates', 'sid', 'returns', 'close', 'open'] - cls.data_root_dir = cls.enter_class_context(tmp_dir()) - cls.caching_dir = cls.data_root_dir.makedir('caching') - - def make_test_ctable(self, - columns=None, - names=None, - attrs=None): - columns = self.columns if columns is None else columns - names = self.names if names is None else names - attrs = self.attrs if attrs is None else attrs - - ct = bcolz.ctable(columns=columns, names=names) - for key, value in attrs.items(): - ct.attrs[key] = value - return ct - - def test_pipeline_result_bad_init(self): - """ The columns 'dates' and 'sid' are required to create a - PipelineResult object. 
- """ - - table_missing_sid_column = self.make_test_ctable( - columns=(self.dates, self.returns), - names=['dates', 'returns'] - ) - with self.assertRaises(ValueError): - PipelineResult(table_missing_sid_column) - - table_missing_dates_column = self.make_test_ctable( - columns=(self.sid, self.returns), - names=['sid', 'returns'] - ) - with self.assertRaises(ValueError): - PipelineResult(table_missing_dates_column) - - table_missing_attrs_start_date = self.make_test_ctable( - attrs={'end_date': self.dates[-1], 'version': self.version} - ) - with self.assertRaises(ValueError): - PipelineResult(table_missing_attrs_start_date) - - table_missing_attrs_end_date = self.make_test_ctable( - attrs={'start_date': self.dates[0], 'version': self.version} - ) - with self.assertRaises(ValueError): - PipelineResult(table_missing_attrs_end_date) - - table_missing_attrs_version = self.make_test_ctable( - attrs={'start_date': self.dates[0], 'end_date': self.dates[-1]} - ) - with self.assertRaises(ValueError): - PipelineResult(table_missing_attrs_version) - - def test_pipeline_result_properties(self): - pr = PipelineResult(self.make_test_ctable()) - - self.assertEqual( - pr.start_date, - pd.Timestamp(self.dates[0], tz='UTC') - ) - self.assertEqual( - pr.end_date, - pd.Timestamp(self.dates[-1], tz='UTC') - ) - assert_equal(set(pr.term_names), set(self.data_columns)) - self.assertIsNone(pr.path) - assert_equal(pr.dates[:], self.dates) - assert_equal(pr.sids[:], self.sid) - - def test_pipeline_result_from_dataframe(self): - index = pd.MultiIndex.from_tuples( - tuple(zip(self.dates, self.sid)), - names=['dates', 'sid'] - ) - df = pd.DataFrame( - { - 'returns': self.returns, - 'open': self.open, - 'close': self.close, - }, - index=index - ) - pr = PipelineResult.from_dataframe(df) - - self.assertEqual( - pr.start_date, - pd.Timestamp(self.dates[0], tz='UTC') - ) - self.assertEqual( - pr.end_date, - pd.Timestamp(self.dates[-1], tz='UTC') - ) - assert_equal(set(pr.term_names), set(self.data_columns)) - self.assertIsNone(pr.path) - assert_equal(pr.dates[:], self.dates) - assert_equal(pr.sids[:], self.sid) - - def test_pipeline_result_from_data_frame_value_error(self): - df = pd.DataFrame({'returns': self.returns}, index=self.dates) - with self.assertRaises(ValueError): - PipelineResult.from_dataframe(df) - - def test_pipeline_result_to_dataframe(self): - pr = PipelineResult(self.make_test_ctable()) - df = pr.to_dataframe(PipelineResult.ALL_COLUMNS) - df_flat = df.reset_index(level=[0, 1]) - - assert_equal(np.array(df_flat['dates']), self.dates) - assert_equal(np.array(df_flat['sid']), self.sid) - assert_equal(np.array(df_flat['returns']), self.returns) - assert_equal(np.array(df_flat['close']), self.close) - assert_equal(np.array(df_flat['open']), self.open) - - def test_pipeline_result_to_dateframe_specific_column(self): - pr = PipelineResult(self.make_test_ctable()) - df = pr.to_dataframe(['returns']) - assert_equal(df.columns.tolist(), ['returns']) - - def test_pipeline_result_to_dataframe_value_error(self): - pr = PipelineResult(self.make_test_ctable()) - with self.assertRaises(ValueError): - pr.to_dataframe(['sid']) - with self.assertRaises(ValueError): - pr.to_dataframe(['dates']) - with self.assertRaises(ValueError): - pr.to_dataframe(['some other col']) - - def test_pipeline_result_write(self): - - pr = PipelineResult(self.make_test_ctable()) - full_path = os.path.join(self.caching_dir, 'write_test_1') - pr.write(full_path) - - expected = pd.DataFrame( - { - 'dates': self.dates, - 'sid': self.sid, - 
'returns': self.returns, - 'close': self.close, - 'open': self.open, - } - ) - result = bcolz.open(full_path).todataframe() - - self.assertEqual( - set(expected.columns), - set(result.columns) - ) - self.assertTrue(result[self.names].equals(expected[self.names])) - - def test_pipeline_result_write_specific_column(self): - column = 'returns' - result_columns = ['dates', 'sid', 'returns'] - - pr = PipelineResult(self.make_test_ctable()) - full_path = os.path.join(self.caching_dir, 'write_test_2') - pr.write(full_path, [column]) - expected = pd.DataFrame( - { - 'dates': self.dates, - 'sid': self.sid, - 'returns': self.returns, - } - ) - result = bcolz.open(full_path).todataframe() - self.assertEqual( - set(expected.columns), - set(result.columns) - ) - self.assertTrue( - result[result_columns].equals(expected[result_columns]) - ) - - def test_pipeline_result_write_specific_column_key_error(self): - - pr = PipelineResult(self.make_test_ctable()) - full_path = os.path.join(self.caching_dir, 'write_test_3') - - with self.assertRaises(ValueError): - pr.write(full_path, ['some_other_column']) - - def test_pipeline_result_dates_indexer(self): - pr = PipelineResult(self.make_test_ctable()) - indexer = pr.dates_indexer( - pd.Timestamp('2017-01-04', tz='UTC'), - pd.Timestamp('2017-01-09', tz='UTC') - ) - result = pd.DataFrame(pr[indexer])[self.names] - expected = pd.DataFrame( - { - 'dates': self.dates[2:-2], - 'sid': self.sid[2:-2], - 'returns': self.returns[2:-2], - 'close': self.close[2:-2], - 'open': self.open[2:-2], - } - )[self.names] - - self.assertTrue(result.equals(expected)) - - def test_pipeline_result_dates_indexer_bad_dates(self): - pr = PipelineResult(self.make_test_ctable()) - # start date is before the PipelineResult start date - with self.assertRaises(IndexError): - pr.dates_indexer( - pd.Timestamp('2016-12-31', tz='UTC'), - pd.Timestamp('2017-01-09', tz='UTC') - ) - # end date is after the PipelineResult end date - with self.assertRaises(IndexError): - pr.dates_indexer( - pd.Timestamp('2017-01-04', tz='UTC'), - pd.Timestamp('2017-05-23', tz='UTC') - ) - - def test_pipeline_result_open(self): - start_pr = PipelineResult(self.make_test_ctable()) - full_path = os.path.join(self.caching_dir, 'open_test_1') - start_pr.write(full_path) - - pr = PipelineResult.open(full_path) - self.assertEqual( - pr.start_date, - pd.Timestamp(self.dates[0], tz='UTC') - ) - self.assertEqual( - pr.end_date, - pd.Timestamp(self.dates[-1], tz='UTC') - ) - assert_equal(set(pr.term_names), set(self.data_columns)) - self.assertEqual(full_path, pr.path) - assert_equal(pr.dates[:], self.dates) - assert_equal(pr.sids[:], self.sid) diff --git a/tests/pipeline/test_run_chunked_pipeline.py b/tests/pipeline/test_run_chunked_pipeline.py index 2294b39759..533ccf06be 100644 --- a/tests/pipeline/test_run_chunked_pipeline.py +++ b/tests/pipeline/test_run_chunked_pipeline.py @@ -12,7 +12,7 @@ def test_run_chunked_pipeline(self): """ Test that running a pipeline in chunks produces the same result as if it were run all at once - """ + """ pipe = Pipeline( columns={ 'close': USEquityPricing.close.latest, diff --git a/zipline/pipeline/__init__.py b/zipline/pipeline/__init__.py index 7cbd486b0d..27b1f6284e 100644 --- a/zipline/pipeline/__init__.py +++ b/zipline/pipeline/__init__.py @@ -4,7 +4,6 @@ from zipline.utils.date_utils import compute_date_range_chunks from zipline.utils.pandas_utils import categorical_df_concat -from .caching import PipelineResult from .classifiers import Classifier, CustomClassifier from .engine import 
SimplePipelineEngine from .factors import Factor, CustomFactor @@ -70,7 +69,7 @@ def run_chunked_pipeline(engine, pipeline, start_date, end_date, chunksize): Returns ------- - results : dict[str, PipelineResult] + results : pd.DataFrame The results for each output term in the pipeline. """ ranges = compute_date_range_chunks( diff --git a/zipline/pipeline/caching.py b/zipline/pipeline/caching.py deleted file mode 100644 index 9eae37190f..0000000000 --- a/zipline/pipeline/caching.py +++ /dev/null @@ -1,204 +0,0 @@ -import os - -import bcolz -import numpy as np -import pandas as pd - -from zipline.utils.sentinel import sentinel - - -class PipelineResult(object): - """An object for caching and reading cached pipeline results. - - Parameters - ---------- - ctable : bcolz.ctable - The ctable that backs this pipeline result. - - See Also - -------- - PipelineResult.from_dataframe - PipelineResult.open - """ - ALL_COLUMNS = sentinel( - 'ALL_COLUMNS', - "Sentinel indicating that all columns should be read/written.", - ) - - # The result file format version. If the result of running a pipeline - # change or we change the data format we need to increment this. - version = 1 - - _dates_column_name = 'dates' - _sid_column_name = 'sid' - _metadata_columns = frozenset({_dates_column_name, _sid_column_name}) - _metadata_attributes = frozenset({'start_date', 'end_date', 'version'}) - - def __init__(self, ctable): - if not self._metadata_columns <= set(ctable.names): - raise ValueError( - 'missing expected metadata columns: %r' % ( - self._metadata_columns - set(ctable.names) - ), - ) - if not self._metadata_attributes <= set(ctable.attrs.attrs): - raise ValueError( - 'missing expected attributes: %r' % ( - self._metadata_attributes - set(ctable.attrs.attrs) - ), - ) - if ctable.attrs['version'] != self.version: - raise ValueError( - 'mismatched result version, found %r expected %r' % ( - ctable.attrs['version'], - self.version, - ), - ) - - self._ctable = ctable - - def __getitem__(self, key): - return self._ctable[key] - - @property - def start_date(self): - return pd.Timestamp(self._ctable.attrs['start_date'], tz='utc') - - @property - def end_date(self): - return pd.Timestamp(self._ctable.attrs['end_date'], tz='utc') - - @property - def term_names(self): - return sorted(set(self._ctable.names) - self._metadata_columns) - - @property - def path(self): - return self._ctable.rootdir - - @property - def dates(self): - return self[self._dates_column_name] - - @property - def sids(self): - return self[self._sid_column_name] - - @classmethod - def from_dataframe(cls, df): - if (not isinstance(df.index, pd.MultiIndex) or - len(df.index.levels) != 2): - raise ValueError('expected a two level multi-indexed dataframe') - - df = df.reset_index(level=[0, 1]) - df.rename( - columns={ - 'level_0': cls._dates_column_name, - 'level_1': cls._sid_column_name, - }, - inplace=True, - ) - df[cls._sid_column_name] = df[cls._sid_column_name].astype(np.int64) - ctable = bcolz.ctable.fromdataframe(df) - dates = df[cls._dates_column_name] - ctable.attrs['start_date'] = dates.iloc[0].value - ctable.attrs['end_date'] = dates.iloc[-1].value - ctable.attrs['version'] = cls.version - return cls(ctable) - - def to_dataframe(self, columns): - """ - Convert the PipelineResult into a DataFrame. - - Parameters - ---------- - columns : list[str] or PipelineResult.ALL_COLUMNS - A list of strings indicating which columns to read into memory. - Passing ALL_COLUMNS indicates that all columns should be read. 
- """ - ctable = self._ctable - index_cols = [self._dates_column_name, self._sid_column_name] - - if columns is not self.ALL_COLUMNS: - bad_columns = set(columns) - set(self.term_names) - if bad_columns: - raise ValueError( - "Invalid columns: {bad}. Use result.term_names for " - " available columns.".format(bad=bad_columns) - ) - - ctable = ctable[columns + index_cols] - - return pd.DataFrame(ctable[:]).set_index(index_cols) - - def write(self, path, write_cols=ALL_COLUMNS): - """Write the result to a given location. - - Parameters - ---------- - path : str - The file path to write this result to. - write_cols : list - The names of the columns in the written file. If None, all columns - are used. - - """ - if write_cols is not self.ALL_COLUMNS: - invalid_columns = set(write_cols) - set(self.term_names) - if invalid_columns: - raise ValueError( - "Invalid columns: {0}. Use result.term_names for" - " list of available columns.".format(invalid_columns) - ) - index_cols = [self._dates_column_name, self._sid_column_name] - ctable = self._ctable[write_cols + index_cols] - for k, v in self._ctable.attrs: - ctable.attrs[k] = v - else: - ctable = self._ctable - - if (ctable.rootdir is not None and - os.path.abspath(ctable.rootdir) == os.path.abspath(path)): - ctable.flush() - else: - copy = ctable.copy(rootdir=path, mode='w') - for k, v in ctable.attrs: - copy.attrs[k] = v - copy.flush() - - @classmethod - def open(cls, path): - """Constructor from a filepath. - - Parameters - ---------- - path : str - The path to open. - """ - return cls(bcolz.open(path)) - - def dates_indexer(self, start_date, end_date): - """Create an indexer into the results for the given date range. - - Parameters - ---------- - start_date : pd.Timestamp - The starting date of the slice. - end_date : pd.Timestamp - The ending date of the slice. - - Returns - ------- - indexer : slice[int, int] - The slice into the other columns to get the needed data. - """ - if not (self.start_date <= start_date and - self.end_date >= end_date): - raise IndexError( - 'cannot build indexer for %s:%s' % (start_date, end_date), - ) - - dates = self.dates[:] - start_idx = np.searchsorted(dates, start_date.to_datetime64()) - end_idx = np.searchsorted(dates, end_date.to_datetime64(), 'right') - return np.s_[start_idx:end_idx] diff --git a/zipline/utils/date_utils.py b/zipline/utils/date_utils.py index 3b3b94d00e..23d86da8cb 100644 --- a/zipline/utils/date_utils.py +++ b/zipline/utils/date_utils.py @@ -49,7 +49,6 @@ def compute_date_range_chunks(calendar, start_date, end_date, chunksize): return [(dates[0], dates[1])] all_sessions = calendar.all_sessions - all_sessions.offset = None start_ix, end_ix = all_sessions.slice_locs(start_date, end_date) return ( (r[0], r[-1]) for r in partition_all(