In [7]:
import gc
import time
import tracemalloc
from itertools import product
from pathlib import Path

import numpy as np
import pandas as pd
import psutil
import seaborn as sns
from sklearn.decomposition import IncrementalPCA
from tqdm import tqdm

# datasketches supplies the KLL quantile sketch used below; if it is missing,
# fail immediately with an actionable message while keeping the original
# import error attached as the explicit cause.
try:
    from datasketches import kll_floats_sketch
except ImportError as exc:
    raise ImportError("Install with `pip install datasketches`") from exc

# Use seaborn's "whitegrid" style for plots drawn after this point.
sns.set(style="whitegrid")
# Output directory for the CSV files written by the grid run; the path is
# relative to the current working directory and is created if absent.
RESULTS_DIR = Path("stream_experiment_results")
RESULTS_DIR.mkdir(exist_ok=True)


# Utilities
def current_mem_rss():
    """Return the ``rss`` field of psutil's memory info for this process."""
    this_process = psutil.Process()
    mem_info = this_process.memory_info()
    return mem_info.rss


def sketch_serialized_size(sketch: kll_floats_sketch):
    """Return the length of ``sketch.serialize()``, i.e. its serialized size."""
    payload = sketch.serialize()
    return len(payload)


def generate_stream(
    n_samples, n_features, batch_size, distribution="normal", variance=1.0, seed=42
):
    """Yield a synthetic data stream as successive float32 batches.

    Parameters
    ----------
    n_samples : int
        Total number of rows produced across all batches.
    n_features : int
        Number of columns in every batch.
    batch_size : int
        Maximum number of rows per batch. The final batch is shorter when
        ``n_samples`` is not a multiple of ``batch_size``.
    distribution : str
        Only ``"normal"`` (zero-mean Gaussian) is supported.
    variance : float
        Variance of each generated value (must be non-negative).
    seed : int
        Seed for ``np.random.default_rng``; the same seed yields the same stream.

    Yields
    ------
    numpy.ndarray
        Array of shape ``(rows, n_features)`` with dtype ``float32``.

    Raises
    ------
    ValueError
        For an unsupported ``distribution``, a non-positive ``batch_size`` or a
        negative ``variance``. Because this is a generator, the error surfaces
        when the first batch is requested, not when the function is called.
    """
    if distribution != "normal":
        raise ValueError(f"Unsupported distribution: {distribution}")
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if variance < 0:
        raise ValueError(f"variance must be non-negative, got {variance}")

    rng = np.random.default_rng(seed)
    # Generator.normal takes the standard deviation as its scale argument,
    # so the requested variance has to be converted first.
    std_dev = variance ** 0.5

    remaining = int(n_samples)
    while remaining > 0:
        # Cap the last batch so the stream never exceeds n_samples rows.
        rows = min(batch_size, remaining)
        batch = rng.normal(0.0, std_dev, size=(rows, n_features))
        yield batch.astype(np.float32)
        remaining -= rows


def pca_reconstruction_mse(pca_obj, X):
    """Return the mean squared error of reconstructing ``X`` through ``pca_obj``.

    ``X`` is projected with ``pca_obj.transform`` and mapped back with
    ``pca_obj.inverse_transform``; the result is the mean of the squared
    element-wise differences between ``X`` and that reconstruction.

    Parameters
    ----------
    pca_obj : object
        Anything exposing ``transform`` and ``inverse_transform``.
    X : numpy.ndarray
        Data to project and reconstruct.

    Returns
    -------
    float
        The reconstruction MSE, or ``nan`` when it cannot be computed.
    """
    try:
        projected = pca_obj.transform(X)
        reconstructed = pca_obj.inverse_transform(projected)
        return float(np.mean((X - reconstructed) ** 2))
    except Exception:
        # Best-effort metric: a failed computation is reported as NaN rather
        # than aborting the run. Only Exception is caught so that
        # KeyboardInterrupt and SystemExit still stop a long experiment.
        return np.nan


