## Define paramters

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import sklearn as sk


import numpy as np
import matplotlib as mpl


# cv parameters
num_folds = 5 
num_repeats = 10 

# model parameters
optimizer = 'adam'
learning_rate = 0.0008
loss = 'binary_crossentropy'

# training parameters
epochs = 40
batch_size = 32
metrics=['accuracy',
        tf.keras.metrics.AUC(curve = "ROC", name='AUROC'),
        tf.keras.metrics.AUC(curve = "PR", name='AUPRC'),
         tf.keras.metrics.Precision(name='precision'),
         tf.keras.metrics.Recall(name='recall'),
         tf.keras.metrics.SensitivityAtSpecificity(specificity=0.95, name="sen_at_0.95spec"),
         tf.keras.metrics.SensitivityAtSpecificity(specificity=0.98, name="sen_at_0.98spec")]






## Read in data

In [None]:

x_file = 'x_clean_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.03_0.05_.npy'
y_file = 'y_clean_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.03_0.05_.npy' 
z_file = 'z_clean_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.03_0.05_.npy'

performance_csv = 'performance_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.03_0.05_.csv'

x_file = 'x_clean_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.01_0.03_.npy'
y_file = 'y_clean_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.01_0.03_.npy' 
z_file = 'z_clean_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.01_0.03_.npy'

performance_csv = 'performance_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.01_0.03_.csv'

x = np.load(x_file, allow_pickle=True)
y = np.load(y_file, allow_pickle=True)
z = np.load(z_file, allow_pickle=True)

y = np.array(y)
input_shape = x.shape[1:]
#input_shape = (465, 201, 3)

## Design models

In [None]:
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
import scikeras.wrappers as sw
#from scikeras.wrappers import KerasClassifier


def create_model(input_shape, loss, learning_rate, metrics):
    model = keras.Sequential(
        [
            keras.Input(shape=input_shape),
            
            layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            layers.MaxPooling2D(pool_size=(3, 2)),
            
            layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
            
            layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
            
            layers.Conv2D(16, kernel_size=(3, 3), activation="relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
            
            layers.Flatten(),
            layers.Dense(32, activation="relu"),
            #layers.Dropout(0.2),
            layers.Dense(1, activation="sigmoid")
        ])
    
    opt = keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(loss=loss, 
        optimizer=opt, 
        metrics=metrics)
    return model


# compute class weights and set dictionary to class_weight parameter
from sklearn.utils import class_weight
class_weight = dict(zip(np.unique(y), class_weight.compute_class_weight(class_weight = 'balanced', 
                                                                        classes = np.unique(y), 
                                                                        y = y)))




model = sw.KerasClassifier(model = create_model,	
			model__metrics = metrics,
			model__input_shape = input_shape,
            model__loss = loss,
            model__learning_rate = learning_rate,
            class_weight=class_weight,
            validation_split=0.2,
            epochs=epochs, 
            batch_size=batch_size, 
            verbose=1)





## Design scores

In [None]:
# define scoring function by recall
from sklearn.metrics import make_scorer
from sklearn.metrics import recall_score

# define confusion_matrix_scorer, return specificity, sensitivity, f1, recall, precision and accuracy in a named dictionary
from sklearn.metrics import confusion_matrix
def scoring_1(clf, X, y):
    y_pred = clf.predict(X)
    cm = confusion_matrix(y, y_pred)
    tn, fp, fn, tp = cm.ravel()
    specificity = tn / (tn+fp)
    sensitivity = tp / (tp+fn)
    f1 = 2 * (sensitivity * specificity) / (sensitivity + specificity)
    recall = sensitivity
    precision = tp / (tp+fp)
    accuracy = (tp+tn) / (tp+tn+fp+fn)
    return {'specificity': specificity, 
    		'sensitivity': sensitivity, 
		'f1': f1, 
		'recall': recall, 
		'precision': precision, 
		'accuracy': accuracy}


from sklearn.metrics import confusion_matrix

def sensitivity_at_specificity(y_true, y_pred_proba, specificity):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred_proba >= 0.5).ravel()
    actual_negatives = tn + fp
    actual_positives = fn + tp
    tn_rate = tn / actual_negatives
    fp_rate = fp / actual_negatives
    cutoff = 0.5
    while fp_rate > (1-specificity) and cutoff > 0:
        cutoff -= 0.01
        y_pred = (y_pred_proba > cutoff).astype(int)
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        actual_negatives = tn + fp
        actual_positives = fn + tp
        tn_rate = tn / actual_negatives
        fp_rate = fp / actual_negatives
    sensitivity = tp / actual_positives
    return sensitivity



