In [102]:
import glob
import os

import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import power_divergence, binomtest
import polars as pl
from matplotlib.lines import Line2D


## Configuration


In [103]:
DATA_DIR        = "./data"
SRC_PATH        = f"{DATA_DIR}/00_src" # Remember to have this data generated from Hawkin Dynamics SDK
COMBINED_PATH   = f"{DATA_DIR}/01_raw/01_combined.csv"
PURE_CMJ_PATH   = f"{DATA_DIR}/01_raw/02_combined_cmj.csv"
PLOTS_DIR       = f"{DATA_DIR}/plots"
STATS_DIR       = f"{DATA_DIR}/metric_stats"
LOG_PATH        = f"{DATA_DIR}/pipeline_stats.log"

CMJ_TEST_TYPE   = "Countermovement Jump"

ATHLETES_TO_DROP = [
    "test test",
    "test walidacja imu",
    "test 1",
    "test 2",
    "test 3",
    "test 4",
    "test 5",
    "test 6",
    "Zawodnik 1",
    "Zawodnik 2",
    "Matt Jordan",
    "Matt Jordan 2",
    "Matt Jordan 4",
    "Matt Jordan 5",
]

COLUMNS_TO_DROP = [
    "athlete_id",
    "external_kl6xI7wHgSTNtCNyjJpYWHYOmmn1",
    "testType_id",
    "testType_canonicalId",
    "tag_ids",
    "athlete_teams",
    "athlete_groups",
    "active",
    "athlete_active",
]

MIN_FAMILIARIZATION_DAYS    = 1
MIN_REPS_IN_FAM_SESSION     = 3
MIN_REPS_IN_VALID_SESSION   = 6
KEEP_FIRST_N = 6
REST_THRESHOLD_SEC = 30

METRICS_CONFIG = [
    ("jump_height_m",                       "JH"),
    ("mrsi",                                "mRSI"),
    ("stiffness_n_m",                       "Stiffness"),
    ("avg_relative_braking_force",          "ARBF"),
    ("avg_relative_braking_power_w_kg",     "ARBP"),
    ("avg_relative_propulsive_force",       "ARPF"),
    ("avg_relative_propulsive_power_w_kg",  "ARPP"),
]

G_ACC = 9.80665

TECHNICAL_COLUMNS = [
    "id", "athlete_name", "timestamp", "test_datetime",
    "test_date", "session_number", "rep_number",
    "rest_before_rep_seconds", "testType_name", 
    "system_weight_n", "system_weight_kg"
]

METRICS_COLUMNS = [
    "jump_height_m", "mrsi", "stiffness_n_m", "avg_relative_braking_force",
    "avg_relative_braking_power_w_kg", "avg_relative_propulsive_force",
    "avg_relative_propulsive_power_w_kg",
]

SIGMA_THRESHOLD = 4

## Pipeline technical functions

In [104]:
def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


def log_stats(stage: str, df: pl.DataFrame, log_path: str) -> None:
    first_date = df.select(pl.col("test_date").min()).item()
    last_date = df.select(pl.col("test_date").max()).item()
    rep_count = df.select(pl.col("id").len()).item()
    athlete_count = df.select(pl.col("athlete_name").n_unique()).item()
    session_count = df.select(pl.struct(["athlete_name", "test_date"]).n_unique()).item()

    message = f"""
{stage}:
    First CMJ recorded date: {first_date}
    Last CMJ recorded date: {last_date}
    Count of CMJ repetitions: {rep_count}
    Count of athletes: {athlete_count}
    Count of sessions: {session_count}
------------------------------------------------------"""

    ensure_dir(os.path.dirname(log_path) or ".")
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(message)


def log_message(message: str, log_path: str) -> None:
    ensure_dir(os.path.dirname(log_path) or ".")
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(message)


def reset_log(log_path: str) -> None:
    ensure_dir(os.path.dirname(log_path) or ".")
    with open(log_path, "w", encoding="utf-8") as f:
        f.write("")


## Data pipeline functions


