# Stress Test Surface Analysis

This notebook loads stress test runs produced by `inference/stress_inference.py`, extracts run metadata (model, variant, GPU, etc.), and builds pivot tables plus 3D surface plots for tokens-per-second throughput and peak memory usage. Adjust the configuration cells below to point at a specific run or to compare multiple runs; by default the most recent run is analyzed.

In [None]:
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, List

import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.tri import LinearTriInterpolator, Triangulation
import numpy as np
import pandas as pd

# Apply the seaborn-style dark grid theme to every figure drawn in this notebook.
plt.style.use('seaborn-v0_8-darkgrid')

: 

In [None]:
def detect_reports_root() -> Path:
    '''Locate the ``reports`` directory that contains a ``stress`` subdirectory.

    The search starts at the current working directory and walks up through
    every ancestor, so the notebook can be launched from any depth inside the
    repository. At each level a directory that is itself named ``reports`` is
    tried first, then its ``reports`` child. The nearest match wins.

    Returns:
        Path of the ``reports`` directory that holds a ``stress`` folder.

    Raises:
        FileNotFoundError: If no level of the hierarchy provides ``reports/stress``.
    '''
    cwd = Path.cwd().resolve()
    for base in (cwd, *cwd.parents):
        options = [base / 'reports']
        if base.name == 'reports':
            # The notebook may be running from inside the reports tree itself.
            options.insert(0, base)
        for reports_dir in options:
            if (reports_dir / 'stress').is_dir():
                return reports_dir
    raise FileNotFoundError('Could not locate reports/stress directory from current working directory.')

# Resolve both locations once when this cell runs so later cells can reuse them.
REPORTS_ROOT = detect_reports_root()
STRESS_ROOT = REPORTS_ROOT.joinpath('stress')
print(f'Using reports root: {REPORTS_ROOT}')
print(f'Looking for runs under : {STRESS_ROOT}')

In [None]:
def iter_run_directories(root: Path) -> List[Path]:
    '''Return the sorted, de-duplicated directories under *root* that contain a ``results.jsonl``.'''
    found = set()
    for results_file in root.rglob('results.jsonl'):
        found.add(results_file.parent)
    return sorted(found)

def _flatten_record(record: Dict[str, Any], metadata: Dict[str, Any], run_dir: Path) -> Dict[str, Any]:
    '''Flatten one parsed ``results.jsonl`` record into a single tabular row.'''
    # Any nested section may be absent or an explicit null; treat both as
    # "no data" so one incomplete record cannot abort the whole load.
    result = record.get('result') or {}
    generation = result.get('generation') or {}
    runtime = result.get('runtime') or {}
    model_info = result.get('model') or {}
    gpu_info = runtime.get('gpu') or {}

    # Prefer the measured input length; fall back to the requested target.
    input_tokens = generation.get('input_length_tokens')
    if input_tokens is None:
        input_tokens = record.get('target_input_tokens')

    # Derive the generated-token count from the total length when it is not
    # reported directly, then fall back to the requested target.
    new_tokens = generation.get('new_tokens')
    total_tokens = generation.get('output_length_tokens')
    if new_tokens is None and total_tokens is not None and input_tokens is not None:
        new_tokens = total_tokens - input_tokens
    output_tokens = new_tokens if new_tokens is not None else record.get('target_output_tokens')

    # Peak memory may be reported in megabytes or only in bytes.
    max_memory_mb = runtime.get('max_memory_megabytes')
    if max_memory_mb is None and runtime.get('max_memory_bytes') is not None:
        max_memory_mb = runtime['max_memory_bytes'] / (1024 ** 2)

    return {
        'run_directory': str(run_dir),
        'timestamp': result.get('timestamp'),
        # Run-level metadata takes precedence over per-record values.
        'model_name': metadata.get('model_name') or model_info.get('name'),
        'model_variant': metadata.get('model_variant') or model_info.get('variant'),
        'gpu_name': metadata.get('gpu_name') or gpu_info.get('name'),
        'target_input_tokens': record.get('target_input_tokens'),
        'target_output_tokens': record.get('target_output_tokens'),
        'input_tokens': input_tokens,
        'output_tokens': output_tokens,
        'tokens_per_second': generation.get('tokens_per_second'),
        'max_memory_mb': max_memory_mb,
        'device': runtime.get('resolved_device'),
        'dtype': runtime.get('dtype'),
        'temperature': generation.get('temperature'),
        'do_sample': generation.get('do_sample'),
    }


