# Data Preparation & Split Generation

In [None]:
import os, sys, subprocess, json
from pathlib import Path
from hashlib import blake2b
import pandas as pd
import numpy as np

# Locate the repo root: the first directory, starting at the current working
# directory and walking up through its parents, that contains both `ml` and
# `data`.  It then becomes the working directory and is placed at the front of
# sys.path so that the `ml.*` import below resolves.
HERE = Path.cwd().resolve()
for _p in (HERE, *HERE.parents):
    if (_p / 'ml').exists() and (_p / 'data').exists():
        REPO_ROOT = _p
        break
else:
    # Explicit raise instead of `assert`: asserts are stripped under `python -O`,
    # which would let the chdir below fail with a far less helpful error.
    raise RuntimeError(f'Cannot find repo root from {HERE}')
os.chdir(REPO_ROOT)
# Guarded so that re-running this cell does not pile up duplicate entries.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from ml.notebooks.experiment_config import *

# Echo the resolved configuration so the notebook output records what was used.
for _caption, _value in (
    ('[*] Repo root        :', REPO_ROOT),
    ('[*] Random seed      :', RANDOM_SEED),
    ('[*] Merged parquet   :', MERGED_PARQUET),
    ('[*] Zeek-only out    :', ZEEK_ONLY_PARQUET),
    ('[*] Zeek+eBPF out    :', ZEEK_EBPF_PARQUET),
):
    print(_caption, _value)

## Build feature-set parquets

In [None]:
# Fail early with a clear error when the merged input is missing.  An explicit
# raise is used because `assert` is stripped under `python -O`.
if not MERGED_PARQUET.exists():
    raise FileNotFoundError(f'Missing: {MERGED_PARQUET}')

report_dir = REPORTS_DIR / 'make_datasets'
report_dir.mkdir(parents=True, exist_ok=True)

# Launch the dataset builder with the interpreter running this notebook so the
# subprocess uses the same environment; a bare 'python3' is resolved via PATH
# and may be a different installation.  check=True aborts the cell on failure.
subprocess.run([
    sys.executable or 'python3', 'ml/data_prep/make_datasets.py',
    '--in_parquet',  str(MERGED_PARQUET),
    '--out_baseline', str(ZEEK_ONLY_PARQUET),
    '--out_enhanced', str(ZEEK_EBPF_PARQUET),
    '--report_dir',  str(report_dir),
], check=True)

# Quick sanity: read back only the label column of each output and report the
# row count and the distinct label families.
for p in [ZEEK_ONLY_PARQUET, ZEEK_EBPF_PARQUET]:
    df_tmp = pd.read_parquet(p, columns=['label_family'])
    print(f'[+] {p.name}: {len(df_tmp):,} rows, labels: {sorted(df_tmp["label_family"].unique())}')

## Session-aware temporal splits (PRIMARY strategy)

In [None]:
print('[*] SESSION-AWARE TEMPORAL SPLIT')
print('Groups flows by (src_ip, dst_ip, proto, day), orders groups by earliest')
print('timestamp, then assigns chronologically.  Prevents both time and session leakage.')

# One 70/15/15 split per feature set, each written to its own output directory.
for label, parquet_path, out_dir in [
    ('zeek_only', ZEEK_ONLY_PARQUET, SPLITS_SESSION_TEMPORAL_BASELINE),
    ('zeek_ebpf', ZEEK_EBPF_PARQUET, SPLITS_SESSION_TEMPORAL_EBPF),
]:
    print(f'\n\n[*] {label}')
    # Use the notebook's own interpreter so the script runs in the same
    # environment; a bare 'python3' is resolved via PATH and may differ.
    # check=True aborts the cell if the script exits non-zero.
    subprocess.run([
        sys.executable or 'python3', 'ml/data_prep/split_session_temporal.py',
        '--in_parquet', str(parquet_path),
        '--out_dir',    str(out_dir),
        '--train_frac', '0.70',
        '--val_frac',   '0.15',
        '--test_frac',  '0.15',
        '--seed',       str(RANDOM_SEED),
    ], check=True)

print('\n[+] Session-temporal splits done.')

## Within-day time splits (COMPARISON reference)

In [None]:
print('[*] WITHIN-DAY TIME SPLIT')
print('Splits within each day by timestamp.  Prevents time leakage but NOT session leakage.')
print('Kept for direct comparison with the session-temporal results.')