In [105]:
def load_and_combine(src_path: str, combined_path: str) -> pl.DataFrame:
    files = glob.glob(os.path.join(src_path, "*.csv"))
    df_all = pl.concat([pl.read_csv(f) for f in files], how="diagonal_relaxed")

    df_all = df_all.filter(~pl.col("athlete_name").is_in(ATHLETES_TO_DROP))
    df_all = df_all.unique("id")
    df_all = df_all.drop(COLUMNS_TO_DROP)

    df_all = df_all.with_columns(
        pl.from_epoch(pl.col("timestamp"), time_unit="s").alias("test_datetime"),
        pl.from_epoch(pl.col("last_sync_time"), time_unit="s").alias("last_sync_time"),
        (pl.col("system_weight_n") / G_ACC).round(2).alias("system_weight_kg"),
    )
    df_all = df_all.with_columns(
        pl.col("test_datetime").dt.date().alias("test_date")
    )

    columns = df_all.columns.copy()
    columns.append(columns.pop(columns.index("last_sync_time")))
    columns.insert(columns.index("timestamp") + 1, columns.pop(columns.index("test_datetime")))
    columns.insert(columns.index("test_datetime") + 1, columns.pop(columns.index("test_date")))
    df_all = df_all.select(columns)

    df_all = df_all.sort("test_datetime")

    ensure_dir(os.path.dirname(combined_path) or ".")
    df_all.write_csv(combined_path)
    return df_all

def filter_cmj(df_all: pl.DataFrame, cmj_test_type: str, pure_cmj_path: str) -> pl.DataFrame:
    df_cmj = df_all.filter(pl.col("testType_name") == cmj_test_type)
    df_cmj = df_cmj.drop("tag_names")
    ensure_dir(os.path.dirname(pure_cmj_path) or ".")
    df_cmj.write_csv(pure_cmj_path)
    return df_cmj

def add_session_rep_columns(df_cmj: pl.DataFrame) -> pl.DataFrame:
    ts_col = "timestamp"
    df = (
        df_cmj
        .sort([ts_col, "athlete_name"])
        .with_columns(
            pl.col("test_date").rank(method="dense").over("athlete_name").alias("session_number")
        )
        .with_columns(
            pl.col(ts_col).rank(method="dense").over(["session_number", "athlete_name"]).alias("rep_number")
        )
        .with_columns(
            pl.col(ts_col).shift(1).over(["athlete_name", "session_number"]).alias("prev_rep_timestamp")
        )
        .with_columns(
            rest_before_rep_seconds=pl.col(ts_col) - pl.col("prev_rep_timestamp")
        )
        .drop("prev_rep_timestamp")
    )

    columns = df.columns.copy()
    columns.insert(0, columns.pop(columns.index("athlete_name")))
    columns.insert(columns.index("test_date") + 1, columns.pop(columns.index("session_number")))
    columns.insert(columns.index("session_number") + 1, columns.pop(columns.index("rep_number")))
    columns.insert(columns.index("rep_number") + 1, columns.pop(columns.index("rest_before_rep_seconds")))
    return df.select(columns)

def add_mrsi_session_stats(df: pl.DataFrame) -> pl.DataFrame:
    session_keys = ["athlete_name", "session_number"]
    df = df.with_columns([
        pl.col("mrsi").max().round(4).over(session_keys).alias("max_mrsi_in_session"),
        pl.col("mrsi").mean().round(4).over(session_keys).alias("avg_mrsi_in_session"),
        pl.col("mrsi").sort(descending=True).head(3).mean().round(4).over(session_keys)
        .alias("avg_3_best_mrsi_in_session"),
    ])

    columns = df.columns.copy()
    columns.insert(columns.index("mrsi") + 1, columns.pop(columns.index("max_mrsi_in_session")))
    columns.insert(columns.index("mrsi") + 2, columns.pop(columns.index("avg_mrsi_in_session")))
    columns.insert(columns.index("mrsi") + 3, columns.pop(columns.index("avg_3_best_mrsi_in_session")))
    return df.select(columns)

def add_within_athlete_zscore_abs(
    df: pl.DataFrame,
    metric_col: str,
    ddof: int = 1,
) -> pl.DataFrame:
    df_abs = df.with_columns(pl.col(metric_col).abs())
    stats = (
        df_abs.group_by("athlete_name")
        .agg([
            pl.col(metric_col).mean().alias("m_mean"),
            pl.col(metric_col).std(ddof=ddof).alias("m_sd"),
        ])
        .with_columns(pl.col("m_sd").replace(0, None).alias("m_sd"))
    )
    df2 = df_abs.join(stats, on="athlete_name", how="left")
    z_expr = (pl.col(metric_col) - pl.col("m_mean")) / pl.col("m_sd")
    return df2.with_columns(z_expr.fill_null(0.0).fill_nan(0.0).alias("z_score"))

def best_session_ever(df: pl.DataFrame, kpi_col: str) -> pl.DataFrame:
    return (
        df.with_columns(
            pl.col("session_number")
            .sort_by(pl.col(kpi_col).abs(), descending=True)
            .first()
            .over("athlete_name")
            .alias("best_session_number")
        )
        .filter(pl.col("session_number") == pl.col("best_session_number"))
        .drop("best_session_number")
    )

