Skip to content

Commit

Permalink
Merge pull request #962 from quantopian/pipeline-missing-values
Browse files Browse the repository at this point in the history
Pipeline missing values
  • Loading branch information
Scott Sanderson committed Feb 16, 2016
2 parents d5bd2a9 + 6287987 commit b632c41
Show file tree
Hide file tree
Showing 26 changed files with 893 additions and 136 deletions.
16 changes: 16 additions & 0 deletions docs/source/whatsnew/0.8.4.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ Highlights
* :class:`~zipline.assets.assets.AssetFinder` speedups (:issue:`830` and
:issue:`817`).

* Improved support for non-float dtypes in Pipeline. Most notably, we now
support ``datetime64`` and ``int64`` dtypes for ``Factor``, and
``BoundColumn.latest`` now returns a proper ``Filter`` object when the column
is of dtype ``bool``.

Enhancements
~~~~~~~~~~~~

Expand Down Expand Up @@ -83,6 +88,17 @@ Enhancements
  data that is timestamped on or after ``8:45`` will not be seen on that day in
  the
simulation. The data will be made available on the next day (:issue:`947`).

* ``BoundColumn.latest`` now returns a
:class:`~zipline.pipeline.filters.Filter` for columns of dtype
``bool`` (:issue:`962`).

* Added support for :class:`~zipline.pipeline.factors.Factor` instances with
``int64`` dtype. :class:`~zipline.pipeline.data.dataset.Column` now requires
a ``missing_value`` when dtype is integral. (:issue:`962`)

* It is also now possible to specify custom ``missing_value`` values for
``float``, ``datetime``, and ``bool`` Pipeline terms. (:issue:`962`)

Experimental Features
~~~~~~~~~~~~~~~~~~~~~

Expand Down
57 changes: 51 additions & 6 deletions tests/pipeline/test_adjusted_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
arange,
array,
full,
where,
)
from numpy.testing import assert_array_equal
from six.moves import zip_longest
Expand All @@ -23,9 +24,21 @@
from zipline.lib.adjusted_array import AdjustedArray, NOMASK
from zipline.utils.numpy_utils import (
datetime64ns_dtype,
default_missing_value_for_dtype,
float64_dtype,
int64_dtype,
make_datetime64ns,
)
from zipline.utils.test_utils import check_arrays, parameter_space


def moving_window(array, nrows):
    """
    Yield each contiguous ``nrows``-row window of a 2D numpy array,
    sliding forward one row at a time.
    """
    n_windows = num_windows_of_length_M_on_buffers_of_length_N(
        nrows, len(array)
    )
    for start in range(n_windows):
        yield array[start:start + nrows]


def num_windows_of_length_M_on_buffers_of_length_N(M, N):
Expand Down Expand Up @@ -66,6 +79,7 @@ def _gen_unadjusted_cases(dtype):
nrows = 6
ncols = 3
data = arange(nrows * ncols).astype(dtype).reshape(nrows, ncols)
missing_value = default_missing_value_for_dtype(dtype)

for windowlen in valid_window_lengths(nrows):

