# Calibration - Predicting Intervals with PerpetualBooster

This notebook explores different methods for producing prediction intervals with `PerpetualBooster`. 
We compare several Perpetual-native methods that leverage internal leaf weight statistics against the popular MAPIE conformal prediction library.

### Methods Evaluated:
1.  **Perpetual Method 1 (Min-Max)**: Range-based intervals using leaf fold weights.
2.  **Perpetual Method 2 (Global Relative Position)**: Quantile-based intervals using piecewise interpolation of all fold weights.
3.  **Perpetual Method 3 (Weight Variance)**: Variance-based intervals using the standard deviation of local fold weights.
4.  **Perpetual Method 4 (Native `calibrate`)**: The built-in Rust implementation for prediction intervals.
5.  **MAPIE Split Conformal**: Constant-width model-agnostic conformal prediction.
6.  **MAPIE CV+**: Improved conformal prediction using cross-validation (Jackknife+).
7.  **MAPIE CQR**: Conformal Quantile Regression using quantile-loss baseline models.

We will evaluate these methods on the California Housing dataset, targeting a 90% coverage level.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from perpetual import PerpetualBooster
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

## 1. Data Loading and Splitting

We load the California Housing dataset using `sklearn`. 
*   **Train**: Used to train the `PerpetualBooster` model.
*   **Calibration**: Used to learn the mapping from model outputs to actual probabilities/coverage.
*   **Test**: Used to evaluate the final performance.

We use a 60/20/20 split for Train/Calibration/Test.

In [None]:
# Load data
data = fetch_california_housing(as_frame=True)
X = data.data
y = data.target

# Split into Test (20%) and Rest (80%)
X_rest, X_test, y_rest, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Split Rest into Train (75% of Rest) and Calibration (25% of Rest)
X_train, X_cal, y_train, y_cal = train_test_split(
    X_rest, y_rest, test_size=0.25, random_state=42
)

print(f"Train shape: {X_train.shape}")
print(f"Calibration shape: {X_cal.shape}")
print(f"Test shape: {X_test.shape}")

## 2. Model Training

We train the model with `save_node_stats=True`. This is critical as it instructs the booster to persist the summary statistics (min, max, etc.) of the gradients/hessians (or target values) in each leaf.

In [None]:
# Train PerpetualBooster
# Objective: SquaredLoss (Regression)
model = PerpetualBooster(objective="SquaredLoss", save_node_stats=True)
model.fit(X_train, y_train)

print(f"Number of trees: {len(model.get_node_lists())}")
print(f"Base Score: {model.base_score}")

## 2. Method 1: Min-Max Interval

### Logic
Each leaf node $l$ in tree $t$ has a range of weights $[w_{t,l}^{min}, w_{t,l}^{max}]$.
For a given input sample $x$, let $L_t(x)$ be the leaf it falls into in tree $t$.

We define the **Ensemble Minimum** ($S_{min}$) and **Ensemble Maximum** ($S_{max}$) as:
$$ S_{min}(x) = \sum_{t=1}^T w_{t, L_t(x)}^{min} + \text{base\_score} $$
$$ S_{max}(x) = \sum_{t=1}^T w_{t, L_t(x)}^{max} + \text{base\_score} $$

These define a "hard" interval $[S_{min}, S_{max}]$. However, this interval might be too wide or too narrow (uncalibrated).
We calculate the **Relative Position** ($P_{rel}$) of the true label $y$:
$$ P_{rel} = \frac{y - S_{min}}{S_{max} - S_{min}} $$

If the model is perfectly calibrated in a naive min-max sense, $P_{rel}$ might be Uniform(0,1). Usually, it is not. By looking at the distribution of $P_{rel}$ on the calibration set, we can map any desired confidence level (e.g., 90%) to specific values of $P_{rel}$.

