# All imports

In [None]:
from abc import ABC, abstractmethod
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF, RationalQuadratic, WhiteKernel, ConstantKernel
from sklearn.model_selection import train_test_split, cross_val_score, KFold
from sklearn.preprocessing import OneHotEncoder, RobustScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, f1_score, balanced_accuracy_score, roc_curve, auc, precision_recall_curve, average_precision_score, roc_auc_score
from sklearn.base import BaseEstimator, TransformerMixin, ClassifierMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
import statsmodels.api as sm
import scipy as sc
from scipy.special import expit
import sys
from collections import Counter

# Render every float inside a printed NumPy array with exactly four decimal places
np.set_printoptions(formatter={'float': "{0:0.4f}".format})

# Classification Problem

## Exploratory Analysis

### Reading in Dataset

In [None]:
# Load the dataset from 'heart.csv' (path relative to the working directory)
# and preview its first rows as the cell output.
df = pd.read_csv('heart.csv')
df.head()

### Dataset analysis

In [None]:
# Show the dtype pandas inferred for each column; the int/float vs. object
# distinction drives the numerical/categorical split in the next cell.
df.dtypes

In [None]:
# Partition the columns into target, numerical covariates and categorical covariates.
# FastingBS is stored as a number but is treated as a categorical covariate.
target_feature = ["HeartDisease"]

_numeric_columns = df.select_dtypes(include=["int64", "float64"]).columns
numerical_covariates_with_target = _numeric_columns.drop("FastingBS")
numerical_covariates = _numeric_columns.drop(["HeartDisease", "FastingBS"])

# Every object-typed column, followed by FastingBS
categorical_covariates = [*df.select_dtypes(include=["object"]).columns.tolist(), "FastingBS"]

print("Numerical Covariates:", numerical_covariates.values)
print("Categorical Covariates:", categorical_covariates)
print("Balance of target column:", df[target_feature].value_counts())

We can see there are
- 5 numerical covariates
- 6 categorical covariates
    - Sex: 2 categories
    - ChestPainType: 4 categories
    - RestingECG: 3 categories
    - ExerciseAngina: 2 categories
    - ST_Slope: 3 categories
    - FastingBS: 2 categories

#### Correlation matrix and scatter plot of all numerical features

NOTE: The plots below show a large number of samples whose recorded cholesterol is 0. These are erroneous measurements rather than genuine values.

In [None]:
# Correlation matrix of the numerical columns (target included), drawn as an annotated heatmap
corr = df[numerical_covariates_with_target].corr()
plt.figure()
sns.heatmap(corr, annot=True, cmap='coolwarm', vmin=-1, vmax=1)
plt.title('Correlation of numerical features')
plt.show()

# Scatter plot: pairwise plots of the same columns, coloured by the target
pp = sns.pairplot(df[numerical_covariates_with_target], hue='HeartDisease')
pp.fig.suptitle("Scatter plot of numerical features alongside Correlations", y=1.02, fontsize=20)

# Enlarge tick and axis labels on every panel of the grid
for ax in pp.axes.flatten():
    if ax is not None:
        ax.tick_params(axis='both', labelsize=10)
        ax.set_xlabel(ax.get_xlabel(), fontsize=20)
        ax.set_ylabel(ax.get_ylabel(), fontsize=20)

# Move the legend to the right edge and enlarge its title and entries
pp._legend.set_bbox_to_anchor((1, 0.5))
pp._legend.set_title("Heart Disease", prop={'size': 16})
for text in pp._legend.texts:
    text.set_fontsize(14)

# Render the figure. The original ended with plt.plot(), which draws nothing and
# only echoes an empty list as the cell output.
plt.show()

#### Categorical Variable Analysis - Proportion plots

We first calculate the proportion of the target within each category of each categorical feature, and then plot the proportions within each category via a bar chart. This approach is better than comparing raw counts of the target variable within each category, because there may be imbalances of the counts of each covariate category.

In [None]:
# One stacked bar chart per categorical covariate, laid out on a 2 x 3 grid.
n_rows, n_cols = 2, 3
fig, axes = plt.subplots(n_rows, n_cols, figsize=(18, 10))
axes = axes.flatten()  # 1-D view so panels can be paired with covariates

for panel, col in zip(axes, categorical_covariates[:n_rows * n_cols]):
    # Rows: categories of `col`; columns: HeartDisease classes;
    # values: share of each class within the category (0 where a class is absent).
    within_category_share = df.groupby(col)["HeartDisease"].value_counts(normalize=True)
    stacked_df = within_category_share.unstack(fill_value=0)

    stacked_df.plot(kind="bar", stacked=True, colormap="coolwarm", ax=panel, legend=False)
    panel.set_title(f"Proportion of HeartDisease by {col}")
    panel.set_xlabel("")
    panel.set_ylabel("Proportion")

# Hide any panel that has no covariate assigned to it
for spare_panel in axes[len(categorical_covariates):]:
    spare_panel.set_visible(False)

plt.tight_layout()
plt.show()

#### Finding erroneous rows

In [None]:
# Count missing values per column, then count exact zeros per numerical covariate
null_counts = df.isnull().sum()
print("Looking for null values in each column:")
print(null_counts)

zero_counts = (df[numerical_covariates] == 0).sum()
print("\nLooking for 0 values in numerical columns:")
print(zero_counts)

## Data Splitting

80% train, 10% validation, 10% test

In [None]:
# Separate the target from the covariates
y = df['HeartDisease']
X = df.drop(columns=['HeartDisease'])

# Both splits are shuffled, seeded and stratified on the target.
_split_kwargs = dict(shuffle=True, random_state=1)

# First hold out 10% as the test set ...
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.1, stratify=y, **_split_kwargs
)
# ... then take 1/9 of the remaining 90% as validation (about 10% of the full data)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.1111111111, stratify=y_temp, **_split_kwargs
)

## Data Cleaning

We have identified one clear source of incorrect data: the numerical columns Cholesterol and RestingBP contain values equal to 0. Looking at the pairplot and applying domain knowledge, these zeros are clearly an artefact of the data collection process, for the following reasons:
- Having a cholesterol of 0 is impossible, as all humans have some degree of cholesterol.
- A resting blood pressure (systolic) of 0 mmHg would indicate no blood circulation — which is not compatible with life.

NOTE: The variable OldPeak typically refers to ST depression caused by exercise. It's a measure taken during a stress test, such as a treadmill exercise. It represents the difference between ST segment height at rest and after exercise. OldPeak having value of 0 is NOT abnormal.
- OldPeak = 0.0 means no ST Depression = normal response
- OldPeak = 2.3 indicates significant Depression = possible heart disease