def load_run_records(run_dir: Path) -> List[Dict[str, Any]]:
    '''Load every record of one run directory as a list of flat row dicts.

    Reads ``results.jsonl`` (one JSON object per line) and, when present,
    ``metadata.json`` from *run_dir*. Blank lines and lines starting with
    ``Total output lines`` are skipped.

    Args:
        run_dir: Directory expected to contain ``results.jsonl``.

    Returns:
        One dict per record; an empty list when ``results.jsonl`` is missing.

    Raises:
        json.JSONDecodeError: If the metadata file or a result line is not valid JSON.
    '''
    results_path = run_dir / 'results.jsonl'
    if not results_path.exists():
        return []

    metadata: Dict[str, Any] = {}
    metadata_path = run_dir / 'metadata.json'
    if metadata_path.exists():
        # ``or {}`` guards against a metadata file that only contains ``null``.
        metadata = json.loads(metadata_path.read_text(encoding='utf-8')) or {}

    rows: List[Dict[str, Any]] = []
    with results_path.open(encoding='utf-8') as handle:
        for raw_line in handle:
            line = raw_line.strip()
            if not line or line.startswith('Total output lines'):
                continue
            rows.append(_flatten_record(json.loads(line), metadata, run_dir))
    return rows

def load_all_runs(stress_root: Path) -> pd.DataFrame:
    '''Gather the rows of every run found under *stress_root* into one DataFrame.

    Returns an empty, column-less DataFrame when no rows were found; otherwise
    the ``timestamp`` column is converted with ``pd.to_datetime``.
    '''
    collected: List[Dict[str, Any]] = [
        row
        for run_dir in iter_run_directories(stress_root)
        for row in load_run_records(run_dir)
    ]
    if len(collected) == 0:
        return pd.DataFrame()
    frame = pd.DataFrame(collected)
    frame['timestamp'] = pd.to_datetime(frame['timestamp'])
    return frame

df = load_all_runs(STRESS_ROOT)
# An empty frame has no 'run_directory' column, so count runs only when rows exist.
run_count = 0 if df.empty else df['run_directory'].nunique()
print(f"Loaded {len(df)} rows from {run_count} runs.")
df.head()

In [None]:
# Stop here with an actionable message when nothing was loaded.
if df.empty:
    raise RuntimeError('No stress test results found. Run inference/stress_inference.py first.')

# Distinct run directories, in sorted order, for choosing TARGET_RUN below.
run_dirs = sorted(df['run_directory'].drop_duplicates().tolist())
run_dirs

In [None]:
# Select a single run (latest by default). Override TARGET_RUN to inspect another directory.
# Rows without a timestamp become NaT, which pandas sorts last by default; sort
# them first so they can never be mistaken for the most recent run.
latest_run = df.sort_values('timestamp', na_position='first')['run_directory'].iloc[-1]
TARGET_RUN = latest_run
print(f'Using run: {TARGET_RUN}')
# Work on a copy so later cells cannot modify the full frame through this slice.
run_df = df[df['run_directory'] == TARGET_RUN].copy()
run_df.sort_values(['input_tokens', 'output_tokens']).head()

In [None]:
# One row per distinct model/hardware combination seen in the selected run.
meta_columns = ['model_name', 'model_variant', 'gpu_name', 'device', 'dtype']
meta = run_df.loc[:, meta_columns].drop_duplicates()
print('Run metadata:')
display(meta)

In [None]:
def _surface_table(frame: pd.DataFrame, values: str, aggfunc: str) -> pd.DataFrame:
    '''Pivot *frame* into an output_tokens (rows) x input_tokens (columns) grid of *values*.'''
    pivoted = frame.pivot_table(
        index='output_tokens',
        columns='input_tokens',
        values=values,
        aggfunc=aggfunc,
    )
    # Sort both axes so the grid is monotonic for display and plotting.
    return pivoted.sort_index(axis=0).sort_index(axis=1)


# Throughput is averaged over repeated measurements; memory keeps the peak.
tokens_table = _surface_table(run_df, 'tokens_per_second', 'mean')
memory_table = _surface_table(run_df, 'max_memory_mb', 'max')

print('Tokens/sec surface table:')
display(tokens_table)
print('Peak memory (MB) surface table:')
display(memory_table)

In [None]:
def plot_surface(table: pd.DataFrame, title: str, z_label: str, cmap=cm.viridis) -> None:
    '''Render a pivot table as a 3D surface.

    Column labels become the x axis, index labels the y axis, and cell values
    the surface height.

    Args:
        table: Pivot table to draw.
        title: Title of the axes.
        z_label: Label for the z axis and the colour bar.
        cmap: Colormap passed to ``plot_surface``.

    Raises:
        ValueError: If *table* is empty.
    '''
    if table.empty:
        raise ValueError('Surface table is empty.')

    grid_x, grid_y = np.meshgrid(table.columns.to_numpy(), table.index.to_numpy())
    heights = table.to_numpy(dtype=float)
    # Mask the NaN cells, i.e. grid positions with no value in the table.
    masked_heights = np.ma.array(heights, mask=np.isnan(heights))

    figure = plt.figure(figsize=(10, 7))
    axes = figure.add_subplot(111, projection='3d')
    surface = axes.plot_surface(grid_x, grid_y, masked_heights, cmap=cmap, edgecolor='none')

    axes.set_title(title)
    axes.set_xlabel('Input tokens')
    axes.set_ylabel('Output tokens')
    axes.set_zlabel(z_label)
    figure.colorbar(surface, shrink=0.6, aspect=10, label=z_label)
    plt.show()