In [None]:
def get_leaf_nodes_and_predict(model, X):
    # Get the raw visited nodes for each sample
    # This return shape (n_trees, n_samples) and each element is a set of nodes traversed
    pred_nodes = model.predict_nodes(X)

    # Get all leaf nodes from the model structure
    tree_nodes_list = model.get_node_lists()

    # Create a fast lookup for leaf weights: tree_idx -> node_id -> weights
    leaf_weights_lookup = []
    for tree_nodes in tree_nodes_list:
        lookup = {node.num: node.weights for node in tree_nodes if node.is_leaf}
        leaf_weights_lookup.append(lookup)

    n_samples = len(X)
    n_trees = len(pred_nodes)

    # Array to store min and max for each tree prediction
    tree_min_max = np.zeros((n_samples, n_trees, 2))

    for t in range(n_trees):
        for i in range(n_samples):
            # Find the leaf node ID in the visited set
            visited_nodes = pred_nodes[t][i]
            # The leaf node is the only one in the set that is present in the leaf_weights_lookup
            leaf_id = next(
                nid for nid in visited_nodes if nid in leaf_weights_lookup[t]
            )

            weights = leaf_weights_lookup[t][leaf_id]
            tree_min_max[i, t, 0] = min(weights)
            tree_min_max[i, t, 1] = max(weights)

    # Sum across trees
    s_min = np.sum(tree_min_max[:, :, 0], axis=1) + model.base_score
    s_max = np.sum(tree_min_max[:, :, 1], axis=1) + model.base_score

    return s_min, s_max


# Calculate intervals for Calibration Set
cal_s_min, cal_s_max = get_leaf_nodes_and_predict(model, X_cal)

# Check coverage of the raw interval [S_min, S_max]
raw_coverage = np.mean((y_cal >= cal_s_min) & (y_cal <= cal_s_max))
print(f"Raw Interval [S_min, S_max] Coverage on Calibration Set: {raw_coverage:.4f}")

# Calculate Relative Position (P_rel)
denom = cal_s_max - cal_s_min
min_valid_denom = 1e-6
denom[denom < min_valid_denom] = min_valid_denom  # Avoid division by zero

cal_p_rel = (y_cal - cal_s_min) / denom

# Visualize P_rel distribution
plt.figure(figsize=(10, 5))
sns.histplot(cal_p_rel, bins=100, kde=True)
plt.title("Distribution of Relative Position (P_rel) on Calibration Set")
plt.xlabel("Relative Position within [Min, Max] Interval (0=Min, 1=Max)")
plt.axvline(0, color="r", linestyle="--")
plt.axvline(1, color="r", linestyle="--")
plt.show()

### Creating the Mapping

We use the empirical quantiles of $P_{rel}$ to define our calibrated intervals.
To get a $90\%$ confidence interval (covering from 5% to 95% of probability mass), we find the $5^{th}$ and $95^{th}$ percentiles of the $P_{rel}$ distribution.

In [None]:
def get_calibrated_interval_m1(s_min, s_max, cal_p_rel_dist, confidence_level=0.90):
    alpha = 1.0 - confidence_level
    target_lower = alpha / 2.0
    target_upper = 1.0 - (alpha / 2.0)

    # Find the threshold values of P_rel that correspond to these quantiles
    p_rel_lower = np.quantile(cal_p_rel_dist, target_lower)
    p_rel_upper = np.quantile(cal_p_rel_dist, target_upper)

    # Apply these relative thresholds to the specific S_min/S_max of each sample
    pred_lower = s_min + p_rel_lower * (s_max - s_min)
    pred_upper = s_min + p_rel_upper * (s_max - s_min)

    return pred_lower, pred_upper


# Apply to Test Set
test_s_min, test_s_max = get_leaf_nodes_and_predict(model, X_test)

# Calculate 90% Interval
test_m1_lower, test_m1_upper = get_calibrated_interval_m1(
    test_s_min, test_s_max, cal_p_rel, 0.90
)

# Evaluate
m1_coverage = np.mean((y_test >= test_m1_lower) & (y_test <= test_m1_upper))
m1_width = np.mean(test_m1_upper - test_m1_lower)

print("Method 1 (Min-Max) Evaluation on Test Set (Target 90%):")
print(f"Coverage: {m1_coverage:.4f}")
print(f"Mean Width: {m1_width:.4f}")

## 3. Method 2: Global Relative Position

### Logic: Consistent Ensemble Interpolation
A naive simulation that randomly picks one weight per tree independently will suffer from **Variance Collapse** due to the Central Limit Theorem.