In [None]:
class DataCleaner(BaseEstimator, TransformerMixin):
    """Remove rows whose ``Cholesterol`` or ``RestingBP`` value is exactly 0.

    Unlike a standard sklearn transformer, ``transform`` takes and returns
    both ``X`` and ``y`` because rows are dropped from the two together.
    Call ``transform(X, y)`` directly for that reason.

    Parameters
    ----------
    subset, keep :
        Accepted and stored, but not used by any method of this class.
    """

    def __init__(self, subset=None, keep='first'):
        # BaseEstimator.get_params() (used by repr and clone) reads every
        # constructor argument back from an attribute of the same name, so the
        # arguments must be stored even though the cleaner does not use them.
        self.subset = subset
        self.keep = keep

    def reset_indices(self, X_cleaned, y_cleaned):
        """Return both objects with a fresh 0..n-1 index, discarding the old one."""
        return X_cleaned.reset_index(drop=True), y_cleaned.reset_index(drop=True)

    def remove_rows_with_0_column(self, X, y, col_name):
        """Drop from ``X`` and ``y`` every row where ``X[col_name]`` equals 0.

        Returns the filtered ``(X, y)`` pair with indices reset.
        """
        non_zero_mask = X[col_name] != 0
        X_filtered = X[non_zero_mask]
        y_filtered = y[non_zero_mask]
        return self.reset_indices(X_filtered, y_filtered)

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self``."""
        return self

    def transform(self, X, y):
        """Return ``(X, y)`` without rows where Cholesterol or RestingBP is 0."""
        X_cleaned, y_cleaned = self.remove_rows_with_0_column(X, y, "Cholesterol")
        X_cleaned, y_cleaned = self.remove_rows_with_0_column(X_cleaned, y_cleaned, "RestingBP")
        return X_cleaned, y_cleaned

# Split the categorical covariates by how many distinct values each takes in df
_levels_per_covariate = {col: df[col].nunique() for col in categorical_covariates}
binary_cats = [col for col, levels in _levels_per_covariate.items() if levels == 2]
multi_cats = [col for col, levels in _levels_per_covariate.items() if levels > 2]

# One-hot encode the categorical covariates, dropping the first level of each to
# avoid perfect collinearity among covariates; robust-scale the numerical ones.
# Any column not listed is passed through unchanged.
_column_transformers = [
    ('binary', OneHotEncoder(drop='first', sparse_output=False), binary_cats),
    ('multi', OneHotEncoder(drop='first', sparse_output=False), multi_cats),
    ('num', RobustScaler(), numerical_covariates),
]
preprocessor = ColumnTransformer(transformers=_column_transformers, remainder='passthrough')

full_pipeline = Pipeline([('preprocess', preprocessor)])

In [None]:
dc = DataCleaner()

# Remove the rows with impossible zero readings from every split
X_train_cleaned, y_train_cleaned = dc.transform(X = X_train, y = y_train)
X_val_cleaned , y_val_cleaned = dc.transform(X = X_val, y = y_val)
X_test_cleaned , y_test_cleaned = dc.transform(X = X_test, y = y_test)

# Fit the encoder/scaler on the training split ONLY, then apply that same fitted
# transformation to validation and test. Re-fitting on the held-out splits (as
# fit_transform would) scales them with their own statistics and can order or
# drop one-hot columns differently, so a model trained on the training matrix
# would be evaluated on incompatible features.
X_train_preprocessed = full_pipeline.fit_transform(X_train_cleaned, y_train_cleaned)
X_val_preprocessed = full_pipeline.transform(X_val_cleaned)
X_test_preprocessed = full_pipeline.transform(X_test_cleaned)

print("Dataset size:")
print(len(X_train_preprocessed) + len(X_val_preprocessed) + len(X_test_preprocessed))
print("\nTraining:")
print(X_train_preprocessed.shape)
print(y_train_cleaned.shape)
print("\nValidation")
print(X_val_preprocessed.shape)
print(y_val_cleaned.shape)
print("\nTesting:")
print(X_test_preprocessed.shape)
print(y_test_cleaned.shape)

## Model Training

### Model Selection Functions

The following functions are explained below whenever they are utilized.

#### K-Fold Cross Validation

In [None]:
# Conduct standard k-fold cross validation
def k_fold_cross_validation(full_pipeline, X_train_cleaned, y_train_cleaned, scoring, model, n_splits=5):
    """Score ``model`` with shuffled k-fold cross validation on the training data.

    ``full_pipeline`` is fitted on the whole of ``X_train_cleaned`` before the
    folds are created, so every fold shares the same preprocessing. The folds
    come from ``KFold(shuffle=True, random_state=42)``.

    Returns whatever ``cross_val_score`` returns for the given ``scoring``.
    """
    features = full_pipeline.fit_transform(X_train_cleaned, y_train_cleaned)
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    return cross_val_score(model, features, y_train_cleaned, scoring=scoring, cv=splitter)

# Conduct k-fold cross validation of a model with parameters using only statistically significant covariates
def k_fold_cross_validation_most_significant_covariates(full_pipeline, X_train_cleaned, y_train_cleaned, scoring, model, n_splits=5, k=4):
    """Cross-validate a logistic regression restricted to significant covariates.

    For each fold a statsmodels ``Logit`` is fitted on all columns (plus a
    constant), the columns with p-value <= 0.05 are kept, the model is refitted
    on those columns only, and the held-out part of the fold is scored.

    Parameters
    ----------
    full_pipeline : object with ``fit_transform(X, y)``; fitted on the whole
        training set before splitting.
    X_train_cleaned, y_train_cleaned : training covariates and binary target.
    scoring : only ``'roc_auc'`` is implemented.
    model : accepted for signature parity with ``k_fold_cross_validation`` but
        not used; the fitting is always done with ``sm.Logit``.
    n_splits : number of folds (``KFold(shuffle=True, random_state=42)``).
    k : a covariate is reported if it was significant in at least ``k`` folds.

    Returns
    -------
    scores : list with one score per fold.
    significant_covs : sorted integer array of design-matrix column indices
        significant in at least ``k`` folds. Index 0 is the constant, so index
        ``j >= 1`` refers to preprocessed column ``j - 1``. ``None`` when
        ``k > n_splits``.

    Raises
    ------
    ValueError
        If ``scoring`` is unsupported, or a fold has no significant column.
    """
    # Fail before any fitting. Previously an unsupported value surfaced as an
    # unbound `score` variable inside the first fold.
    if scoring != 'roc_auc':
        raise ValueError(f"Unsupported scoring {scoring!r}; only 'roc_auc' is implemented")

    def get_covariates_appearing_atleast_k_times(significant_covs_in_splits, k = n_splits-1):
        # A covariate cannot be significant in more folds than exist
        if k > n_splits:
            return None
        counter = Counter()
        for arr in significant_covs_in_splits:
            counter.update(np.asarray(arr).tolist())
        frequent = sorted(item for item, count in counter.items() if count >= k)
        # Explicit integer dtype so an empty result is still usable as an index array
        return np.array(frequent, dtype=np.intp)

    X_train_preprocessed = full_pipeline.fit_transform(X_train_cleaned, y_train_cleaned)
    # KFold yields positions, not labels; a plain array makes the lookup
    # positional whatever index the target carries.
    y_values = np.asarray(y_train_cleaned)
    cv = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    scores = []
    significant_covs = []

    for train_idx, val_idx in cv.split(X_train_preprocessed):
        # has_constant='add' always prepends the intercept. The default 'skip'
        # omits it when a fold happens to contain a non-zero constant column,
        # which would shift every column index.
        X_train_fold = sm.add_constant(X_train_preprocessed[train_idx, :], has_constant='add')
        X_val_fold = sm.add_constant(X_train_preprocessed[val_idx, :], has_constant='add')
        y_train_fold, y_val_fold = y_values[train_idx], y_values[val_idx]

        # Step 1: full model, used only to read the p-values
        full_result = sm.Logit(y_train_fold, X_train_fold).fit(disp=0)
        significant_features = np.where(np.asarray(full_result.pvalues) <= 0.05)[0]
        if significant_features.size == 0:
            raise ValueError("No covariate is significant at the 0.05 level in this fold")

        # Step 2: refit on the significant columns and score the held-out part
        selected_result = sm.Logit(y_train_fold, X_train_fold[:, significant_features]).fit(disp=0)
        y_pred_val = selected_result.predict(X_val_fold[:, significant_features])

        scores.append(roc_auc_score(y_val_fold, y_pred_val))
        significant_covs.append(significant_features)

    significant_covs = get_covariates_appearing_atleast_k_times(significant_covs, k=k)
    return scores, significant_covs
        

#### Bayesian Model Evidence Based Selection

In [None]:
# Evidence based selection for Bayesian Logistic Regression
def bayesian_model_evidence_selection(full_pipeline, X_train_cleaned, y_train_cleaned, BayesianLogisticRegressionClass, num_models = 10, seed = 1):
    """Compare random covariate subsets by their log model evidence.

    The first candidate always uses every preprocessed column; each later
    candidate is a random subset drawn without replacement. For every
    candidate a ``BayesianLogisticRegressionClass`` instance is fitted and its
    ``log_model_evidence`` attribute is recorded.

    Parameters
    ----------
    full_pipeline : object with ``fit_transform(X, y)`` returning a 2-D array.
    X_train_cleaned, y_train_cleaned : training covariates and target.
    BayesianLogisticRegressionClass : class accepting the keyword arguments
        used below and exposing ``fit`` and ``log_model_evidence``.
    num_models : number of candidate subsets to evaluate.
    seed : seed for NumPy's global random state (reseeded on every call).

    Returns
    -------
    highest_evidence, best_subset, all_evidences, all_subsets
        The best evidence seen (``-inf`` when nothing was evaluated), the
        sorted column indices that achieved it, and the per-candidate
        evidences and subsets in evaluation order.
    """
    np.random.seed(seed)
    X_train_preprocessed = full_pipeline.fit_transform(X_train_cleaned, y_train_cleaned)
    n_samples, n_features = X_train_preprocessed.shape
    param_indices = list(range(n_features))

    all_subsets = []
    all_evidences = []
    best_subset = []
    # A log evidence can be any real number, so start below all of them
    highest_evidence = -np.inf

    for i in range(num_models):
        # The size is drawn on every iteration (including the first) so the
        # random stream is the same as before. With a single feature
        # randint(1, 1) is invalid, and the only possible subset is that feature.
        subset_size = np.random.randint(1, n_features) if n_features > 1 else 1

        if i == 0:
            subset = list(param_indices)
        else:
            subset = np.random.choice(param_indices, subset_size, replace=False)
        subset = sorted(subset)

        X_subset = X_train_preprocessed[:, subset]
        blr = BayesianLogisticRegressionClass(
            # +1 leaves room for the intercept requested via fit_intercept=True
            beta_0=np.zeros(len(subset) + 1),
            m_0=np.zeros(len(subset) + 1),
            sigma_0_inv=(X_subset.T @ X_subset) / n_samples,
            maxiter=100,
            tolerance=1e-05,
            fit_intercept=True,
            unit_information_prior=True,
            verbose=0,
        )
        blr.fit(X_subset, y_train_cleaned[:])
        evidence = blr.log_model_evidence

        # Strict comparison: on ties the earliest candidate is kept
        if evidence > highest_evidence:
            highest_evidence = evidence
            best_subset = subset

        all_subsets.append(subset)
        all_evidences.append(evidence)

        print("Finished calculating evidence for subset number:", i+1)

    return highest_evidence, best_subset, all_evidences, all_subsets

### Model Evaluation Class

This class is used to evaluate performance on a particular dataset, and takes in a fitted model. For example, evaluating performance on validation or test set and takes in fitted model on training set. It calculates all sorts of classification metrics and curves, comprehensively evaluating performance.

In [None]:
# Classes evaluating a fitted classifier using all different metrics and curves
class ClassificationEvaluator(ABC):
    """Evaluate an already-fitted binary classifier on one labelled dataset.

    The constructor runs ``model.predict`` and ``model.predict_proba`` once on
    the evaluation features and stores the results; every metric and plot
    method reuses them. The target is expected to be coded 0 (negative) and
    1 (positive).

    Subclasses supply ``plot_coefficients`` and ``get_coefficient_statistics``.
    """

    # Fixed class order so the confusion matrix is always 2 x 2, even when the
    # evaluation set or the predictions happen to contain a single class.
    _CLASS_LABELS = [0, 1]

    def __init__(self, model, X_evaluation_preprocessed, y_evaluation_cleaned, feature_names):
        self.model = model
        self.feature_names = feature_names
        self.real = y_evaluation_cleaned
        self.preds = self.model.predict(X_evaluation_preprocessed)
        self.pred_probabilities = self.model.predict_proba(X_evaluation_preprocessed)

    @abstractmethod
    def plot_coefficients(self):
        """Plot the model-specific coefficients or feature importances."""
        pass

    @abstractmethod
    def get_coefficient_statistics(self):
        """Print the model-specific coefficient summary, if there is one."""
        pass

    # Raw Accuracy, Balanced Accuracy, Sensitivity, Specificity, Precision, False Alarm Rate, F-Score
    def get_metrics(self):
        """Return a dict of threshold-based classification metrics.

        Specificity and false alarm rate are ``nan`` when the evaluation set
        contains no negative samples.
        """
        tn, fp, fn, tp = confusion_matrix(self.real, self.preds, labels=self._CLASS_LABELS).ravel()
        negatives = tn + fp
        specificity = tn / negatives if negatives else float('nan')
        false_alarm_rate = fp / negatives if negatives else float('nan')

        return {
            'Accuracy': accuracy_score(self.real, self.preds),
            'Balanced accuracy': balanced_accuracy_score(self.real, self.preds),
            'Precision': precision_score(self.real, self.preds),
            'Sensivity/Recall': recall_score(self.real, self.preds),
            'Specificity': specificity,
            'False Alarm Rate': false_alarm_rate,
            'F1 Score': f1_score(self.real, self.preds)
            }

    def plot_confusion_matrix(self):
        """Draw the 2 x 2 confusion matrix (rows: true label, columns: predicted)."""
        cm = confusion_matrix(self.real, self.preds, labels=self._CLASS_LABELS)
        sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', cbar=False,
                    xticklabels=['Negative', 'Positive'], yticklabels=['Negative', 'Positive'])
        plt.title("Confusion Matrix")
        plt.xlabel("Predicted Label")
        plt.ylabel("True Label")
        plt.show()

    def plot_roc_curve(self):
        """Plot the ROC curve of the positive-class probability with its AUC."""
        y_preds_label1_probability = self.pred_probabilities[:, 1]
        fpr, tpr, thresholds = roc_curve(self.real, y_preds_label1_probability)
        roc_auc = auc(fpr, tpr)

        plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')  # Random classifier diagonal line
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate (1-specificity)')
        plt.ylabel('True Positive Rate (Sensitivity)')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc='lower right')
        plt.show()

    def plot_pr_curve(self):
        """Plot the precision-recall curve with its average precision."""
        y_preds_label1_probability = self.pred_probabilities[:, 1]
        precision, recall, thresholds = precision_recall_curve(self.real, y_preds_label1_probability)
        ap_score = average_precision_score(self.real, y_preds_label1_probability)

        plt.plot(recall, precision, label=f'PR Curve (AP = {ap_score:.2f})', color='green')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Curve')
        plt.legend(loc='lower left')
        plt.show()

    def get_all_classification_metrics(self):
        """Print the metric dict, then show every plot and the coefficient summary."""
        metrics = self.get_metrics()
        print(metrics)
        self.plot_coefficients()
        self.plot_confusion_matrix()
        self.plot_roc_curve()
        self.plot_pr_curve()
        self.get_coefficient_statistics()
    
class LogisticRegressionClassificationEvaluator(ClassificationEvaluator):
    """Evaluator for a fitted logistic regression exposing ``coef_``.

    ``stats_model`` is an unfitted model object whose ``fit`` result provides
    ``summary()``; it is fitted when the coefficient statistics are requested.
    """

    def __init__(self, model, x_evaluation_preprocessed, y_evaluation_cleaned, feature_names, stats_model):
        # Stored before the base constructor runs the predictions
        self.stats_model = stats_model
        super().__init__(model, x_evaluation_preprocessed, y_evaluation_cleaned, feature_names)

    def plot_coefficients(self):
        """Bar chart of the absolute coefficients on a logarithmic y-axis."""
        positions = range(len(self.feature_names))
        magnitudes = np.abs(self.model.coef_[0])

        plt.bar(positions, magnitudes)
        plt.xticks(positions, self.feature_names[:len(self.feature_names)], rotation=-45)
        plt.xlabel('Coefficient')
        plt.ylabel('Absolute value - log scale')
        plt.yscale('log')
        plt.show()

    def get_coefficient_statistics(self):
        """Fit ``stats_model`` with the 'ncg' method and print its summary table."""
        fitted = self.stats_model.fit(method='ncg', maxiter=30000)
        print(fitted.summary())
        print()

        
class BayesianLogisticRegressionClassificationEvaluator(ClassificationEvaluator):
    """Evaluator for a fitted Bayesian logistic regression.

    The model is expected to expose ``coef_`` with the intercept as its first
    entry, and ``get_summary()`` returning an object with ``to_string()``.
    """

    def __init__(self, model, X_evaluation_preprocessed, y_evaluation_cleaned, feature_names):
        super().__init__(model, X_evaluation_preprocessed, y_evaluation_cleaned, feature_names)

    def plot_coefficients(self):
        """Bar chart of the absolute coefficients (intercept first) on a log y-axis."""
        coefficients = self.model.coef_
        # Build the tick labels locally. Writing them back to self.feature_names
        # added another 'intercept' on every call, so a second call no longer
        # matched the number of coefficients. pd.Index(...) also lets the
        # feature names be a plain list or array.
        labels = pd.Index(['intercept']).append(pd.Index(self.feature_names))
        positions = range(len(labels))

        plt.bar(positions, np.abs(coefficients))
        plt.xticks(positions, labels, rotation=-45)
        plt.xlabel('Coefficient')
        plt.ylabel('Absolute value - log scale')
        plt.yscale('log')
        plt.show()
        return

    def get_coefficient_statistics(self):
        """Print the model's summary table without truncating width or columns.

        Note: this changes the pandas display options for the whole session.
        """
        pd.set_option('display.width', None)
        pd.set_option('display.max_columns', None)
        print(self.model.get_summary().to_string())
        print()
    
class GaussianProcessClassificationEvaluator(ClassificationEvaluator):
    """Evaluator for a fitted Gaussian process classifier exposing ``kernel_``.

    Feature importance is reported as the reciprocal of each per-feature
    length scale found in the fitted kernel's parameters.
    """

    def __init__(self, model, x_evaluation_preprocessed, y_evaluation_cleaned, feature_names):
        super().__init__(model, x_evaluation_preprocessed, y_evaluation_cleaned, feature_names)

    def _plot_coefficient_plotter(self, importance):
        """Bar chart of ``importance`` (one value per feature) on a log y-axis."""
        positions = range(len(self.feature_names))
        plt.bar(positions, np.abs(importance))
        plt.xticks(positions, self.feature_names[:len(self.feature_names)], rotation=-45)
        plt.title("GP Feature Importance (1 / Length Scale)")
        plt.xlabel('Coefficient')
        plt.ylabel('Feature Importance (log scale)')
        plt.yscale('log')
        plt.show()

    def get_coefficient_statistics(self):
        """No coefficient table is produced for a GP; defers to the base method."""
        return super().get_coefficient_statistics()

    def plot_coefficients(self):
        """Plot 1 / length_scale for every anisotropic length scale in the kernel.

        A scalar (isotropic) length scale has no per-feature breakdown, so its
        value is printed instead of plotted.
        """
        kernel_params = self.model.kernel_.get_params()
        length_scale_keys = [key for key in kernel_params if key.endswith("length_scale")]

        for key in length_scale_keys:
            length_scale = kernel_params[key]
            # np.ndim treats every scalar alike. The previous np.float64 check
            # missed plain Python floats, which then reached the bar plot.
            if np.ndim(length_scale) == 0:
                print("Isotropic kernel with length_scale value:", length_scale)
                continue
            # asarray so a length scale given as a list can be inverted too
            importance = 1 / np.asarray(length_scale, dtype=float)
            self._plot_coefficient_plotter(importance)

### Gaussian Process Classification

#### Cross Validation to find best Gaussian Process Kernel

Unlike regression, we don't need to do sub-sampling and can do traditional k-fold cross validation. This is because the dataset is much smaller in this case. The training dataset has only 448 samples. We conduct 5-fold cross validation to compare 2 kernels - RBF and Rational Quadratic. We multiply each kernel by a constant controlling for scale, and also add white noise allowing for erroneous measurements. We use ROC-AUC as the metric across the different folds, and take the model with the highest average ROC-AUC across the splits. We do this because of a slight imbalance in the target variable, with there being ~500 positive samples and ~400 negative samples. Using ROC-AUC as a metric allows us to account for both Sensitivity and Specificity and choose a model maximizing both. Note that many more models were tested; they are omitted here and only the most important kernels are shown.

##### Isotropic RBF kernel
NOTE: Below cell took ~10 mins to run on Apple MacBook M1 Air

In [None]:
# Kernel under test: scale constant * isotropic RBF + white noise
_gp_iso_rbf_kernel = (
    ConstantKernel(constant_value=1, constant_value_bounds=(1e-1, 1e+10))
    * RBF(length_scale=1.0, length_scale_bounds=(1e-10, 100))
    + WhiteKernel(noise_level=1 ** 2, noise_level_bounds=(1e-10, 1e+10))
)

# 5-fold cross-validated ROC-AUC of a GP classifier using that kernel
cv_scores_gp_rbf_all_cols = k_fold_cross_validation(
    full_pipeline=full_pipeline,
    X_train_cleaned=X_train_cleaned,
    y_train_cleaned=y_train_cleaned,
    scoring='roc_auc',
    model=GaussianProcessClassifier(kernel=_gp_iso_rbf_kernel),
    n_splits=5,
)
print(cv_scores_gp_rbf_all_cols)
print(np.mean(cv_scores_gp_rbf_all_cols))

##### Anisotropic RBF kernel
NOTE: Below cell took ~25 mins to run on Apple MacBook M1 Air

In [None]:
# Kernel under test: scale constant * anisotropic RBF (one length scale per
# preprocessed column) + white noise
_gp_aniso_rbf_kernel = (
    ConstantKernel(constant_value=1, constant_value_bounds=(1e-10, 1e+10))
    * RBF(length_scale=[1.0] * X_train_preprocessed.shape[1], length_scale_bounds=(1e-10, 1e+10))
    + WhiteKernel(noise_level=1 ** 2, noise_level_bounds=(1e-10, 1e+10))
)

# 5-fold cross-validated ROC-AUC of a GP classifier using that kernel
cv_scores_gp_rbf_all_cols_aniso = k_fold_cross_validation(
    full_pipeline=full_pipeline,
    X_train_cleaned=X_train_cleaned,
    y_train_cleaned=y_train_cleaned,
    scoring='roc_auc',
    model=GaussianProcessClassifier(kernel=_gp_aniso_rbf_kernel),
    n_splits=5,
)
print(cv_scores_gp_rbf_all_cols_aniso)
print(np.mean(cv_scores_gp_rbf_all_cols_aniso))

##### Isotropic RationalQuadratic Kernel
NOTE: Below cell took ~25 mins to run on Apple MacBook M1 Air

In [None]:
# Kernel under test: scale constant * isotropic RationalQuadratic + white noise
_gp_iso_rq_kernel = (
    ConstantKernel(constant_value=1, constant_value_bounds=(1e-10, 1e+10))
    * RationalQuadratic(length_scale=1.0, alpha=1.0, alpha_bounds=(1e-10, 1e+10))
    + WhiteKernel(noise_level=1 ** 2, noise_level_bounds=(1e-10, 1e+10))
)

# 5-fold cross-validated ROC-AUC of a GP classifier using that kernel
cv_scores_gp_rq_all_cols = k_fold_cross_validation(
    full_pipeline=full_pipeline,
    X_train_cleaned=X_train_cleaned,
    y_train_cleaned=y_train_cleaned,
    scoring='roc_auc',
    model=GaussianProcessClassifier(kernel=_gp_iso_rq_kernel),
    n_splits=5,
)
print(cv_scores_gp_rq_all_cols)
print(np.mean(cv_scores_gp_rq_all_cols))

#### Retraining on training set and seeing performance on Validation Set

We take the best performing kernel across different splits, refit on the entire training dataset, and see performance on unseen validation set.

In [None]:
# Anisotropic RBF kernel (one length scale per preprocessed column), scaled by a
# constant and with additive white noise
kernel = (
    ConstantKernel(constant_value=1, constant_value_bounds=(1e-10, 1e+10))
    * RBF(length_scale=[1.0] * X_train_preprocessed.shape[1], length_scale_bounds=(1e-10, 1e+10))
    + WhiteKernel(noise_level=1 ** 2, noise_level_bounds=(1e-10, 1e+10))
)
gp = GaussianProcessClassifier(kernel=kernel)

# Fit on the full preprocessed training set
gp.fit(X_train_preprocessed, y_train_cleaned)

In [None]:
# Generic labels x1..xp, one per preprocessed column
_gp_feature_names = [f'x{j+1}' for j in range(X_val_preprocessed.shape[1])]

# Evaluate the fitted GP on the validation split
classification_evaluator_gp = GaussianProcessClassificationEvaluator(
    model=gp,
    x_evaluation_preprocessed=X_val_preprocessed,
    y_evaluation_cleaned=y_val_cleaned,
    feature_names=_gp_feature_names,
)

classification_evaluator_gp.get_all_classification_metrics()

### Logistic Regression Classification

#### Cross Validation
We utilize cross validation to compare 2 models
- One is using all covariates.
- Second model utilizes only statistically significant covariates in each split.
    - It returns scores and covariates which are statistically significant in at least $k$ of the n_splits.

##### Cross validation using all covariates

In [None]:
# 10-fold cross-validated ROC-AUC of an unpenalised logistic regression on all covariates
cv_scores_log_reg_all_cols = k_fold_cross_validation(
    full_pipeline,
    X_train_cleaned,
    y_train_cleaned,
    'roc_auc',
    model=LogisticRegression(penalty=None, solver='newton-cg'),
    n_splits=10,
)
print("Cross Validation scores (ROC_AUC):", cv_scores_log_reg_all_cols)
print("Mean of Cross Validation scores (ROC_AUC):", np.mean(cv_scores_log_reg_all_cols))

##### Cross-Validation of only statistically significant covariates

This corresponds to the second cross-validation function. The workflow is as follows:
1. Take a single training fold and validation fold, and fit a Logistic Regression model on the training fold using all covariates
2. Look at the statistical significance of each of the covariates, and record only the significant covariates
3. Refit the model on the training fold using **only** statistically significant covariates
4. Use this model to predict on validation fold, and get ROC AUC metric
5. Record the indices of the statistically significant covariates of this split.
6. After getting all scores for all folds, return the set of covariates which were statistically significant in at least $k$ of the n_splits number of splits, where $1 < k < $ n_splits, and the corresponding metric scores.

In [None]:
# 10-fold cross validation using only the covariates significant within each fold;
# sig_covs keeps the design-matrix indices significant in at least 8 of the 10 folds.
cv_scores_log_reg_sig_covs, sig_covs = k_fold_cross_validation_most_significant_covariates(
    full_pipeline,
    X_train_cleaned,
    y_train_cleaned,
    'roc_auc',
    model=LogisticRegression(penalty=None, solver='newton-cg'),
    n_splits=10,
    k=8,
)
print("Cross Validation scores (ROC_AUC):", cv_scores_log_reg_sig_covs)
print("Mean of Cross Validation scores (ROC_AUC):",np.mean(cv_scores_log_reg_sig_covs))
print("Signficant covariate indices (including intercept): ", sig_covs)

#### Testing both models on Validation set
We have seen the performance of both models via cross validation. The model using all covariates appears to do better than the model using only statistically significant covariates. We will now see performances on the validation set of both models, after retraining models on the entire training set. We have a list of covariates which have appeared to be statistically significant at least k times across n_splits number of cross validation splits. We will use these covariates only to train a logistic regression model utilizing only statistically significant covariates.

##### All covariates on validation set

In [None]:
# Unpenalised logistic regression on every preprocessed covariate,
# fitted on the whole training split.
log_reg = LogisticRegression(penalty=None, solver='newton-cg')
log_reg.fit(X_train_preprocessed, y_train_cleaned)

In [None]:
# Evaluate the all-covariate logistic regression on the validation split.
# Feature labels x1..xp are derived from the matrix width rather than a
# hard-coded count, so they stay correct if the encoded columns change.
classification_evaluator_log_reg = LogisticRegressionClassificationEvaluator(
    model=log_reg,
    x_evaluation_preprocessed=X_val_preprocessed,
    y_evaluation_cleaned=y_val_cleaned,
    feature_names=[f'x{j+1}' for j in range(X_val_preprocessed.shape[1])],
    # statsmodels counterpart (with intercept) used only for the coefficient summary
    stats_model=sm.Logit(y_train_cleaned, sm.add_constant(X_train_preprocessed)),
)

classification_evaluator_log_reg.get_all_classification_metrics()

##### Statistically significant covariates on validation set

In [None]:
log_reg_sig_covs = LogisticRegression(penalty=None, solver='newton-cg')
log_reg_sig_covs.fit(X_train_preprocessed[:, sig_covs-1], y_train_cleaned)

In [None]:
classification_evaluator_log_reg_sig_covs = LogisticRegressionClassificationEvaluator(model=log_reg_sig_covs,
                                                                     x_evaluation_preprocessed=X_val_preprocessed[:, sig_covs-1],
                                                                     y_evaluation_cleaned=y_val_cleaned,
                                                                     feature_names=sig_covs,
                                                                     stats_model=sm.Logit(y_train_cleaned, sm.add_constant(X_train_preprocessed[:, sig_covs-1])))
                                                            
classification_evaluator_log_reg_sig_covs.get_all_classification_metrics()

### Bayesian Logistic Regression Classification

The following class implements Bayesian Logistic Regression using the Laplace approximation and exposes an interface similar to sklearn's classifiers. It inherits from sklearn's base estimator and classifier mixin classes. It extends them in that, after fitting, it can provide a pandas dataframe summarising the fitted model, including the coefficients and the 95% confidence interval for each parameter estimate. It also automatically calculates the model evidence while fitting to data.

In [None]:
class BayesianLogisticRegression(BaseEstimator, ClassifierMixin):
    """Bayesian logistic regression fitted with the Laplace approximation.

    A Gaussian prior N(m_0, sigma_0_inv^-1) is placed on the coefficients. The
    posterior mode (MAP) is found by Newton-Raphson and the posterior is
    approximated by a Gaussian centred at the mode whose covariance is the
    inverse Hessian of the negative log posterior at the mode.

    Parameters
    ----------
    beta_0 : array-like
        Starting point of the Newton-Raphson iterations (a numerical helper,
        not a statistical assumption).
    m_0 : array-like
        Prior mean of the coefficients.
    sigma_0_inv : array-like
        Prior precision matrix. Replaced by X^T X / n during ``fit`` when
        ``unit_information_prior`` is True.
    maxiter : int
        Maximum number of Newton-Raphson iterations.
    tolerance : float
        Iterations stop once the absolute change in the negative log
        posterior is no larger than this value.
    fit_intercept : bool
        If True, a leading column of ones is added to X in ``fit`` and
        ``predict_proba``; beta_0 and m_0 must then include the intercept.
    unit_information_prior : bool
        If True, ``fit`` sets the prior precision to X^T X / n.
    verbose : int
        If 1, progress is printed at every iteration.

    Attributes set by ``fit``
    -------------------------
    beta_map, coef_ : MAP estimate of the coefficients.
    sigma_map : Laplace-approximation covariance of the coefficients.
    log_model_evidence : Laplace approximation of the log marginal likelihood.
    converged_ : whether the optimiser met ``tolerance`` within ``maxiter``.
    fitted_ : always True after ``fit``.
    """

    def __init__(self, beta_0, m_0, sigma_0_inv, maxiter = 100, tolerance = 1e-05, fit_intercept = True, unit_information_prior = True, verbose = 1):
        self.fit_intercept = fit_intercept
        self.beta_0 = beta_0 # beta_0 is where to start the iterative update of the algorithm. This doesn't affect the prior, but is a numerical helper, NOT statistical assumption.
        self.m_0 = m_0 # m_0 is the prior mean of the distribution over weights beta.
        self.sigma_0_inv = sigma_0_inv # Prior precision matrix
        self.maxiter = maxiter
        self.tolerance = tolerance
        self.unit_information_prior = unit_information_prior
        self.verbose = verbose

    def __information_matrix(self, X, beta):
        """Return X^T S X with S = diag(p * (1 - p)) and p = sigmoid(X @ beta)."""
        p = expit(X @ beta)
        # Scale the columns of X^T by the weights instead of materialising the
        # n x n diagonal matrix S.
        return (X.T * (p * (1 - p))) @ X

    def __log_likelihood(self, X, y, beta):
        """Bernoulli log likelihood of labels y given logits X @ beta."""
        logits = X @ beta
        # log(sigmoid(z)) = -log(1 + exp(-z)) and log(1 - sigmoid(z)) = -log(1 + exp(z)).
        # logaddexp evaluates these without overflowing or hitting log(0) for
        # large |z|, which would otherwise yield -inf / nan.
        log_likelihood = -(y @ np.logaddexp(0, -logits) + (1 - y) @ np.logaddexp(0, logits))
        return log_likelihood

    def __log_likelihood_derivative(self, X, y, beta):
        """Gradient of the log likelihood with respect to beta."""
        log_likelihood_derivative = X.T @ (y - expit(X @ beta))
        return log_likelihood_derivative

    def __log_likelihood_second_derivative(self, X, y, beta):
        """Hessian of the log likelihood with respect to beta (negative semi-definite)."""
        Hessian = - self.__information_matrix(X, beta)
        return Hessian

    def __log_posterior(self, X, y, beta):
        """Log posterior of beta up to an additive constant (prior normaliser omitted)."""
        log_likelihood = self.__log_likelihood(X, y, beta)
        log_prior = - 0.5 * (beta - self.m_0).T @ self.sigma_0_inv @ (beta - self.m_0)
        return log_likelihood + log_prior

    def __neg_log_posterior(self, X, y, beta):
        """Negative of ``__log_posterior``; the objective minimised by Newton-Raphson."""
        log_posterior = self.__log_posterior(X, y, beta)
        return -log_posterior

    def __log_posterior_derivative(self, X, y, beta):
        """Gradient of the log posterior with respect to beta."""
        # The Gaussian log prior -0.5 (beta - m_0)^T P (beta - m_0) has gradient
        # -P (beta - m_0); the prior term must therefore be subtracted.
        log_posterior_derivative = X.T @ (y - expit(X @ beta)) - self.sigma_0_inv @ (beta - self.m_0)
        return log_posterior_derivative

    def __neg_log_posterior_derivative(self, X, y, beta):
        """Gradient of the negative log posterior."""
        log_posterior_derivative = self.__log_posterior_derivative(X, y, beta)
        return -log_posterior_derivative

    def __log_posterior_second_derivative(self, X, y, beta):
        """Hessian of the log posterior with respect to beta."""
        log_posterior_second_derivative = - self.__information_matrix(X, beta) - self.sigma_0_inv
        return log_posterior_second_derivative

    def __neg_log_posterior_second_derivative(self, X, y, beta):
        """Hessian of the negative log posterior."""
        log_posterior_second_derivative = self.__log_posterior_second_derivative(X, y, beta)
        return - log_posterior_second_derivative

    def __newton_raphson_optimization(self, X, y):
        """Minimise the negative log posterior by Newton-Raphson.

        Returns
        -------
        tuple
            ``(beta, covariance, converged)``: the final estimate, the inverse
            Hessian of the negative log posterior evaluated at that estimate,
            and whether the change in the objective dropped to ``tolerance``
            or below within ``maxiter`` iterations.
        """
        i = 0
        beta = np.asarray(self.beta_0, dtype=float)
        neg_log_posterior = self.__neg_log_posterior(X, y, beta)
        abs_diff = 1
        while abs_diff > self.tolerance and i < self.maxiter:
            if self.verbose == 1: print('iteration ',i+1,' Negative Log Posterior ',neg_log_posterior, ' AbDiff ', abs_diff)
            neg_log_posterior_derivative = self.__neg_log_posterior_derivative(X, y, beta)
            neg_log_posterior_hessian = self.__neg_log_posterior_second_derivative(X, y, beta)
            # Solve H step = g rather than forming H^-1 explicitly for the update.
            beta = beta - np.linalg.solve(neg_log_posterior_hessian, neg_log_posterior_derivative)

            neg_log_posterior_new = self.__neg_log_posterior(X, y, beta)
            abs_diff = np.abs(neg_log_posterior_new - neg_log_posterior)
            neg_log_posterior = neg_log_posterior_new
            i += 1

        # A nan objective makes both comparisons False, so it is reported as
        # non-convergence rather than silently accepted.
        converged = bool(abs_diff <= self.tolerance)
        if not converged:
            print('Did not Converge')

        # Evaluate the covariance at the final estimate. This is also defined
        # when the loop body never ran (e.g. maxiter == 0).
        neg_log_posterior_hessian_inverse = np.linalg.inv(
            self.__neg_log_posterior_second_derivative(X, y, beta)
        )
        return beta, neg_log_posterior_hessian_inverse, converged

    def __calculate_model_evidence(self, X, y):
        """Laplace approximation of the log marginal likelihood at ``self.beta_map``.

        Sum of the log likelihood, the normalised Gaussian log prior and
        0.5 * (d * log(2 * pi) - log det H), where H is the Hessian of the
        negative log posterior at the MAP and d the number of coefficients.
        """
        log_likelihood = self.__log_likelihood(X, y, self.beta_map)
        log_prior = -0.5 * (self.beta_map - self.m_0).T @ self.sigma_0_inv @ (self.beta_map - self.m_0) \
            + 0.5 * np.linalg.slogdet(self.sigma_0_inv)[1] \
            - 0.5 * len(self.beta_map) * np.log(2 * np.pi)

        hessian = self.__neg_log_posterior_second_derivative(X, y, self.beta_map)
        log_det_hessian = np.linalg.slogdet(hessian)[1]
        normalization = 0.5 * (len(self.beta_map) * np.log(2*np.pi) - log_det_hessian)
        log_evidence = log_likelihood + log_prior + normalization
        return log_evidence

    def get_summary(self):
        """Return a DataFrame with posterior mean, standard error and interval bounds.

        Rows are labelled x1, x2, ... in coefficient order (when
        ``fit_intercept`` is True the first row is the intercept).
        """
        check_is_fitted(self)
        se = np.sqrt(np.diag(self.sigma_map))
        # mean -/+ 1.96 se are the 2.5% / 97.5% quantiles of the Gaussian
        # approximation (a central 95% interval). The column labels below are
        # kept unchanged so that existing consumers of the table keep working.
        lower5 = self.beta_map - 1.96 * se
        upper5 = self.beta_map + 1.96 * se

        results = np.column_stack([self.beta_map ,se ,lower5 ,upper5])
        col = ['post mean','post se','lower 5% bound','upper 95% bound']
        summary = pd.DataFrame(results, columns=col, index=[f'x{j+1}' for j in range(len(self.beta_map))])
        return summary

    def fit(self, X, y):
        """Fit the Laplace approximation to the posterior over coefficients.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : array-like of shape (n_samples,)
            The likelihood used here treats the entries as 0/1 labels.

        Returns
        -------
        self
        """
        X, y = check_X_y(X, y)
        if self.fit_intercept:
            X = np.column_stack([np.ones(X.shape[0]), X])
        if self.unit_information_prior:
            # NOTE: this replaces the sigma_0_inv passed to the constructor.
            self.sigma_0_inv = (X.T @ X) / X.shape[0]

        beta_map, sigma_map, converged = self.__newton_raphson_optimization(X, y)
        # The estimate is stored even without convergence (a message is printed
        # by the optimiser in that case); converged_ lets callers check.
        self.converged_ = converged
        self.beta_map = beta_map
        self.sigma_map = sigma_map
        self.log_model_evidence = self.__calculate_model_evidence(X, y)
        self.coef_ = beta_map
        self.fitted_ = True
        return self

    def predict_proba(self, X, n_samples = 10000):
        """Monte Carlo estimate of the posterior predictive class probabilities.

        Draws ``n_samples`` coefficient vectors from N(beta_map, sigma_map)
        using NumPy's global random state and averages the resulting sigmoid
        outputs.

        Returns
        -------
        ndarray of shape (n_rows, 2)
            Columns are P(y = 0) and P(y = 1).
        """
        check_is_fitted(self)
        X = check_array(X)
        if self.fit_intercept:
            X = np.hstack([np.ones((X.shape[0], 1)), X])

        beta_map_samples = np.random.multivariate_normal(self.beta_map, self.sigma_map, size=n_samples)
        logits = X @ beta_map_samples.T
        probabilities = expit(logits)
        mean_probs = probabilities.mean(axis=1)
        return np.vstack([1 - mean_probs, mean_probs]).T

    def predict(self, X, n_samples = 10000):
        """Predict class 1 where the estimated P(y = 1) is at least 0.5, else class 0."""
        check_is_fitted(self)
        proba = self.predict_proba(X, n_samples=n_samples)
        return (proba[:, 1] >= 0.5).astype(int)

#### Bayesian Model Evidence

We utilise the above class's ability to calculate model evidence, together with a function that automatically selects the set of covariates maximising the marginal log likelihood of the model, i.e. the log evidence. The function randomly selects a set of covariates and fits a model using them on the training dataset. We then calculate the marginal log likelihood under the Laplace approximation for each fitted model and use the model that maximises it.

In [None]:
# Search over candidate covariate subsets (num_models=10) and keep the one
# with the highest log model evidence, together with every subset/evidence tried.
selection_result = bayesian_model_evidence_selection(
    full_pipeline,
    X_train_cleaned,
    y_train_cleaned,
    BayesianLogisticRegression,
    num_models=10,
)
best_evidence, best_feature_subset, all_evidences, all_subsets = selection_result

print("Best Model Evidence:", best_evidence)
print("Best Feature Subset:", best_feature_subset)

In [None]:
# Show every evidence value and the covariate subset it belongs to, one per line.
print(all_evidences, all_subsets, sep="\n")

#### Refitting model with highest marginal likelihood on training set and evaluating performance on validation

We refit the model using only the covariates that produced the highest marginal log likelihood and evaluate its performance on the validation set.

In [None]:
# Training columns of the best covariate subset, plus the design matrix with the
# leading intercept column that BayesianLogisticRegression.fit adds when
# fit_intercept=True. Building the prior precision from this design keeps its
# shape consistent with beta_0 and m_0, which both include the intercept.
X_train_best_subset = X_train_preprocessed[:, best_feature_subset]
design_train_best_subset = np.column_stack([np.ones(X_train_best_subset.shape[0]), X_train_best_subset])
n_coefficients = design_train_best_subset.shape[1]

blr = BayesianLogisticRegression(beta_0 = np.zeros(n_coefficients),
                                 m_0 = np.zeros(n_coefficients),
                                 # Unit information prior precision X^T X / n; fit() recomputes the
                                 # same matrix because unit_information_prior=True.
                                 sigma_0_inv = (design_train_best_subset.T @ design_train_best_subset) / design_train_best_subset.shape[0],
                                 maxiter=100,
                                 tolerance=1e-05,
                                 fit_intercept=True,
                                 unit_information_prior=True,
                                 verbose=0
                                )

blr.fit(X_train_best_subset, y_train_cleaned)

In [None]:
# Evaluate the refitted Bayesian logistic regression on the validation set,
# restricted to the covariates of the best subset.
blr_val_feature_names = pd.Index([f'x{i}' for i in best_feature_subset])
blr_val_features = X_val_preprocessed[:, best_feature_subset]

classification_evaluator_blr = BayesianLogisticRegressionClassificationEvaluator(
    model=blr,
    X_evaluation_preprocessed=blr_val_features,
    y_evaluation_cleaned=y_val_cleaned,
    feature_names=blr_val_feature_names,
)

classification_evaluator_blr.get_all_classification_metrics()

## Final Model Evaluation

The best model we found was the Bayesian Logistic Regression classifier with the feature subset having the highest marginal likelihood. The validation set ROC AUC was 0.94, and the area under Precision-Recall curve was also 0.94. This model also had an F1-score of 0.87, which was higher than the ones produced by Logistic Regression and Gaussian Process Classifier which were ~0.80 and ~0.85 respectively. The Bayesian Logistic Regression model has beaten other models on most of the classification metrics, and we can see this visually via the confusion matrix.

We now evaluate the same model on the final test set, which has been held out and unused since the beginning.

In [None]:
# Final evaluation of the same fitted model on the held-out test set,
# restricted to the covariates of the best subset.
blr_test_feature_names = pd.Index([f'x{i}' for i in best_feature_subset])
blr_test_features = X_test_preprocessed[:, best_feature_subset]

classification_evaluator_blr = BayesianLogisticRegressionClassificationEvaluator(
    model=blr,
    X_evaluation_preprocessed=blr_test_features,
    y_evaluation_cleaned=y_test_cleaned,
    feature_names=blr_test_feature_names,
)

classification_evaluator_blr.get_all_classification_metrics()