plot_surface(tokens_table, title='Tokens/sec surface', z_label='Tokens/sec', cmap=cm.viridis)

In [None]:
plot_surface(memory_table, title='Peak memory surface', z_label='Max memory (MB)', cmap=cm.inferno)

In [None]:
# Set to False to skip the interpolated surface plots drawn at the end of this cell.
ENABLE_INTERPOLATION = True
# Default number of samples along each axis of the interpolation grid.
GRID_POINTS = 50

def interpolate_surface(run_frame: pd.DataFrame, value_column: str, grid_points: int = GRID_POINTS, aggfunc: str = 'mean'):
    '''Resample scattered measurements onto a regular grid by linear interpolation.

    Rows missing ``input_tokens``, ``output_tokens`` or *value_column* are
    dropped. Measurements that share the same (input, output) location are
    collapsed with *aggfunc* first, because a triangulation must not contain
    duplicate points.

    Args:
        run_frame: Frame with ``input_tokens``, ``output_tokens`` and *value_column*.
        value_column: Name of the column to interpolate.
        grid_points: Number of samples along each axis of the output grid.
        aggfunc: Aggregation applied to repeated measurements at one location.

    Returns:
        Tuple ``(X, Y, Z)``: the two ``np.meshgrid`` coordinate arrays and the
        interpolator output evaluated on them.

    Raises:
        ValueError: If fewer than 3 distinct points remain, or if all points
            lie on one line so that no triangle can be formed.
    '''
    columns = ['input_tokens', 'output_tokens', value_column]
    data = run_frame[columns].dropna().astype(float)
    # One value per location: repeated trials would otherwise reach the
    # triangulation as duplicate points.
    data = data.groupby(['input_tokens', 'output_tokens'])[value_column].agg(aggfunc).reset_index()
    if data.shape[0] < 3:
        raise ValueError('Need at least 3 data points to interpolate a surface.')
    points = data[['input_tokens', 'output_tokens']].to_numpy(dtype=float)
    values = data[value_column].to_numpy(dtype=float)
    # Centred points of rank < 2 all lie on a single line (e.g. a sweep over
    # only one axis); fail with a clear message instead of a triangulation error.
    if np.linalg.matrix_rank(points - points.mean(axis=0)) < 2:
        raise ValueError('Data points are collinear; need variation in both input and output tokens to interpolate a surface.')
    tri = Triangulation(points[:, 0], points[:, 1])
    interpolator = LinearTriInterpolator(tri, values)
    x_grid = np.linspace(points[:, 0].min(), points[:, 0].max(), grid_points)
    y_grid = np.linspace(points[:, 1].min(), points[:, 1].max(), grid_points)
    X, Y = np.meshgrid(x_grid, y_grid)
    Z = interpolator(X, Y)
    return X, Y, Z

def plot_interpolated_surface(run_frame: pd.DataFrame, value_column: str, title: str, z_label: str, cmap=cm.plasma, grid_points: int = GRID_POINTS):
    '''Plot *value_column* as a 3D surface over the grid produced by ``interpolate_surface``.

    Args:
        run_frame: Frame forwarded to ``interpolate_surface``.
        value_column: Name of the column to plot on the z axis.
        title: Base title; `` (interpolated)`` is appended.
        z_label: Label for the z axis and the colour bar.
        cmap: Colormap passed to ``plot_surface``.
        grid_points: Grid resolution forwarded to ``interpolate_surface``.
    '''
    grid_x, grid_y, grid_z = interpolate_surface(run_frame, value_column, grid_points)

    figure = plt.figure(figsize=(10, 7))
    axes = figure.add_subplot(111, projection='3d')
    surface = axes.plot_surface(grid_x, grid_y, grid_z, cmap=cmap, edgecolor='none')

    axes.set_title(f"{title} (interpolated)")
    axes.set_xlabel('Input tokens')
    axes.set_ylabel('Output tokens')
    axes.set_zlabel(z_label)
    figure.colorbar(surface, shrink=0.6, aspect=10, label=z_label)
    plt.show()

# Draw the interpolated throughput and memory surfaces unless the toggle is off.
if not ENABLE_INTERPOLATION:
    print('Interpolation disabled; set ENABLE_INTERPOLATION = True to show smoothed surfaces.')
else:
    plot_interpolated_surface(run_df, 'tokens_per_second', 'Tokens/sec surface', 'Tokens/sec', cmap=cm.magma)
    plot_interpolated_surface(run_df, 'max_memory_mb', 'Peak memory surface', 'Max memory (MB)', cmap=cm.cividis)
