In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from typing import List
from models.common.utility_functions import ulp
import os
# NOTE(review): hard-coded, machine-specific path. Presumably this points the
# tt-metal runtime at a build/kernel cache directory - confirm, and adjust for
# your own checkout before running this notebook elsewhere.
os.environ["TT_METAL_CACHE"] = "/localdev/astancov/tt-metal/built" 
from loguru import logger

# Disable TT logging
# NOTE(review): both environment variables are set after
# `models.common.utility_functions` has already been imported above; if that
# import pulls in ttnn, the settings may be read too late - TODO confirm.
os.environ['TT_LOGGER_LEVEL'] = 'off'
logger.disable('ttnn')

# Default (width, height) in inches used by the plotting helpers below.
default_fig_size = (6, 4)

def plot_abs_err_histogram(res: torch.Tensor, ref: torch.Tensor, n_bins: int = 100):
    """
    Plot a histogram of the signed element-wise error ``res - ref``.

    "Absolute" is used here in the sense of "not normalised by the reference"
    (in contrast to the relative-error plot). The sign is kept, so the printed
    mean is the average signed error and min/max are the most negative and
    most positive errors.

    Both tensors are detached, moved to the CPU and promoted to float64 before
    subtracting. This lets low-precision inputs such as bfloat16 (which NumPy
    cannot represent) and tensors living on different devices be compared, and
    keeps the subtraction itself from adding rounding error.

    Args:
        res: Result tensor
        ref: Reference tensor
        n_bins: Number of bins for histogram
    """
    res_f64 = res.detach().cpu().to(torch.float64)
    ref_f64 = ref.detach().cpu().to(torch.float64)
    abs_err_flat = (res_f64 - ref_f64).numpy().flatten()

    plt.figure(figsize=default_fig_size)
    plt.hist(abs_err_flat, bins=n_bins, edgecolor='black', alpha=0.7)
    plt.xlabel('Absolute Error')
    plt.ylabel('Frequency')
    plt.title('Histogram of Absolute Errors')
    plt.grid(True, alpha=0.3)
    plt.show()

    print(f"Mean absolute error: {abs_err_flat.mean():.6e}")
    print(f"Max absolute error: {abs_err_flat.max():.6e}")
    print(f"Min absolute error: {abs_err_flat.min():.6e}")


def plot_rel_err_histogram(res: torch.Tensor, ref: torch.Tensor, n_bins: int = 100):
    """
    Plot a histogram of element-wise relative errors ``|res - ref| / |ref|``.

    Where the reference is exactly zero the denominator is replaced by 1, so
    the value plotted for that element is the plain magnitude ``|res - ref|``.
    Non-finite ratios (inf/nan) are dropped before plotting.

    Both tensors are detached, moved to the CPU and promoted to float64 first,
    so bfloat16 inputs (unsupported by NumPy) and mixed-device inputs work.

    Args:
        res: Result tensor
        ref: Reference tensor
        n_bins: Number of bins for histogram
    """
    res_f64 = res.detach().cpu().to(torch.float64)
    ref_f64 = ref.detach().cpu().to(torch.float64)
    abs_err = (res_f64 - ref_f64).abs()
    ref_abs = ref_f64.abs()

    # Avoid division by zero: a zero reference gets a denominator of 1
    rel_err = abs_err / torch.where(ref_abs > 0, ref_abs, torch.ones_like(ref_abs))
    rel_err_np = rel_err.numpy().flatten()

    # Filter out inf/nan values
    rel_err_np = rel_err_np[np.isfinite(rel_err_np)]

    # max() of an empty array raises, so bail out when nothing is left
    if rel_err_np.size == 0:
        print("No finite relative errors to plot.")
        return

    plt.figure(figsize=default_fig_size)
    plt.hist(rel_err_np, bins=n_bins, edgecolor='black', alpha=0.7)
    plt.xlabel('Relative Error')
    plt.ylabel('Frequency')
    plt.title('Histogram of Relative Errors')
    plt.grid(True, alpha=0.3)
    plt.show()

    print(f"Mean relative error: {rel_err_np.mean():.6e}")
    print(f"Max relative error: {rel_err_np.max():.6e}")
    print(f"Median relative error: {np.median(rel_err_np):.6e}")


def rel_fro_error(res, ref):
    """
    Return the relative Frobenius error ``||res - ref|| / ||ref||`` as a float.

    Each argument may be a torch.Tensor or anything ``np.asarray`` accepts.
    Everything is converted to float64 before the norms are taken, and
    zero-dimensional inputs are promoted to one-element vectors.

    When ``||ref||`` is zero the result is ``inf`` if ``res`` differs from
    ``ref`` and ``0.0`` otherwise.
    """
    def _as_f64_array(value):
        if isinstance(value, torch.Tensor):
            arr = value.to(torch.float64).detach().cpu().numpy()
        else:
            arr = np.asarray(value).astype(np.float64)
        # A 0-d array becomes shape (1,); everything else is left untouched
        return np.atleast_1d(arr)

    result_arr = _as_f64_array(res)
    reference_arr = _as_f64_array(ref)

    error_norm = np.linalg.norm(result_arr - reference_arr)
    reference_norm = np.linalg.norm(reference_arr)

    if reference_norm != 0:
        return float(error_norm) / float(reference_norm)
    # Degenerate reference: the ratio is undefined, so report inf or 0
    if error_norm > 0:
        return float("inf")
    return 0.0

def plot_rel_fro_error_vs_magnitude(
    res_list: list, 
    names: list, 
    ref_list: list, 
    title_suffix=""
):
    """
    Plot relative Frobenius error against the value range of each reference.

    One curve is drawn per method. The x value of each point is
    ``max(ref) - min(ref)`` of the corresponding reference tensor and the y
    value is ``rel_fro_error(res, ref)``. Both axes are logarithmic.

    Args:
        res_list: List of lists. Each sublist holds the result tensors of one
                  method, one per entry of ref_list (torch.Tensor or numpy array).
        names: List of legend entries, one for each sublist in res_list.
               Labels containing "unfold" (case-insensitive) use a '.' marker,
               all others use 'x'.
        ref_list: List of reference tensors (torch.Tensor or numpy arrays).
        title_suffix: String to append to the plot title.

    Raises:
        ValueError: if an argument is not a list or the lengths do not match.
    """
    if not isinstance(res_list, list) or not isinstance(ref_list, list) or not isinstance(names, list):
        raise ValueError("res_list, ref_list, and names must all be lists.")
    if len(res_list) != len(names):
        raise ValueError("res_list and names must have the same length")
    n_refs = len(ref_list)
    for r in res_list:
        if len(r) != n_refs:
            raise ValueError("Each inner list in res_list must have the same length as ref_list.")

    def to_numpy(x):
        # Promote to float64 first: NumPy has no bfloat16, so a direct
        # .numpy() on such a tensor would raise.
        if isinstance(x, torch.Tensor):
            return x.to(torch.float64).detach().cpu().numpy()
        return np.asarray(x).astype(np.float64)

    ranges = []
    for ref in ref_list:
        ref_np = to_numpy(ref)
        ranges.append(ref_np.max() - ref_np.min())

    plt.figure(figsize=default_fig_size)
    for results, label in zip(res_list, names):
        rel_errors = [rel_fro_error(res, ref) for res, ref in zip(results, ref_list)]
        marker = '.' if 'unfold' in label.lower() else 'x'
        plt.plot(ranges, rel_errors, marker=marker, linestyle="-", markersize=10 if marker == 'x' else 8, label=label, linewidth=2, alpha=0.7)

    plt.xlabel('Reference Range (max - min)')
    plt.ylabel('Relative Frobenius Error (||res-ref|| / ||ref||)')
    plt.title('Relative Frobenius Error vs Reference Magnitude Range' + (f" {title_suffix}" if title_suffix else ""))
    plt.grid(True, alpha=0.3)
    plt.xscale('log')
    plt.yscale('log')
    plt.legend()
    plt.tight_layout()
    plt.show()

