Commit

Merge 7867946 into 9fe8076
analicia committed Jun 2, 2017
2 parents 9fe8076 + 7867946 commit acafff4
Showing 10 changed files with 428 additions and 13 deletions.
34 changes: 34 additions & 0 deletions tests/pipeline/test_engine.py
@@ -51,6 +51,7 @@
ExponentialWeightedMovingAverage,
ExponentialWeightedMovingStdDev,
MaxDrawdown,
Returns,
SimpleMovingAverage,
)
from zipline.pipeline.loaders.equity_pricing_loader import (
@@ -77,6 +78,7 @@
)
from zipline.testing.fixtures import (
WithAdjustmentReader,
WithEquityPricingPipelineEngine,
WithSeededRandomPipelineEngine,
WithTradingEnvironment,
ZiplineTestCase,
@@ -1497,3 +1499,35 @@ def dispatcher(c):
precomputed_term_value,
),
)


class ChunkedPipelineTestCase(WithEquityPricingPipelineEngine,
ZiplineTestCase):

PIPELINE_START_DATE = Timestamp('2006-01-05', tz='UTC')
END_DATE = Timestamp('2006-12-29', tz='UTC')

def test_run_chunked_pipeline(self):
"""
Test that running a pipeline in chunks produces the same result as
running it all at once.
"""
pipe = Pipeline(
columns={
'close': USEquityPricing.close.latest,
'returns': Returns(window_length=2),
'categorical': USEquityPricing.close.latest.quantiles(5)
},
)
pipeline_result = self.pipeline_engine.run_pipeline(
pipe,
start_date=self.PIPELINE_START_DATE,
end_date=self.END_DATE,
)
chunked_result = self.pipeline_engine.run_chunked_pipeline(
pipeline=pipe,
start_date=self.PIPELINE_START_DATE,
end_date=self.END_DATE,
chunksize=22
)
self.assertTrue(chunked_result.equals(pipeline_result))
86 changes: 86 additions & 0 deletions tests/utils/test_date_utils.py
@@ -0,0 +1,86 @@
from pandas import Timestamp
from nose_parameterized import parameterized

from zipline.testing import ZiplineTestCase
from zipline.utils.calendars import get_calendar
from zipline.utils.date_utils import compute_date_range_chunks


def T(s):
"""
Helper function to improve readability.
"""
return Timestamp(s, tz='UTC')


class TestDateUtils(ZiplineTestCase):

@classmethod
def init_class_fixtures(cls):
super(TestDateUtils, cls).init_class_fixtures()
cls.calendar = get_calendar('NYSE')

@parameterized.expand([
(None, [(T('2017-01-03'), T('2017-01-31'))]),
(10, [
(T('2017-01-03'), T('2017-01-17')),
(T('2017-01-18'), T('2017-01-31'))
]),
(15, [
(T('2017-01-03'), T('2017-01-24')),
(T('2017-01-25'), T('2017-01-31'))
]),
])
def test_compute_date_range_chunks(self, chunksize, expected):
# This date range results in 20 business days
start_date = T('2017-01-03')
end_date = T('2017-01-31')

date_ranges = compute_date_range_chunks(
self.calendar.all_sessions,
start_date,
end_date,
chunksize
)

self.assertListEqual(list(date_ranges), expected)

def test_compute_date_range_chunks_invalid_input(self):
# Start date not found in calendar
with self.assertRaises(KeyError) as cm:
compute_date_range_chunks(
self.calendar.all_sessions,
T('2017-05-07'), # Sunday
T('2017-06-01'),
None
)
self.assertEqual(
str(cm.exception),
"'Start date 2017-05-07 is not found in calendar.'"
)

# End date not found in calendar
with self.assertRaises(KeyError) as cm:
compute_date_range_chunks(
self.calendar.all_sessions,
T('2017-05-01'),
T('2017-05-27'), # Saturday
None
)
self.assertEqual(
str(cm.exception),
"'End date 2017-05-27 is not found in calendar.'"
)

# End date before start date
with self.assertRaises(ValueError) as cm:
compute_date_range_chunks(
self.calendar.all_sessions,
T('2017-06-01'),
T('2017-05-01'),
None
)
self.assertEqual(
str(cm.exception),
"End date 2017-05-01 cannot precede start date 2017-06-01."
)
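
For reference, here is a minimal sketch of compute_date_range_chunks that is consistent with the tests above: it raises KeyError when either endpoint is missing from the calendar, ValueError when the end date precedes the start date, and otherwise yields (start, end) pairs covering at most chunksize sessions each. The use of toolz.partition_all and DatetimeIndex.slice_locs is an illustrative assumption, not taken from this diff.

from toolz import partition_all


def compute_date_range_chunks(sessions, start_date, end_date, chunksize):
    # Both endpoints must be actual sessions in the calendar.
    if start_date not in sessions:
        raise KeyError("Start date %s is not found in calendar." %
                       start_date.strftime("%Y-%m-%d"))
    if end_date not in sessions:
        raise KeyError("End date %s is not found in calendar." %
                       end_date.strftime("%Y-%m-%d"))
    if end_date < start_date:
        raise ValueError("End date %s cannot precede start date %s." %
                         (end_date.strftime("%Y-%m-%d"),
                          start_date.strftime("%Y-%m-%d")))

    # With no chunksize, the entire range is a single chunk.
    if chunksize is None:
        return [(start_date, end_date)]

    # Otherwise split the sessions between the endpoints into groups of at
    # most `chunksize` and report each group's first and last session.
    start_ix, end_ix = sessions.slice_locs(start_date, end_date)
    return (
        (chunk[0], chunk[-1])
        for chunk in partition_all(chunksize, sessions[start_ix:end_ix])
    )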
108 changes: 107 additions & 1 deletion tests/utils/test_pandas_utils.py
@@ -4,7 +4,11 @@
import pandas as pd

from zipline.testing import parameter_space, ZiplineTestCase
from zipline.utils.pandas_utils import nearest_unequal_elements
from zipline.testing.predicates import assert_equal
from zipline.utils.pandas_utils import (
categorical_df_concat,
nearest_unequal_elements
)


class TestNearestUnequalElements(ZiplineTestCase):
@@ -80,3 +84,105 @@ def test_nearest_unequal_bad_input(self):
str(e.exception),
'dts must be sorted in increasing order',
)


class TestCatDFConcat(ZiplineTestCase):

def test_categorical_df_concat(self):

inp = [
pd.DataFrame(
{
'A': pd.Series(['a', 'b', 'c'], dtype='category'),
'B': pd.Series([100, 102, 103], dtype='int64'),
'C': pd.Series(['x', 'x', 'x'], dtype='category'),
}
),
pd.DataFrame(
{
'A': pd.Series(['c', 'b', 'd'], dtype='category'),
'B': pd.Series([103, 102, 104], dtype='int64'),
'C': pd.Series(['y', 'y', 'y'], dtype='category'),
}
),
pd.DataFrame(
{
'A': pd.Series(['a', 'b', 'd'], dtype='category'),
'B': pd.Series([101, 102, 104], dtype='int64'),
'C': pd.Series(['z', 'z', 'z'], dtype='category'),
}
),
]
result = categorical_df_concat(inp)

expected = pd.DataFrame(
{
'A': pd.Series(
['a', 'b', 'c', 'c', 'b', 'd', 'a', 'b', 'd'],
dtype='category'
),
'B': pd.Series(
[100, 102, 103, 103, 102, 104, 101, 102, 104],
dtype='int64'
),
'C': pd.Series(
['x', 'x', 'x', 'y', 'y', 'y', 'z', 'z', 'z'],
dtype='category'
),
},
)
expected.index = pd.Int64Index([0, 1, 2, 0, 1, 2, 0, 1, 2])
assert_equal(expected, result)
assert_equal(
expected['A'].cat.categories,
result['A'].cat.categories
)
assert_equal(
expected['C'].cat.categories,
result['C'].cat.categories
)

def test_categorical_df_concat_value_error(self):

mismatched_dtypes = [
pd.DataFrame(
{
'A': pd.Series(['a', 'b', 'c'], dtype='category'),
'B': pd.Series([100, 102, 103], dtype='int64'),
}
),
pd.DataFrame(
{
'A': pd.Series(['c', 'b', 'd'], dtype='category'),
'B': pd.Series([103, 102, 104], dtype='float64'),
}
),
]
mismatched_column_names = [
pd.DataFrame(
{
'A': pd.Series(['a', 'b', 'c'], dtype='category'),
'B': pd.Series([100, 102, 103], dtype='int64'),
}
),
pd.DataFrame(
{
'A': pd.Series(['c', 'b', 'd'], dtype='category'),
'X': pd.Series([103, 102, 104], dtype='int64'),
}
),
]

with self.assertRaises(ValueError) as cm:
categorical_df_concat(mismatched_dtypes)
self.assertEqual(
str(cm.exception),
"Input DataFrames must have the same columns/dtypes."
)

with self.assertRaises(ValueError) as cm:
categorical_df_concat(mismatched_column_names)
self.assertEqual(
str(cm.exception),
"Input DataFrames must have the same columns/dtypes."
)
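
For context, a minimal sketch of categorical_df_concat consistent with these tests: it rejects inputs whose columns or dtypes differ, unions the categories of every categorical column so the concatenated result keeps dtype 'category', and accepts the inplace flag that run_chunked_pipeline passes later in this diff. The internals shown here are illustrative assumptions, not the actual helper.

import pandas as pd


def categorical_df_concat(df_list, inplace=False):
    # Work on copies unless the caller allows the inputs to be modified.
    if not inplace:
        df_list = [df.copy() for df in df_list]

    first = df_list[0]
    # Every input must share the same columns and dtypes.
    if not all(df.dtypes.equals(first.dtypes) for df in df_list[1:]):
        raise ValueError("Input DataFrames must have the same columns/dtypes.")

    # Union the categories of each categorical column so pd.concat keeps the
    # 'category' dtype instead of falling back to object.
    for col in first.select_dtypes(include=['category']).columns:
        new_categories = sorted(
            set().union(*(frame[col].cat.categories for frame in df_list))
        )
        for frame in df_list:
            frame[col] = frame[col].cat.set_categories(new_categories)

    return pd.concat(df_list)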
21 changes: 21 additions & 0 deletions tests/utils/test_sharedoc.py
@@ -0,0 +1,21 @@
from zipline.testing import ZiplineTestCase
from zipline.utils.sharedoc import copydoc


class TestSharedoc(ZiplineTestCase):

def test_copydoc(self):
def original_docstring_function():
"""
My docstring brings the boys to the yard.
"""
pass

@copydoc(original_docstring_function)
def copied_docstring_function():
pass

self.assertEqual(
original_docstring_function.__doc__,
copied_docstring_function.__doc__
)
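
The decorator under test is small; a minimal sketch consistent with this test simply copies the wrapped function's __doc__.

def copydoc(from_func):
    """Decorator that copies the docstring of `from_func` onto the
    decorated function."""
    def decorator(to_func):
        to_func.__doc__ = from_func.__doc__
        return to_func
    return decorator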
67 changes: 64 additions & 3 deletions zipline/pipeline/engine.py
@@ -27,6 +27,10 @@

from .term import AssetExists, InputDates, LoadableTerm

from zipline.utils.date_utils import compute_date_range_chunks
from zipline.utils.pandas_utils import categorical_df_concat
from zipline.utils.sharedoc import copydoc


class PipelineEngine(with_metaclass(ABCMeta)):

@@ -62,6 +66,45 @@ def run_pipeline(self, pipeline, start_date, end_date):
"""
raise NotImplementedError("run_pipeline")

@abstractmethod
def run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize):
"""
Compute values for `pipeline` in number of days equal to `chunksize`
and return stitched up result. Computing in chunks is useful for
pipelines computed over a long period of time.
Parameters
----------
pipeline : Pipeline
The pipeline to run.
start_date : pd.Timestamp
The start date to run the pipeline for.
end_date : pd.Timestamp
The end date to run the pipeline for.
chunksize : int or None
The number of days to execute at a time. If None, then
results will be calculated for entire date range at once.
Returns
-------
result : pd.DataFrame
A frame of computed results.
The columns `result` correspond to the entries of
`pipeline.columns`, which should be a dictionary mapping strings to
instances of `zipline.pipeline.term.Term`.
For each date between `start_date` and `end_date`, `result` will
contain a row for each asset that passed `pipeline.screen`. A
screen of None indicates that a row should be returned for each
asset that existed each day.
See Also
--------
:meth:`PipelineEngine.run_pipeline`
"""
raise NotImplementedError("run_chunked_pipeline")


class NoEngineRegistered(Exception):
"""
@@ -80,6 +123,12 @@ def run_pipeline(self, pipeline, start_date, end_date):
"resources were registered."
)

def run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize):
raise NoEngineRegistered(
"Attempted to run a chunked pipeline but no pipeline "
"resources were registered."
)


def default_populate_initial_workspace(initial_workspace,
root_mask_term,
@@ -114,7 +163,7 @@ def default_populate_initial_workspace(initial_workspace,
return initial_workspace


class SimplePipelineEngine(object):
class SimplePipelineEngine(PipelineEngine):
"""
PipelineEngine class that computes each term independently.
@@ -146,7 +195,6 @@ class SimplePipelineEngine(object):
'_root_mask_term',
'_root_mask_dates_term',
'_populate_initial_workspace',
'__weakref__',
)

def __init__(self,
@@ -210,7 +258,8 @@ def run_pipeline(self, pipeline, start_date, end_date):
See Also
--------
PipelineEngine.run_pipeline
:meth:`PipelineEngine.run_pipeline`
:meth:`PipelineEngine.run_chunked_pipeline`
"""
if end_date < start_date:
raise ValueError(
@@ -256,6 +305,18 @@ def run_pipeline(self, pipeline, start_date, end_date):
assets,
)

@copydoc(PipelineEngine.run_chunked_pipeline)
def run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize):
ranges = compute_date_range_chunks(
self._calendar,
start_date,
end_date,
chunksize,
)
chunks = [self.run_pipeline(pipeline, s, e) for s, e in ranges]

return categorical_df_concat(chunks, inplace=True)

def _compute_root_mask(self, start_date, end_date, extra_rows):
"""
Compute a lifetimes matrix from our AssetFinder, then drop columns that
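To tie the pieces together, a hypothetical caller would use the new method much like ChunkedPipelineTestCase above does. The pipeline construction mirrors that test; the engine object itself is assumed to be already built, since constructing a SimplePipelineEngine (loader, calendar, asset finder) is outside this diff.

from pandas import Timestamp
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.factors import Returns

pipe = Pipeline(
    columns={
        'close': USEquityPricing.close.latest,
        'returns': Returns(window_length=2),
    },
)

# `engine` is assumed to be an already-constructed SimplePipelineEngine.
result = engine.run_chunked_pipeline(
    pipeline=pipe,
    start_date=Timestamp('2006-01-05', tz='UTC'),
    end_date=Timestamp('2006-12-29', tz='UTC'),
    chunksize=22,  # compute 22 sessions at a time, then stitch the chunks
)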
