In [247]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
from sklearn.linear_model import Lasso
import einops

##############################################################################
# Placeholder conformal prediction function
##############################################################################
def conformal_pred_split(X_train, y_train, 
                         X_cal, y_cal, 
                         X_test, y_test, alpha=0.05,
                         train_fun=None, predict_fun=None, seed=None):
    """
    Mimics R's conformalInference::conformal.pred.split().
    This is a placeholder. In practice, you can:
    - use MAPIE (https://github.com/scikit-learn-contrib/MAPIE)
    - implement split conformal logic manually.
    
    Parameters
    ----------
    X_train : array of shape (n, p)
    y_train : array of shape (n,)
    X_test  : array of shape (n_test, p)
    y_test : array of shape (n_test,)
    alpha   : significance level (1 - coverage)
    train_fun : callable that trains a model (X, y) -> model
    predict_fun : callable that predicts from model (model, X) -> predictions
    seed : random seed (if needed)
    
    Returns
    -------
    A dict with keys:
      'lo': float (lower bound),
      'up': float (upper bound)
    for each test point. Here we assume n_test=1. If >1, you'd return arrays.
    """
    if seed is not None:
        np.random.seed(seed)
    # Train the model
    model = train_fun(X_train, y_train)
    preds_cal = predict_fun(model, X_cal)
    # Compute residuals (naive approach)
    cal_scores = np.abs(y_cal - preds_cal)
    
    # quantile of absolute residuals
    q = np.quantile(cal_scores, 1 - alpha)
    
    # Predict on test
    test_preds  = predict_fun(model, X_test)
    test_scores = np.abs(y_test - test_preds)
    # If X_test has shape (1, p), preds_test is a single float or shape (1,)
    # We'll assume n0=1 as in your R script. If more, adjust accordingly.
    if test_preds.shape == ():  # single float
        test_preds = np.array([test_preds])
    
    lo = test_preds - q
    up = test_preds + q
    
    return {'lo': lo, 'up': up, 'scores': cal_scores, 'test_scores': test_scores, 'test_preds': test_preds}


##############################################################################
# Lasso "funs" (mimicking lasso.funs(...))
##############################################################################
def lasso_train(X, y, alpha=1.0, fit_intercept=False):
    """
    Train a Lasso model with no standardization (unless you do it outside)
    and possibly no intercept (fit_intercept=False).
    """
    model = Lasso(alpha=alpha, fit_intercept=fit_intercept)
    model.fit(X, y)
    return model

def lasso_predict(model, X):
    return model.predict(X)

def lasso_funs(lambda_val=1.0, standardize=False, intercept=False):
    """
    Returns a dict with 'train' and 'predict' to mimic R's:
      lasso.funs(lambda = lambda_val, standardize = F, intercept = F)
    """
    # In scikit-learn, set fit_intercept=not intercept
    # 'standardize' = F means we skip scaling here; user can scale externally if needed
    return {
        'train': lambda X, y: lasso_train(X, y, alpha=lambda_val, fit_intercept=intercept),
        'predict': lasso_predict
    }


##############################################################################
# Majority Vote & Exchangeable Majority Vote
# (Assuming you have them from previous translations; placeholders below.)
##############################################################################
def majority_vote(M, w, rho=0.5):
    """
    M : 2D array of shape (k, 2)
    w : 1D array of shape (k,) for weights
    rho: threshold
    Returns 2D array of intervals or None
    """
    # (Placeholder logic; adapt your previously translated code)
    # unique sorted breakpoints
    unique_breaks = np.unique(M.flatten())
    unique_breaks.sort()

    i = 0
    lower = []
    upper = []
    while i < len(unique_breaks) - 1:
        # counts_int-like check
        mid = 0.5 * (unique_breaks[i] + unique_breaks[i+1])
        # Weighted coverage:
        coverage_indicators = [(1 if row[0] <= mid <= row[1] else 0) for row in M]
        coverage = np.average(coverage_indicators, weights=w)
        cond = (coverage > rho)

        if cond:
            lower.append(unique_breaks[i])
            j = i
            while j < len(unique_breaks) - 1 and cond:
                j += 1
                if j < len(unique_breaks) - 1:
                    mid2 = 0.5 * (unique_breaks[j] + unique_breaks[j+1])
                    coverage2 = np.average(
                        [(1 if row[0] <= mid2 <= row[1] else 0) for row in M],
                        weights=w
                    )
                    cond = (coverage2 > rho)
            upper.append(unique_breaks[j])
            i = j
        i += 1

    if len(lower) == 0:
        return None
    return np.column_stack((lower, upper))

def exch_majority_vote(M, tau=0.5):
    """
    Exchangeable majority vote
    M: 2D array (k, 2)
    tau: threshold
    """
    # (Placeholder logic.)
    k = M.shape[0]
    if k == 1:
        return M
    perm_indices = np.random.permutation(k)
    permM = M[perm_indices, :]
    newM = [None]*k
    newM[0] = permM[0,:].reshape(1,2)
    for i in range(1, k):
        subset = permM[:i+1, :]
        w = np.full(i+1, 1/(i+1))
        mv = majority_vote(subset, w, rho=tau)
        if mv is None:
            return None
        newM[i] = mv
    
    # We gather the final intersection across newM
    # The R code's logic checks if intervals appear in *all* sets.
    # We'll do a simplified approach: take the union in newM and only keep those 
    # that appear in each list. 
    # For brevity, return one combined interval or None.

    # If you already have a fully working version from earlier translations, use that.
    # Here is a minimal approach to avoid an overly long snippet:
    if any(x is None for x in newM):
        return None
    # Flatten all intervals
    all_breaks = []
    for arr in newM:
        all_breaks.extend(arr.flatten())
    all_breaks = np.unique(all_breaks)
    all_breaks.sort()

    if len(all_breaks) < 2:
        return None

    # We'll just return everything as one big bracket or None
    return np.array([[all_breaks[0], all_breaks[-1]]])

