Skip to content

ENH: Rolling with steps #43973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from
Closed
1 change: 1 addition & 0 deletions pandas/_libs/window/indexers.pyi
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ def calculate_variable_window_bounds(
num_values: int, # int64_t
window_size: int, # int64_t
min_periods,
step_size: int,
center: bool,
closed: str | None,
index: np.ndarray, # const int64_t[:]
58 changes: 33 additions & 25 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ def calculate_variable_window_bounds(
int64_t num_values,
int64_t window_size,
object min_periods, # unused but here to match get_window_bounds signature
int64_t step_size,
bint center,
str closed,
const int64_t[:] index
@@ -32,6 +33,9 @@ def calculate_variable_window_bounds(
min_periods : object
ignored, exists for compatibility

step_size : int64
(minimum) step size of the moving window

center : bint
center the rolling window on the current observation

@@ -49,8 +53,8 @@ def calculate_variable_window_bounds(
bint left_closed = False
bint right_closed = False
ndarray[int64_t, ndim=1] start, end
int64_t start_bound, end_bound, index_growth_sign = 1
Py_ssize_t i, j
int64_t start_bound, end_bound, step_bound, index_growth_sign = 1
Py_ssize_t i, j, last_valid

# default is 'right'
if closed is None:
@@ -89,11 +93,22 @@ def calculate_variable_window_bounds(
end[0] = j
break

step_bound = index[0] + index_growth_sign * step_size
last_valid = 0

with nogil:

# start is start of slice interval (including)
# end is end of slice interval (not including)
for i in range(1, num_values):

if index[i] - index_growth_sign * step_bound < 0:
start[i] = end[i-1]
end[i] = end[i-1]
continue
else:
step_bound = index[i] + step_size

if center:
end_bound = index[i] + index_growth_sign * window_size / 2
start_bound = index[i] - index_growth_sign * window_size / 2
@@ -105,34 +120,27 @@ def calculate_variable_window_bounds(
if left_closed:
start_bound -= 1 * index_growth_sign

# advance the start bound until we are
# within the constraint
# advance the start bound until we are within the constraint
start[i] = i
for j in range(start[i - 1], i):
for j in range(start[last_valid], i):
if (index[j] - start_bound) * index_growth_sign > 0:
start[i] = j
break

# for centered window advance the end bound until we are
# outside the constraint
if center:
for j in range(end[i - 1], num_values + 1):
if j == num_values:
end[i] = j
elif ((index[j] - end_bound) * index_growth_sign == 0 and
right_closed):
end[i] = j + 1
elif (index[j] - end_bound) * index_growth_sign >= 0:
end[i] = j
# advance the end bound until we are outside the constraint
end[i] = num_values
for j in range(max(end[last_valid], i), num_values):
if (index[j] - end_bound) * index_growth_sign == 0 and right_closed:
end[i] = j + 1
# for duplicate indices on non-centered windows
# we want the first of the identical indices
# see Gh 43944 and GH 20712
if not center:
break
# end bound is previous end
# or current index
elif (index[end[i - 1]] - end_bound) * index_growth_sign <= 0:
end[i] = i + 1
else:
end[i] = end[i - 1]
elif (index[j] - end_bound) * index_growth_sign >= 0:
end[i] = j
break

last_valid = i

# right endpoint is open
if not right_closed and not center:
end[i] -= 1
return start, end
3 changes: 3 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
@@ -10965,6 +10965,7 @@ def rolling(
axis: Axis = 0,
closed: str | None = None,
method: str = "single",
step: int | timedelta | BaseOffset | None = None,
):
axis = self._get_axis_number(axis)

@@ -10979,6 +10980,7 @@ def rolling(
axis=axis,
closed=closed,
method=method,
step=step,
)

return Rolling(
@@ -10991,6 +10993,7 @@ def rolling(
axis=axis,
closed=closed,
method=method,
step=step,
)

@final
27 changes: 27 additions & 0 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
@@ -70,6 +70,16 @@ def get_window_bounds(
class FixedWindowIndexer(BaseIndexer):
"""Creates window boundaries that are of fixed length."""

def __init__(
self,
index_array: np.ndarray | None = None,
window_size: int = 0,
step: int = 1,
**kwargs,
):
super().__init__(index_array, window_size, **kwargs)
self.step = step

@Appender(get_window_bounds_doc)
def get_window_bounds(
self,
@@ -94,12 +104,28 @@ def get_window_bounds(
end = np.clip(end, 0, num_values)
start = np.clip(start, 0, num_values)

# apply step, the resulting window will have zero length
if self.step > 1:
mask = np.full_like(start, True, dtype=bool)
mask[:: self.step] = False
start[mask] = end[mask]

return start, end


class VariableWindowIndexer(BaseIndexer):
"""Creates window boundaries that are of variable length, namely for time series."""

def __init__(
self,
index_array: np.ndarray | None = None,
window_size: int = 0,
step: int = 1,
**kwargs,
):
super().__init__(index_array, window_size, **kwargs)
self.step = step

@Appender(get_window_bounds_doc)
def get_window_bounds(
self,
@@ -117,6 +143,7 @@ def get_window_bounds(
num_values,
self.window_size,
min_periods,
self.step,
center, # type: ignore[arg-type]
closed,
self.index_array, # type: ignore[arg-type]
46 changes: 44 additions & 2 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
@@ -127,6 +127,7 @@ def __init__(
on: str | Index | None = None,
closed: str | None = None,
method: str = "single",
step=None,
*,
selection=None,
):
@@ -140,6 +141,8 @@ def __init__(
self._win_type = win_type
self.axis = obj._get_axis_number(axis) if axis is not None else None
self.method = method
self.step = step
self._step_size = 0
self._win_freq_i8 = None
if self.on is None:
if self.axis == 0:
@@ -398,8 +401,12 @@ def _get_window_indexer(self) -> BaseIndexer:
index_array=self._index_array,
window_size=self._win_freq_i8,
center=self.center,
step=self._step_size,
)
return FixedWindowIndexer(window_size=self.window)
return FixedWindowIndexer(
window_size=self.window,
step=self._step_size,
)

def _apply_series(
self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
@@ -877,6 +884,11 @@ class Window(BaseWindow):

.. versionadded:: 1.3.0

step : int or offset, default None
Minimum step size the window is shifted and have a value
(otherwise result is NA). For a window that is specified by an offset,
`step` also needs to be an offset. Otherwise, `step` must be an integer.

Returns
-------
a Window or Rolling sub-classed for the particular operation
@@ -1007,6 +1019,7 @@ class Window(BaseWindow):
"on",
"closed",
"method",
"step",
]

def _validate(self):
@@ -1031,6 +1044,10 @@ def _validate(self):
if self.method != "single":
raise NotImplementedError("'single' is the only supported method type.")

if self.step is not None and not is_integer(self.step):
raise ValueError("step must be an integer 0 or greater")
self._step_size = self.step or 0

def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray:
"""
Center the result in the window for weighted rolling aggregations.
@@ -1093,6 +1110,11 @@ def calc(x):
if self.center:
result = self._center_window(result, offset)

if self._step_size > 1:
mask = np.full_like(result, True, dtype=bool)
mask[:: self._step_size] = False
result[mask] = np.nan

return result

return self._apply_blockwise(homogeneous_func, name)
@@ -1576,6 +1598,7 @@ class Rolling(RollingAndExpandingMixin):
"on",
"closed",
"method",
"step",
]

def _validate(self):
@@ -1612,6 +1635,25 @@ def _validate(self):
elif not is_integer(self.window) or self.window < 0:
raise ValueError("window must be an integer 0 or greater")

if self.step is None:
self._step_size = 0
elif self._win_freq_i8 is not None:

try:
step = to_offset(self.step)
except (TypeError, ValueError) as err:
raise ValueError(
f"passed step {self.step} is not "
"compatible with a datetimelike window"
) from err
if isinstance(self._on, PeriodIndex):
self._step_size = step.nanos / (self._on.freq.nanos / self._on.freq.n)
else:
self._step_size = step.nanos

elif not is_integer(self.step) or self.step < 0:
raise ValueError("step must be an integer 0 or greater")

def _validate_monotonic(self):
"""
Validate monotonic (increasing or decreasing).
@@ -2486,7 +2528,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
rolling_indexer: type[BaseIndexer]
indexer_kwargs: dict[str, Any] | None = None
indexer_kwargs = {"step": self._step_size}
index_array = self._index_array
if isinstance(self.window, BaseIndexer):
rolling_indexer = type(self.window)