Expand All @@ -78,6 +92,7 @@ def _gen_unadjusted_cases(dtype):
data,
windowlen,
{},
missing_value,
[
data[offset:offset + windowlen]
for offset in range(num_legal_windows)
Expand Down Expand Up @@ -230,6 +245,7 @@ def _gen_overwrite_adjustment_cases(dtype):

def _gen_expectations(baseline, adjustments, buffer_as_of, nrows):

missing_value = default_missing_value_for_dtype(baseline.dtype)
for windowlen in valid_window_lengths(nrows):

num_legal_windows = num_windows_of_length_M_on_buffers_of_length_N(
Expand All @@ -241,6 +257,7 @@ def _gen_expectations(baseline, adjustments, buffer_as_of, nrows):
baseline,
windowlen,
adjustments,
missing_value,
[
# This is a nasty expression...
#
Expand All @@ -267,9 +284,10 @@ def test_no_adjustments(self,
data,
lookback,
adjustments,
missing_value,
expected):

array = AdjustedArray(data, NOMASK, adjustments)
array = AdjustedArray(data, NOMASK, adjustments, missing_value)
for _ in range(2): # Iterate 2x ensure adjusted_arrays are re-usable.
window_iter = array.traverse(lookback)
for yielded, expected_yield in zip_longest(window_iter, expected):
Expand All @@ -282,9 +300,10 @@ def test_multiplicative_adjustments(self,
data,
lookback,
adjustments,
missing_value,
expected):

array = AdjustedArray(data, NOMASK, adjustments)
array = AdjustedArray(data, NOMASK, adjustments, missing_value)
for _ in range(2): # Iterate 2x ensure adjusted_arrays are re-usable.
window_iter = array.traverse(lookback)
for yielded, expected_yield in zip_longest(window_iter, expected):
Expand All @@ -301,18 +320,43 @@ def test_overwrite_adjustment_cases(self,
data,
lookback,
adjustments,
missing_value,
expected):
array = AdjustedArray(data, NOMASK, adjustments)
array = AdjustedArray(data, NOMASK, adjustments, missing_value)
for _ in range(2): # Iterate 2x ensure adjusted_arrays are re-usable.
window_iter = array.traverse(lookback)
for yielded, expected_yield in zip_longest(window_iter, expected):
self.assertEqual(yielded.dtype, data.dtype)
assert_array_equal(yielded, expected_yield)

@parameter_space(
    dtype=[float64_dtype, int64_dtype, datetime64ns_dtype],
    missing_value=[0, 10000],
    window_length=[2, 3],
)
def test_masking(self, dtype, missing_value, window_length):
    """
    Cells whose mask entry is False should come back as ``missing_value``
    when traversing an AdjustedArray with no adjustments.
    """
    # Coerce the scalar missing_value into the dtype under test.
    missing_value = value_with_dtype(dtype, missing_value)
    raw_ints = arange(15).reshape(5, 3)
    baseline = raw_ints.astype(dtype)
    # Odd-valued cells are "present"; even-valued cells are masked out.
    mask = (raw_ints % 2).astype(bool)
    expected_data = where(mask, baseline, missing_value)

    adjusted = AdjustedArray(
        baseline,
        mask,
        adjustments={},
        missing_value=missing_value,
    )

    expected_windows = moving_window(expected_data, window_length)
    actual_windows = adjusted.traverse(window_length)
    for want, got in zip(expected_windows, actual_windows):
        check_arrays(want, got)

def test_invalid_lookback(self):

data = arange(30, dtype=float).reshape(6, 5)
adj_array = AdjustedArray(data, NOMASK, {})
adj_array = AdjustedArray(data, NOMASK, {}, float('nan'))

with self.assertRaises(WindowLengthTooLong):
adj_array.traverse(7)
Expand All @@ -326,7 +370,7 @@ def test_invalid_lookback(self):
def test_array_views_arent_writable(self):

data = arange(30, dtype=float).reshape(6, 5)
adj_array = AdjustedArray(data, NOMASK, {})
adj_array = AdjustedArray(data, NOMASK, {}, float('nan'))

for frame in adj_array.traverse(3):
with self.assertRaises(ValueError):
Expand All @@ -338,14 +382,15 @@ def test_bad_input(self):
bad_mask = array([[0, 1, 1], [0, 0, 1]], dtype=bool)

with self.assertRaisesRegexp(ValueError, msg):
AdjustedArray(data, bad_mask, {})
AdjustedArray(data, bad_mask, {}, missing_value=-1)

def test_inspect(self):
data = arange(15, dtype=float).reshape(5, 3)
adj_array = AdjustedArray(
data,
NOMASK,
{4: [Float64Multiply(2, 3, 0, 0, 4.0)]},
float('nan'),
)

expected = dedent(
Expand Down
54 changes: 43 additions & 11 deletions tests/pipeline/test_blaze.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
NonPipelineField,
no_deltas_rules,
)
from zipline.utils.numpy_utils import repeat_last_axis
from zipline.utils.numpy_utils import (
float64_dtype,
int64_dtype,
repeat_last_axis,
)
from zipline.utils.test_utils import tmp_asset_finder, make_simple_equity_info


Expand Down Expand Up @@ -73,14 +77,16 @@ def setUpClass(cls):
cls.sids = sids = ord('A'), ord('B'), ord('C')
cls.df = df = pd.DataFrame({
'sid': sids * 3,
'value': (0, 1, 2, 1, 2, 3, 2, 3, 4),
'value': (0., 1., 2., 1., 2., 3., 2., 3., 4.),
'int_value': (0, 1, 2, 1, 2, 3, 2, 3, 4),
'asof_date': dates,
'timestamp': dates,
})
cls.dshape = dshape("""
var * {
sid: ?int64,
value: ?float64,
int_value: ?int64,
asof_date: datetime,
timestamp: datetime
}
Expand All @@ -91,6 +97,7 @@ def setUpClass(cls):
cls.macro_dshape = var * Record(dshape_)

cls.garbage_loader = BlazeLoader()
cls.missing_values = {'int_value': 0}

def test_tabular(self):
name = 'expr'
Expand All @@ -99,15 +106,20 @@ def test_tabular(self):
expr,
loader=self.garbage_loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
)
self.assertEqual(ds.__name__, name)
self.assertTrue(issubclass(ds, DataSet))
self.assertEqual(
{c.name: c.dtype for c in ds.columns},
{'sid': np.int64, 'value': np.float64},
)

for field in ('timestamp', 'asof_date'):
self.assertIs(ds.value.dtype, float64_dtype)
self.assertIs(ds.int_value.dtype, int64_dtype)

self.assertTrue(np.isnan(ds.value.missing_value))
self.assertEqual(ds.int_value.missing_value, 0)

invalid_type_fields = ('asof_date',)

for field in invalid_type_fields:
with self.assertRaises(AttributeError) as e:
getattr(ds, field)
self.assertIn("'%s'" % field, str(e.exception))
Expand All @@ -119,6 +131,7 @@ def test_tabular(self):
expr,
loader=self.garbage_loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
),
ds,
)
Expand All @@ -130,17 +143,19 @@ def test_column(self):
expr.value,
loader=self.garbage_loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
)
self.assertEqual(value.name, 'value')
self.assertIsInstance(value, BoundColumn)
self.assertEqual(value.dtype, np.float64)
self.assertIs(value.dtype, float64_dtype)

# test memoization
self.assertIs(
from_blaze(
expr.value,
loader=self.garbage_loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
),
value,
)
Expand All @@ -149,6 +164,7 @@ def test_column(self):
expr,
loader=self.garbage_loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
).value,
value,
)
Expand All @@ -159,6 +175,7 @@ def test_column(self):
expr,
loader=self.garbage_loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
),
value.dataset,
)
Expand Down Expand Up @@ -195,7 +212,11 @@ def test_auto_deltas(self):
)),
)
loader = BlazeLoader()
ds = from_blaze(expr.ds, loader=loader)
ds = from_blaze(
expr.ds,
loader=loader,
missing_values=self.missing_values,
)
self.assertEqual(len(loader), 1)
exprdata = loader[ds]
self.assertTrue(exprdata.expr.isidentical(expr.ds))
Expand All @@ -210,6 +231,7 @@ def test_auto_deltas_fail_warn(self):
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.warn,
missing_values=self.missing_values,
)
self.assertEqual(len(ws), 1)
w = ws[0].message
Expand Down Expand Up @@ -281,13 +303,15 @@ def test_complex_expr(self):
expr_with_add,
deltas=None,
loader=self.garbage_loader,
missing_values=self.missing_values,
)

with self.assertRaises(TypeError):
from_blaze(
expr.value + 1, # put an Add in the column
deltas=None,
loader=self.garbage_loader,
missing_values=self.missing_values,
)

deltas = bz.Data(
Expand All @@ -299,13 +323,15 @@ def test_complex_expr(self):
expr_with_add,
deltas=deltas,
loader=self.garbage_loader,
missing_values=self.missing_values,
)

with self.assertRaises(TypeError):
from_blaze(
expr.value + 1,
deltas=deltas,
loader=self.garbage_loader,
missing_values=self.missing_values,
)

def _test_id(self, df, dshape, expected, finder, add):
Expand All @@ -315,6 +341,7 @@ def _test_id(self, df, dshape, expected, finder, add):
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
)
p = Pipeline()
for a in add:
Expand Down Expand Up @@ -347,9 +374,11 @@ def test_custom_query_time_tz(self):
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
)
p = Pipeline()
p.add(ds.value.latest, 'value')
p.add(ds.int_value.latest, 'int_value')
dates = self.dates

with tmp_asset_finder() as finder:
Expand Down Expand Up @@ -405,7 +434,9 @@ def test_id(self):
expected.index.levels[0],
finder.retrieve_all(expected.index.levels[1]),
))
self._test_id(self.df, self.dshape, expected, finder, ('value',))
self._test_id(
self.df, self.dshape, expected, finder, ('int_value', 'value',)
)

def test_id_ffill_out_of_window(self):
"""
Expand Down Expand Up @@ -512,7 +543,7 @@ def test_id_multiple_columns(self):
var * Record(fields),
expected,
finder,
('value', 'other'),
('value', 'int_value', 'other'),
)

def test_id_macro_dataset(self):
Expand Down Expand Up @@ -782,6 +813,7 @@ def _run_pipeline(self,
deltas,
loader=loader,
no_deltas_rule=no_deltas_rules.raise_,
missing_values=self.missing_values,
)
p = Pipeline()

Expand Down
Loading

0 comments on commit b632c41

Please sign in to comment.