def plot_rel_error_vs_param(
    tensors, names, refs, norm_type="fro", param_vals=None, param_name="K", logy=False, title_suffix="", fig_size=default_fig_size
):
    """
    Plots relative norm error (||res-ref|| / ||ref|| using specified norm) vs a parameter for multiple sets of tensors.

    All tensors are converted to float64 and flattened to 1-D before the norm
    is taken. On flattened data "fro" and "spectral" are both the vector
    2-norm, so those two settings produce identical curves.

    Args:
        tensors: list of lists of tensors. Each element of the outer list corresponds to a group/method/etc,
                 and should be a list of tensors (length n_params), one for each parameter value.
        names:   list of names (length = len(tensors)), one per outer list.
                 Names containing "torch" are drawn with an 'x' marker, others with 'o'.
        refs:    list of reference tensors (length n_params), one for each parameter value.
        norm_type: str, one of ["fro", "1", "inf", "spectral"].
        param_vals: optional list of parameter values (length n_params); otherwise [0, ..., n_params-1].
        param_name: string for parameter name in x-axis label.
        logy:     whether to plot y-axis in log scale.
        title_suffix: string appended to the plot title.
        fig_size: (width, height) passed to plt.figure.

    Raises:
        ValueError: on an unknown norm_type, non-list arguments, an empty
            tensors list, or mismatched lengths.
    """
    # norm_type -> (ord handed to np.linalg.norm on the flattened data, axis label)
    norm_table = {
        "fro": (2, "Frobenius-Norm"),
        "1": (1, "1-Norm"),
        "inf": (np.inf, "Infinity-Norm"),
        "spectral": (2, "Spectral-Norm"),
    }
    if norm_type not in norm_table:
        raise ValueError(f"norm_type must be one of ['fro', '1', 'inf', 'spectral'], got '{norm_type}'")

    if not isinstance(tensors, list) or not isinstance(names, list) or not isinstance(refs, list):
        raise ValueError("tensors, names, and refs must all be lists.")
    if len(tensors) != len(names):
        raise ValueError("tensors and names must have the same length.")
    if len(tensors) == 0:
        raise ValueError("tensors list must not be empty.")
    n_refs = len(refs)
    if any(len(row) != n_refs for row in tensors):
        raise ValueError("Each inner list in tensors and refs must all have the same length (len(refs)).")
    xvals = list(range(n_refs)) if param_vals is None else param_vals
    if len(xvals) != n_refs:
        raise ValueError("Mismatch between param_vals and number of reference tensors.")

    norm_ord, norm_label = norm_table[norm_type]

    def to_flat_f64(x):
        if isinstance(x, torch.Tensor):
            return x.to(torch.float64).detach().cpu().numpy().flatten()
        return np.asarray(x).astype(np.float64).flatten()

    def rel_norm_error(res, ref):
        res_np = to_flat_f64(res)
        ref_np = to_flat_f64(ref)
        return np.linalg.norm(res_np - ref_np, ord=norm_ord) / np.linalg.norm(ref_np, ord=norm_ord)

    plt.figure(figsize=fig_size)
    for tensor_list, label in zip(tensors, names):
        errors = [rel_norm_error(out, ref) for out, ref in zip(tensor_list, refs)]
        marker_style = "x" if "torch" in str(label) else "o"
        plt.plot(xvals, errors, marker=marker_style, linestyle="-", markersize=10 if marker_style == "x" else 8, label=str(label), linewidth=2)

    plt.xlabel(f"{param_name}")
    # Closing parenthesis was missing from this label in the original
    plt.ylabel(f"Relative {norm_label} Error (||res-ref|| / ||ref||)")
    plt.title(
        f"Relative {norm_label} Error vs {param_name}"
        + (f" {title_suffix}" if title_suffix else "")
    )
    plt.grid(True, alpha=0.3)
    # Avoid duplicate labels in legend
    handles, all_labels = plt.gca().get_legend_handles_labels()
    unique = dict(zip(all_labels, handles))
    plt.legend(unique.values(), unique.keys())
    if logy:
        plt.yscale("log")
    plt.tight_layout()
    plt.show()

def ulp_error(res, ref):
    """
    Compute the element-wise ULP error between two tensors, in ULPs of res.

    The reference is first cast to the dtype of ``res``; the difference and
    the division are then carried out in float64.

    Args:
        res: Result tensor
        ref: Reference tensor

    Returns:
        Tensor of ``|res - ref| / ulp(res)`` values
    """
    # Per-element ULP size of res, as computed by the imported `ulp` helper
    spacing = ulp(res).to(torch.float64)
    # Express the reference in res's own dtype before comparing
    ref_in_res_dtype = ref.to(res.dtype).to(torch.float64)
    difference = res.to(torch.float64) - ref_in_res_dtype
    return torch.abs(difference / spacing)

def plot_ulp_error_histogram(res, ref, n_bins=100):
    """
    Draw a histogram of the per-element ULP error between res and ref.

    Args:
        res: Result tensor
        ref: Reference tensor
        n_bins: Number of bins for the histogram
    """
    flat_errors = ulp_error(res, ref).detach().cpu().numpy().flatten()

    plt.figure(figsize=default_fig_size)
    plt.hist(flat_errors, bins=n_bins, alpha=0.7, linewidth=2, edgecolor='black')
    plt.title('ULP Error Histogram')
    plt.xlabel('ULP Error')
    plt.ylabel('Frequency')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

def plot_ulp_error_cdf(res, ref):
    """
    Draw the empirical CDF of the per-element ULP error between res and ref.

    Args:
        res: Result tensor
        ref: Reference tensor
    """
    ordered = np.sort(ulp_error(res, ref).detach().cpu().numpy().flatten())
    n_points = len(ordered)
    # i-th smallest error sits at cumulative probability i / n
    cumulative = np.arange(1, n_points + 1) / n_points

    plt.figure(figsize=default_fig_size)
    plt.plot(ordered, cumulative, linewidth=2)
    plt.title('ULP Error CDF')
    plt.xlabel('ULP Error')
    plt.ylabel('Cumulative Probability')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

In [None]:
import ttnn
from ttnn.operations.conv2d import conv2d_unfold_matmul

def generate_torch_conv_tensors(batch, in_channels, out_channels, height, width, kernel_h, kernel_w, dtype, scaler=1.0, rand_func=torch.rand,seed=None):
    """
    Build random input, weight and bias tensors for a conv2d.

    Shapes are (batch, in_channels, height, width) for the input,
    (out_channels, in_channels, kernel_h, kernel_w) for the weight and
    (1, 1, 1, out_channels) for the bias. Each tensor is produced by
    ``rand_func(shape, dtype=dtype)`` and multiplied by ``scaler``.

    When ``seed`` is given, the global torch RNG is seeded with it first.

    Returns:
        (input_tensor, weight_tensor, bias_tensor)
    """
    if seed is not None:
        torch.manual_seed(seed)

    def _draw(shape):
        return rand_func(shape, dtype=dtype) * scaler

    # Draw order (input, weight, bias) fixes which random numbers land where
    input_tensor = _draw((batch, in_channels, height, width))
    weight_tensor = _draw((out_channels, in_channels, kernel_h, kernel_w))
    bias_tensor = _draw((1, 1, 1, out_channels))
    return input_tensor, weight_tensor, bias_tensor

def run_torch_conv2d(input, weight, bias, stride=1, padding=0):
    """
    Compute one conv2d with torch.nn.functional.conv2d.

    A non-None bias is flattened to 1-D before being handed to torch, so a
    bias of any shape with out_channels elements is accepted.
    """
    if bias is None:
        flat_bias = None
    else:
        flat_bias = bias.reshape(-1)
    return torch.nn.functional.conv2d(input, weight, flat_bias, stride=stride, padding=padding)

def run_conv2d_unfold_matmul(input, weight, bias, stride=1, padding=0, matmul_precision="highest"):
    """
    Compute a conv2d through the imported ``conv2d_unfold_matmul`` helper.

    All arguments are forwarded unchanged; the result of the helper is
    returned as-is.
    """
    conv_options = {
        "stride": stride,
        "padding": padding,
        "matmul_precision": matmul_precision,
    }
    return conv2d_unfold_matmul(input, weight, bias, **conv_options)

def run_np_conv2d(input, weight, bias, stride=1, padding=0):
    """
    Compute one conv2d on numpy arrays and return a numpy array.

    The arrays are wrapped as torch tensors (keeping their dtype), pushed
    through torch.nn.functional.conv2d, and the output is converted back.
    A non-None bias is flattened to 1-D first.
    """
    torch_input = torch.from_numpy(input)
    torch_weight = torch.from_numpy(weight)

    torch_bias = None
    if bias is not None:
        torch_bias = torch.from_numpy(bias).reshape(-1)

    conv_out = torch.nn.functional.conv2d(
        torch_input, torch_weight, torch_bias, stride=stride, padding=padding
    )
    return conv_out.numpy()