def get_best_reps(df: pl.DataFrame, selector: str) -> pl.DataFrame:
    return (
        df.with_columns(
            pl.col("rep_number")
            .sort_by(pl.col(selector).abs(), descending=True)
            .first()
            .over("athlete_name")
            .alias("best_rep_number")
        )
        .filter(pl.col("rep_number") == pl.col("best_rep_number"))
        .drop("best_rep_number")
    )

def write_best_rep_stats(best_rep_df: pl.DataFrame, out_path: str) -> None:
    stats_df = (
        best_rep_df
        .group_by("rep_number")
        .len()
        .rename({"len": "count"})
        .sort("rep_number")
    )
    ensure_dir(os.path.dirname(out_path) or ".")
    stats_df.write_csv(out_path)

def filter_outliers(df: pl.DataFrame, col: str, sigma_threshold: int) -> pl.DataFrame:
    sd_filter = pl.col(f"{col}_z_score").is_between(-1 * sigma_threshold, sigma_threshold, "none")
    return df.filter(sd_filter)

def add_sex_height_from_csv(df: pl.DataFrame, src_file: str) -> pl.DataFrame:
    df_sex_height = pl.read_csv(src_file)
    df = df.join(
        df_sex_height,
        on="athlete_name",
        how="left"
    )

    if df.filter(pl.col("sex").is_null() | (pl.col("sex") == "")).height > 0:
        raise Exception ("There are athletes without assigned sex!")

    if df.filter(pl.col("height_cm").is_null() | (pl.col("height_cm") == 0)).height > 0:
        raise Exception ("There are athletes without height_cm!")

    
    columns = df.columns.copy()

    columns.insert(columns.index("athlete_name") + 1, columns.pop(columns.index("sex")))
    columns.insert(columns.index("athlete_name") + 2, columns.pop(columns.index("height_cm")))

    return df.select(columns)


## Plotting functions


In [106]:
CI_COLOR = "#ff8c00"
MEDIAN_COLOR = "#b35900"
MEAN_COLOR = "#d62728"
PARETO_LINE_COLOR = "#2ca02c"

BOX_WIDTH = 0.25
MEAN_X_OFFSET = BOX_WIDTH / 2.0
SEED_BOOT = 42
SEED_JITTER = 123
REP_COLS = [1, 2, 3, 4, 5, 6]


def set_manuscript_font() -> None:
    available = {f.name for f in fm.fontManager.ttflist}
    family = (
        ("Times New Roman" in available and "Times New Roman")
        or ("Nimbus Roman" in available and "Nimbus Roman")
        or "serif"
    )

    plt.rcParams.update({
        "font.family": family,
        "font.size": 12,
        "axes.titlesize": 11,
        "axes.labelsize": 12,
        "xtick.labelsize": 11,
        "ytick.labelsize": 11,
    })


def bootstrap_median_ci(values: np.ndarray, B: int = 5000, rng: np.random.Generator | None = None):
    values = np.asarray(values, dtype=float)
    n = len(values)
    rng = rng or np.random.default_rng(SEED_BOOT)

    boot = np.empty(B, dtype=float)
    for b in range(B):
        boot[b] = np.median(rng.choice(values, size=n, replace=True))

    med = float(np.median(values))
    lo = float(np.percentile(boot, 2.5))
    hi = float(np.percentile(boot, 97.5))
    return med, lo, hi


def add_n_annotation(ax: plt.Axes, n: int) -> None:
    ax.text(0.99, 0.02, f"n={n}", transform=ax.transAxes, ha="right", va="bottom", fontsize=9)


