# SATP Results - Modalità Streaming

Notebook ottimizzato per dataset di grandi dimensioni: `results.csv` viene letto a chunk e le metriche vengono mediate su bin di avanzamento (`element_index / sample_size`).

In [None]:
from pathlib import Path
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

plt.style.use('seaborn-v0_8-whitegrid')
RESULTS_PATH = Path('..').resolve() / 'results.csv'
BINS = 400  # progress_bin takes the integer values 0..BINS (inclusive)

metrics = [
    'f0_mean','f0_hat_mean','bias','difference','variance','stddev','rmse','mae',
    'mean_relative_error','bias_relative','rse_observed'
]
keys = ['algorithm','sample_size','distinct_count','seed','progress_bin']
usecols = ['mode','algorithm','sample_size','distinct_count','seed','element_index'] + metrics
# Columns coerced to numbers (unparseable cells become NaN). distinct_count and
# seed are coerced too, exactly as the `normal` rows are before being merged on
# these keys, so both sides of that join carry numeric keys.
numeric_cols = ['sample_size','distinct_count','seed','element_index'] + metrics

# Streaming reduction over chunks of the CSV. For every
# (algorithm, sample_size, distinct_count, seed, progress_bin) group we keep the
# running sum of each metric and the running number of non-NaN values of each
# metric; their ratio at the end is the per-metric mean.
sum_acc = None
cnt_acc = None
skipped = 0  # streaming rows dropped because their progress is NaN

# The context manager closes the file even if parsing fails part-way through.
with pd.read_csv(RESULTS_PATH, usecols=usecols, chunksize=300_000) as reader:
    for chunk in reader:
        s = chunk[chunk['mode'] == 'streaming']
        if s.empty:
            continue
        s = s.assign(**{c: pd.to_numeric(s[c], errors='coerce') for c in numeric_cols})

        progress = s['element_index'] / s['sample_size']
        # NaN progress (missing/unparseable element_index or sample_size, or 0/0)
        # cannot be cast to int and previously aborted the whole run; drop those
        # rows instead. +/-inf (x/0) is still clipped into the first/last bin.
        valid = progress.notna()
        skipped += int((~valid).sum())
        if not valid.any():
            continue
        s = s[valid].assign(
            progress_bin=np.rint(progress[valid] * BINS).clip(0, BINS).astype(int)
        )

        gb = s.groupby(keys, observed=True)
        sums = gb[metrics].sum(min_count=1)
        # Per-metric non-NaN counts (not gb.size()): a NaN metric value must not
        # enlarge the denominator of that metric's mean.
        cnts = gb[metrics].count().astype(float)

        # fill_value=0.0 treats a group/metric absent from one side as 0 so
        # partial sums from different chunks combine correctly.
        sum_acc = sums if sum_acc is None else sum_acc.add(sums, fill_value=0.0)
        cnt_acc = cnts if cnt_acc is None else cnt_acc.add(cnts, fill_value=0.0)

if sum_acc is None:
    raise RuntimeError('Nessuna riga streaming trovata in results.csv')

# Element-wise division aligned on group index and metric column; a metric that
# is NaN in every row of a group stays NaN (NaN / 0).
reduced = (sum_acc / cnt_acc).reset_index()
reduced['progress'] = reduced['progress_bin'] / BINS
reduced['element_index_est'] = (reduced['progress'] * reduced['sample_size']).round().clip(lower=1).astype(int)

print('results:', RESULTS_PATH)
print('rows reduced:', len(reduced))
print('streaming rows skipped (NaN progress):', skipped)
print('sample sizes:', sorted(reduced['sample_size'].dropna().unique().tolist()))
print('algorithms:', sorted(reduced['algorithm'].dropna().unique().tolist()))


In [None]:
# Metrics vs. progress, one panel per sample_size.
plot_metrics = [
    'f0_hat_mean','bias','difference','variance','stddev','rmse','mae',
    'mean_relative_error','bias_relative','rse_observed'
]
sizes = sorted(reduced['sample_size'].unique())
algos = sorted(reduced['algorithm'].unique())

def plot_facets(metric, ncols=3):
    """Plot ``metric`` from ``reduced`` against progress, one subplot per sample size.

    Within a panel each algorithm is drawn as a single curve: rows of
    ``reduced`` that share (algorithm, sample_size, progress) but come from
    different distinct_count/seed runs are averaged first. Plotting them raw
    would join the different runs into one zig-zag polyline.

    Parameters
    ----------
    metric : str
        Column of ``reduced`` shown on the y axis.
    ncols : int, optional
        Number of subplot columns (default 3).
    """
    n = len(sizes)
    if n == 0:
        print(f'No streaming data to plot for {metric}')
        return
    nrows = math.ceil(n / ncols)
    fig, axes = plt.subplots(nrows, ncols, figsize=(5*ncols, 3.6*nrows), squeeze=False)
    flat_axes = axes.ravel()
    for ax, size in zip(flat_axes, sizes):
        ss = reduced[reduced['sample_size'] == size]
        for algo in algos:
            # Every algorithm is plotted in every panel (an empty curve still gets
            # a labelled line), keeping colours aligned across panels and letting
            # the first panel's handles serve as the figure legend.
            curve = ss.loc[ss['algorithm'] == algo].groupby('progress')[metric].mean()
            ax.plot(curve.index, curve.to_numpy(), label=algo)
        ax.set_title(f'sample_size={int(size)}')
        ax.set_xlabel('progress (t / sample_size)')
        ax.set_ylabel(metric)
    # Hide the unused trailing cells of the grid.
    for ax in flat_axes[n:]:
        ax.axis('off')
    handles, labels = flat_axes[0].get_legend_handles_labels()
    fig.legend(handles, labels, loc='upper center', ncol=4)
    fig.suptitle(f'Streaming: {metric} vs progress', y=1.02, fontsize=14)
    plt.tight_layout()
    plt.show()

