
Run chunked pipeline #1811

Merged
merged 6 commits into master from run-chunked-pipeline on Jun 2, 2017

@ahgnaw
Contributor

ahgnaw commented May 23, 2017

Running a pipeline in chunks can be more efficient when computing over a long time period. This PR introduces a function to do just that, along with the other supporting functions it requires.

Pipelines can sometimes have a column of dtype 'categorical', e.g. a country code. In order to combine the separate pipeline chunks, the categories of each such column must first be consolidated, because pd.concat cannot concatenate two DataFrame columns with different categories. For this, the function categorical_df_concat is added.
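
For illustration, here is a minimal sketch of the problem and the fix, using hypothetical frames a and b (this is not the PR's implementation):

import pandas as pd

a = pd.DataFrame({'country': pd.Categorical(['US', 'CA'])})
b = pd.DataFrame({'country': pd.Categorical(['US', 'GB'])})

# With mismatched categories, concat does not preserve the
# categorical dtype; the column silently falls back to object:
pd.concat([a, b])['country'].dtype  # -> object

# Consolidating the categories first keeps the dtype intact:
union = a['country'].cat.categories.union(b['country'].cat.categories)
a['country'] = a['country'].cat.set_categories(union)
b['country'] = b['country'].cat.set_categories(union)
pd.concat([a, b])['country'].dtype  # -> category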

To roll input dates back to the most recent trading day (inclusive), the function roll_dates_to_previous_session is added.
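
A hypothetical sketch of the idea (the PR contains the real implementation; here sessions is assumed to be a sorted DatetimeIndex of trading sessions, and every input date to fall on or after the first session):

import pandas as pd

def roll_dates_to_previous_session(sessions, *dates):
    # For each date, locate the last session at or before it.
    locs = sessions.searchsorted(pd.DatetimeIndex(dates), side='right') - 1
    return sessions[locs]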

Tests for each new function are included.

@ahgnaw ahgnaw force-pushed the run-chunked-pipeline branch from 43deef5 to 8329740 May 24, 2017

@twiecki


Contributor

twiecki commented May 31, 2017

This PR will be very helpful for the ML workflow.

@ahgnaw ahgnaw force-pushed the run-chunked-pipeline branch from dbb13de to f6190e6 May 31, 2017

@coveralls


coveralls commented May 31, 2017

Coverage Status

Coverage increased (+0.1%) to 87.723% when pulling f6190e6 on run-chunked-pipeline into 9fe8076 on master.


@ahgnaw ahgnaw force-pushed the run-chunked-pipeline branch from f6190e6 to 90ed74b May 31, 2017

@coveralls


coveralls commented May 31, 2017

Coverage Status

Coverage increased (+0.09%) to 87.671% when pulling 90ed74b on run-chunked-pipeline into 9fe8076 on master.

@coveralls


coveralls commented Jun 1, 2017

Coverage Status

Coverage increased (+0.03%) to 87.615% when pulling 741a5b5 on run-chunked-pipeline into 9fe8076 on master.


@ssanderson

@ahgnaw took a full pass on this.

@parameterized.expand([
    (
        Timestamp('05-19-2017', tz='UTC'),  # actual trading date


@ssanderson

ssanderson Jun 1, 2017

Member

Pretty much everywhere else in Zipline we use the ISO-8601 convention of YYYY-MM-DD for dates. Is there a reason for these to be different?

# Count strings of the form Timestamp(<4 numbers>-<2 numbers>-<2 numbers>).
$ git grep Timestamp\(\'[0-9]\\{4\\}-[0-9]\\{2\\}-[0-9]\\{2\\}\' | wc -l
977


@ssanderson

ssanderson Jun 1, 2017

Member

FWIW, there's actually a real reason to prefer this convention besides consistency; there's a fast path in the pandas datetime parser for ISO-8601, so it's about 150x faster to parse strings in that format:

In [6]: %timeit pd.Timestamp('01-02-2017')
10000 loops, best of 3: 97.5 µs per loop

In [7]: %timeit pd.Timestamp('2017-01-02')
1000000 loops, best of 3: 650 ns per loop


@ahgnaw

ahgnaw Jun 1, 2017

Contributor

TIL

},
)
sessions = self.nyse_calendar.all_sessions
start_date = sessions[sessions.get_loc(self.START_DATE) + 2]


@ssanderson

ssanderson Jun 1, 2017

Member

Since this test depends for correctness on the value of START_DATE (in particular, it needs to be long enough before END_DATE to force multiple chunk calculations), we should set START_DATE (and probably END_DATE as well for clarity) explicitly at class scope.
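
For instance (hypothetical class name; the dates are the ones that appear elsewhere in these tests):

class RunChunkedPipelineTestCase(ZiplineTestCase):
    # Pinned explicitly: the range must span enough sessions to
    # force run_chunked_pipeline to compute in multiple chunks.
    START_DATE = Timestamp('2017-01-03', tz='UTC')
    END_DATE = Timestamp('2017-05-19', tz='UTC')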

pipe = Pipeline(
    columns={
        'close': USEquityPricing.close.latest,
        'returns': Returns(window_length=2),


@ssanderson

ssanderson Jun 1, 2017

Member

Should we include other columns of different dtypes to exercise the chunking categorical concat logic?

None,
[
    (
        Timestamp('01-03-2017', tz='UTC'),


@ssanderson

ssanderson Jun 1, 2017

Member

A challenge with this formatting style is that, because the parameterized inputs are so spread out, it's hard to see how they correspond to the test inputs.

One thing I've done elsewhere to avoid having a ton of indentation at sites like this is to define something like:

import pandas as pd

def T(s):
    return pd.Timestamp(s, tz='UTC')

at the top of a file.
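
The parameterized cases above then shrink to something like:

@parameterized.expand([
    (T('2017-05-19'),),  # actual trading date
])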

@@ -210,7 +249,7 @@ def run_pipeline(self, pipeline, start_date, end_date):
 See Also
 --------
-PipelineEngine.run_pipeline
+:meth:`PipelineEngine.run_pipeline`


@ssanderson

ssanderson Jun 1, 2017

Member

Should we throw run_chunked_pipeline on here as well now?

@@ -256,6 +295,36 @@ def run_pipeline(self, pipeline, start_date, end_date):
     assets,
 )
+def run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize):
+    """Run a pipeline to collect the results.


@ssanderson

ssanderson Jun 1, 2017

Member

I'm not sure what "to collect the results" means here. Is there a reason to make this sentence different from the one in the interface definition? Honestly, we could probably just copy the docstring over from the base class using something like the copydoc function we use downstream:

import toolz

# Taken from odo.utils.
@toolz.curry
def copydoc(from_, to):
    """Copies the docstring from one function to another.

    Parameters
    ----------
    from_ : any
        The object to copy the docstring from.
    to : any
        The object to copy the docstring to.

    Returns
    -------
    to : any
        ``to`` with the docstring from ``from_``
    """
    to.__doc__ = from_.__doc__
    return to
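
Since copydoc is curried, it can be applied directly as a decorator; hypothetical usage:

@copydoc(PipelineEngine.run_chunked_pipeline)
def run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize):
    ...
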
@abstractmethod
def run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize):
    """
    Compute values for `pipeline` for date range chunks the size


@ssanderson

ssanderson Jun 1, 2017

Member

Feels like there's a preposition missing in this sentence somewhere.

def run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize):
    """
    Compute values for `pipeline` for date range chunks the size
    `chunksize`. Computing in chunks could reduce the amount of memory


@ssanderson

ssanderson Jun 1, 2017

Member

Is there something more specific we can say here besides 'could reduce the amount of memory'?

-------
rolled_dates: (np.datetime64, np.datetime64)
The last trading date of the input dates, inclusive.


@ssanderson

ssanderson Jun 1, 2017

Member

Is the extra whitespace here intentional?

from toolz import partition_all

def roll_dates_to_previous_session(sessions, *dates):


@ssanderson

ssanderson Jun 1, 2017

Member

I don't think this function is actually used anywhere in zipline anymore?

@coveralls


coveralls commented Jun 1, 2017

Coverage Status

Coverage increased (+0.02%) to 87.608% when pulling 835889b on run-chunked-pipeline into 9fe8076 on master.

None
)
self.assertEqual(
cm.exception.__str__(),


@ssanderson

ssanderson Jun 2, 2017

Member

Usually the more idiomatic way to write this would just be str(cm.exception).

with self.assertRaises(ValueError) as cm:
    categorical_df_concat(mismatched_dtypes)
self.assertEqual(
    cm.exception.message,


@ssanderson

ssanderson Jun 2, 2017

Member

Exception.message is deprecated in Python 2 and doesn't exist in Python 3. You just want str(cm.exception).
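
Applied to the snippet above (expected_message stands in for whatever text the test compares against):

with self.assertRaises(ValueError) as cm:
    categorical_df_concat(mismatched_dtypes)
self.assertEqual(
    str(cm.exception),  # works on both Python 2 and Python 3
    expected_message,
)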

@@ -12,6 +12,7 @@
     with_metaclass,
 )
 from numpy import array
+from odo.utils import copydoc


@ssanderson

ssanderson Jun 2, 2017

Member

odo is an optional requirement for Zipline. We should just copy this function and put it somewhere like zipline.utils.sharedoc.

new_categories = sorted(
    set().union(
        *(frame[col].cat.categories for frame in df_list)
    ) - {None}


@ssanderson

ssanderson Jun 2, 2017

Member

Why are we removing None from the categories here? This will change results for missing values.


@ahgnaw

ahgnaw Jun 2, 2017

Contributor

None is a deprecated category, and keeping it could produce a FutureWarning. It doesn't matter regardless, because an array entry missing a categorical code is assumed to be None:

Setting NaNs in `categories` is deprecated and will be removed in a future version of pandas.


@ssanderson

ssanderson Jun 2, 2017

Member

Right, this is a known issue that we've punted on updating because it's a breaking change for consumers of the API. In LabelArray.as_categorical, for example, we explicitly silence this warning.

The issue here is that when we first added support for Classifiers in Pipeline, the API allowed consumers to specify how missing data should be represented. For string data, we default to None as the missing value, which means that pipeline results for string-dtype classifiers contain None at locations where we don't have any data. When we upgraded to pandas 0.18, pandas started emitting warnings about None values in categories, but we ignored the warning, because what pandas wants us to do instead is use its preferred missing value of NaN, which would be a significant behavioral change for users. I think we want to revisit that issue at some point, but I don't think right now is the time for that change (my guess is that we'll deal with this the next time we upgrade pandas).

For the time being, my recommendation would be to wrap this for-loop in the same ignore_pandas_categorical_nan_warning block that we use in LabelArray.
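
A sketch of that recommendation, assuming the context manager is importable from zipline.utils.pandas_utils and that the loop body resets categories roughly like this:

from zipline.utils.pandas_utils import ignore_pandas_categorical_nan_warning

with ignore_pandas_categorical_nan_warning():
    for df in df_list:
        df[col] = df[col].cat.set_categories(new_categories)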


@ssanderson

ssanderson Jun 2, 2017

Member

> It doesn't matter regardless because an array entry missing a categorical code is assumed to be None.

To be clear, I don't think this is true; you'd get NaN back rather than None, which has very different behavior. In particular, NaN compares unequal with any other Python object, including itself, which means you need to test for its presence explicitly.
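
A quick illustration of the difference:

import numpy as np

nan = float('nan')
assert nan != nan      # NaN compares unequal even to itself...
assert np.isnan(nan)   # ...so it must be tested for explicitly.
assert None is None    # None, by contrast, can be checked directly.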

@ahgnaw ahgnaw force-pushed the run-chunked-pipeline branch from 835889b to 2e8e731 Jun 2, 2017

@coveralls


coveralls commented Jun 2, 2017

Coverage Status

Coverage increased (+0.08%) to 87.665% when pulling 7867946 on run-chunked-pipeline into 9fe8076 on master.

@coveralls


coveralls commented Jun 2, 2017

Coverage Status

Coverage increased (+0.08%) to 87.666% when pulling 7867946 on run-chunked-pipeline into 9fe8076 on master.

@ahgnaw ahgnaw force-pushed the run-chunked-pipeline branch from 7867946 to 74a6cc6 Jun 2, 2017

@coveralls


coveralls commented Jun 2, 2017

Coverage Status

Coverage increased (+0.08%) to 87.666% when pulling 74a6cc6 on run-chunked-pipeline into 9fe8076 on master.

@ssanderson


Member

ssanderson commented Jun 2, 2017

LGTM!

@ahgnaw ahgnaw force-pushed the run-chunked-pipeline branch from 74a6cc6 to 6b19e45 Jun 2, 2017

@coveralls


coveralls commented Jun 2, 2017

Coverage Status

Coverage increased (+0.08%) to 87.666% when pulling 6b19e45 on run-chunked-pipeline into 9fe8076 on master.

@ahgnaw ahgnaw merged commit 3081386 into master Jun 2, 2017

2 checks passed

continuous-integration/appveyor/pr AppVeyor build succeeded
continuous-integration/travis-ci/pr The Travis CI build passed

@ahgnaw ahgnaw deleted the run-chunked-pipeline branch Jun 2, 2017
