Import needed packages

In [1]:
import numpy as np
import pandas as pd
from scipy import stats
import sklearn
from sklearn import ensemble
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import KFold

1. Define a function to load the kmer data for our Histogram Gradient Boosting (HGB) model; HGB was selected because of the large dataset size

In [2]:
def load_data_kmer():
    """Read the kmer feature matrices, training targets and genome IDs from disk.

    All files are read with ``np.load(..., allow_pickle=True)`` from
    ``data/train_test_data/`` (relative to the current working directory).

    Returns:
        tuple: ``(train_kmers, test_kmers, y_train, y_train_ids, y_test_ids)``.
        The two ID arrays are cast to ``str``.
    """
    def _read(path):
        # NOTE(review): allow_pickle=True will unpickle arbitrary objects;
        # only point this at trusted files.
        return np.load(path, allow_pickle=True)

    # Feature matrices
    kmers_train = _read('data/train_test_data/train_kmers.npy')
    kmers_test = _read('data/train_test_data/test_kmers.npy')

    # Targets, then the IDs (IDs are normalised to strings)
    targets_train = _read('data/train_test_data/y_train.npy')
    ids_train = _read('data/train_test_data/train_ids.npy').astype(str)
    ids_test = _read('data/train_test_data/test_ids.npy').astype(str)

    return kmers_train, kmers_test, targets_train, ids_train, ids_test

2. Load kmer data, define variables for kfold split/model hyperparameters here

In [None]:
# Load kmer data and convert to numpy arrays for use in models
train_kmers, test_kmers, y_train, y_train_ids, y_test_ids = load_data_kmer()
X_train = np.array(train_kmers)
X_test = np.array(test_kmers)
# Flatten the targets to a 1-D array
y_train = y_train.reshape(-1)

# Seed for consistency in testing
seed = 73

# Kfold variables
k = 5         # number of outer folds (n_splits of the StratifiedKFold)
n_iter = 50   # passed to train_gb_model as n_iter
cv = 10       # number of inner CV folds given to RandomizedSearchCV

# Model variables
# Bounds handed to stats.randint for "max_iter"; scipy's randint draws from
# [low, high), so n_estimators_high itself is never sampled.
n_estimators_low = 1
n_estimators_high = 50
k_best = 2000           # number of features kept by SelectKBest(chi2)
model_performance = {}  # filled by train_gb_model: outer fold index -> confusion matrix

3. Build a stratified k-fold splitter (stratified because the classes are imbalanced)

In [None]:
# Outer cross-validation splitter: k shuffled, stratified folds with a fixed
# seed so that the fold assignment is the same on every run.
kfold = sklearn.model_selection.StratifiedKFold(
    n_splits = k,
    shuffle = True,
    random_state = seed,
)

4. Define method to select features using chi2

In [None]:
def select_features(X_train, X_test, y_train, k=None):
    """Keep the top-scoring features according to a chi2 test.

    The selector is fitted on the training data only and the same column
    subset is then applied to the test data.

    Args:
        X_train: Training feature matrix (chi2 requires non-negative values).
        X_test: Feature matrix to reduce with the selector fitted on X_train.
        y_train: Class labels for X_train.
        k: Number of features to keep. When None (the default) the
            notebook-level ``k_best`` setting is used, which is the
            behaviour this function had before the parameter existed.

    Returns:
        tuple: ``(X_train_new, X_test_new)`` restricted to the selected columns.
    """
    n_keep = k_best if k is None else k
    selector = SelectKBest(score_func=chi2, k=n_keep)
    X_train_new = selector.fit_transform(X_train, y_train)
    X_test_new = selector.transform(X_test)

    return X_train_new, X_test_new

5. Define a method to build an HGB model, using chi2 feature selection to reduce computation costs

