In [7]:
from pathlib import Path
from enum import Enum


class STAGES(str, Enum):
    RIGID = "Rigid"
    AFFINE = "Affine"
    SYN = "SyN"


class EXPERIMENTS(str, Enum):
    DYNOMP = "dynomp"
    FP64 = "fp64"


N_LEVELS = 4


BOKEH_COLORBLIND = (
    "#0072B2",  # 0
    "#E69F00",  # 1
    "#F0E442",  # 2
    "#009E73",  # 3
    "#56B4E9",  # 4
    "#D55E00",  # 5
    "#CC79A7",  # 6
    "#000000",  # 7
)

# MODIFY IF NEEDED
FIG_DIR = Path("paper", "figures")
FIG_DIR.mkdir(parents=True, exist_ok=True)

# Data ingest

In [8]:
from collections import defaultdict
import re
from typing import Any, Callable

import pandas as pd


def cleanup_level(levels: list[str]) -> list[str]:
    rv: list[str] = []
    for level in levels:
        clean_level = level.strip("\n").strip()
        if not (
            clean_level.startswith("2DIAGNOSTIC")
            or clean_level.startswith("1DIAGNOSTIC")
        ):
            continue
        rv.append(clean_level)
    return rv


def data2table_dynomp(data: str) -> pd.DataFrame:
    table: defaultdict[str, Any] = defaultdict(list)
    for line in data.splitlines():
        if line in ["", "XX"]:
            continue
        cols = line.split(",")
        table["iteration"].append(int(cols[1]))
        table["metricValue"].append(float(cols[2]))
        table["vprec_precision"].append(int(cols[6]))
        table["pmin_estimate"].append(float(cols[7]))
        table["pmin_estimate_avg"].append(float(cols[8]))

    return pd.DataFrame(table | {"exp_id": EXPERIMENTS.DYNOMP.value})


def data2table_fp64(data: str) -> pd.DataFrame:
    table: defaultdict[str, Any] = defaultdict(list)
    for line in data.splitlines():
        if line in ["", "XX"]:
            continue
        cols = line.split(",")
        table["iteration"].append(int(cols[1]))
        table["metricValue"].append(float(cols[2]))

    return pd.DataFrame(table | {"exp_id": EXPERIMENTS.FP64.value})


def parse_log(
    txt: str, *, data2table_fn: Callable[[str], pd.DataFrame]
) -> pd.DataFrame:
    P_LEVEL_HEADER = r"DIAGNOSTIC,Iteration,metricValue,convergenceValue,ITERATION_TIME_INDEX,SINCE_LAST.*|  Elapsed time"
    levels = re.split(P_LEVEL_HEADER, txt)
    raw_data = cleanup_level(levels)

    parsed_levels: list[pd.DataFrame] = []
    for i, stage in enumerate(STAGES):
        for level in range(N_LEVELS):
            level_data = raw_data[level + i * N_LEVELS]
            table = data2table_fn(level_data)
            table["stage"] = stage.value
            table["level"] = level
            parsed_levels.append(table)

    return pd.concat(parsed_levels)


def get_data(
    log_dir: Path, *, data2table_fn: Callable[[str], pd.DataFrame]
) -> pd.DataFrame:
    data: list[pd.DataFrame] = list()

    for log in log_dir.glob("*.log"):
        subject_df = parse_log(
            log.read_text(),
            data2table_fn=data2table_fn,
        )
        subject_df["subject_id"] = log.stem
        data.append(subject_df)

    return pd.concat(data).reset_index(drop=True)


dynomp_df = get_data(
    Path("logs", EXPERIMENTS.DYNOMP.value), data2table_fn=data2table_dynomp
)
fp64_df = get_data(Path("logs", EXPERIMENTS.FP64.value), data2table_fn=data2table_fp64)

all_df: pd.DataFrame = pd.concat([dynomp_df, fp64_df]).reset_index(drop=True)
to_drop = (
    (all_df["stage"] == STAGES.SYN.value)
    & (all_df["level"] == 0)
    & (all_df["iteration"] == 1)
)
all_df = all_df.loc[~to_drop].reset_index(drop=True)
all_df

