# My Generator Benchmark (Full)

This is the bigger benchmark notebook.

I use it to show current progress in more depth:
1. feature support on more configs
2. config impact with more variants
3. speed scaling with rows and workers

This one can take longer than the light notebook.


In [None]:
import copy
import sys
import time
from pathlib import Path

import pandas as pd
from IPython.display import display

# Locate the project root: start from the current working directory and, if
# it has no `src/` folder but its parent does, use the parent instead.
ROOT = Path.cwd()
if not (ROOT / 'src').exists() and (ROOT.parent / 'src').exists():
    ROOT = ROOT.parent

# Put `<ROOT>/src` at the front of sys.path (once) before the imports below.
# NOTE(review): presumably this is where the `itergen` package lives - confirm.
SRC = ROOT / 'src'
if SRC.exists() and str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))

# These imports must stay after the sys.path adjustment above.
from itergen import ItergenSynthesizer, RunConfig, get_sample_config
from itergen.schema.config import build_column_specs

# Widen pandas display limits so the summary tables are not truncated as early.
pd.set_option('display.max_columns', 50)
pd.set_option('display.width', 220)

print(f'Project root: {ROOT}')


def summarize_column_kinds(config):
    """Build the column specs for ``config`` and group column ids by kind.

    Returns a ``(specs, by_kind)`` tuple. ``specs`` is the value returned by
    ``build_column_specs(config)`` (iterated here as a mapping of column id
    to a dict-like spec). ``by_kind`` maps each kind name to the list of
    column ids of that kind: the keys ``'binary'``, ``'categorical'`` and
    ``'continuous'`` are always present, and a spec without a ``'kind'``
    entry is filed under ``'other'``.
    """
    column_specs = build_column_specs(config)
    grouped = {kind: [] for kind in ('binary', 'categorical', 'continuous')}
    for column_id, column_spec in column_specs.items():
        kind = column_spec.get('kind', 'other')
        if kind not in grouped:
            grouped[kind] = []
        grouped[kind].append(column_id)
    return column_specs, grouped


def run_generation_case(label, config, run_overrides=None):
    """Run one generation with the benchmark defaults and summarize it.

    Args:
        label: Name stored in the summary under ``'label'``.
        config: Generator config handed to ``ItergenSynthesizer`` and to
            ``summarize_column_kinds``.
        run_overrides: Optional mapping merged over the default
            ``RunConfig`` keyword arguments defined below.

    Returns:
        ``(summary, result, specs, by_kind)``. ``summary`` is a flat dict
        with the frame shape, per-kind column counts, success/attempts,
        quality metrics and timing for the run.
    """
    run_params = {
        'n_rows': 2500,
        'seed': 101,
        'tolerance': 0.03,
        'max_attempts': 4,
        'log_level': 'quiet',
        'save_output': False,
        'proposal_scoring_mode': 'incremental',
    }
    run_params.update(run_overrides or {})
    run_cfg = RunConfig(**run_params)

    # Only synthesizer construction and generate() are timed; building the
    # RunConfig and summarizing the column kinds happen outside the window.
    started = time.perf_counter()
    result = ItergenSynthesizer(config, run_cfg).generate()
    elapsed = time.perf_counter() - started

    specs, by_kind = summarize_column_kinds(config)
    shape = result.dataframe.shape
    metrics = result.metrics
    missing = float('nan')

    # Guard the division so a zero elapsed time yields NaN instead of raising.
    if elapsed > 0:
        throughput = float(shape[0] / elapsed)
    else:
        throughput = float('nan')

    summary = {'label': label, 'rows': int(shape[0]), 'cols': int(shape[1])}
    for kind in ('binary', 'categorical', 'continuous'):
        summary[f'{kind}_cols'] = len(by_kind.get(kind, []))
    summary['success'] = bool(result.success)
    summary['attempts'] = int(result.attempts)
    summary['objective'] = float(metrics.get('objective', missing))
    summary['max_error'] = float(metrics.get('max_error', missing))
    summary['confidence'] = float(result.quality_report.get('confidence', missing))
    summary['runtime_sec'] = float(elapsed)
    summary['rows_per_sec'] = throughput
    return summary, result, specs, by_kind


def safe_first(items):
    """Return the first element of ``items``, or ``None`` when it is falsy."""
    if not items:
        return None
    return items[0]


In [None]:
# Name of this benchmark profile; only echoed by the print at the end of the cell.
PROFILE_NAME = 'full'

# Part 1 cases. Each tuple is (display label, sample config name, n_rows):
# the feature-check loop unpacks them in that order and passes the sample
# name to get_sample_config().
FEATURE_CASES = [
    ('Binary', 'binary', 3000),
    ('Categorical', 'categorical', 3000),
    ('Continuous', 'continuous', 3000),
    ('Continuous Parent Bins', 'continuous_parent_bins', 3000),
    ('Mixed', 'mixed', 3500),
    ('Mixed Large', 'mixed_large', 4000),
]

# Part 2 setup. Every variant runs on the sample config named here.
CONFIG_BASE_SAMPLE = 'mixed'
# Each tuple is (variant name, run overrides). The overrides are merged over
# the Part 2 defaults, so a key given here wins. 'baseline_incremental' has
# no overrides and is the reference row the delta columns are computed against.
# NOTE(review): the meaning of each override key is defined by RunConfig, which
# is not visible in this notebook - confirm against the itergen docs.
CONFIG_VARIANTS = [
    ('baseline_incremental', {}),
    ('stricter_tolerance', {'tolerance': 0.015, 'max_attempts': 6}),
    ('very_strict', {'tolerance': 0.01, 'max_attempts': 8}),
    ('faster_looser', {'tolerance': 0.06, 'max_attempts': 3}),
    ('full_scoring_mode', {'proposal_scoring_mode': 'full'}),
    ('attempt_workers_2', {'attempt_workers': 2}),
    ('attempt_workers_4', {'attempt_workers': 4}),
    ('small_group_lock', {'small_group_mode': 'lock'}),
    ('torch_controller_requested', {'use_torch_controller': True, 'torch_required': False}),
]

# Part 3 grid: one run per (row level, worker level, repeat), so the total is
# len(PERF_ROW_LEVELS) * len(PERF_WORKER_LEVELS) * PERF_REPEATS runs.
PERF_ROW_LEVELS = [1200, 2400, 4800]
PERF_WORKER_LEVELS = [1, 2, 4]
PERF_REPEATS = 3
# Passed as 'max_attempts' for every performance run.
PERF_MAX_ATTEMPTS = 5
# Passed as 'optimize_overrides' for every performance run.
PERF_OPTIMIZE_OVERRIDES = {
    'max_iters': 140,
    'patience': 10,
    'batch_size': 512,
    'proposals_per_batch': 24,
}

print(f'Loaded profile: {PROFILE_NAME}')


## Part 1 - Feature check


In [None]:
# Part 1 driver: run every feature case once, keep its summary row for the
# table and its result/specs/by_kind for the sanity checks that follow.
feature_rows, feature_artifacts = [], {}

for label, sample, n_rows in FEATURE_CASES:
    print(f'Running feature case: {label}')
    cfg = get_sample_config(sample)
    case_overrides = {'n_rows': n_rows, 'seed': 202, 'max_attempts': 5}
    summary, result, specs, by_kind = run_generation_case(label=label, config=cfg, run_overrides=case_overrides)
    summary['sample'] = sample
    feature_rows.append(summary)
    feature_artifacts[sample] = dict(result=result, specs=specs, by_kind=by_kind)
    print(f'Done: {label} in {summary["runtime_sec"]:.2f}s')

# Fastest case first.
feature_df = pd.DataFrame(feature_rows)
feature_df = feature_df.sort_values('runtime_sec').reset_index(drop=True)

feature_table_columns = [
    'label', 'sample', 'rows', 'cols',
    'binary_cols', 'categorical_cols', 'continuous_cols',
    'success', 'attempts', 'objective', 'max_error', 'confidence',
    'runtime_sec', 'rows_per_sec',
]
display(feature_df[feature_table_columns].round(4))

# Sanity-check each generated frame against its column specs. One record is
# appended to `checks` per checked (sample, column); `ok` is the boolean verdict.
checks = []
for sample, payload in feature_artifacts.items():
    df = payload['result'].dataframe
    specs = payload['specs']
    by_kind = payload['by_kind']

    # Binary columns: every non-null value must be 0 or 1.
    for col in by_kind.get('binary', []):
        observed = set(pd.Series(df[col]).dropna().unique().tolist())
        checks.append({'sample': sample, 'column': col, 'check': 'binary in {0,1}', 'ok': observed.issubset({0, 1})})

    # Categorical columns: every non-null value must be one of the declared
    # labels. Observed values are compared as strings, so the declared labels
    # are normalized to strings as well; otherwise a non-string label (for
    # example an int code) could never match and the check would always fail.
    for col in by_kind.get('categorical', []):
        allowed = {str(item) for item in (specs[col].get('labels') or [])}
        observed = set(pd.Series(df[col]).dropna().astype(str).unique().tolist())
        checks.append({'sample': sample, 'column': col, 'check': 'categorical in labels', 'ok': observed.issubset(allowed)})

    # Continuous columns: only checked when the spec declares a min and/or max
    # target. The check passes while at most 2% of rows fall outside the bounds.
    # Values that fail numeric coercion become NaN and compare as in-bounds.
    for col in by_kind.get('continuous', []):
        targets = specs[col].get('targets') or {}
        min_v = targets.get('min')
        max_v = targets.get('max')
        if min_v is None and max_v is None:
            continue
        series = pd.to_numeric(df[col], errors='coerce')
        bad = pd.Series(False, index=series.index)
        if min_v is not None:
            bad = bad | (series < float(min_v))
        if max_v is not None:
            bad = bad | (series > float(max_v))
        checks.append({'sample': sample, 'column': col, 'check': 'continuous min/max bound', 'ok': float(bad.mean()) <= 0.02})

# Per-sample pass rate; skipped entirely when no check was recorded.
checks_df = pd.DataFrame(checks)
if not checks_df.empty:
    checks_summary = checks_df.groupby('sample', as_index=False).agg(total=('ok', 'count'), passed=('ok', 'sum'))
    checks_summary['pass_rate'] = checks_summary['passed'] / checks_summary['total']
    display(checks_summary.sort_values('pass_rate', ascending=False).round(4))


## Part 2 - Config impact check


In [None]:
# Part 2 driver: run every variant on a fresh deep copy of the same base
# config, then report each variant's objective/runtime delta vs the baseline.
base_config = get_sample_config(CONFIG_BASE_SAMPLE)
variant_rows, variant_results = [], {}

for name, overrides in CONFIG_VARIANTS:
    print(f'Running variant: {name}')
    # Shared defaults first; the variant's own overrides win on conflicts.
    variant_overrides = {'n_rows': 2800, 'seed': 404, 'tolerance': 0.03, 'max_attempts': 5}
    variant_overrides.update(overrides)
    summary, result, specs, by_kind = run_generation_case(
        label=name,
        config=copy.deepcopy(base_config),
        run_overrides=variant_overrides,
    )
    summary['variant'] = name
    variant_rows.append(summary)
    variant_results[name] = dict(result=result, specs=specs, by_kind=by_kind)
    print(f'Done variant: {name} | objective={summary["objective"]:.4f} | runtime={summary["runtime_sec"]:.2f}s')

# Best (lowest) objective first.
variants_df = pd.DataFrame(variant_rows)
variants_df = variants_df.sort_values('objective').reset_index(drop=True)

# Reference values come from the 'baseline_incremental' row.
is_baseline = variants_df['variant'] == 'baseline_incremental'
base_obj = float(variants_df.loc[is_baseline, 'objective'].iloc[0])
base_runtime = float(variants_df.loc[is_baseline, 'runtime_sec'].iloc[0])
variants_df['objective_delta_vs_base'] = variants_df['objective'] - base_obj
variants_df['runtime_delta_vs_base_sec'] = variants_df['runtime_sec'] - base_runtime

variant_table_columns = [
    'variant', 'success', 'attempts',
    'objective', 'objective_delta_vs_base',
    'max_error', 'confidence',
    'runtime_sec', 'runtime_delta_vs_base_sec',
]
display(variants_df[variant_table_columns].round(4))

# Pick one representative column per kind from the baseline run, then compare
# simple statistics of those columns across all variants.
baseline = variant_results['baseline_incremental']
baseline_kinds = baseline['by_kind']
binary_col = safe_first(baseline_kinds.get('binary', []))
categorical_col = safe_first(baseline_kinds.get('categorical', []))
continuous_col = safe_first(baseline_kinds.get('continuous', []))

rows = []
for name, payload in variant_results.items():
    frame = payload['result'].dataframe
    stats = {'variant': name}

    # Binary: mean of the numeric-coerced column.
    if binary_col and binary_col in frame.columns:
        stats[f'{binary_col}_rate'] = float(pd.to_numeric(frame[binary_col], errors='coerce').mean())

    # Categorical: most frequent label and its share of rows.
    if categorical_col and categorical_col in frame.columns:
        shares = frame[categorical_col].value_counts(normalize=True)
        if shares.empty:
            top_label, top_share = None, float('nan')
        else:
            top_label, top_share = str(shares.index[0]), float(shares.iloc[0])
        stats[f'{categorical_col}_top'] = top_label
        stats[f'{categorical_col}_top_share'] = top_share

    # Continuous: mean and population standard deviation (ddof=0).
    if continuous_col and continuous_col in frame.columns:
        values = pd.to_numeric(frame[continuous_col], errors='coerce')
        stats[f'{continuous_col}_mean'] = float(values.mean())
        stats[f'{continuous_col}_std'] = float(values.std(ddof=0))

    rows.append(stats)

stats_df = pd.DataFrame(rows).sort_values('variant').reset_index(drop=True)
display(stats_df.round(4))


## Part 3 - Performance check


In [None]:
# Part 3 driver: time the generator over the full grid of row counts and
# worker counts, repeating each combination PERF_REPEATS times.
perf_config = get_sample_config('mixed')
perf_rows = []

for n_rows in PERF_ROW_LEVELS:
    for workers in PERF_WORKER_LEVELS:
        for rep in range(PERF_REPEATS):
            print(f'Perf run -> rows={n_rows}, workers={workers}, repeat={rep}')
            # The seed depends only on the repeat index, so the same seeds are
            # reused for every (rows, workers) combination.
            seed = 900 + rep
            summary, _result, _specs, _by_kind = run_generation_case(
                label=f'rows_{n_rows}_w_{workers}_r_{rep}',
                # Fresh copies per run: neither the config nor the optimize
                # overrides may carry state from one timed run into the next.
                config=copy.deepcopy(perf_config),
                run_overrides={
                    'n_rows': n_rows,
                    'seed': seed,
                    'tolerance': 0.03,
                    'max_attempts': PERF_MAX_ATTEMPTS,
                    'attempt_workers': workers,
                    'proposal_scoring_mode': 'incremental',
                    'optimize_overrides': copy.deepcopy(PERF_OPTIMIZE_OVERRIDES),
                },
            )
            perf_rows.append({
                'n_rows': n_rows,
                'workers': workers,
                'repeat': rep,
                'runtime_sec': summary['runtime_sec'],
                'rows_per_sec': summary['rows_per_sec'],
                'objective': summary['objective'],
                'max_error': summary['max_error'],
                'success': summary['success'],
                'attempts': summary['attempts'],
            })

# Raw per-run table; only the first 18 rows are displayed.
perf_df = pd.DataFrame(perf_rows)
display(perf_df.head(18).round(4))

# Collapse the repeats into one row per (n_rows, workers) combination.
perf_aggregations = {
    'avg_runtime_sec': ('runtime_sec', 'mean'),
    'median_runtime_sec': ('runtime_sec', 'median'),
    'avg_rows_per_sec': ('rows_per_sec', 'mean'),
    'avg_objective': ('objective', 'mean'),
    'avg_max_error': ('max_error', 'mean'),
    'success_rate': ('success', 'mean'),
    'avg_attempts': ('attempts', 'mean'),
}
perf_agg = perf_df.groupby(['n_rows', 'workers'], as_index=False).agg(**perf_aggregations)

# Attach the single-worker average runtime for the same n_rows to every row,
# then express each row's speedup as runtime_w1 / avg_runtime_sec.
single_worker = perf_agg['workers'] == 1
base = perf_agg.loc[single_worker, ['n_rows', 'avg_runtime_sec']]
base = base.rename(columns={'avg_runtime_sec': 'runtime_w1'})
perf_agg = perf_agg.merge(base, on='n_rows', how='left')
perf_agg['speedup_vs_1'] = perf_agg['runtime_w1'] / perf_agg['avg_runtime_sec']
display(perf_agg.round(4))

# Wide view: one row per n_rows, one column per worker count.
speedup_table = perf_agg.pivot(index='n_rows', columns='workers', values='speedup_vs_1')
display(speedup_table.round(3))

# Best-effort plots: any failure (including a missing matplotlib) is reported
# with a single printed line and the notebook continues.
try:
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    # (axis, perf_agg column, title, y label) for the two side-by-side panels.
    panels = (
        (axes[0], 'avg_runtime_sec', 'Runtime vs Rows (Full)', 'seconds'),
        (axes[1], 'avg_rows_per_sec', 'Throughput vs Rows (Full)', 'rows/sec'),
    )

    # One line per worker count on each panel.
    for workers in sorted(perf_agg['workers'].unique()):
        sub = perf_agg[perf_agg['workers'] == workers]
        for ax, column, _title, _ylabel in panels:
            ax.plot(sub['n_rows'], sub[column], marker='o', label=f'workers={workers}')

    for ax, _column, title, ylabel in panels:
        ax.set_title(title)
        ax.set_xlabel('n_rows')
        ax.set_ylabel(ylabel)
        ax.legend()

    plt.tight_layout()
    plt.show()
except Exception as exc:
    print(f'Plot skipped: {exc}')

print('Current progress: full benchmark finished')


## Output explanation and result notes

How to read output values:
- `objective`: main quality score; lower is better
- `max_error`: largest error point; lower is better
- `confidence`: quick quality indicator; higher is better
- `runtime_sec`: wall time for the run
- `rows_per_sec`: throughput
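- `speedup_vs_1`: average runtime with one worker divided by this row's average runtime (same `n_rows`); values above 1 mean faster than one worker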

Possible causes behind results:
- more features and conditions increase optimization work
- strict tolerance usually improves fit but can slow down runtime
- worker scaling depends on workload size and process overhead
- run-to-run differences can appear due to search path and randomness


In [None]:
# Text summary of whatever result tables exist in the notebook namespace.
# Each section is skipped when its table is missing or empty, so this cell can
# run even if an earlier part was not executed.
print('=== Full run discussion ===')

if 'feature_df' in globals() and len(feature_df) > 0:
    runtime_ascending = feature_df.sort_values('runtime_sec')
    runtime_descending = feature_df.sort_values('runtime_sec', ascending=False)
    fast = runtime_ascending.iloc[0]
    slow = runtime_descending.iloc[0]
    best = feature_df.sort_values('objective').iloc[0]
    print(f'- Fastest feature case: {fast["label"]} ({fast["runtime_sec"]:.2f}s).')
    print(f'- Slowest feature case: {slow["label"]} ({slow["runtime_sec"]:.2f}s).')
    print(f'- Best feature quality: {best["label"]} (objective={best["objective"]:.4f}).')
    print('  Possible cause: large or highly dependent configs need more optimization passes.')

if 'variants_df' in globals() and len(variants_df) > 0:
    best_variant = variants_df.sort_values('objective').iloc[0]
    fast_variant = variants_df.sort_values('runtime_sec').iloc[0]
    print(f'- Best variant quality: {best_variant["variant"]} (objective={best_variant["objective"]:.4f}).')
    print(f'- Fastest variant runtime: {fast_variant["variant"]} ({fast_variant["runtime_sec"]:.2f}s).')

    # Deltas are reported only when the baseline variant is in the table.
    baseline_mask = variants_df['variant'] == 'baseline_incremental'
    base_rows = variants_df[baseline_mask]
    if len(base_rows) > 0:
        base = base_rows.iloc[0]
        dq = best_variant['objective'] - base['objective']
        dt = fast_variant['runtime_sec'] - base['runtime_sec']
        print(f'- Delta vs baseline (quality best): {dq:.4f} objective points.')
        print(f'- Delta vs baseline (fastest runtime): {dt:.2f}s.')
        print('  Possible cause: stronger rules/settings improve fit but can cost time.')

if 'perf_agg' in globals() and len(perf_agg) > 0:
    best_speed = perf_agg.sort_values('speedup_vs_1', ascending=False).iloc[0]
    print(
        f'- Best worker scaling: rows={int(best_speed["n_rows"])}, workers={int(best_speed["workers"])}, '
        f'speedup={best_speed["speedup_vs_1"]:.2f}x'
    )
    # Pick the hint from the best observed speedup; the thresholds are 1.5x and 1.1x.
    top_speedup = best_speed['speedup_vs_1']
    if top_speedup > 1.5:
        print('  Cause idea: workload is large enough to amortize process overhead.')
    elif top_speedup > 1.1:
        print('  Cause idea: parallelism helps, but overhead still limits perfect scaling.')
    else:
        print('  Cause idea: overhead or contention may hide worker benefits here.')

print('Current progress note: full results discussed.')


## My final conclusion (fill this after run)

Use one short line for each of the six points below:
- Current progress: ...
- Best quality result: ...
- Best speed result: ...
- Main trade-off I saw: ...
- Possible cause of this behavior: ...
- Next thing I will improve: ...