def kll_median_error(sketches, X):
    """Return the mean absolute gap between sketch medians and exact medians.

    For each column index ``j``, the 0.5 quantile reported by ``sketches[j]``
    is compared with ``np.median(X[:, j])``; the absolute differences are
    averaged over all sketches and returned as a float.
    """
    abs_gaps = [
        abs(np.median(X[:, col]) - sketch.get_quantile(0.5))
        for col, sketch in enumerate(sketches)
    ]
    return float(np.mean(abs_gaps))


# Experiment runner


def run_config(
    n_samples,
    n_features,
    batch_size,
    kll_k,
    n_components,
    seed,
    compute_accuracy=False,
    mode="both",
):
    """Stream synthetic batches through IncrementalPCA and/or KLL sketches.

    Parameters
    ----------
    n_samples, n_features, batch_size : int
        Forwarded to ``generate_stream`` to build the training stream.
    kll_k : int
        Parameter passed to each ``kll_floats_sketch`` (one sketch per feature).
    n_components : int
        Number of components for ``IncrementalPCA``.
    seed : int
        Seed of the training stream.
    compute_accuracy : bool
        When true, also record accuracy metrics: PCA reconstruction MSE on a
        5000-row holdout stream (seed 999) every fifth batch, and the KLL
        median error on every batch.
    mode : {"both", "pca", "kll"}
        Which estimators to run.

    Returns
    -------
    (dict, pandas.DataFrame)
        A summary of the run and a frame with one row of metrics per batch.

    Raises
    ------
    ValueError
        If ``mode`` is not one of the supported values.

    Notes
    -----
    * ``kll_median_err`` compares the median of the sketches, which have seen
      every batch so far, against the exact median of the current batch only.
    * Memory deltas are RSS differences from a baseline taken before the
      estimators and the holdout are created. In ``"both"`` mode
      ``kll_mem_delta`` is sampled after both updates, so it also contains any
      growth caused by the PCA step.
    * ``peak_rss_bytes`` is the largest RSS sampled after an estimator update;
      the tracemalloc peak is reported separately as ``peak_traced_bytes``.
    * ``t_data`` is always 0.0: batch generation time is not measured.
    """
    if mode not in ("both", "pca", "kll"):
        # Without this check an unknown mode runs neither estimator and the
        # throughput computation divides by zero on the first batch.
        raise ValueError(f"Unsupported mode: {mode!r}")
    use_pca = mode in ("both", "pca")
    use_kll = mode in ("both", "kll")

    tracemalloc.start()
    try:
        gc.collect()

        stream = generate_stream(n_samples, n_features, batch_size, seed=seed)
        rss_baseline = current_mem_rss()
        peak_rss = rss_baseline

        ipca = IncrementalPCA(n_components=n_components) if use_pca else None
        sketches = (
            [kll_floats_sketch(kll_k) for _ in range(n_features)] if use_kll else None
        )

        # Drain ONE holdout generator. Calling next() on a fresh generator per
        # batch would restart the seeded RNG each time and stack five copies
        # of the same batch.
        holdout = (
            np.vstack(list(generate_stream(5000, n_features, 1000, seed=999)))
            if compute_accuracy
            else None
        )

        metrics = []
        start_time = time.perf_counter()

        for i, batch in enumerate(stream, 1):
            t_data = 0.0  # placeholder: generation time is not measured

            # PCA
            if use_pca:
                t0 = time.perf_counter()
                ipca.partial_fit(batch)
                t_ipca = time.perf_counter() - t0
                rss_now = current_mem_rss()
                ipca_mem = rss_now - rss_baseline
                peak_rss = max(peak_rss, rss_now)
            else:
                t_ipca = 0.0
                ipca_mem = 0.0

            # KLL
            if use_kll:
                t0 = time.perf_counter()
                for j in range(n_features):
                    sketches[j].update(batch[:, j])
                t_kll = time.perf_counter() - t0
                rss_now = current_mem_rss()
                kll_mem = rss_now - rss_baseline
                peak_rss = max(peak_rss, rss_now)
            else:
                t_kll = 0.0
                kll_mem = 0.0

            pca_mse = (
                pca_reconstruction_mse(ipca, holdout)
                if compute_accuracy and ipca is not None and i % 5 == 0
                else np.nan
            )
            kll_err = (
                kll_median_error(sketches, batch)
                if compute_accuracy and sketches
                else np.nan
            )

            # Use the real batch length and guard against a zero elapsed time
            # (possible with a coarse timer) instead of dividing blindly.
            elapsed = t_ipca + t_kll
            throughput = len(batch) / elapsed if elapsed > 0 else np.nan

            metrics.append(
                {
                    "batch": i,
                    "t_data": t_data,
                    "t_ipca": t_ipca,
                    "t_kll": t_kll,
                    "throughput_sps": throughput,
                    "ipca_mem_delta": ipca_mem,
                    "kll_mem_delta": kll_mem,
                    "pca_mse": pca_mse,
                    "kll_median_err": kll_err,
                }
            )

        total_time = time.perf_counter() - start_time
        _, peak_traced = tracemalloc.get_traced_memory()
    finally:
        # Always switch tracing off, even if a batch raised, so a failed run
        # does not leave tracemalloc slowing down the rest of the session.
        tracemalloc.stop()

    kll_serialized_size = (
        sum(sketch_serialized_size(s) for s in sketches) if sketches else 0
    )

    full_df = pd.DataFrame(metrics)
    summary = {
        "mode": mode,
        "kll_k": kll_k,
        "n_components": n_components,
        "seed": seed,
        "total_time_s": total_time,
        "peak_rss_bytes": peak_rss,
        "peak_traced_bytes": peak_traced,
        "mean_pca_mse": full_df["pca_mse"].mean(),
        "mean_kll_err": full_df["kll_median_err"].mean(),
        "n_samples": n_samples,
        "n_features": n_features,
        "batch_size": batch_size,
        "kll_serialized_size_bytes": kll_serialized_size,
    }

    return summary, full_df