Unnamed: 0,iteration,metricValue,vprec_precision,pmin_estimate,pmin_estimate_avg,exp_id,stage,level,subject_id
0,1,-0.568093,7.0,2.981722,2.981722,dynomp,Rigid,0,0003059
1,2,-0.568862,7.0,3.603413,3.292568,dynomp,Rigid,0,0003059
2,3,-0.570691,7.0,3.958206,3.514447,dynomp,Rigid,0,0003059
3,4,-0.572468,7.0,4.270413,3.703439,dynomp,Rigid,0,0003059
4,5,-0.575775,7.0,3.597166,3.682184,dynomp,Rigid,0,0003059
...,...,...,...,...,...,...,...,...,...
42425,16,-0.896145,,,,fp64,SyN,3,0003040
42426,17,-0.896713,,,,fp64,SyN,3,0003040
42427,18,-0.897232,,,,fp64,SyN,3,0003040
42428,19,-0.897713,,,,fp64,SyN,3,0003040


# Precision requirement per iteration


In [9]:
import plotly.graph_objects as go
from plotly.colors import hex_to_rgb
from plotly.subplots import make_subplots


def hex_to_rgba(hex_color: str, alpha: float) -> str:
    r, g, b = hex_to_rgb(hex_color)
    return f"rgba({r}, {g}, {b}, {alpha})"


def adjust_layout(
    fig: go.Figure, title: str, width: int | None = None, height: int | None = None
) -> None:
    fig.update_layout(
        height=height or (len(STAGES) * 400),
        width=width or 1200,
        font=dict(
            size=24,
            color="black",
        ),
        showlegend=True,
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.025,  # position above the subplots
            xanchor="left",
            x=0,
            traceorder="normal",
        ),
        title=title,
        margin=dict(l=10, r=50, t=150, b=10),
        plot_bgcolor="white",
    )
    for i in range(1, len(STAGES) * N_LEVELS + 1):
        fig.update_layout(
            {
                f"xaxis{i}": dict(showline=True, linewidth=2, linecolor="black"),
                f"yaxis{i}": dict(showline=True, linewidth=2, linecolor="black"),
            }
        )
    fig.update_yaxes(rangemode="tozero")


def get_base_fig(title: str) -> go.Figure:
    fig = make_subplots(
        rows=len(STAGES),
        cols=N_LEVELS,
        subplot_titles=[
            f"{stage.value}: Level {level}"
            for _, stage in enumerate(STAGES)
            for level in range(N_LEVELS)
        ],
        shared_xaxes=False,
        shared_yaxes=True,
        horizontal_spacing=0.025,
        vertical_spacing=0.075,
    )
    # Update subplot titles font size
    for annotation in fig.layout.annotations:
        annotation.font = dict(size=18)

    adjust_layout(fig, title=title)

    return fig


def show_and_save(fig: go.Figure, out_file: Path | None, show: bool):
    if show:
        fig.show()
    if out_file is not None:
        fig.update_layout(
            title=None,
            margin=dict(t=10),
        )
        out_file.parent.mkdir(parents=True, exist_ok=True)
        fig.write_image(str(out_file), scale=2)


In [10]:
HLINE_COLOR = "darkgray"


class COLOR(str, Enum):
    VPREC_PRECISION = BOKEH_COLORBLIND[0]
    PMIN_ESTIMATE = BOKEH_COLORBLIND[1]


def add_hline(fig: go.Figure, y: int, text: str, row: int) -> None:
    # Only show the horizontal line in the last column
    line = dict(dash="solid", width=1, color=HLINE_COLOR)
    fig.add_hline(
        y,
        row=row,
        line=line,
    )
    fig.add_hline(
        y,
        row=row,
        col=N_LEVELS,
        line=line,
        annotation_text=text,
        annotation_position="right",
        annotation_xref="paper",
        annotation_font_size=14,
        annotation_showarrow=False,
        annotation_font_color="black",
    )


def display_hardware_support(fig: go.Figure, hw_support: dict[str, int]):
    for row, stage in enumerate(STAGES, 1):
        for k, v in hw_support.items():
            if stage == STAGES.SYN and v >= 24:
                continue
            add_hline(fig, y=v, text=k, row=row)