To fix this, we use **Consistent Ensemble Interpolation**:
1. **Sort Weights**: The 5 leaf weights are cross-validation fold weights that are **not inherently ordered**. We apply `np.sort` to treat them as an ordered 5-number summary.
2. **Aggregate First**: For each sample, we sum the sorted 5-number summary weights across all trees. This gives us an **Ensemble 5-Number Summary**.

### Global Relative Position Calibration (with Extrapolation)
The key insight is that the ensemble's 5-number summary range only covers ~48% of the data. To achieve 90% coverage, we need to **extrapolate** beyond the raw min/max.

For each sample on the calibration set, we compute a **quantile position** $P$ of the true value within the ensemble distribution, allowing $P < 0$ or $P > 1$ via linear extrapolation beyond the endpoints.

We then take the 5th and 95th percentiles of the $P$ distribution on the calibration set, and use these to construct calibrated intervals on the test set by interpolating/extrapolating the ensemble distribution at those target positions.

In [None]:
def get_ensemble_summary(model, X):
    """Get the ensemble 5-number summary for each sample."""
    pred_nodes = model.predict_nodes(X)
    tree_nodes_list = model.get_node_lists()

    leaf_weights_lookup = []
    for tree_nodes in tree_nodes_list:
        # CRITICAL: Sort the weights. They are CV fold weights, NOT inherently ordered.
        lookup = {
            node.num: np.sort(np.array(node.weights))
            for node in tree_nodes
            if node.is_leaf
        }
        leaf_weights_lookup.append(lookup)

    n_samples = len(X)
    n_trees = len(pred_nodes)
    ensemble_stats = np.zeros((n_samples, 5))

    for t in range(n_trees):
        lookup = leaf_weights_lookup[t]
        for i in range(n_samples):
            leaf_id = next(nid for nid in pred_nodes[t][i] if nid in lookup)
            ensemble_stats[i, :] += lookup[leaf_id]

    ensemble_stats += model.base_score
    return ensemble_stats


def get_quantile_position(ensemble_stats, y_true):
    """Compute the Global Relative Position of the truth within the ensemble distribution.
    Supports extrapolation: P < 0 means below min, P > 1 means above max."""
    y_true = np.array(y_true)
    stat_q = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
    n_samples = len(y_true)
    positions = np.zeros(n_samples)

    for i in range(n_samples):
        vals = ensemble_stats[i, :]  # sorted 5-number summary
        y = y_true[i]

        if y <= vals[0]:
            # Extrapolate below min using slope of first segment
            delta = vals[1] - vals[0]
            if delta > 1e-12:
                positions[i] = (y - vals[0]) / (delta / (stat_q[1] - stat_q[0]))
            else:
                positions[i] = 0.0
        elif y >= vals[-1]:
            # Extrapolate above max using slope of last segment
            delta = vals[-1] - vals[-2]
            if delta > 1e-12:
                positions[i] = 1.0 + (y - vals[-1]) / (
                    delta / (stat_q[-1] - stat_q[-2])
                )
            else:
                positions[i] = 1.0
        else:
            # Interpolate within the distribution
            for k in range(4):
                if vals[k] <= y <= vals[k + 1]:
                    delta = vals[k + 1] - vals[k]
                    if delta > 1e-12:
                        frac = (y - vals[k]) / delta
                    else:
                        frac = 0.5
                    positions[i] = stat_q[k] + frac * (stat_q[k + 1] - stat_q[k])
                    break
    return positions


def get_calibrated_interval_m2(ensemble_stats, cal_positions, confidence_level=0.9):
    """Build calibrated intervals by extrapolating the ensemble distribution."""
    alpha = 1.0 - confidence_level
    p_low = np.quantile(cal_positions, alpha / 2.0)
    p_high = np.quantile(cal_positions, 1.0 - alpha / 2.0)

    stat_q = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
    n_samples = ensemble_stats.shape[0]
    lower = np.zeros(n_samples)
    upper = np.zeros(n_samples)

    for i in range(n_samples):
        vals = ensemble_stats[i, :]
        # Interpolate within [0, 1]
        lower[i] = np.interp(p_low, stat_q, vals)
        upper[i] = np.interp(p_high, stat_q, vals)

        # Handle extrapolation beyond [0, 1]
        if p_low < 0:
            slope = (vals[1] - vals[0]) / (stat_q[1] - stat_q[0])
            lower[i] = vals[0] + slope * p_low
        if p_high > 1:
            slope = (vals[-1] - vals[-2]) / (stat_q[-1] - stat_q[-2])
            upper[i] = vals[-1] + slope * (p_high - 1.0)

    return lower, upper


