diff --git a/pandas/_libs/window/indexers.pyi b/pandas/_libs/window/indexers.pyi
index c9bc64be34ac9..e7630684a2380 100644
--- a/pandas/_libs/window/indexers.pyi
+++ b/pandas/_libs/window/indexers.pyi
@@ -6,6 +6,7 @@ def calculate_variable_window_bounds(
     num_values: int,  # int64_t
     window_size: int,  # int64_t
     min_periods,
+    step_size: int,
     center: bool,
     closed: str | None,
     index: np.ndarray,  # const int64_t[:]
diff --git a/pandas/_libs/window/indexers.pyx b/pandas/_libs/window/indexers.pyx
index 59889cb58c3d5..f712cd174a968 100644
--- a/pandas/_libs/window/indexers.pyx
+++ b/pandas/_libs/window/indexers.pyx
@@ -14,6 +14,7 @@ def calculate_variable_window_bounds(
     int64_t num_values,
     int64_t window_size,
     object min_periods,  # unused but here to match get_window_bounds signature
+    int64_t step_size,
     bint center,
     str closed,
     const int64_t[:] index
@@ -32,6 +33,9 @@ def calculate_variable_window_bounds(
     min_periods : object
         ignored, exists for compatibility
 
+    step_size : int64
+        (minimum) step size of the moving window
+
     center : bint
         center the rolling window on the current observation
 
@@ -49,8 +53,8 @@ def calculate_variable_window_bounds(
         bint left_closed = False
         bint right_closed = False
         ndarray[int64_t, ndim=1] start, end
-        int64_t start_bound, end_bound, index_growth_sign = 1
-        Py_ssize_t i, j
+        int64_t start_bound, end_bound, step_bound, index_growth_sign = 1
+        Py_ssize_t i, j, last_valid
 
     # default is 'right'
     if closed is None:
@@ -89,11 +93,26 @@ def calculate_variable_window_bounds(
                 end[0] = j
                 break
 
+    # step_bound is the first index value at which the next window may be
+    # evaluated; last_valid is the position of the last evaluated window
+    step_bound = index[0] + index_growth_sign * step_size
+    last_valid = 0
+
     with nogil:
 
         # start is start of slice interval (including)
         # end is end of slice interval (not including)
         for i in range(1, num_values):
+
+            # observations that lie less than step_size past the last
+            # evaluated window get an empty window (start == end)
+            if (index[i] - step_bound) * index_growth_sign < 0:
+                start[i] = end[i-1]
+                end[i] = end[i-1]
+                continue
+            else:
+                step_bound = index[i] + index_growth_sign * step_size
+
             if center:
                 end_bound = index[i] + index_growth_sign * window_size / 2
                 start_bound = index[i] - index_growth_sign * window_size / 2
@@ -105,34 +124,27 @@ def calculate_variable_window_bounds(
             if left_closed:
                 start_bound -= 1 * index_growth_sign
 
-            # advance the start bound until we are
-            # within the constraint
+            # advance the start bound until we are within the constraint
             start[i] = i
-            for j in range(start[i - 1], i):
+            for j in range(start[last_valid], i):
                 if (index[j] - start_bound) * index_growth_sign > 0:
                     start[i] = j
                     break
 
-            # for centered window advance the end bound until we are
-            # outside the constraint
-            if center:
-                for j in range(end[i - 1], num_values + 1):
-                    if j == num_values:
-                        end[i] = j
-                    elif ((index[j] - end_bound) * index_growth_sign == 0 and
-                          right_closed):
-                        end[i] = j + 1
-                    elif (index[j] - end_bound) * index_growth_sign >= 0:
-                        end[i] = j
+            # advance the end bound until we are outside the constraint
+            end[i] = num_values
+            for j in range(max(end[last_valid], i), num_values):
+                if (index[j] - end_bound) * index_growth_sign == 0 and right_closed:
+                    end[i] = j + 1
+                    # for duplicate indices on non-centered windows
+                    # we want the first of the identical indices
+                    # see GH 43944 and GH 20712
+                    if not center:
                         break
-            # end bound is previous end
-            # or current index
-            elif (index[end[i - 1]] - end_bound) * index_growth_sign <= 0:
-                end[i] = i + 1
-            else:
-                end[i] = end[i - 1]
+                elif (index[j] - end_bound) * index_growth_sign >= 0:
+                    end[i] = j
+                    break
+
+            last_valid = i
 
-            # right endpoint is open
-            if not right_closed and not center:
-                end[i] -= 1
     return start, end
diff --git a/pandas/core/generic.py b/pandas/core/generic.py
index b235f120d98c8..bf0183b2a32a7 100644
--- a/pandas/core/generic.py
+++ b/pandas/core/generic.py
@@ -10965,6 +10965,7 @@ def rolling(
         axis: Axis = 0,
         closed: str | None = None,
         method: str = "single",
+        step: int | timedelta | BaseOffset | None = None,
     ):
         axis = self._get_axis_number(axis)
 
@@ -10979,6 +10980,7 @@ def rolling(
                 axis=axis,
                 closed=closed,
                 method=method,
+                step=step,
             )
 
         return Rolling(
@@ -10991,6 +10993,7 @@ def rolling(
             axis=axis,
             closed=closed,
             method=method,
+            step=step,
         )
 
     @final
diff --git a/pandas/core/indexers/objects.py b/pandas/core/indexers/objects.py
index cef023a647d7f..6230d6b2890e8 100644
--- a/pandas/core/indexers/objects.py
+++ b/pandas/core/indexers/objects.py
@@ -70,6 +70,16 @@ def get_window_bounds(
 class FixedWindowIndexer(BaseIndexer):
     """Creates window boundaries that are of fixed length."""
 
+    def __init__(
+        self,
+        index_array: np.ndarray | None = None,
+        window_size: int = 0,
+        step: int = 1,
+        **kwargs,
+    ):
+        super().__init__(index_array, window_size, **kwargs)
+        self.step = step
+
     @Appender(get_window_bounds_doc)
     def get_window_bounds(
         self,
@@ -94,12 +104,28 @@ def get_window_bounds(
         end = np.clip(end, 0, num_values)
         start = np.clip(start, 0, num_values)
 
+        # apply step, the resulting window will have zero length
+        if self.step > 1:
+            mask = np.full_like(start, True, dtype=bool)
+            mask[:: self.step] = False
+            start[mask] = end[mask]
+
         return start, end
 
 
 class VariableWindowIndexer(BaseIndexer):
     """Creates window boundaries that are of variable length, namely for time series."""
 
+    def __init__(
+        self,
+        index_array: np.ndarray | None = None,
+        window_size: int = 0,
+        step: int = 1,
+        **kwargs,
+    ):
+        super().__init__(index_array, window_size, **kwargs)
+        self.step = step
+
     @Appender(get_window_bounds_doc)
     def get_window_bounds(
         self,
@@ -117,6 +143,7 @@ def get_window_bounds(
             num_values,
             self.window_size,
             min_periods,
+            self.step,
             center,  # type: ignore[arg-type]
             closed,
             self.index_array,  # type: ignore[arg-type]
diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py
index 2b8ed3c97d026..0e6392edf37b6 100644
--- a/pandas/core/window/rolling.py
+++ b/pandas/core/window/rolling.py
@@ -127,6 +127,7 @@ def __init__(
         on: str | Index | None = None,
         closed: str | None = None,
         method: str = "single",
+        step=None,
         *,
         selection=None,
     ):
@@ -140,6 +141,8 @@ def __init__(
         self._win_type = win_type
         self.axis = obj._get_axis_number(axis) if axis is not None else None
         self.method = method
+        self.step = step
+        self._step_size = 0
         self._win_freq_i8 = None
         if self.on is None:
             if self.axis == 0:
@@ -398,8 +401,12 @@ def _get_window_indexer(self) -> BaseIndexer:
                 index_array=self._index_array,
                 window_size=self._win_freq_i8,
                 center=self.center,
+                step=self._step_size,
             )
-        return FixedWindowIndexer(window_size=self.window)
+        return FixedWindowIndexer(
+            window_size=self.window,
+            step=self._step_size,
+        )
 
     def _apply_series(
         self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
@@ -877,6 +884,11 @@ class Window(BaseWindow):
 
         .. versionadded:: 1.3.0
 
+    step : int or offset, default None
+        Minimum shift of the window before a new value is computed
+        (otherwise result is NA). For a window that is specified by an offset,
+        `step` also needs to be an offset. Otherwise, `step` must be an integer.
+
     Returns
     -------
     a Window or Rolling sub-classed for the particular operation
@@ -1007,6 +1019,7 @@ class Window(BaseWindow):
         "on",
         "closed",
         "method",
+        "step",
     ]
 
     def _validate(self):
@@ -1031,6 +1044,10 @@ def _validate(self):
         if self.method != "single":
             raise NotImplementedError("'single' is the only supported method type.")
 
+        if self.step is not None and (not is_integer(self.step) or self.step < 0):
+            raise ValueError("step must be an integer 0 or greater")
+        self._step_size = self.step or 0
+
     def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray:
         """
         Center the result in the window for weighted rolling aggregations.
@@ -1093,6 +1110,11 @@ def calc(x):
             if self.center:
                 result = self._center_window(result, offset)
 
+            if self._step_size > 1:
+                mask = np.full_like(result, True, dtype=bool)
+                mask[:: self._step_size] = False
+                result[mask] = np.nan
+
             return result
 
         return self._apply_blockwise(homogeneous_func, name)
@@ -1576,6 +1598,7 @@ class Rolling(RollingAndExpandingMixin):
         "on",
         "closed",
         "method",
+        "step",
     ]
 
     def _validate(self):
@@ -1612,6 +1635,27 @@ def _validate(self):
         elif not is_integer(self.window) or self.window < 0:
             raise ValueError("window must be an integer 0 or greater")
 
+        if self.step is None:
+            self._step_size = 0
+        elif self._win_freq_i8 is not None:
+
+            try:
+                step = to_offset(self.step)
+            except (TypeError, ValueError) as err:
+                raise ValueError(
+                    f"passed step {self.step} is not "
+                    "compatible with a datetimelike window"
+                ) from err
+            if isinstance(self._on, PeriodIndex):
+                self._step_size = step.nanos / (self._on.freq.nanos / self._on.freq.n)
+            else:
+                self._step_size = step.nanos
+
+        elif not is_integer(self.step) or self.step < 0:
+            raise ValueError("step must be an integer 0 or greater")
+        else:
+            self._step_size = self.step
+
     def _validate_monotonic(self):
         """
         Validate monotonic (increasing or decreasing).
@@ -2486,7 +2530,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
         GroupbyIndexer
         """
         rolling_indexer: type[BaseIndexer]
-        indexer_kwargs: dict[str, Any] | None = None
+        indexer_kwargs: dict[str, Any] = {"step": self._step_size}
         index_array = self._index_array
         if isinstance(self.window, BaseIndexer):
             rolling_indexer = type(self.window)