Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run chunked pipeline #1811

Merged
merged 6 commits into from Jun 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion etc/conda_build_matrix.py
Expand Up @@ -4,7 +4,7 @@

import click

py_versions = ('2.7', '3.4')
py_versions = ('2.7', '3.4', '3.5')
npy_versions = ('1.9', '1.10')
zipline_path = os.path.join(
os.path.dirname(__file__),
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -305,6 +305,7 @@ def setup_requirements(requirements_path, module_names, strict_bounds,
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Operating System :: OS Independent',
'Intended Audience :: Science/Research',
'Topic :: Office/Business :: Financial',
Expand Down
34 changes: 34 additions & 0 deletions tests/pipeline/test_engine.py
Expand Up @@ -51,6 +51,7 @@
ExponentialWeightedMovingAverage,
ExponentialWeightedMovingStdDev,
MaxDrawdown,
Returns,
SimpleMovingAverage,
)
from zipline.pipeline.loaders.equity_pricing_loader import (
Expand All @@ -77,6 +78,7 @@
)
from zipline.testing.fixtures import (
WithAdjustmentReader,
WithEquityPricingPipelineEngine,
WithSeededRandomPipelineEngine,
WithTradingEnvironment,
ZiplineTestCase,
Expand Down Expand Up @@ -1497,3 +1499,35 @@ def dispatcher(c):
precomputed_term_value,
),
)


class ChunkedPipelineTestCase(WithEquityPricingPipelineEngine,
                              ZiplineTestCase):
    """
    Tests that running a pipeline in date-range chunks is equivalent to
    running it over the full range in one call.
    """

    # Roughly one year of trading sessions, so multiple chunks are produced
    # for any chunksize well below the session count.
    PIPELINE_START_DATE = Timestamp('2006-01-05', tz='UTC')
    END_DATE = Timestamp('2006-12-29', tz='UTC')

    def test_run_chunked_pipeline(self):
        """
        Test that running a pipeline in chunks produces the same result as if
        it were run all at once.
        """
        pipe = Pipeline(
            columns={
                'close': USEquityPricing.close.latest,
                'returns': Returns(window_length=2),
                # A categorical-dtype column, included to exercise the
                # categorical concat logic used when stitching chunk
                # results back together.
                'categorical': USEquityPricing.close.latest.quantiles(5),
            },
        )
        # Reference result: one run over the entire date range.
        pipeline_result = self.pipeline_engine.run_pipeline(
            pipe,
            start_date=self.PIPELINE_START_DATE,
            end_date=self.END_DATE,
        )
        # Same pipeline, computed in 22-session chunks.
        chunked_result = self.pipeline_engine.run_chunked_pipeline(
            pipeline=pipe,
            start_date=self.PIPELINE_START_DATE,
            end_date=self.END_DATE,
            chunksize=22,
        )
        self.assertTrue(chunked_result.equals(pipeline_result))
86 changes: 86 additions & 0 deletions tests/utils/test_date_utils.py
@@ -0,0 +1,86 @@
from pandas import Timestamp
from nose_parameterized import parameterized

from zipline.testing import ZiplineTestCase
from zipline.utils.calendars import get_calendar
from zipline.utils.date_utils import compute_date_range_chunks


def T(s):
    """
    Shorthand for constructing a UTC Timestamp, to improve readability
    of the expected chunk boundaries below.

    Parameters
    ----------
    s : str
        A date string parseable by ``pandas.Timestamp``.

    Returns
    -------
    pandas.Timestamp
        ``s`` localized to UTC.
    """
    return Timestamp(s, tz='UTC')


class TestDateUtils(ZiplineTestCase):
    """
    Tests for ``zipline.utils.date_utils.compute_date_range_chunks``.
    """

    @classmethod
    def init_class_fixtures(cls):
        super(TestDateUtils, cls).init_class_fixtures()
        # All test cases chunk dates against the NYSE trading calendar.
        cls.calendar = get_calendar('NYSE')

    # Each case is (chunksize, expected list of (start, end) chunk bounds).
    # chunksize=None means "no chunking": a single range is returned.
    @parameterized.expand([
        (None, [(T('2017-01-03'), T('2017-01-31'))]),
        (10, [
            (T('2017-01-03'), T('2017-01-17')),
            (T('2017-01-18'), T('2017-01-31'))
        ]),
        (15, [
            (T('2017-01-03'), T('2017-01-24')),
            (T('2017-01-25'), T('2017-01-31'))
        ]),
    ])
    def test_compute_date_range_chunks(self, chunksize, expected):
        # This date range results in 20 business days
        start_date = T('2017-01-03')
        end_date = T('2017-01-31')

        date_ranges = compute_date_range_chunks(
            self.calendar.all_sessions,
            start_date,
            end_date,
            chunksize
        )

        # compute_date_range_chunks returns an iterable; materialize it
        # before comparing against the expected list of bounds.
        self.assertListEqual(list(date_ranges), expected)

    def test_compute_date_range_chunks_invalid_input(self):
        # Start date not found in calendar
        with self.assertRaises(KeyError) as cm:
            compute_date_range_chunks(
                self.calendar.all_sessions,
                T('2017-05-07'),  # Sunday
                T('2017-06-01'),
                None
            )
        # str() of a KeyError wraps the message in quotes, hence the
        # doubled quoting in the expected strings below.
        self.assertEqual(
            str(cm.exception),
            "'Start date 2017-05-07 is not found in calendar.'"
        )

        # End date not found in calendar
        with self.assertRaises(KeyError) as cm:
            compute_date_range_chunks(
                self.calendar.all_sessions,
                T('2017-05-01'),
                T('2017-05-27'),  # Saturday
                None
            )
        self.assertEqual(
            str(cm.exception),
            "'End date 2017-05-27 is not found in calendar.'"
        )

        # End date before start date
        with self.assertRaises(ValueError) as cm:
            compute_date_range_chunks(
                self.calendar.all_sessions,
                T('2017-06-01'),
                T('2017-05-01'),
                None
            )
        self.assertEqual(
            str(cm.exception),
            "End date 2017-05-01 cannot precede start date 2017-06-01."
        )
