In [1]:
import pandas as pd
import numpy as np
from pasi_test import pasiTree, delong_roc_variance
from sklearn.utils import resample
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
import time
from numba import jit

In [2]:
data = pd.read_csv('thyroid_dataset.csv')
X,y = data.iloc[:,0:-1], data.iloc[:,-1].to_numpy()

# fit a predictive model
predModel = LogisticRegression(random_state=19301014, max_iter=1000).fit(X, y)
y_pred = predModel.predict_proba(X)[:,1]
print('Predictive model AUC: {}'.format(roc_auc_score(y_true=y, y_score=y_pred)))


for j in X.columns:
    if isinstance(X[j][0], float):
        X[j] = X[j].astype('float64')
    else:
        X[j] = X[j].astype('string')

X['age'] = X['age'].astype('int64')

Predictive model AUC: 0.9593977925201739


In [3]:
pasiTree_model = pasiTree(measure='auc', min_samples_leaf=100, max_depth=3).fit(X=X, mu=None, y=y, y_pred=y_pred)

In [4]:
s = pasiTree_model.tree.export_graphviz(feature_names=list(X.columns),measure_name='auc')

In [5]:
# https://dreampuf.github.io/GraphvizOnline/

s

'digraph Tree {node [shape=box, style="rounded", color="black", fontname="helvetica"] ;edge [fontname="helvetica"] ;1 [label=<node 1<br/>train auc = 0.959 (0.945, 0.974)<br/>n_train = 3152<br/>T3 &le; 1.55>];2 [label=<node 2<br/>train auc = 0.945 (0.929, 0.961)<br/>n_train = 1007<br/>T3 &le; 1.15>];4 [label=<node 4<br/>train auc = 0.887 (0.855, 0.919)<br/>n_train = 504<br/>FTI &le; 117.5>];8 [style=bold; label=<node 8<br/>train auc = 0.917 (0.885, 0.949)<br/>n_train = 363>];4 -> 8;9 [style=bold; label=<node 9<br/>train auc = 0.761 (0.672, 0.85)<br/>n_train = 141>];4 -> 9;2 -> 4;5 [style=bold; label=<node 5<br/>train auc = 0.751 (0.659, 0.842)<br/>n_train = 503>];2 -> 5;1 -> 2;3 [label=<node 3<br/>train auc = 0.67 (0.572, 0.768)<br/>n_train = 2145<br/>TT4 &le; 176.5>];6 [label=<node 6<br/>train auc = 0.63 (0.506, 0.755)<br/>n_train = 1938<br/>T4U &le; 0.935>];12 [style=bold; label=<node 12<br/>train auc = 0.494 (0.268, 0.721)<br/>n_train = 691>];6 -> 12;13 [style=bold; label=<node 13<br

In [3]:
from pasi_test import delong_roc_variance

In [5]:
from pasi_test.acc_functions import compute_auc_and_variance_Hanley_McNeil

