# Coverage Rate of Built-In Intervals

This notebook includes a simulation study to evaluate the coverage rates of our proposed built-in intervals. Each intervals are designed to capture:
1. Confidence Intervals: Ground truth $f(x)$
2. Prediction Intervals: Next observation $f(x)+\varepsilon$.
3. Reproduction Intervals: Next prediction $\widehat{f}(x)$
4. Conformal Prediction Intervals: Next observation

To evaluate the performance of proposed intervals, we train `reps` number of models and we sample a set of test points. We offer two ways of calculating the average coverage rates of an interval:

1. We first compute the coverage at each test point by averaging the boolean `covered` over models. Then we average the coverage rates to get an average coverage rate.
2. We first compute the coverage rate using the average of the boolean `covered` of a single model over all test points. Then we take average of that and output the average coverage rate.

Empirically the second coverage rates are much better than the first one. The rationale is as follow: Some points are just hard to be captured by our models. So averaging over models first will increase the weight of those 'bad' points.

## Helper Cells

In [None]:
import sys, os
sys.path.insert(0, os.path.abspath("../src"))
import numpy as np
import pandas as pd
import os
import glob
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Prevent ptitprince from raising cmap errors
cm.register_cmap = lambda *args, **kwargs: None
from matplotlib.ticker import FormatStrFormatter
from tqdm import tqdm
from itertools import product
from BRAT.algorithms import BRATD
from BRAT.utils import generate_data, find_min_scale
import ptitprince as pt

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
plt.style.use('../matplotlibrc')



In [None]:
# Parallelizing the simulation
mp.set_start_method('fork', force=True)

In [None]:
# calibrating the intervals
def calibrate_scale(widths_cal, y_cal, y_pred_cal, alpha=0.05):
    """
    Two-stage search for global c so that empirical coverage ≈ 1-alpha.
    """
    n = len(y_cal)
    def cov(c):
        lo = y_pred_cal - c * widths_cal
        hi = y_pred_cal + c * widths_cal
        return np.mean((y_cal >= lo) & (y_cal <= hi))

    # exponential search
    c = 1.0
    if cov(c) < 1 - alpha:
        while cov(c) < 1 - alpha:
            c *= 2
    C = c

    # binary refine on [0, C]
    lo, hi = 0.0, C
    tol = 2.0 / n
    while hi - lo > tol:
        mid = (lo + hi) / 2
        if cov(mid) < 1 - alpha:
            lo = mid
        else:
            hi = mid
    return hi

## Coverage Rates

The cell below run the simulation on a Friedman function:

$$
y=10\sin(\pi x_1\cdot x_2) + 20(x_3-0.5)^2 + 10x_4 + 5x_5 + \varepsilon
$$

All related data are stored to `./coverage/data/`

In [None]:
# During previous experiments, we explore around some combinations of hps to find patterns. Hence the grid search.
# Some patterns we found: 
# 1. What leads to a wide interval: Low learning rate, low dropout rate, low subsampling rate.
# 2. What leads to a narrow interval: Undersmoothing (the contrary)

# Hyperparameter grid. Add parameters if you'd like to explore around them!
lr_values = [0.6]
dr_values = [0.3]
sr_values = [0.8]
max_depth_values = [4]

# Data generation configurations
num_test_pts = 200
rep = 30
epoch = 200
Nystrom_subsample = 0.1
in_bag = False
function = 'friedman1'

# Create directories for saving results
os.makedirs("../experiments/coverage/data", exist_ok=True)
os.makedirs("../experiments/coverage/plots", exist_ok=True)

