Skip to content

Commit

Permalink
Merge 8cafb69 into 8a672be
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Daniels committed Apr 6, 2017
2 parents 8a672be + 8cafb69 commit 6ddbb16
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 34 deletions.
104 changes: 75 additions & 29 deletions tests/data/test_resample.py
Expand Up @@ -30,6 +30,7 @@
ReindexSessionBarReader,
)

from zipline.testing import parameter_space
from zipline.testing.fixtures import (
WithEquityMinuteBarData,
WithBcolzEquityMinuteBarReader,
Expand Down Expand Up @@ -253,6 +254,7 @@


class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
WithBcolzFutureMinuteBarReader,
ZiplineTestCase):

# March 2016
Expand All @@ -269,7 +271,11 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp(
'2016-03-31', tz='UTC',
)

TRADING_CALENDAR_STRS = ('NYSE', 'us_futures')

ASSET_FINDER_EQUITY_SIDS = 1, 2, 3, 4, 5
ASSET_FINDER_FUTURE_SIDS = 1001, 1002, 1003, 1004

@classmethod
def make_equity_info(cls):
Expand All @@ -285,47 +291,87 @@ def make_equity_minute_bar_data(cls):
frame = EQUITY_CASES[sid]
yield sid, frame

@classmethod
def make_futures_info(cls):
    """
    Build the futures metadata frame for the test fixture: one row per
    sid in ``ASSET_FINDER_FUTURE_SIDS``, all sharing the same multiplier,
    exchange, and root symbol.
    """
    return pd.DataFrame.from_dict(
        {
            sid: {
                'multiplier': 1000,
                'exchange': 'CME',
                'root_symbol': "ABC"
            }
            for sid in cls.ASSET_FINDER_FUTURE_SIDS
        },
        orient='index',
    )

@classmethod
def make_future_minute_bar_data(cls):
    """
    Yield ``(sid, frame)`` pairs of minute bar data for every future sid,
    sourced from the module-level ``FUTURE_CASES`` mapping.
    """
    for future_sid in cls.ASSET_FINDER_FUTURE_SIDS:
        yield future_sid, FUTURE_CASES[future_sid]

def init_instance_fixtures(self):
    """
    Set up a fresh DailyHistoryAggregator per asset class for each test,
    since the order of calls against an aggregator is itself under test.

    NOTE(review): this span was merge-diff residue — the pre-merge
    constructor arguments (``self.trading_calendar``) and the old
    ``@parameterized.expand`` test were interleaved with the post-merge
    code.  Reconstructed here as the post-merge version.
    """
    super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures()
    # Equity aggregation runs against the NYSE calendar.
    self.equity_daily_aggregator = DailyHistoryAggregator(
        self.nyse_calendar.schedule.market_open,
        self.bcolz_equity_minute_bar_reader,
        self.nyse_calendar,
    )
    # Future aggregation runs against the us_futures calendar.
    self.future_daily_aggregator = DailyHistoryAggregator(
        self.us_futures_calendar.schedule.market_open,
        self.bcolz_future_minute_bar_reader,
        self.us_futures_calendar,
    )

@parameter_space(
    field=OHLCV,
    sid=ASSET_FINDER_EQUITY_SIDS,
    __fail_fast=True,
)
def test_equity_contiguous_minutes_individual(self, field, sid):
    """
    Aggregate each equity's minutes one at a time, in order, and check
    the running daily values via the shared helper.
    """
    equity = self.asset_finder.retrieve_asset(sid)
    self._test_contiguous_minutes_individual(
        field,
        equity,
        EQUITY_CASES[equity].index,
        self.equity_daily_aggregator,
    )

@parameter_space(
    field=OHLCV,
    sid=ASSET_FINDER_FUTURE_SIDS,
    __fail_fast=True,
)
def test_future_contiguous_minutes_individual(self, field, sid):
    """
    Aggregate each future's minutes one at a time, in order, and check
    the running daily values via the shared helper.
    """
    future = self.asset_finder.retrieve_asset(sid)
    self._test_contiguous_minutes_individual(
        field,
        future,
        FUTURE_CASES[future].index,
        self.future_daily_aggregator,
    )

def _test_contiguous_minutes_individual(
self,
field,
asset,
minutes,
aggregator,
):
# First test each minute in order.
method_name = field + 's'
results = []
repeat_results = []
asset = self.asset_finder.retrieve_asset(sid)
minutes = EQUITY_CASES[asset].index

for minute in minutes:
value = getattr(self.equity_daily_aggregator, method_name)(
value = getattr(aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
Expand All @@ -334,7 +380,7 @@ def test_contiguous_minutes_individual(self, name, field, sid):
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
value = getattr(self.equity_daily_aggregator, method_name)(
value = getattr(aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
Expand Down
24 changes: 19 additions & 5 deletions zipline/data/resample.py
Expand Up @@ -384,6 +384,23 @@ def closes(self, assets, dt):
closes = []
session_label = self._trading_calendar.minute_to_session_label(dt)

def _get_filled_close(asset):
    """
    Returns the most recent non-nan close for the asset in this
    session. If there has been no data in this session on or before the
    `dt`, returns `nan`
    """
    # Pull every close from the session open through `dt` for this one
    # asset; load_raw_arrays returns one array per requested field.
    closes_window = self._minute_reader.load_raw_arrays(
        ['close'],
        market_open,
        dt,
        [asset],
    )[0]
    non_nan = closes_window[~np.isnan(closes_window)]
    # Empty selection means no data in-session yet — report NaN rather
    # than raising, mirroring the try/except-IndexError idiom.
    return non_nan[-1] if non_nan.size else np.NaN

for asset in assets:
if not asset.is_alive_for_session(session_label):
closes.append(np.NaN)
Expand Down Expand Up @@ -412,18 +429,15 @@ def closes(self, assets, dt):
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes(
[asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
val = _get_filled_close(asset)
entries[asset] = (dt_value, val)
closes.append(val)
continue
except KeyError:
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes([asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
val = _get_filled_close(asset)
entries[asset] = (dt_value, val)
closes.append(val)
continue
Expand Down

0 comments on commit 6ddbb16

Please sign in to comment.