def _trace_boudary(
    fig: go.Figure,
    *,
    x: pd.Series,
    upper: pd.Series,
    lower: pd.Series,
    mean: pd.Series,
    row: int,
    col: int,
    color: str,
    dash: str,
):
    # Add the Upper Bound (Max)
    fig.add_trace(
        go.Scatter(
            x=x,
            y=upper,
            line=dict(width=0),
            showlegend=False,
        ),
        row=row,
        col=col,
    )

    # Add lower bound (Min)
    fig.add_trace(
        go.Scatter(
            x=x,
            y=lower,
            line=dict(width=0),
            fill="tonexty",  # This performs the shading
            fillcolor=hex_to_rgba(color, 0.2),  # Gray with 20% opacity
            showlegend=False,
        ),
        row=row,
        col=col,
    )

    # Mean line
    fig.add_trace(
        go.Scatter(
            x=x,
            y=mean,
            line=dict(
                color=color,
                width=3,
                dash=dash,
            ),
            showlegend=False,
        ),
        row=row,
        col=col,
    )


def trace_minmax(
    fig: go.Figure,
    stats: dict[str, pd.DataFrame],
    row: int,
    col: int,
    color: str,
    dash: str = "solid",
) -> None:
    _trace_boudary(
        fig,
        x=stats.index,
        upper=stats["max"],
        lower=stats["min"],
        mean=stats["mean"],
        row=row,
        col=col,
        color=color,
        dash=dash,
    )


def trace_std(
    fig: go.Figure,
    stats: dict[str, pd.DataFrame],
    row: int,
    col: int,
    color: str,
    dash: str = "solid",
) -> None:
    _trace_boudary(
        fig,
        x=stats.index,
        upper=stats["mean"] + stats["std"],
        lower=stats["mean"] - stats["std"],
        mean=stats["mean"],
        row=row,
        col=col,
        color=color,
        dash=dash,
    )


def make_legend(fig: go.Figure):
    fig.add_trace(
        go.Scatter(
            x=[1],
            y=[0],
            name="Virtual precision",
            line=dict(dash="solid", color=COLOR.VPREC_PRECISION.value),
            mode="lines",
            showlegend=True,
            hoverinfo="skip",
        ),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=[1],
            y=[0],
            name="Estimated pmin",
            line=dict(dash="dash", color=COLOR.PMIN_ESTIMATE.value),
            mode="lines",
            showlegend=True,
            hoverinfo="skip",
        ),
        row=1,
        col=1,
    )
    # Only add this ONCE (e.g., at subplot [1,1] or outside the loop)
    fig.add_trace(
        go.Scatter(
            x=[1],
            y=[0],
            mode="lines",
            line=dict(dash="solid", color=HLINE_COLOR),
            name="Hardware support",
            showlegend=True,
            hoverinfo="skip",
        ),
        row=1,
        col=1,
    )


def plot_pmin(
    df: pd.DataFrame,
    *,
    trace_fn,
    title: str,
    out_file: Path | None = None,
    show: bool = True,
) -> None:
    fig = get_base_fig(title)

    for i, stage in enumerate(STAGES):
        for level in range(N_LEVELS):
            stage_level_df = df[(df["stage"] == stage) & (df["level"] == level)]

            # Virtual precision
            stats = (
                stage_level_df.groupby("iteration")["vprec_precision"]
                .agg(["mean", "min", "max", "std"])
                .reset_index()
            )
            trace_fn(
                fig,
                stats,
                row=i + 1,
                col=level + 1,
                dash="solid",
                color=COLOR.VPREC_PRECISION.value,
            )
            # Estimated pmin
            stats = (
                stage_level_df.groupby("iteration")["pmin_estimate_avg"]
                .agg(["mean", "min", "max", "std"])
                .reset_index()
            )
            trace_fn(
                fig,
                stats,
                row=i + 1,
                col=level + 1,
                dash="dot",
                color=COLOR.PMIN_ESTIMATE.value,
            )

    make_legend(fig)
    hw_support = {"FP64": 53, "FP32": 23, "FP24": 16, "FP16": 10, "BF16": 7}
    display_hardware_support(fig, hw_support=hw_support)

    # x-axis title
    fig.add_annotation(
        text="nth Iteration",
        xref="paper",
        yref="paper",
        x=0.5,
        y=-0.075,
        showarrow=False,
        font=dict(size=24),
    )
    # y-axis title
    fig.add_annotation(
        text="Number of bits",
        xref="paper",
        yref="paper",
        x=-0.075,
        y=0.5,
        textangle=-90,
        showarrow=False,
        font=dict(size=24),
    )
    fig.update_layout(margin=dict(b=80, l=80))

    show_and_save(fig, out_file=out_file, show=show)

