In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression, QuantileRegressor
from mapie.regression import MapieRegressor, MapieQuantileRegressor
from mapie.conformity_scores import AbsoluteConformityScore, ResidualNormalisedScore
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from mapie.metrics import regression_coverage_score
import warnings
import seaborn as sns
warnings.filterwarnings('ignore')

# Set seed for reproducibility
np.random.seed(42)

# Function to generate homoscedastic data with multiple features
def generate_homoscedastic_data(n, m, beta, sigma, noise_scale=7):
    X = np.random.uniform(0, 100, (n, m))
    epsilon = np.random.normal(0, 1, n)
    Y = X.dot(beta) + sigma * epsilon * noise_scale
    return X, Y

# Function to generate heteroscedastic data with multiple features
def generate_heteroscedastic_data(n, m, beta, noise_scale=1):
    X = np.random.uniform(0, 100, (n, m))
    epsilon1 = np.random.normal(0, 1, n)
    epsilon2 = np.random.normal(0, 1, n)
    Z = X.dot(beta) + noise_scale * epsilon1
    Y = Z * (1 + noise_scale * epsilon2)
    return X, Y

# Function to generate heteroscedastic sparse data with multiple features
def generate_heteroscedastic_sparse_data(n, m, beta, noise_scale=1):
    X = np.random.uniform(0, 100, (n, m))
    mask_central = (X[:, 0] >= 25) & (X[:, 0] <= 75)
    mask_outer = (X[:, 0] < 25) | (X[:, 0] > 75)
    
    # Reduce the number of points in the outer regions by keeping only a fraction of them
    outer_indices = np.where(mask_outer)[0]
    keep_outer_indices = np.random.choice(outer_indices, size=int(len(outer_indices) * 0.2), replace=False)
    
    # Combine the central and reduced outer indices
    final_indices = np.concatenate([np.where(mask_central)[0], keep_outer_indices])
    X = X[final_indices]
    
    epsilon1 = np.random.normal(0, 1, X.shape[0])
    epsilon2 = np.random.normal(0, 1, X.shape[0])
    Z = X.dot(beta) + noise_scale * epsilon1
    Y = Z * (1 + noise_scale * epsilon2)
    return X, Y

In [None]:

# Parameters
n = 2000  # Number of samples
m = 1     # Number of features
beta = np.ones(m)  # Coefficients for the linear model
sigma = 1.0

# Generate datasets
X1, Y1 = generate_homoscedastic_data(n, m, beta, sigma)
X2, Y2 = generate_heteroscedastic_data(n, m, beta, noise_scale=0.2)
X3, Y3 = generate_heteroscedastic_sparse_data(n, m, beta, noise_scale=0.2)

datasets = [(X1, Y1, 'Homoscedastic'), (X2, Y2, 'Heteroscedastic'), (X3, Y3, 'Heteroscedastic Sparse')]

# Function to train MAPIE models and evaluate
def train_and_evaluate_mapie_models(X, Y, dataset_name):
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=4)
    
    results = []

    # Linear Regression Model
    model = LinearRegression()
    model.fit(X_train, y_train)

    # MAPIE Regressor with AbsoluteConformityScore
    mapie_absolute = MapieRegressor(estimator=model, cv="split", conformity_score=AbsoluteConformityScore())
    mapie_absolute.fit(X_train, y_train)
    y_pred_abs, y_pis_abs = mapie_absolute.predict(X_test, alpha=0.1)

    mse_abs = mean_squared_error(y_test, y_pred_abs)
    r2_abs = r2_score(y_test, y_pred_abs)
    coverage_abs = regression_coverage_score(y_test, y_pis_abs[:, 0], y_pis_abs[:, 1])
    avg_width_abs = np.mean(y_pis_abs[:, 1] - y_pis_abs[:, 0])
    
    results.append((dataset_name, 'AbsoluteConformityScore', mse_abs, r2_abs, coverage_abs, avg_width_abs))

    # MAPIE Regressor with ResidualNormalisedScore
    mapie_normalized = MapieRegressor(estimator=model, cv="split", conformity_score=ResidualNormalisedScore())
    mapie_normalized.fit(X_train, y_train)
    y_pred_norm, y_pis_norm = mapie_normalized.predict(X_test, alpha=0.1)

    mse_norm = mean_squared_error(y_test, y_pred_norm)
    r2_norm = r2_score(y_test, y_pred_norm)
    coverage_norm = regression_coverage_score(y_test, y_pis_norm[:, 0], y_pis_norm[:, 1])
    avg_width_norm = np.mean(y_pis_norm[:, 1] - y_pis_norm[:, 0])
    
    results.append((dataset_name, 'ResidualNormalisedScore', mse_norm, r2_norm, coverage_norm, avg_width_norm))

    # Conformal Quantile Regression (CQR)
    quantile_model = QuantileRegressor(quantile=0.5, solver='highs')
    mapie_cqr = MapieQuantileRegressor(estimator=quantile_model, cv="split", alpha=0.1)
    mapie_cqr.fit(X_train, y_train)
    y_pred_cqr, y_pis_cqr = mapie_cqr.predict(X_test)

    mse_cqr = mean_squared_error(y_test, y_pred_cqr)
    r2_cqr = r2_score(y_test, y_pred_cqr)
    coverage_cqr = regression_coverage_score(y_test, y_pis_cqr[:, 0, 0], y_pis_cqr[:, 1, 0])
    avg_width_cqr = np.mean(y_pis_cqr[:, 1, 0] - y_pis_cqr[:, 0, 0])
    
    results.append((dataset_name, 'ConformalQuantileRegression', mse_cqr, r2_cqr, coverage_cqr, avg_width_cqr))

    return results, X_test, y_test, y_pred_abs, y_pis_abs, y_pred_norm, y_pis_norm, y_pred_cqr, y_pis_cqr


