Micro optimizations 2 #1561

Merged
merged 11 commits on Oct 28, 2016
20 changes: 14 additions & 6 deletions setup.py
@@ -78,18 +78,26 @@ def build_extensions(self):
return build_ext


def window_specialization(typename):
"""Make an extension for an AdjustedArrayWindow specialization."""
return Extension(
'zipline.lib._{name}window'.format(name=typename),
['zipline/lib/_{name}window.pyx'.format(name=typename)],
depends=['zipline/lib/_windowtemplate.pxi'],
)


ext_modules = [
Extension('zipline.assets._assets', ['zipline/assets/_assets.pyx']),
Extension('zipline.assets.continuous_futures',
['zipline/assets/continuous_futures.pyx']),
Extension('zipline.lib.adjustment', ['zipline/lib/adjustment.pyx']),
Extension('zipline.lib._factorize', ['zipline/lib/_factorize.pyx']),
Extension(
'zipline.lib._float64window', ['zipline/lib/_float64window.pyx']
),
Extension('zipline.lib._int64window', ['zipline/lib/_int64window.pyx']),
Extension('zipline.lib._uint8window', ['zipline/lib/_uint8window.pyx']),
Extension('zipline.lib._labelwindow', ['zipline/lib/_labelwindow.pyx']),
window_specialization('float64'),
window_specialization('int64'),
window_specialization('int64'),
window_specialization('uint8'),
window_specialization('label'),
Extension('zipline.lib.rank', ['zipline/lib/rank.pyx']),
Extension('zipline.data._equities', ['zipline/data/_equities.pyx']),
Extension('zipline.data._adjustments', ['zipline/data/_adjustments.pyx']),
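For reference, a call like window_specialization('float64') builds the same extension as the removed explicit entry, except that it also lists the shared template as a dependency, so the specialized modules are rebuilt whenever _windowtemplate.pxi changes. A hand-expanded sketch (illustrative, not part of the diff), using setuptools' Extension:

from setuptools import Extension

# what window_specialization('float64') returns, written out by hand
float64_window = Extension(
    'zipline.lib._float64window',
    ['zipline/lib/_float64window.pyx'],
    # new in this PR: the shared template is declared as a dependency
    depends=['zipline/lib/_windowtemplate.pxi'],
)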
5 changes: 4 additions & 1 deletion zipline/assets/assets.py
@@ -362,7 +362,10 @@ def retrieve_asset(self, sid, default_none=False):
"""
Retrieve the Asset for a given sid.
"""
return self.retrieve_all((sid,), default_none=default_none)[0]
try:
return self._asset_cache[sid]
except KeyError:
return self.retrieve_all((sid,), default_none=default_none)[0]

def retrieve_all(self, sids, default_none=False):
"""
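The retrieve_asset change above is the standard try/except dict fast path: on a cache hit the method now does a single dict lookup instead of building a tuple, calling retrieve_all, and indexing the result. A standalone sketch of the pattern (hypothetical names, not zipline code):

from timeit import timeit

_asset_cache = {24: "Asset(24)"}          # stand-in for the finder's _asset_cache

def retrieve_all(sids, default_none=False):
    # stand-in for the batch path: the real work happens here on a cache miss
    return [_asset_cache.get(sid) for sid in sids]

def retrieve_asset_old(sid, default_none=False):
    # always goes through the batch path, even when the asset is cached
    return retrieve_all((sid,), default_none=default_none)[0]

def retrieve_asset_new(sid, default_none=False):
    try:
        return _asset_cache[sid]          # hot path: one dict lookup
    except KeyError:
        return retrieve_all((sid,), default_none=default_none)[0]

print(timeit(lambda: retrieve_asset_old(24), number=100000))
print(timeit(lambda: retrieve_asset_new(24), number=100000))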
58 changes: 33 additions & 25 deletions zipline/data/data_portal.py
@@ -17,8 +17,9 @@
from logbook import Logger

import numpy as np
from numpy import float64, int64
from numpy import float64, int64, nan
import pandas as pd
from pandas import isnull
from pandas.tslib import normalize_date
from six import iteritems
from six.moves import reduce
@@ -625,7 +626,7 @@ def _get_daily_data(self, asset, column, dt):
if column == "last_traded":
last_traded_dt = reader.get_last_traded_dt(asset, dt)

if pd.isnull(last_traded_dt):
if isnull(last_traded_dt):
return pd.NaT
else:
return last_traded_dt
@@ -856,34 +857,45 @@ def get_history_window(self, assets, end_dt, bar_count, frequency, field,
raise Exception(
"Only 1d and 1m are supported for forward-filling.")

dt_to_fill = df.index[0]

perspective_dt = df.index[-1]
assets_with_leading_nan = np.where(pd.isnull(df.iloc[0]))[0]
for missing_loc in assets_with_leading_nan:
asset = assets[missing_loc]
previous_dt = self.get_last_traded_dt(
asset, dt_to_fill, data_frequency)
if pd.isnull(previous_dt):
continue
previous_value = self.get_adjusted_value(
assets_with_leading_nan = np.where(isnull(df.iloc[0]))[0]

history_start, history_end = df.index[[0, -1]]
initial_values = []
for asset in df.columns[assets_with_leading_nan]:
last_traded = self.get_last_traded_dt(
asset,
field,
previous_dt,
perspective_dt,
history_start,
data_frequency,
)
df.iloc[0, missing_loc] = previous_value
if isnull(last_traded):
initial_values.append(nan)
else:
initial_values.append(
self.get_adjusted_value(
asset,
field,
dt=last_traded,
perspective_dt=history_end,
data_frequency=data_frequency,
)
)

# Set leading values for assets that were missing data, then ffill.
df.ix[0, assets_with_leading_nan] = np.array(
initial_values,
dtype=np.float64
)
df.fillna(method='ffill', inplace=True)

# forward-filling will incorrectly produce values after the end of
# an asset's lifetime, so write NaNs back over the asset's
# end_date.
normed_index = df.index.normalize()
for asset in df.columns:
if df.index[-1] >= asset.end_date:
if history_end >= asset.end_date:
# if the window extends past the asset's end date, set
# all post-end-date values to NaN in that asset's series
series = df[asset]
series[series.index.normalize() > asset.end_date] = np.NaN

df.loc[normed_index > asset.end_date, asset] = nan
return df
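The rewritten fill logic above seeds the first row only for columns that start with NaN (using the value at each asset's last traded dt), forward-fills the whole frame in one pass, and then masks out anything written past an asset's end_date. A toy pandas sketch of the same three steps (illustrative data, not zipline's):

import numpy as np
import pandas as pd

dates = pd.date_range('2016-10-24', periods=5)
df = pd.DataFrame(
    {'AAPL': [np.nan, np.nan, 3.0, 4.0, 5.0],
     'DELISTED': [10.0, np.nan, np.nan, np.nan, np.nan]},
    index=dates,
)

# 1) seed the first row for columns with a leading NaN (stand-in for the
#    get_adjusted_value lookup at the asset's last traded dt) ...
leading_nan = np.where(df.iloc[0].isnull())[0]
df.iloc[0, leading_nan] = np.array([2.0], dtype=np.float64)

# 2) ... then forward-fill everything in a single pass
df.fillna(method='ffill', inplace=True)

# 3) forward-filling also writes values past an asset's end_date, so mask
#    those back to NaN using the normalized index
end_date = dates[1]                      # pretend 'DELISTED' stopped trading here
df.loc[df.index.normalize() > end_date, 'DELISTED'] = np.nan
print(df)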

def _get_minute_window_for_assets(self, assets, field, minutes_for_window):
@@ -910,10 +922,6 @@ def _get_minute_window_for_assets(self, assets, field, minutes_for_window):
-------
A numpy array with requested values.
"""
return self._get_minute_window_data(assets, field, minutes_for_window)

def _get_minute_window_data(
self, assets, field, minutes_for_window):
return self._minute_history_loader.history(assets,
minutes_for_window,
field,
12 changes: 8 additions & 4 deletions zipline/data/history_loader.py
@@ -18,8 +18,8 @@
abstractproperty,
)

from numpy import concatenate
from lru import LRU
from numpy import around, hstack
from pandas import isnull
from pandas.tslib import normalize_date
from toolz import sliding_window
@@ -271,7 +271,7 @@ class SlidingWindow(object):
def __init__(self, window, size, cal_start, offset):
self.window = window
self.cal_start = cal_start
self.current = around(next(window), 3)
self.current = next(window)
self.offset = offset
self.most_recent_ix = self.cal_start + size

@@ -286,7 +286,7 @@ def get(self, end_ix):
return self.current

target = end_ix - self.cal_start - self.offset + 1
self.current = around(self.window.seek(target), 3)
self.current = self.window.seek(target)

self.most_recent_ix = end_ix
return self.current
@@ -522,7 +522,11 @@ def history(self, assets, dts, field, is_perspective_after):
field,
is_perspective_after)
end_ix = self._calendar.get_loc(dts[-1])
return hstack([window.get(end_ix) for window in block])

return concatenate(
[window.get(end_ix) for window in block],
axis=1,
).round(3)


class DailyHistoryLoader(HistoryLoader):
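The history() change above drops the per-window around(..., 3) calls inside SlidingWindow and instead rounds once after concatenating the windows along the column axis. Rounding is elementwise, so the result is identical; a quick check of the equivalence:

import numpy as np

rng = np.random.RandomState(0)
windows = [rng.rand(4, 2), rng.rand(4, 3)]   # per-asset blocks of history

# old: round each window as it is produced, then stack the columns
old = np.hstack([np.around(w, 3) for w in windows])

# new: concatenate the raw windows along the column axis, then round once
new = np.concatenate(windows, axis=1).round(3)

assert np.array_equal(old, new)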
6 changes: 3 additions & 3 deletions zipline/data/us_equity_pricing.py
@@ -635,21 +635,21 @@ def get_last_traded_dt(self, asset, day):
try:
ix = self.sid_day_index(asset, search_day)
except NoDataBeforeDate:
return None
return NaT
except NoDataAfterDate:
prev_day_ix = self.sessions.get_loc(search_day) - 1
if prev_day_ix > -1:
search_day = self.sessions[prev_day_ix]
continue
except NoDataOnDate:
return None
return NaT
if volumes[ix] != 0:
return search_day
prev_day_ix = self.sessions.get_loc(search_day) - 1
if prev_day_ix > -1:
search_day = self.sessions[prev_day_ix]
else:
return None
return NaT

def sid_day_index(self, sid, day):
"""
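Returning NaT instead of None keeps get_last_traded_dt's return value datetime-like, so callers such as DataPortal can keep using isnull() and date comparisons without special-casing None. A minimal illustration (not zipline code):

from pandas import NaT, Timestamp, isnull

last_traded = NaT                             # was None before this change

print(isnull(last_traded))                    # True: existing isnull checks still work
print(last_traded < Timestamp('2016-10-28'))  # False: comparisons are defined for NaT

# with None, the same comparison would raise TypeError on Python 3:
# None < Timestamp('2016-10-28')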
86 changes: 48 additions & 38 deletions zipline/lib/_windowtemplate.pxi
@@ -15,6 +15,10 @@ from numpy cimport ndarray
from numpy import asanyarray


class Exhausted(Exception):
pass


cdef class AdjustedArrayWindow:
"""
An iterator representing a moving view over an AdjustedArray.
@@ -34,11 +38,11 @@ cdef class AdjustedArrayWindow:
readonly databuffer data
readonly dict view_kwargs
readonly Py_ssize_t window_length
Py_ssize_t anchor, next_anchor, max_anchor, next_adj
Py_ssize_t anchor, max_anchor, next_adj
Py_ssize_t perspective_offset
dict adjustments
list adjustment_indices
ndarray last_out
ndarray output

def __cinit__(self,
databuffer data not None,
@@ -52,7 +56,7 @@
self.adjustments = adjustments
self.adjustment_indices = sorted(adjustments, reverse=True)
self.window_length = window_length
self.anchor = window_length + offset
self.anchor = window_length + offset - 1
if perspective_offset > 1:
# Limit perspective_offset to 1.
# To support an offset greater than 1, work must be done to
@@ -63,11 +67,10 @@
"is perspective_offset={0}".format(
perspective_offset))
self.perspective_offset = perspective_offset
self.next_anchor = self.anchor
self.max_anchor = data.shape[0]

self.next_adj = self.pop_next_adj()
self.last_out = None
self.output = None

cdef pop_next_adj(self):
"""
@@ -82,54 +85,61 @@
return self

def __next__(self):
try:
self._tick_forward(1)
except Exhausted:
raise StopIteration()

self._update_output()
return self.output

def seek(self, Py_ssize_t target_anchor):
cdef:
Py_ssize_t anchor = self.anchor

if target_anchor < anchor:
raise Exception('Can not access data after window has passed.')

if target_anchor == anchor:
return self.output

self._tick_forward(target_anchor - anchor)
Contributor:
Should Exhausted be handled here? Or is the intent to have that raise for the consumer of seek to handle?

Contributor Author:
The if target_anchor < anchor: check should preclude Exhausted ever being raised here.

Contributor Author:
nvm, I misread the code. The check above is for making sure we don't seek backwards.

It's an error for the caller to try to seek past the end of the array. In the old code, that would fail with a StopIteration, which is actually probably bad because that can be caught and misinterpreted if seek is called inside a generator. I don't have a strong opinion on whether it's worthwhile to try to catch and re-raise Exhausted here with a more descriptive error. Given that this is in performance-critical code, I'm inclined to say that this is fine as-is.
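To make that last point concrete, here is a small sketch (plain Python, not the Cython code above) of why a StopIteration escaping from seek is easy to misinterpret when the caller is a generator, while a dedicated Exhausted exception propagates cleanly:

class Exhausted(Exception):
    pass

def old_seek():
    # stand-in for the old behavior: seeking past the end raised StopIteration
    raise StopIteration()

def new_seek():
    # stand-in for the new behavior: seeking past the end raises Exhausted
    raise Exhausted()

def consume(seek):
    # a consumer that happens to be a generator and calls seek internally
    yield seek()

try:
    result = list(consume(old_seek))
    # pre-PEP 479 interpreters (Python 2, early Python 3): the StopIteration
    # silently terminates the generator, so we just get an empty list
    print("old behavior:", result)
except RuntimeError as e:
    # Python 3.7+: PEP 479 turns the leaked StopIteration into a RuntimeError,
    # which is still a surprising failure mode for the caller
    print("old behavior:", e)

try:
    list(consume(new_seek))
except Exhausted:
    # the dedicated exception type reaches the caller unambiguously
    print("new behavior: Exhausted propagated to the caller")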

self._update_output()

return self.output

cdef inline _tick_forward(self, int N):
cdef:
object adjustment
ndarray out
Py_ssize_t start, anchor
dict view_kwargs
Py_ssize_t anchor = self.anchor
Py_ssize_t target = anchor + N

anchor = self.anchor = self.next_anchor
if anchor > self.max_anchor:
raise StopIteration()
if target > self.max_anchor:
raise Exhausted()

# Apply any adjustments that occurred before our current anchor.
# Equivalently, apply any adjustments known **on or before** the date
# for which we're calculating a window.
while self.next_adj < anchor + self.perspective_offset:
while self.next_adj < target + self.perspective_offset:

for adjustment in self.adjustments[self.next_adj]:
adjustment.mutate(self.data)

self.next_adj = self.pop_next_adj()

start = anchor - self.window_length

# If our data is a custom subclass of ndarray, preserve that subclass
# by using asanyarray instead of asarray.
out = asanyarray(self.data[start:self.anchor])
view_kwargs = self.view_kwargs
if view_kwargs:
out = out.view(**view_kwargs)
out.setflags(write=False)

self.next_anchor = self.anchor + 1
self.last_out = out
return out

def seek(self, target_anchor):
cdef ndarray out = None
self.anchor = target

if target_anchor < self.anchor:
raise Exception('Can not access data after window has passed.')

if target_anchor == self.anchor:
return self.last_out

while self.anchor < target_anchor:
out = next(self)
cdef inline _update_output(self):
cdef:
ndarray new_out
Py_ssize_t anchor = self.anchor
dict view_kwargs = self.view_kwargs

self.last_out = out
return out
new_out = asanyarray(self.data[anchor - self.window_length:anchor])
if view_kwargs:
new_out = new_out.view(**view_kwargs)
new_out.setflags(write=False)
self.output = new_out

def __repr__(self):
return "<%s: window_length=%d, anchor=%d, max_anchor=%d, dtype=%r>" % (