def plot_raincloud(
    df_bestday,
    title: str,
    y_limits: tuple,
    out_png: str,
    out_csv: str | None = None,
    out_stats_csv: str | None = None,
    rep_col: str = "rep_number",
    metric_col: str = "metric",
    z_col: str = "z_score",
    y_label: str = "Within-athlete z-score",
) -> None:
    df_plot = (
        df_bestday
        .with_columns([
            pl.col(rep_col).cast(pl.Int64, strict=False).alias(rep_col),
            pl.col(z_col).cast(pl.Float64, strict=False).alias(z_col),
            pl.col(metric_col).cast(pl.Float64, strict=False).alias(metric_col),
        ])
        .filter(pl.col(rep_col).is_between(1, 6))
        .filter(pl.col(z_col).is_not_null())
        .filter(pl.col(metric_col).is_not_null())
        .select( TECHNICAL_COLUMNS + [ z_col, metric_col ])
    )

    z_data_by_rep = [
        df_plot.filter(pl.col(rep_col) == r).get_column(z_col).to_numpy()
        for r in REP_COLS
    ]

    raw_data_by_rep = [
        df_plot.filter(pl.col(rep_col) == r).get_column(metric_col).to_numpy()
        for r in REP_COLS
    ]

    rng_ci = np.random.default_rng(SEED_BOOT)

    counts, means, medians, sds, ci_low, ci_high = [], [], [], [], [], []
    for vals in z_data_by_rep:
        arr = np.asarray(vals, dtype=float)
        counts.append(len(arr))
        means.append(float(np.mean(arr)) if len(arr) else np.nan)
        medians.append(float(np.median(arr)) if len(arr) else np.nan)
        if len(arr) == 0:
            sds.append(np.nan)
        elif len(arr) == 1:
            sds.append(0.0)
        else:
            sds.append(float(np.std(arr, ddof=1)))
        _, lo, hi = bootstrap_median_ci(arr, B=5000, rng=rng_ci)
        ci_low.append(lo)
        ci_high.append(hi)

    means = np.array(means, dtype=float)
    ci_low = np.array(ci_low, dtype=float)
    ci_high = np.array(ci_high, dtype=float)

    raw_counts, raw_means, raw_medians, raw_sds, raw_ci_low, raw_ci_high = [], [], [], [], [], []
    for vals in raw_data_by_rep:
        arr = np.asarray(vals, dtype=float)
        raw_counts.append(len(arr))
        raw_means.append(float(np.mean(arr)) if len(arr) else np.nan)
        raw_medians.append(float(np.median(arr)) if len(arr) else np.nan)
        if len(arr) == 0:
            raw_sds.append(np.nan)
        elif len(arr) == 1:
            raw_sds.append(0.0)
        else:
            raw_sds.append(float(np.std(arr, ddof=1)))
        _, lo, hi = bootstrap_median_ci(arr, B=5000, rng=rng_ci)
        raw_ci_low.append(lo)
        raw_ci_high.append(hi)

    summary_df = pl.DataFrame({
        rep_col: REP_COLS,

        # z summary
        "count": counts,
        "z_mean": means,
        "z_std": sds,
        "z_median": medians,
        "z_median_ci_low": ci_low,
        "z_median_ci_high": ci_high,

        # raw metric summary
        f"src_mean": raw_means,
        f"src_std": raw_sds,
        f"src_median": raw_medians,
        f"src_median_ci_low": raw_ci_low,
        f"src_median_ci_high": raw_ci_high,
    })

    if out_csv:
        ensure_dir(os.path.dirname(out_csv) or ".")
        df_plot.write_csv(out_csv)

    if out_stats_csv:
        ensure_dir(os.path.dirname(out_stats_csv) or ".")
        summary_df.write_csv(out_stats_csv)

    fig, ax = plt.subplots(figsize=(7.2, 4.8))
    vp = ax.violinplot(
        z_data_by_rep,
        positions=REP_COLS,
        widths=0.85,
        showmeans=False,
        showmedians=False,
        showextrema=False,
    )
    for body in vp["bodies"]:
        body.set_alpha(0.30)
        body.set_zorder(1)

    rng_points = np.random.default_rng(SEED_JITTER)
    x_vals = df_plot.get_column(rep_col).to_numpy().astype(float)
    y_vals = df_plot.get_column(z_col).to_numpy().astype(float)
    x_jitter = x_vals + rng_points.uniform(-0.12, 0.12, size=len(x_vals))
    ax.scatter(x_jitter, y_vals, alpha=0.25, s=10, zorder=2)

    ax.boxplot(
        z_data_by_rep,
        positions=REP_COLS,
        widths=BOX_WIDTH,
        showfliers=False,
        medianprops={"linewidth": 1.2, "color": MEDIAN_COLOR},
        whiskerprops={"linewidth": 1.0},
        capprops={"linewidth": 1.0},
        boxprops={"linewidth": 1.0},
    )

    for x, lo, hi in zip(REP_COLS, ci_low, ci_high):
        ax.vlines(x, lo, hi, linewidth=0.8, color=CI_COLOR, zorder=4)

    mean_x = np.array(REP_COLS, dtype=float) + MEAN_X_OFFSET
    ax.scatter(mean_x, means, s=8, color=MEAN_COLOR, zorder=5)

    ax.set_title(title, pad=12)
    ax.set_xlabel("Trial number")
    ax.set_ylabel(y_label)
    ax.set_xticks(REP_COLS)
    ax.set_xticklabels([str(r) for r in REP_COLS])
    ax.axhline(0, linestyle="--", linewidth=1.0, zorder=0)
    ax.set_ylim(y_limits)

    handles = [
        Line2D([0], [0], marker="o", linestyle="None", markersize=4,
               markerfacecolor=MEAN_COLOR, markeredgecolor=MEAN_COLOR, label="Mean"),
        Line2D([0], [0], color=CI_COLOR, linewidth=1.0, label="Median (95% CI)"),
    ]
    ax.legend(
        handles=handles,
        frameon=False,
        loc="upper right",
        bbox_to_anchor=(0.985, 0.98),
        fontsize=9,
        handlelength=1.0,
        handletextpad=0.4,
        borderaxespad=0.0,
    )

    fig.tight_layout(rect=(0, 0, 1, 0.88))
    fig.savefig(out_png, dpi=600, bbox_inches="tight", pad_inches=0.1)
    plt.close(fig)

    return summary_df