def run_ttnn_conv2d(input, weight, bias, device, stride=1, padding=0, dtype=ttnn.bfloat16,
                    fp32_accum=False, math_fidelity=ttnn.MathFidelity.HiFi4):
    """
    Run a single conv2d in ttnn and return the result as an NCHW torch tensor.

    Args:
        input: NCHW torch tensor.
        weight: OIHW torch tensor.
        bias: torch tensor (the callers in this notebook pass (1,1,1,C)) or None.
        device: ttnn device to run on. If None, device 0 is opened for this
            call and closed again before returning, even when an error occurs.
        stride: int or (stride_h, stride_w).
        padding: int, (pad_h, pad_w), or a 4-sequence that is forwarded to
            ttnn.conv2d unchanged (assumed to be (top, bottom, left, right) -
            TODO confirm against the ttnn.conv2d documentation).
        dtype: ttnn dtype for activations, weights, and bias (ttnn.bfloat16, ttnn.float32, etc.)
        fp32_accum: whether to use FP32 dest accumulation in compute kernel
        math_fidelity: math fidelity setting (LoFi, HiFi2, HiFi3, HiFi4)

    Raises:
        ValueError: if stride or padding is a sequence of unsupported length.
    """
    # Normalise stride/padding before touching hardware so that bad arguments
    # fail fast and never leave a device open.
    if isinstance(padding, int):
        pad_4 = (padding, padding, padding, padding)
    elif len(padding) == 2:
        pad_4 = (padding[0], padding[0], padding[1], padding[1])
    elif len(padding) == 4:
        # Previously only elements 0 and 2 were used, which silently dropped
        # the other two values of an asymmetric padding.
        pad_4 = tuple(padding)
    else:
        raise ValueError(f"padding must be an int or a sequence of length 2 or 4, got {padding!r}")

    if isinstance(stride, int):
        stride_2 = (stride, stride)
    elif len(stride) == 2:
        stride_2 = (stride[0], stride[1])
    else:
        raise ValueError(f"stride must be an int or a sequence of length 2, got {stride!r}")

    # Extract dimensions
    batch, in_channels, height, width = input.shape
    out_channels = weight.shape[0]
    kernel_h, kernel_w = weight.shape[2], weight.shape[3]

    # Convert input to NHWC format
    input_nhwc = input.permute(0, 2, 3, 1).contiguous()

    create_local_device = device is None
    if create_local_device:
        device = ttnn.CreateDevice(0, l1_small_size=16384)
    try:
        # Convert to ttnn tensors with specified dtype; only the input is
        # placed on the device here.
        tt_input = ttnn.from_torch(input_nhwc, dtype=dtype, layout=ttnn.ROW_MAJOR_LAYOUT, device=device)
        tt_weight = ttnn.from_torch(weight, dtype=dtype)
        tt_bias = ttnn.from_torch(bias, dtype=dtype) if bias is not None else None

        compute_config = ttnn.init_device_compute_kernel_config(
            device.arch(),
            math_fidelity=math_fidelity,
            math_approx_mode=False,
            fp32_dest_acc_en=fp32_accum,
            packer_l1_acc=True,  # Always enabled
        )

        # With both return_* flags set the call yields three values; the
        # third one is not needed here.
        tt_output, out_dims, _ = ttnn.conv2d(
            input_tensor=tt_input,
            weight_tensor=tt_weight,
            bias_tensor=tt_bias,
            in_channels=in_channels,
            out_channels=out_channels,
            device=device,
            kernel_size=(kernel_h, kernel_w),
            stride=stride_2,
            padding=pad_4,
            dilation=(1, 1),
            batch_size=batch,
            input_height=height,
            input_width=width,
            compute_config=compute_config,
            groups=1,
            return_output_dim=True,
            return_weights_and_bias=True,
        )

        # Convert back to torch
        output_torch = ttnn.to_torch(tt_output)

        # Reshape to (N, H, W, C) using the reported output dims, then to NCHW
        out_height, out_width = out_dims
        return output_torch.reshape(batch, out_height, out_width, out_channels).permute(0, 3, 1, 2)
    finally:
        # Only close a device that this call opened itself
        if create_local_device:
            ttnn.close_device(device)

def single_conv2d_analysis(res, ref):
    """Produce every comparison plot for one result/reference pair."""
    plotters = (
        plot_abs_err_histogram,
        plot_rel_err_histogram,
        plot_ulp_error_histogram,
        plot_ulp_error_cdf,
    )
    for make_plot in plotters:
        make_plot(res, ref)

## Run a single-conv2d analysis

In [None]:
# Single conv2d: compare ttnn.conv2d (several configs) and conv2d_unfold_matmul
# against a float64 reference.
batch, in_ch, out_ch = 1, 3, 64
height, width = 32, 32
kernel_h, kernel_w = 3, 3
stride, padding = 1, 1

# No shared device handle: with device=None, run_ttnn_conv2d opens and closes
# its own device on every call.
device = None

# Float64 master tensors and their down-cast copies
input_64, weight_64, bias_64 = generate_torch_conv_tensors(
    batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float64
)
input_bf16, weight_bf16, bias_bf16 = (t.to(torch.bfloat16) for t in (input_64, weight_64, bias_64))
input_f32, weight_f32, bias_f32 = (t.to(torch.float32) for t in (input_64, weight_64, bias_64))

# Float64 reference computed from the numpy views of the master tensors
input_64_np, weight_64_np, bias_64_np = input_64.numpy(), weight_64.numpy(), bias_64.numpy()
np64_ref = run_np_conv2d(input_64_np, weight_64_np, bias_64_np, stride=stride, padding=padding)

# Implementations under test - vary fp32_accum and math_fidelity
conv_geometry = dict(stride=stride, padding=padding)
ttnn_bf16_hifi4 = run_ttnn_conv2d(
    input_bf16, weight_bf16, bias_bf16, device,
    dtype=ttnn.bfloat16, fp32_accum=False, math_fidelity=ttnn.MathFidelity.HiFi4, **conv_geometry
)
ttnn_bf16_hifi4_fp32acc = run_ttnn_conv2d(
    input_bf16, weight_bf16, bias_bf16, device,
    dtype=ttnn.bfloat16, fp32_accum=True, math_fidelity=ttnn.MathFidelity.HiFi4, **conv_geometry
)
ttnn_bf16_lofi = run_ttnn_conv2d(
    input_bf16, weight_bf16, bias_bf16, device,
    dtype=ttnn.bfloat16, fp32_accum=False, math_fidelity=ttnn.MathFidelity.LoFi, **conv_geometry
)
unfold_bf16 = run_conv2d_unfold_matmul(input_bf16, weight_bf16, bias_bf16, **conv_geometry)
unfold_f32 = run_conv2d_unfold_matmul(input_f32, weight_f32, bias_f32, **conv_geometry)

# (heading, result) pairs, analysed in this order
comparisons = [
    ("\n=== TTNN Conv2d (bf16, HiFi4) vs NumPy Float64 Reference ===", ttnn_bf16_hifi4),
    ("\n=== TTNN Conv2d (bf16, HiFi4, FP32 accum) vs NumPy Float64 Reference ===", ttnn_bf16_hifi4_fp32acc),
    ("\n=== TTNN Conv2d (bf16, LoFi) vs NumPy Float64 Reference ===", ttnn_bf16_lofi),
    ("\n=== Conv2d Unfold+Matmul (bf16) vs NumPy Float64 Reference ===", unfold_bf16),
    ("\n=== Conv2d Unfold+Matmul (f32) vs NumPy Float64 Reference ===", unfold_f32),
]
for heading, result in comparisons:
    print(heading)
    single_conv2d_analysis(result, torch.from_numpy(np64_ref))

## Run a sweep over input magnitudes, compare dtypes, fp32 accum, and math fidelity

In [None]:
# Sweep the magnitude of the random inputs (input, weight and bias are all
# multiplied by the same scaler) for a fixed conv geometry.
scalers = [0.001, 0.01, 0.1, 1, 10, 100, 1000]
batch, in_ch, out_ch = 1, 64, 128
height, width = 32, 32
kernel_h, kernel_w = 3, 3
stride, padding = 1, 1