108 changes: 107 additions & 1 deletion tests/utils/test_pandas_utils.py
Expand Up @@ -4,7 +4,11 @@
import pandas as pd

from zipline.testing import parameter_space, ZiplineTestCase
from zipline.utils.pandas_utils import nearest_unequal_elements
from zipline.testing.predicates import assert_equal
from zipline.utils.pandas_utils import (
categorical_df_concat,
nearest_unequal_elements
)


class TestNearestUnequalElements(ZiplineTestCase):
Expand Down Expand Up @@ -80,3 +84,105 @@ def test_nearest_unequal_bad_input(self):
str(e.exception),
'dts must be sorted in increasing order',
)


class TestCatDFConcat(ZiplineTestCase):
    """
    Tests for ``zipline.utils.pandas_utils.categorical_df_concat``.
    """

    def test_categorical_df_concat(self):
        # Factory for frames sharing one schema: 'A' and 'C' are
        # categorical columns, 'B' is a plain int64 column.
        def make_frame(col_a, col_b, col_c):
            return pd.DataFrame(
                {
                    'A': pd.Series(col_a, dtype='category'),
                    'B': pd.Series(col_b, dtype='int64'),
                    'C': pd.Series(col_c, dtype='category'),
                }
            )

        frames = [
            make_frame(['a', 'b', 'c'], [100, 102, 103], ['x', 'x', 'x']),
            make_frame(['c', 'b', 'd'], [103, 102, 104], ['y', 'y', 'y']),
            make_frame(['a', 'b', 'd'], [101, 102, 104], ['z', 'z', 'z']),
        ]
        result = categorical_df_concat(frames)

        # The concatenated frame stacks the inputs in order, keeping each
        # input's original index values.
        expected = make_frame(
            ['a', 'b', 'c', 'c', 'b', 'd', 'a', 'b', 'd'],
            [100, 102, 103, 103, 102, 104, 101, 102, 104],
            ['x', 'x', 'x', 'y', 'y', 'y', 'z', 'z', 'z'],
        )
        expected.index = pd.Int64Index([0, 1, 2, 0, 1, 2, 0, 1, 2])

        assert_equal(expected, result)
        # The categorical columns should carry the union of the input
        # categories, not just those of the first frame.
        for column in 'A', 'C':
            assert_equal(
                expected[column].cat.categories,
                result[column].cat.categories,
            )

    def test_categorical_df_concat_value_error(self):
        expected_message = (
            "Input DataFrames must have the same columns/dtypes."
        )

        # 'B' is int64 in the first frame but float64 in the second.
        mismatched_dtypes = [
            pd.DataFrame(
                {
                    'A': pd.Series(['a', 'b', 'c'], dtype='category'),
                    'B': pd.Series([100, 102, 103], dtype='int64'),
                }
            ),
            pd.DataFrame(
                {
                    'A': pd.Series(['c', 'b', 'd'], dtype='category'),
                    'B': pd.Series([103, 102, 104], dtype='float64'),
                }
            ),
        ]
        # The second frame names its numeric column 'X' instead of 'B'.
        mismatched_column_names = [
            pd.DataFrame(
                {
                    'A': pd.Series(['a', 'b', 'c'], dtype='category'),
                    'B': pd.Series([100, 102, 103], dtype='int64'),
                }
            ),
            pd.DataFrame(
                {
                    'A': pd.Series(['c', 'b', 'd'], dtype='category'),
                    'X': pd.Series([103, 102, 104], dtype='int64'),
                }
            ),
        ]

        # Both mismatch flavors should raise the same ValueError.
        for bad_input in (mismatched_dtypes, mismatched_column_names):
            with self.assertRaises(ValueError) as cm:
                categorical_df_concat(bad_input)
            self.assertEqual(str(cm.exception), expected_message)
21 changes: 21 additions & 0 deletions tests/utils/test_sharedoc.py
@@ -0,0 +1,21 @@
from zipline.testing import ZiplineTestCase
from zipline.utils.sharedoc import copydoc


class TestSharedoc(ZiplineTestCase):
    """
    Tests for ``zipline.utils.sharedoc.copydoc``.
    """

    def test_copydoc(self):
        # A function carrying the docstring we want copied.
        def with_docstring():
            """
            My docstring brings the boys to the yard.
            """
            pass

        # Decorating an undocumented function with ``copydoc`` should
        # give it the source function's docstring verbatim.
        @copydoc(with_docstring)
        def without_docstring():
            pass

        self.assertEqual(
            with_docstring.__doc__,
            without_docstring.__doc__,
        )