Merge 5a0f840 into e0f6abd
jbredeche committed Jul 15, 2016
2 parents e0f6abd + 5a0f840 commit 6581306
Showing 9 changed files with 104 additions and 34 deletions.
9 changes: 7 additions & 2 deletions tests/data/test_us_equity_pricing.py
@@ -24,7 +24,6 @@
)
from pandas import (
DataFrame,
DatetimeIndex,
Timestamp,
)
from pandas.util.testing import assert_index_equal
@@ -46,6 +45,7 @@
WithBcolzEquityDailyBarReader,
ZiplineTestCase,
)
from zipline.utils.calendars import get_calendar

TEST_CALENDAR_START = Timestamp('2015-06-01', tz='UTC')
TEST_CALENDAR_STOP = Timestamp('2015-06-30', tz='UTC')
@@ -180,9 +180,14 @@ def test_write_attrs(self):
result.attrs['calendar_offset'],
expected_calendar_offset,
)
cal = get_calendar(result.attrs['calendar_name'])
first_session = Timestamp(result.attrs['start_session_ns'], tz='UTC')
end_session = Timestamp(result.attrs['end_session_ns'], tz='UTC')
sessions = cal.sessions_in_range(first_session, end_session)

assert_index_equal(
self.sessions,
DatetimeIndex(result.attrs['calendar'], tz='UTC'),
sessions
)

def test_read_first_trading_day(self):
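The updated test no longer rebuilds a DatetimeIndex from a serialized 'calendar' attribute; it derives the expected sessions from the calendar name and session bounds that the writer now stores. A minimal sketch of that reconstruction (not the test itself; attrs stands in for result.attrs):

from pandas import Timestamp
from zipline.utils.calendars import get_calendar

def sessions_from_attrs(attrs):
    # 'calendar_name' identifies the trading calendar (e.g. "NYSE");
    # the session bounds are stored as nanoseconds since the Unix epoch.
    cal = get_calendar(attrs['calendar_name'])
    start = Timestamp(attrs['start_session_ns'], tz='UTC')
    end = Timestamp(attrs['end_session_ns'], tz='UTC')
    return cal.sessions_in_range(start, end)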
5 changes: 3 additions & 2 deletions tests/test_algorithm.py
@@ -3424,9 +3424,10 @@ def make_data(self, auto_close_delta, frequency,
frequency=frequency
)
path = self.tmpdir.getpath("testdaily.bcolz")
BcolzDailyBarWriter(path, dates, self.trading_calendar).write(
iteritems(trade_data_by_sid),
writer = BcolzDailyBarWriter(
path, self.trading_calendar, dates[0], dates[-1]
)
writer.write(iteritems(trade_data_by_sid))
reader = BcolzDailyBarReader(path)
data_portal = DataPortal(
env.asset_finder, self.trading_calendar,
3 changes: 2 additions & 1 deletion tests/test_finance.py
@@ -255,7 +255,8 @@ def transaction_sim(self, **params):
}

path = os.path.join(tempdir.path, "testdata.bcolz")
BcolzDailyBarWriter(path, days, self.trading_calendar).write(
BcolzDailyBarWriter(path, self.trading_calendar, days[0],
days[-1]).write(
assets.items()
)

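Both test fixups above follow the same mechanical change to the writer's constructor: the full session index argument is replaced by a trading calendar plus the first and last session labels, and the sessions are derived inside the writer. A hedged, self-contained sketch of the new call pattern (the path, sid, and prices are illustrative, not from the repository; the dates match the test calendar above):

from pandas import DataFrame, Timestamp
from zipline.data.us_equity_pricing import BcolzDailyBarWriter
from zipline.utils.calendars import get_calendar

cal = get_calendar('NYSE')
start = Timestamp('2015-06-01', tz='UTC')   # first session label
end = Timestamp('2015-06-30', tz='UTC')     # last session label
sessions = cal.sessions_in_range(start, end)

# One OHLCV frame per sid, indexed by session.
frame = DataFrame(
    {'open': 10.0, 'high': 10.5, 'low': 9.5, 'close': 10.25, 'volume': 100000.0},
    index=sessions,
)

# Old signature: BcolzDailyBarWriter(path, sessions, cal)
# New signature: calendar first, then the session bounds.
writer = BcolzDailyBarWriter('/tmp/testdaily.bcolz', cal, start, end)
writer.write([(1, frame)])   # iterable of (sid, frame) pairs, as in the tests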
4 changes: 3 additions & 1 deletion zipline/data/bundles/core.py
@@ -333,7 +333,9 @@ def ingest(name,
)).path
daily_bar_writer = BcolzDailyBarWriter(
daily_bars_path,
bundle.calendar,
nyse_cal,
bundle.calendar[0],
bundle.calendar[-1]
)
# Do an empty write to ensure that the daily ctables exist
# when we create the SQLiteAdjustmentWriter below. The
2 changes: 1 addition & 1 deletion zipline/data/us_equity_loader.py
@@ -387,7 +387,7 @@ def _prefetch_length(self):

@property
def _calendar(self):
return self._reader._calendar
return self._reader._sessions

def _array(self, dts, assets, field):
return self._reader.load_raw_arrays(
95 changes: 73 additions & 22 deletions zipline/data/us_equity_pricing.py
@@ -40,19 +40,19 @@
)
from pandas import (
DataFrame,
DatetimeIndex,
read_csv,
Timestamp,
NaT,
isnull,
)
DatetimeIndex)
from pandas.tslib import iNaT
from six import (
iteritems,
with_metaclass,
viewkeys,
)

from zipline.utils.calendars import get_calendar
from zipline.utils.functional import apply
from zipline.utils.preprocess import call
from zipline.utils.input_validation import (
@@ -182,15 +182,19 @@ def to_ctable(raw_data, invalid_data_behavior):

class BcolzDailyBarWriter(object):
"""
Class capable of writing daily OHLCV data to disk in a format that can be
read efficiently by BcolzDailyOHLCVReader.
Class capable of writing daily OHLCV data to disk in a format that can
be read efficiently by BcolzDailyOHLCVReader.
Parameters
----------
filename : str
The location at which we should write our output.
sessions : pandas.DatetimeIndex
calendar : zipline.utils.calendar.trading_calendar
Calendar to use to compute asset calendar offsets.
start_session: pd.Timestamp
Midnight UTC session label.
end_session: pd.Timestamp
Midnight UTC session label.
See Also
--------
@@ -204,9 +208,15 @@ class BcolzDailyBarWriter(object):
'volume': float64,
}

def __init__(self, filename, sessions, calendar):
def __init__(self, filename, calendar, start_session, end_session):
self._filename = filename
self._sessions = sessions

assert calendar.is_session(start_session), "Start session is invalid!"
assert calendar.is_session(end_session), "End session is invalid!"

self._start_session = start_session
self._end_session = end_session

self._calendar = calendar

@property
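The new constructor validates its bounds up front: if either label is not a session on the supplied calendar, the assertion fires before anything is written. A small illustration of that check (illustrative path; assumes the NYSE calendar):

from pandas import Timestamp
from zipline.data.us_equity_pricing import BcolzDailyBarWriter
from zipline.utils.calendars import get_calendar

cal = get_calendar('NYSE')
try:
    BcolzDailyBarWriter(
        '/tmp/bad.bcolz',
        cal,
        Timestamp('2015-06-06', tz='UTC'),   # a Saturday, so not a session
        Timestamp('2015-06-30', tz='UTC'),
    )
except AssertionError as e:
    print(e)   # "Start session is invalid!"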
@@ -300,7 +310,9 @@ def _write_internal(self, iterator, assets):
}

earliest_date = None
sessions = self._sessions
sessions = self._calendar.sessions_in_range(
self._start_session, self._end_session
)

if assets is not None:
@apply
@@ -363,10 +375,13 @@ def iterator(iterator=iterator, assets=set(assets)):
full_table.attrs['first_trading_day'] = (
earliest_date if earliest_date is not None else iNaT
)

full_table.attrs['first_row'] = first_row
full_table.attrs['last_row'] = last_row
full_table.attrs['calendar_offset'] = calendar_offset
full_table.attrs['calendar'] = sessions.asi8.tolist()
full_table.attrs['calendar_name'] = self._calendar.name
full_table.attrs['start_session_ns'] = self._start_session.value
full_table.attrs['end_session_ns'] = self._end_session.value
full_table.flush()
return full_table
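With this change the table attrs carry only the calendar name and the two session bounds, encoded as epoch nanoseconds via Timestamp.value, instead of the full asi8 session list. The encoding round-trips exactly, which is what the reader relies on below; a short sketch:

from pandas import Timestamp

start_session = Timestamp('2015-06-01', tz='UTC')
ns = start_session.value                         # nanoseconds since the Unix epoch
assert Timestamp(ns, tz='UTC') == start_session  # what the reader reconstructs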

@@ -387,6 +402,14 @@ def spot_price(self, sid, day, colname):
def last_available_dt(self):
pass

@abstractproperty
def trading_calendar(self):
"""
Returns the zipline.utils.calendar.trading_calendar used to read
the data. Can be None (if the writer didn't specify it).
"""
pass


class BcolzDailyBarReader(DailyBarReader):
"""
@@ -415,8 +438,12 @@ class BcolzDailyBarReader(DailyBarReader):
Map from asset_id -> index of last row in the dataset with that id.
calendar_offset : dict
Map from asset_id -> calendar index of first row.
calendar : list[int64]
Calendar used to compute offsets, in asi8 format (ns since EPOCH).
start_session_ns: int
Epoch ns of the first session used in this dataset.
end_session_ns: int
Epoch ns of the last session used in this dataset.
calendar_name: str
String identifier of trading calendar used (ie, "NYSE").
We use first_row and last_row together to quickly find ranges of rows to
load when reading an asset's data into memory.
@@ -473,8 +500,21 @@ def _table(self):
return ctable(rootdir=maybe_table_rootdir, mode='r')

@lazyval
def _calendar(self):
return DatetimeIndex(self._table.attrs['calendar'], tz='UTC')
def _sessions(self):
if 'calendar' in self._table.attrs.attrs:
# backwards compatibility with old formats, will remove
return DatetimeIndex(self._table.attrs['calendar'], tz='UTC')
else:
cal = get_calendar(self._table.attrs['calendar_name'])
start_session_ns = self._table.attrs['start_session_ns']
start_session = Timestamp(start_session_ns, tz='UTC')

end_session_ns = self._table.attrs['end_session_ns']
end_session = Timestamp(end_session_ns, tz='UTC')

sessions = cal.sessions_in_range(start_session, end_session)

return sessions

@lazyval
def _first_rows(self):
@@ -514,9 +554,16 @@ def first_trading_day(self):
except KeyError:
return None

@lazyval
def trading_calendar(self):
if 'calendar_name' in self._table.attrs.attrs:
return get_calendar(self._table.attrs['calendar_name'])
else:
return None

@property
def last_available_dt(self):
return self._calendar[-1]
return self._sessions[-1]

def _compute_slices(self, start_idx, end_idx, assets):
"""
@@ -562,8 +609,8 @@ def _compute_slices(self, start_idx, end_idx, assets):

def load_raw_arrays(self, columns, start_date, end_date, assets):
# Assumes that the given dates are actually in calendar.
start_idx = self._calendar.get_loc(start_date)
end_idx = self._calendar.get_loc(end_date)
start_idx = self._sessions.get_loc(start_date)
end_idx = self._sessions.get_loc(end_date)
first_rows, last_rows, offsets = self._compute_slices(
start_idx,
end_idx,
@@ -607,8 +654,8 @@ def get_last_traded_dt(self, asset, day):

if day >= asset.end_date:
# go back to one day before the asset ended
search_day = self._calendar[
self._calendar.searchsorted(asset.end_date) - 1
search_day = self._sessions[
self._sessions.searchsorted(asset.end_date) - 1
]
else:
search_day = day
@@ -620,9 +667,9 @@
return None
if volumes[ix] != 0:
return search_day
prev_day_ix = self._calendar.get_loc(search_day) - 1
prev_day_ix = self._sessions.get_loc(search_day) - 1
if prev_day_ix > -1:
search_day = self._calendar[prev_day_ix]
search_day = self._sessions[prev_day_ix]
else:
return None

@@ -643,10 +690,10 @@ def sid_day_index(self, sid, day):
or after the date range of the equity.
"""
try:
day_loc = self._calendar.get_loc(day)
day_loc = self._sessions.get_loc(day)
except:
raise NoDataOnDate("day={0} is outside of calendar={1}".format(
day, self._calendar))
day, self._sessions))
offset = day_loc - self._calendar_offsets[sid]
if offset < 0:
raise NoDataOnDate(
@@ -729,6 +776,10 @@ def __init__(self, calendar, panel):
def last_available_dt(self):
return self._calendar[-1]

@property
def trading_calendar(self):
return None

def load_raw_arrays(self, columns, start_date, end_date, assets):
columns = list(columns)
cal = self._calendar
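Reading the file back, the reader exposes the calendar it was written against and rebuilds its session index from the stored bounds; files written before this commit still load through the 'calendar' fallback but report no trading calendar. A brief sketch, assuming a directory written as in the earlier example:

from zipline.data.us_equity_pricing import BcolzDailyBarReader

reader = BcolzDailyBarReader('/tmp/testdaily.bcolz')
cal = reader.trading_calendar      # a trading calendar object, or None for old-format files
last = reader.last_available_dt    # last session, i.e. reader._sessions[-1]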
7 changes: 5 additions & 2 deletions zipline/pipeline/loaders/equity_pricing_loader.py
@@ -40,7 +40,10 @@ def __init__(self, raw_price_loader, adjustments_loader):
self.raw_price_loader = raw_price_loader
self.adjustments_loader = adjustments_loader

self._calendar = get_calendar("NYSE").all_sessions
cal = self.raw_price_loader.trading_calendar or \
get_calendar("NYSE")

self._all_sessions = cal.all_sessions

@classmethod
def from_files(cls, pricing_path, adjustments_path):
@@ -67,7 +70,7 @@ def load_adjusted_array(self, columns, dates, assets, mask):
# known on day N is the data from day (N - 1), so we shift all query
# dates back by a day.
start_date, end_date = _shift_dates(
self._calendar, dates[0], dates[-1], shift=1,
self._all_sessions, dates[0], dates[-1], shift=1,
)
colnames = [c.name for c in columns]
raw_arrays = self.raw_price_loader.load_raw_arrays(
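The pipeline loader now prefers whatever calendar its raw price reader reports and only falls back to NYSE when the reader cannot name one (an old-format bcolz file, or the panel-backed reader changed above, both of which return None). The fallback works because trading_calendar is either a calendar object or None; a condensed sketch, where raw_price_loader stands for the loader's raw_price_loader attribute:

from zipline.utils.calendars import get_calendar

cal = raw_price_loader.trading_calendar or get_calendar("NYSE")
all_sessions = cal.all_sessions   # used to shift query dates back one session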
11 changes: 9 additions & 2 deletions zipline/testing/core.py
@@ -462,7 +462,9 @@ def create_daily_bar_data(sessions, sids):

def write_daily_data(tempdir, sim_params, sids, trading_calendar):
path = os.path.join(tempdir.path, "testdaily.bcolz")
BcolzDailyBarWriter(path, sim_params.sessions, trading_calendar).write(
BcolzDailyBarWriter(path, trading_calendar,
sim_params.start_session,
sim_params.end_session).write(
create_daily_bar_data(sim_params.sessions, sids),
)

@@ -612,7 +614,12 @@ def create_data_portal_from_trade_history(asset_finder, trading_calendar,
tempdir, sim_params, trades_by_sid):
if sim_params.data_frequency == "daily":
path = os.path.join(tempdir.path, "testdaily.bcolz")
BcolzDailyBarWriter(path, sim_params.sessions, trading_calendar).write(
writer = BcolzDailyBarWriter(
path, trading_calendar,
sim_params.start_session,
sim_params.end_session
)
writer.write(
trades_by_sid_to_dfs(trades_by_sid, sim_params.sessions),
)

2 changes: 1 addition & 1 deletion zipline/testing/fixtures.py
@@ -746,7 +746,7 @@ def init_class_fixtures(cls):
days = cls.equity_daily_bar_days

cls.bcolz_daily_bar_ctable = t = getattr(
BcolzDailyBarWriter(p, days, cls.trading_calendar),
BcolzDailyBarWriter(p, cls.trading_calendar, days[0], days[-1]),
cls._write_method_name,
)(cls.make_equity_daily_bar_data())