In [11]:
plot_data = all_df[all_df["exp_id"] == EXPERIMENTS.DYNOMP.value]
plot_pmin(
    plot_data,
    trace_fn=trace_minmax,
    title="Pmin: MinMax bounds",
    out_file=FIG_DIR / "pmin_minmax_bounds.png",
)
plot_pmin(
    plot_data,
    trace_fn=trace_std,
    title="Pmin: Std bounds",
    out_file=FIG_DIR / "pmin_std_bounds.png",
)

# Relative difference (dynomp vs. FP64)


In [12]:
df_last = all_df.sort_values("iteration", ascending=False).drop_duplicates(
    subset=["subject_id", "stage", "level", "exp_id"], keep="first"
)

df_pivot = df_last.pivot_table(
    index=["subject_id", "stage", "level"],
    columns="exp_id",
    values="metricValue",
).reset_index()

df_pivot["diff_fp64_dynomp"] = (df_pivot["dynomp"] - df_pivot["fp64"]) / df_pivot[
    "fp64"
].abs()
df_pivot


exp_id,subject_id,stage,level,dynomp,fp64,diff_fp64_dynomp
0,0003001,Affine,0,-0.870154,-0.871005,0.000976
1,0003001,Affine,1,-0.730923,-0.730976,0.000072
2,0003001,Affine,2,-0.620237,-0.620194,-0.000070
3,0003001,Affine,3,-0.551351,-0.551397,0.000083
4,0003001,Rigid,0,-0.613321,-0.665033,0.077758
...,...,...,...,...,...,...
595,0003064,Rigid,3,-0.351743,-0.351596,-0.000418
596,0003064,SyN,0,-0.973502,-0.973531,0.000030
597,0003064,SyN,1,-0.946573,-0.946442,-0.000138
598,0003064,SyN,2,-0.932416,-0.932349,-0.000072


In [13]:
df_pivot[df_pivot["diff_fp64_dynomp"] < -0.001]


exp_id,subject_id,stage,level,dynomp,fp64,diff_fp64_dynomp
16,0003002,Rigid,0,-0.548358,-0.528529,-0.037518
17,0003002,Rigid,1,-0.445200,-0.443836,-0.003074
18,0003002,Rigid,2,-0.386080,-0.385556,-0.001359
19,0003002,Rigid,3,-0.358634,-0.357638,-0.002784
26,0003004,Affine,2,-0.601323,-0.600542,-0.001301
...,...,...,...,...,...,...
580,0003063,Rigid,0,-0.578687,-0.530171,-0.091511
581,0003063,Rigid,1,-0.465945,-0.464802,-0.002459
582,0003063,Rigid,2,-0.409729,-0.408117,-0.003951
583,0003063,Rigid,3,-0.378916,-0.377729,-0.003143


In [14]:
THRESHOLDS = [1e-2, 5e-3, 1e-3, 1e-4]
THRESHOLDS = [5e-3, 1e-3, 1e-4]


def display_thresholds(fig: go.Figure, thresholds: list[float]):
    for row, stage in enumerate(STAGES, 1):
        for treshold in thresholds:
            if stage == STAGES.SYN and treshold >= 2e-3:
                continue
            if stage == STAGES.RIGID and treshold >= 4e-3:
                continue
            add_hline(fig, y=treshold, text=f"{treshold:.0e}", row=row)