scoring_2 = {'specificity': make_scorer(recall_score, pos_label=0),
	'sensitivity': make_scorer(recall_score, pos_label=1),
	'acc': 'accuracy',
	'f1': 'f1',
	'ppv': 'precision',
	'roc_auc': 'roc_auc',
	'pr_auc': 'average_precision',
	'sen_at_0.95spec': make_scorer(sensitivity_at_specificity,  specificity=0.95, needs_proba=True),
	'sen_at_0.98spec': make_scorer(sensitivity_at_specificity, specificity=0.98, needs_proba=True)}


## Run the model

In [None]:

# use cross_validate to return multiple metrics

from sklearn.model_selection import RepeatedStratifiedKFold 
from sklearn.model_selection import cross_validate

kfold = RepeatedStratifiedKFold(n_splits=num_folds,
                                n_repeats=num_repeats,
                                random_state=1)

cv_results = cross_validate(model, 
                          x, y, 
                          scoring= scoring_2,
                          return_train_score=True, 
                          cv=kfold,
                          verbose=2,
                          return_estimator=True,
                          n_jobs=1)

# use cross validate and stratifiedGroupKFold 
# repeat 10 times, 5 folds


from sklearn.model_selection import StratifiedGroupKFold
kfold_grouped = StratifiedGroupKFold(n_splits=num_folds,
                            shuffle=True)

cv_results2 = cross_validate(model, 
                          x, y, 
                          groups = z,
                          scoring= scoring_2,
                          return_train_score=True, 
                          cv=kfold_grouped,
                          verbose=2,
                          return_estimator=True,
                          n_jobs=1)

# tf > 0.1 and healthy
cv_results3 = cross_validate(model, 
                          x, y, 
                          groups= z,
                          scoring= scoring_2,
                          return_train_score=True, 
                          cv=kfold_grouped,
                          verbose=2,
                          return_estimator=True,
                          n_jobs=1)


# tf < 0.1 and healthy
cv_results4 = cross_validate(model, 
                          x, y, 
                          groups=z,
                          scoring= scoring_2,
                          return_train_score=True, 
                          cv=kfold_grouped,
                          verbose=2,
                          return_estimator=True,
                          n_jobs=1)



# x_clean_n_isize_n_motif_s1_C_n_motif_s1_T_TF_0.03_0.05_.npy
cv_results = cross_validate(model, 
                          x, y, 
                          groups=z,
                          scoring= scoring_2,
                          return_train_score=True, 
                          cv=kfold_grouped,
                          verbose=2,
                          return_estimator=True,
                          n_jobs=1)


# Perform repeated cross-validation with shuffling
results = {}

for _ in range(num_repeats):
    # report which loop we are on
    
    # Perform cross-validation with shuffling enabled
    cv_results = cross_validate(model, x, y, groups=z, scoring=scoring_2, cv=kfold_grouped, n_jobs=1, verbose=0)
    
    # Store the performance metrics for each repetition, store both train and test results
    for key, value in cv_results.items():
        if key not in results:
            results[key] = []
        results[key].append(value)

# Create a pandas DataFrame from the results dictionary
import pandas as pd

# remove 'estimator' key
relevant_keys = [k for k in cv_results.keys() if k != 'estimator']
filtered_dict = {k: v for k, v in cv_results.items() if k in relevant_keys}

# create a pandas DataFrame from the new dictionary
cv_df = pd.DataFrame.from_dict(filtered_dict)

