-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
engine.py
509 lines (440 loc) · 18.2 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
"""
Compute Engine definitions for the Pipeline API.
"""
from abc import (
ABCMeta,
abstractmethod,
)
from uuid import uuid4
from six import (
iteritems,
with_metaclass,
)
from numpy import array
from pandas import DataFrame, MultiIndex
from toolz import groupby, juxt
from toolz.curried.operator import getitem
from zipline.lib.adjusted_array import ensure_adjusted_array, ensure_ndarray
from zipline.errors import NoFurtherDataError
from zipline.utils.numpy_utils import (
as_column,
repeat_first_axis,
repeat_last_axis,
)
from zipline.utils.pandas_utils import explode
from .term import AssetExists, InputDates, LoadableTerm
class PipelineEngine(with_metaclass(ABCMeta)):
    """
    Abstract interface for objects that can execute a Pipeline.

    Subclasses must provide an implementation of ``run_pipeline``.
    """

    @abstractmethod
    def run_pipeline(self, pipeline, start_date, end_date):
        """
        Compute values for `pipeline` between `start_date` and `end_date`.

        Returns a DataFrame with a MultiIndex of (date, asset) pairs.

        Parameters
        ----------
        pipeline : zipline.pipeline.Pipeline
            The pipeline to run.
        start_date : pd.Timestamp
            Start date of the computed matrix.
        end_date : pd.Timestamp
            End date of the computed matrix.

        Returns
        -------
        result : pd.DataFrame
            A frame of computed results.

            The columns of `result` correspond to the entries of
            `pipeline.columns`, which should be a dictionary mapping strings
            to instances of `zipline.pipeline.term.Term`.

            For each date between `start_date` and `end_date`, `result` will
            contain a row for each asset that passed `pipeline.screen`.  A
            screen of None indicates that a row should be returned for each
            asset that existed each day.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError("run_pipeline")
class NoEngineRegistered(Exception):
    """
    Error signalling that no pipeline engine is available.

    Raised if a user tries to call pipeline_output in an algorithm that
    hasn't set up a pipeline engine.
    """
class ExplodingPipelineEngine(PipelineEngine):
    """
    A PipelineEngine whose ``run_pipeline`` never computes anything.

    Every call raises ``NoEngineRegistered``.
    """

    def run_pipeline(self, pipeline, start_date, end_date):
        """
        Unconditionally raise ``NoEngineRegistered``.

        The arguments are accepted only to satisfy the ``PipelineEngine``
        interface; none of them is inspected.
        """
        message = (
            "Attempted to run a pipeline but no pipeline "
            "resources were registered."
        )
        raise NoEngineRegistered(message)
def _default_populate_initial_workspace(initial_workspace,
                                        root_mask_term,
                                        execution_plan,
                                        dates,
                                        assets):
    """
    No-op workspace populator: hand back the workspace it was given.

    Parameters
    ----------
    initial_workspace : dict
        The workspace to return.
    root_mask_term, execution_plan, dates, assets
        Accepted for signature compatibility; not used.

    Returns
    -------
    initial_workspace : dict
        The very same object that was passed in (no copy is made).
    """
    workspace = initial_workspace
    return workspace
class SimplePipelineEngine(object):
    """
    PipelineEngine class that computes each term independently.

    Parameters
    ----------
    get_loader : callable
        A function that is given a loadable term and returns a PipelineLoader
        to use to retrieve raw data for that term.
    calendar : DatetimeIndex
        Array of dates to consider as trading days when computing a range
        between a fixed start and end.
    asset_finder : zipline.assets.AssetFinder
        An AssetFinder instance.  We depend on the AssetFinder to determine
        which assets are in the top-level universe at any point in time.
    populate_initial_workspace : callable, optional
        A function which will be used to populate the initial workspace when
        computing a pipeline. This function will be passed the
        initial_workspace, the root mask term, the execution_plan, the dates
        being computed for, and the assets requested and should return a new
        dictionary which will be used as the initial_workspace.
    """
    __slots__ = (
        '_get_loader',
        '_calendar',
        '_finder',
        '_root_mask_term',
        '_root_mask_dates_term',
        '_populate_initial_workspace',
        '__weakref__',
    )

    def __init__(self,
                 get_loader,
                 calendar,
                 asset_finder,
                 populate_initial_workspace=None):
        self._get_loader = get_loader
        self._calendar = calendar
        self._finder = asset_finder

        self._root_mask_term = AssetExists()
        self._root_mask_dates_term = InputDates()

        # Fall back to the identity populator when none (or a falsy value)
        # is supplied.
        self._populate_initial_workspace = (
            populate_initial_workspace or _default_populate_initial_workspace
        )

    def run_pipeline(self, pipeline, start_date, end_date):
        """
        Compute a pipeline.

        Parameters
        ----------
        pipeline : zipline.pipeline.Pipeline
            The pipeline to run.
        start_date : pd.Timestamp
            Start date of the computed matrix.
        end_date : pd.Timestamp
            End date of the computed matrix.

        Returns
        -------
        result : pd.DataFrame
            The frame produced by ``_to_narrow`` for the computed outputs.

        Raises
        ------
        ValueError
            If `end_date` is earlier than `start_date`.

        Notes
        -----
        The algorithm implemented here can be broken down into the following
        stages:

        0. Build a dependency graph of all terms in `pipeline`.  Topologically
           sort the graph to determine an order in which we can compute the
           terms.

        1. Ask our AssetFinder for a "lifetimes matrix", which should contain,
           for each date between start_date and end_date, a boolean value for
           each known asset indicating whether the asset existed on that date.

        2. Compute each term in the dependency order determined in (0),
           caching the results in a dictionary so that they can be fed into
           future terms.

        3. For each date, determine the number of assets passing
           pipeline.screen.  The sum, N, of all these values is the total
           number of rows in our output frame, so we pre-allocate an output
           array of length N for each factor in `terms`.

        4. Fill in the arrays allocated in (3) by copying computed values from
           our output cache into the corresponding rows.

        5. Stick the values computed in (4) into a DataFrame and return it.

        Step 0 is performed by ``Pipeline.to_execution_plan``.
        Step 1 is performed in ``SimplePipelineEngine._compute_root_mask``.
        Step 2 is performed in ``SimplePipelineEngine.compute_chunk``.
        Steps 3, 4, and 5 are performed in
        ``SimplePipelineEngine._to_narrow``.

        See Also
        --------
        PipelineEngine.run_pipeline
        """
        if end_date < start_date:
            raise ValueError(
                "start_date must be before or equal to end_date \n"
                "start_date=%s, end_date=%s" % (start_date, end_date)
            )

        # Random name under which the screen is registered as an output, so
        # that it can be pulled back out of the results below.
        screen_name = uuid4().hex
        graph = pipeline.to_execution_plan(
            screen_name,
            self._root_mask_term,
            self._calendar,
            start_date,
            end_date,
        )
        extra_rows = graph.extra_rows[self._root_mask_term]
        root_mask = self._compute_root_mask(start_date, end_date, extra_rows)
        dates, assets, root_mask_values = explode(root_mask)

        initial_workspace = self._populate_initial_workspace(
            {
                self._root_mask_term: root_mask_values,
                self._root_mask_dates_term: as_column(dates.values)
            },
            self._root_mask_term,
            graph,
            dates,
            assets,
        )

        results = self.compute_chunk(
            graph,
            dates,
            assets,
            initial_workspace,
        )

        # Remove the screen from the results *before* handing them on, so the
        # screen is used only as the row mask and not as an output column.
        screen_values = results.pop(screen_name)

        return self._to_narrow(
            graph.outputs,
            results,
            screen_values,
            dates[extra_rows:],
            assets,
        )

    def _compute_root_mask(self, start_date, end_date, extra_rows):
        """
        Compute a lifetimes matrix from our AssetFinder, then drop columns that
        didn't exist at all during the query dates.

        Parameters
        ----------
        start_date : pd.Timestamp
            Base start date for the matrix.
        end_date : pd.Timestamp
            End date for the matrix.
        extra_rows : int
            Number of extra rows to compute before `start_date`.
            Extra rows are needed by terms like moving averages that require a
            trailing window of data.

        Returns
        -------
        lifetimes : pd.DataFrame
            Frame of dtype `bool` containing dates from `extra_rows` days
            before `start_date`, continuing through to `end_date`.  The
            returned frame contains as columns all assets in our AssetFinder
            that existed for at least one day between `start_date` and
            `end_date`.

        Raises
        ------
        NoFurtherDataError
            If the calendar holds fewer than `extra_rows` dates before
            `start_date`.
        AssertionError
            If the lifetimes frame does not line up with the requested dates,
            contains duplicated sids, or is empty after filtering.
        """
        calendar = self._calendar
        finder = self._finder
        start_idx, end_idx = calendar.slice_locs(start_date, end_date)
        if start_idx < extra_rows:
            raise NoFurtherDataError.from_lookback_window(
                initial_message="Insufficient data to compute Pipeline:",
                first_date=calendar[0],
                lookback_start=start_date,
                lookback_length=extra_rows,
            )

        # Build lifetimes matrix reaching back to `extra_rows` days before
        # `start_date.`
        lifetimes = finder.lifetimes(
            calendar[start_idx - extra_rows:end_idx],
            include_start_date=False
        )

        assert lifetimes.index[extra_rows] == start_date
        assert lifetimes.index[-1] == end_date

        # NOTE: `Index.unique` is a method (always truthy); the boolean
        # property we need here is `Index.is_unique`.
        if not lifetimes.columns.is_unique:
            columns = lifetimes.columns
            duplicated = columns[columns.duplicated()].unique()
            # There may be several duplicated sids, so format them as a list
            # rather than with a scalar-only `%d`.
            raise AssertionError(
                "Duplicated sids: %s" % ', '.join(map(str, duplicated))
            )

        # Filter out columns that didn't exist between the requested start and
        # end dates.
        existed = lifetimes.iloc[extra_rows:].any()
        ret = lifetimes.loc[:, existed]
        shape = ret.shape
        assert shape[0] * shape[1] != 0, 'root mask cannot be empty'
        return ret

    @staticmethod
    def _inputs_for_term(term, workspace, graph):
        """
        Compute inputs for the given term.

        This is mostly complicated by the fact that for each input we store as
        many rows as will be necessary to serve **any** computation requiring
        that input.

        Parameters
        ----------
        term : Term
            The term whose inputs are being gathered.
        workspace : dict
            Map from term -> already-computed output.
        graph
            Execution plan; its ``offset`` mapping is indexed by
            ``(term, input)`` pairs.

        Returns
        -------
        inputs : list
            One entry per element of ``term.inputs``, in the same order.
        """
        offsets = graph.offset
        out = []
        if term.windowed:
            # If term is windowed, then all input data should be instances of
            # AdjustedArray.
            for input_ in term.inputs:
                adjusted_array = ensure_adjusted_array(
                    workspace[input_], input_.missing_value,
                )
                out.append(
                    adjusted_array.traverse(
                        window_length=term.window_length,
                        offset=offsets[term, input_],
                    )
                )
        else:
            # If term is not windowed, input_data may be an AdjustedArray or
            # np.ndarray.  Coerce the former to the latter.
            for input_ in term.inputs:
                input_data = ensure_ndarray(workspace[input_])
                offset = offsets[term, input_]
                # OPTIMIZATION: Don't make a copy by doing input_data[0:] if
                # offset is zero.
                if offset:
                    input_data = input_data[offset:]
                out.append(input_data)
        return out

    def get_loader(self, term):
        """
        Return the loader for `term` by delegating to the ``get_loader``
        callable supplied at construction time.
        """
        return self._get_loader(term)

    def compute_chunk(self, graph, dates, assets, initial_workspace):
        """
        Compute the Pipeline terms in the graph for the requested start and end
        dates.

        Parameters
        ----------
        graph : zipline.pipeline.graph.TermGraph
        dates : pd.DatetimeIndex
            Row labels for our root mask.
        assets : pd.Int64Index
            Column labels for our root mask.
        initial_workspace : dict
            Map from term -> output.
            Must contain at least entry for `self._root_mask_term` whose shape
            is `(len(dates), len(assets))`, but may contain additional
            pre-computed terms for testing or optimization purposes.

        Returns
        -------
        results : dict
            Dictionary mapping requested results to outputs.

        Raises
        ------
        AssertionError
            If `initial_workspace` fails validation, or a computed term has
            an unexpected shape.
        """
        self._validate_compute_chunk_params(dates, assets, initial_workspace)
        get_loader = self.get_loader

        # Copy the supplied initial workspace so we don't mutate it in place.
        workspace = initial_workspace.copy()

        # If loadable terms share the same loader and extra_rows, load them all
        # together.
        loader_group_key = juxt(get_loader, getitem(graph.extra_rows))
        loader_groups = groupby(loader_group_key, graph.loadable_terms)

        refcounts = graph.initial_refcounts(workspace)

        for term in graph.execution_order(refcounts):
            # `term` may have been supplied in `initial_workspace`, and in the
            # future we may pre-compute loadable terms coming from the same
            # dataset.  In either case, we will already have an entry for this
            # term, which we shouldn't re-compute.
            if term in workspace:
                continue

            # Asset labels are always the same, but date labels vary by how
            # many extra rows are needed.
            mask, mask_dates = graph.mask_and_dates_for_term(
                term,
                self._root_mask_term,
                workspace,
                dates,
            )

            if isinstance(term, LoadableTerm):
                # Load every term of this term's group in a single call,
                # ordered by dataset.
                to_load = sorted(
                    loader_groups[loader_group_key(term)],
                    key=lambda t: t.dataset
                )
                loader = get_loader(term)
                loaded = loader.load_adjusted_array(
                    to_load, mask_dates, assets, mask,
                )
                workspace.update(loaded)
            else:
                workspace[term] = term._compute(
                    self._inputs_for_term(term, workspace, graph),
                    mask_dates,
                    assets,
                    mask,
                )
                if term.ndim == 2:
                    assert workspace[term].shape == mask.shape
                else:
                    assert workspace[term].shape == (mask.shape[0], 1)

                # Decref dependencies of ``term``, and clear any terms whose
                # refcounts hit 0.
                for garbage_term in graph.decref_dependencies(term, refcounts):
                    del workspace[garbage_term]

        out = {}
        graph_extra_rows = graph.extra_rows
        for name, term in iteritems(graph.outputs):
            # Truncate off extra rows from outputs.
            out[name] = workspace[term][graph_extra_rows[term]:]
        return out

    def _to_narrow(self, terms, data, mask, dates, assets):
        """
        Convert raw computed pipeline results into a DataFrame for public APIs.

        Parameters
        ----------
        terms : dict[str -> Term]
            Dict mapping column names to terms.
        data : dict[str -> ndarray[ndim=2]]
            Dict mapping column names to computed results for those names.
        mask : ndarray[bool, ndim=2]
            Mask array of values to keep.
        dates : pd.DatetimeIndex
            Row index for arrays `data` and `mask`.  ``dates.values`` is
            read, so a pandas index (not a bare ndarray) is expected.
        assets : array-like, ndim=1
            Column index for arrays `data` and `mask`.

        Returns
        -------
        results : pd.DataFrame
            The indices of `results` are as follows:

            index : two-tiered MultiIndex of (date, asset).
                Contains an entry for each (date, asset) pair corresponding to
                a `True` value in `mask`.
            columns : Index of str
                One column per entry in `data`.

        If mask[date, asset] is True, then result.loc[(date, asset), colname]
        will contain the value of data[colname][date, asset].
        """
        if not mask.any():
            # Manually handle the empty DataFrame case. This is a workaround
            # to pandas failing to tz_localize an empty dataframe with a
            # MultiIndex. It also saves us the work of applying a known-empty
            # mask to each array.
            #
            # Slicing `dates` here to preserve pandas metadata.
            empty_dates = dates[:0]
            empty_assets = array([], dtype=object)
            return DataFrame(
                data={
                    name: array([], dtype=arr.dtype)
                    for name, arr in iteritems(data)
                },
                index=MultiIndex.from_arrays([empty_dates, empty_assets]),
            )

        resolved_assets = array(self._finder.retrieve_all(assets))
        # Broadcast the labels to the 2-D shape of `mask`, then keep only the
        # (date, asset) pairs that the mask selects.
        dates_kept = repeat_last_axis(dates.values, len(assets))[mask]
        assets_kept = repeat_first_axis(resolved_assets, len(dates))[mask]

        final_columns = {}
        for name in data:
            # Each term that computed an output has its postprocess method
            # called on the filtered result.
            #
            # As of Mon May 2 15:38:47 2016, we only use this to convert
            # LabelArrays into categoricals.
            final_columns[name] = terms[name].postprocess(data[name][mask])

        return DataFrame(
            data=final_columns,
            index=MultiIndex.from_arrays([dates_kept, assets_kept]),
        ).tz_localize('UTC', level=0)

    def _validate_compute_chunk_params(self, dates, assets, initial_workspace):
        """
        Verify that the values passed to compute_chunk are well-formed.

        Raises
        ------
        AssertionError
            If `initial_workspace` has no entry for the root mask term, or
            that entry's shape is not ``(len(dates), len(assets))``.
        """
        root = self._root_mask_term
        clsname = type(self).__name__

        # Writing this out explicitly so this errors in testing if we change
        # the name without updating this line.
        compute_chunk_name = self.compute_chunk.__name__
        if root not in initial_workspace:
            raise AssertionError(
                "root_mask values not supplied to {cls}.{method}".format(
                    cls=clsname,
                    method=compute_chunk_name,
                )
            )

        shape = initial_workspace[root].shape
        implied_shape = len(dates), len(assets)
        if shape != implied_shape:
            raise AssertionError(
                "root_mask shape is {shape}, but received dates/assets "
                "imply that shape should be {implied}".format(
                    shape=shape,
                    implied=implied_shape,
                )
            )