print("Calculating Ensemble Summaries (with sorted weights)...")
cal_ensemble_stats = get_ensemble_summary(model, X_cal)

# Compute Global Relative Position on calibration set
cal_positions = get_quantile_position(cal_ensemble_stats, y_cal)
print(
    f"Calibration Positions: min={cal_positions.min():.4f}, max={cal_positions.max():.4f}, mean={cal_positions.mean():.4f}"
)

# Compute calibrated interval on calibration set for sanity check
m2_cal_lower, m2_cal_upper = get_calibrated_interval_m2(
    cal_ensemble_stats, cal_positions
)
m2_coverage = np.mean((y_cal >= m2_cal_lower) & (y_cal <= m2_cal_upper))
print(f"Method 2 Calibration Coverage on Calibration Set: {m2_coverage:.4f}")

# Apply to Test Set
test_m2_ensemble_stats = get_ensemble_summary(model, X_test)
m2_lower, m2_upper = get_calibrated_interval_m2(test_m2_ensemble_stats, cal_positions)

m2_coverage = np.mean((y_test >= m2_lower) & (y_test <= m2_upper))
m2_width = np.mean(m2_upper - m2_lower)

print("Method 2 (GRP) Evaluation on Test Set:")
print(f"Coverage: {m2_coverage:.4f}")
print(f"Mean Width: {m2_width:.4f}")

## 4. Method 3: Weight Variance Method

This method leverages the standard deviation of leaf weights across the 5 cross-validation folds. 
The interval width is assumed to be proportional to the sum of standard deviations across all trees.

In [None]:
def get_weight_variance_uncertainty(model, X):
    pred_nodes = model.predict_nodes(X)
    tree_nodes_list = model.get_node_lists()

    leaf_std_lookup = []
    for tree_nodes in tree_nodes_list:
        # Calculate std deviation of weights for each leaf node
        lookup = {node.num: np.std(node.weights) for node in tree_nodes if node.is_leaf}
        leaf_std_lookup.append(lookup)

    n_samples = len(X)
    n_trees = len(pred_nodes)

    # Aggregate uncertainty across trees
    uncertainty = np.zeros(n_samples)
    for t in range(n_trees):
        current_lookup = leaf_std_lookup[t]
        for i in range(n_samples):
            visited = pred_nodes[t][i]
            leaf_id = next(nid for nid in visited if nid in current_lookup)
            uncertainty[i] += current_lookup[leaf_id]

    return uncertainty


# Calculate uncertainty on calibration set
cal_uncertainty = get_weight_variance_uncertainty(model, X_cal)

# Calculate residuals on calibration set
cal_preds = model.predict(X_cal)
if isinstance(cal_preds, tuple):
    cal_preds = cal_preds[0]
abs_residuals = np.abs(y_cal - cal_preds)

# Calibrate scaling factor: residuals / uncertainty
scaling_factors = abs_residuals / np.maximum(cal_uncertainty, 1e-6)
q_factor = np.quantile(scaling_factors, 0.9)  # for 90% coverage

print(f"Weight Variance Scaling Factor (90% quantile): {q_factor:.4f}")

# Evaluate on Test Set
test_uncertainty = get_weight_variance_uncertainty(model, X_test)
test_preds = model.predict(X_test)
if isinstance(test_preds, tuple):
    test_preds = test_preds[0]

m3_wv_lower = test_preds - q_factor * test_uncertainty
m3_wv_upper = test_preds + q_factor * test_uncertainty

m3_wv_coverage = np.mean((y_test >= m3_wv_lower) & (y_test <= m3_wv_upper))
m3_wv_width = np.mean(m3_wv_upper - m3_wv_lower)

