Skip to content

Commit

Permalink
BUG: Fix window aggregations to skip over unused elements (pandas-dev…
Browse files Browse the repository at this point in the history
  • Loading branch information
rtpsw authored and yehoshuadimarsky committed Jul 13, 2022
1 parent 24d18ea commit 45220c9
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 36 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.4.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Fixed regressions
Bug fixes
~~~~~~~~~
- Fixed segfault in :meth:``DataFrame.to_json`` when dumping tz-aware datetimes in Python 3.10 (:issue:`42130`)
- Fixed window aggregations in :meth:`DataFrame.rolling` and :meth:`Series.rolling` to skip over unused elements (:issue:`45647`)
-

.. ---------------------------------------------------------------------------
Expand Down
92 changes: 56 additions & 36 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
cdef:
Py_ssize_t i, j
float64_t sum_x = 0, compensation_add = 0, compensation_remove = 0
float64_t sum_x, compensation_add, compensation_remove
int64_t s, e
int64_t nobs = 0, N = len(values)
int64_t nobs = 0, N = len(start)
ndarray[float64_t] output
bint is_monotonic_increasing_bounds

Expand All @@ -139,10 +139,12 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

# setup

sum_x = compensation_add = compensation_remove = 0
nobs = 0
for j in range(s, e):
add_sum(values[j], &nobs, &sum_x, &compensation_add)

Expand Down Expand Up @@ -226,9 +228,9 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x,
def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
cdef:
float64_t val, compensation_add = 0, compensation_remove = 0, sum_x = 0
float64_t val, compensation_add, compensation_remove, sum_x
int64_t s, e
Py_ssize_t nobs = 0, i, j, neg_ct = 0, N = len(values)
Py_ssize_t nobs, i, j, neg_ct, N = len(start)
ndarray[float64_t] output
bint is_monotonic_increasing_bounds

Expand All @@ -243,8 +245,10 @@ def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

compensation_add = compensation_remove = sum_x = 0
nobs = neg_ct = 0
# setup
for j in range(s, e):
val = values[j]
Expand Down Expand Up @@ -349,11 +353,11 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,
Numerically stable implementation using Welford's method.
"""
cdef:
float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, compensation_add = 0,
float64_t compensation_remove = 0,
float64_t mean_x, ssqdm_x, nobs, compensation_add,
float64_t compensation_remove,
float64_t val, prev, delta, mean_x_old
int64_t s, e
Py_ssize_t i, j, N = len(values)
Py_ssize_t i, j, N = len(start)
ndarray[float64_t] output
bint is_monotonic_increasing_bounds

Expand All @@ -372,8 +376,9 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,

# Over the first window, observations can only be added
# never removed
if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

mean_x = ssqdm_x = nobs = compensation_add = compensation_remove = 0
for j in range(s, e):
add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add)

Expand Down Expand Up @@ -500,11 +505,11 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
cdef:
Py_ssize_t i, j
float64_t val, prev, min_val, mean_val, sum_val = 0
float64_t compensation_xxx_add = 0, compensation_xxx_remove = 0
float64_t compensation_xx_add = 0, compensation_xx_remove = 0
float64_t compensation_x_add = 0, compensation_x_remove = 0
float64_t x = 0, xx = 0, xxx = 0
int64_t nobs = 0, N = len(values), nobs_mean = 0
float64_t compensation_xxx_add, compensation_xxx_remove
float64_t compensation_xx_add, compensation_xx_remove
float64_t compensation_x_add, compensation_x_remove
float64_t x, xx, xxx
int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
int64_t s, e
ndarray[float64_t] output, mean_array, values_copy
bint is_monotonic_increasing_bounds
Expand All @@ -518,7 +523,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
values_copy = np.copy(values)

with nogil:
for i in range(0, N):
for i in range(0, V):
val = values_copy[i]
if notnan(val):
nobs_mean += 1
Expand All @@ -527,7 +532,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
# Other cases would lead to imprecision for smallest values
if min_val - mean_val > -1e5:
mean_val = round(mean_val)
for i in range(0, N):
for i in range(0, V):
values_copy[i] = values_copy[i] - mean_val

for i in range(0, N):
Expand All @@ -537,8 +542,13 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,

# Over the first window, observations can only be added
# never removed
if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

compensation_xxx_add = compensation_xxx_remove = 0
compensation_xx_add = compensation_xx_remove = 0
compensation_x_add = compensation_x_remove = 0
x = xx = xxx = 0
nobs = 0
for j in range(s, e):
val = values_copy[j]
add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
Expand Down Expand Up @@ -682,12 +692,12 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
cdef:
Py_ssize_t i, j
float64_t val, prev, mean_val, min_val, sum_val = 0
float64_t compensation_xxxx_add = 0, compensation_xxxx_remove = 0
float64_t compensation_xxx_remove = 0, compensation_xxx_add = 0
float64_t compensation_xx_remove = 0, compensation_xx_add = 0
float64_t compensation_x_remove = 0, compensation_x_add = 0
float64_t x = 0, xx = 0, xxx = 0, xxxx = 0
int64_t nobs = 0, s, e, N = len(values), nobs_mean = 0
float64_t compensation_xxxx_add, compensation_xxxx_remove
float64_t compensation_xxx_remove, compensation_xxx_add
float64_t compensation_xx_remove, compensation_xx_add
float64_t compensation_x_remove, compensation_x_add
float64_t x, xx, xxx, xxxx
int64_t nobs, s, e, N = len(start), V = len(values), nobs_mean = 0
ndarray[float64_t] output, values_copy
bint is_monotonic_increasing_bounds

Expand All @@ -700,7 +710,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
min_val = np.nanmin(values)

with nogil:
for i in range(0, N):
for i in range(0, V):
val = values_copy[i]
if notnan(val):
nobs_mean += 1
Expand All @@ -709,7 +719,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
# Other cases would lead to imprecision for smallest values
if min_val - mean_val > -1e4:
mean_val = round(mean_val)
for i in range(0, N):
for i in range(0, V):
values_copy[i] = values_copy[i] - mean_val

for i in range(0, N):
Expand All @@ -719,8 +729,14 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,

# Over the first window, observations can only be added
# never removed
if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

compensation_xxxx_add = compensation_xxxx_remove = 0
compensation_xxx_remove = compensation_xxx_add = 0
compensation_xx_remove = compensation_xx_add = 0
compensation_x_remove = compensation_x_add = 0
x = xx = xxx = xxxx = 0
nobs = 0
for j in range(s, e):
add_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx,
&compensation_x_add, &compensation_xx_add,
Expand Down Expand Up @@ -764,7 +780,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
Py_ssize_t i, j
bint err = False, is_monotonic_increasing_bounds
int midpoint, ret = 0
int64_t nobs = 0, N = len(values), s, e, win
int64_t nobs = 0, N = len(start), s, e, win
float64_t val, res, prev
skiplist_t *sl
ndarray[float64_t] output
Expand All @@ -791,8 +807,12 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

if i != 0:
skiplist_destroy(sl)
sl = skiplist_init(<int>win)
nobs = 0
# setup
for j in range(s, e):
val = values[j]
Expand Down Expand Up @@ -948,7 +968,7 @@ cdef _roll_min_max(ndarray[numeric_t] values,
cdef:
numeric_t ai
int64_t curr_win_size, start
Py_ssize_t i, k, nobs = 0, N = len(values)
Py_ssize_t i, k, nobs = 0, N = len(starti)
deque Q[int64_t] # min/max always the front
deque W[int64_t] # track the whole window for nobs compute
ndarray[float64_t, ndim=1] output
Expand Down Expand Up @@ -1031,7 +1051,7 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
O(N log(window)) implementation using skip list
"""
cdef:
Py_ssize_t i, j, s, e, N = len(values), idx
Py_ssize_t i, j, s, e, N = len(start), idx
int ret = 0
int64_t nobs = 0, win
float64_t val, prev, midpoint, idx_with_fraction
Expand Down Expand Up @@ -1068,8 +1088,8 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
if i != 0:
nobs = 0
skiplist_destroy(skiplist)
skiplist = skiplist_init(<int>win)
Expand Down Expand Up @@ -1160,7 +1180,7 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
derived from roll_quantile
"""
cdef:
Py_ssize_t i, j, s, e, N = len(values), idx
Py_ssize_t i, j, s, e, N = len(start), idx
float64_t rank_min = 0, rank = 0
int64_t nobs = 0, win
float64_t val
Expand Down Expand Up @@ -1193,8 +1213,8 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
if i != 0:
nobs = 0
skiplist_destroy(skiplist)
skiplist = skiplist_init(<int>win)
Expand Down
111 changes: 111 additions & 0 deletions pandas/tests/window/test_cython_aggregations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from functools import partial
import sys

import numpy as np
import pytest

import pandas._libs.window.aggregations as window_aggregations

from pandas import Series
import pandas._testing as tm


def _get_rolling_aggregations():
# list pairs of name and function
# each function has this signature:
# (const float64_t[:] values, ndarray[int64_t] start,
# ndarray[int64_t] end, int64_t minp) -> np.ndarray
named_roll_aggs = (
[
("roll_sum", window_aggregations.roll_sum),
("roll_mean", window_aggregations.roll_mean),
]
+ [
(f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof))
for ddof in [0, 1]
]
+ [
("roll_skew", window_aggregations.roll_skew),
("roll_kurt", window_aggregations.roll_kurt),
("roll_median_c", window_aggregations.roll_median_c),
("roll_max", window_aggregations.roll_max),
("roll_min", window_aggregations.roll_min),
]
+ [
(
f"roll_quantile({quantile},{interpolation})",
partial(
window_aggregations.roll_quantile,
quantile=quantile,
interpolation=interpolation,
),
)
for quantile in [0.0001, 0.5, 0.9999]
for interpolation in window_aggregations.interpolation_types
]
+ [
(
f"roll_rank({percentile},{method},{ascending})",
partial(
window_aggregations.roll_rank,
percentile=percentile,
method=method,
ascending=ascending,
),
)
for percentile in [True, False]
for method in window_aggregations.rolling_rank_tiebreakers.keys()
for ascending in [True, False]
]
)
# unzip to a list of 2 tuples, names and functions
unzipped = list(zip(*named_roll_aggs))
return {"ids": unzipped[0], "params": unzipped[1]}


_rolling_aggregations = _get_rolling_aggregations()


@pytest.fixture(
params=_rolling_aggregations["params"], ids=_rolling_aggregations["ids"]
)
def rolling_aggregation(request):
"""Make a rolling aggregation function as fixture."""
return request.param


def test_rolling_aggregation_boundary_consistency(rolling_aggregation):
# GH-45647
minp, step, width, size, selection = 0, 1, 3, 11, [2, 7]
values = np.arange(1, 1 + size, dtype=np.float64)
end = np.arange(width, size, step, dtype=np.int64)
start = end - width
selarr = np.array(selection, dtype=np.int32)
result = Series(rolling_aggregation(values, start[selarr], end[selarr], minp))
expected = Series(rolling_aggregation(values, start, end, minp)[selarr])
tm.assert_equal(expected, result)


def test_rolling_aggregation_with_unused_elements(rolling_aggregation):
# GH-45647
minp, width = 0, 5 # width at least 4 for kurt
size = 2 * width + 5
values = np.arange(1, size + 1, dtype=np.float64)
values[width : width + 2] = sys.float_info.min
values[width + 2] = np.nan
values[width + 3 : width + 5] = sys.float_info.max
start = np.array([0, size - width], dtype=np.int64)
end = np.array([width, size], dtype=np.int64)
loc = np.array(
[j for i in range(len(start)) for j in range(start[i], end[i])],
dtype=np.int32,
)
result = Series(rolling_aggregation(values, start, end, minp))
compact_values = np.array(values[loc], dtype=np.float64)
compact_start = np.arange(0, len(start) * width, width, dtype=np.int64)
compact_end = compact_start + width
expected = Series(
rolling_aggregation(compact_values, compact_start, compact_end, minp)
)
assert np.isfinite(expected.values).all(), "Not all expected values are finite"
tm.assert_equal(expected, result)

0 comments on commit 45220c9

Please sign in to comment.