In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
from scipy.stats.mstats import mquantiles

import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

In [None]:
# Import custom quantile regression models from the mfpi folder
import sys
sys.path.append("../")
from mfpi import qr_models as qr

## Data set

In this workbook, we will use a real data set about car fuel efficiency.
This dataset was taken from the UCI Machine Learning Repository (https://archive.ics.uci.edu/ml/datasets/auto+mpg).


In [None]:
import pandas as pd

# Location of the raw Auto MPG file in the UCI repository
data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data"

# Column labels are supplied explicitly through `names`
col_names = [
    "mpg",
    "cylinders",
    "displacement",
    "horsepower",
    "weight",
    "acceleration",
    "year",
    "origin",
    "name",
]

# Read the fixed-width file; "?" entries are parsed as missing values
dataset = pd.read_fwf(data_url, na_values="?", names=col_names)

# Display the loaded table
dataset

Are there any missing values?

In [None]:
# Keep only the rows in which at least one column is missing
dataset.loc[dataset.isna().any(axis="columns")]

Let's remove rows with missing values and the last column, which is not useful.

In [None]:
# Remove rows with missing values
dataset = dataset.dropna()

# Remove the car name column.
# errors="ignore" keeps this cell safe to re-run once the column is already gone
# (the result is assigned back to the same name, so a second run would
# otherwise raise a KeyError).
dataset = dataset.drop(columns=["name"], errors="ignore")

Let's visualize the data set.

In [None]:
# Draw a grid of plots showing every pair of variables against each other
import seaborn as sns
sns.pairplot(data=dataset, height=1)
plt.show()

Before starting the analysis, let's convert the data set to a numpy array.

In [None]:
# Convert data frame to numpy array.
# The result overwrites `dataset`, so the conversion is guarded: re-running this
# cell when `dataset` is already an array is a no-op instead of an AttributeError.
if isinstance(dataset, pd.DataFrame):
    dataset = dataset.to_numpy()

## Make a test set

We consider the problem of predicting the fuel efficiency (miles per gallon) using the other variables.

We will hold out some of the observations for testing.

In [None]:
from sklearn.model_selection import train_test_split

# Fix the global numpy seed for reproducibility
np.random.seed(2023)

# Hold out 20% of the rows for testing.
# Column 0 is the response; the predictors are every other column except the last one.
X_data, X_test, Y_data, Y_test = train_test_split(
    dataset[:, 1:-1],
    dataset[:, 0],
    random_state=2020,
    test_size=0.2,
)

print("Number of test data points: {:d}.".format(Y_test.shape[0]))

## Machine learning predictions

We can try to predict fuel efficiency using a random forest regression model.

In [None]:
# Set up the random forest used as the black-box predictor
from sklearn.ensemble import RandomForestRegressor
black_box = RandomForestRegressor(
    min_samples_split=5,
    n_estimators=100,
    random_state=2020,
)

In [None]:
# Fit the black-box model on the training data
black_box.fit(X_data, Y_data)

# Predict the response for the held-out test points
Y_hat = black_box.predict(X_test)

# Scatter predictions against the truth; the black diagonal marks perfect predictions
fig, ax = plt.subplots()
ax.plot([0, 45], [0, 45], color='black', linewidth=1)
ax.scatter(Y_test, Y_hat)
ax.axis('square')
ax.set_xlabel("True Y")
ax.set_ylabel("Predicted Y")
plt.show()

## Naive prediction intervals based on in-sample residuals

In [None]:
def naive_prediction_intervals(X, Y, X_test, black_box, alpha):
    """
    Compute naive prediction bands based on the distribution of
      residuals within the training data set.

    The black box is fitted on (X, Y) and the absolute residuals are evaluated
    on the very same data; their empirical (1-alpha) quantile is then used as
    the half-width of every interval. Note that `black_box` is refitted in place.

    Input
    X         : n x p data matrix of explanatory variables
    Y         : n x 1 vector of response variables
    X_test    : m x p test data matrix of explanatory variables
    black_box : sklearn model object with 'fit' and 'predict' methods
    alpha     : 1 - target coverage level

    Output
    lower     : prediction lower bounds for the rows of X_test
    upper     : prediction upper bounds for the rows of X_test
    """

    # Flatten the response: an n x 1 column vector would otherwise broadcast
    # against 1-D predictions into an n x n matrix of residuals
    Y = np.asarray(Y).ravel()

    # Fit the black box model on the training data
    black_box.fit(X, Y)

    # Compute absolute residuals on the training data (in-sample)
    residuals = np.abs(Y - np.asarray(black_box.predict(X)).ravel())

    # Empirical (1-alpha) quantile of the absolute residuals, with no
    # finite-sample adjustment (this is what makes the method "naive")
    Q_hat = mquantiles(residuals, prob=1.0 - alpha)[0]

    # Construct prediction bands of constant half-width Q_hat
    Y_hat = black_box.predict(X_test)
    return Y_hat - Q_hat, Y_hat + Q_hat

In [None]:
def evaluate_predictions(lower, upper, X, Y, verbose=True):
    """
    Evaluate performance metrics for a set of regression predictions
    Computes:
    - marginal coverage
    - average size of sets

    Input
    lower     : n x 1 vector of prediction lower bounds
    upper     : n x 1 vector of prediction upper bounds
    X         : n x p data matrix of explanatory variables
                (not used by the computation; kept so existing calls still work)
    Y         : n x 1 vector of response variables
    verbose   : if True, print a short summary of the two metrics

    Output
    marginal_coverage : fraction of responses falling inside [lower, upper]
    size              : average length of the intervals
    """

    # Work on flat arrays so that lists and n x 1 column vectors are handled
    # element-wise instead of failing or broadcasting to an n x n matrix
    lower = np.asarray(lower).ravel()
    upper = np.asarray(upper).ravel()
    Y = np.asarray(Y).ravel()

    # Evaluate the empirical coverage (interval end points are inclusive)
    covered = (Y >= lower) & (Y <= upper)

    # Compute marginal coverage
    marginal_coverage = np.mean(covered)

    # Compute average size of prediction sets
    size = np.mean(upper - lower)

    # Print summary
    if verbose:
        print('Marginal coverage       : {:2.3%}'.format(marginal_coverage))
        print('Average length          : {:2.3f}'.format(size))

    return marginal_coverage, size

In [None]:
# Miscoverage level: the target coverage is 1 - alpha
alpha = 0.1

# Build the naive intervals from the in-sample residuals
lower, upper = naive_prediction_intervals(
    X=X_data, Y=Y_data, X_test=X_test, black_box=black_box, alpha=alpha
)

# Report coverage and average length on the held-out test set
metrics = evaluate_predictions(lower=lower, upper=upper, X=X_test, Y=Y_test)

## Conformal prediction via conditional mean regression

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor

# Choose a black-box machine learning model (1,2,3,4)
bb_model_index = 1

if bb_model_index==1:
    # Random forest
    black_box = RandomForestRegressor(n_estimators=100, min_samples_split=10, random_state=2023)
elif bb_model_index==2:
    # Random forest with more aggressive splits.
    # scikit-learn requires an integer min_samples_split to be at least 2,
    # so 2 is the most aggressive valid setting (1 is rejected when fitting).
    black_box = RandomForestRegressor(n_estimators=100, min_samples_split=2, random_state=2023)
elif bb_model_index==3:
    # Support vector machine
    black_box = SVR(kernel='rbf', degree=3)
elif bb_model_index==4:
    # Neural network
    black_box = MLPRegressor(hidden_layer_sizes=(200,), max_iter=1000, random_state=2023)
else:
    print("Error: unknown machine learning model")
    black_box = None

In [None]:
def conformal_prediction_intervals(X, Y, X_test, black_box, alpha, random_state=2023):
    """
    Compute split-conformal prediction bands based on absolute residuals.

    The data are split in half: the black box is fitted on one half and the
    absolute residuals are evaluated on the other (calibration) half. Note that
    `black_box` is refitted in place.

    Input
    X            : n x p data matrix of explanatory variables
    Y            : n x 1 vector of response variables
    X_test       : m x p test data matrix of explanatory variables
    black_box    : sklearn model object with 'fit' and 'predict' methods
    alpha        : 1 - target coverage level
    random_state : seed controlling the random training/calibration split

    Output
    lower        : prediction lower bounds for the rows of X_test
    upper        : prediction upper bounds for the rows of X_test
    """

    # Split the data into training and calibration sets
    # (the caller-supplied seed is used, so repeated experiments get different splits)
    X_train, X_calib, Y_train, Y_calib = train_test_split(X, Y, test_size=0.5, random_state=random_state)

    # Fit the black box model on the training data
    black_box.fit(X_train, Y_train)

    # Compute absolute residuals on the calibration data
    residuals_calib = np.abs(Y_calib - black_box.predict(X_calib))

    # Compute suitable empirical quantile of absolute residuals:
    # the ceil((1-alpha)(n_calib+1))-th smallest calibration residual.
    # If that rank exceeds n_calib there are too few calibration points for
    # the requested level, and only an unbounded interval is valid.
    n_calib = len(Y_calib)
    rank = int(np.ceil((1.0 - alpha) * (n_calib + 1)))
    if rank > n_calib:
        Q_hat = np.inf
    else:
        Q_hat = np.sort(residuals_calib)[rank - 1]

    # Construct prediction bands of constant half-width Q_hat
    Y_hat = black_box.predict(X_test)
    lower = Y_hat - Q_hat
    upper = Y_hat + Q_hat

    return lower, upper

In [None]:
# Miscoverage level: the target coverage is 1 - alpha
alpha = 0.1

# Build split-conformal intervals around the black-box predictions
lower, upper = conformal_prediction_intervals(
    X=X_data, Y=Y_data, X_test=X_test, black_box=black_box, alpha=alpha
)

# Report coverage and average length on the held-out test set
metrics = evaluate_predictions(lower=lower, upper=upper, X=X_test, Y=Y_test)

## Conformal prediction via quantile regression

Alternatively, we already know how to construct predictive intervals with valid marginal coverage using CQR.

In [None]:
# Choose a black-box quantile regression model (1 or 2)
bb_qr_model_index = 2

alpha = 0.1

if bb_qr_model_index==1:
    # Linear quantile regression, built with the same alpha as defined above
    black_box_qr = qr.LinearQR(alpha=alpha)
elif bb_qr_model_index==2:
    # NOTE(review): RFQR is created with its default arguments; confirm that
    # its default level agrees with `alpha`
    black_box_qr = qr.RFQR()
else:
    print("Error: unknown quantile regression model")
    black_box_qr = None

In [None]:
from sklearn.model_selection import train_test_split

def cqr_prediction_intervals(X, Y, X_test, black_box, alpha, random_state=2023):
    """
    Compute split-conformal quantile regression prediction bands.

    The data are split in half: the quantile regression model is fitted on one
    half and its lower/upper quantile estimates are calibrated on the other
    half. Note that `black_box` is refitted in place.

    Input
    X            : n x p data matrix of explanatory variables
    Y            : n x 1 vector of response variables
    X_test       : m x p test data matrix of explanatory variables
    black_box    : quantile regression model object with 'fit' and 'predict' methods
    alpha        : 1 - target coverage level
    random_state : seed controlling the random training/calibration split

    Output
    lower        : prediction lower bounds for the rows of X_test
    upper        : prediction upper bounds for the rows of X_test
    """

    def _quantile_bands(X_new):
        # NOTE(review): the return format of black_box.predict is not visible
        # from this notebook. Both a (lower, upper) pair and a 2-D array with
        # one column (or row) per quantile are accepted -- confirm against mfpi.qr_models.
        pred = black_box.predict(X_new)
        if isinstance(pred, (tuple, list)) and len(pred) == 2:
            band_lo, band_hi = pred
        else:
            pred = np.asarray(pred)
            if pred.ndim == 2 and pred.shape[1] == 2:
                band_lo, band_hi = pred[:, 0], pred[:, 1]
            elif pred.ndim == 2 and pred.shape[0] == 2:
                band_lo, band_hi = pred[0], pred[1]
            else:
                raise ValueError("black_box.predict must return a lower and an upper quantile estimate")
        return np.asarray(band_lo).ravel(), np.asarray(band_hi).ravel()

    # Split the data into training and calibration sets
    X_train, X_calib, Y_train, Y_calib = train_test_split(X, Y, test_size=0.5, random_state=random_state)

    # Fit the quantile regression model
    black_box.fit(X_train, Y_train)

    # Estimate conditional quantiles for calibration set
    lower_calib, upper_calib = _quantile_bands(X_calib)

    # Compute conformity scores on the calibration data: how far the response
    # falls outside the estimated band (negative when it lies inside)
    Y_calib = np.asarray(Y_calib).ravel()
    scores_calib = np.maximum(lower_calib - Y_calib, Y_calib - upper_calib)

    # Compute suitable empirical quantile of the conformity scores:
    # the ceil((1-alpha)(n_calib+1))-th smallest score, or +inf when the
    # calibration set is too small for the requested level
    n_calib = len(Y_calib)
    rank = int(np.ceil((1.0 - alpha) * (n_calib + 1)))
    if rank > n_calib:
        Q_hat = np.inf
    else:
        Q_hat = np.sort(scores_calib)[rank - 1]

    # Construct prediction bands by widening (or shrinking) the estimated band by Q_hat
    lower_test, upper_test = _quantile_bands(X_test)
    lower = lower_test - Q_hat
    upper = upper_test + Q_hat

    return lower, upper

In [None]:
# Miscoverage level: the target coverage is 1 - alpha
alpha = 0.1

# Build split-conformal intervals around the quantile regression estimates
lower, upper = cqr_prediction_intervals(
    X=X_data, Y=Y_data, X_test=X_test, black_box=black_box_qr, alpha=alpha
)

# Report coverage and average length on the held-out test set
metrics = evaluate_predictions(lower=lower, upper=upper, X=X_test, Y=Y_test)

## Numerical experiments

We will now repeatedly apply the three methods described above (naive, conformal, and CQR) to the data set, each time using a different random subset of the data for testing.

In [None]:
# Choose a black-box machine learning model (1,2,3,4)
bb_model_index = 1

if bb_model_index==1:
    # Random forest
    black_box = RandomForestRegressor(n_estimators=100, min_samples_split=10, random_state=2023)
elif bb_model_index==2:
    # Random forest with more aggressive splits.
    # scikit-learn requires an integer min_samples_split to be at least 2,
    # so 2 is the most aggressive valid setting (1 is rejected when fitting).
    black_box = RandomForestRegressor(n_estimators=100, min_samples_split=2, random_state=2023)
elif bb_model_index==3:
    # Support vector machine
    black_box = SVR(kernel='rbf', degree=3)
elif bb_model_index==4:
    # Neural network
    black_box = MLPRegressor(hidden_layer_sizes=(200,), max_iter=1000, random_state=2023)
else:
    print("Error: unknown machine learning model")
    black_box = None
    
# Choose a black-box quantile regression model (1 or 2)
bb_qr_model_index = 2

alpha = 0.1
if bb_qr_model_index==1:
    # Linear quantile regression, built with the same alpha as defined above
    black_box_qr = qr.LinearQR(alpha=alpha)
elif bb_qr_model_index==2:
    # NOTE(review): RFQR is created with its default arguments; confirm that
    # its default level agrees with `alpha`
    black_box_qr = qr.RFQR()
else:
    print("Error: unknown quantile regression model")
    black_box_qr = None

In [None]:
def run_experiment(dataset, black_box, black_box_qr, random_state=2023):
    """
    Run the naive, conformal and CQR methods on one random train/test split.

    Input
    dataset      : data array; column 0 is the response and the predictors are
                   every other column except the last one
    black_box    : regression model passed to the naive and conformal methods
    black_box_qr : quantile regression model passed to the CQR method
    random_state : seed for the train/test split, also forwarded to the
                   conformal and CQR methods

    Output
    Data frame with one row per method and columns "Method", "Coverage", "Length".

    Note: the miscoverage level is read from the notebook-level variable `alpha`.
    """
    # Hold out 20% of the rows for testing
    features, response = dataset[:,1:-1], dataset[:,0]
    X_data, X_test, Y_data, Y_test = train_test_split(features, response,
                                                      test_size=0.2, random_state=random_state)

    # Each method is run and evaluated in turn, in the order listed here
    methods = (
        ("Naive", lambda: naive_prediction_intervals(X_data, Y_data, X_test, black_box, alpha)),
        ("Conformal", lambda: conformal_prediction_intervals(X_data, Y_data, X_test, black_box, alpha,
                                                             random_state=random_state)),
        ("CQR", lambda: cqr_prediction_intervals(X_data, Y_data, X_test, black_box_qr, alpha,
                                                 random_state=random_state)),
    )

    rows = []
    for label, make_bands in methods:
        band_lower, band_upper = make_bands()
        coverage, length = evaluate_predictions(band_lower, band_upper, X_test, Y_test, verbose=False)
        rows.append({"Method": label, "Coverage": coverage, "Length": length})

    # Return results
    return pd.DataFrame(rows, columns=["Method", "Coverage", "Length"])

In [None]:
# Run many experiments
from tqdm import tqdm

# Collect the per-experiment tables in a list and concatenate once at the end;
# growing a data frame with pd.concat inside the loop copies all earlier rows
# on every iteration.
results_list = []

for experiment in tqdm(range(50)):
    
    # Random state for this experiment
    random_state = 2023 + experiment
    
    # Run the experiment
    result_exp = run_experiment(dataset, black_box, black_box_qr, random_state=random_state)
    
    # Store results
    results_list.append(result_exp)

# ignore_index gives every row a unique label (each per-experiment table is
# indexed from 0), which avoids duplicate-index problems in later plotting
results = pd.concat(results_list, ignore_index=True)

In [None]:
# Two panels side by side: coverage on the left, interval length on the right
fig, (ax_coverage, ax_length) = plt.subplots(1, 2, figsize=(12,3.5))

# Marginal coverage per method, with the target level 1-alpha as a dashed red line
sns.boxplot(data=results, x="Method", y="Coverage", hue="Method", ax=ax_coverage)
ax_coverage.set(xlabel='Method', ylabel='Marginal coverage')
ax_coverage.axhline(1-alpha, ls='--', color="red")

# Average length of the prediction intervals per method
sns.boxplot(data=results, x="Method", y="Length", hue="Method", ax=ax_length)
ax_length.set(xlabel='Method', ylabel='Size of prediction intervals')
plt.show()