# One 70/15/15 split per feature set, each written to its own output directory.
for label, parquet_path, out_dir in [
    ('zeek_only', ZEEK_ONLY_PARQUET, SPLITS_WITHIN_DAY_BASELINE),
    ('zeek_ebpf', ZEEK_EBPF_PARQUET, SPLITS_WITHIN_DAY_EBPF),
]:
    print(f'\n\n[*] {label}')
    # Use the notebook's own interpreter so the script runs in the same
    # environment; a bare 'python3' is resolved via PATH and may differ.
    # check=True aborts the cell if the script exits non-zero.
    subprocess.run([
        sys.executable or 'python3', 'ml/data_prep/split_days_auto.py',
        '--in_parquet', str(parquet_path),
        '--out_dir',    str(out_dir),
        '--protocol',   'within_day_time',
        '--seed',       str(RANDOM_SEED),
        '--train_frac', '0.70',
        '--val_frac',   '0.15',
        '--test_frac',  '0.15',
    ], check=True)

print('\n[+] Within-day splits done.')

## Day-holdout splits (STRESS-TEST: unseen days)

In [None]:
print('[*] DAY HOLDOUT SPLIT (Primary preset: Mon/Tue/Wed train, Thu val, Fri test)')
print('WARNING: some attack families only appear on one day and will be unseen in train.')
print('Used only as a generalisation stress-test.')

# One split per feature set using the script's 'primary' preset.
for label, parquet_path, out_dir in [
    ('zeek_only', ZEEK_ONLY_PARQUET, SPLITS_DAY_HOLDOUT_BASELINE),
    ('zeek_ebpf', ZEEK_EBPF_PARQUET, SPLITS_DAY_HOLDOUT_EBPF),
]:
    print(f'\n\n[*] {label}')
    # Use the notebook's own interpreter so the script runs in the same
    # environment; a bare 'python3' is resolved via PATH and may differ.
    # check=True aborts the cell if the script exits non-zero.
    subprocess.run([
        sys.executable or 'python3', 'ml/data_prep/split_by_day.py',
        '--in_parquet', str(parquet_path),
        '--out_dir',    str(out_dir),
        '--split',      'primary',
    ], check=True)

print('\n[+] Day-holdout splits done.')

## Leakage & label-coverage diagnostics

In [None]:
def _hash_rows(df: pd.DataFrame, cols: list) -> set:
    """Return the set of 64-bit BLAKE2b hex digests of the rows of ``df[cols]``.

    Every selected column is converted to ``str``; the values of a row are
    joined with ``'||'`` and the UTF-8 bytes of that string are hashed with
    ``blake2b(digest_size=8)``.  Identical rows collapse to a single digest.

    Args:
        df: Frame whose rows are fingerprinted.
        cols: Column labels, in order, that make up the row key.

    Returns:
        Set of 16-character hex digests, one per distinct row key.  Empty when
        ``df`` has no rows or ``cols`` is empty.
    """
    # Iterate the stringified columns directly instead of DataFrame.apply(axis=1),
    # which constructs a Series object for every row.
    columns = [df[c].astype(str) for c in cols]
    # De-duplicate before hashing so repeated rows are hashed only once.
    distinct_keys = set(zip(*columns))
    return {
        blake2b('||'.join(key).encode('utf-8'), digest_size=8).hexdigest()
        for key in distinct_keys
    }

def leakage_report(splits_dir: Path, name: str):
    """Print leakage and label-coverage diagnostics for one train/val/test split.

    Reads ``train.parquet``, ``val.parquet`` and ``test.parquet`` from
    ``splits_dir`` and prints:

    * row counts per split;
    * overlap of hashed connection-tuple keys between the splits;
    * overlap of hashed rows (first 50 non-timestamp columns) between train
      and val / train and test;
    * ``label_family`` values present in val or test but absent from train.

    Args:
        splits_dir: Directory holding the three split parquet files.
        name: Human-readable label used in the printed output.

    Returns:
        None.  If the directory or any split file is missing, a skip message
        is printed and nothing else is done.
    """
    splits_dir = Path(splits_dir)
    if not splits_dir.exists():
        print(f'[!] Skipping {name} — directory not found: {splits_dir}')
        return

    # Treat a partially written split directory like a missing one rather
    # than crashing inside read_parquet.
    split_paths = [splits_dir / f'{part}.parquet' for part in ('train', 'val', 'test')]
    missing = [p.name for p in split_paths if not p.exists()]
    if missing:
        print(f'[!] Skipping {name} — missing split file(s) in {splits_dir}: {missing}')
        return

    train, val, test = (pd.read_parquet(p) for p in split_paths)

    print(f'Leakage check: {name}')
    print(f'rows  train/val/test: {len(train):,} / {len(val):,} / {len(test):,}')

    # Only hash columns that exist in all three splits; selecting a column that
    # one split lacks would raise KeyError.
    in_val_and_test = set(val.columns) & set(test.columns)

    # 5-tuple session overlap (first five candidate columns that are present)
    candidates = ['orig_h', 'resp_h', 'orig_p', 'resp_p', 'proto', 'src_ip', 'dst_ip']
    tuple_cols = [c for c in candidates if c in train.columns and c in in_val_and_test][:5]
    if tuple_cols:
        ht = _hash_rows(train, tuple_cols)
        hv = _hash_rows(val,   tuple_cols)
        hs = _hash_rows(test,  tuple_cols)
        train_test = ht & hs
        print(f'5-tuple overlap  train∩val={len(ht&hv):,}  train∩test={len(train_test):,}  val∩test={len(hv&hs):,}')
        if train_test:
            print(f'  -> {len(train_test)/max(len(hs),1)*100:.1f}% of test sessions seen in train (session leakage if high)')
    else:
        print('[!] No 5-tuple columns found.')

    # Exact-row duplicate check: timestamps are excluded, and only the first
    # 50 remaining columns are hashed.
    time_cols = ('ts', 'start_ts', 'end_ts', 't_end')
    full_cols = [
        c for c in train.columns
        if c not in time_cols and c in in_val_and_test
    ][:50]
    hf_tr = _hash_rows(train, full_cols)
    hf_va = _hash_rows(val,   full_cols)
    hf_te = _hash_rows(test,  full_cols)
    print(f'Exact-row overlap train∩val={len(hf_tr&hf_va):,}  train∩test={len(hf_tr&hf_te):,}')

    # Label coverage: families that val/test contain but train never shows.
    lc = 'label_family'
    if lc in train.columns and lc in in_val_and_test:
        tr_lbl = set(train[lc].astype(str).unique())
        va_lbl = set(val[lc].astype(str).unique())
        te_lbl = set(test[lc].astype(str).unique())
        unseen_val  = sorted(va_lbl - tr_lbl)
        unseen_test = sorted(te_lbl - tr_lbl)
        print(f'Attack families unseen in train but in val : {unseen_val}')
        print(f'Attack families unseen in train but in test: {unseen_test}')

