# CPEC detection RetinaNet model training and validation

In [None]:
import biondi, copy, glob, matplotlib
import numpy as np
from tensorflow.keras import models
from tensorflow import keras
import matplotlib.pyplot as plt
import keras_tuner as kt
from keras_tuner.engine import tuner_utils
import sklearn
import seaborn as sns

# Load training, validation, and test datasets

In [None]:
# specify image and label dataset filepaths
training_dataset_image_dir = ''
training_dataset_label_dir = ''
validation_dataset_image_dir = ''
validation_dataset_label_dir = ''
test_dataset_image_dir = ''
test_dataset_label_dir = ''
# load datasets
train_x = np.load(training_dataset_image_dir)
train_y = (np.load(training_dataset_label_dir)>2).astype('uint8')
validation_x = np.load(validation_dataset_image_dir)
validation_y = (np.load(validation_dataset_label_dir)>2).astype('uint8')
test_x = np.load(test_dataset_image_dir)
test_y = (np.load(test_dataset_label_dir)>2).astype('uint8')

# Create data generators which will be passed to the keras model for training and validation

In [None]:
tgen = biondi.dataset.TrainingGenerator(train_x, batch_size=64,labels=keras.utils.to_categorical(train_y, num_classes=2),flip=True, rotation=True, contrast=True,)
vgen = biondi.dataset.TrainingGenerator(validation_x, batch_size=64, labels=keras.utils.to_categorical(validation_y, num_classes=2), validation=True)

# Hyperparameter optimization

In [None]:
class BayesianOptimizationCV(kt.BayesianOptimization):
    def helper_fn(self, data, labels, **kwargs):
        kf = sklearn.model_selection.KFold(n_splits=self.executions_per_trial, shuffle=True)
        kfgen = kf.split(X=data)
        return kf, kfgen, data, labels
    
    def run_trial(self, trial, *args, **kwargs):
        model_checkpoint = tuner_utils.SaveBestEpoch(
            objective=self.oracle.objective,
            filepath=self._get_checkpoint_fname(trial.trial_id),
        )
        original_callbacks = kwargs.pop("callbacks", [])

        # Run the training process multiple times.
        histories = []
        kf, kfgen, data, labels = self.helper_fn(*args)

        for execution in range(self.executions_per_trial):
            copied_kwargs = copy.copy(kwargs)
            callbacks = self._deepcopy_callbacks(original_callbacks)
            self._configure_tensorboard_dir(callbacks, trial, execution)
            callbacks.append(tuner_utils.TunerCallback(self, trial))
            # Only checkpoint the best epoch across all executions.
            callbacks.append(model_checkpoint)
            copied_kwargs["callbacks"] = callbacks
            train_idx, test_idx = next(kfgen)
            tgen = biondi.dataset.TrainingGenerator(data=data[train_idx],
                                     labels=labels[train_idx],
                                     batch_size=64,
                                     flip=True,
                                     rotation=True,
                                     contrast=True,
                                     **kwargs)
            vgen = biondi.dataset.TrainingGenerator(data=data[test_idx],
                                     labels=labels[test_idx],
                                     batch_size=64,
                                     validation=True,
                                     **kwargs)
            obj_value = self._build_and_fit_model(trial, x=tgen, validation_data=vgen, **copied_kwargs)

            histories.append(obj_value)
        return histories

## Create KerasTuner tuner object

In [None]:
# Specify working directory 
tuner_dir = ''
# Specify project name
tuner_name = ''
tuner = BayesianOptimizationCV(kt.applications.HyperResNet(
    input_shape=(128,128,3), 
    classes=2,), 
    objective='val_loss', 
    max_trials=20, 
    executions_per_trial=5, 
    project_name=tuner_name, 
    directory=tuner_dir,
)

## Start hyperparameter search

In [None]:
tuner.search(train_x, keras.utils.to_categorical(train_y, num_classes=2), workers=14, max_queue_size=200, epochs=50, class_weight = {0:1, 1:2.8})

In [None]:
# Get optimized hyperparamters
optmized_hp = tuner.get_best_hyperparameters()[0]

# Define model parameters (determined by hyperparameter optimization)

In [None]:
hyper_params = {
    'version': 'next',
    'conv3_depth': 8,
    'conv4_depth': 36,
    'pooling': 'avg',
    'optimizer': 'rmsprop',
    'learning_rate': 0.01
}

# Create the model using optimized hyperparameters

In [None]:
model = tuner.hypermodel.build(hyper_params)

# Start training classification model

In [None]:
# create checkpoint object to save the model after each epoch (optional).
chckpnt_dir = ''
checkpoint = keras.callbacks.ModelCheckpoint(filepath=chckpnt_dir + 'model_epoch-{epoch:02d}_val_loss-{val_loss:.4f}_val_accuracy-{val_accuracy:.4f}')

In [None]:
# train model. Depending on system resources the number of workers and the max_queue_size can be adjusted.
# remove callbacks parameter if model checkpoints are not needed.
trainhist = model.fit(x=tgen, validation_data=vgen, epochs=50, class_weight={0:1,1:5.9}, workers=14, max_queue_size=200, callbacks=[checkpointer])

# Visualize model training and performance across epochs

In [None]:
#create list of model checkpoint saves
chckpnts = sorted(glob.glob(chckpnt_dir + 'model_epoch*'))

In [None]:
#extract validation loss, positive predictive value, and sensitivity from model checkpoint filenames.
plt.plot(trainhist.history['val_loss'])
plt.plot(trainhist.history['val_accuracy'])

# Model performance statistics on test dataset

In [None]:
# normalize image data
norm_test_x = biondi.dataset.per_sample_tile_normalization(test_x)

In [None]:
# specify model checkpoint filepath
test_model_checkpoint = ''
test_model = models.load_model(test_model_checkpoint)

In [None]:
# get model predictions on test dataset
softmax_scores = test_model.predict(norm_test_x)
predictions = np.argmax(softmax_scores, axis=-1)

In [None]:
#get true/false positives/negatives (TP, FP, TN, FN) for model predictions
TP, FP, TN, FN = biondi.statistics.binary_tpfptnfn(predictions, test_y)

In [None]:
# calculate binary sensitivity, precision, & accuracy
print("Sensitivity:", TP/(TP+FN))
print("Precision:", TP/(TP+FP))
print("Accuracy:", np.sum(test_y==predictions)/len(test_y))

In [None]:
# calculate area under the receiver operating characteristic curve (AUROC)
fpr, tpr, thresholds = sklearn.metrics.roc_curve(test_y, softmax_scores[:,1:])
roc_auc = sklearn.metrics.roc_auc_score(test_y, softmax_scores[:,1:])

In [None]:
# visualize curve
font = {'family' : 'normal',
        'weight' : 'bold',
        'size'   : 12}

matplotlib.rc('font', **font)
plt.rcParams['legend.title_fontsize'] = 12
fig, ax = plt.subplots(figsize=(6,6))
ax.tick_params(axis='both', which='major', labelsize=16)
g = sns.lineplot(x=fpr, y=tpr, palette=sns.color_palette('tab10'), label='ROC curve (area = %0.2f)' % roc_auc, ax=ax)
plt.legend(loc='lower right',frameon=False)
plt.ylabel('True positive rate (TPR)', fontweight='bold', fontsize=18)
plt.xlabel('False positive rate (FPR)', fontweight='bold', fontsize=18)