ENH: Allows history to be dynamic and grow the container at runtime.
Previously, all specs had to be pre-allocated using the 'add_history'
function. That is no longer required; 'add_history' now serves only as a
hint to the HistoryContainer to pre-allocate space for the given spec.

History can grow by increasing the length for a frequency, adding a
frequency, or adding a field, or by any combination of these.
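
For illustration, a minimal sketch of the user-facing effect, mirroring the
new test_history_container_constructed_at_runtime test added to
tests/test_history.py in this commit (the API calls are the ones used by
that test; the particular window requested here is only an example):

    # No add_history() pre-allocation is needed; the HistoryContainer is
    # created and sized at runtime to satisfy this call in handle_data.
    from zipline.api import history

    def handle_data(context, data):
        # Request a 2-bar daily price window; the container grows to fit it.
        context.prices = history(2, '1d', 'price')

At the container level the growth is performed by the new
HistoryContainer.ensure_spec() method, which the test_history_grow_length,
test_history_add_field, and test_history_add_freq tests added below exercise
for each way the container can grow.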
llllllllll committed Oct 22, 2014
1 parent c6e85d0 commit c5ae828
Showing 10 changed files with 708 additions and 134 deletions.
14 changes: 5 additions & 9 deletions tests/history_cases.py
@@ -534,9 +534,7 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:31AM'),
],

# Missing volume data should manifest as 0's rather
# than nans.
).fillna(0 if 'volume' in key else np.nan),
),
pd.DataFrame(
data={
1: [np.nan, 0, 1],
@@ -547,7 +545,7 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
],
).fillna(0 if 'volume' in key else np.nan),
),

pd.DataFrame(
data={
@@ -560,11 +558,7 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:33AM'),
],

# Note: Calling fillna() here even though there are
# no NaNs because this makes it less likely
# for us to introduce a stupid bug by
# copy/pasting in the future.
).fillna(0 if 'volume' in key else np.nan),
),
pd.DataFrame(
data={
1: [2, np.nan, 3],
@@ -575,6 +569,8 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:34AM'),
to_utc('2013-06-28 9:35AM'),
],
# For volume, when we are missing data, we replace
# it with 0s to show that no trades occurred.
).fillna(0 if 'volume' in key else np.nan),
],
)
3 changes: 1 addition & 2 deletions tests/test_batchtransform.py
@@ -122,8 +122,7 @@ def test_all_sids_passed(self):
for sid in self.sids[:i]:
self.assertIn(sid, df.columns)

last_elem = len(df) - 1
self.assertEqual(df[last_elem][last_elem], last_elem)
self.assertEqual(df.iloc[-1].iloc[-1], i)


class TestBatchTransformMinutely(TestCase):
212 changes: 207 additions & 5 deletions tests/test_history.py
@@ -14,6 +14,8 @@
# limitations under the License.

from unittest import TestCase
from itertools import product
from textwrap import dedent

from nose_parameterized import parameterized
import numpy as np
@@ -131,7 +133,6 @@ def get_index_at_dt(case_input):
case_input['frequency'],
None,
False,
daily_at_midnight=False,
data_frequency='minute',
)
return history.index_at_dt(history_spec, case_input['algo_dt'])
@@ -197,7 +198,7 @@ def test_history_container(self,
self.assertEqual(len(expected[spec.key_str]), len(updates))

container = HistoryContainer(
{spec.key_str: spec for spec in specs}, sids, dt
{spec.key_str: spec for spec in specs}, sids, dt, 'minute',
)

for update_count, update in enumerate(updates):
@@ -222,15 +223,16 @@ def test_container_nans_and_daily_roll(self):
frequency='1d',
field='price',
ffill=True,
daily_at_midnight=False
data_frequency='minute'
)
specs = {spec.key_str: spec}
initial_sids = [1, ]
initial_dt = pd.Timestamp(
'2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC')

container = HistoryContainer(
specs, initial_sids, initial_dt)
specs, initial_sids, initial_dt, 'minute'
)

bar_data = BarData()
container.update(bar_data, initial_dt)
@@ -342,6 +344,176 @@ def test_container_nans_and_daily_roll(self):
self.assertEqual(prices[1].ix[1], 20)
self.assertEqual(prices[1].ix[2], 20)

@parameterized.expand(
(freq, field, data_frequency, construct_digest)
for freq in ('1m', '1d')
for field in HistoryContainer.VALID_FIELDS
for data_frequency in ('minute', 'daily')
for construct_digest in (True, False)
if not (freq == '1m' and data_frequency == 'daily')
)
def test_history_grow_length(self,
freq,
field,
data_frequency,
construct_digest):
bar_count = 2 if construct_digest else 1
spec = history.HistorySpec(
bar_count=bar_count,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)

container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)

if construct_digest:
self.assertEqual(len(container.digest_panels[spec.frequency]), 1)

bar_data = BarData()
container.update(bar_data, initial_dt)

to_add = (
history.HistorySpec(
bar_count=bar_count + 1,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
),
history.HistorySpec(
bar_count=bar_count + 2,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
),
)

for n, spec in enumerate(to_add):
container.ensure_spec(spec, initial_dt)

self.assertEqual(
len(container.digest_panels[spec.frequency]),
spec.bar_count - 1,
)

@parameterized.expand(
(bar_count, freq, pair, data_frequency)
for bar_count in (1, 2)
for freq in ('1m', '1d')
for pair in product(HistoryContainer.VALID_FIELDS, repeat=2)
for data_frequency in ('minute', 'daily')
if not (freq == '1m' and data_frequency == 'daily')
)
def test_history_add_field(self, bar_count, freq, pair, data_frequency):
first, second = pair
spec = history.HistorySpec(
bar_count=bar_count,
frequency=freq,
field=first,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)

container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)

if bar_count > 1:
self.assertEqual(len(container.digest_panels[spec.frequency]), 1)

bar_data = BarData()
container.update(bar_data, initial_dt)

new_spec = history.HistorySpec(
bar_count,
frequency=freq,
field=second,
ffill=True,
data_frequency=data_frequency,
)

container.ensure_spec(new_spec, initial_dt)

if bar_count > 1:
digest_panel = container.digest_panels[new_spec.frequency]
self.assertEqual(len(digest_panel), bar_count - 1)
self.assertIn(second, digest_panel.items)
else:
self.assertNotIn(new_spec.frequency, container.digest_panels)

@parameterized.expand(
(bar_count, pair, field, data_frequency)
for bar_count in (1, 2)
for pair in product(('1m', '1d'), repeat=2)
for field in HistoryContainer.VALID_FIELDS
for data_frequency in ('minute', 'daily')
if not ('1m' in pair and data_frequency == 'daily')
)
def test_history_add_freq(self, bar_count, pair, field, data_frequency):
first, second = pair
spec = history.HistorySpec(
bar_count=bar_count,
frequency=first,
field=field,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)

container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)

if bar_count > 1:
self.assertEqual(len(container.digest_panels[spec.frequency]), 1)

bar_data = BarData()
container.update(bar_data, initial_dt)

new_spec = history.HistorySpec(
bar_count,
frequency=second,
field=field,
ffill=True,
data_frequency=data_frequency,
)

container.ensure_spec(new_spec, initial_dt)

if bar_count > 1:
digest_panel = container.digest_panels[new_spec.frequency]
self.assertEqual(len(digest_panel), bar_count - 1)
else:
self.assertNotIn(new_spec.frequency, container.digest_panels)


class TestHistoryAlgo(TestCase):
def setUp(self):
@@ -373,7 +545,7 @@ def handle_data(context, data):
end = pd.Timestamp('2006-03-30', tz='UTC')

sim_params = factory.create_simulation_parameters(
start=start, end=end)
start=start, end=end, data_frequency='daily')

_, df = factory.create_test_df_source(sim_params)
df = df.astype(np.float64)
@@ -867,3 +1039,33 @@ def handle_data(context, data):
# Depends on seed
np.testing.assert_almost_equal(recorded_ma,
159.76304468946876)

def test_history_container_constructed_at_runtime(self):
algo_text = dedent(
"""\
from zipline.api import history
def handle_data(context, data):
context.prices = history(2, '1d', 'price')
"""
)
start = pd.Timestamp('2007-04-05', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')

sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='daily'
)

test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)

source = RandomWalkSource(start=start, end=end)

self.assertIsNone(test_algo.history_container)
self.assertIsNotNone(test_algo.run(source))
16 changes: 1 addition & 15 deletions tests/test_rolling_panel.py
@@ -37,22 +37,16 @@ def test_basics(self, window=10):

major_deque = deque(maxlen=window)

frames = {}

for i, date in enumerate(dates):
frame = pd.DataFrame(np.random.randn(3, 4), index=items,
columns=minor)

rp.add_frame(date, frame)

frames[date] = frame
major_deque.append(date)

result = rp.get_current()
expected = pd.Panel(frames, items=list(major_deque),
major_axis=items, minor_axis=minor)

tm.assert_panel_equal(result, expected.swapaxes(0, 1))
tm.assert_frame_equal(result.loc[:, date, :], frame.T)

def test_adding_and_dropping_items(self, n_items=5, n_minor=10, window=10,
periods=30):
@@ -73,8 +67,6 @@ def test_adding_and_dropping_items(self, n_items=5, n_minor=10, window=10,

dates = pd.date_range('2000-01-01', periods=periods, tz='utc')

frames = {}

expected_frames = deque(maxlen=window)
expected_dates = deque()

@@ -85,7 +77,6 @@ def test_adding_and_dropping_items(self, n_items=5, n_minor=10, window=10,
if i >= window:
# Old labels and dates should start to get dropped at every
# call
del frames[expected_dates.popleft()]
expected_minor.popleft()
expected_items.popleft()

@@ -94,18 +85,13 @@ def test_adding_and_dropping_items(self, n_items=5, n_minor=10, window=10,

rp.add_frame(date, frame)

frames[date] = frame

result = rp.get_current()
np.testing.assert_array_equal(sorted(result.minor_axis.values),
sorted(expected_minor))
np.testing.assert_array_equal(sorted(result.items.values),
sorted(expected_items))
tm.assert_frame_equal(frame.T,
result.ix[frame.index, -1, frame.columns])
expected_result = pd.Panel(frames).swapaxes(0, 1)
tm.assert_panel_equal(expected_result,
result)

# Insert new items
minor.popleft()
