diff --git a/doc/source/whatsnew/v1.4.1.rst b/doc/source/whatsnew/v1.4.1.rst index 23aefb783456d..10b605f6ef43e 100644 --- a/doc/source/whatsnew/v1.4.1.rst +++ b/doc/source/whatsnew/v1.4.1.rst @@ -28,6 +28,7 @@ Fixed regressions Bug fixes ~~~~~~~~~ - Fixed segfault in :meth:``DataFrame.to_json`` when dumping tz-aware datetimes in Python 3.10 (:issue:`42130`) +- Fixed window aggregations in :meth:`DataFrame.rolling` and :meth:`Series.rolling` to skip over unused elements (:issue:`45647`) - .. --------------------------------------------------------------------------- diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 5ebb60dc7e41b..ff53a577af33f 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -122,9 +122,9 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp) -> np.ndarray: cdef: Py_ssize_t i, j - float64_t sum_x = 0, compensation_add = 0, compensation_remove = 0 + float64_t sum_x, compensation_add, compensation_remove int64_t s, e - int64_t nobs = 0, N = len(values) + int64_t nobs = 0, N = len(start) ndarray[float64_t] output bint is_monotonic_increasing_bounds @@ -139,10 +139,12 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: # setup + sum_x = compensation_add = compensation_remove = 0 + nobs = 0 for j in range(s, e): add_sum(values[j], &nobs, &sum_x, &compensation_add) @@ -226,9 +228,9 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x, def roll_mean(const float64_t[:] values, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp) -> np.ndarray: cdef: - float64_t val, compensation_add = 0, compensation_remove = 0, sum_x = 0 + float64_t val, compensation_add, compensation_remove, sum_x int64_t s, e - Py_ssize_t nobs = 0, i, j, neg_ct = 0, N = len(values) + Py_ssize_t nobs, i, j, neg_ct, N = len(start) ndarray[float64_t] output bint is_monotonic_increasing_bounds @@ -243,8 +245,10 @@ def roll_mean(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + compensation_add = compensation_remove = sum_x = 0 + nobs = neg_ct = 0 # setup for j in range(s, e): val = values[j] @@ -349,11 +353,11 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start, Numerically stable implementation using Welford's method. """ cdef: - float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, compensation_add = 0, - float64_t compensation_remove = 0, + float64_t mean_x, ssqdm_x, nobs, compensation_add, + float64_t compensation_remove, float64_t val, prev, delta, mean_x_old int64_t s, e - Py_ssize_t i, j, N = len(values) + Py_ssize_t i, j, N = len(start) ndarray[float64_t] output bint is_monotonic_increasing_bounds @@ -372,8 +376,9 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + mean_x = ssqdm_x = nobs = compensation_add = compensation_remove = 0 for j in range(s, e): add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add) @@ -500,11 +505,11 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, cdef: Py_ssize_t i, j float64_t val, prev, min_val, mean_val, sum_val = 0 - float64_t compensation_xxx_add = 0, compensation_xxx_remove = 0 - float64_t compensation_xx_add = 0, compensation_xx_remove = 0 - float64_t compensation_x_add = 0, compensation_x_remove = 0 - float64_t x = 0, xx = 0, xxx = 0 - int64_t nobs = 0, N = len(values), nobs_mean = 0 + float64_t compensation_xxx_add, compensation_xxx_remove + float64_t compensation_xx_add, compensation_xx_remove + float64_t compensation_x_add, compensation_x_remove + float64_t x, xx, xxx + int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0 int64_t s, e ndarray[float64_t] output, mean_array, values_copy bint is_monotonic_increasing_bounds @@ -518,7 +523,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, values_copy = np.copy(values) with nogil: - for i in range(0, N): + for i in range(0, V): val = values_copy[i] if notnan(val): nobs_mean += 1 @@ -527,7 +532,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, # Other cases would lead to imprecision for smallest values if min_val - mean_val > -1e5: mean_val = round(mean_val) - for i in range(0, N): + for i in range(0, V): values_copy[i] = values_copy[i] - mean_val for i in range(0, N): @@ -537,8 +542,13 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + compensation_xxx_add = compensation_xxx_remove = 0 + compensation_xx_add = compensation_xx_remove = 0 + compensation_x_add = compensation_x_remove = 0 + x = xx = xxx = 0 + nobs = 0 for j in range(s, e): val = values_copy[j] add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add, @@ -682,12 +692,12 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, cdef: Py_ssize_t i, j float64_t val, prev, mean_val, min_val, sum_val = 0 - float64_t compensation_xxxx_add = 0, compensation_xxxx_remove = 0 - float64_t compensation_xxx_remove = 0, compensation_xxx_add = 0 - float64_t compensation_xx_remove = 0, compensation_xx_add = 0 - float64_t compensation_x_remove = 0, compensation_x_add = 0 - float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 - int64_t nobs = 0, s, e, N = len(values), nobs_mean = 0 + float64_t compensation_xxxx_add, compensation_xxxx_remove + float64_t compensation_xxx_remove, compensation_xxx_add + float64_t compensation_xx_remove, compensation_xx_add + float64_t compensation_x_remove, compensation_x_add + float64_t x, xx, xxx, xxxx + int64_t nobs, s, e, N = len(start), V = len(values), nobs_mean = 0 ndarray[float64_t] output, values_copy bint is_monotonic_increasing_bounds @@ -700,7 +710,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, min_val = np.nanmin(values) with nogil: - for i in range(0, N): + for i in range(0, V): val = values_copy[i] if notnan(val): nobs_mean += 1 @@ -709,7 +719,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, # Other cases would lead to imprecision for smallest values if min_val - mean_val > -1e4: mean_val = round(mean_val) - for i in range(0, N): + for i in range(0, V): values_copy[i] = values_copy[i] - mean_val for i in range(0, N): @@ -719,8 +729,14 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + compensation_xxxx_add = compensation_xxxx_remove = 0 + compensation_xxx_remove = compensation_xxx_add = 0 + compensation_xx_remove = compensation_xx_add = 0 + compensation_x_remove = compensation_x_add = 0 + x = xx = xxx = xxxx = 0 + nobs = 0 for j in range(s, e): add_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx, &compensation_x_add, &compensation_xx_add, @@ -764,7 +780,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, Py_ssize_t i, j bint err = False, is_monotonic_increasing_bounds int midpoint, ret = 0 - int64_t nobs = 0, N = len(values), s, e, win + int64_t nobs = 0, N = len(start), s, e, win float64_t val, res, prev skiplist_t *sl ndarray[float64_t] output @@ -791,8 +807,12 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + if i != 0: + skiplist_destroy(sl) + sl = skiplist_init(win) + nobs = 0 # setup for j in range(s, e): val = values[j] @@ -948,7 +968,7 @@ cdef _roll_min_max(ndarray[numeric_t] values, cdef: numeric_t ai int64_t curr_win_size, start - Py_ssize_t i, k, nobs = 0, N = len(values) + Py_ssize_t i, k, nobs = 0, N = len(starti) deque Q[int64_t] # min/max always the front deque W[int64_t] # track the whole window for nobs compute ndarray[float64_t, ndim=1] output @@ -1031,7 +1051,7 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start, O(N log(window)) implementation using skip list """ cdef: - Py_ssize_t i, j, s, e, N = len(values), idx + Py_ssize_t i, j, s, e, N = len(start), idx int ret = 0 int64_t nobs = 0, win float64_t val, prev, midpoint, idx_with_fraction @@ -1068,8 +1088,8 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: - if not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + if i != 0: nobs = 0 skiplist_destroy(skiplist) skiplist = skiplist_init(win) @@ -1160,7 +1180,7 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start, derived from roll_quantile """ cdef: - Py_ssize_t i, j, s, e, N = len(values), idx + Py_ssize_t i, j, s, e, N = len(start), idx float64_t rank_min = 0, rank = 0 int64_t nobs = 0, win float64_t val @@ -1193,8 +1213,8 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: - if not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + if i != 0: nobs = 0 skiplist_destroy(skiplist) skiplist = skiplist_init(win) diff --git a/pandas/tests/window/test_cython_aggregations.py b/pandas/tests/window/test_cython_aggregations.py new file mode 100644 index 0000000000000..c60cb6ea74ec0 --- /dev/null +++ b/pandas/tests/window/test_cython_aggregations.py @@ -0,0 +1,111 @@ +from functools import partial +import sys + +import numpy as np +import pytest + +import pandas._libs.window.aggregations as window_aggregations + +from pandas import Series +import pandas._testing as tm + + +def _get_rolling_aggregations(): + # list pairs of name and function + # each function has this signature: + # (const float64_t[:] values, ndarray[int64_t] start, + # ndarray[int64_t] end, int64_t minp) -> np.ndarray + named_roll_aggs = ( + [ + ("roll_sum", window_aggregations.roll_sum), + ("roll_mean", window_aggregations.roll_mean), + ] + + [ + (f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof)) + for ddof in [0, 1] + ] + + [ + ("roll_skew", window_aggregations.roll_skew), + ("roll_kurt", window_aggregations.roll_kurt), + ("roll_median_c", window_aggregations.roll_median_c), + ("roll_max", window_aggregations.roll_max), + ("roll_min", window_aggregations.roll_min), + ] + + [ + ( + f"roll_quantile({quantile},{interpolation})", + partial( + window_aggregations.roll_quantile, + quantile=quantile, + interpolation=interpolation, + ), + ) + for quantile in [0.0001, 0.5, 0.9999] + for interpolation in window_aggregations.interpolation_types + ] + + [ + ( + f"roll_rank({percentile},{method},{ascending})", + partial( + window_aggregations.roll_rank, + percentile=percentile, + method=method, + ascending=ascending, + ), + ) + for percentile in [True, False] + for method in window_aggregations.rolling_rank_tiebreakers.keys() + for ascending in [True, False] + ] + ) + # unzip to a list of 2 tuples, names and functions + unzipped = list(zip(*named_roll_aggs)) + return {"ids": unzipped[0], "params": unzipped[1]} + + +_rolling_aggregations = _get_rolling_aggregations() + + +@pytest.fixture( + params=_rolling_aggregations["params"], ids=_rolling_aggregations["ids"] +) +def rolling_aggregation(request): + """Make a rolling aggregation function as fixture.""" + return request.param + + +def test_rolling_aggregation_boundary_consistency(rolling_aggregation): + # GH-45647 + minp, step, width, size, selection = 0, 1, 3, 11, [2, 7] + values = np.arange(1, 1 + size, dtype=np.float64) + end = np.arange(width, size, step, dtype=np.int64) + start = end - width + selarr = np.array(selection, dtype=np.int32) + result = Series(rolling_aggregation(values, start[selarr], end[selarr], minp)) + expected = Series(rolling_aggregation(values, start, end, minp)[selarr]) + tm.assert_equal(expected, result) + + +def test_rolling_aggregation_with_unused_elements(rolling_aggregation): + # GH-45647 + minp, width = 0, 5 # width at least 4 for kurt + size = 2 * width + 5 + values = np.arange(1, size + 1, dtype=np.float64) + values[width : width + 2] = sys.float_info.min + values[width + 2] = np.nan + values[width + 3 : width + 5] = sys.float_info.max + start = np.array([0, size - width], dtype=np.int64) + end = np.array([width, size], dtype=np.int64) + loc = np.array( + [j for i in range(len(start)) for j in range(start[i], end[i])], + dtype=np.int32, + ) + result = Series(rolling_aggregation(values, start, end, minp)) + compact_values = np.array(values[loc], dtype=np.float64) + compact_start = np.arange(0, len(start) * width, width, dtype=np.int64) + compact_end = compact_start + width + expected = Series( + rolling_aggregation(compact_values, compact_start, compact_end, minp) + ) + assert np.isfinite(expected.values).all(), "Not all expected values are finite" + tm.assert_equal(expected, result)