In [None]:
def train_gb_model(X_train: np.ndarray, y_train: np.ndarray, performance_dict: dict, kfold: KFold, n_estimators_low: int, n_estimators_high: int, n_iter: int, cv: int, seed: int, k_best: int = 200):
    """Run nested cross-validation of a HistGradientBoostingClassifier.

    For every outer fold produced by ``kfold``: select ``k_best`` features
    with chi2 (fitted on the outer-training part only), tune the classifier
    with a randomized search using ``cv`` inner folds, then score the refit
    best model on the held-out outer fold.

    Args:
        X_train: Feature matrix, indexable by integer index arrays.
        y_train: 1-D label array aligned with X_train.
        performance_dict: Mutated in place; receives
            ``fold index -> confusion matrix`` with labels ordered ["S", "R"].
        kfold: Splitter object providing ``split(X, y)``.
        n_estimators_low: Lower bound (inclusive) for the sampled ``max_iter``.
        n_estimators_high: Upper bound (exclusive) for the sampled ``max_iter``.
        n_iter: Number of parameter settings sampled by the randomized search.
        cv: Number of inner cross-validation folds.
        seed: Random state for the classifier and the randomized search.
        k_best: Number of features kept by the chi2 selector in each fold.

    Returns:
        None. Results are written into ``performance_dict``.
    """
    for i, (train_index, val_index) in enumerate(kfold.split(X_train, y_train)):

        print(f"Starting Outer fold {i}")
        X_train_outer, X_val_outer, y_train_outer, y_val_outer  = (
            X_train[train_index], X_train[val_index], y_train[train_index], y_train[val_index]
        )

        # Select k best features to reduce cost. The selector is built here
        # from the k_best argument (previously the argument was ignored and a
        # notebook-level global decided the feature count) and is fitted on
        # the outer-training rows only, so the validation fold never
        # influences which features are kept.
        selector = SelectKBest(score_func=chi2, k=k_best)
        X_train_outer = selector.fit_transform(X_train_outer, y_train_outer)
        X_val_outer = selector.transform(X_val_outer)
        print("K Best selected.")

        gbc_random_cv = sklearn.model_selection.RandomizedSearchCV(
            estimator = ensemble.HistGradientBoostingClassifier(random_state=seed),
            param_distributions = {
                "learning_rate": stats.loguniform(0.001, 1),
                # scipy's randint samples from [low, high)
                "max_iter": stats.randint(n_estimators_low, n_estimators_high),
                "l2_regularization": stats.loguniform(1e-5, 10)
            },
            # Honour the caller's search budget; without this the search
            # silently fell back to sklearn's default of 10 candidates.
            n_iter = n_iter,
            cv = cv,
            scoring = sklearn.metrics.make_scorer(sklearn.metrics.balanced_accuracy_score),
            random_state = seed,
            n_jobs = -1,
        )

        # Fit the model
        print(f"Fitting model for fold {i}")
        gbc_random_cv.fit(X_train_outer, y_train_outer)

        # Assess the best model using the outer validation data
        print(f"Assessing model performance for fold {i}")
        y_pred_outer_gbc = gbc_random_cv.predict(X_val_outer)
        performance_dict[i] = sklearn.metrics.confusion_matrix(y_val_outer, y_pred_outer_gbc, labels=["S","R"])

6. Train models using nested K-fold cross validation. 

In [None]:
# Run the nested cross-validation. Nothing is returned; the per-fold confusion
# matrices are written into model_performance (outer fold index -> matrix).
train_gb_model(
    X_train = X_train,
    y_train = y_train,
    performance_dict = model_performance,
    kfold = kfold,
    n_estimators_low = n_estimators_low,
    n_estimators_high = n_estimators_high,
    n_iter = n_iter,
    cv = cv,
    seed = seed,
    k_best = k_best,
)

7. Average the confusion matrices from all outer folds into one matrix.

In [None]:
# Element-wise mean of the per-fold confusion matrices.
combined_matrix_kmer = np.mean(list(model_performance.values()), axis = 0)

# Keep the labelled table (it used to be built and immediately discarded) so
# the printed result shows which cell is which. sklearn's confusion_matrix
# puts true labels on the rows and predicted labels on the columns.
combined_df_kmer = pd.DataFrame(
    data = combined_matrix_kmer,
    index = pd.Index(["S", "R"], name = "true"),
    columns = pd.Index(["S", "R"], name = "predicted"),
)
print("Kmer performance:")
print(combined_df_kmer)

8. Define a function to calculate the confidence intervals for the model.

In [None]:
def _normal_interval(values, confidence = 0.95):
    """Return the normal interval centred on the mean of ``values``, scaled by their std."""
    location, std = np.mean(values), np.std(values)
    if std == 0:
        # All values are identical (or there is a single value). scipy returns
        # (nan, nan) for scale=0, so report the degenerate interval instead.
        return (location, location)
    return stats.norm.interval(confidence, loc = location, scale = std)


