forked from quantopian/zipline
/
filter.py
257 lines (224 loc) · 7.77 KB
/
filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
"""
filter.py
"""
from numpy import (
float64,
nan,
nanpercentile,
)
from itertools import chain
from operator import attrgetter
from zipline.errors import (
BadPercentileBounds,
UnsupportedDataType,
)
from zipline.pipeline.mixins import (
CustomTermMixin,
PositiveWindowLengthMixin,
SingleInputMixin,
)
from zipline.pipeline.term import CompositeTerm
from zipline.pipeline.expression import (
BadBinaryOperator,
FILTER_BINOPS,
method_name_for_op,
NumericalExpression,
)
from zipline.utils.control_flow import nullctx
from zipline.utils.numpy_utils import bool_dtype
def concat_tuples(*tuples):
    """
    Flatten any number of tuples, in argument order, into a single tuple.

    Calling with no arguments yields the empty tuple.
    """
    flattened = chain.from_iterable(tuples)
    return tuple(flattened)
def binary_operator(op):
    """
    Build a binary operator method for ``op``, to be attached to a Filter
    subclass.

    The returned function (itself named "binary_operator") is suitable for
    implementing methods like __and__ or __or__.  It raises
    ``BadBinaryOperator`` when the right-hand operand is not a
    NumericalExpression, a Filter, or an int.
    """
    # Built once per operator.  When a plain Filter meets a
    # NumericalExpression on the right, this fetches the commuted
    # (right-binding) method for ``op`` from that expression so the
    # NumericalExpression can take care of merging inputs.
    get_commuted_method = attrgetter(method_name_for_op(op, commute=True))

    def binary_operator(self, other):
        # The NumericalExpression checks come before the Filter check on
        # purpose: an object can be both (NumExprFilter is), and in that
        # case the expression-aware paths must win.
        if isinstance(self, NumericalExpression):
            left_expr, right_expr, merged_inputs = self.build_binary_op(
                op, other,
            )
            combined = "({left}) {op} ({right})".format(
                left=left_expr,
                op=op,
                right=right_expr,
            )
            return NumExprFilter.create(combined, merged_inputs)

        if isinstance(other, NumericalExpression):
            # Defer to the expression's own right-binding operator, passing
            # ourself as its operand.
            return get_commuted_method(other)(self)

        if isinstance(other, Filter):
            if self is other:
                # Same term on both sides: bind it only once.
                expr, binds = "x_0 {op} x_0".format(op=op), (self,)
            else:
                expr, binds = "x_0 {op} x_1".format(op=op), (self, other)
            return NumExprFilter.create(expr, binds)

        if isinstance(other, int):
            # bool is a subclass of int, so True/False are handled here too.
            return NumExprFilter.create(
                "x_0 {op} ({constant})".format(op=op, constant=int(other)),
                binds=(self,),
            )

        raise BadBinaryOperator(op, self, other)

    binary_operator.__doc__ = "Binary Operator: '%s'" % op
    return binary_operator
def unary_operator(op):
    """
    Build a unary operator method for ``op``, to be attached to Filters.

    Only '~' is supported; any other operator raises ValueError.
    """
    if op not in {'~'}:
        raise ValueError("Invalid unary operator %s." % op)

    def unary_operator(self):
        # NumExprFilter is looked up when the operator is applied, not when
        # this factory runs: the factory is invoked from the Filter class
        # body, which executes before NumExprFilter is defined in this
        # module.
        if not isinstance(self, NumericalExpression):
            # Plain term: bind it as the sole input of a new expression.
            return NumExprFilter.create("{op}x_0".format(op=op), (self,))

        # Already an expression: wrap its expression string and reuse its
        # inputs.
        wrapped = "{op}({expr})".format(op=op, expr=self._expr)
        return NumExprFilter.create(wrapped, self.inputs)

    unary_operator.__doc__ = "Unary Operator: '%s'" % op
    return unary_operator
class Filter(CompositeTerm):
    """
    Pipeline API expression producing boolean-valued outputs.

    Operator methods are generated at class-creation time: for every
    operator in ``FILTER_BINOPS`` both the regular and the commuted method
    names (as given by ``method_name_for_op``) are bound to a function built
    by ``binary_operator``, and ``__invert__`` is built by
    ``unary_operator('~')``.
    """
    dtype = bool_dtype

    # Inside a class body ``locals()`` is the namespace the class is being
    # built from, so updating it defines methods on the class.  Note that
    # ``clsdict`` itself also remains behind as a class attribute.
    clsdict = locals()
    # Regular (left-binding) operator methods.
    clsdict.update(
        {
            method_name_for_op(op): binary_operator(op)
            for op in FILTER_BINOPS
        }
    )
    # Commuted (right-binding) operator methods.  Each name gets its own
    # function object from a separate ``binary_operator(op)`` call.
    clsdict.update(
        {
            method_name_for_op(op, commute=True): binary_operator(op)
            for op in FILTER_BINOPS
        }
    )

    __invert__ = unary_operator('~')

    def _validate(self):
        """
        Validate this term, requiring its dtype to be ``bool_dtype``.

        Returns whatever the superclass ``_validate`` returns.  Raises
        ``UnsupportedDataType`` if ``self.dtype`` is not ``bool_dtype``.
        """
        # Run superclass validation first so that we handle `dtype not passed`
        # before this.
        retval = super(Filter, self)._validate()
        if self.dtype != bool_dtype:
            raise UnsupportedDataType(
                typename=type(self).__name__,
                dtype=self.dtype
            )
        return retval