def exch_rand_majority_vote(M):
    """
    Placeholder for exch_rand_majority_vote(cis),
    which is not defined in your snippet.
    
    We'll guess it does the same as exch_majority_vote but uses 
    a random threshold 'u' somewhere inside?
    """
    # Minimal stand-in:
    # e.g. pick a random threshold in [0.5,1], then call exch_majority_vote
    u = np.random.uniform(0.5, 1)
    return exch_majority_vote(M, tau=u)

In [476]:
# simulation p > n
# lasso with different lambda values
n  = 200    # number of obs. in the train set
n0 = 1_000    # number of obs. in the test set
p  = 120    # number of predictors
m  = 10     # number of active predictors

# beta values
# np.random.seed(123)
beta_vals = np.concatenate((np.random.normal(loc=0, scale=2, size=m), 
                            np.zeros(p - m)))

# design matrix and outcome
X_train = np.random.randn(n, p)
y_train = X_train @ beta_vals + np.random.randn(n)

X_cal = np.random.randn(n, p)
y_cal = X_cal @ beta_vals + np.random.randn(n)

# design matrix and outcome (test)
X_test = np.random.randn(n0, p)
y_test = X_test @ beta_vals + np.random.randn(n0)

# lasso for all parameters
lambda_vals = np.exp(np.linspace(-4, 1.5, 20))  # seq(-4, 1.5, length=20)
k = len(lambda_vals)
funs = []
for lam in lambda_vals:
    funs.append(lasso_funs(lambda_val=lam, standardize=False, intercept=False))

# Obtain a conformal prediction interval for each X0 with level (1-alpha/2)
alpha = 0.1
conf_pred_ints = []
for f in funs:
    # X0 is shape (1, p), we assume n0=1
    out = conformal_pred_split(X_train, y_train, 
                               X_cal, y_cal,
                               X_test, y_test, 
                               alpha=alpha/2,
                               train_fun=f['train'],
                               predict_fun=f['predict'],
                               seed=123)
    conf_pred_ints.append(out)

In [477]:
scores = np.array([out['scores'] for out in conf_pred_ints]).T
test_scores = np.array([out['test_scores'] for out in conf_pred_ints]).T
test_preds = np.array([out['test_preds'] for out in conf_pred_ints]).T
ints = np.array([[conf_pred_int["lo"], conf_pred_int["up"]] for conf_pred_int in conf_pred_ints])
ints = einops.rearrange(ints, "k l n -> n k l")

In [478]:
maj_ints = [exch_majority_vote(int_) for int_ in ints]

In [479]:
cleaned_ints = []
ys = []

for i in range(len(maj_ints)):
    if maj_ints[i] is not None:
        cleaned_ints.append(maj_ints[i][0])
        ys.append(y_test[i])

cleaned_ints = np.array(cleaned_ints)
ys = np.array(ys)

In [480]:
M = 200
unnorm_dirs = np.abs(np.random.multivariate_normal(np.zeros(k), np.eye(k), size=M))
proj_dirs = (unnorm_dirs / np.linalg.norm(unnorm_dirs, axis=1, keepdims=True)).T
proj_scores = scores @ proj_dirs

In [481]:
beta_range = [1-alpha, 1-alpha/k]
coverage = 0.0
eps = .02

while not (1-alpha-eps <= coverage and coverage <= 1-alpha + eps):
    beta = (beta_range[0] * .8 + beta_range[1] * .2)
    mvcp_proj_quantiles = np.quantile(proj_scores, q=beta, axis=0)
    train_covered = einops.reduce(proj_scores < mvcp_proj_quantiles, "n p -> n", "prod")
    coverage = np.sum(train_covered) / len(train_covered)
    if coverage < 1-alpha:
        beta_range[0] = beta
    else:
        beta_range[1] = beta
    print(f"{beta} -- {coverage}")

0.919 -- 0.895


In [482]:
in_region = lambda scores : einops.reduce(scores @ proj_dirs < mvcp_proj_quantiles, "n p -> n", "prod")

In [483]:
mvcp_lens = []
exch_maj_vote_lens = []

for i in range(len(test_preds)):
    if maj_ints[i] is None: # sometimes majority vote fails? skip these for now
        continue

    test_pred = test_preds[i]
    range_ = np.max(test_pred) - np.min(test_pred)
    delta  = range_ / 100
    candidate_y = np.arange(np.min(test_pred) - range_, np.max(test_pred) + range_, delta)

    tiled_pred = einops.repeat(test_pred, "n -> n s", s=len(candidate_y))
    candidate_scores = np.abs(tiled_pred - candidate_y).T
    scores_in_region = in_region(candidate_scores)
    
    mvcp_len = np.sum(scores_in_region) * delta / range_
    exch_maj_vote_len = (maj_ints[i][0][1] - maj_ints[i][0][0]) / range_

    mvcp_lens.append(mvcp_len)
    exch_maj_vote_lens.append(exch_maj_vote_len)

In [484]:
mvcp_covered = np.mean(in_region(test_scores))
exch_maj_covered = np.mean(np.logical_and(cleaned_ints[:,0] <= ys, ys <= cleaned_ints[:,1]))

In [485]:
print(mvcp_covered)
print(exch_maj_covered)

print(f"{np.mean(mvcp_lens)} ({np.std(mvcp_lens)})")
print(f"{np.mean(exch_maj_vote_lens)} ({np.std(exch_maj_vote_lens)})")

0.867
0.9048096192384769
1.6735871743486972 (0.9104811418888928)
1.8330893218509021 (1.563613101875763)