# One result list per configuration, plus the float64 references
results_bf16_hifi4 = []
results_bf16_hifi4_fp32acc = []
results_bf16_hifi2 = []
results_bf16_lofi = []
results_f32_hifi4 = []
results_f32_hifi4_fp32acc = []
results_unfold_bf16 = []
results_unfold_f32 = []
ref_tensors = []

# ttnn configurations in run order:
# (destination list, host precision key, ttnn dtype, fp32_accum, math fidelity)
ttnn_sweep = [
    (results_bf16_hifi4, "bf16", ttnn.bfloat16, False, ttnn.MathFidelity.HiFi4),
    (results_bf16_hifi4_fp32acc, "bf16", ttnn.bfloat16, True, ttnn.MathFidelity.HiFi4),
    (results_bf16_hifi2, "bf16", ttnn.bfloat16, False, ttnn.MathFidelity.HiFi2),
    (results_bf16_lofi, "bf16", ttnn.bfloat16, False, ttnn.MathFidelity.LoFi),
    (results_f32_hifi4, "f32", ttnn.float32, False, ttnn.MathFidelity.HiFi4),
    (results_f32_hifi4_fp32acc, "f32", ttnn.float32, True, ttnn.MathFidelity.HiFi4),
]

for scaler in scalers:
    # Float64 master tensors and their down-cast copies
    input_64, weight_64, bias_64 = generate_torch_conv_tensors(
        batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float64, scaler=scaler
    )
    input_bf16, weight_bf16, bias_bf16 = (t.to(torch.bfloat16) for t in (input_64, weight_64, bias_64))
    input_f32, weight_f32, bias_f32 = (t.to(torch.float32) for t in (input_64, weight_64, bias_64))

    # Float64 reference
    input_64_np, weight_64_np, bias_64_np = input_64.numpy(), weight_64.numpy(), bias_64.numpy()
    np64_ref = run_np_conv2d(input_64_np, weight_64_np, bias_64_np, stride=stride, padding=padding)
    ref_tensors.append(np64_ref)

    host_inputs = {
        "bf16": (input_bf16, weight_bf16, bias_bf16),
        "f32": (input_f32, weight_f32, bias_f32),
    }
    # `device` is whatever an earlier cell left in the namespace
    for sink, precision, tt_dtype, use_fp32_acc, fidelity in ttnn_sweep:
        sink.append(run_ttnn_conv2d(*host_inputs[precision], device, stride=stride, padding=padding,
                                    dtype=tt_dtype, fp32_accum=use_fp32_acc, math_fidelity=fidelity))
    results_unfold_bf16.append(run_conv2d_unfold_matmul(*host_inputs["bf16"], stride=stride, padding=padding))
    results_unfold_f32.append(run_conv2d_unfold_matmul(*host_inputs["f32"], stride=stride, padding=padding))

results = [
    results_bf16_hifi4,
    results_bf16_hifi4_fp32acc,
    results_bf16_hifi2,
    results_bf16_lofi,
    results_f32_hifi4,
    results_f32_hifi4_fp32acc,
    results_unfold_bf16,
    results_unfold_f32,
]
names = [
    "ttnn (bf16, HiFi4)",
    "ttnn (bf16, HiFi4, fp32acc)",
    "ttnn (bf16, HiFi2)",
    "ttnn (bf16, LoFi)",
    "ttnn (f32, HiFi4)",
    "ttnn (f32, HiFi4, fp32acc)",
    "unfold_matmul (bf16)",
    "unfold_matmul (f32)",
]
plot_rel_fro_error_vs_magnitude(results, names, ref_tensors, title_suffix="\nReference=np64")

## Run a sweep over number of channels, compare dtypes and fp32 accum settings

In [None]:
# Sweep the number of input channels with a 3x3 kernel; every other
# dimension is held constant.
in_channels_list = [32, 64, 128, 256, 512, 1024, 2048, 2560, 3072, 3584, 4096, 5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384]
out_ch = 128  # output channels, constant across the sweep
batch = 1
height, width = 32, 32
kernel_h, kernel_w = 3, 3
stride, padding = 1, 1

ref_list = []

# One result list per configuration
bf16_hifi4_results = []
bf16_hifi4_fp32acc_results = []
bf16_hifi2_results = []
bf16_lofi_results = []
f32_hifi4_results = []
f32_hifi4_fp32acc_results = []
unfold_bf16_results = []
unfold_f32_results = []

# ttnn configurations in run order:
# (destination list, host precision key, ttnn dtype, fp32_accum, math fidelity)
ttnn_sweep = [
    (bf16_hifi4_results, "bf16", ttnn.bfloat16, False, ttnn.MathFidelity.HiFi4),
    (bf16_hifi4_fp32acc_results, "bf16", ttnn.bfloat16, True, ttnn.MathFidelity.HiFi4),
    (bf16_hifi2_results, "bf16", ttnn.bfloat16, False, ttnn.MathFidelity.HiFi2),
    (bf16_lofi_results, "bf16", ttnn.bfloat16, False, ttnn.MathFidelity.LoFi),
    (f32_hifi4_results, "f32", ttnn.float32, False, ttnn.MathFidelity.HiFi4),
    (f32_hifi4_fp32acc_results, "f32", ttnn.float32, True, ttnn.MathFidelity.HiFi4),
]

# With device=None, run_ttnn_conv2d opens and closes a device per call
device = None
for in_ch in in_channels_list:
    # Float64 master tensors and their down-cast copies
    input_64, weight_64, bias_64 = generate_torch_conv_tensors(
        batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float64
    )
    input_bf16, weight_bf16, bias_bf16 = (t.to(torch.bfloat16) for t in (input_64, weight_64, bias_64))
    input_f32, weight_f32, bias_f32 = (t.to(torch.float32) for t in (input_64, weight_64, bias_64))

    # Float64 reference
    input_64_np, weight_64_np, bias_64_np = input_64.numpy(), weight_64.numpy(), bias_64.numpy()
    np64_ref = run_np_conv2d(input_64_np, weight_64_np, bias_64_np, stride=stride, padding=padding)
    ref_list.append(np64_ref)

    host_inputs = {
        "bf16": (input_bf16, weight_bf16, bias_bf16),
        "f32": (input_f32, weight_f32, bias_f32),
    }
    for sink, precision, tt_dtype, use_fp32_acc, fidelity in ttnn_sweep:
        sink.append(run_ttnn_conv2d(*host_inputs[precision], device, stride=stride, padding=padding,
                                    dtype=tt_dtype, fp32_accum=use_fp32_acc, math_fidelity=fidelity))
    unfold_bf16_results.append(run_conv2d_unfold_matmul(*host_inputs["bf16"], stride=stride, padding=padding))
    unfold_f32_results.append(run_conv2d_unfold_matmul(*host_inputs["f32"], stride=stride, padding=padding))

# x-axis: K = in_channels * kernel_h * kernel_w
k_vals = [ic * kernel_h * kernel_w for ic in in_channels_list]

tensor_lists = [
    bf16_hifi4_results,
    bf16_hifi4_fp32acc_results,
    bf16_hifi2_results,
    bf16_lofi_results,
    f32_hifi4_results,
    f32_hifi4_fp32acc_results,
    unfold_bf16_results,
    unfold_f32_results,
]
names_lists = [
    "ttnn (bf16, HiFi4)",
    "ttnn (bf16, HiFi4, fp32acc)",
    "ttnn (bf16, HiFi2)",
    "ttnn (bf16, LoFi)",
    "ttnn (f32, HiFi4)",
    "ttnn (f32, HiFi4, fp32acc)",
    "unfold_matmul (bf16)",
    "unfold_matmul (f32)",
]

# One figure per norm
for norm_type in ["fro", "1", "inf", "spectral"]:
    plot_rel_error_vs_param(
        tensors=tensor_lists,
        names=names_lists,
        refs=ref_list,
        norm_type=norm_type,
        param_vals=k_vals,
        param_name="K (In_Channels × Kh × Kw)",
        logy=True,
        title_suffix=f"\nh={height}, w={width}, k={kernel_h}x{kernel_w}, Reference=np64",
        fig_size=(12, 7)
    )

