Merge 90ed74b into 9fe8076
analicia committed May 31, 2017
2 parents 9fe8076 + 90ed74b commit 30a4116
Showing 7 changed files with 367 additions and 5 deletions.
37 changes: 37 additions & 0 deletions tests/pipeline/test_run_chunked_pipeline.py
@@ -0,0 +1,37 @@
from zipline.pipeline import Pipeline, run_chunked_pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.factors import Returns
from zipline.testing import ZiplineTestCase
from zipline.testing.fixtures import WithEquityPricingPipelineEngine


class ChunkedPipelineTestCase(WithEquityPricingPipelineEngine,
                              ZiplineTestCase):

    def test_run_chunked_pipeline(self):
        """
        Test that running a pipeline in chunks produces the same result as
        running it all at once.
        """
        pipe = Pipeline(
            columns={
                'close': USEquityPricing.close.latest,
                'returns': Returns(window_length=2),
            },
        )
        sessions = self.nyse_calendar.all_sessions
        start_date = sessions[sessions.get_loc(self.START_DATE) + 2]

        pipeline_result = self.pipeline_engine.run_pipeline(
            pipe,
            start_date=start_date,
            end_date=self.END_DATE,
        )
        chunked_result = run_chunked_pipeline(
            engine=self.pipeline_engine,
            pipeline=pipe,
            start_date=start_date,
            end_date=self.END_DATE,
            chunksize=22,
        )
        self.assertTrue(chunked_result.equals(pipeline_result))
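For intuition, the equality assertion above hinges on ordered concatenation reproducing the single-run frame exactly; a minimal standalone sketch of that premise (pure pandas, illustrative only, not part of the change):

import pandas as pd

# DataFrame.equals demands identical index, columns, and values, so the
# per-chunk frames must stitch back together in order with nothing dropped.
full = pd.DataFrame({'x': [1.0, 2.0, 3.0]})
chunks = [full.iloc[:2], full.iloc[2:]]
print(pd.concat(chunks).equals(full))  # True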
89 changes: 89 additions & 0 deletions tests/utils/test_date_utils.py
@@ -0,0 +1,89 @@
from pandas import Timestamp

from nose_parameterized import parameterized

from zipline.testing import ZiplineTestCase
from zipline.utils.calendars import get_calendar
from zipline.utils.date_utils import (
    compute_date_range_chunks,
    roll_dates_to_previous_session,
)


class TestDateUtils(ZiplineTestCase):

    @classmethod
    def init_class_fixtures(cls):
        super(TestDateUtils, cls).init_class_fixtures()
        cls.calendar = get_calendar('NYSE')

    @parameterized.expand([
        (
            Timestamp('05-19-2017', tz='UTC'),  # actual trading date
            Timestamp('05-19-2017', tz='UTC'),
        ),
        (
            Timestamp('07-04-2015', tz='UTC'),  # weekend NYSE holiday
            Timestamp('07-02-2015', tz='UTC'),
        ),
        (
            Timestamp('01-16-2017', tz='UTC'),  # weekday NYSE holiday
            Timestamp('01-13-2017', tz='UTC'),
        ),
    ])
    def test_roll_dates_to_previous_session(self, date, expected_rolled_date):
        self.assertEqual(
            roll_dates_to_previous_session(self.calendar, date)[0],
            expected_rolled_date,
        )

    @parameterized.expand([
        (
            None,
            [
                (
                    Timestamp('01-03-2017', tz='UTC'),
                    Timestamp('01-31-2017', tz='UTC'),
                ),
            ],
        ),
        (
            10,
            [
                (
                    Timestamp('01-03-2017', tz='UTC'),
                    Timestamp('01-17-2017', tz='UTC'),
                ),
                (
                    Timestamp('01-18-2017', tz='UTC'),
                    Timestamp('01-31-2017', tz='UTC'),
                ),
            ],
        ),
        (
            15,
            [
                (
                    Timestamp('01-03-2017', tz='UTC'),
                    Timestamp('01-24-2017', tz='UTC'),
                ),
                (
                    Timestamp('01-25-2017', tz='UTC'),
                    Timestamp('01-31-2017', tz='UTC'),
                ),
            ],
        ),
    ])
    def test_compute_date_range_chunks(self, chunksize, expected):
        # This date range spans 20 NYSE sessions (Jan 16 is the MLK holiday).
        start_date = Timestamp('01-03-2017')
        end_date = Timestamp('01-31-2017')

        date_ranges = compute_date_range_chunks(
            self.calendar,
            start_date,
            end_date,
            chunksize,
        )

        self.assertListEqual(list(date_ranges), expected)
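The 'ffill' lookup these tests exercise can be seen in isolation; a small sketch, assuming a pandas version of the era in which DatetimeIndex.get_loc still accepted a method argument:

import pandas as pd

# MLK Day 2017 (Jan 16) is not a session; 'ffill' snaps it back to the
# previous session, Jan 13, which is what the holiday cases above expect.
sessions = pd.DatetimeIndex(['2017-01-13', '2017-01-17'], tz='UTC')
loc = sessions.get_loc(pd.Timestamp('2017-01-16', tz='UTC'), method='ffill')
print(sessions[loc])  # 2017-01-13 00:00:00+00:00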
100 changes: 99 additions & 1 deletion tests/utils/test_pandas_utils.py
@@ -4,7 +4,11 @@
import pandas as pd

from zipline.testing import parameter_space, ZiplineTestCase
from zipline.testing.predicates import assert_equal
from zipline.utils.pandas_utils import (
    categorical_df_concat,
    nearest_unequal_elements,
)


class TestNearestUnequalElements(ZiplineTestCase):
@@ -80,3 +84,97 @@ def test_nearest_unequal_bad_input(self):
            str(e.exception),
            'dts must be sorted in increasing order',
        )


class TestCatDFConcat(ZiplineTestCase):

    def test_categorical_df_concat(self):

        inp = [
            pd.DataFrame(
                {
                    'A': pd.Series(['a', 'b', 'c'], dtype='category'),
                    'B': pd.Series([100, 102, 103], dtype='int64'),
                    'C': pd.Series(['x', 'x', 'x'], dtype='category'),
                }
            ),
            pd.DataFrame(
                {
                    'A': pd.Series(['c', 'b', 'd'], dtype='category'),
                    'B': pd.Series([103, 102, 104], dtype='int64'),
                    'C': pd.Series(['y', 'y', 'y'], dtype='category'),
                }
            ),
            pd.DataFrame(
                {
                    'A': pd.Series(['a', 'b', 'd'], dtype='category'),
                    'B': pd.Series([101, 102, 104], dtype='int64'),
                    'C': pd.Series(['z', 'z', 'z'], dtype='category'),
                }
            ),
        ]
        result = categorical_df_concat(inp)

        expected = pd.DataFrame(
            {
                'A': pd.Series(
                    ['a', 'b', 'c', 'c', 'b', 'd', 'a', 'b', 'd'],
                    dtype='category'
                ),
                'B': pd.Series(
                    [100, 102, 103, 103, 102, 104, 101, 102, 104],
                    dtype='int64'
                ),
                'C': pd.Series(
                    ['x', 'x', 'x', 'y', 'y', 'y', 'z', 'z', 'z'],
                    dtype='category'
                ),
            },
        )
        expected.index = pd.Int64Index([0, 1, 2, 0, 1, 2, 0, 1, 2])
        assert_equal(expected, result)
        assert_equal(
            expected['A'].cat.categories,
            result['A'].cat.categories
        )
        assert_equal(
            expected['C'].cat.categories,
            result['C'].cat.categories
        )

    def test_categorical_df_concat_value_error(self):

        mismatched_dtypes = [
            pd.DataFrame(
                {
                    'A': pd.Series(['a', 'b', 'c'], dtype='category'),
                    'B': pd.Series([100, 102, 103], dtype='int64'),
                }
            ),
            pd.DataFrame(
                {
                    'A': pd.Series(['c', 'b', 'd'], dtype='category'),
                    'B': pd.Series([103, 102, 104], dtype='float64'),
                }
            ),
        ]
        mismatched_column_names = [
            pd.DataFrame(
                {
                    'A': pd.Series(['a', 'b', 'c'], dtype='category'),
                    'B': pd.Series([100, 102, 103], dtype='int64'),
                }
            ),
            pd.DataFrame(
                {
                    'A': pd.Series(['c', 'b', 'd'], dtype='category'),
                    'X': pd.Series([103, 102, 104], dtype='int64'),
                }
            ),
        ]

        with self.assertRaises(ValueError):
            categorical_df_concat(mismatched_dtypes)

        with self.assertRaises(ValueError):
            categorical_df_concat(mismatched_column_names)
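The motivation for categorical_df_concat, sketched independently: with differing category sets, a plain pd.concat falls back to object dtype, losing the categorical (illustrative, not part of the change):

import pandas as pd

a = pd.DataFrame({'A': pd.Series(['a', 'b'], dtype='category')})
b = pd.DataFrame({'A': pd.Series(['b', 'c'], dtype='category')})

# The category sets differ, so the concatenated column degrades to object;
# categorical_df_concat unions the categories first to keep the dtype.
print(pd.concat([a, b])['A'].dtype)  # object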
38 changes: 38 additions & 0 deletions zipline/pipeline/__init__.py
@@ -1,5 +1,8 @@
from __future__ import print_function
from zipline.assets import AssetFinder
from zipline.utils.calendars import get_calendar
from zipline.utils.date_utils import compute_date_range_chunks
from zipline.utils.pandas_utils import categorical_df_concat

from .classifiers import Classifier, CustomClassifier
from .engine import SimplePipelineEngine
@@ -47,6 +50,39 @@ def engine_from_files(daily_bar_path,
    )


def run_chunked_pipeline(engine, pipeline, start_date, end_date, chunksize):
    """Run a pipeline in date chunks and concatenate the results.

    Parameters
    ----------
    engine : Engine
        The pipeline engine.
    pipeline : Pipeline
        The pipeline to run.
    start_date : pd.Timestamp
        The start date to run the pipeline for.
    end_date : pd.Timestamp
        The end date to run the pipeline for.
    chunksize : int or None
        The number of days to execute at a time. If this is None, all the
        days will be run at once.

    Returns
    -------
    results : pd.DataFrame
        The results for each output term in the pipeline.
    """
    ranges = compute_date_range_chunks(
        get_calendar('NYSE'),
        start_date,
        end_date,
        chunksize,
    )
    chunks = [engine.run_pipeline(pipeline, s, e) for s, e in ranges]

    return categorical_df_concat(chunks, inplace=True)


__all__ = (
    'Classifier',
    'CustomFactor',
@@ -57,7 +93,9 @@ def engine_from_files(daily_bar_path,
    'Factor',
    'Filter',
    'Pipeline',
    'PipelineResult',
    'SimplePipelineEngine',
    'run_chunked_pipeline',
    'Term',
    'TermGraph',
)
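A minimal usage sketch of the new entry point; `my_engine` is a stand-in for any already-constructed pipeline engine (e.g. one built via engine_from_files above), and the chunksize here is illustrative:

import pandas as pd
from zipline.pipeline import Pipeline, run_chunked_pipeline
from zipline.pipeline.data import USEquityPricing

# `my_engine` is assumed to exist; it is not constructed in this sketch.
result = run_chunked_pipeline(
    engine=my_engine,
    pipeline=Pipeline({'close': USEquityPricing.close.latest}),
    start_date=pd.Timestamp('2017-01-03', tz='UTC'),
    end_date=pd.Timestamp('2017-06-30', tz='UTC'),
    chunksize=63,  # roughly one quarter of NYSE sessions per engine call
)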
8 changes: 4 additions & 4 deletions zipline/utils/calendars/__init__.py
@@ -15,20 +15,20 @@

from .trading_calendar import TradingCalendar
from .calendar_utils import (
    clear_calendars,
    deregister_calendar,
    get_calendar,
    register_calendar,
    register_calendar_alias,
    register_calendar_type,
)

__all__ = [
    'clear_calendars',
    'deregister_calendar',
    'get_calendar',
    'register_calendar',
    'register_calendar_alias',
    'register_calendar_type',
    'TradingCalendar',
]
57 changes: 57 additions & 0 deletions zipline/utils/date_utils.py
@@ -0,0 +1,57 @@
from toolz import partition_all


def roll_dates_to_previous_session(calendar, *dates):
    """
    Roll ``dates`` to the previous session of ``calendar``.

    Parameters
    ----------
    calendar : zipline.utils.calendars.trading_calendar.TradingCalendar
        The calendar to use as a reference.
    *dates : pd.Timestamp
        The dates for which the last trading date is needed.

    Returns
    -------
    rolled_dates : list[pd.Timestamp]
        The last session on or before each of the input dates.
    """
    all_sessions = calendar.all_sessions

    locs = [all_sessions.get_loc(dt, method='ffill') for dt in dates]
    return all_sessions[locs].tolist()


def compute_date_range_chunks(calendar, start_date, end_date, chunksize):
    """Compute the start and end dates to run a pipeline for.

    Parameters
    ----------
    calendar : TradingCalendar
        The trading calendar to align the dates with.
    start_date : pd.Timestamp
        The first date in the pipeline.
    end_date : pd.Timestamp
        The last date in the pipeline.
    chunksize : int or None
        The number of sessions in each chunk. If None, the entire range is
        returned as a single chunk.

    Returns
    -------
    ranges : iterable[(np.datetime64, np.datetime64)]
        A sequence of start and end dates to run the pipeline for.
    """
    if chunksize is None:
        dates = roll_dates_to_previous_session(calendar, start_date, end_date)

        return [(dates[0], dates[1])]

    all_sessions = calendar.all_sessions
    start_ix, end_ix = all_sessions.slice_locs(start_date, end_date)
    return (
        (r[0], r[-1]) for r in partition_all(
            chunksize, all_sessions[start_ix:end_ix]
        )
    )
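The chunking itself is plain toolz.partition_all; a self-contained sketch of how the (start, end) pairs fall out, with integers standing in for trading sessions:

from toolz import partition_all

# partition_all yields fixed-size slabs with a possibly short final slab,
# so each range covers at most `chunksize` sessions and the last may be less.
sessions = list(range(8))
chunks = [(c[0], c[-1]) for c in partition_all(3, sessions)]
print(chunks)  # [(0, 2), (3, 5), (6, 7)]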