print("Method 3 (Weight Variance) Evaluation on Test Set:")
print(f"Coverage: {m3_wv_coverage:.4f}")
print(f"Mean Width: {m3_wv_width:.4f}")

## 5. Method 4: Native Calibration

PerpetualBooster provides a native `calibrate` method that automates the calibration process using the same cross-validation fold weights inside the library.

In [None]:
# Method 4: Native Calibrate
# We use a significance level alpha=0.1 for 90% coverage
model.calibrate_conformal(X_train, y_train, X_cal, y_cal, alpha=np.array([0.1]))

# Native prediction returns a dictionary mapping alpha to (lower, upper)
intervals = model.predict_intervals(X_test)
m3_lower, m3_upper = intervals["0.1"][:, 0], intervals["0.1"][:, 1]

m3_coverage = np.mean((y_test >= m3_lower) & (y_test <= m3_upper))
m3_width = np.mean(m3_upper - m3_lower)

print("Method 4 (Native Calibrate) Evaluation on Test Set (Target 90%):")
print(f"Coverage: {m3_coverage:.4f}")
print(f"Mean Width: {m3_width:.4f}")

## 7. Additional MAPIE Methods

Beyond Split Conformal, MAPIE supports more advanced methods like CV+ (Jackknife+) and Conformal Quantile Regression (CQR).
- **CV+ / Jackknife+**: Uses cross-validation to get more stable intervals, often better for smaller datasets.
- **CQR**: Calibrates quantile regression models, producing adaptive-width intervals based on estimated quantiles.

In [None]:
from mapie.regression import ConformalizedQuantileRegressor, CrossConformalRegressor
from sklearn.ensemble import GradientBoostingRegressor, HistGradientBoostingRegressor

# MAPIE CV+ (Jackknife+)
# We use cv=5 to match Perpetual's internal fold count
mapie_cv_plus = CrossConformalRegressor(
    estimator=GradientBoostingRegressor(random_state=42), cv=5, method="plus"
)
mapie_cv_plus.fit_conformalize(X_rest, y_rest)
_, y_pis_cv = mapie_cv_plus.predict_interval(X_test)
m_cv_lower, m_cv_upper = y_pis_cv[:, 0, 0], y_pis_cv[:, 1, 0]

m_cv_coverage = np.mean((y_test >= m_cv_lower) & (y_test <= m_cv_upper))
m_cv_width = np.mean(m_cv_upper - m_cv_lower)
print(f"MAPIE CV+ Coverage: {m_cv_coverage:.4f}, Mean Width: {m_cv_width:.4f}")

# MAPIE CQR (Conformal Quantile Regression)
est_cqr = HistGradientBoostingRegressor(loss="quantile", quantile=0.5, random_state=42)
mapie_cqr = ConformalizedQuantileRegressor(est_cqr)
mapie_cqr.fit(X_train, y_train)
mapie_cqr.conformalize(X_cal, y_cal)
_, y_pis_cqr = mapie_cqr.predict_interval(X_test)
m_cqr_lower, m_cqr_upper = y_pis_cqr[:, 0, 0], y_pis_cqr[:, 1, 0]

m_cqr_coverage = np.mean((y_test >= m_cqr_lower) & (y_test <= m_cqr_upper))
m_cqr_width = np.mean(m_cqr_upper - m_cqr_lower)
print(f"MAPIE CQR Coverage: {m_cqr_coverage:.4f}, Mean Width: {m_cqr_width:.4f}")

## 8. Comparison and Results

We compare Method 1 and Method 2 on the Test Set. Both methods utilize the internal statistics of the booster and achieve approximately 90% calibrated coverage.

## 6. Baseline: MAPIE (Split Conformal)