def count_cum_stats(
    best_info,
    rep_col: str = "rep_number",
) -> None:
    counts_df = best_info.group_by(rep_col).len().rename({"len": "count"})
    counts_map = dict(
        zip(
            counts_df.get_column(rep_col).to_list(),
            counts_df.get_column("count").to_list(),
        )
    )

    reps = np.array(REP_COLS, dtype=int)
    counts = np.array([int(counts_map.get(r, 0)) for r in reps], dtype=int)
    total = int(counts.sum())

    pct = np.divide(counts, total, out=np.zeros_like(counts, dtype=float), where=total != 0) * 100
    cum_pct = np.cumsum(counts) / total * 100 if total else np.zeros_like(counts, dtype=float)

    summary_df = pl.DataFrame({
            rep_col: reps.tolist(),
            "count": counts.tolist(),
            "percent": pct.tolist(),
            "cumulative_percent": cum_pct.tolist(),
        })

    return summary_df

    
def chi_square_g_best_reps(
    counts_df: pl.DataFrame,
    bonferroni_tests: int = 1,
):
    observed = counts_df.get_column("count").to_numpy()
    df = observed.size - 1
    n = observed.sum()
    
    chi2, p_chi2 = power_divergence(observed, lambda_='pearson') # Chi-Square
    v = np.sqrt(chi2 / (n * df)) if n > 0 else np.nan
    p_bonf_chi2 = min(p_chi2 * bonferroni_tests, 1.0)
    
    g, p_g = power_divergence(observed, lambda_="log-likelihood") # G-Test
    p_bonf_g = min(p_g * bonferroni_tests, 1.0)

    return {
        "N": n,
        "Chi-square": chi2,
        "df": df,
        "p (Chi-square)": p_chi2,
        "p (Chi-square) Bonferroni": p_bonf_chi2,
        "Cramer's V": v,
        "G-test (LRT)": g,
        "p (G-test)": p_g,
        "p (G-test) Bonferroni": p_bonf_g,
    }


def holm_adjust(p_values: list[float]) -> list[float]:
    if not p_values:
        return []
    m = len(p_values)
    order = np.argsort(p_values)
    adjusted = [0.0] * m
    prev = 0.0
    for rank, idx in enumerate(order):
        factor = m - rank
        adj = p_values[idx] * factor
        if adj < prev:
            adj = prev
        if adj > 1.0:
            adj = 1.0
        adjusted[idx] = adj
        prev = adj
    return adjusted