# Iterate over all combinations of hyperparameters
for idx, (lr, dr, sr, max_depth) in enumerate(product(lr_values, dr_values, sr_values, max_depth_values)):
    # Data generation
    X_train, y_train, X_test, y_test, y_test_true, X_cal, y_cal = generate_data(
        function_type=function, 
        n_train=1000, 
        n_test=200,
        n_calibration=200, 
        noise_std=0.1,
        seed=idx
    )
    test_points = X_test[:num_test_pts]
    y_true = y_test_true[:num_test_pts]
    y_obs = y_test[:num_test_pts]

    # Training repetitions
    def training_rep(k):
        bratd = BRATD(
            n_estimators=epoch,
            learning_rate=lr,
            subsample_rate=sr,
            dropout_rate=dr,
            max_depth=max_depth,
            min_samples_split=2,
            disable_tqdm=True
        )
        
        # Data generation
        X_train, y_train, X_test, y_test, y_test_true, X_cal, y_cal = generate_data(
            function_type=function, 
            n_train=1000, 
            n_test=200,
            n_calibration=200, 
            noise_std=0.1, 
            seed=k
        ) 
        test_points = X_test[:num_test_pts]
        y_true = y_test_true[:num_test_pts]
        y_obs = y_test[:num_test_pts]
        
        bratd.fit(X_train, y_train, X_test, y_test)
        _, _, _ = bratd.unif_nystrom(Nystrom_subsample)
        bratd.sketch_K()
        sigma_hat2 = bratd.est_sigma_hat2(in_bag)
        sigma_hat = np.sqrt(sigma_hat2)
        lam = bratd.learning_rate
        q = 1 - bratd.dropout_rate
        s = (1 + lam * q) / lam

        # calibration
        y_pred_cal = []
        tau_cal    = []
        pi_sigma_cal = []
        for x_cal in X_cal:
            y_pc = bratd.predict(x_cal.reshape(1, -1)).item()
            rn_c = bratd.sketch_r(x_cal.reshape(1, -1), vector=False).item()
            t_c  = s * rn_c * sigma_hat
            p_c  = np.sqrt(sigma_hat2 + t_c**2)

            y_pred_cal.append(y_pc)
            tau_cal.append(t_c)
            pi_sigma_cal.append(p_c)

        y_pred_cal    = np.array(y_pred_cal)    # shape (n_cal,)
        widths_pi_cal = np.array(pi_sigma_cal)  # shape (n_cal,)

        # 2) Calibrate once per interval type
        c = calibrate_scale(widths_pi_cal, y_cal, y_pred_cal)

        for i, x in enumerate(test_points):
            y_pred = bratd.predict(x.reshape(1, -1))
            rn_norm = bratd.sketch_r(x.reshape(1, -1))
            tau_hat = s * rn_norm * sigma_hat
            pi_sigma = np.sqrt(sigma_hat2 + tau_hat**2)

            y_pred_cal = bratd.predict(X_cal)

            ci_lower = y_pred - tau_hat
            ci_upper = y_pred + tau_hat
            ci_width = ci_upper - ci_lower
            ci_covered = int(ci_lower.item() <= y_true[i] <= ci_upper.item())

            pi_lower = y_pred - pi_sigma
            pi_upper = y_pred + pi_sigma
            pi_width = pi_upper - pi_lower
            ri_lower = y_pred - np.sqrt(2) * tau_hat
            ri_upper = y_pred + np.sqrt(2) * tau_hat
            ri_width = ri_upper - ri_lower

            y_pred_cal = bratd.predict(X_cal)
            res_cal = np.abs(y_pred_cal - y_cal)
            qtle_hat = np.quantile(res_cal, 1 - 0.05)
            cfml_lower = y_pred - qtle_hat
            cfml_upper = y_pred + qtle_hat
            cfml_covered = int(cfml_lower.item() <= y_obs[i] <= cfml_upper.item())
            pi_covered = int(pi_lower.item() <= y_obs[i] <= pi_upper.item())

            ci_lower_scaled, ci_upper_scaled = y_pred- c * ci_width/2, y_pred + c * ci_width/2
            pi_lower_scaled, pi_upper_scaled = y_pred- c * pi_width/2, y_pred + c * pi_width/2
            ri_lower_scaled, ri_upper_scaled = y_pred- c * ri_width/2, y_pred + c * ri_width/2
            
            return {
                "model_idx": k,
                "pt_idx": i,
                "y_true": y_true[i].item(),
                "y_obs": y_obs[i].item(),
                "y_pred": y_pred.item(),
                "bias": y_pred.item() - y_true[i].item(),
                "calibration": c,
                "ci_lower": ci_lower.item(),
                "ci_upper": ci_upper.item(),
                "ci_width": ci_upper.item() - ci_lower.item(),
                "ci_covered": ci_covered,
                "ci_lower_scaled": ci_lower_scaled.item(),
                "ci_upper_scaled": ci_upper_scaled.item(),
                "ci_width_scaled": ci_upper_scaled.item() - ci_lower_scaled.item(),
                "ci_covered_scaled": ci_lower_scaled.item() <= y_obs[i] <= ci_upper_scaled.item(),
                "pi_lower": pi_lower.item(),
                "pi_upper": pi_upper.item(),
                "pi_width": pi_upper.item() - pi_lower.item(),
                "pi_covered": pi_covered,
                "pi_lower_scaled": pi_lower_scaled.item(),
                "pi_upper_scaled": pi_upper_scaled.item(),
                "pi_width_scaled": pi_upper_scaled.item() - pi_lower_scaled.item(),
                "pi_covered_scaled": pi_lower_scaled.item() <= y_obs[i] <= pi_upper_scaled.item(),
                "ri_lower": ri_lower.item(),
                "ri_upper": ri_upper.item(),
                "ri_width": ri_upper.item() - ri_lower.item(),
                "ri_lower_scaled": ri_lower_scaled.item(),
                "ri_upper_scaled": ri_upper_scaled.item(),
                "ri_width_scaled": ri_upper_scaled.item() - ri_lower_scaled.item(),
                "cfml_lower": cfml_lower.item(),
                "cfml_upper": cfml_upper.item(),
                "cfml_width": cfml_upper.item() - cfml_lower.item(),
                "cfml_covered": cfml_covered,
                "rn_norm": (rn_norm.item() if type(rn_norm) == np.ndarray else rn_norm),
                "sigma_hat": sigma_hat.item(),
                "tau_hat": tau_hat.item(),
                "pi_sigma": pi_sigma.item()
            }

    # parallelize training process. 
    with ProcessPoolExecutor(max_workers=1) as executor:
        result = tqdm(executor.map(training_rep, range(rep)), 
                      desc=f"Training BRATD (lr={lr}, dr={dr}, sr={sr}, max_depth={max_depth})", total=rep)
        rows = list(result)
        executor.shutdown()
        

    df = pd.DataFrame(rows)

    # ri coverage rate
    for k in range(rep):
        for i in range(num_test_pts):
            current_row = df[(df["model_idx"] == k) & (df["pt_idx"] == i)]
            if current_row.empty:
                continue

            ri_lower = current_row["ri_lower"].values[0]
            ri_upper = current_row["ri_upper"].values[0]
            ri_lower_scaled = current_row["ri_lower_scaled"].values[0]
            ri_upper_scaled = current_row["ri_upper_scaled"].values[0]

            other_models = df[(df["model_idx"] != k) & (df["pt_idx"] == i)]
            ri_cov_count = 0
            ri_scaled_cov_count = 0

            for _, row in other_models.iterrows():
                y_pred_other = row["y_pred"]
                if ri_lower <= y_pred_other <= ri_upper:
                    ri_cov_count += 1
                if ri_lower_scaled <= y_pred_other <= ri_upper_scaled:
                    ri_scaled_cov_count += 1

            df.loc[(df["model_idx"] == k) & (df["pt_idx"] == i), "ri_coverage"] = ri_cov_count / (rep - 1)
            df.loc[(df["model_idx"] == k) & (df["pt_idx"] == i), "ri_coverage_scaled"] = ri_scaled_cov_count / (rep - 1)

    # mean coverage rate
    mean_coverage = df.groupby("pt_idx").agg(
        ci_coverage_mean=("ci_covered", "mean"),
        ci_scaled_coverage_mean=("ci_covered_scaled", "mean"),
        pi_coverage_mean=("pi_covered", "mean"),
        pi_scaled_coverage_mean=("pi_covered_scaled", "mean"),
        ri_coverage_mean=("ri_coverage", "mean"),
        ri_scaled_coverage_mean=("ri_coverage_scaled", "mean"),
        conf_coverage_mean=("cfml_covered", "mean"),
    ).reset_index()


    # merge the mean coverage rate with the original dataframe
    df = df.merge(mean_coverage, on="pt_idx", how="left")

    # Save results
    if not os.path.exists('./coverage/data/'):
        os.makedirs('./coverage/data/')
    output_filename = f"npts_{num_test_pts}_rep_{rep}_epo_{epoch}_lr_{lr}_sr_{sr}_dr_{dr}_md_{max_depth}_nys_{Nystrom_subsample}_in_bag_{in_bag}"
    output_path = os.path.join(f"./coverage/data/", f"{output_filename}.parquet")
    df.to_parquet(output_path)