In [None]:
# Channel sweep with a 1x1 kernel and no padding, so K equals in_channels.
# The channel counts are 9 * i for i = 32, 96, ..., 992, i.e. K runs from
# 288 to 8928 (16 points, all below 10k).
in_channels_list = [9 * i for i in range(32, 1056, 64)]
out_ch = 128  # output channels, constant across the sweep
batch = 1
height, width = 32, 32
kernel_h, kernel_w = 1, 1
stride, padding = 1, 0

ref_list = []

# One result list per configuration. The HiFi2 and LoFi lists are created
# but not filled or plotted in this cell.
bf16_hifi4_results = []
bf16_hifi4_fp32acc_results = []
bf16_hifi2_results = []
bf16_lofi_results = []
f32_hifi4_results = []
f32_hifi4_fp32acc_results = []
unfold_bf16_results = []
unfold_f32_results = []

# ttnn configurations in run order:
# (destination list, host precision key, ttnn dtype, fp32_accum, math fidelity)
ttnn_sweep = [
    (bf16_hifi4_results, "bf16", ttnn.bfloat16, False, ttnn.MathFidelity.HiFi4),
    (bf16_hifi4_fp32acc_results, "bf16", ttnn.bfloat16, True, ttnn.MathFidelity.HiFi4),
    (f32_hifi4_results, "f32", ttnn.float32, False, ttnn.MathFidelity.HiFi4),
    (f32_hifi4_fp32acc_results, "f32", ttnn.float32, True, ttnn.MathFidelity.HiFi4),
]

# With device=None, run_ttnn_conv2d opens and closes a device per call
device = None
for in_ch in in_channels_list:
    # Float64 master tensors and their down-cast copies
    input_64, weight_64, bias_64 = generate_torch_conv_tensors(
        batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float64
    )
    input_bf16, weight_bf16, bias_bf16 = (t.to(torch.bfloat16) for t in (input_64, weight_64, bias_64))
    input_f32, weight_f32, bias_f32 = (t.to(torch.float32) for t in (input_64, weight_64, bias_64))

    # Float64 reference
    input_64_np, weight_64_np, bias_64_np = input_64.numpy(), weight_64.numpy(), bias_64.numpy()
    np64_ref = run_np_conv2d(input_64_np, weight_64_np, bias_64_np, stride=stride, padding=padding)
    ref_list.append(np64_ref)

    host_inputs = {
        "bf16": (input_bf16, weight_bf16, bias_bf16),
        "f32": (input_f32, weight_f32, bias_f32),
    }
    for sink, precision, tt_dtype, use_fp32_acc, fidelity in ttnn_sweep:
        sink.append(run_ttnn_conv2d(*host_inputs[precision], device, stride=stride, padding=padding,
                                    dtype=tt_dtype, fp32_accum=use_fp32_acc, math_fidelity=fidelity))
    unfold_bf16_results.append(run_conv2d_unfold_matmul(*host_inputs["bf16"], stride=stride, padding=padding))
    unfold_f32_results.append(run_conv2d_unfold_matmul(*host_inputs["f32"], stride=stride, padding=padding))

# x-axis: K = in_channels * kernel_h * kernel_w
k_vals = [ic * kernel_h * kernel_w for ic in in_channels_list]
print(f"K range: {min(k_vals)} to {max(k_vals)}")
print(f"Number of points: {len(k_vals)}")

tensor_lists = [
    bf16_hifi4_results,
    bf16_hifi4_fp32acc_results,
    f32_hifi4_results,
    f32_hifi4_fp32acc_results,
    unfold_bf16_results,
    unfold_f32_results,
]
names_lists = [
    "ttnn (bf16, HiFi4)",
    "ttnn (bf16, HiFi4, fp32acc)",
    "ttnn (f32, HiFi4)",
    "ttnn (f32, HiFi4, fp32acc)",
    "unfold_matmul (bf16)",
    "unfold_matmul (f32)",
]

# One figure per norm
for norm_type in ["fro", "1", "inf", "spectral"]:
    plot_rel_error_vs_param(
        tensors=tensor_lists,
        names=names_lists,
        refs=ref_list,
        norm_type=norm_type,
        param_vals=k_vals,
        param_name="K (In_Channels × Kh × Kw)",
        logy=True,
        title_suffix=f"\nh={height}, w={width}, k={kernel_h}x{kernel_w}, Reference=np64",
        fig_size=(12, 7)
    )

In [None]:
# Single-point run of the K comparison (K = in_channels × kernel_h × kernel_w).
# With in_channels = 1024 and a 3x3 kernel, K = 1024 * 9 = 9216.
batch = 1
out_ch = 32  # output channels are held fixed
in_channels_list = [1024]
height, width = 32, 32
kernel_h, kernel_w = 3, 3
stride, padding = 1, 1

# Series, legend labels and references handed to the plotting helper
tensor_lists, names_lists, ref_list = [], [], []

# One result list per configuration. Only the f32 HiFi4 list and the two
# unfold_matmul lists are filled in by this cell; the others stay empty.
bf16_hifi4_results, bf16_hifi4_fp32acc_results = [], []
bf16_hifi2_results, bf16_lofi_results = [], []
f32_hifi4_results, f32_hifi4_fp32acc_results = [], []
unfold_bf16_results, unfold_f32_results = [], []

device = None
for in_ch in in_channels_list:
    # float64 master tensors; the lower-precision copies are cast from them
    input_64, weight_64, bias_64 = generate_torch_conv_tensors(
        batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float64
    )
    input_bf16, weight_bf16, bias_bf16 = (
        t.to(torch.bfloat16) for t in (input_64, weight_64, bias_64)
    )
    input_f32, weight_f32, bias_f32 = (
        t.to(torch.float32) for t in (input_64, weight_64, bias_64)
    )

    # Reference result computed from the float64 data as NumPy arrays
    input_64_np, weight_64_np, bias_64_np = input_64.numpy(), weight_64.numpy(), bias_64.numpy()
    np64_ref = run_np_conv2d(input_64_np, weight_64_np, bias_64_np, stride=stride, padding=padding)
    ref_list.append(np64_ref)

    # Candidate implementations (device is forwarded to the ttnn call)
    f32_hifi4_results.append(
        run_ttnn_conv2d(
            input_f32, weight_f32, bias_f32, device,
            stride=stride, padding=padding,
            dtype=ttnn.float32, fp32_accum=False,
            math_fidelity=ttnn.MathFidelity.HiFi4,
        )
    )
    unfold_bf16_results.append(
        run_conv2d_unfold_matmul(input_bf16, weight_bf16, bias_bf16, stride=stride, padding=padding)
    )
    unfold_f32_results.append(
        run_conv2d_unfold_matmul(input_f32, weight_f32, bias_f32, stride=stride, padding=padding)
    )


# X-axis values: one K per entry of in_channels_list
k_vals = [ic * kernel_h * kernel_w for ic in in_channels_list]
print(f"K range: {min(k_vals)} to {max(k_vals)}")
print(f"Number of points: {len(k_vals)}")

# Series and their legend labels, in matching order
tensor_lists += [f32_hifi4_results, unfold_bf16_results, unfold_f32_results]
names_lists += [
    "ttnn (f32, HiFi4)",
    "unfold_matmul (bf16)",
    "unfold_matmul (f32)",
]

# One call to the plotting helper per norm type
for norm_type in ("fro", "1", "inf", "spectral"):
    plot_rel_error_vs_param(
        tensors=tensor_lists,
        names=names_lists,
        refs=ref_list,
        norm_type=norm_type,
        param_vals=k_vals,
        param_name="K (In_Channels × Kh × Kw)",
        logy=True,
        title_suffix=f"\nh={height}, w={width}, k={kernel_h}x{kernel_w}, Reference=np64",
        fig_size=(12, 7),
    )

## Run a sweep over kernel sizes, compare dtypes, fp32 accum, and matmul precisions

In [None]:
# Sweep configuration: kernel size varies, everything else is fixed
kernel_sizes = [(1, 1), (3, 3), (5, 5), (7, 7)]
matmul_precisions = ["high"]
batch, in_ch, out_ch = 1, 64, 128
height, width = 32, 32
stride, padding = 1, 1

# No device handle is created in this cell; None is forwarded to run_ttnn_conv2d
device = None

# Series, legend labels and references handed to the plotting helper
tensor_lists, names_lists, ref_list = [], [], []

