diff --git a/pandas/_libs/window/indexers.pyi b/pandas/_libs/window/indexers.pyi
index c9bc64be34ac9..e7630684a2380 100644
--- a/pandas/_libs/window/indexers.pyi
+++ b/pandas/_libs/window/indexers.pyi
@@ -6,6 +6,7 @@ def calculate_variable_window_bounds(
     num_values: int,  # int64_t
     window_size: int,  # int64_t
     min_periods,
+    step_size: int,
     center: bool,
     closed: str | None,
     index: np.ndarray,  # const int64_t[:]
diff --git a/pandas/_libs/window/indexers.pyx b/pandas/_libs/window/indexers.pyx
index 59889cb58c3d5..f712cd174a968 100644
--- a/pandas/_libs/window/indexers.pyx
+++ b/pandas/_libs/window/indexers.pyx
@@ -14,6 +14,7 @@ def calculate_variable_window_bounds(
     int64_t num_values,
     int64_t window_size,
     object min_periods,  # unused but here to match get_window_bounds signature
+    int64_t step_size,
     bint center,
     str closed,
     const int64_t[:] index
@@ -32,6 +33,9 @@ def calculate_variable_window_bounds(
     min_periods : object
         ignored, exists for compatibility
 
+    step_size : int64
+        (minimum) step size of the moving window
+
     center : bint
         center the rolling window on the current observation
 
@@ -49,8 +53,8 @@ def calculate_variable_window_bounds(
         bint left_closed = False
         bint right_closed = False
         ndarray[int64_t, ndim=1] start, end
-        int64_t start_bound, end_bound, index_growth_sign = 1
-        Py_ssize_t i, j
+        int64_t start_bound, end_bound, step_bound, index_growth_sign = 1
+        Py_ssize_t i, j, last_valid
 
     # default is 'right'
     if closed is None:
@@ -89,11 +93,22 @@ def calculate_variable_window_bounds(
                 end[0] = j
                 break
 
+    step_bound = index[0] + index_growth_sign * step_size
+    last_valid = 0
+
     with nogil:
 
         # start is start of slice interval (including)
         # end is end of slice interval (not including)
         for i in range(1, num_values):
+
+            if index[i] - index_growth_sign * step_bound < 0:
+                start[i] = end[i-1]
+                end[i] = end[i-1]
+                continue
+            else:
+                step_bound = index[i] + step_size
+
             if center:
                 end_bound = index[i] + index_growth_sign * window_size / 2
                 start_bound = index[i] - index_growth_sign * window_size / 2
@@ -105,34 +120,27 @@ def calculate_variable_window_bounds(
             if left_closed:
                 start_bound -= 1 * index_growth_sign
 
-            # advance the start bound until we are
-            # within the constraint
+            # advance the start bound until we are within the constraint
             start[i] = i
-            for j in range(start[i - 1], i):
+            for j in range(start[last_valid], i):
                 if (index[j] - start_bound) * index_growth_sign > 0:
                     start[i] = j
                     break
 
-            # for centered window advance the end bound until we are
-            # outside the constraint
-            if center:
-                for j in range(end[i - 1], num_values + 1):
-                    if j == num_values:
-                        end[i] = j
-                    elif ((index[j] - end_bound) * index_growth_sign == 0 and
-                          right_closed):
-                        end[i] = j + 1
-                    elif (index[j] - end_bound) * index_growth_sign >= 0:
-                        end[i] = j
+            # advance the end bound until we are outside the constraint
+            end[i] = num_values
+            for j in range(max(end[last_valid], i), num_values):
+                if (index[j] - end_bound) * index_growth_sign == 0 and right_closed:
+                    end[i] = j + 1
+                    # for duplicate indices on non-centered windows
+                    # we want the first of the identical indices
+                    # see Gh 43944 and GH 20712
+                    if not center:
                         break
-            # end bound is previous end
-            # or current index
-            elif (index[end[i - 1]] - end_bound) * index_growth_sign <= 0:
-                end[i] = i + 1
-            else:
-                end[i] = end[i - 1]
+                elif (index[j] - end_bound) * index_growth_sign >= 0:
+                    end[i] = j
+                    break
+
+            last_valid = i
 
-            # right endpoint is open
-            if not right_closed and not center:
-                end[i] -= 1
     return start, end
diff --git a/pandas/core/generic.py b/pandas/core/generic.py
index b235f120d98c8..bf0183b2a32a7 100644
--- a/pandas/core/generic.py
+++ b/pandas/core/generic.py
@@ -10965,6 +10965,7 @@ def rolling(
         axis: Axis = 0,
         closed: str | None = None,
         method: str = "single",
+        step: int | timedelta | BaseOffset | None = None,
     ):
         axis = self._get_axis_number(axis)
 
@@ -10979,6 +10980,7 @@ def rolling(
                 axis=axis,
                 closed=closed,
                 method=method,
+                step=step,
             )
 
         return Rolling(
@@ -10991,6 +10993,7 @@ def rolling(
             axis=axis,
             closed=closed,
             method=method,
+            step=step,
         )
 
     @final
diff --git a/pandas/core/indexers/objects.py b/pandas/core/indexers/objects.py
index cef023a647d7f..6230d6b2890e8 100644
--- a/pandas/core/indexers/objects.py
+++ b/pandas/core/indexers/objects.py
@@ -70,6 +70,16 @@ def get_window_bounds(
 class FixedWindowIndexer(BaseIndexer):
     """Creates window boundaries that are of fixed length."""
 
+    def __init__(
+        self,
+        index_array: np.ndarray | None = None,
+        window_size: int = 0,
+        step: int = 1,
+        **kwargs,
+    ):
+        super().__init__(index_array, window_size, **kwargs)
+        self.step = step
+
     @Appender(get_window_bounds_doc)
     def get_window_bounds(
         self,
@@ -94,12 +104,28 @@ def get_window_bounds(
         end = np.clip(end, 0, num_values)
         start = np.clip(start, 0, num_values)
 
+        # apply step, the resulting window will have zero length
+        if self.step > 1:
+            mask = np.full_like(start, True, dtype=bool)
+            mask[:: self.step] = False
+            start[mask] = end[mask]
+
         return start, end
 
 
 class VariableWindowIndexer(BaseIndexer):
     """Creates window boundaries that are of variable length, namely for time series."""
 
+    def __init__(
+        self,
+        index_array: np.ndarray | None = None,
+        window_size: int = 0,
+        step: int = 1,
+        **kwargs,
+    ):
+        super().__init__(index_array, window_size, **kwargs)
+        self.step = step
+
     @Appender(get_window_bounds_doc)
     def get_window_bounds(
         self,
@@ -117,6 +143,7 @@ def get_window_bounds(
             num_values,
             self.window_size,
             min_periods,
+            self.step,
             center,  # type: ignore[arg-type]
             closed,
             self.index_array,  # type: ignore[arg-type]
diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py
index 2b8ed3c97d026..0e6392edf37b6 100644
--- a/pandas/core/window/rolling.py
+++ b/pandas/core/window/rolling.py
@@ -127,6 +127,7 @@ def __init__(
         on: str | Index | None = None,
         closed: str | None = None,
         method: str = "single",
+        step=None,
         *,
         selection=None,
     ):
@@ -140,6 +141,8 @@ def __init__(
         self._win_type = win_type
         self.axis = obj._get_axis_number(axis) if axis is not None else None
         self.method = method
+        self.step = step
+        self._step_size = 0
         self._win_freq_i8 = None
         if self.on is None:
             if self.axis == 0:
@@ -398,8 +401,12 @@ def _get_window_indexer(self) -> BaseIndexer:
                 index_array=self._index_array,
                 window_size=self._win_freq_i8,
                 center=self.center,
+                step=self._step_size,
             )
-        return FixedWindowIndexer(window_size=self.window)
+        return FixedWindowIndexer(
+            window_size=self.window,
+            step=self._step_size,
+        )
 
     def _apply_series(
         self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
@@ -877,6 +884,11 @@ class Window(BaseWindow):
 
         .. versionadded:: 1.3.0
 
+    step : int or offset, default None
+        Minimum step size the window is shifted and have a value
+        (otherwise result is NA). For a window that is specified by an offset,
+        `step` also needs to be an offset. Otherwise, `step` must be an integer.
+
     Returns
     -------
     a Window or Rolling sub-classed for the particular operation
@@ -1007,6 +1019,7 @@ class Window(BaseWindow):
         "on",
         "closed",
         "method",
+        "step",
     ]
 
     def _validate(self):
@@ -1031,6 +1044,10 @@ def _validate(self):
         if self.method != "single":
             raise NotImplementedError("'single' is the only supported method type.")
 
+        if self.step is not None and not is_integer(self.step):
+            raise ValueError("step must be an integer 0 or greater")
+        self._step_size = self.step or 0
+
     def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray:
         """
         Center the result in the window for weighted rolling aggregations.
@@ -1093,6 +1110,11 @@ def calc(x):
             if self.center:
                 result = self._center_window(result, offset)
 
+            if self._step_size > 1:
+                mask = np.full_like(result, True, dtype=bool)
+                mask[:: self._step_size] = False
+                result[mask] = np.nan
+
             return result
 
         return self._apply_blockwise(homogeneous_func, name)
@@ -1576,6 +1598,7 @@ class Rolling(RollingAndExpandingMixin):
         "on",
         "closed",
         "method",
+        "step",
     ]
 
     def _validate(self):
@@ -1612,6 +1635,25 @@ def _validate(self):
         elif not is_integer(self.window) or self.window < 0:
             raise ValueError("window must be an integer 0 or greater")
 
+        if self.step is None:
+            self._step_size = 0
+        elif self._win_freq_i8 is not None:
+
+            try:
+                step = to_offset(self.step)
+            except (TypeError, ValueError) as err:
+                raise ValueError(
+                    f"passed step {self.step} is not "
+                    "compatible with a datetimelike window"
+                ) from err
+            if isinstance(self._on, PeriodIndex):
+                self._step_size = step.nanos / (self._on.freq.nanos / self._on.freq.n)
+            else:
+                self._step_size = step.nanos
+
+        elif not is_integer(self.step) or self.step < 0:
+            raise ValueError("step must be an integer 0 or greater")
+
     def _validate_monotonic(self):
         """
         Validate monotonic (increasing or decreasing).
@@ -2486,7 +2528,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
         GroupbyIndexer
         """
         rolling_indexer: type[BaseIndexer]
-        indexer_kwargs: dict[str, Any] | None = None
+        indexer_kwargs = {"step": self._step_size}
         index_array = self._index_array
         if isinstance(self.window, BaseIndexer):
             rolling_indexer = type(self.window)