The cell below plots the simulated resulst. In particular, it produces the rainclouds plot of interval coverage rate distribution, interval width distribution. We defautly used the calibrated intervals here.The first two rows are computed by averaging over points for an individual model, whereas the last two rows are computed by averaging over models for a individual test point.

In [None]:
#–– Palette & interval-to-label maps ––
palette = {
    "Built-In Confidence Interval": "#00BEFF",
    "Built-In Prediction Interval": "#F8766D",
    "Built-In Reproduction Interval": "#7CAE00",
    "Conformal Prediction Interval": "#C77CFF"
}

coverage_columns = {
    "ci_covered": "Built-In Confidence Interval",
    "pi_covered": "Built-In Prediction Interval",
    "ri_coverage": "Built-In Reproduction Interval",
    "cfml_covered": "Conformal Prediction Interval"
}

"""
# scaled coverage columns
coverage_columns = {
    "ci_covered_scaled": "Built-In Confidence Interval",
    "pi_covered_scaled": "Built-In Prediction Interval",
    "ri_coverage_scaled": "Built-In Reproduction Interval",
    "cfml_covered": "Conformal Prediction Interval"
}
"""

width_columns = {
    "ci_width":   "Built-In Confidence Interval",
    "pi_width":   "Built-In Prediction Interval",
    "ri_width":   "Built-In Reproduction Interval",
    "cfml_width": "Conformal Prediction Interval"
}