# One result list per ttnn configuration
bf16_hifi4_results, bf16_hifi4_fp32acc_results = [], []
bf16_hifi2_results, bf16_lofi_results = [], []
f32_hifi4_results, f32_hifi4_fp32acc_results = [], []
# unfold+matmul results, keyed by matmul precision (a separate list per key)
unfold_bf16_results_by_precision = {p: [] for p in matmul_precisions}
unfold_f32_results_by_precision = {p: [] for p in matmul_precisions}

for kernel_h, kernel_w in kernel_sizes:
    # padding == 1 selects half-kernel padding per axis; any other value is
    # used unchanged for both axes
    if padding == 1:
        pad_h, pad_w = kernel_h // 2, kernel_w // 2
    else:
        pad_h = pad_w = padding

    # float64 master tensors; the lower-precision copies are cast from them
    input_64, weight_64, bias_64 = generate_torch_conv_tensors(
        batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float64
    )
    input_bf16, weight_bf16, bias_bf16 = (
        t.to(torch.bfloat16) for t in (input_64, weight_64, bias_64)
    )
    input_f32, weight_f32, bias_f32 = (
        t.to(torch.float32) for t in (input_64, weight_64, bias_64)
    )

    # Reference result computed from the float64 data as NumPy arrays
    input_64_np, weight_64_np, bias_64_np = input_64.numpy(), weight_64.numpy(), bias_64.numpy()
    np64_ref = run_np_conv2d(input_64_np, weight_64_np, bias_64_np, stride=stride, padding=(pad_h, pad_w))
    ref_list.append(np64_ref)

    # ttnn: bf16 inputs at three math fidelities, with and without fp32 accumulation
    bf16_hifi4_results.append(
        run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=(pad_h, pad_w),
            dtype=ttnn.bfloat16, fp32_accum=False, math_fidelity=ttnn.MathFidelity.HiFi4,
        )
    )
    bf16_hifi4_fp32acc_results.append(
        run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=(pad_h, pad_w),
            dtype=ttnn.bfloat16, fp32_accum=True, math_fidelity=ttnn.MathFidelity.HiFi4,
        )
    )
    bf16_hifi2_results.append(
        run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=(pad_h, pad_w),
            dtype=ttnn.bfloat16, fp32_accum=False, math_fidelity=ttnn.MathFidelity.HiFi2,
        )
    )
    bf16_lofi_results.append(
        run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=(pad_h, pad_w),
            dtype=ttnn.bfloat16, fp32_accum=False, math_fidelity=ttnn.MathFidelity.LoFi,
        )
    )

    # ttnn: f32 inputs at HiFi4, with and without fp32 accumulation
    f32_hifi4_results.append(
        run_ttnn_conv2d(
            input_f32, weight_f32, bias_f32, device,
            stride=stride, padding=(pad_h, pad_w),
            dtype=ttnn.float32, fp32_accum=False, math_fidelity=ttnn.MathFidelity.HiFi4,
        )
    )
    f32_hifi4_fp32acc_results.append(
        run_ttnn_conv2d(
            input_f32, weight_f32, bias_f32, device,
            stride=stride, padding=(pad_h, pad_w),
            dtype=ttnn.float32, fp32_accum=True, math_fidelity=ttnn.MathFidelity.HiFi4,
        )
    )

    # unfold+matmul: one bf16 and one f32 run per requested matmul precision
    for prec in matmul_precisions:
        unfold_bf16_results_by_precision[prec].append(
            run_conv2d_unfold_matmul(
                input_bf16, weight_bf16, bias_bf16,
                stride=stride, padding=(pad_h, pad_w), matmul_precision=prec,
            )
        )
        unfold_f32_results_by_precision[prec].append(
            run_conv2d_unfold_matmul(
                input_f32, weight_f32, bias_f32,
                stride=stride, padding=(pad_h, pad_w), matmul_precision=prec,
            )
        )


# ttnn series and their legend labels, in matching order
tensor_lists += [
    bf16_hifi4_results, bf16_hifi4_fp32acc_results, bf16_hifi2_results, bf16_lofi_results,
    f32_hifi4_results, f32_hifi4_fp32acc_results,
]
names_lists += [
    "ttnn (bf16, HiFi4)",
    "ttnn (bf16, HiFi4, fp32acc)",
    "ttnn (bf16, HiFi2)",
    "ttnn (bf16, LoFi)",
    "ttnn (f32, HiFi4)",
    "ttnn (f32, HiFi4, fp32acc)",
]

# unfold+matmul series: every bf16 precision first, then every f32 precision
tensor_lists += [unfold_bf16_results_by_precision[p] for p in matmul_precisions]
names_lists += [f"unfold_matmul (bf16, {p})" for p in matmul_precisions]
tensor_lists += [unfold_f32_results_by_precision[p] for p in matmul_precisions]
names_lists += [f"unfold_matmul (f32, {p})" for p in matmul_precisions]

# One call to the plotting helper per norm type; the x-axis shows "KhxKw" labels
for norm_type in ("fro", "1", "inf", "spectral"):
    plot_rel_error_vs_param(
        tensors=tensor_lists,
        names=names_lists,
        refs=ref_list,
        norm_type=norm_type,
        param_vals=[f"{kh}x{kw}" for kh, kw in kernel_sizes],
        param_name="Kernel Size",
        logy=True,
        title_suffix=f"\nbatch={batch}, in_ch={in_ch}, out_ch={out_ch}, h={height}, w={width}, Reference=np64",
        fig_size=(14, 8),
    )

## ULP Analysis: ttnn.conv2d vs unfold+matmul across Inner Dimensions

In [None]:
def clamped_randn(*args, **kwargs):
    """Draw samples with ``torch.randn`` and clamp them into [0, 1].

    All positional and keyword arguments are forwarded unchanged to
    ``torch.randn``. Samples below 0 become 0 and samples above 1 become 1.

    Returns:
        The clamped tensor.
    """
    samples = torch.randn(*args, **kwargs)
    return torch.clamp(samples, min=0, max=1)

In [None]:
# ULP sweep over the inner dimension K = in_channels × kernel_h × kernel_w.
# A 3x3 kernel contributes 9 to K per input channel, so the channel counts
# below cover K = 288 (32 channels) up to K = 92,160 (10240 channels).
kernel_h, kernel_w = 3, 3

# Channel counts under test; every entry is a multiple of 32
in_channels_list = [32, 64, 128, 256, 512, 1024, 2048, 4096, 6144, 8192, 10240]
# Denser alternative, currently disabled:
# in_channels_list = [32,64,128,256,512,1024] + [1024*i for i in range(2,16)]

print(f"Input channels range: {min(in_channels_list)} to {max(in_channels_list)}")
print(f"Number of test points: {len(in_channels_list)}")

# Parameters held constant across the sweep
batch = 1
out_ch = 32
height, width = 32, 32
stride, padding = 1, 1
num_iterations = 40  # fresh random tensors are drawn this many times per K
use_bias = False  # True: pass a bias to both implementations; False: pass None

# Per-K aggregates, appended in lock-step with inner_dims
inner_dims = []
mean_ulp_bf16, max_ulp_bf16 = [], []
mean_ulp_bf16_fp32acc, max_ulp_bf16_fp32acc = [], []
# Disabled f32 + fp32acc configuration:
# mean_ulp_f32_fp32acc = []
# max_ulp_f32_fp32acc = []

device = None

if use_bias:
    bias_str = "with bias"
else:
    bias_str = "without bias"
print(f"\nRunning ULP analysis {bias_str} with {num_iterations} iterations per configuration...")

import sys