In [8]:
# Experiment grid: stream shape plus the hyper-parameter values to sweep.
n_samples = 100_000
n_features = 100
batch_size = 1000
kll_ks = [8, 16, 32, 64, 128, 200]
n_comps = [2, 5, 10, 20, 50, 100]
seeds = [42, 101, 202]

# PCA and KLL are benchmarked in separate passes over the same grid.
# NOTE(review): run_config only uses kll_k when building sketches and
# n_components when building the PCA, so each pass repeats runs that differ
# only in the parameter it ignores. Left as-is because later cells may rely
# on the full grid being present.
grid_results = []
batch_metrics = []
configs = list(product(kll_ks, n_comps, seeds))

for mode in ("pca", "kll"):
    for kll_k, n_comp, seed in tqdm(configs, total=len(configs)):
        summary, batch_df = run_config(
            n_samples,
            n_features,
            batch_size,
            kll_k,
            n_comp,
            seed,
            compute_accuracy=True,
            mode=mode,
        )

        # Tag every per-batch row with the configuration that produced it.
        run_tags = {
            "mode": mode,
            "kll_k": kll_k,
            "n_components": n_comp,
            "seed": seed,
        }
        for column, value in run_tags.items():
            batch_df[column] = value

        grid_results.append(summary)
        batch_metrics.append(batch_df)

# Collect everything into two frames and persist them as CSV.
summary_df = pd.DataFrame(grid_results)
batch_df = pd.concat(batch_metrics, ignore_index=True)

for frame, filename in (
    (summary_df, "summary_metrics_split.csv"),
    (batch_df, "all_batch_metrics_split.csv"),
):
    frame.to_csv(RESULTS_DIR / filename, index=False)

100%|██████████| 108/108 [02:12<00:00,  1.22s/it]
100%|██████████| 108/108 [01:07<00:00,  1.59it/s]


KeyError: 'time_mean'