In [None]:
# default_exp rolling

# Rolling

> Rolling window operations

In [None]:
#hide
import random

from nbdev.showdoc import *

In [None]:
#export
from math import sqrt
from typing import Callable, Optional

import numpy as np
import pandas as pd
from numba import njit

In [None]:
#export
@njit
def rolling_mean(x: np.ndarray,
                 window_size: int,
                 min_samples: Optional[int] = None) -> np.ndarray:
    """Compute the mean of `x` over a trailing window.

    Parameters
    ----------
    x : np.ndarray
        One-dimensional array of values.
    window_size : int
        Maximum number of trailing observations averaged at each position.
    min_samples : int, optional
        Minimum number of observations required to produce a value.
        Defaults to `window_size`.

    Returns
    -------
    np.ndarray
        float32 array of the same length as `x`. Positions with fewer than
        `min_samples` observations are NaN.

    Raises
    ------
    ValueError
        If `min_samples` is smaller than 1 or larger than `window_size`.
    """
    if min_samples is None:
        min_samples = window_size
    if min_samples < 1:
        raise ValueError('min_samples must be at least 1')
    if min_samples > window_size:
        raise ValueError('min_samples must not be greater than window_size')
    n_samples = x.size
    out = np.full(n_samples, np.nan, dtype=np.float32)
    # Clamp both phase boundaries to the input length so that short inputs
    # (fewer samples than the window or than min_samples) never index past
    # the end of `x`.
    warmup_end = min(min_samples - 1, n_samples)
    expanding_end = min(window_size, n_samples)
    accum = 0.
    # Warm-up: not enough observations yet, only accumulate.
    for i in range(warmup_end):
        accum += x[i]
    # Expanding phase: the window is not full, average over what is available.
    for i in range(warmup_end, expanding_end):
        accum += x[i]
        out[i] = accum / (i+1)
    # Sliding phase: add the newest value and drop the one leaving the window.
    for i in range(window_size, n_samples):
        accum += x[i] - x[i - window_size]
        out[i] = accum / window_size
    return out

In [None]:
# Check rolling_mean against pandas on random data.
y = np.random.rand(100)
ys = pd.Series(y)

# window_size starts at 3 so that randint(2, window_size - 1) below never
# receives an empty range (randint(2, 1) raises ValueError).
window_size = random.randint(3, 10)
min_samples = random.randint(2, window_size - 1)

# min_samples=1: every position has a value, so no NaN handling is needed
assert np.allclose(rolling_mean(y, window_size, min_samples=1),
                   ys.rolling(window_size, min_periods=1).mean().values)

# 1 < min_samples < window_size
assert np.allclose(rolling_mean(y, window_size, min_samples=min_samples),
                   ys.rolling(window_size, min_periods=min_samples).mean().values,
                   equal_nan=True)

# default: min_samples == window_size
assert np.allclose(rolling_mean(y, window_size),
                   ys.rolling(window_size).mean().values,
                   equal_nan=True)

In [None]:
#export
@njit
def rolling_std(x: np.ndarray,
                window_size: int,
                min_samples: Optional[int] = None) -> np.ndarray:
    """Compute the sample standard deviation of `x` over a trailing window.

    The variance uses the `n - 1` denominator, where `n` is the number of
    observations currently in the window.

    Parameters
    ----------
    x : np.ndarray
        One-dimensional array of values.
    window_size : int
        Maximum number of trailing observations used at each position.
    min_samples : int, optional
        Minimum number of observations required to produce a value.
        Defaults to `window_size`. Must be at least 2.

    Returns
    -------
    np.ndarray
        float32 array of the same length as `x`. Positions with fewer than
        `min_samples` observations are NaN.

    Raises
    ------
    ValueError
        If `min_samples` is smaller than 2 or larger than `window_size`.
    """
    if min_samples is None:
        min_samples = window_size
    if min_samples < 2:
        raise ValueError('min_samples must be greater than 1')
    if min_samples > window_size:
        raise ValueError('min_samples must not be greater than window_size')
    n_samples = x.size
    out = np.full(n_samples, np.nan, dtype=np.float32)
    # Clamp both phase boundaries to the input length so short inputs never
    # index past the end of `x`.
    warmup_end = min(min_samples - 1, n_samples)
    expanding_end = min(window_size, n_samples)
    # Running sum and sum of squares are kept as Python floats (double
    # precision); only the final result is stored as float32. Squaring a
    # float32-rounded mean would amplify its rounding error in the subtraction.
    accum_x = 0.
    accum_xsq = 0.
    # Warm-up: not enough observations yet, only accumulate.
    for i in range(warmup_end):
        accum_x += x[i]
        accum_xsq += x[i]**2
    # Expanding phase: i + 1 observations are in the window (always >= 2 here
    # because min_samples >= 2).
    for i in range(warmup_end, expanding_end):
        accum_x += x[i]
        accum_xsq += x[i]**2
        count = i + 1
        var = (accum_xsq - accum_x * accum_x / count) / (count - 1)
        # Rounding can push the variance of a (near-)constant window slightly
        # below zero; clamp so sqrt never sees a negative argument.
        out[i] = sqrt(max(var, 0.))
    # Sliding phase: add the newest value and drop the one leaving the window.
    for i in range(window_size, n_samples):
        accum_x += x[i] - x[i-window_size]
        accum_xsq += x[i]**2 - x[i-window_size]**2
        var = (accum_xsq - accum_x * accum_x / window_size) / (window_size-1)
        out[i] = sqrt(max(var, 0.))
    return out