for idx, in_ch in enumerate(in_channels_list):
    inner_dim = in_ch * kernel_h * kernel_w
    inner_dims.append(inner_dim)

    # Per-iteration statistics for this K; averaged once the inner loop ends
    iter_mean_ulp_bf16, iter_max_ulp_bf16 = [], []
    iter_mean_ulp_bf16_fp32acc, iter_max_ulp_bf16_fp32acc = [], []
    # iter_mean_ulp_f32_fp32acc = []
    # iter_max_ulp_f32_fp32acc = []

    for iteration in range(num_iterations):
        print(f"Progress: {iteration + 1}/{num_iterations} - K={inner_dim}",flush=True)

        # New f32 tensors every iteration; bf16 copies are cast from them
        input_f32, weight_f32, bias_f32 = generate_torch_conv_tensors(
            batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float32, 1, torch.rand
        )
        input_bf16 = input_f32.to(torch.bfloat16)
        weight_bf16 = weight_f32.to(torch.bfloat16)
        if use_bias:
            bias_bf16 = bias_f32.to(torch.bfloat16)
        else:
            # Without bias, both bias handles are cleared
            bias_bf16 = bias_f32 = None

        # Reference: unfold+matmul on the bf16 tensors, called with its
        # default settings; the bias is flattened to 1-D when present
        ref_unfold_matmul = run_conv2d_unfold_matmul(
            input_bf16,
            weight_bf16,
            bias_bf16.reshape(-1) if use_bias else None,
            stride=stride,
            padding=padding,
        )
        # Disabled f32 reference:
        # ref_unfold_matmul_f32 = run_conv2d_unfold_matmul(
        #     input_f32, weight_f32, bias_f32.reshape(-1) if use_bias else None,
        #     stride=stride, padding=padding,
        #     matmul_precision="highest"
        # )

        # Candidate 1: ttnn.conv2d, bf16, HiFi4, fp32 accumulation off
        ttnn_bf16 = run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=padding,
            dtype=ttnn.bfloat16, fp32_accum=False,
            math_fidelity=ttnn.MathFidelity.HiFi4,
        )

        # Candidate 2: ttnn.conv2d, bf16, HiFi4, fp32 accumulation on
        ttnn_bf16_fp32acc = run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=padding,
            dtype=ttnn.bfloat16, fp32_accum=True,
            math_fidelity=ttnn.MathFidelity.HiFi4,
        )

        # Disabled candidate 3: f32, HiFi4, fp32 accumulation on
        # ttnn_f32_fp32acc = run_ttnn_conv2d(
        #     input_f32, weight_f32, bias_f32, device,
        #     stride=stride, padding=padding,
        #     dtype=ttnn.float32, fp32_accum=True, math_fidelity=ttnn.MathFidelity.HiFi4
        # )

        # ULP error of each candidate against the unfold+matmul reference
        ulp_bf16 = ulp_error(ttnn_bf16, ref_unfold_matmul)
        ulp_bf16_fp32acc = ulp_error(ttnn_bf16_fp32acc, ref_unfold_matmul)
        # ulp_f32_fp32acc = ulp_error(ttnn_f32_fp32acc, ref_unfold_matmul_f32)

        # Record the mean and max of this iteration
        iter_mean_ulp_bf16.append(ulp_bf16.mean().item())
        iter_max_ulp_bf16.append(ulp_bf16.max().item())
        iter_mean_ulp_bf16_fp32acc.append(ulp_bf16_fp32acc.mean().item())
        iter_max_ulp_bf16_fp32acc.append(ulp_bf16_fp32acc.max().item())
        # iter_mean_ulp_f32_fp32acc.append(ulp_f32_fp32acc.mean().item())
        # iter_max_ulp_f32_fp32acc.append(ulp_f32_fp32acc.max().item())

    # Collapse the iterations: the "max" series is the mean of per-iteration maxima
    mean_ulp_bf16.append(np.mean(iter_mean_ulp_bf16))
    max_ulp_bf16.append(np.mean(iter_max_ulp_bf16))
    mean_ulp_bf16_fp32acc.append(np.mean(iter_mean_ulp_bf16_fp32acc))
    max_ulp_bf16_fp32acc.append(np.mean(iter_max_ulp_bf16_fp32acc))
    # mean_ulp_f32_fp32acc.append(np.mean(iter_mean_ulp_f32_fp32acc))
    # max_ulp_f32_fp32acc.append(np.mean(iter_max_ulp_f32_fp32acc))

print("\nULP analysis complete!")

# Two side-by-side panels: mean ULP on the left, max ULP on the right.
# Both panels share the x-axis (inner_dims), styling and legend labels.
plt.figure(figsize=(14, 6))

for panel_no, (stat_label, series_plain, series_fp32acc) in enumerate(
    (
        ("Mean", mean_ulp_bf16, mean_ulp_bf16_fp32acc),
        ("Max", max_ulp_bf16, max_ulp_bf16_fp32acc),
    ),
    start=1,
):
    plt.subplot(1, 2, panel_no)
    plt.plot(inner_dims, series_plain, 'o-', label='bf16, HiFi4', linewidth=2, markersize=4)
    plt.plot(inner_dims, series_fp32acc, 's-', label='bf16, HiFi4, fp32acc', linewidth=2, markersize=4)
    # The f32 + fp32acc curve is disabled; it would be drawn here with '^-'
    # and label 'f32, HiFi4, fp32acc'.
    plt.xlabel('Inner Dimension K (in_channels × kH × kW)', fontsize=12)
    plt.ylabel(f'{stat_label} ULP Error (avg of {num_iterations} runs)', fontsize=12)
    plt.title(f'{stat_label} ULP: ttnn.conv2d vs unfold+matmul ({bias_str})', fontsize=13)
    plt.grid(True, alpha=0.3)
    plt.legend(fontsize=10)
    plt.tight_layout()

# The output file name records whether a bias was used
if use_bias:
    bias_suffix = "_with_bias"
else:
    bias_suffix = "_no_bias"
plt.savefig(f'output_conv_ulp{bias_suffix}.png', dpi=150, bbox_inches='tight')
plt.show()

# Per-K table of the ULP statistics. Each value is an average over
# num_iterations runs; the "Max" columns are the mean of the per-run maxima.
# Only the two bf16 configurations are collected, so the header lists exactly
# the four value columns that every row prints (the f32+fp32acc columns were
# previously advertised in the header but never filled).
print(f"\n{'='*120}")
print(f"ULP Statistics Per K Value (averaged over {num_iterations} iterations, {bias_str})")
print(f"{'='*120}")
print(f"Configuration: out_ch={out_ch}, kernel={kernel_h}x{kernel_w}, h={height}, w={width}")
print(f"\n{'K':<10} {'bf16 Mean':<12} {'bf16 Max':<12} {'bf16+fp32acc Mean':<18} {'bf16+fp32acc Max':<18}")
print(f"{'-'*120}")
for k, m_bf16, mx_bf16, m_bf16_fp32, mx_bf16_fp32 in zip(
    inner_dims, mean_ulp_bf16, max_ulp_bf16,
    mean_ulp_bf16_fp32acc, max_ulp_bf16_fp32acc,
    # To re-enable f32: add mean_ulp_f32_fp32acc, max_ulp_f32_fp32acc here,
    # two more loop targets, and the matching header/row columns.
):
    print(f"{k:<10} {m_bf16:<12.2f} {mx_bf16:<12.2f} {m_bf16_fp32:<18.2f} {mx_bf16_fp32:<18.2f}")

# Summary: each per-K series averaged once more over all K values
print(f"\n{'='*120}")
print("Overall Summary Statistics")
print(f"{'='*120}")
print(f"Inner dimension range: {min(inner_dims)} to {max(inner_dims)}")
print(f"\n{'Config':<30} {'Mean ULP (avg)':<20} {'Max ULP (avg)':<20}")
print(f"{'-'*70}")
print(f"{'bf16, HiFi4':<30} {np.mean(mean_ulp_bf16):>8.2f} {np.mean(max_ulp_bf16):>20.2f}")
print(f"{'bf16, HiFi4, fp32acc':<30} {np.mean(mean_ulp_bf16_fp32acc):>8.2f} {np.mean(max_ulp_bf16_fp32acc):>20.2f}")
# print(f"{'f32, HiFi4, fp32acc':<30} {np.mean(mean_ulp_f32_fp32acc):>8.2f} {np.mean(max_ulp_f32_fp32acc):>20.2f}")
print(f"{'='*120}")

In [None]:
# PCC sweep: ttnn.conv2d vs unfold+matmul over the inner dimension
# K = in_channels × kernel_h × kernel_w.

from models.common.utility_functions import comp_pcc

# A 3x3 kernel contributes 9 to K per input channel, so the channel counts
# below cover K = 288 (32 channels) up to K = 138,240 (15360 channels).
kernel_h, kernel_w = 3, 3

# Powers of two from 32 to 1024, then multiples of 1024 from 2048 to 15360
in_channels_list = [32, 64, 128, 256, 512, 1024] + [1024 * i for i in range(2, 16)]
print(f"Input channels range: {min(in_channels_list)} to {max(in_channels_list)}")
print(f"Number of test points: {len(in_channels_list)}")