In [170]:
def auc_estimation_HM(y, y_pred):
    """
    Compute the Area Under the Curve (AUC) and its variance for a given set of true labels and predictions.
    
    This function calculates the AUC using the trapezoidal method and then estimates its variance. 
    Additionally, it provides a 95% confidence interval for the AUC based on the normal approximation.
    
    Parameters:
    -----------
    y : numpy.ndarray
        A one-dimensional array of true labels. Must contain only binary values (0s and 1s) 
        where 1 denotes the positive class and 0 denotes the negative class.
    
    y_pred : numpy.ndarray
        A one-dimensional array of prediction scores corresponding to each sample in `y`. 
        Higher scores are assumed to indicate positive class preference.
    
    Returns:
    --------
    dict
        A dictionary containing:
        - 'mu': Estimated AUC
        - 'var': Estimated variance of AUC
        - 'sd': Standard deviation (square root of variance)
        - 'ci': 95% confidence interval for the AUC (as a list of two values: lower and upper bounds)
        - 'm': Number of positive samples
        - 'n': Number of negative samples
    
    Raises:
    -------
    ValueError:
        - If `y` and `y_pred` are not both numpy arrays.
        - If shapes of `y` and `y_pred` don't match or they are not 1-dimensional.
        - If `y` contains values other than 0 or 1.
        
    Notes:
    ------
    The function uses the trapezoidal rule to compute the AUC and a known formula to estimate its variance.
    Hanley, J. A., & McNeil, B. J. (1982). The meaning and use of the area under a receiver operating characteristic (ROC) curve. Radiology, 143(1), 29-36.
    """


    # Check that y and y_pred are numpy arrays
    if not (isinstance(y, np.ndarray) and isinstance(y_pred, np.ndarray)):
        raise ValueError("Both y and y_pred must be numpy arrays.")
    
    # Check that y and y_pred have the same shape and is of shape (N,)
    if y.shape != y_pred.shape or len(y.shape) != 1:
        raise ValueError("y and y_pred must have the same shape of (N,).")
    
    # Check that y contains only 0s and 1s
    if not np.array_equal(np.unique(y), [0, 1]) and not np.array_equal(np.unique(y), [1, 0]):
        raise ValueError("y should contain 0s and 1s.")
    
    # Calculate number of positive and negative samples
    m = np.sum(y)
    n = len(y_pred) - m

    # Sort the samples by ascending scores
    sorted_indices = np.argsort(y_pred)
    y_true_sorted = y[sorted_indices]

    # Vectorized calculation of cumulative sum for true positive and false positive
    cum_tpr = np.cumsum(y_true_sorted) / m
    cum_fpr = np.cumsum(1 - y_true_sorted) / n

    # Compute AUC using the trapezoid method
    auc = np.sum((cum_tpr[1:] - cum_tpr[:-1]) * (cum_fpr[1:] + cum_fpr[:-1])) / 2

    # Estimate variance using the updated formula
    auc2 = auc * auc
    Q1 = auc / (2 - auc)
    Q2 = 2 * auc2 / (1 + auc)
    var_auc = (auc * (1 - auc) + (m - 1) * (Q1 - auc2) + (n - 1) * (Q2 - auc2)) / (m * n)

    # # 95% CI, normal approximation
    # sd_hat = np.sqrt(var_auc)
    # alpha = 0.95
    # lower_upper_q = np.abs(np.array([0, 1]) - (1 - alpha) / 2)

    # if sd_hat > 0:
    #     ci = stats.norm.ppf(lower_upper_q, loc=auc, scale=sd_hat)
    # else:
    #     ci = np.array([0.0, 0.0])
    # ci[ci > 1] = 1
    # ci[ci < 0] = 0

    return auc, var_auc

In [180]:
@jit(nopython=True,parallel=False)
def auc_estimation_HM_numba(y:np.array, y_pred:np.array) -> (float, float):
    # Check that y contains only 0s and 1s
    unique_vals = np.unique(y)
    if not (len(unique_vals) == 2 and ((unique_vals[0] == 0 and unique_vals[1] == 1) or (unique_vals[0] == 1 and unique_vals[1] == 0))):
        raise ValueError("y should contain 0s and 1s.")
    
    # Calculate number of positive and negative samples
    m = np.sum(y)
    n = len(y_pred) - m

    # Sort the samples by ascending scores
    sorted_indices = np.argsort(y_pred)
    y_true_sorted = y[sorted_indices]

    # Vectorized calculation of cumulative sum for true positive and false positive
    cum_tpr = np.cumsum(y_true_sorted) / m
    cum_fpr = np.cumsum(1 - y_true_sorted) / n

    # Compute AUC using the trapezoid method
    auc = np.sum((cum_tpr[1:] - cum_tpr[:-1]) * (cum_fpr[1:] + cum_fpr[:-1])) / 2

    # Estimate variance using the updated formula
    auc2 = auc * auc
    Q1 = auc / (2 - auc)
    Q2 = 2 * auc2 / (1 + auc)
    var_auc = (auc * (1 - auc) + (m - 1) * (Q1 - auc2) + (n - 1) * (Q2 - auc2)) / (m * n)

    return auc, var_auc

In [171]:

