Skip to content

Commit

Permalink
MAiNT: Use load_adjustments for history.
Browse files Browse the repository at this point in the history
Instead of `HistoryLoader` containing separate adjustment calculation
logic, use `SQLiteAdjustmentReader.load_adjustments`.

This change required the addition of two offset parameters to
`load_adjustments` since the perspective on the data from within
`schedule_function` is skewed from how Pipeline looks at historical
data.

This is working towards creating an `AdjustmentReader` abc which
`SQLiteAdjustmentReader` and a upcoming continuous future adjustment
reader will share.
  • Loading branch information
Eddie Hebert committed Sep 15, 2016
1 parent fb0981e commit 069abae
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 90 deletions.
47 changes: 33 additions & 14 deletions zipline/data/_adjustments.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ cdef _adjustments(object adjustments_db,
cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection
list columns,
DatetimeIndex_t dates,
Int64Index_t assets):
Int64Index_t assets,
int end_offset,
int date_offset):
"""
Load a dictionary of Adjustment objects from adjustments_db
Expand All @@ -162,6 +164,11 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection
Dates for which adjustments are needed
assets : pd.Int64Index
Assets for which adjustments are needed.
end_offset : int
Offset to apply when calculating the last row of the adjustment.
date_offset : int
Offset to apply when calculating the date on which to first apply the
adjustment.
Returns
-------
Expand Down Expand Up @@ -210,7 +217,7 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection
int sid
double ratio
int eff_date
int date_loc
int date_loc, end_loc, out_loc
Py_ssize_t asset_ix
dict col_adjustments

Expand All @@ -228,26 +235,30 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection

date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds)

end_loc = date_loc + end_offset

out_loc = date_loc + date_offset

if not PyDict_Contains(asset_ixs, sid):
asset_ixs[sid] = assets.get_loc(sid)
asset_ix = asset_ixs[sid]

price_adj = Float64Multiply(0, date_loc, asset_ix, asset_ix, ratio)
price_adj = Float64Multiply(0, end_loc, asset_ix, asset_ix, ratio)
for i, column in enumerate(columns):
col_adjustments = results[i]
if column != 'volume':
try:
col_adjustments[date_loc].append(price_adj)
col_adjustments[out_loc].append(price_adj)
except KeyError:
col_adjustments[date_loc] = [price_adj]
col_adjustments[out_loc] = [price_adj]
else:
volume_adj = Float64Multiply(
0, date_loc, asset_ix, asset_ix, 1.0 / ratio
0, end_loc, asset_ix, asset_ix, 1.0 / ratio
)
try:
col_adjustments[date_loc].append(volume_adj)
col_adjustments[out_loc].append(volume_adj)
except KeyError:
col_adjustments[date_loc] = [volume_adj]
col_adjustments[out_loc] = [volume_adj]

# mergers affect prices only
for sid, ratio, eff_date in mergers:
Expand All @@ -256,18 +267,22 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection

date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds)

end_loc = date_loc + end_offset

out_loc = date_loc + date_offset

if not PyDict_Contains(asset_ixs, sid):
asset_ixs[sid] = assets.get_loc(sid)
asset_ix = asset_ixs[sid]

adj = Float64Multiply(0, date_loc, asset_ix, asset_ix, ratio)
adj = Float64Multiply(0, end_loc, asset_ix, asset_ix, ratio)
for i, column in enumerate(columns):
col_adjustments = results[i]
if column != 'volume':
try:
col_adjustments[date_loc].append(adj)
col_adjustments[out_loc].append(adj)
except KeyError:
col_adjustments[date_loc] = [adj]
col_adjustments[out_loc] = [adj]

# dividends affect prices only
for sid, ratio, eff_date in dividends:
Expand All @@ -276,18 +291,22 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection

date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds)

end_loc = date_loc + end_offset

out_loc = date_loc + date_offset

if not PyDict_Contains(asset_ixs, sid):
asset_ixs[sid] = assets.get_loc(sid)
asset_ix = asset_ixs[sid]

adj = Float64Multiply(0, date_loc, asset_ix, asset_ix, ratio)
adj = Float64Multiply(0, end_loc, asset_ix, asset_ix, ratio)
for i, column in enumerate(columns):
col_adjustments = results[i]
if column != 'volume':
try:
col_adjustments[date_loc].append(adj)
col_adjustments[out_loc].append(adj)
except KeyError:
col_adjustments[date_loc] = [adj]
col_adjustments[out_loc] = [adj]

return results

Expand Down
80 changes: 5 additions & 75 deletions zipline/data/history_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@

from cachetools import LRUCache
from numpy import around, hstack
from pandas.tslib import normalize_date
from pandas import Int64Index

from six import with_metaclass

from zipline.lib._float64window import AdjustedArrayWindow as Float64Window
from zipline.lib.adjustment import Float64Multiply
from zipline.utils.cache import ExpiringCache
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import float64_dtype
Expand Down Expand Up @@ -141,79 +140,10 @@ def _get_adjustments_in_range(self, asset, dts, field,
-------
out : The adjustments as a dict of loc -> Float64Multiply
"""
sid = int(asset)
start = normalize_date(dts[0])
end = normalize_date(dts[-1])
adjs = {}
if field != 'volume':
mergers = self._adjustments_reader.get_adjustments_for_sid(
'mergers', sid)
for m in mergers:
dt = m[0]
if start < dt <= end:
end_loc = dts.searchsorted(dt)
adj_loc = end_loc
if is_perspective_after:
# Set adjustment pop location so that it applies
# to last value if adjustment occurs immediately after
# the last slot.
adj_loc -= 1
mult = Float64Multiply(0,
end_loc - 1,
0,
0,
m[1])
try:
adjs[adj_loc].append(mult)
except KeyError:
adjs[adj_loc] = [mult]
divs = self._adjustments_reader.get_adjustments_for_sid(
'dividends', sid)
for d in divs:
dt = d[0]
if start < dt <= end:
end_loc = dts.searchsorted(dt)
adj_loc = end_loc
if is_perspective_after:
# Set adjustment pop location so that it applies
# to last value if adjustment occurs immediately after
# the last slot.
adj_loc -= 1
mult = Float64Multiply(0,
end_loc - 1,
0,
0,
d[1])
try:
adjs[adj_loc].append(mult)
except KeyError:
adjs[adj_loc] = [mult]
splits = self._adjustments_reader.get_adjustments_for_sid(
'splits', sid)
for s in splits:
dt = s[0]
if start < dt <= end:
if field == 'volume':
ratio = 1.0 / s[1]
else:
ratio = s[1]
end_loc = dts.searchsorted(dt)
adj_loc = end_loc
if is_perspective_after:
# Set adjustment pop location so that it applies
# to last value if adjustment occurs immediately after
# the last slot.
adj_loc -= 1
mult = Float64Multiply(0,
end_loc - 1,
0,
0,
ratio)
try:
adjs[adj_loc].append(mult)
except KeyError:
adjs[adj_loc] = [mult]
return adjs
end_offset = -1
date_offset = -1 if is_perspective_after else 0
return self._adjustments_reader.load_adjustments(
[field], dts, Int64Index([asset]), end_offset, date_offset)[0]

def _ensure_sliding_windows(self, assets, dts, field,
is_perspective_after):
Expand Down
5 changes: 4 additions & 1 deletion zipline/data/us_equity_pricing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1230,12 +1230,15 @@ class SQLiteAdjustmentReader(object):
def __init__(self, conn):
self.conn = conn

def load_adjustments(self, columns, dates, assets):
def load_adjustments(self, columns, dates, assets,
end_offset=0, date_offset=0):
return load_adjustments_from_sqlite(
self.conn,
list(columns),
dates,
assets,
end_offset,
date_offset
)

def get_adjustments_for_sid(self, table_name, sid):
Expand Down

0 comments on commit 069abae

Please sign in to comment.