In [None]:

# Function to plot prediction comparisons
def plot_predictions_comparison(datasets, train_and_evaluate_fn):
    sns.set(style="whitegrid")
    covered_color = 'blue'  # Color for covered data points
    uncovered_color = 'red'  # Color for uncovered data points

    # Loop through each dataset
    for X, Y, dataset_name in datasets:
        results, X_test, y_test, y_pred_abs, y_pis_abs, y_pred_norm, y_pis_norm, y_pred_cqr, y_pis_cqr = train_and_evaluate_fn(X, Y, dataset_name)
        
        # Ensure y_pis are correctly flattened
        y_pis_abs = y_pis_abs.squeeze()
        y_pis_norm = y_pis_norm.squeeze()
        y_pis_cqr = y_pis_cqr.squeeze()

        # Create a figure with three subplots
        fig, axes = plt.subplots(1, 3, figsize=(18, 6), sharey=True)

        # Define coverage for each plot
        covered_abs = (y_test >= y_pis_abs[:, 0]) & (y_test <= y_pis_abs[:, 1])
        covered_norm = (y_test >= y_pis_norm[:, 0]) & (y_test <= y_pis_norm[:, 1])
        covered_cqr = (y_test >= y_pis_cqr[:, 0]) & (y_test <= y_pis_cqr[:, 1])
        
        # Uniform representation of data points and plots
        for i, (pred, covered, title) in enumerate(zip(
            [y_pred_abs, y_pred_norm, y_pred_cqr],
            [covered_abs, covered_norm, covered_cqr],
            ['AbsoluteConformityScore', 'ResidualNormalisedScore', 'ConformalQuantileRegression'])):
            sns.scatterplot(x=y_test, y=pred, hue=covered, palette={True: covered_color, False: uncovered_color}, ax=axes[i], legend=(i==2))
            axes[i].plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
            axes[i].set_title(f'{title} - Coverage={np.mean(covered)*100:.1f}%')
            axes[i].set_xlabel('Actual Target')
            axes[i].set_ylabel('Predicted Target')
            axes[i].set_ylim(0, 120)
        
        # Display legend only in the last plot
        if i == 2:
            handles, labels = axes[i].get_legend_handles_labels()
            labels = ['Uncovered', 'Covered']
            axes[i].legend(handles=handles, labels=labels, title='Prediction Coverage')

        # Save the figure
        plt.savefig(f'{dataset_name}_predictions_comparison.png')
        plt.show()
        
        # Calculate prediction interval width for each model approach
        interval_width_abs = y_pis_abs[:, 1] - y_pis_abs[:, 0]
        interval_width_norm = y_pis_norm[:, 1] - y_pis_norm[:, 0]
        interval_width_cqr = y_pis_cqr[:, 1] - y_pis_cqr[:, 0]
    
        # Create plots for prediction interval width
        fig, axes = plt.subplots(1, 3, figsize=(24, 6), sharey=True)
        
        # Plot for AbsoluteConformityScore
        sns.scatterplot(x=y_test, y=interval_width_abs, hue=covered_abs, palette={True: covered_color, False: uncovered_color}, ax=axes[0])
        axes[0].set_xlabel('Actual Target (y)')
        axes[0].set_title('AbsoluteConformityScore - Interval Width')
        axes[0].set_ylabel('Prediction Interval Width (Δy)')
        axes[0].set_ylim(0, 100)  # Optional: adjust y-limit
        axes[0].legend([],[], frameon=False)  # Suppress the legend
        
        # Plot for ResidualNormalisedScore
        sns.scatterplot(x=y_test, y=interval_width_norm, hue=covered_norm, palette={True: covered_color, False: uncovered_color}, ax=axes[1])
        axes[1].set_xlabel('Actual Target (y)')
        axes[1].set_title('ResidualNormalisedScore - Interval Width')
        axes[1].set_ylim(0, 100)  # Optional: adjust y-limit
        axes[1].legend([],[], frameon=False)  # Suppress the legend
        
        # Plot for ConformalQuantileRegression
        sns.scatterplot(x=y_test, y=interval_width_cqr, hue=covered_cqr, palette={True: covered_color, False: uncovered_color}, ax=axes[2])
        axes[2].set_xlabel('Actual Target (y)')
        axes[2].set_title('ConformalQuantileRegression - Interval Width')
        axes[2].set_ylim(0, 100)  # Optional: adjust y-limit
        # This plot shows the legend to represent coverage (Covered/Uncovered)
        handles, labels = axes[2].get_legend_handles_labels()
        axes[2].legend(handles=handles, labels=['Uncovered', 'Covered'], title='Prediction Coverage')

        if i < 2:  # No legend for the first two plots
            axes[i].legend([],[], frameon=False)  # Empty legend
        else:  # Full legend for the last plot
            handles, labels = axes[i].get_legend_handles_labels()
            labels = ['Uncovered', 'Covered']
            axes[i].legend(handles=handles, labels=labels, title='Prediction Coverage')
            plt.savefig(f'{dataset_name}_interval_width_comparison.png')
        
        # Calculate local coverage values for each interval
        quantiles = np.percentile(y_test, np.arange(0, 101, 10))
        local_coverage_abs = []
        local_coverage_norm = []
        local_coverage_cqr = []
    
        for i in range(len(quantiles)-1):
            mask = (y_test >= quantiles[i]) & (y_test < quantiles[i+1])
            local_coverage_abs.append(np.mean((y_test[mask] >= y_pis_abs[mask, 0]) & (y_test[mask] <= y_pis_abs[mask, 1])))
            local_coverage_norm.append(np.mean((y_test[mask] >= y_pis_norm[mask, 0]) & (y_test[mask] <= y_pis_norm[mask, 1])))
            local_coverage_cqr.append(np.mean((y_test[mask] >= y_pis_cqr[mask, 0]) & (y_test[mask] <= y_pis_cqr[mask, 1])))
    
        # Visualization of local coverage for each prediction method
        fig, axes = plt.subplots(1, 3, figsize=(24, 6), sharey=True)
        sns.barplot(x=np.arange(10), y=local_coverage_abs, ax=axes[0], palette=['#1f77b4'])  # Blue for Homoscedastic
        sns.barplot(x=np.arange(10), y=local_coverage_norm, ax=axes[1], palette=['#ff7f0e'])  # Orange for Heteroscedastic
        sns.barplot(x=np.arange(10), y=local_coverage_cqr, ax=axes[2], palette=['#2ca02c'])  # Green for Heteroscedastic Sparse

        # Setting up the plots
        for i, title in enumerate(['AbsoluteConformityScore', 'ResidualNormalisedScore', 'ConformalQuantileRegression']):
            axes[i].set_xticks(range(10))
            axes[i].set_xticklabels([f'{int(quantiles[j])}-{int(quantiles[j+1])}' for j in range(10)], rotation=45)
            axes[i].set_xlabel('Target Quantile (q)')
            axes[i].set_ylabel('Coverage Score' if i == 0 else "")
            axes[i].axhline(y=np.mean([local_coverage_abs, local_coverage_norm, local_coverage_cqr][i]), color='r', linestyle='--', label='Global Coverage')
            axes[i].legend()
            axes[i].set_ylim(0, 1)
        plt.tight_layout()
        plt.savefig(f'{dataset_name}_local_coverage_comparison.png')
        plt.show()

In [None]:

# Example (train_and_evaluate_mapie_models and datasets must be defined)
plot_predictions_comparison(datasets, train_and_evaluate_mapie_models)