def modal_vs_other_from_counts(
    counts_df: pl.DataFrame,
    metric_label: str,
    rep_col: str = "rep_number",
) -> pl.DataFrame:
    counts_map = dict(
        zip(
            counts_df.get_column(rep_col).to_list(),
            counts_df.get_column("count").to_list(),
        )
    )
    counts = [int(counts_map.get(r, 0)) for r in REP_COLS]
    n = int(sum(counts))

    modal_idx = int(np.argmax(counts))
    modal_rep = modal_idx + 1
    modal_count = int(counts[modal_idx])

    rows = []
    p_vals = []

    for rep in REP_COLS:
        if rep == modal_rep:
            continue
        comp_count = int(counts[rep - 1])
        n_pair = modal_count + comp_count

        if n_pair == 0:
            p_val = 1.0
        else:
            p_val = float(binomtest(modal_count, n_pair, 0.5, alternative="two-sided").pvalue)

        if modal_count == 0 and comp_count == 0:
            or_val = float("nan")
            ci_low = float("nan")
            ci_high = float("nan")
        else:
            a = modal_count
            b = comp_count
            if a == 0 or b == 0:
                a += 0.5
                b += 0.5
            or_val = float(a / b)
            se = float(np.sqrt(1.0 / a + 1.0 / b))
            log_or = float(np.log(or_val))
            ci_low = float(np.exp(log_or - 1.96 * se))
            ci_high = float(np.exp(log_or + 1.96 * se))

        rows.append({
            "Metric": metric_label,
            "N": n,
            "Modal rep": modal_rep,
            "Compared rep": rep,
            "Count modal rep": modal_count,
            "Count compared rep": comp_count,
            "OR (modal/compared)": or_val,
            "CI95% low": ci_low,
            "CI95% high": ci_high,
            "p (exact binomial, two-sided)": p_val,
            "Counts (Rep1-Rep6)": ", ".join(str(c) for c in counts),
        })
        p_vals.append(p_val)

    adjusted = holm_adjust(p_vals)
    for row, adj in zip(rows, adjusted):
        row["p (Holm, within-metric)"] = adj
        row["Significant (Holm<0.05)"] = adj < 0.05

    return pl.DataFrame(rows)


## Generic plot runner


In [107]:
def generate_metric_plots(
    df_all_sessions: pl.DataFrame,
    df_best_session: pl.DataFrame,
    metric_col: str,
    metric_label: str,
    out_dir: str,
    out_prefix: str,
) -> pl.DataFrame:
    df_best = df_all_sessions.join(
        df_best_session,
        on=["athlete_name", "session_number"],
        how="semi",
    )

    best_rep_df = get_best_reps(df_best, metric_col)

    rep_col = "rep_number"

    raincloud_summary_df = plot_raincloud(
        df_best,
        title=f"CMJ ({metric_label}) â€“ within-athlete z-score distribution by trial",
        y_limits=(-SIGMA_THRESHOLD - 0.25, SIGMA_THRESHOLD + 0.25),
        out_png=f"{out_dir}/{out_prefix}_raincloud.png",
        out_csv=f"{out_dir}/{out_prefix}_raincloud.csv",
        out_stats_csv=f"{out_dir}/{out_prefix}_raincloud_stats.csv",
        metric_col=metric_col,
        z_col=f"{metric_col}_z_score",
        rep_col=rep_col,
        y_label="Within-athlete z-score",
    )

    raincloud_summary_df = raincloud_summary_df.with_columns(pl.lit(metric_label).alias("metric_label"))

    cum_stats_df = count_cum_stats(
        best_rep_df,
        rep_col=rep_col,
    )

    modal_vs_other_df = modal_vs_other_from_counts(
        cum_stats_df,
        metric_label=metric_label,
        rep_col=rep_col,
    )

    chi2_g_df = pl.DataFrame(chi_square_g_best_reps(
        cum_stats_df,
        bonferroni_tests=len(METRICS_CONFIG),
    ))


    chi2_g_df = chi2_g_df.with_columns(
            pl.lit(
                ", ".join(str(cnt) for cnt in cum_stats_df["count"].to_list()))
                .alias("Counts (Rep1-Rep6)"),
            pl.lit(metric_label).alias("Metric")
        )

    ensure_dir(os.path.dirname(out_dir) or ".")
    chi2_g_df.write_csv(f"{out_dir}/{out_prefix}_chi2.csv")

    summary_df = raincloud_summary_df.join(
        cum_stats_df,
        on=rep_col,
        how="inner",
    )

    return summary_df, chi2_g_df, modal_vs_other_df

In [108]:
def assemble_raincloud_panel(
    image_paths: list[str],
    metric_labels: list[str],
    out_path: str,
    ncols: int = 2,
    wspace: float = 0.12,
    hspace: float = 0.08,
) -> None:
    if len(image_paths) != len(metric_labels):
        raise ValueError("image_paths and metric_labels must have the same length.")

    nrows = int(np.ceil(len(image_paths) / ncols))
    fig_height = max(3.5, 4 * nrows)
    fig, axes = plt.subplots(nrows, ncols, figsize=(12, fig_height))
    axes_flat = np.array(axes).ravel()

    for ax, img_path, label in zip(axes_flat, image_paths, metric_labels):
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Missing raincloud plot: {img_path}")
        img = plt.imread(img_path)
        ax.imshow(img)
        ax.axis("off")

    for ax in axes_flat[len(image_paths):]:
        ax.axis("off")

    fig.subplots_adjust(hspace=hspace, wspace=wspace)
    fig.tight_layout(rect=(0, 0, 1, 1))
    ensure_dir(os.path.dirname(out_path) or ".")
    fig.savefig(out_path, dpi=600, bbox_inches="tight", pad_inches=0.05)
    plt.close(fig)