# Parameters held constant across the sweep
batch = 1
out_ch = 32
height, width = 32, 32
stride, padding = 1, 1
num_iterations = 5  # fresh random tensors are drawn this many times per K
use_bias = False  # True: pass a bias to both implementations; False: pass None

# Per-K averages, appended in lock-step with inner_dims
inner_dims = []
pcc_bf16, pcc_bf16_fp32acc = [], []
pcc_f32_fp32acc, pcc_f32 = [], []
device = None

if use_bias:
    bias_str = "with bias"
else:
    bias_str = "without bias"
print(f"\nRunning PCC analysis {bias_str} with {num_iterations} iterations per configuration...")

import sys

for idx, in_ch in enumerate(in_channels_list):
    inner_dim = in_ch * kernel_h * kernel_w
    inner_dims.append(inner_dim)

    # Per-iteration PCC values for this K; averaged once the inner loop ends
    iter_pcc_bf16, iter_pcc_f32 = [], []
    iter_pcc_bf16_fp32acc, iter_pcc_f32_fp32acc = [], []

    for iteration in range(num_iterations):
        print(f"Progress: {iteration + 1}/{num_iterations} - K={inner_dim}",flush=True)

        # New f32 tensors every iteration; bf16 copies are cast from them
        input_f32, weight_f32, bias_f32 = generate_torch_conv_tensors(
            batch, in_ch, out_ch, height, width, kernel_h, kernel_w, torch.float32, 1, torch.rand, 0
        )
        input_bf16 = input_f32.to(torch.bfloat16)
        weight_bf16 = weight_f32.to(torch.bfloat16)
        if use_bias:
            bias_bf16 = bias_f32.to(torch.bfloat16)
        else:
            # Without bias, both bias handles are cleared
            bias_bf16 = bias_f32 = None

        # Reference: unfold+matmul on the f32 tensors; the bias is flattened
        # to 1-D when present
        ref_unfold_matmul_f32 = run_conv2d_unfold_matmul(
            input_f32,
            weight_f32,
            bias_f32.reshape(-1) if use_bias else None,
            stride=stride,
            padding=padding,
        )

        # Candidate 1: bf16, HiFi4, fp32 accumulation off
        ttnn_bf16 = run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=padding,
            dtype=ttnn.bfloat16, fp32_accum=False,
            math_fidelity=ttnn.MathFidelity.HiFi4,
        )

        # Candidate 2: bf16, HiFi4, fp32 accumulation on
        ttnn_bf16_fp32acc = run_ttnn_conv2d(
            input_bf16, weight_bf16, bias_bf16, device,
            stride=stride, padding=padding,
            dtype=ttnn.bfloat16, fp32_accum=True,
            math_fidelity=ttnn.MathFidelity.HiFi4,
        )

        # Candidate 3: f32, HiFi4, fp32 accumulation on
        ttnn_f32_fp32acc = run_ttnn_conv2d(
            input_f32, weight_f32, bias_f32, device,
            stride=stride, padding=padding,
            dtype=ttnn.float32, fp32_accum=True,
            math_fidelity=ttnn.MathFidelity.HiFi4,
        )

        # Candidate 4: f32, HiFi4, fp32 accumulation off
        ttnn_f32 = run_ttnn_conv2d(
            input_f32, weight_f32, bias_f32, device,
            stride=stride, padding=padding,
            dtype=ttnn.float32, fp32_accum=False,
            math_fidelity=ttnn.MathFidelity.HiFi4,
        )

        # PCC of every candidate against the f32 reference; only the second
        # element of comp_pcc's result (the PCC value) is kept
        _, pcc_bf16_val = comp_pcc(ref_unfold_matmul_f32, ttnn_bf16)
        _, pcc_bf16_fp32acc_val = comp_pcc(ref_unfold_matmul_f32, ttnn_bf16_fp32acc)
        _, pcc_f32_val = comp_pcc(ref_unfold_matmul_f32, ttnn_f32)
        _, pcc_f32_fp32acc_val = comp_pcc(ref_unfold_matmul_f32, ttnn_f32_fp32acc)

        # Record this iteration
        iter_pcc_bf16.append(pcc_bf16_val)
        iter_pcc_bf16_fp32acc.append(pcc_bf16_fp32acc_val)
        iter_pcc_f32.append(pcc_f32_val)
        iter_pcc_f32_fp32acc.append(pcc_f32_fp32acc_val)

    # Collapse the iterations into one average per configuration
    pcc_bf16.append(np.mean(iter_pcc_bf16))
    pcc_bf16_fp32acc.append(np.mean(iter_pcc_bf16_fp32acc))
    pcc_f32.append(np.mean(iter_pcc_f32))
    pcc_f32_fp32acc.append(np.mean(iter_pcc_f32_fp32acc))

print("\nPCC analysis complete!")

# Single figure with one PCC curve per configuration, all over inner_dims
plt.figure(figsize=(12, 6))

for curve_values, curve_fmt, curve_label in (
    (pcc_bf16, 'o-', 'bf16, HiFi4'),
    (pcc_bf16_fp32acc, 's-', 'bf16, HiFi4, fp32acc'),
    (pcc_f32_fp32acc, '^-', 'f32, HiFi4, fp32acc'),
    (pcc_f32, 'v-', 'f32, HiFi4'),
):
    plt.plot(inner_dims, curve_values, curve_fmt, label=curve_label, linewidth=2, markersize=4)

plt.xlabel('Inner Dimension K (in_channels × kH × kW)', fontsize=12)
plt.ylabel(f'PCC (avg of {num_iterations} runs)', fontsize=12)
plt.title(f'PCC: ttnn.conv2d vs unfold+matmul ({bias_str})', fontsize=13)
plt.grid(True, alpha=0.3)
plt.legend(fontsize=10)
plt.tight_layout()

# The output file name records whether a bias was used
if use_bias:
    bias_suffix = "_with_bias"
else:
    bias_suffix = "_no_bias"
plt.savefig(f'output_conv_pcc{bias_suffix}.png', dpi=150, bbox_inches='tight')
plt.show()

# Per-K table of the PCC values, each averaged over num_iterations runs.
print(f"\n{'='*80}")
print(f"PCC Statistics Per K Value (averaged over {num_iterations} iterations, {bias_str})")
print(f"{'='*80}")
print(f"Configuration: out_ch={out_ch}, kernel={kernel_h}x{kernel_w}, h={height}, w={width}")
print(f"\n{'K':<10} {'bf16 PCC':<15} {'bf16+fp32acc PCC':<20} {'f32+fp32acc PCC':<20} {'f32 PCC':<20}")
print(f"{'-'*80}")
# zip() yields 5-tuples, so the loop binds exactly five targets (a sixth
# target raises "not enough values to unpack"). Targets are named after the
# series they come from and printed in the same order as the header columns.
for k, row_bf16, row_bf16_fp32acc, row_f32_fp32acc, row_f32 in zip(
    inner_dims, pcc_bf16, pcc_bf16_fp32acc, pcc_f32_fp32acc, pcc_f32
):
    print(f"{k:<10} {row_bf16:<15.8f} {row_bf16_fp32acc:<20.8f} {row_f32_fp32acc:<20.8f} {row_f32:<20.8f}")

# Summary: mean and minimum of each per-K series over all K values
print(f"\n{'='*80}")
print("Overall Summary Statistics")
print(f"{'='*80}")
print(f"Inner dimension range: {min(inner_dims)} to {max(inner_dims)}")
print(f"\n{'Config':<30} {'Mean PCC':<20} {'Min PCC':<20}")
print(f"{'-'*70}")
print(f"{'bf16, HiFi4':<30} {np.mean(pcc_bf16):>8.8f} {np.min(pcc_bf16):>20.8f}")
print(f"{'bf16, HiFi4, fp32acc':<30} {np.mean(pcc_bf16_fp32acc):>8.8f} {np.min(pcc_bf16_fp32acc):>20.8f}")
print(f"{'f32, HiFi4, fp32acc':<30} {np.mean(pcc_f32_fp32acc):>8.8f} {np.min(pcc_f32_fp32acc):>20.8f}")
print(f"{'f32, HiFi4':<30} {np.mean(pcc_f32):>8.8f} {np.min(pcc_f32):>20.8f}")
print(f"{'='*80}")