We compare against [MAPIE](https://mapie.readthedocs.io/), a popular conformal prediction library.
MAPIE's `SplitConformalRegressor` uses a model-agnostic approach: it fits any sklearn regressor,
computes residuals on a calibration set, and uses their quantiles to build prediction intervals.

Key difference: MAPIE produces **constant-width** intervals (same absolute width for all predictions),
while our methods produce **adaptive-width** intervals that vary based on each sample's leaf weight spread.

In [None]:
from mapie.regression import SplitConformalRegressor
from sklearn.ensemble import GradientBoostingRegressor

# Train a GradientBoostingRegressor (same family as PerpetualBooster)
gbr = GradientBoostingRegressor(random_state=42)
gbr.fit(X_train, y_train)

# MAPIE Split Conformal: calibrate residuals on calibration set
mapie = SplitConformalRegressor(estimator=gbr, confidence_level=0.9, prefit=True)
mapie.conformalize(X_cal, y_cal)

# Predict intervals on test set
y_pred_mapie, y_pis = mapie.predict_interval(X_test)
mapie_lower = y_pis[:, 0, 0]
mapie_upper = y_pis[:, 1, 0]

mapie_coverage = np.mean((y_test >= mapie_lower) & (y_test <= mapie_upper))
mapie_width = np.mean(mapie_upper - mapie_lower)
print(f"MAPIE Coverage: {mapie_coverage:.4f}")
print(f"MAPIE Mean Width: {mapie_width:.4f}")

In [None]:
# Visual comparison: interval widths across key methods
import matplotlib.pyplot as plt
import numpy as np

fig, axes = plt.subplots(4, 2, figsize=(10, 10), sharey=True)
axes = axes.flatten()

sort_idx = np.argsort(
    model.predict(X_test)[0]
    if isinstance(model.predict(X_test), tuple)
    else model.predict(X_test)
)
x_range = np.arange(len(sort_idx))
y_test_arr = np.array(y_test)

methods = [
    ("MAPIE Split", mapie_lower, mapie_upper, mapie_coverage, mapie_width, "gray"),
    ("MAPIE CV+", m_cv_lower, m_cv_upper, m_cv_coverage, m_cv_width, "purple"),
    ("MAPIE CQR", m_cqr_lower, m_cqr_upper, m_cqr_coverage, m_cqr_width, "brown"),
    ("Perpetual M1", test_m1_lower, test_m1_upper, m1_coverage, m1_width, "green"),
    ("Perpetual M2", m2_lower, m2_upper, m2_coverage, m2_width, "orange"),
    ("Perpetual M3 (WV)", m3_wv_lower, m3_wv_upper, m3_wv_coverage, m3_wv_width, "red"),
    ("Perpetual M4 (Nat)", m3_lower, m3_upper, m3_coverage, m3_width, "blue"),
]

for i, (name, low, high, cov, wid, col) in enumerate(methods):
    axes[i].fill_between(x_range, low[sort_idx], high[sort_idx], alpha=0.3, color=col)
    axes[i].scatter(x_range, y_test_arr[sort_idx], s=1, alpha=0.3, color="black")
    axes[i].set_title(f"{name} (Cov={cov:.3f}, Width={wid:.3f})")

# Hide the last empty subplot
axes[-1].set_visible(False)

plt.tight_layout()
plt.show()

In [None]:
# Summary table including all methods
summary = pd.DataFrame(
    {
        "Method": [
            "MAPIE Split Conformal",
            "MAPIE CV+ (Jackknife+)",
            "MAPIE CQR",
            "Perpetual Method 1 (Min-Max)",
            "Perpetual Method 2 (GRP)",
            "Perpetual Method 3 (Weight Var)",
            "Perpetual Method 4 (Native)",
        ],
        "Coverage": [
            mapie_coverage,
            m_cv_coverage,
            m_cqr_coverage,
            m1_coverage,
            m2_coverage,
            m3_wv_coverage,
            m3_coverage,
        ],
        "Mean Width": [
            mapie_width,
            m_cv_width,
            m_cqr_width,
            m1_width,
            m2_width,
            m3_wv_width,
            m3_width,
        ],
    }
)
summary["Width vs MAPIE (Split)"] = summary["Mean Width"] / mapie_width
summary["Width vs MAPIE (Split)"] = summary["Width vs MAPIE (Split)"].map(
    lambda x: f"{x:.1%}"
)
print(summary.to_string(index=False))

### Results Summary

Methods summary: All methods achieve requested coverage. Perpetual's methods and MAPIE CQR provide adaptive widths. 
The Weight Variance (M3) method provides highly competitive adaptive intervals using purely internal leaf statistics.