class NumExprFilter(NumericalExpression, Filter):
    """
    A Filter whose values come from evaluating a numexpr expression.
    """

    @classmethod
    def create(cls, expr, binds):
        """
        Helper for creating new NumExprFilters.

        Thin wrapper around the class constructor that always forwards
        ``bool_dtype`` as the dtype, since Filters can only be of boolean
        dtype.
        """
        return cls(
            expr=expr,
            binds=binds,
            dtype=bool_dtype,
        )

    def _compute(self, arrays, dates, assets, mask):
        """
        Evaluate the expression via the superclass, then re-apply `mask` so
        that entries excluded by the mask are False in the result.
        """
        unmasked = super(NumExprFilter, self)._compute(
            arrays, dates, assets, mask,
        )
        return unmasked & mask
class PercentileFilter(SingleInputMixin, Filter):
    """
    A Filter representing assets falling between percentile bounds of a Factor.

    Parameters
    ----------
    factor : zipline.pipeline.factor.Factor
        The factor over which to compute percentile bounds.
    min_percentile : float [0.0, 100.0]
        The minimum percentile rank of an asset that will pass the filter.
    max_percentile : float [0.0, 100.0]
        The maximum percentile rank of an asset that will pass the filter.
        Must be strictly greater than ``min_percentile``.
    mask
        Forwarded unchanged to the superclass constructor as ``mask``.

    Raises
    ------
    BadPercentileBounds
        During validation, unless
        ``0.0 <= min_percentile < max_percentile <= 100.0``.
    """
    window_length = 0

    def __new__(cls, factor, min_percentile, max_percentile, mask):
        return super(PercentileFilter, cls).__new__(
            cls,
            inputs=(factor,),
            mask=mask,
            min_percentile=min_percentile,
            max_percentile=max_percentile,
        )

    def _init(self, min_percentile, max_percentile, *args, **kwargs):
        """
        Store the percentile bounds, then delegate the remaining arguments
        to the superclass ``_init``.
        """
        self._min_percentile = min_percentile
        self._max_percentile = max_percentile
        return super(PercentileFilter, self)._init(*args, **kwargs)

    @classmethod
    def static_identity(cls, min_percentile, max_percentile, *args, **kwargs):
        """
        Extend the superclass identity with both percentile bounds, so that
        instances differing only in their bounds get different identities.
        """
        return (
            super(PercentileFilter, cls).static_identity(*args, **kwargs),
            min_percentile,
            max_percentile,
        )

    def _validate(self):
        """
        Ensure that our percentile bounds are well-formed.

        Bounds are expressed on a 0-100 scale and the minimum must be
        strictly below the maximum.
        """
        if not 0.0 <= self._min_percentile < self._max_percentile <= 100.0:
            raise BadPercentileBounds(
                min_percentile=self._min_percentile,
                max_percentile=self._max_percentile,
            )
        return super(PercentileFilter, self)._validate()

    def _compute(self, arrays, dates, assets, mask):
        """
        For each row in the input, compute a mask of all values falling between
        the given percentiles (bounds inclusive).
        """
        # TODO: Review whether there's a better way of handling small numbers
        # of columns.

        # ``astype`` already returns a freshly allocated array, so the input
        # is never mutated and no separate ``.copy()`` is needed.
        # ``order='C'`` keeps the C-contiguous layout the former
        # ``.copy().astype(...)`` chain produced.
        data = arrays[0].astype(float64, order='C')

        # Masked-out entries become NaN: ``nanpercentile`` skips them when
        # computing the bounds, and NaN compares False against any bound, so
        # they can never pass the filter.
        data[~mask] = nan

        # FIXME: np.nanpercentile **should** support computing multiple bounds
        # at once, but there's a bug in the logic for multiple bounds in numpy
        # 1.9.2.  It will be fixed in 1.10.
        # c.f. https://github.com/numpy/numpy/pull/5981
        lower_bounds = nanpercentile(
            data,
            self._min_percentile,
            axis=1,
            keepdims=True,
        )
        upper_bounds = nanpercentile(
            data,
            self._max_percentile,
            axis=1,
            keepdims=True,
        )
        return (lower_bounds <= data) & (data <= upper_bounds)
class CustomFilter(PositiveWindowLengthMixin, CustomTermMixin, Filter):
    """
    Filter analog to ``CustomFactor``.

    Adds no methods of its own here; behavior comes from combining
    ``PositiveWindowLengthMixin`` and ``CustomTermMixin`` with ``Filter``.
    """
    # Class-level attribute, created once when the class body executes and
    # shared by all instances.
    # NOTE(review): presumably a default no-op context consumed by
    # CustomTermMixin -- confirm against the mixin.
    ctx = nullctx()