def calculate_normal_confidence_intervals(confusion_matrices_list):
    """Compute 95% normal intervals for per-fold classification metrics.

    Each 2x2 confusion matrix (rows = true class, columns = predicted class)
    yields one value per metric. Index 1 is treated as the positive class and
    index 0 as the negative class; the notebook builds its matrices with
    ``labels=["S", "R"]``, so "R" is the positive class there.

    The interval for a metric is ``stats.norm.interval(0.95, loc=mean, scale=std)``
    where ``std`` is ``np.std`` of the per-fold values (population std, ddof=0).
    NOTE(review): the scale is the spread of the fold scores, not the standard
    error of their mean - confirm this is the interval that is wanted.

    Args:
        confusion_matrices_list: Iterable of 2x2 array-likes supporting
            ``m[i, j]``, ``m[i, :].sum()`` and ``m.sum()``.

    Returns:
        tuple: ``(accuracy_ci, sensitivity_ci, specificity_ci, bal_acc_ci)``,
        each a ``(low, high)`` pair. When every fold gives the same value the
        pair is ``(value, value)`` rather than ``(nan, nan)``.
    """
    accuracy_list = []
    sensitivity_list = []
    specificity_list = []
    bal_acc_list = []

    for confusion_matrix in confusion_matrices_list:
        sensitivity = confusion_matrix[1,1] / confusion_matrix[1,:].sum()
        specificity = confusion_matrix[0,0] / confusion_matrix[0,:].sum()

        accuracy_list.append((confusion_matrix[0,0] + confusion_matrix[1,1]) / confusion_matrix.sum())
        sensitivity_list.append(sensitivity)
        specificity_list.append(specificity)
        bal_acc_list.append(np.mean([sensitivity, specificity]))

    accuracy_ci = _normal_interval(accuracy_list)
    sensitivity_ci = _normal_interval(sensitivity_list)
    specificity_ci = _normal_interval(specificity_list)
    bal_acc_ci = _normal_interval(bal_acc_list)

    return accuracy_ci, sensitivity_ci, specificity_ci, bal_acc_ci

9. Calculate confidence intervals for the model.

In [None]:
# Compute the intervals from the per-fold confusion matrices collected in
# model_performance, then print one (low, high) pair per metric.
acc_ci, sens_ci, spec_ci, ba_ci = calculate_normal_confidence_intervals(list(model_performance.values()))
print(f"Accuracy confidence interval: {acc_ci}")
print(f"Sensitivity confidence interval: {sens_ci}")
print(f"Specificity confidence interval: {spec_ci}")
print(f"Balanced accuracy confidence interval: {ba_ci}")

10. After finalizing the variable ranges, build a final model using all the data

In [None]:
# Reduce both matrices to the chi2-selected features, fitted on the full
# training set.
# NOTE(review): this rebinds X_train / X_test, so re-running this cell applies
# the selection to the already-reduced matrices. Restart the kernel and run
# all cells to reproduce the results.
X_train, X_test = select_features(X_train=X_train, X_test=X_test, y_train=y_train)

print("Training final model")
# NOTE(review): n_iter is hard-coded to 5 here and does not use the n_iter
# setting from the configuration cell - confirm that the smaller search
# budget for the final model is intentional.
gb_final_cv = sklearn.model_selection.RandomizedSearchCV(
    estimator = ensemble.HistGradientBoostingClassifier(random_state=seed),
    param_distributions = {
        "learning_rate": stats.loguniform(0.001, 1),
        "max_iter": stats.randint(n_estimators_low, n_estimators_high),
        "l2_regularization": stats.loguniform(1e-5, 10)
    },
    cv = cv,
    scoring = sklearn.metrics.make_scorer(sklearn.metrics.balanced_accuracy_score),
    random_state = seed,
    n_jobs = 1,
    n_iter = 5,
    verbose = 1,
)

# Fit the final model
print("Fitting final cv model")
gb_final_cv.fit(X_train, y_train)

# Get optimal parameters (best_params_ holds exactly the sampled settings of
# the best candidate)
print("Getting optimal parameters")
final_params = gb_final_cv.best_params_
final_lr = final_params["learning_rate"]
final_max_iter = final_params["max_iter"]
final_l2_regularization = final_params["l2_regularization"]

print(f"Final learning rate: {final_lr}")
print(f"Final max iterations: {final_max_iter}")
print(f"Final l2 regularization: {final_l2_regularization}")

print("Building final model")
# Build final model. random_state matches the estimator that was tuned above
# so that the final fit is reproducible; it was previously left unset.
final_model = ensemble.HistGradientBoostingClassifier(
    learning_rate = final_lr,
    max_iter = final_max_iter,
    l2_regularization = final_l2_regularization,
    random_state = seed,
)

11. Fit the final model

In [None]:
# Train the final classifier on the full training set. X_train was rebound to
# the chi2-selected feature columns in the previous cell.
final_model.fit(X_train, y_train)

12. Predict and output results to file

In [None]:
# Predict labels for the (feature-selected) test matrix and write one row per
# test genome ID to a CSV file in the current working directory.
y_pred_test = final_model.predict(X_test)
final_predictions = pd.DataFrame(data = {"genome_id":y_test_ids, "y_pred":y_pred_test})
final_predictions.to_csv('kmer_gb_hist_stratified.csv', index = False)