def auc_estimation_Shirahata(y:np.array, y_pred:np.array) -> dict:
    """
    Estimate the AUC and its variance using the formula from Shirahata [1993].
    
    Parameters:
    - y (np.array): True binary responses (0/1) with shape (N,).
    - y_pred (np.array): Predicted scores with shape (N,).

    Returns:
    - dict: A dictionary containing the following key-value pairs:
        * 'mu': Estimated AUC
        * 'var': Estimated variance of mu
        * 'sd': Estimated standard error of mu
        * 'ci': 95% confidence interval for AUC (based on the normal approximation)
        * 'm': Number of positive samples
        * 'n': Number of negative samples
        * 'B': Sum of the number of times each positive sample score exceeds each negative sample score

    Raises:
    - ValueError: If `y` and `y_pred` have different lengths.
    - ValueError: If all labels in `y` are positive or all are negative.
    
    Notes:
    - The method is based on Shirahata [1993]. https://www.jstage.jst.go.jp/article/jjscs1988/6/2/6_2_1/_article
    """
    # Check for mismatched sizes
    if len(y) != len(y_pred):
        raise ValueError(f"Input arrays y and y_pred must have the same size. {len(y)} != {len(y_pred)}")
    
    # Check if y and y_pred have shape (N,)
    if y.shape != (len(y),) or y_pred.shape != (len(y_pred),):
        raise ValueError("Both y and y_pred must have a shape of (N,).")
    
    pos_array = y_pred[y == 1]
    neg_array = y_pred[y == 0]
    m = float(len(pos_array))
    n = float(len(neg_array))

    # Check for all positive or all negative
    if m == 0:
        raise ValueError("Positive labels must be present in y.")
    
    if n == 0:
        raise ValueError("Negative labels must be present in y.")


    c_array = np.sum(pos_array[:, np.newaxis] > neg_array, axis=1)
    d_array = np.sum(neg_array[:, np.newaxis] < pos_array, axis=1)

    c_square = np.sum(c_array ** 2)
    d_square = np.sum(d_array ** 2)
    B = np.sum(c_array)

    denominator = m * n
    if m > 1:
        denominator *= (m - 1)
    if n > 1:
        denominator *= (n - 1)
    
    var_hat = (-(m + n - 1) * B ** 2 / (m * n) - B + c_square + d_square) / denominator
    
    # 95% CI, normal approximation
    mu = B / (m * n)

    return mu, var_hat




In [188]:
def generate_test_cases(n_samples=1000000, n_cases=10, seed=42):
    test_cases = []
    np.random.seed(seed)
    for _ in range(n_cases):
        y_true = np.random.randint(0, 2, n_samples)
        y_scores = np.random.rand(n_samples)
        test_cases.append((y_true, y_scores))
    return test_cases

# --- Analysis and comparison of the methods ---

def compare_methods(test_cases):
    results_delong = []
    results_previous = []
    results_shirahata = []

    delong_times = []
    previous_times = []
    shirahata_times = []

    for y_true, y_scores in test_cases:
        # DeLong method
        start_time = time.perf_counter()
        for _ in range(100):
            delong_dict = delong_roc_variance(y=y_true, y_pred=y_scores)
            auc_delong, var_delong = delong_dict['mu'], delong_dict['var']
        end_time = time.perf_counter()
        results_delong.append((auc_delong, var_delong))
        delong_times.append((end_time - start_time)/100)

        # Previous method
        start_time = time.perf_counter()
        for _ in range(100):
            auc_prev, var_prev = auc_estimation_HM_numba(y_true, y_scores)
        end_time = time.perf_counter()
        results_previous.append((auc_prev, var_prev))
        previous_times.append((end_time - start_time)/100)

        # Shirahata method
        start_time = time.perf_counter()
        for _ in range(100):
            auc_shirahata, var_shirahata = auc_estimation_Shirahata(y_true, y_scores)
        end_time = time.perf_counter()
        results_shirahata.append((auc_shirahata, var_shirahata))
        shirahata_times.append((end_time - start_time)/100)
    
    return results_delong, delong_times, results_previous, previous_times, results_shirahata, shirahata_times

# --- Execution and Display of Results ---

# Generate test cases
test_cases = generate_test_cases(n_samples=10000, n_cases=2)

# Compare methods
results_delong, delong_times, results_previous, previous_times, results_shirahata, shirahata_times = compare_methods(test_cases)


In [189]:
results_delong

[(0.502891119543968, 3.334567797676501e-05),
 (0.506466758200824, 3.333602631838647e-05)]

In [190]:
results_previous

[(0.5028911195439676, 3.333586389985171e-05),
 (0.5064667582008258, 3.3329289456592686e-05)]

In [191]:
results_shirahata

[(0.5028911195439681, 3.3342345034407675e-05),
 (0.5064667582008238, 3.333269288227195e-05)]

In [192]:
delong_times

[0.001789498329962953, 0.0015297870799986413]

In [193]:
previous_times

[0.0007301195799664129, 0.0007109329199738568]

In [194]:
shirahata_times

[0.0634725516600156, 0.06326952541996433]