def plot_rel_diff(
    df: pd.DataFrame,
    *,
    value_col: str,
    title: str,
    out_file: Path | None = None,
    show: bool = True,
) -> None:
    fig = go.Figure()
    adjust_layout(fig, title=title, width=1200, height=400)

    for i, stage in enumerate(STAGES):
        for level in range(N_LEVELS):
            stage_level_df = df[(df["stage"] == stage) & (df["level"] == level)]

            fig.add_trace(
                go.Box(
                    y=stage_level_df[value_col],
                    boxpoints="all",
                    pointpos=0,
                    marker=dict(
                        color=hex_to_rgba(BOKEH_COLORBLIND[i], 0.3),
                        line=dict(
                            width=1,
                            color=hex_to_rgba(BOKEH_COLORBLIND[i], 1),
                        ),
                        size=8,
                    ),
                    # line=dict(color="rgba(0,0,0,0)"),
                    # fillcolor="rgba(0,0,0,0)",
                    showlegend=False,
                    name=f"{stage}:{level}",
                ),
            )

    for threshold in THRESHOLDS:
        fig.add_hline(
            y=threshold,
            line=dict(dash="solid", width=1, color=HLINE_COLOR),
            annotation_text=f"{threshold:.0e}",
            annotation_position="right",
            annotation_font_size=14,
            annotation_showarrow=False,
            annotation_font_color="black",
        )
    # y=0
    fig.add_hline(
        y=0,
        line=dict(dash="dot", width=1, color="black"),
    )

    # y-axis
    fig.update_yaxes(
        title="Rel. diff. in loss",
        title_font=dict(size=24),
        tickformat=".0e",  # This is the D3 equivalent of "{x:.0e}"
        range=[-5e-3, 5e-3],
    )

    show_and_save(fig, out_file=out_file, show=show)

In [15]:
plot_rel_diff(
    df_pivot,
    value_col="diff_fp64_dynomp",
    title="Relative Difference in Metric Value (dynomp vs. FP64)",
    out_file=FIG_DIR / "rel_diff_fp64_dynomp.png",
)

In [16]:
df_pivot[df_pivot["diff_fp64_dynomp"] < 0]

exp_id,subject_id,stage,level,dynomp,fp64,diff_fp64_dynomp
2,0003001,Affine,2,-0.620237,-0.620194,-0.000070
8,0003001,SyN,0,-0.974242,-0.973985,-0.000263
9,0003001,SyN,1,-0.951661,-0.951562,-0.000105
10,0003001,SyN,2,-0.932938,-0.932830,-0.000115
11,0003001,SyN,3,-0.903799,-0.903749,-0.000056
...,...,...,...,...,...,...
591,0003064,Affine,3,-0.543470,-0.543209,-0.000480
593,0003064,Rigid,1,-0.436887,-0.434977,-0.004391
595,0003064,Rigid,3,-0.351743,-0.351596,-0.000418
597,0003064,SyN,1,-0.946573,-0.946442,-0.000138


In [17]:
df_pivot

exp_id,subject_id,stage,level,dynomp,fp64,diff_fp64_dynomp
0,0003001,Affine,0,-0.870154,-0.871005,0.000976
1,0003001,Affine,1,-0.730923,-0.730976,0.000072
2,0003001,Affine,2,-0.620237,-0.620194,-0.000070
3,0003001,Affine,3,-0.551351,-0.551397,0.000083
4,0003001,Rigid,0,-0.613321,-0.665033,0.077758
...,...,...,...,...,...,...
595,0003064,Rigid,3,-0.351743,-0.351596,-0.000418
596,0003064,SyN,0,-0.973502,-0.973531,0.000030
597,0003064,SyN,1,-0.946573,-0.946442,-0.000138
598,0003064,SyN,2,-0.932416,-0.932349,-0.000072


# Stat test

In [18]:
baseline = all_df[all_df["exp_id"] == EXPERIMENTS.FP64.value]
dynomp = all_df[all_df["exp_id"] == EXPERIMENTS.DYNOMP.value]


In [19]:
import numpy as np
from scipy import stats
from statsmodels.stats.multitest import multipletests

# =====================================================================
# 0. EXPERIMENTAL SETUP & SYNTHETIC DATA
# =====================================================================
np.random.seed(42)  # For reproducibility

# Define the 12 stage-level pairs
stages = ["Rigid", "Affine", "SyN"]
levels = [4]
pair_names = [f"{s}_L{l}" for s in stages for l in levels]

n_images = 10
n_pairs = len(pair_names)