for m in plot_metrics:
    plot_facets(m)


In [None]:
# Detail for the largest sample_size (log-scaled x axis).
max_size_value = reduced['sample_size'].max()
max_size = int(max_size_value)
# Filter on the raw maximum: comparing against int(...) would match no rows
# if sample_size were not integer-valued.
rmax = reduced[reduced['sample_size'] == max_size_value]

for m in plot_metrics:
    fig, ax = plt.subplots(figsize=(8.5, 4.8))
    for algo in algos:
        # One curve per algorithm: runs with different distinct_count/seed that
        # fall on the same estimated index are averaged instead of being joined
        # into a single zig-zag line.
        curve = rmax.loc[rmax['algorithm'] == algo].groupby('element_index_est')[m].mean()
        ax.plot(curve.index, curve.to_numpy(), label=algo)
    ax.set_xscale('log')
    ax.set_xlabel('element_index (t, stimato da bin)')
    ax.set_ylabel(m)
    ax.set_title(f'Streaming (sample_size={max_size}): {m} vs element_index')
    ax.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# Consistency check: normal mode vs. end of stream (progress_bin == BINS)
def load_mode_rows(path: Path, mode: str, usecols, chunksize: int = 250_000) -> pd.DataFrame:
    """Return the rows of a CSV whose ``mode`` column equals *mode*.

    The file is read in chunks of *chunksize* rows and only matching rows are
    kept, so non-matching data is never accumulated in memory.

    Parameters
    ----------
    path : Path
        CSV file to read.
    mode : str
        Value the ``mode`` column must equal for a row to be kept.
    usecols : list of str
        Columns to read; must include ``'mode'``.
    chunksize : int, optional
        Rows per chunk (default 250_000).

    Returns
    -------
    pd.DataFrame
        Matching rows with a fresh RangeIndex, or an empty frame with *usecols*
        as its columns when no row matches.
    """
    parts = []
    # The context manager closes the file even if parsing fails mid-way.
    with pd.read_csv(path, usecols=usecols, chunksize=chunksize) as reader:
        for chunk in reader:
            sub = chunk[chunk['mode'] == mode]
            if not sub.empty:
                # No .copy() needed: pd.concat below builds a new frame anyway.
                parts.append(sub)
    if not parts:
        return pd.DataFrame(columns=usecols)
    return pd.concat(parts, ignore_index=True)

cmp_metrics = ['f0_hat_mean','bias','rmse','mae','mean_relative_error','rse_observed']
join_cols = ['algorithm','sample_size','distinct_count','seed']
numeric_join_cols = ['sample_size','distinct_count','seed']

normal_usecols = ['mode'] + join_cols + cmp_metrics
normal = load_mode_rows(RESULTS_PATH, 'normal', normal_usecols)
for c in numeric_join_cols + cmp_metrics:
    normal[c] = pd.to_numeric(normal[c], errors='coerce')

# End-of-stream row of each streaming run = the row with its highest progress_bin.
# That is progress_bin == BINS whenever that bin exists. It is not guaranteed to
# exist: e.g. if element_index is 0-based (not visible here) and
# sample_size < 2*BINS, the last element rounds to a lower bin. An exact
# == BINS filter would silently drop such runs from the comparison.
last_rows = reduced.groupby(join_cols)['progress_bin'].idxmax()
stream_last = reduced.loc[last_rows.to_numpy(), join_cols + cmp_metrics].reset_index(drop=True)
# Give the stream-side keys the same numeric dtype as the normal side, so the
# merge neither fails (object vs float keys) nor misses matches.
for c in numeric_join_cols:
    stream_last[c] = pd.to_numeric(stream_last[c], errors='coerce')

merged = normal[join_cols + cmp_metrics].merge(stream_last, on=join_cols, suffixes=('_normal','_stream_last'))

display(merged.head(20))

if merged.empty:
    print('No (algorithm, sample_size, distinct_count, seed) matched between normal and streaming rows')
else:
    for m in cmp_metrics:
        fig, ax = plt.subplots(figsize=(6.2, 5.8))
        for algo in sorted(merged['algorithm'].unique()):
            g = merged[merged['algorithm'] == algo]
            ax.scatter(g[f'{m}_normal'], g[f'{m}_stream_last'], label=algo)
        # Range of the y=x reference line over both columns; Series.min/max skip
        # NaN, so one all-NaN column no longer turns the whole range into NaN.
        both = pd.concat([merged[f'{m}_normal'], merged[f'{m}_stream_last']])
        mn, mx = both.min(), both.max()
        ax.plot([mn, mx], [mn, mx], 'k--', linewidth=1, label='y=x')
        ax.set_xlabel(f'{m} (normal)')
        ax.set_ylabel(f'{m} (stream_last)')
        ax.set_title(f'Consistency: {m} normal vs stream_last')
        ax.legend()
        plt.tight_layout()
        plt.show()