# print the resulting DataFrame
cv_df.to_csv(performance_csv, index=False)

# clean up memory
del cv_results
tf.keras.backend.clear_session()







## temp block

In [None]:




# or return the predicted labels
from sklearn.model_selection import cross_val_predict
y_pred = cross_val_predict(model, x, y, cv=kfold)


# or use for loop to do cross validation and get the metrics
for train_index, test_index in kfold.split(x, y):
    x_train, x_test = x[train_index], x[test_index]
    y_train, y_test = y[train_index], y[test_index]
    model.fit(x_train, y_train)
    y_pred = model.predict(x_test)
    y_pred_proba = model.predict_proba(x_test)
    cm = confusion_matrix(y_test, y_pred)
    tn, fp, fn, tp = cm.ravel()
    specificity = tn / (tn+fp)
    sensitivity = tp / (tp+fn)
    f1 = 2 * (sensitivity * specificity) / (sensitivity + specificity)
    recall = sensitivity
    precision = tp / (tp+fp)
    accuracy = (tp+tn) / (tp+tn+fp+fn)
    print('specificity: ', specificity)
    print('sensitivity: ', sensitivity)
    print('f1: ', f1)
    print('recall: ', recall)
    print('precision: ', precision)
    print('accuracy: ', accuracy)
    print('roc_auc: ', roc_auc_score(y_test, y_pred_proba))
    print('pr_auc: ', average_precision_score(y_test, y_pred_proba))
    print('sen_at_0.95spec: ', sensitivity_at_specificity(y_test, y_pred, 0.95))
    print('sen_at_0.98spec: ', sensitivity_at_specificity(y_test, y_pred, 0.98))
    print('--------------------------------------')



## ROC curve

In [None]:
# get the ROC curve of all folds into one plot and plot the mean ROC curve
from sklearn.metrics import roc_curve

import matplotlib.pyplot as plt
from sklearn.metrics import auc
from sklearn.metrics import RocCurveDisplay
# see here: https://stackoverflow.com/questions/62739598/how-to-plot-the-roc-curve-for-ann-for-10-fold-cross-validation-in-keras-using-py
model._estimator_type = "classifier"
tprs = []
aucs = []
mean_fpr = np.linspace(0, 1, 100)

fig, ax = plt.subplots(figsize=(6, 6))


for fold, (train, test) in enumerate(kfold.split(X = x, y = y)):
    model.fit(x[train], y[train])
    viz = RocCurveDisplay.from_estimator(
        model,
        x[test],
        y[test],
        name=f"ROC split {fold}",
        alpha=0.3,
        lw=1,
        ax=ax,
    )
    interp_tpr = np.interp(mean_fpr, viz.fpr, viz.tpr)
    interp_tpr[0] = 0.0
    tprs.append(interp_tpr)
    aucs.append(viz.roc_auc)

ax.plot([0, 1], [0, 1], "k--", label="chance level (AUC = 0.5)")

mean_tpr = np.mean(tprs, axis=0)
mean_tpr[-1] = 1.0
mean_auc = auc(mean_fpr, mean_tpr)
std_auc = np.std(aucs)
ax.plot(
    mean_fpr,
    mean_tpr,
    color="b",
    label=r"Mean ROC (AUC = %0.2f $\pm$ %0.2f)" % (mean_auc, std_auc),
    lw=2,
    alpha=0.8,
)

std_tpr = np.std(tprs, axis=0)
tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
ax.fill_between(
    mean_fpr,
    tprs_lower,
    tprs_upper,
    color="grey",
    alpha=0.2,
    label=r"$\pm$ 1 std. dev.",
)

ax.set(
    xlim=[-0.05, 1.05],
    ylim=[-0.05, 1.05],
    xlabel="False Positive Rate",
    ylabel="True Positive Rate",
    title=f"Mean ROC curve with variability\n(Positive label 'Cancer')",
)
ax.axis("square")
ax.legend(loc="lower right")
plt.show()

#save the plt figure as pdf
fig.savefig('roc_curve_n_isize.pdf')