# Generate synthetic loss data: Shape (12 pairs, 10 images)
# (Assuming lower loss is better)
baseline_loss = np.random.uniform(0.2, 0.8, size=(n_pairs, n_images))

# Simulate the DynoMP patch having a very slight degradation (0 to 0.025 difference)
dynomp_loss = baseline_loss + np.random.uniform(0.00, 0.025, size=(n_pairs, n_images))

# Calculate paired differences: Positive values mean DynoMP performed worse
differences = dynomp_loss - baseline_loss

# For Approaches 1 & 2, let's isolate just the final output: SyN Level 4 (Index 11)
syn_l4_diffs = differences[-1]

print(f"--- Focusing on Final Stage ({pair_names[-1]}) ---")
print(f"Mean degradation for {pair_names[-1]}: {np.mean(syn_l4_diffs):.4f}\n")

# =====================================================================
# APPROACH 1: Standard One-Tailed T-Test (Supervisor's Baseline)
# =====================================================================
# Goalpost: DynoMP loss is equal to baseline (Diff <= 0)
# Alternative: DynoMP loss is strictly worse (Diff > 0)

stat_std, p_std = stats.ttest_1samp(syn_l4_diffs, popmean=0, alternative="greater")

print("1. Standard One-Tailed T-Test (SyN_L4 only):")
print(f"   p-value: {p_std:.4f}")
if p_std < 0.05:
    print("   Result: We PROVE the patch is statistically worse.")
else:
    print("   Result: We failed to prove it is worse.")
print()

# =====================================================================
# APPROACH 2: Non-Inferiority One-Tailed T-Test
# =====================================================================
# Goalpost: DynoMP is unacceptably worse (Diff >= Î”)
# Alternative: DynoMP is non-inferior (Diff < Î”)
delta = 0.02  # Define maximum acceptable loss degradation

stat_ni, p_ni = stats.ttest_1samp(syn_l4_diffs, popmean=delta, alternative="less")

print(f"2. Non-Inferiority Test (SyN_L4 only, Delta = {delta}):")
print(f"   p-value: {p_ni:.4f}")
if p_ni < 0.05:
    print(
        "   Result: We PROVE DynoMP is non-inferior (degradation is strictly < Delta)."
    )
else:
    print("   Result: We failed to prove non-inferiority.")
print()

# =====================================================================
# APPROACH 3: Bonferroni Correction Across All 12 Stage-Level Pairs
# =====================================================================
print("3. Bonferroni Corrected Non-Inferiority Across All 12 Pairs:")
print("   (Testing if degradation is strictly < Delta for EVERY step)\n")

# 3a. Collect raw non-inferiority p-values for all 12 pairs
raw_p_values = []
for i in range(n_pairs):
    _, p_val = stats.ttest_1samp(differences[i], popmean=delta, alternative="less")
    raw_p_values.append(p_val)

# 3b. Apply Bonferroni correction
alpha = 0.05
reject_array, pvals_corrected, _, _ = multipletests(
    raw_p_values, alpha=alpha, method="bonferroni"
)

# 3c. Display results
for i in range(n_pairs):
    status = "Pass" if reject_array[i] else "Fail"
    print(
        f"   {pair_names[i]:<10} | Raw p: {raw_p_values[i]:.4f} | Corrected p: {pvals_corrected[i]:.4f} | {status}"
    )


--- Focusing on Final Stage (SyN_L4) ---
Mean degradation for SyN_L4: 0.0144

1. Standard One-Tailed T-Test (SyN_L4 only):
   p-value: 0.0005
   Result: We PROVE the patch is statistically worse.

2. Non-Inferiority Test (SyN_L4 only, Delta = 0.02):
   p-value: 0.0459
   Result: We PROVE DynoMP is non-inferior (degradation is strictly < Delta).

3. Bonferroni Corrected Non-Inferiority Across All 12 Pairs:
   (Testing if degradation is strictly < Delta for EVERY step)

   Rigid_L4   | Raw p: 0.0126 | Corrected p: 0.0378 | Pass
   Affine_L4  | Raw p: 0.0006 | Corrected p: 0.0019 | Pass
   SyN_L4     | Raw p: 0.0459 | Corrected p: 0.1376 | Fail


In [20]:
differences.shape


(3, 10)