Skip to content

Commit

Permalink
ENH: Allows history to be dynamic and grow the container at runtime.
Browse files Browse the repository at this point in the history
Previously, all specs had to be pre-allocated by using the 'add_history'
function. This is now no longer required and instead serves as a hint to
the HistoryContainer to pre-allocate the space for the given spec.

History can grow by increasing the length for a frequency, adding a
frequency, or adding a field. It can grow with any combination of
these.

HistoryContainer now is aware of the data_frequency of the algorithm,
and no longer uses the daily_at_midnight flag; instead, this is the
default behavior.
  • Loading branch information
llllllllll committed Oct 31, 2014
1 parent 1c08f60 commit 196c0a7
Show file tree
Hide file tree
Showing 10 changed files with 901 additions and 178 deletions.
14 changes: 5 additions & 9 deletions tests/history_cases.py
Expand Up @@ -534,9 +534,7 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:31AM'),
],

# Missing volume data should manifest as 0's rather
# than nans.
).fillna(0 if 'volume' in key else np.nan),
),
pd.DataFrame(
data={
1: [np.nan, 0, 1],
Expand All @@ -547,7 +545,7 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
],
).fillna(0 if 'volume' in key else np.nan),
),

pd.DataFrame(
data={
Expand All @@ -560,11 +558,7 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:33AM'),
],

# Note: Calling fillna() here even though there are
# no NaNs because this makes it less likely
# for us to introduce a stupid bug by
# copy/pasting in the future.
).fillna(0 if 'volume' in key else np.nan),
),
pd.DataFrame(
data={
1: [2, np.nan, 3],
Expand All @@ -575,6 +569,8 @@ def mixed_frequency_expected_data(count, frequency):
to_utc('2013-06-28 9:34AM'),
to_utc('2013-06-28 9:35AM'),
],
# For volume, when we are missing data, we replace
# it with 0s to show that no trades occured.
).fillna(0 if 'volume' in key else np.nan),
],
)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_batchtransform.py
Expand Up @@ -122,8 +122,7 @@ def test_all_sids_passed(self):
for sid in self.sids[:i]:
self.assertIn(sid, df.columns)

last_elem = len(df) - 1
self.assertEqual(df[last_elem][last_elem], last_elem)
self.assertEqual(df.iloc[-1].iloc[-1], i)


class TestBatchTransformMinutely(TestCase):
Expand Down
276 changes: 269 additions & 7 deletions tests/test_history.py
Expand Up @@ -14,18 +14,24 @@
# limitations under the License.

from unittest import TestCase
from itertools import product
from textwrap import dedent

from nose_parameterized import parameterized
import numpy as np
import pandas as pd
from pandas.util.testing import assert_frame_equal

from zipline.history import history
from zipline.history import history, Frequency
from zipline.history.history_container import HistoryContainer
from zipline.protocol import BarData
import zipline.utils.factory as factory
from zipline import TradingAlgorithm
from zipline.finance.trading import SimulationParameters, TradingEnvironment
from zipline.finance.trading import (
SimulationParameters,
TradingEnvironment,
with_environment,
)
from zipline.errors import IncompatibleHistoryFrequency

from zipline.sources import RandomWalkSource, DataFrameSource
Expand Down Expand Up @@ -131,7 +137,6 @@ def get_index_at_dt(case_input):
case_input['frequency'],
None,
False,
daily_at_midnight=False,
data_frequency='minute',
)
return history.index_at_dt(history_spec, case_input['algo_dt'])
Expand Down Expand Up @@ -197,7 +202,7 @@ def test_history_container(self,
self.assertEqual(len(expected[spec.key_str]), len(updates))

container = HistoryContainer(
{spec.key_str: spec for spec in specs}, sids, dt
{spec.key_str: spec for spec in specs}, sids, dt, 'minute',
)

for update_count, update in enumerate(updates):
Expand All @@ -222,15 +227,16 @@ def test_container_nans_and_daily_roll(self):
frequency='1d',
field='price',
ffill=True,
daily_at_midnight=False
data_frequency='minute'
)
specs = {spec.key_str: spec}
initial_sids = [1, ]
initial_dt = pd.Timestamp(
'2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC')

container = HistoryContainer(
specs, initial_sids, initial_dt)
specs, initial_sids, initial_dt, 'minute'
)

bar_data = BarData()
container.update(bar_data, initial_dt)
Expand Down Expand Up @@ -373,7 +379,7 @@ def handle_data(context, data):
end = pd.Timestamp('2006-03-30', tz='UTC')

sim_params = factory.create_simulation_parameters(
start=start, end=end)
start=start, end=end, data_frequency='daily')

_, df = factory.create_test_df_source(sim_params)
df = df.astype(np.float64)
Expand Down Expand Up @@ -867,3 +873,259 @@ def handle_data(context, data):
# Depends on seed
np.testing.assert_almost_equal(recorded_ma,
159.76304468946876)