def create_raincloud_panels(
    plots_dir: str,
    panel_specs: list[tuple],
    panel_dir_name: str = "raincloud_panels",
    default_ncols: int = 2,
) -> None:
    metric_label_map = dict(METRICS_CONFIG)
    panel_dir = os.path.join(plots_dir, panel_dir_name)
    ensure_dir(panel_dir)

    for spec in panel_specs:
        if len(spec) == 2:
            panel_name, metrics = spec
            ncols = default_ncols
        elif len(spec) == 3:
            panel_name, metrics, ncols = spec
        else:
            raise ValueError("panel_specs entries must be (name, metrics) or (name, metrics, ncols)")

        image_paths = [
            os.path.join(plots_dir, m, f"CMJ_{m}_raincloud.png")
            for m in metrics
        ]
        metric_labels = [metric_label_map.get(m, m) for m in metrics]
        out_path = os.path.join(panel_dir, f"{panel_name}_raincloud_panel.png")
        assemble_raincloud_panel(
            image_paths=image_paths,
            metric_labels=metric_labels,
            out_path=out_path,
            ncols=ncols,
        )


## Run pipeline


In [109]:
ensure_dir(PLOTS_DIR)
ensure_dir(STATS_DIR)
set_manuscript_font()
reset_log(LOG_PATH)

df_all = load_and_combine(SRC_PATH, COMBINED_PATH)
df_cmj = filter_cmj(df_all, CMJ_TEST_TYPE, PURE_CMJ_PATH)
df_cmj_sessions = add_session_rep_columns(df_cmj)

df_filtered = df_cmj_sessions.select(TECHNICAL_COLUMNS + METRICS_COLUMNS)

log_stats("Initially", df_filtered, LOG_PATH)

df_filtered = (
    df_filtered
    .with_columns(
        pl.col("rep_number").count().over(["athlete_name", "session_number"]).alias("rep_numbers_in_session")
    )
    .filter(pl.col("rep_numbers_in_session") >= MIN_REPS_IN_FAM_SESSION)
)

log_stats("Sessions with at least 3 repetitions", df_filtered, LOG_PATH)

df_filtered = df_filtered.filter(pl.col("session_number") > MIN_FAMILIARIZATION_DAYS)

log_stats("One familiarization session removed from dataset", df_filtered, LOG_PATH)

df_filtered = df_filtered.filter(pl.col("rep_numbers_in_session") >= MIN_REPS_IN_VALID_SESSION)

log_stats("Correct sessions with at least 6 repetitions", df_filtered, LOG_PATH)

df_filtered = df_filtered.filter(pl.col("rep_number") <= KEEP_FIRST_N)

log_stats("Correct tests protocols - removed repetitions above 6th", df_filtered, LOG_PATH)

df_filtered = df_filtered.join(
    df_filtered
    .filter(
        (pl.col("rep_number") == 4) &
        (pl.col("rest_before_rep_seconds") >= REST_THRESHOLD_SEC)
    )
    .select(["athlete_name", "session_number"])
    .unique(),
    on=["athlete_name", "session_number"],
    how="semi",
)

log_stats("Correct tests protocols with at least 30 seconds of rest", df_filtered, LOG_PATH)

df_filtered = add_mrsi_session_stats(df_filtered)

df_filtered = add_sex_height_from_csv(df_filtered, "data/athletes.csv")

df_filtered = df_filtered.drop("rep_numbers_in_session")

# Calcualte z-scores per rep
df_filtered = df_filtered.with_columns(
    [
        (
            (pl.col(c).abs() - pl.col(c).abs().mean().over("athlete_name"))
            / pl.col(c).abs().std(ddof=1).over("athlete_name")
        ).alias(f"{c}_z_score")
        for c in METRICS_COLUMNS
])

df_filtered.write_csv("data/filtered_data.csv")

mean_n = df_filtered.select(pl.col("system_weight_n").mean().round(2)).item()
std_n = df_filtered.select(pl.col("system_weight_n").std(ddof=0).round(2)).item()
mean_kg = df_filtered.select(pl.col("system_weight_kg").mean().round(2)).item()
std_kg = df_filtered.select(pl.col("system_weight_kg").std(ddof=0).round(2)).item()
mean_height = df_filtered.select(pl.col("height_cm").mean().round(2)).item()
std_height = df_filtered.select(pl.col("height_cm").std(ddof=0).round(2)).item()
males_cnt = df_filtered.filter(pl.col("sex") == "M").select("athlete_name").unique().height
females_cnt = df_filtered.filter(pl.col("sex") == "F").select("athlete_name").unique().height

