ENH: improve performance of blaze core loader #1227

Merged: 5 commits, Jun 3, 2016
6 changes: 3 additions & 3 deletions etc/requirements_blaze.txt
@@ -1,3 +1,3 @@
--e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev
--e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev
--e git://github.com/quantopian/blaze.git@9c3fa1327236f777ca112a5bd8c3bb7e442d1052#egg=blaze-dev
+-e git://github.com/quantopian/datashape.git@bf06a41dc0908baf7c324aeacadba8820468ee78#egg=datashape-dev
+-e git://github.com/quantopian/odo.git@9e16310b5f2c3f05162145200db7e7908f0a866e#egg=odo-dev
+-e git://github.com/quantopian/blaze.git@8921fdd00bb040c61457937902036de5c404b6f3#egg=blaze-dev
37 changes: 2 additions & 35 deletions zipline/pipeline/loaders/blaze/core.py
@@ -127,7 +127,7 @@
from abc import ABCMeta, abstractproperty
from collections import namedtuple, defaultdict
from copy import copy
-from functools import partial, reduce
+from functools import partial
from itertools import count
import warnings
from weakref import WeakKeyDictionary
@@ -137,7 +137,6 @@
Date,
DateTime,
Option,
-floating,
isrecord,
isscalar,
)
@@ -904,44 +903,12 @@ def where(e):
    q : Expr
        The query to run.
    """
-    def lower_for_col(column):
-        pred = e[TS_FIELD_NAME] <= lower_dt
-        colname = column.name
-        schema = e[colname].schema.measure
-        if isinstance(schema, Option):
-            pred &= e[colname].notnull()
-            schema = schema.ty
-        if schema in floating:
-            pred &= ~e[colname].isnan()

-        filtered = e[pred]
-        lower = filtered[TS_FIELD_NAME].max()
-        if have_sids:
-            # If we have sids, then we need to take the earliest of the
-            # greatest date that has a non-null value by sid.
-            lower = bz.by(
-                filtered[SID_FIELD_NAME],
-                timestamp=lower,
-            ).timestamp.min()
-        return lower

-    lower = odo(
-        reduce(
-            bz.least,
-            map(lower_for_col, columns),
-        ),
-        pd.Timestamp,
-        **odo_kwargs
-    )
-    if lower is pd.NaT:
-        lower = lower_dt
-    return e[
-        (e[TS_FIELD_NAME] >= lower) &
-        (e[TS_FIELD_NAME] <= upper_dt)
-    ][added_query_fields + list(map(getname, columns))]

def collect_expr(e):
"""Execute and merge all of the per-column subqueries.
"""Materialize the expression as a dataframe.

Parameters
----------
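For context, the deleted lower_for_col machinery built one subquery per column to find the most recent timestamp at or before the lower bound with a non-null (and, for float columns, non-NaN) value per sid, then folded the per-column results together with bz.least. A rough pandas-only sketch of what one such subquery computed (hypothetical frame and column names; the real code ran as a blaze expression over the TS_FIELD_NAME/SID_FIELD_NAME constants):

    import pandas as pd

    def lower_bound_for_column(df, colname, lower_dt, have_sids=True):
        # Rows stamped at or before the query lower bound whose value
        # for ``colname`` is present (notnull also rejects float NaN).
        filtered = df[(df['timestamp'] <= lower_dt) & df[colname].notnull()]
        if have_sids:
            # Latest timestamp with data per sid, then the earliest of
            # those, so every sid has a value at the lower bound.
            return filtered.groupby('sid')['timestamp'].max().min()
        return filtered['timestamp'].max()

Issuing one of these subqueries per column appears to be the overhead this PR removes from the query path.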
11 changes: 10 additions & 1 deletion zipline/pipeline/loaders/utils.py
@@ -6,6 +6,7 @@
from six.moves import zip

from zipline.utils.numpy_utils import categorical_dtype, NaTns
from zipline.utils.pandas_utils import mask_between_time


def next_event_frame(events_by_sid,
@@ -209,6 +210,9 @@ def normalize_data_query_bounds(lower, upper, time, tz):
return lower, upper


_midnight = datetime.time(0, 0)


def normalize_timestamp_to_query_time(df,
time,
tz,
@@ -246,7 +250,12 @@ def normalize_timestamp_to_query_time(df,

    dtidx = pd.DatetimeIndex(df.loc[:, ts_field], tz='utc')
    dtidx_local_time = dtidx.tz_convert(tz)
-    to_roll_forward = dtidx_local_time.time >= time
+    to_roll_forward = mask_between_time(
+        dtidx_local_time,
+        time,
+        _midnight,
+        include_end=False,
+    )
    # for all of the times that are greater than our query time add 1
    # day and truncate to the date
    df.loc[to_roll_forward, ts_field] = (
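The old comparison and the new mask select the same rows: those whose local time falls in [time, midnight), i.e. at or after the query time. A minimal sketch of the equivalence, with hypothetical timestamps, using the mask_between_time helper added below:

    import datetime
    import pandas as pd

    from zipline.utils.pandas_utils import mask_between_time

    # With an 08:45 query time, stamps at or after 08:45 local roll
    # forward to the next day.
    idx = pd.DatetimeIndex(
        ['2016-06-03 08:44', '2016-06-03 08:45', '2016-06-03 23:59'],
        tz='US/Eastern',
    )
    query_time = datetime.time(8, 45)

    # Old form: materializes an object array of datetime.time instances.
    slow = idx.time >= query_time

    # New form: [query_time, midnight) via integer microsecond compares.
    fast = mask_between_time(idx, query_time, datetime.time(0, 0),
                             include_end=False)

    assert (slow == fast).all()  # both -> [False, True, True]

The speedup comes from comparing int64 microseconds-since-midnight rather than building and comparing an object array of datetime.time.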
93 changes: 91 additions & 2 deletions zipline/utils/pandas_utils.py
@@ -1,6 +1,9 @@
"""
Utilities for working with pandas objects.
"""
from itertools import product
import operator as op

import pandas as pd


@@ -15,6 +18,92 @@ def explode(df):

try:
    # pandas 0.16 compat
-    sort_values = pd.DataFrame.sort_values
+    _df_sort_values = pd.DataFrame.sort_values
+    _series_sort_values = pd.Series.sort_values
except AttributeError:
-    sort_values = pd.DataFrame.sort
+    _df_sort_values = pd.DataFrame.sort
+    _series_sort_values = pd.Series.sort


def sort_values(ob, *args, **kwargs):
    if isinstance(ob, pd.DataFrame):
        return _df_sort_values(ob, *args, **kwargs)
    elif isinstance(ob, pd.Series):
        return _series_sort_values(ob, *args, **kwargs)
    raise ValueError(
        'sort_values expected a dataframe or series, not %s: %r' % (
            type(ob).__name__, ob,
        ),
    )
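A quick illustration of the shim (hypothetical frame): the same positional call works whether pandas exposes .sort_values (0.17+) or only .sort (0.16), since the sort key is the first positional argument of both:

    df = pd.DataFrame({'sid': [2, 1], 'value': [1.0, 2.0]})
    # Dispatches on type to DataFrame.sort_values / DataFrame.sort.
    sorted_df = sort_values(df, 'sid')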


def _time_to_micros(time):
    """Convert a time into microseconds since midnight.

    Parameters
    ----------
    time : datetime.time
        The time to convert.

    Returns
    -------
    us : int
        The number of microseconds since midnight.

    Notes
    -----
    This does not account for leap seconds or daylight savings.
    """
    seconds = time.hour * 60 * 60 + time.minute * 60 + time.second
    return 1000000 * seconds + time.microsecond
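For example, 9:31:00 is 9 * 3600 + 31 * 60 = 34,260 seconds after midnight:

    import datetime

    assert _time_to_micros(datetime.time(9, 31, 0)) == 34260000000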


_opmap = dict(zip(
    product((True, False), repeat=3),
    product((op.le, op.lt), (op.le, op.lt), (op.and_, op.or_)),
))
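The table is terse: each key is (include_start, include_end, start <= end) and each value is (comparison against start, comparison against end, combiner). Both product calls enumerate in the same lexicographic order, so an inclusive bound selects <= over <, and a window that wraps midnight (start > end) joins the two half-tests with or rather than and. One decoded entry as a sketch:

    # Inclusive start, exclusive end, window wrapping midnight:
    left_op, right_op, join_op = _opmap[True, False, False]
    assert (left_op, right_op, join_op) == (op.le, op.lt, op.or_)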


def mask_between_time(dts, start, end, include_start=True, include_end=True):
    """Return a mask of all of the datetimes in ``dts`` that are between
    ``start`` and ``end``.

    Parameters
    ----------
    dts : pd.DatetimeIndex
        The index to mask.
    start : time
        Mask away times less than the start.
    end : time
        Mask away times greater than the end.
    include_start : bool, optional
        Inclusive on ``start``.
    include_end : bool, optional
        Inclusive on ``end``.

    Returns
    -------
    mask : np.ndarray[bool]
        A bool array masking ``dts``.

    See Also
    --------
    :meth:`pandas.DatetimeIndex.indexer_between_time`
    """
    # This function is adapted from
    # `pandas.DatetimeIndex.indexer_between_time` which was originally
    # written by Wes McKinney, Chang She, and Grant Roch.
    time_micros = dts._get_time_micros()
Contributor

We should probably comment here that this is extracted from indexer_between_time and give credit to the original author(s) (looks like Wes McKinney, Chang She, and Grant Roch). We could also ask jreback if there's a preferred citation method for code that's ported out of pandas.

Contributor Author

Good call, I will put something in the Notes and check with Jeff.

Contributor

I'm not sure it needs to be in the docstring, since where the code came from isn't really public-facing info.

Contributor

We're also still using DatetimeIndex._get_time_micros, which isn't public.

The implementation of that method (in 0.16 at least) is:

    def _get_time_micros(self):
        utc = _utc()
        values = self.asi8
        if self.tz is not None and self.tz is not utc:
            values = self._local_timestamps()
        return tslib.get_time_micros(values)

Where get_time_micros is a Cython function that looks like a vectorized version of _time_to_micros.
Should we at least use the tslib function instead of relying on a private DatetimeIndex method?

def get_time_micros(ndarray[int64_t] dtindex):
    """
    Datetime as int64 representation to a structured array of fields
    """
    cdef:
        Py_ssize_t i, n = len(dtindex)
        pandas_datetimestruct dts
        ndarray[int64_t] micros

    micros = np.empty(n, dtype=np.int64)

    for i in range(n):
        pandas_datetime_to_datetimestruct(dtindex[i], PANDAS_FR_ns, &dts)
        micros[i] = 1000000LL * (dts.hour * 60 * 60 +
                                 60 * dts.min + dts.sec) + dts.us

    return micros

Contributor Author

Are you suggesting inlining this to rely only on the tslib function? Isn't tslib also "private", just not prefixed with "_"? If that is the case, I would leave it like this to make it clear we are using some private methods.

    start_micros = _time_to_micros(start)
    end_micros = _time_to_micros(end)

    left_op, right_op, join_op = _opmap[
        bool(include_start),
        bool(include_end),
        start_micros <= end_micros,
    ]

    return join_op(
        left_op(start_micros, time_micros),
        right_op(time_micros, end_micros),
    )
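A usage sketch (hypothetical timestamps) showing the wrap-around case that a naive start <= t <= end test would get wrong:

    import datetime
    import pandas as pd

    dts = pd.DatetimeIndex(['2016-06-03 21:59', '2016-06-03 22:00',
                            '2016-06-04 01:59', '2016-06-04 02:00'])
    # start (22:00) > end (02:00) wraps midnight; half-tests are or'ed.
    mask = mask_between_time(dts, datetime.time(22, 0), datetime.time(2, 0))
    # mask -> [False, True, True, True]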