Script to process KPI time series data:
For train data:
1. Check for missing points in each KPI sequence.
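2. Perform linear interpolation for missing points (currently disabled: the gap-filling and interpolation calls in the cells below are commented out, so the exported train series keep their original points).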
3. Split time series based on KPI ID.

For test data:
1. Split time series based on KPI ID.

In [373]:
# Import necessary libraries
import logging
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import polars as pl
import seaborn as sns
import warnings
import zipfile

# Notebook-wide runtime configuration: INFO-level logging and every Python
# warning silenced.
logging.basicConfig(level=logging.INFO)
warnings.filterwarnings("ignore")

# Shared figure look. The seaborn theme is applied first; the matplotlib
# overrides below are layered on top of it.
sns.set_theme(style="whitegrid")

_PLOT_RC_OVERRIDES = {
    # Axes frame
    "axes.edgecolor": "0.3",
    "axes.linewidth": 0.8,
    # Text sizes and weights
    "font.size": 12,
    "axes.titlesize": 14,
    "axes.titleweight": "bold",
    "axes.labelsize": 12,
    # Legend
    "legend.fontsize": 10,
    "legend.frameon": False,
    # Rendering resolution
    "figure.dpi": 120,
}
plt.rcParams.update(_PLOT_RC_OVERRIDES)

In [374]:
# Unzip the raw archives if they are not extracted yet, then convert the HDF
# ground truth to parquet so it can be loaded with polars.
raw_test_data_path = "../../datasets/kpi/raw/phase2_ground_truth.hdf.zip"
raw_train_data_path = "../../datasets/kpi/raw/phase2_train.csv.zip"
test_data_path = "../../datasets/kpi/raw/phase2_ground_truth.hdf"
train_data_path = "../../datasets/kpi/raw/phase2_train.csv"
test_data_parquet = "../../datasets/kpi/raw/phase2_ground_truth.parquet"


def _unzip_if_missing(archive_path: str, extracted_path: str) -> None:
    """Extract `archive_path` into its own directory unless `extracted_path` exists.

    Args:
        archive_path (str): Path of the zip archive.
        extracted_path (str): File whose presence means the archive was
            already extracted.
    """
    if os.path.exists(extracted_path):
        return
    target_dir = os.path.dirname(archive_path)
    with zipfile.ZipFile(archive_path, "r") as zip_ref:
        zip_ref.extractall(target_dir)
    print(f"Unzipped raw data to {target_dir}")


_unzip_if_missing(raw_test_data_path, test_data_path)
_unzip_if_missing(raw_train_data_path, train_data_path)

# Convert test data from HDF to parquet.
# If the parquet already exists, skip conversion to avoid repeated work.
if os.path.exists(test_data_parquet):
    print(f"Parquet already exists: {test_data_parquet}")
else:
    print(f"Reading HDF: {test_data_path}")
    # Write to a temporary file and rename it afterwards: an interrupted write
    # must not leave a truncated parquet that the existence check above would
    # later accept as a valid cache.
    tmp_parquet = test_data_parquet + ".tmp"
    try:
        test_ts = pd.read_hdf(test_data_path)
        # Ensure KPI ID values are safe to serialize (UUIDs -> str)
        if "KPI ID" in test_ts.columns:
            test_ts["KPI ID"] = test_ts["KPI ID"].astype(str)
        test_ts = pl.from_pandas(test_ts)
        test_ts.write_parquet(  # pyright: ignore[reportAttributeAccessIssue]
            tmp_parquet
        )
        os.replace(tmp_parquet, test_data_parquet)
        print(f"Wrote parquet: {test_data_parquet}")
    except Exception as e:
        print(f"Failed to convert HDF to parquet: {e}")
        if os.path.exists(tmp_parquet):
            os.remove(tmp_parquet)
        # The following cells read this parquet, so fail here with the real
        # cause instead of later with a confusing "file not found".
        raise

Parquet already exists: ../../datasets/kpi/raw/phase2_ground_truth.parquet


In [375]:
# Load train and test data with polars: CSV for train, parquet for test.
train_data_path = "../../datasets/kpi/raw/phase2_train.csv"
# The parquet file is the one produced by the conversion cell above.
test_data_path = "../../datasets/kpi/raw/phase2_ground_truth.parquet"

# Explicit column dtypes for the train CSV.
train_schema = {
    "timestamp": pl.UInt64,
    "value": pl.Float64,
    "label": pl.UInt8,
    "KPI ID": pl.String,
}

train_ts = pl.read_csv(
    train_data_path, schema=train_schema, truncate_ragged_lines=True
)
test_ts = pl.read_parquet(test_data_path)

In [376]:
# Explore train data: column names, shape and the distinct KPI IDs,
# followed by a preview of the first rows.
train_kpi_column = train_ts["KPI ID"]
train_kpi_count = train_kpi_column.n_unique()
train_kpi_list = train_kpi_column.unique().to_list()

print(f"Train data columns: {train_ts.columns}")
print(f"Train data shape: {train_ts.shape}")
print(f"Train data has {train_kpi_count} KPI IDs:\n{train_kpi_list}")

train_ts.head()

Train data columns: ['timestamp', 'value', 'label', 'KPI ID']
Train data shape: (3004066, 4)
Train data has 29 KPI IDs:
['6efa3a07-4544-34a0-b921-a155bd1a05e8', 'f0932edd-6400-3e63-9559-0a9860a1baa9', 'da10a69f-d836-3baa-ad40-3e548ecf1fbd', '05f10d3a-239c-3bef-9bdc-a2feeb0037aa', 'a8c06b47-cc41-3738-9110-12df0ee4c721', '57051487-3a40-3828-9084-a12f7f23ee38', '43115f2a-baeb-3b01-96f7-4ea14188343c', '55f8b8b8-b659-38df-b3df-e4a5a8a54bc9', '301c70d8-1630-35ac-8f96-bc1b6f4359ea', '847e8ecc-f8d2-3a93-9107-f367a0aab37d', '6a757df4-95e5-3357-8406-165e2bd49360', 'e0747cad-8dc8-38a9-a9ab-855b61f5551d', 'ffb82d38-5f00-37db-abc0-5d2e4e4cb6aa', '9c639a46-34c8-39bc-aaf0-9144b37adfc8', '42d6616d-c9c5-370a-a8ba-17ead74f3114', '4d2af31a-9916-3d9f-8a8e-8a268a48c095', 'a07ac296-de40-3a7c-8df3-91f642cc14d0', 'ba5f3328-9f3f-3ff5-a683-84437d16d554', '431a8542-c468-3988-a508-3afd06a218da', '1c6d7a26-1f1a-3321-bb4d-7a9d969ec8f0', '54350a12-7a9d-3ca8-b81f-f886b9d156fd', 'ab216663-dcc2-3a24-b1ee-2c3e550e06c9',

timestamp,value,label,KPI ID
u64,f64,u8,str
1476460800,0.012604,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1476460860,0.017786,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1476460920,0.012014,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1476460980,0.017062,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1476461040,0.023632,0,"""da10a69f-d836-3baa-ad40-3e548e…"


In [377]:
# Explore test data: shape and the distinct KPI IDs, followed by a preview
# of the first rows.
test_kpi_column = test_ts["KPI ID"]
test_kpi_count = test_kpi_column.n_unique()
test_kpi_list = test_kpi_column.unique().to_list()

print(f"Test data shape: {test_ts.shape}")
print(f"Test data has {test_kpi_count} KPI IDs:\n{test_kpi_list}")

test_ts.head()

Test data shape: (2918847, 4)
Test data has 29 KPI IDs:
['a07ac296-de40-3a7c-8df3-91f642cc14d0', '6a757df4-95e5-3357-8406-165e2bd49360', '43115f2a-baeb-3b01-96f7-4ea14188343c', '4d2af31a-9916-3d9f-8a8e-8a268a48c095', '57051487-3a40-3828-9084-a12f7f23ee38', '7103fa0f-cac4-314f-addc-866190247439', 'da10a69f-d836-3baa-ad40-3e548ecf1fbd', '6d1114ae-be04-3c46-b5aa-be1a003a57cd', 'ba5f3328-9f3f-3ff5-a683-84437d16d554', 'a8c06b47-cc41-3738-9110-12df0ee4c721', 'ffb82d38-5f00-37db-abc0-5d2e4e4cb6aa', '05f10d3a-239c-3bef-9bdc-a2feeb0037aa', '431a8542-c468-3988-a508-3afd06a218da', '9c639a46-34c8-39bc-aaf0-9144b37adfc8', '0efb375b-b902-3661-ab23-9a0bb799f4e3', '6efa3a07-4544-34a0-b921-a155bd1a05e8', '847e8ecc-f8d2-3a93-9107-f367a0aab37d', '301c70d8-1630-35ac-8f96-bc1b6f4359ea', 'e0747cad-8dc8-38a9-a9ab-855b61f5551d', '54350a12-7a9d-3ca8-b81f-f886b9d156fd', 'ab216663-dcc2-3a24-b1ee-2c3e550e06c9', '42d6616d-c9c5-370a-a8ba-17ead74f3114', '55f8b8b8-b659-38df-b3df-e4a5a8a54bc9', '1c6d7a26-1f1a-3321-bb4

timestamp,value,label,KPI ID
i64,f64,i64,str
1482940800,0.048066,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1482940860,0.010651,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1482940920,0.014996,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1482940980,0.039722,0,"""da10a69f-d836-3baa-ad40-3e548e…"
1482941040,0.022452,0,"""da10a69f-d836-3baa-ad40-3e548e…"


In [378]:
# utils for KPI stats
def _gap_summary(intervals: pl.Series, reducer: str) -> pl.Series:
    """Reduce the gap lengths found in one KPI's sampling intervals.

    A gap length is the number of sampling steps skipped between two
    consecutive points, i.e. ``round(interval / median_interval) - 1``. Only
    strictly positive gap lengths take part in the reduction.

    Args:
        intervals (pl.Series): Consecutive timestamp differences of one KPI.
            Null entries (such as the first value produced by ``diff``) are
            ignored.
        reducer (str): Name of the ``pl.Series`` reduction applied to the gap
            lengths: ``"min"``, ``"median"`` or ``"max"``.
    Returns:
        pl.Series: One-element Int32 series with the reduced gap length, or 0
        when there is no gap.
    """
    typical_interval = intervals.median()
    if not typical_interval:
        # No interval at all (median is None) or a zero median interval:
        # there is no usable sampling step, so report "no gap".
        return pl.Series([0], dtype=pl.Int32)
    gaps = (intervals / typical_interval).round() - 1
    reduced = getattr(gaps.filter(gaps > 0), reducer)()
    # `or 0` maps "no positive gap" (None) to 0.
    return pl.Series([reduced or 0], dtype=pl.Int32, strict=False)


def _gap_expr(reducer: str, alias: str) -> pl.Expr:
    """Build the per-KPI aggregation expression for one gap statistic."""
    return (
        pl.col("timestamp")
        .diff()
        .map_batches(
            lambda intervals: _gap_summary(intervals, reducer),
            return_dtype=pl.Int32,
        )
        .first()
        .alias(alias)
    )


def compute_kpi_stats(df: pl.DataFrame) -> pl.DataFrame:
    """Compute KPI statistics from time series data.

    Rows are sorted by timestamp before the intervals are derived, so the
    input does not have to be pre-sorted.

    Args:
        df (pl.DataFrame): Input DataFrame with columns 'KPI ID', 'timestamp', 'value', 'label'.
    Returns:
        pl.DataFrame: One row per 'KPI ID' (truncated to its first 8
        characters), sorted by 'KPI ID', with the columns 'Points',
        'Interval (s)', 'Missing Points', 'Missing Rate (%)', 'Gap | Min',
        'Gap | Max', 'Gap | Median', 'Anomaly Points' and 'Anomaly Rate (%)'.
    """
    intervals = pl.col("timestamp").diff()
    time_span = pl.col("timestamp").max() - pl.col("timestamp").min()

    return (
        # diff() is only meaningful on time-ordered rows; group_by keeps the
        # row order inside each group.
        df.sort("timestamp")
        .group_by("KPI ID")
        .agg(
            [
                pl.len().alias("Points"),
                (pl.col("label") == 1).sum().alias("Anomaly Points"),
                # Sampling interval: median difference of consecutive timestamps
                intervals.median().cast(pl.UInt32).alias("Interval (s)"),
                # Number of points a gap-free series would have over the same span
                ((time_span / intervals.median()) + 1)
                .cast(pl.UInt32)
                .alias("Expected Points"),
                _gap_expr("min", "Gap | Min"),
                _gap_expr("median", "Gap | Median"),
                _gap_expr("max", "Gap | Max"),
            ]
        )
        .with_columns(
            pl.col("KPI ID").str.slice(0, 8),
            # Calculate missing points. Both operands are unsigned, so the
            # subtraction is done in Int64: otherwise a KPI with more rows
            # than expected would underflow before clip() could floor it at 0.
            pl.when(pl.col("Interval (s)").is_not_null())
            .then(
                (
                    pl.col("Expected Points").cast(pl.Int64)
                    - pl.col("Points").cast(pl.Int64)
                ).clip(lower_bound=0)
            )
            .otherwise(0)
            .cast(pl.UInt32)
            .alias("Missing Points"),
        )
        .with_columns(
            # Calculate missing rate and anomaly rate
            (100 * pl.col("Missing Points") / pl.col("Expected Points"))
            .fill_null(0)
            .round(2)
            .alias("Missing Rate (%)"),
            (100 * pl.col("Anomaly Points") / pl.col("Points"))
            .fill_null(0)
            .round(2)
            .alias("Anomaly Rate (%)"),
        )
        .select(
            [
                "KPI ID",
                "Points",
                "Interval (s)",
                "Missing Points",
                "Missing Rate (%)",
                "Gap | Min",
                "Gap | Max",
                "Gap | Median",
                "Anomaly Points",
                "Anomaly Rate (%)",
            ]
        )
        .sort("KPI ID")
    )

In [388]:
# Per-KPI statistics of the train series, displayed in ascending order of the
# largest gap. The rows are first put into (KPI ID, timestamp) order.
train_ts = train_ts.sort(by=["KPI ID", "timestamp"])

train_kpi_stats = compute_kpi_stats(train_ts)
train_kpi_stats.sort(by="Gap | Max")

KPI ID,Points,Interval (s),Missing Points,Missing Rate (%),Gap | Min,Gap | Max,Gap | Median,Anomaly Points,Anomaly Rate (%)
str,u32,u32,u32,f64,i32,i32,i32,u32,f64
"""0efb375b""",8784,300,0,0.0,0,0,0,13,0.15
"""301c70d8""",8784,300,0,0.0,0,0,0,114,1.3
"""c02607e8""",8784,300,0,0.0,0,0,0,6,0.07
"""e0747cad""",8784,300,0,0.0,0,0,0,93,1.06
"""da10a69f""",107717,60,283,0.26,1,2,1,7363,6.84
…,…,…,…,…,…,…,…,…,…
"""6a757df4""",129010,60,2785,2.11,1,1722,3,6853,5.31
"""4d2af31a""",128872,60,2923,2.22,1,1752,3,7396,5.74
"""431a8542""",129046,60,2749,2.09,1,1774,2,8222,6.37
"""f0932edd""",128789,60,3006,2.28,1,1774,2,7254,5.63


In [380]:
# Per-KPI statistics of the test series. The rows are first put into
# (KPI ID, timestamp) order.
test_ts = test_ts.sort(by=["KPI ID", "timestamp"])

test_kpi_stats = compute_kpi_stats(test_ts)
test_kpi_stats

KPI ID,Points,Interval (s),Missing Points,Missing Rate (%),Gap | Min,Gap | Max,Gap | Median,Anomaly Points,Anomaly Rate (%)
str,u32,u32,u32,f64,i32,i32,i32,u32,f64
"""05f10d3a""",149130,60,396,0.26,1,33,1,991,0.66
"""0efb375b""",8784,300,0,0.0,0,0,0,71,0.81
"""1c6d7a26""",149156,60,370,0.25,1,6,1,633,0.42
"""301c70d8""",8784,300,0,0.0,0,0,0,206,2.35
"""42d6616d""",149161,60,365,0.24,1,6,1,1896,1.27
…,…,…,…,…,…,…,…,…,…
"""c69a50cf""",149159,60,367,0.25,1,6,1,702,0.47
"""da10a69f""",107167,60,833,0.77,1,6,1,8750,8.16
"""e0747cad""",8784,300,0,0.0,0,0,0,116,1.32
"""f0932edd""",112149,60,19646,14.91,1,5420,10,2842,2.53


In [381]:
# Function to insert explicit null rows for missing time points
# (interpolation, if wanted, is a separate step applied afterwards)
def fill_missing_points_with_nan(df: pl.DataFrame) -> pl.DataFrame:
    """
    Put every KPI on a regular time grid, adding null rows for missing points.

    For each KPI the sampling interval is the median difference between
    consecutive timestamps. The grid runs from the KPI's first timestamp up to
    (at most) its last one. Grid points without an original row get a null
    'value' (polars null, not a float NaN) and label 0.

    NOTE: the original rows are attached with a left join on the grid, so rows
    whose timestamp does not fall on the grid are not part of the result.

    Args:
        df (pl.DataFrame): Input with columns 'KPI ID', 'timestamp', 'value', 'label'.
    Returns:
        pl.DataFrame: Same columns as `df`, sorted by 'KPI ID' and 'timestamp'.
    """

    results = []
    total_added = 0
    print("Filling missing points for each KPI ...")

    key_dtype = df.schema["KPI ID"]
    timestamp_dtype = df.schema["timestamp"]

    # Process each KPI ID separately
    for kpi_id in df["KPI ID"].unique():
        kpi_ts = df.filter(pl.col("KPI ID") == kpi_id).sort("timestamp")

        # Skip KPIs with less than 2 points
        if kpi_ts.height < 2:
            results.append(kpi_ts)
            continue

        # Calculate interval (median)
        timestamps = kpi_ts["timestamp"]
        interval = int(timestamps.diff().median())
        if interval <= 0:
            # More than half of the points share a timestamp: no usable step.
            results.append(kpi_ts)
            continue

        start_time = int(timestamps[0])
        end_time = int(timestamps[-1])

        # Generate the complete timestamp sequence. The stop is exclusive, so
        # `end_time + 1` keeps the last timestamp when it lies on the grid and
        # never creates a point beyond the observed range. The cast makes the
        # join key dtype match the input column (arange yields Int64).
        complete_timestamps = pl.arange(
            start_time,
            end_time + 1,
            step=interval,
            eager=True,
        ).cast(timestamp_dtype)

        completed = (
            pl.DataFrame({"timestamp": complete_timestamps})
            .with_columns(pl.lit(kpi_id).cast(key_dtype).alias("KPI ID"))
            # Use left join to fill missing points
            .join(kpi_ts, on=["KPI ID", "timestamp"], how="left")
            # Inserted rows carry no anomaly label: treat them as normal (0)
            .with_columns(pl.col("label").fill_null(0))
            # Restore the input column order so all frames can be concatenated
            .select(df.columns)
        )

        # Statistics
        missing_added = completed.height - kpi_ts.height
        if missing_added > 0:
            total_added += missing_added
            print(
                f"   KPI {kpi_id[:8]}: {kpi_ts.height} -> {completed.height} (Added {missing_added} NaN points)"
            )

        results.append(completed)

    if not results:
        # Empty input: nothing to fill and nothing to concatenate
        print(f"Completed {len(results)} KPIs, total missing points added: {total_added}")
        return df

    # Concatenate all KPIs
    df = pl.concat(results).sort(["KPI ID", "timestamp"])

    # Validation info
    print(f"Completed {len(results)} KPIs, total missing points added: {total_added}")

    return df

In [382]:
# Gap filling is currently disabled (the call below is commented out), so the
# statistics are computed on train_ts exactly as it was loaded and sorted.
# train_ts = fill_missing_points_with_nan(train_ts)
compute_kpi_stats(train_ts)

KPI ID,Points,Interval (s),Missing Points,Missing Rate (%),Gap | Min,Gap | Max,Gap | Median,Anomaly Points,Anomaly Rate (%)
str,u32,u32,u32,f64,i32,i32,i32,u32,f64
"""05f10d3a""",146255,60,3272,2.19,1,312,1,1285,0.88
"""0efb375b""",8784,300,0,0.0,0,0,0,13,0.15
"""1c6d7a26""",146254,60,3273,2.19,1,312,1,1064,0.73
"""301c70d8""",8784,300,0,0.0,0,0,0,114,1.3
"""42d6616d""",146253,60,3274,2.19,1,312,1,1407,0.96
…,…,…,…,…,…,…,…,…,…
"""c69a50cf""",146255,60,3272,2.19,1,312,1,878,0.6
"""da10a69f""",107717,60,283,0.26,1,2,1,7363,6.84
"""e0747cad""",8784,300,0,0.0,0,0,0,93,1.06
"""f0932edd""",128789,60,3006,2.28,1,1774,2,7254,5.63


In [383]:
# utils for testing different filling strategies
def visualize_missing_segments_comparison(
    df: pl.DataFrame,
    min_missing=10,
    max_missing=100,
    n_samples=3,
):
    """
    Plot how three filling strategies behave on real missing segments.

    Picks at most one run of consecutive null values per KPI whose length lies
    in [min_missing, max_missing], and for each chosen run draws the original
    series together with linear interpolation, forward fill and backward fill.
    Prints a message and returns without plotting when no such run exists.

    Args:
        df (pl.DataFrame): Columns 'KPI ID', 'timestamp', 'value', 'label'.
            Missing points must be present as rows with a null 'value';
            'timestamp' is interpreted as epoch seconds.
        min_missing (int): Minimum length (in points) of a run to be plotted.
        max_missing (int): Maximum length (in points) of a run to be plotted.
        n_samples (int): Stop searching once this many runs have been chosen.
    Returns:
        None
    """
    # Check columns
    for col in ["KPI ID", "timestamp", "value", "label"]:
        assert col in df.columns, f"Missing column: {col}"

    # Find KPIs with missing values
    missing_per_kpi = df.group_by("KPI ID").agg(
        pl.col("value").is_null().sum().alias("missing_count")
    )
    # group_by returns groups in no fixed order; sort so that the same KPIs
    # are picked on every run.
    candidate_kpis = sorted(
        missing_per_kpi.filter(pl.col("missing_count") > 0)["KPI ID"].to_list()
    )
    chosen_segments = []

    for kpi_id in candidate_kpis:
        kpi_ts = df.filter(pl.col("KPI ID") == kpi_id).sort("timestamp")
        null_mask = kpi_ts["value"].is_null().to_numpy()

        # Find continuous null runs: +1 marks a run start, -1 the index just
        # past its end.
        edges = np.diff(np.concatenate(([0], null_mask.astype(np.int8), [0])))
        starts = np.where(edges == 1)[0]
        ends = np.where(edges == -1)[0]
        for s, e in zip(starts, ends):
            if min_missing <= e - s <= max_missing:
                chosen_segments.append((kpi_id, int(s), int(e)))
                break  # One segment per KPI to enhance diversity

        if len(chosen_segments) >= n_samples:
            break

    if not chosen_segments:
        print("No suitable missing segments found.")
        return

    print(f"Found {len(chosen_segments)} missing segments for visualization.")

    # Plot: one row per chosen segment. squeeze=False always yields a 2-D
    # array of axes, so a single segment needs no special case.
    _, axes = plt.subplots(
        len(chosen_segments),
        1,
        figsize=(12, 4.5 * len(chosen_segments)),
        squeeze=False,
    )

    colors = {"linear": "#0072B2", "forward": "#009E73", "backward": "#D55E00"}

    for ax, (kpi_id, s, e) in zip(axes[:, 0], chosen_segments):
        # Calculate three filling strategies simultaneously
        kpi_ts = (
            df.filter(pl.col("KPI ID") == kpi_id)
            .sort("timestamp")
            .with_columns(
                [
                    pl.col("value").interpolate().alias("linear"),
                    pl.col("value").fill_null(strategy="forward").alias("forward"),
                    pl.col("value").fill_null(strategy="backward").alias("backward"),
                ]
            )
        )

        # Dynamic window: one segment length of context on each side,
        # clipped to the series bounds.
        pad = e - s
        start = max(0, s - pad)
        end = min(kpi_ts.height, e + pad)
        window = kpi_ts.slice(start, end - start)

        # Timestamps are epoch seconds
        x = pd.to_datetime(window["timestamp"].cast(pl.Int64).to_numpy(), unit="s")
        y = window["value"].to_numpy()
        y_linear = window["linear"].to_numpy()
        y_forward = window["forward"].to_numpy()
        y_backward = window["backward"].to_numpy()

        # Various filling strategies
        ax.plot(x, y_linear, color=colors["linear"], lw=2, label="Linear interpolation")
        ax.plot(x, y_forward, color=colors["forward"], lw=2, label="Forward fill")
        ax.plot(x, y_backward, color=colors["backward"], lw=2, label="Backward fill")

        # Plot original curve with NaNs
        ax.plot(x, y, "-", color="black", lw=1.5, label="Original (with NaN)")

        # Mark the last observed point before the gap and the first one after
        # it. Positions are relative to the window start, so they stay correct
        # when the window was clipped at a series edge.
        before_gap = s - start - 1
        after_gap = e - start
        if before_gap >= 0:
            ax.axvline(
                x[before_gap], color="0.45", linestyle="--", linewidth=1.5, zorder=3
            )
        if after_gap < len(x):
            ax.axvline(
                x[after_gap], color="0.45", linestyle="--", linewidth=1.5, zorder=3
            )

        # Anomalies (rows with a null label count as not anomalous)
        anomaly_idx = np.flatnonzero(
            (window["label"] == 1).fill_null(False).to_numpy()
        )
        if len(anomaly_idx) > 0:
            ax.scatter(
                x[anomaly_idx],
                y_linear[anomaly_idx],
                color="red",
                s=40,
                marker="x",
                zorder=5,
                label="Anomaly",
            )

        # Tilt the tick labels so that neighbouring dates do not overlap
        plt.setp(ax.get_xticklabels(), rotation=30, ha="right")

        ax.set_title(f"KPI: {kpi_id} | Missing segment [{s}-{e}] ({e - s} points)")
        ax.set_xlabel("Timestamp")
        ax.set_ylabel("Value")
        ax.legend(loc="best", fontsize=9)
        sns.despine(ax=ax)

    plt.tight_layout()
    plt.show()

In [384]:
# Compare the filling strategies on missing segments of 10 to 100 points.
# The function looks for runs of null 'value' entries, so it needs the null
# rows that fill_missing_points_with_nan inserts; that call is currently
# commented out above.
visualize_missing_segments_comparison(
    train_ts, min_missing=10, max_missing=100, n_samples=3
)

No suitable missing segments found.


In [385]:
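# Interpolate missing values in the 'value' column using linear interpolation.
# NOTE: currently disabled. It only has an effect once missing points exist as
# null rows, i.e. after fill_missing_points_with_nan has been applied.
# train_ts = train_ts.with_columns(pl.col("value").interpolate())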

In [386]:
# Separate train ts into multiple csv files based on KPI ID
kpi_ids = train_ts["KPI ID"].unique().to_list()
output_dir = "../../datasets/kpi/train"
os.makedirs(output_dir, exist_ok=True)
# Remove only previously exported CSV files. Calling os.remove on every entry
# would fail on a subdirectory and would also delete unrelated files.
for f in os.listdir(output_dir):
    stale_path = os.path.join(output_dir, f)
    if f.endswith(".csv") and os.path.isfile(stale_path):
        os.remove(stale_path)
# Write one time-ordered CSV (timestamp, value, label) per KPI
for kpi_id in kpi_ids:
    kpi_ts = (
        train_ts.filter(pl.col("KPI ID") == kpi_id)
        .sort("timestamp")
        .select(["timestamp", "value", "label"])
    )
    output_path = os.path.join(output_dir, f"{kpi_id}.csv")
    kpi_ts.write_csv(output_path)
    print(f"Wrote KPI {kpi_id} to {output_path}")

Wrote KPI 54350a12-7a9d-3ca8-b81f-f886b9d156fd to ../../datasets/kpi/train/54350a12-7a9d-3ca8-b81f-f886b9d156fd.csv
Wrote KPI 7103fa0f-cac4-314f-addc-866190247439 to ../../datasets/kpi/train/7103fa0f-cac4-314f-addc-866190247439.csv
Wrote KPI 57051487-3a40-3828-9084-a12f7f23ee38 to ../../datasets/kpi/train/57051487-3a40-3828-9084-a12f7f23ee38.csv
Wrote KPI f0932edd-6400-3e63-9559-0a9860a1baa9 to ../../datasets/kpi/train/f0932edd-6400-3e63-9559-0a9860a1baa9.csv
Wrote KPI c02607e8-7399-3dde-9d28-8a8da5e5d251 to ../../datasets/kpi/train/c02607e8-7399-3dde-9d28-8a8da5e5d251.csv
Wrote KPI ba5f3328-9f3f-3ff5-a683-84437d16d554 to ../../datasets/kpi/train/ba5f3328-9f3f-3ff5-a683-84437d16d554.csv
Wrote KPI 43115f2a-baeb-3b01-96f7-4ea14188343c to ../../datasets/kpi/train/43115f2a-baeb-3b01-96f7-4ea14188343c.csv
Wrote KPI ffb82d38-5f00-37db-abc0-5d2e4e4cb6aa to ../../datasets/kpi/train/ffb82d38-5f00-37db-abc0-5d2e4e4cb6aa.csv
Wrote KPI 8723f0fb-eaef-32e6-b372-6034c9c04b80 to ../../datasets/kpi/tra

In [387]:
# Separate test ts into multiple csv files based on KPI ID
kpi_ids = test_ts["KPI ID"].unique().to_list()
output_dir = "../../datasets/kpi/test"
os.makedirs(output_dir, exist_ok=True)
# Remove only previously exported CSV files. Calling os.remove on every entry
# would fail on a subdirectory and would also delete unrelated files.
for f in os.listdir(output_dir):
    stale_path = os.path.join(output_dir, f)
    if f.endswith(".csv") and os.path.isfile(stale_path):
        os.remove(stale_path)
# Write one time-ordered CSV (timestamp, value, label) per KPI
for kpi_id in kpi_ids:
    kpi_ts = (
        test_ts.filter(pl.col("KPI ID") == kpi_id)
        .sort("timestamp")
        .select(["timestamp", "value", "label"])
    )
    output_path = os.path.join(output_dir, f"{kpi_id}.csv")
    kpi_ts.write_csv(output_path)
    print(f"Wrote KPI {kpi_id} to {output_path}")

Wrote KPI ab216663-dcc2-3a24-b1ee-2c3e550e06c9 to ../../datasets/kpi/test/ab216663-dcc2-3a24-b1ee-2c3e550e06c9.csv
Wrote KPI 55f8b8b8-b659-38df-b3df-e4a5a8a54bc9 to ../../datasets/kpi/test/55f8b8b8-b659-38df-b3df-e4a5a8a54bc9.csv
Wrote KPI 54350a12-7a9d-3ca8-b81f-f886b9d156fd to ../../datasets/kpi/test/54350a12-7a9d-3ca8-b81f-f886b9d156fd.csv
Wrote KPI 6efa3a07-4544-34a0-b921-a155bd1a05e8 to ../../datasets/kpi/test/6efa3a07-4544-34a0-b921-a155bd1a05e8.csv
Wrote KPI c02607e8-7399-3dde-9d28-8a8da5e5d251 to ../../datasets/kpi/test/c02607e8-7399-3dde-9d28-8a8da5e5d251.csv
Wrote KPI a07ac296-de40-3a7c-8df3-91f642cc14d0 to ../../datasets/kpi/test/a07ac296-de40-3a7c-8df3-91f642cc14d0.csv
Wrote KPI 4d2af31a-9916-3d9f-8a8e-8a268a48c095 to ../../datasets/kpi/test/4d2af31a-9916-3d9f-8a8e-8a268a48c095.csv
Wrote KPI 847e8ecc-f8d2-3a93-9107-f367a0aab37d to ../../datasets/kpi/test/847e8ecc-f8d2-3a93-9107-f367a0aab37d.csv
Wrote KPI 7103fa0f-cac4-314f-addc-866190247439 to ../../datasets/kpi/test/7103fa