def test_history_container_constructed_at_runtime(self):
algo_text = dedent(
"""\
from zipline.api import history
def handle_data(context, data):
context.prices = history(2, '1d', 'price')
"""
)
start = pd.Timestamp('2007-04-05', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')

sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='daily'
)

test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)

source = RandomWalkSource(start=start, end=end)

self.assertIsNone(test_algo.history_container)
test_algo.run(source)
self.assertIsNotNone(
test_algo.history_container,
msg='HistoryContainer was not constructed at runtime',
)

container = test_algo.history_container
self.assertEqual(
container.buffer_panel.window_length,
Frequency.MAX_MINUTES['d'],
msg='HistoryContainer.buffer_panel was not large enough to service'
' the given HistorySpec',
)

self.assertEqual(
len(container.digest_panels),
1,
msg='The HistoryContainer created too many digest panels',
)

freq, digest = container.digest_panels.items()[0]
self.assertEqual(
freq.unit_str,
'd',
)

self.assertEqual(
digest.window_length,
1,
msg='The digest panel is not large enough to service the given'
' HistorySpec',
)


class TestHistoryContainerResize(TestCase):
@parameterized.expand(
(freq, field, data_frequency, construct_digest)
for freq in ('1m', '1d')
for field in HistoryContainer.VALID_FIELDS
for data_frequency in ('minute', 'daily')
for construct_digest in (True, False)
if not (freq == '1m' and data_frequency == 'daily')
)
def test_history_grow_length(self,
freq,
field,
data_frequency,
construct_digest):
bar_count = 2 if construct_digest else 1
spec = history.HistorySpec(
bar_count=bar_count,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)

container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)

if construct_digest:
self.assertEqual(
container.digest_panels[spec.frequency].window_length, 1,
)

bar_data = BarData()
container.update(bar_data, initial_dt)

to_add = (
history.HistorySpec(
bar_count=bar_count + 1,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
),
history.HistorySpec(
bar_count=bar_count + 2,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
),
)

for spec in to_add:
container.ensure_spec(spec, initial_dt)

self.assertEqual(
container.digest_panels[spec.frequency].window_length,
spec.bar_count - 1,
)

self.assert_history(container, spec, initial_dt)

@parameterized.expand(
(bar_count, freq, pair, data_frequency)
for bar_count in (1, 2)
for freq in ('1m', '1d')
for pair in product(HistoryContainer.VALID_FIELDS, repeat=2)
for data_frequency in ('minute', 'daily')
if not (freq == '1m' and data_frequency == 'daily')
)
def test_history_add_field(self, bar_count, freq, pair, data_frequency):
first, second = pair
spec = history.HistorySpec(
bar_count=bar_count,
frequency=freq,
field=first,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)

container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)

if bar_count > 1:
self.assertEqual(
container.digest_panels[spec.frequency].window_length, 1,
)

bar_data = BarData()
container.update(bar_data, initial_dt)

new_spec = history.HistorySpec(
bar_count,
frequency=freq,
field=second,
ffill=True,
data_frequency=data_frequency,
)

container.ensure_spec(new_spec, initial_dt)

if bar_count > 1:
digest_panel = container.digest_panels[new_spec.frequency]
self.assertEqual(digest_panel.window_length, bar_count - 1)
self.assertIn(second, digest_panel.items)
else:
self.assertNotIn(new_spec.frequency, container.digest_panels)

self.assert_history(container, new_spec, initial_dt)

@parameterized.expand(
(bar_count, pair, field, data_frequency)
for bar_count in (1, 2)
for pair in product(('1m', '1d'), repeat=2)
for field in HistoryContainer.VALID_FIELDS
for data_frequency in ('minute', 'daily')
if not ('1m' in pair and data_frequency == 'daily')
)
def test_history_add_freq(self, bar_count, pair, field, data_frequency):
first, second = pair
spec = history.HistorySpec(
bar_count=bar_count,
frequency=first,
field=field,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)

container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)

if bar_count > 1:
self.assertEqual(
container.digest_panels[spec.frequency].window_length, 1,
)

bar_data = BarData()
container.update(bar_data, initial_dt)

new_spec = history.HistorySpec(
bar_count,
frequency=second,
field=field,
ffill=True,
data_frequency=data_frequency,
)

container.ensure_spec(new_spec, initial_dt)

if bar_count > 1:
digest_panel = container.digest_panels[new_spec.frequency]
self.assertEqual(digest_panel.window_length, bar_count - 1)
else:
self.assertNotIn(new_spec.frequency, container.digest_panels)

self.assert_history(container, new_spec, initial_dt)

@with_environment()
def assert_history(self, container, spec, dt, env=None):
hst = container.get_history(spec, dt)

self.assertEqual(len(hst), spec.bar_count)

back = spec.frequency.prev_bar
for n in reversed(hst.index):
self.assertEqual(dt, n)
dt = back(dt)

0 comments on commit 196c0a7

Please sign in to comment.