# Run the diagnostics for each split strategy (session-temporal is checked
# for both feature sets, the other two for zeek_only).
for _splits_dir, _tag in (
    (SPLITS_SESSION_TEMPORAL_BASELINE, 'session_temporal / zeek_only'),
    (SPLITS_SESSION_TEMPORAL_EBPF,     'session_temporal / zeek_ebpf'),
    (SPLITS_WITHIN_DAY_BASELINE,       'within_day_time  / zeek_only'),
    (SPLITS_DAY_HOLDOUT_BASELINE,      'day_holdout      / zeek_only'),
):
    leakage_report(_splits_dir, _tag)

## Label distribution overview

In [None]:
import matplotlib.pyplot as plt

def plot_label_dist(splits_dir: Path, title: str):
    """Plot per-split ``label_family`` counts side by side and save the figure.

    Draws one horizontal bar chart each for train, val and test (read from
    ``<splits_dir>/<split>.parquet``), saves the figure under ``REPORTS_DIR``
    as ``label_dist_<title>.png`` (spaces replaced by ``_`` and ``/`` by
    ``-``), and shows it.

    Args:
        splits_dir: Directory holding ``train.parquet``, ``val.parquet`` and
            ``test.parquet``.
        title: Figure title; also used to derive the output file name.

    Returns:
        None.  If ``splits_dir`` does not exist a skip message is printed and
        nothing is plotted.
    """
    splits_dir = Path(splits_dir)
    if not splits_dir.exists():
        print(f'[!] Skipping {title}')
        return
    fig, axes = plt.subplots(1, 3, figsize=(18, 5), sharey=False)
    try:
        for ax, split in zip(axes, ['train', 'val', 'test']):
            # Only the label column is needed for the counts.
            df = pd.read_parquet(splits_dir / f'{split}.parquet', columns=['label_family'])
            vc = df['label_family'].value_counts()
            vc.plot(kind='barh', ax=ax)
            ax.set_title(f'{split} ({len(df):,} rows)')
            ax.set_xlabel('Count')
        fig.suptitle(title, fontsize=13, fontweight='bold')
        fig.tight_layout()
        # Make sure the output directory exists so this works even when the
        # function is run before anything else has created it.
        REPORTS_DIR.mkdir(parents=True, exist_ok=True)
        fig.savefig(REPORTS_DIR / f'label_dist_{title.replace(" ","_").replace("/","-")}.png', dpi=150)
        plt.show()
    finally:
        # Release the figure; pyplot otherwise keeps every figure alive until
        # it is explicitly closed.
        plt.close(fig)

# One label-distribution figure per split strategy (zeek_only feature set).
for _splits_dir, _tag in (
    (SPLITS_SESSION_TEMPORAL_BASELINE, 'session_temporal / zeek_only'),
    (SPLITS_WITHIN_DAY_BASELINE,       'within_day_time  / zeek_only'),
    (SPLITS_DAY_HOLDOUT_BASELINE,      'day_holdout      / zeek_only'),
):
    plot_label_dist(_splits_dir, _tag)