"""
# scaled width columns
width_columns = {
    "ci_width_scaled":   "Built-In Confidence Interval",
    "pi_width_scaled":   "Built-In Prediction Interval",
    "ri_width_scaled":   "Built-In Reproduction Interval",
    "cfml_width": "Conformal Prediction Interval"
}
"""

intervals =[
    "Built-In Confidence Interval",
    "Built-In Reproduction Interval",
    "Built-In Prediction Interval",
    "Conformal Prediction Interval"
]

#–– Directories ––
input_dir  = "./coverage/data/"
output_dir = "./coverage/plots/"
os.makedirs(output_dir, exist_ok=True)

#–– Loop over every .parquet in the input directory ––
for pq_path in glob.glob(os.path.join(input_dir, "*.parquet")):
    # 1) Load
    df = pd.read_parquet(pq_path)

    # 2) Compute averages
    # marginal coverage rates (model avg cov / reps)
    m_cov_means = (
        df
        .groupby("model_idx")[ list(coverage_columns.keys()) ]
        .mean()
        .reset_index()
    )
    m_wid_means = (
        df
        .groupby("model_idx")[ list(width_columns.keys()) ]
        .mean()
        .reset_index()
    )
    # conditional coverage rates (per point coverage / test size)
    c_cov_means = (
        df
        .groupby("pt_idx")[ list(coverage_columns.keys()) ]
        .mean()
        .reset_index()
    )
    c_wid_means = (
        df
        .groupby("pt_idx")[ list(width_columns.keys()) ]
        .mean()
        .reset_index()
    )
    """ scaled coverage rates
    # marginal coverage rates (model avg cov / reps)
    m_cov_means = (
        df
        .groupby("model_idx")[ list(scaled_coverage_columns.keys()) ]
        .mean()
        .reset_index()
    )
    m_wid_means = (
        df
        .groupby("model_idx")[ list(scaled_width_columns.keys()) ]
        .mean()
        .reset_index()
    )
    # conditional coverage rates (per point coverage / test size)
    c_cov_means = (
        df
        .groupby("pt_idx")[ list(scaled_coverage_columns.keys()) ]
        .mean()
        .reset_index()
    )
    c_wid_means = (
        df
        .groupby("pt_idx")[ list(scaled_width_columns.keys()) ]
        .mean()
        .reset_index()
    )
    """

    # 3) Melt into long-form coverage & width
    m_cov = pd.melt(
        m_cov_means,
        id_vars="model_idx",
        var_name="Interval Type",
        value_name="Coverage"
    )
    m_cov["Interval Type"] = m_cov["Interval Type"].map(coverage_columns)

    m_wid = pd.melt(
        m_wid_means,
        id_vars="model_idx",
        var_name="Interval Type",
        value_name="Width"
    )
    m_wid["Interval Type"] = m_wid["Interval Type"].map(width_columns)

    c_cov = pd.melt(
        c_cov_means,
        id_vars="pt_idx",
        var_name="Interval Type",
        value_name="Coverage"
    )
    c_cov["Interval Type"] = c_cov["Interval Type"].map(coverage_columns)

    c_wid = pd.melt(
        c_wid_means,
        id_vars="pt_idx",
        var_name="Interval Type",
        value_name="Width"
    )
    c_wid["Interval Type"] = c_wid["Interval Type"].map(width_columns)

    # 4) Build 4×4 raincloud grid
    fig, axes = plt.subplots(nrows=4, ncols=4, figsize=(8, 4), sharey=False, sharex=False)

    # Each row: (long-form DataFrame, which column to plot on y)
    plot_configs = [
        (m_cov, "Coverage"),    # row 0: marginal coverage (per-model averages, average over points)
        (m_wid, "Width"),            # row 1: marginal width
        (c_cov, "Coverage"),    # row 2: conditional coverage (per-point averages, average over models )
        (c_wid, "Width"),            # row 3: conditional width
    ]

    ynames = ['Models', 'Models', 'Points', 'Points']
    for row_idx, (data_long, y_col) in enumerate(plot_configs):
        for col_idx, label in enumerate(intervals):
            ax = axes[row_idx, col_idx]
            sub_df = data_long[data_long["Interval Type"] == label]

            # (Optional) annotate outlier ratio only for coverage rows
            # if y_col == "Coverage Rate":
            #     total = len(sub_df)
            #     outliers = (sub_df["Coverage Rate"] < 0.9).sum()
            #     ratio = outliers / total if total else 0
            #     ax.text(
            #         0.05, 0.95,
            #         f"Outlier Ratio: {ratio:.2%}",
            #         transform=ax.transAxes,
            #         fontsize=7, va="top", ha="left",
            #         bbox=dict(boxstyle="round,pad=0.3",
            #                   edgecolor="grey", facecolor="white", alpha=0.2)
            #     )

            # draw the raincloud
            pt.RainCloud(
                x="Interval Type", y=y_col, data=sub_df,
                palette=palette, bw=0.2, width_viol=1.2, width_box=0.2,
                ax=ax, orient="h", point_size=1.5, rain_alpha=0.7, jitter=1,
                box_linewidth=1, point_linewidth=0.5,
                box_whis=(9, 91),
                box_whiskerprops=dict(linewidth=0.5),
                box_capprops=dict(linewidth=0.5),
                box_flierprops=dict(marker="o", markersize=1.0,
                                    markeredgewidth=0.5, alpha=0.5)
            )

            # only the top row shows titles
            if row_idx == 0:
                ax.set_title(label, fontsize=8)
            ax.set_yticks([])
            ax.set_xlabel(y_col, fontsize=6)
            ax.set_ylabel(ynames[row_idx], fontsize=6)
            ax.xaxis.set_major_formatter(FormatStrFormatter('%.3g'))
            # # only the bottom row shows x-labels
            # if row_idx == 3:
            
            # else:
            #     ax.set_xlabel("")
            ax.tick_params(axis="x", labelsize=5, length=2, width=0.5)

    plt.tight_layout() #(pad=2.0)
    base = os.path.splitext(os.path.basename(pq_path))[0]
    out_path = os.path.join(output_dir, f"{base}.png")
    fig.savefig(out_path, dpi=300)
    print('saved')
    plt.close(fig)

print(f"Saved raincloud plots to {output_dir}")