In [None]:
# Check rolling_std against pandas on random data.
y = np.random.rand(100)
ys = pd.Series(y)

# window_size starts at 4 so that randint(3, window_size - 1) below never
# receives an empty range (e.g. randint(3, 2) raises ValueError).
window_size = random.randint(4, 10)
min_samples = random.randint(3, window_size - 1)

# smallest min_samples accepted by rolling_std
assert np.allclose(rolling_std(y, window_size, min_samples=2),
                   ys.rolling(window_size, min_periods=2).std().values,
                   equal_nan=True)

# 2 < min_samples < window_size
assert np.allclose(rolling_std(y, window_size, min_samples=min_samples),
                   ys.rolling(window_size, min_periods=min_samples).std().values,
                   equal_nan=True)

# default: min_samples == window_size
assert np.allclose(rolling_std(y, window_size),
                   ys.rolling(window_size).std().values,
                   equal_nan=True)

In [None]:
#exporti
@njit 
def _rolling_comp(comp: Callable,
                  x: np.ndarray, 
                  window_size: int,
                  min_samples: Optional[int] = None):
    if min_samples is None:
        min_samples = window_size    
    n_samples = x.size   
    out = np.full(n_samples, np.nan, dtype=np.float32)  
    for i in range(min_samples - 1, n_samples):
        pivot = x[i]
        ws = min(i+1, window_size)
        for j in range(1, ws):
            if comp(x[i - j], pivot):
                pivot = x[i - j]
        out[i] = pivot
    return out

@njit
def _gt(x: float, y: float) -> bool:
    return x > y

@njit
def _lt(x: float, y: float) -> bool:
    return ~_gt(x, y)

In [None]:
#export
def rolling_max(x: np.ndarray,
                window_size: int,
                min_samples: Optional[int] = None) -> np.ndarray:
    """Maximum of `x` over a trailing window of up to `window_size` values.

    Delegates to `_rolling_comp` with the "greater than" comparator.
    `min_samples` defaults to `window_size` when omitted.
    """
    running_max = _rolling_comp(_gt, x, window_size, min_samples)
    return running_max

In [None]:
# Check rolling_max against pandas on random data.
y = np.random.rand(100)
ys = pd.Series(y)

# window_size starts at 3 so that randint(2, window_size - 1) below never
# receives an empty range (randint(2, 1) raises ValueError).
window_size = random.randint(3, 10)
min_samples = random.randint(2, window_size - 1)

# min_samples=1: every position has a value, so no NaN handling is needed
assert np.allclose(rolling_max(y, window_size, min_samples=1),
                   ys.rolling(window_size, min_periods=1).max().values)

# 1 < min_samples < window_size
assert np.allclose(rolling_max(y, window_size, min_samples=min_samples),
                   ys.rolling(window_size, min_periods=min_samples).max().values,
                   equal_nan=True)

# default: min_samples == window_size
assert np.allclose(rolling_max(y, window_size),
                   ys.rolling(window_size).max().values,
                   equal_nan=True)

In [None]:
#export
def rolling_min(x: np.ndarray,
                window_size: int,
                min_samples: Optional[int] = None) -> np.ndarray:
    """Minimum of `x` over a trailing window of up to `window_size` values.

    Delegates to `_rolling_comp` with the `_lt` comparator.
    `min_samples` defaults to `window_size` when omitted.
    """
    running_min = _rolling_comp(_lt, x, window_size, min_samples)
    return running_min

In [None]:
# Check rolling_min against pandas on random data.
y = np.random.rand(100)
ys = pd.Series(y)

# window_size starts at 3 so that randint(2, window_size - 1) below never
# receives an empty range (randint(2, 1) raises ValueError).
window_size = random.randint(3, 10)
min_samples = random.randint(2, window_size - 1)

# min_samples=1: every position has a value, so no NaN handling is needed
assert np.allclose(rolling_min(y, window_size, min_samples=1),
                   ys.rolling(window_size, min_periods=1).min().values)

# 1 < min_samples < window_size
assert np.allclose(rolling_min(y, window_size, min_samples=min_samples),
                   ys.rolling(window_size, min_periods=min_samples).min().values,
                   equal_nan=True)

# default: min_samples == window_size
assert np.allclose(rolling_min(y, window_size),
                   ys.rolling(window_size).min().values,
                   equal_nan=True)