In [1]:
import polars as pl
from polars.testing import assert_frame_equal

from polars_ts import mann_kendall


def mk_stat_polars(y: pl.Series) -> float:
    """Naive reference implementation of the normalized Mann-Kendall statistic.

    Use this only to verify the output of the optimized implementation, which
    is available via ``from polars_ts import mann_kendall``.

    For every pair of positions ``i < j`` the function adds +1 when
    ``y[j] > y[i]`` and -1 when ``y[j] < y[i]`` (ties contribute nothing), and
    then divides the total ``s`` by the number of pairs, ``n * (n - 1) / 2``.
    The result therefore lies in ``[-1, 1]``.

    Args:
        y: Series of observations; elements are compared in their stored order.

    Returns:
        ``s / (0.5 * n * (n - 1))`` as a Python float, or ``nan`` when ``y``
        has fewer than two elements (there are no pairs to compare).
    """
    n = y.len()

    # With fewer than two observations the pair count below is zero and the
    # division would raise ZeroDivisionError; report the statistic as undefined.
    if n < 2:
        return float("nan")

    # Running sum of pairwise signs (the Mann-Kendall S statistic).
    s = 0

    # Quadratic on purpose: each element is compared, via vectorized Series
    # comparisons, with every element that comes after it.
    for k in range(n - 1):
        current = y[k]
        later = y[k + 1 : n]  # slice once and reuse it for both comparisons
        s += (later > current).sum() - (later < current).sum()

    # Normalize S by the total number of pairs, n * (n - 1) / 2.
    return s / (0.5 * n * (n - 1))

In [2]:
# Read the hourly M4 parquet file straight from the public Nixtla bucket.
# The later cells rely on it having a `unique_id` and a `y` column.
df = pl.read_parquet(
    source="https://datasets-nixtla.s3.amazonaws.com/m4-hourly.parquet",
)

In [3]:
# Optimized path: apply the polars_ts `mann_kendall` expression to `y` within
# each `unique_id` group (group order preserved). The aggregation yields a list
# column, so the first element of each list is taken as the value.
optimized = (
    df.group_by("unique_id", maintain_order=True)
    .agg(mann_kendall("y").alias("mann_kendall"))
    .with_columns(pl.col("mann_kendall").list.first())
)
optimized

unique_id,mann_kendall
str,f64
"""H1""",0.144457
"""H10""",-0.208341
"""H100""",0.183003
"""H101""",-0.072339
"""H102""",-0.050867
…,…
"""H95""",0.296763
"""H96""",-0.043858
"""H97""",0.118803
"""H98""",0.006375


In [4]:
# Reference path: run the naive Python implementation on the `y` values of
# each `unique_id` group, mirroring the shape of the optimized pipeline so the
# two frames can be compared directly.
naive = (
    df.group_by("unique_id", maintain_order=True)
    .agg(
        mann_kendall=pl.col("y").map_batches(
            mk_stat_polars,
            return_dtype=pl.Float64,
        )
    )
    .with_columns(pl.col("mann_kendall").list.first())
)

In [5]:
# The optimized and naive results must agree; this raises AssertionError
# if the two frames differ.
assert_frame_equal(left=optimized, right=naive)