log_message(f"""
Demographics of filtered dataset:
    Average weight in N: {mean_n},
    STD of system weight in N: {std_n},

    Average weight in kg: {mean_kg},
    STD of weight in kg: {std_kg},
    
    Average height in cm: {mean_height},
    STD of height in cm: {std_height},
    
    Males count: {males_cnt},
    Females count: {females_cnt}
""",
LOG_PATH)

## Choose best session and generate plots for a metric


In [111]:
RECALCULATE_Z_SCORE = False

# Choose best session based on max mRSI and generate plots for each metric
df_best_session = best_session_ever(df_filtered, "max_mrsi_in_session")

df_best_session.write_csv("data/best_session_per_athlete.csv")

# Initial schemas with first columns - others will be added in concat
summary_df = pl.DataFrame(schema={"metric_label": pl.String})
chi2_g_df = pl.DataFrame(schema={
    "Metric": pl.String,
    "N": pl.String,
    "Counts (Rep1-Rep6)": pl.String
})


modal_vs_other_df = pl.DataFrame(schema={
    "Metric": pl.String,
    "N": pl.Int64,
    "Modal rep": pl.Int64,
    "Compared rep": pl.Int64,
    "Count modal rep": pl.Int64,
    "Count compared rep": pl.Int64,
    "OR (modal/compared)": pl.Float64,
    "CI95% low": pl.Float64,
    "CI95% high": pl.Float64,
    "p (exact binomial, two-sided)": pl.Float64,
    "p (Holm, within-metric)": pl.Float64,
    "Significant (Holm<0.05)": pl.Boolean,
    "Counts (Rep1-Rep6)": pl.String,
})

outliers = []

for metric_col, metric_label in METRICS_CONFIG:
    metric_dir = f"{PLOTS_DIR}/{metric_col}"
    ensure_dir(metric_dir)

    cnt_w_outliers = df_filtered.height
    df_wo_outliers = filter_outliers(df_filtered, metric_col, SIGMA_THRESHOLD)
    cnt_wo_outliers = df_wo_outliers.height

    if RECALCULATE_Z_SCORE:
        df_wo_outliers = df_wo_outliers.with_columns(
            [
                (
                    (pl.col(c).abs() - pl.col(c).abs().mean().over("athlete_name"))
                    / pl.col(c).abs().std(ddof=1).over("athlete_name")
                ).alias(f"{c}_z_score")
                for c in METRICS_COLUMNS
            ]
        )

    outliers.append({
        "Metric": metric_label,
        "N (before outlier removal)": cnt_w_outliers,
        "N (after >4 SD removal)": cnt_wo_outliers,
        "% Data Loss": (cnt_w_outliers - cnt_wo_outliers) / cnt_w_outliers * 100,
    })

    new_summary_df, new_chi2_g_df, new_modal_vs_other_df = generate_metric_plots(
        df_all_sessions=df_wo_outliers,
        df_best_session=df_best_session,
        metric_col=metric_col,
        metric_label=metric_label,
        out_dir=metric_dir,
        out_prefix=f"CMJ_{metric_col}",
    )

    summary_df = pl.concat([summary_df, new_summary_df], how="diagonal_relaxed")

    chi2_g_df = pl.concat([chi2_g_df, new_chi2_g_df], how="diagonal_relaxed")

    modal_vs_other_df = pl.concat([modal_vs_other_df, new_modal_vs_other_df], how="diagonal_relaxed")

summary_df = summary_df.rename({"count_right": "reps_count"})
summary_df.write_csv(f"{DATA_DIR}/all_summary.csv")
chi2_g_df.write_csv(f"{DATA_DIR}/chi2_g_summary.csv")
modal_vs_other_df.write_csv(f"{DATA_DIR}/modal_vs_other.csv")

pl.DataFrame(outliers).write_csv(f"{DATA_DIR}/outliers_summary.csv")

In [112]:
panel_specs = [
    ("all_metrics", [
        "jump_height_m",
        "mrsi",
        "stiffness_n_m",
        "avg_relative_braking_force",
        "avg_relative_braking_power_w_kg",
        "avg_relative_propulsive_force",
        "avg_relative_propulsive_power_w_kg",
    ], 2),
]

create_raincloud_panels(
    plots_dir=PLOTS_DIR,
    panel_specs=panel_specs,
    default_ncols=2,
)
