Skip to content

Commit

Permalink
Merge b784a4c into 7af53dd
Browse files Browse the repository at this point in the history
  • Loading branch information
llllllllll committed Dec 16, 2016
2 parents 7af53dd + b784a4c commit 2cecec1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
23 changes: 18 additions & 5 deletions tests/pipeline/test_pipeline_algo.py
Expand Up @@ -8,6 +8,7 @@
)

from nose_parameterized import parameterized
import numpy as np
from numpy import (
array,
arange,
Expand Down Expand Up @@ -285,21 +286,33 @@ def before_trading_start(context, data):
('day', 1),
('week', 5),
('year', 252),
('all_but_one_day', 'all_but_one_day')])
def test_assets_appear_on_correct_days(self, test_name, chunksize):
('all_but_one_day', 'all_but_one_day'),
('custom_iter', 'custom_iter')])
def test_assets_appear_on_correct_days(self, test_name, chunks):
"""
Assert that assets appear at correct times during a backtest, with
correctly-adjusted close price values.
"""

if chunksize == 'all_but_one_day':
chunksize = (
if chunks == 'all_but_one_day':
chunks = (
self.dates.get_loc(self.last_asset_end) -
self.dates.get_loc(self.first_asset_start)
) - 1
elif chunks == 'custom_iter':
chunks = []
st = np.random.RandomState(12345)
remaining = (
self.dates.get_loc(self.last_asset_end) -
self.dates.get_loc(self.first_asset_start)
)
while remaining > 0:
chunk = st.randint(3)
chunks.append(chunk)
remaining -= chunk

def initialize(context):
p = attach_pipeline(Pipeline(), 'test', chunksize=chunksize)
p = attach_pipeline(Pipeline(), 'test', chunks=chunks)
p.add(USEquityPricing.close.latest, 'close')

def handle_data(context, data):
Expand Down
20 changes: 11 additions & 9 deletions zipline/algorithm.py
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import Iterable
from copy import copy
import operator as op
import warnings
Expand Down Expand Up @@ -2261,9 +2262,9 @@ def set_long_only(self, on_error='fail'):
@expect_types(
pipeline=Pipeline,
name=string_types,
chunksize=optional(int),
chunks=(int, Iterable, type(None)),
)
def attach_pipeline(self, pipeline, name, chunksize=None):
def attach_pipeline(self, pipeline, name, chunks=None):
"""Register a pipeline to be computed at the start of each day.
Parameters
Expand All @@ -2272,10 +2273,11 @@ def attach_pipeline(self, pipeline, name, chunksize=None):
The pipeline to have computed.
name : str
The name of the pipeline.
chunksize : int, optional
chunks : int or iterator, optional
The number of days to compute pipeline results for. Increasing
this number will make it longer to get the first results but
may improve the total runtime of the simulation.
may improve the total runtime of the simulation. If an iterator
is passed, we will run in chunks based on values of the itereator.
Returns
-------
Expand All @@ -2288,13 +2290,13 @@ def attach_pipeline(self, pipeline, name, chunksize=None):
"""
if self._pipelines:
raise NotImplementedError("Multiple pipelines are not supported.")
if chunksize is None:
if chunks is None:
# Make the first chunk smaller to get more immediate results:
# (one week, then every half year)
chunks = iter(chain([5], repeat(126)))
else:
chunks = iter(repeat(int(chunksize)))
self._pipelines[name] = pipeline, chunks
chunks = chain([5], repeat(126))
elif isinstance(chunks, int):
chunks = repeat(chunks)
self._pipelines[name] = pipeline, iter(chunks)

# Return the pipeline to allow expressions like
# p = attach_pipeline(Pipeline(), 'name')
Expand Down

0 comments on commit 2cecec1

Please sign in to comment.