## Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import skimage as sk
from PIL import Image
from tqdm.notebook import tqdm

import tensorflow as tf

from keras.preprocessing.image import ImageDataGenerator
from keras.applications.xception import Xception
from keras.models import Model, load_model
from keras.layers import Dense
from keras.callbacks import History, EarlyStopping, ModelCheckpoint, Callback
from keras import backend as K

from sklearn.utils import class_weight
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score

## Specify parameters and directories
Decide on the following experimental variables before running the rest of this script

In [None]:
# --- Experiment identity ---
EXPERIMENT_ID = "0001"  # embedded in the output filenames (original note: "this will autofill")
IMAGE_TYPE = "CROP"  # either "UNIL" or "CROP"; also selects which Excel split file is read below

# --- Network ---
ARCHIT = 'Xception'
PRETRAINED_WEIGHTS = 'imagenet'
MONITORED_METRIC = 'loss'  # metric watched by the "best model" checkpoint

# --- Data splits (Excel sheet names) ---
# Hyperparameter optimisation: TRAIN_DF / VALID_DF.
# Final testing:               FINAL_TRAIN_DF / TEST_DF.
TRAINING_DATASET = "FINAL_TRAIN_DF"
EVALUATION_DATASET = "TEST_DF"

# --- Training / evaluation sizes ---
TRAIN_BATCH_SIZE = 32
# A batch size of 1 guarantees that every evaluation image is used, whatever the
# dataset size; larger values only divide evenly when the number of evaluation
# images is a multiple of the batch size.
EVAL_BATCH_SIZE = 1
NUMBER_EPOCHS = 40  # upper bound on the number of training epochs

# --- Directories ---
INPUT_DIR = ""  # folder containing the input images (UNIL or CROP, matching IMAGE_TYPE)
OUTPUT_DIR = ""  # folder in which output files should be saved
os.chdir(INPUT_DIR)  # the working directory becomes the image folder

# --- Train / evaluation tables ---
# Update this filepath to the Excel file containing the classification train/val/test split.
TRAIN_VALID_TEST_EXCEL_FILEPATH = f"............/classification_training_validation_test_{IMAGE_TYPE}.xlsx"
# Every column is read as text (dtype=str); training sheet first, then evaluation sheet.
TRAIN_DF, EVAL_DF = (
    pd.read_excel(TRAIN_VALID_TEST_EXCEL_FILEPATH, sheet_name=sheet, dtype=str)
    for sheet in (TRAINING_DATASET, EVALUATION_DATASET)
)
print(TRAIN_DF.head())

## Set up the tools to load the data
For the training data we will use augmentation, but not for the testing data

In [None]:
# Training images are augmented on the fly (rotation, shifts, shear, zoom,
# horizontal flips, brightness changes) and rescaled to the [0, 1] range.
train_datagen = ImageDataGenerator(rotation_range=180,
                                   width_shift_range=0.15,
                                   height_shift_range=0.15,
                                   rescale=1. / 255,
                                   shear_range=0.2,
                                   zoom_range=0.15,
                                   horizontal_flip=True,
                                   brightness_range=(0.2, 2.0),
                                   fill_mode='nearest')
# Evaluation images are only rescaled -- no augmentation.
eval_datagen = ImageDataGenerator(rescale=1. / 255)

# Training stream: shuffled batches of TRAIN_BATCH_SIZE images.
train_generator = train_datagen.flow_from_dataframe(TRAIN_DF,
                                                    directory=None,
                                                    class_mode='categorical',
                                                    x_col='filenames',
                                                    y_col='labels',
                                                    target_size=(299, 299),
                                                    batch_size=TRAIN_BATCH_SIZE,
                                                    shuffle=True)

# Evaluation stream: fixed order so predictions can be matched to their labels.
# `classes` is pinned to the training generator's class list so that a label
# always maps to the same output index, even if some class happens to be absent
# from the evaluation sheet (otherwise the indices would be re-inferred from
# EVAL_DF alone and could shift).
eval_generator = eval_datagen.flow_from_dataframe(EVAL_DF,
                                                  directory=None,
                                                  class_mode='categorical',
                                                  x_col='filenames',
                                                  y_col='labels',
                                                  classes=list(train_generator.class_indices),
                                                  target_size=(299, 299),
                                                  batch_size=EVAL_BATCH_SIZE,
                                                  shuffle=False)

## Calculate steps per epoch

Because we're 'streaming' files from the disk rather than loading them all into memory at once, we need to tell Keras how many times it must pull a batch of data to get through the entire dataset once (i.e. how many steps make up one epoch).

In [None]:
# Training: number of full batches per epoch.
step_size_train = train_generator.n // train_generator.batch_size

# Evaluation: round UP (ceiling division) so that a trailing partial batch is
# still evaluated. Plain floor division silently skipped the last images
# whenever the evaluation set size was not a multiple of the batch size.
step_size_eval = -(-eval_generator.n // eval_generator.batch_size)

# One output neuron per distinct label in the training table.
num_classes = TRAIN_DF['labels'].nunique()

## Create the model
 Load the network and create a new 'output' layer with the correct number of neurons (1 per model of implant)

In [None]:
# Xception backbone without its original classification head; global average
# pooling turns the final feature maps into a single feature vector per image.
# NOTE(review): per the Keras applications docs, `classes` is only used when
# include_top=True, so it is presumably a no-op here -- confirm before removing.
architecture = Xception(
    weights=PRETRAINED_WEIGHTS,
    include_top=False,
    input_shape=(299, 299, 3),
    pooling='avg',
    classes=num_classes,
)

# New softmax output layer with one neuron per class.
output_layer = Dense(num_classes, activation='softmax')
predictions = output_layer(architecture.output)

# Assemble the full network and compile it for multi-class classification.
model = Model(inputs=architecture.input, outputs=predictions)
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

## Create model checkpoints
- Save the model at the final training epoch (mc2) and at the epoch with the best value for our monitored metric (mc1).
- In our paper, we used only the mc2 models.

In [None]:
# Model checkpoints written during training.
# NOTE(review): the paths below start with the literal text "OUTPUT_DIR/" -- the
# OUTPUT_DIR variable is not interpolated. The evaluation cells load the models
# from exactly the same literal paths, so all of them must be changed together.
checkpoint_stem = f"OUTPUT_DIR/{EXPERIMENT_ID}_{ARCHIT}_{PRETRAINED_WEIGHTS}_{IMAGE_TYPE}"

# mc1: keeps only the model with the best value of the monitored metric.
mc1 = ModelCheckpoint(
    checkpoint_stem + f"_BEST{MONITORED_METRIC}.h5",
    monitor=MONITORED_METRIC,
    mode='auto',
    save_best_only=True,
    verbose=1,
)

# mc2: overwrites the same file every `period` epochs, regardless of the metric.
# NOTE(review): with period=5 the file presumably only holds the final epoch when
# NUMBER_EPOCHS is a multiple of 5 -- confirm.
mc2 = ModelCheckpoint(
    checkpoint_stem + "_FINALEPOCH.h5",
    monitor=MONITORED_METRIC,
    mode='auto',
    save_best_only=False,
    period=5,
    verbose=1,
)

## Training
Train for specified number of epochs

In [None]:
# Weight balancing to compensate for imbalanced class sizes.
# - compute_class_weight is called with keyword arguments: recent scikit-learn
#   releases no longer accept `classes` and `y` positionally.
# - Keras expects `class_weight` to be a dict {class index: weight}; the raw
#   ndarray returned by scikit-learn is therefore converted to a dict.
class_labels = np.unique(train_generator.classes)
balanced_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=class_labels,
    y=train_generator.classes)
class_weights = {int(label): float(weight)
                 for label, weight in zip(class_labels, balanced_weights)}

# Fit the model, validating on the evaluation generator after every epoch and
# saving checkpoints through the mc1 / mc2 callbacks.
history = model.fit_generator(generator=train_generator,
                              steps_per_epoch=step_size_train,
                              validation_data=eval_generator,
                              validation_steps=step_size_eval,
                              epochs=NUMBER_EPOCHS,
                              class_weight=class_weights,
                              callbacks=[mc1, mc2])

In [None]:
# Plot the training and validation curves (accuracy, then loss) and save each
# figure as a PNG inside OUTPUT_DIR.
# The original paths began with the literal text "OUTPUT_DIR/" and never used
# the OUTPUT_DIR variable from the configuration cell; os.path.join fixes that.
curve_specs = [
    # (history key, y-axis label, legend position, y-limit to pin)
    ('accuracy', 'Accuracy', 'upper left', {'top': 1}),
    ('loss', 'Loss', 'upper right', {'bottom': 0}),
]

for metric, axis_label, legend_loc, ylim_kwargs in curve_specs:
    fig, ax = plt.subplots()
    ax.plot(history.history[metric], color='k', linestyle='-')            # training curve
    ax.plot(history.history[f'val_{metric}'], color='r', linestyle='-')   # validation curve
    ax.set_title(f'{ARCHIT} model {metric}', color='k')
    ax.set_ylabel(axis_label, color='k')
    ax.set_xlabel('Epoch', color='k')
    ax.legend(['Training', 'Validation'], loc=legend_loc)
    ax.tick_params(colors='k')
    ax.set_xlim(0, NUMBER_EPOCHS)
    ax.set_ylim(**ylim_kwargs)
    fig.savefig(os.path.join(OUTPUT_DIR, f"{ARCHIT}_{IMAGE_TYPE}_{metric}_curve_{EXPERIMENT_ID}.png"),
                dpi=300, facecolor='w', edgecolor='w')
    plt.show()

## Evaluation
Numerical evaluation: confusion matrix, accuracy, F1 score, top-3 accuracy, and trainable parameter counts

In [None]:
# Image generator for the evaluation images, recreated so that evaluation starts
# from a fresh iterator. `classes` is pinned to the training generator's class
# list so that label indices match the model's output neurons even if a class is
# missing from the evaluation sheet.
eval_generator = eval_datagen.flow_from_dataframe(EVAL_DF,
                                                  directory=None,
                                                  class_mode='categorical',
                                                  x_col='filenames',
                                                  y_col='labels',
                                                  classes=list(train_generator.class_indices),
                                                  target_size=(299, 299),
                                                  batch_size=EVAL_BATCH_SIZE,
                                                  shuffle=False)

# Load the best-metric model.
# NOTE(review): this path starts with the literal text "OUTPUT_DIR/" because it
# must match the path used by the mc1 ModelCheckpoint; change both together.
saved_model = load_model(f"OUTPUT_DIR/{EXPERIMENT_ID}_{ARCHIT}_{PRETRAINED_WEIGHTS}_{IMAGE_TYPE}_BEST{MONITORED_METRIC}.h5")

# Run the model ONCE over the evaluation set, batch by batch, keeping the class
# probabilities and the one-hot labels. Indexing the generator (instead of
# iterating it with a manual break) visits every batch exactly once and does not
# depend on the iterator's internal position.
y_prob_batches = []
y_true_batches = []
for i_batch in range(len(eval_generator)):
    x_batch, y_batch = eval_generator[i_batch]
    y_prob_batches.append(saved_model.predict(x_batch))
    y_true_batches.append(y_batch)

y_prob = np.concatenate(y_prob_batches, axis=0)          # (images, classes) probabilities
y_true_onehot = np.concatenate(y_true_batches, axis=0)   # (images, classes) one-hot labels

# Dataframe of predicted labels and true labels
y_pred_max_list = np.argmax(y_prob, axis=1).tolist()
y_true_max_list = np.argmax(y_true_onehot, axis=1).tolist()
prediction_df = pd.DataFrame({'predictions': y_pred_max_list, 'labels': y_true_max_list})

# Confusion matrix. `labels` forces one row/column per known class, so the
# matrix always matches the class list even when a class never occurs in the
# evaluation data (otherwise building the labelled dataframe below would fail).
class_indices = eval_generator.class_indices
class_names = sorted(class_indices, key=class_indices.get)   # names ordered by class index
confusion_mx = confusion_matrix(y_true_max_list, y_pred_max_list,
                                labels=list(range(len(class_names))))

# Convert the confusion matrix into a dataframe: true classes down the side,
# predicted class indices along the top.
confusion_matrix_df = pd.DataFrame(confusion_mx,
                                   index=[f"{i}_{c}_true" for i, c in enumerate(class_names)],
                                   columns=[f"{i}" for i in range(len(class_names))])

print("\nConfusion matrix:")
print(confusion_matrix_df)

# Export confusion matrix to a CSV file inside OUTPUT_DIR (the original path
# used the literal text "OUTPUT_DIR" instead of the variable).
confusion_matrix_df.to_csv(os.path.join(
    OUTPUT_DIR,
    f"{EXPERIMENT_ID}_{ARCHIT}_{PRETRAINED_WEIGHTS}_{IMAGE_TYPE}_confusion_matrix_BEST{MONITORED_METRIC}.csv"))

# Overall accuracy
overall_accuracy = accuracy_score(y_true_max_list, y_pred_max_list)
print(f"\nAccuracy: {overall_accuracy}")

# F1 score (weighted by class support)
f1score = f1_score(y_true_max_list, y_pred_max_list, average='weighted')
print(f"\nF1 score: {f1score}")

# Top 3 categorical accuracy, computed per image from the stored probabilities.
# The previous implementation only looked at the first image of each batch, so
# it was only correct for a batch size of 1.
top_k = min(3, y_prob.shape[1])   # guard against fewer than 3 classes
top_3_pred = np.argpartition(y_prob, -top_k, axis=1)[:, -top_k:]
top_3_accuracy_list = (top_3_pred == np.asarray(y_true_max_list)[:, None]).any(axis=1).tolist()

# Divide by the number of images actually evaluated (the generator may have
# skipped rows of EVAL_DF whose image file is invalid).
image_count = len(top_3_accuracy_list)
top_3_accuracy = sum(top_3_accuracy_list) / image_count
print(f"\nTop 3 accuracy: {top_3_accuracy}")

# Parameter counts (source: https://stackoverflow.com/questions/45046525/how-can-i-get-the-number-of-trainable-parameters-of-a-model-in-keras)
trainable_count = int(np.sum([K.count_params(w) for w in saved_model.trainable_weights]))
non_trainable_count = int(np.sum([K.count_params(w) for w in saved_model.non_trainable_weights]))

print('\nTotal params: {:,}'.format(trainable_count + non_trainable_count))
print('Trainable params: {:,}'.format(trainable_count))
print('Non-trainable params: {:,}'.format(non_trainable_count))

In [None]:
# Load the last-epoch model.
# NOTE(review): this path starts with the literal text "OUTPUT_DIR/" because it
# must match the path used by the mc2 ModelCheckpoint; change both together.
saved_model_2 = load_model(f"OUTPUT_DIR/{EXPERIMENT_ID}_{ARCHIT}_{PRETRAINED_WEIGHTS}_{IMAGE_TYPE}_FINALEPOCH.h5")

# Run the model ONCE over the evaluation set, batch by batch, keeping the class
# probabilities and the one-hot labels. Indexing the generator (instead of
# iterating it with a manual break) visits every batch exactly once and does not
# depend on where a previous cell left the iterator.
y_prob_batches = []
y_true_batches = []
for i_batch in range(len(eval_generator)):
    x_batch, y_batch = eval_generator[i_batch]
    y_prob_batches.append(saved_model_2.predict(x_batch))
    y_true_batches.append(y_batch)

y_prob = np.concatenate(y_prob_batches, axis=0)          # (images, classes) probabilities
y_true_onehot = np.concatenate(y_true_batches, axis=0)   # (images, classes) one-hot labels

# Dataframe of predicted labels and true labels
y_pred_max_list = np.argmax(y_prob, axis=1).tolist()
y_true_max_list = np.argmax(y_true_onehot, axis=1).tolist()
prediction_df = pd.DataFrame({'predictions': y_pred_max_list, 'labels': y_true_max_list})

# Confusion matrix. `labels` forces one row/column per known class, so the
# matrix always matches the class list even when a class never occurs in the
# evaluation data (otherwise building the labelled dataframe below would fail).
class_indices = eval_generator.class_indices
class_names = sorted(class_indices, key=class_indices.get)   # names ordered by class index
confusion_mx = confusion_matrix(y_true_max_list, y_pred_max_list,
                                labels=list(range(len(class_names))))

# Convert the confusion matrix into a dataframe: true classes down the side,
# predicted class indices along the top.
confusion_matrix_df = pd.DataFrame(confusion_mx,
                                   index=[f"{i}_{c}_true" for i, c in enumerate(class_names)],
                                   columns=[f"{i}" for i in range(len(class_names))])

print("\nConfusion matrix:")
print(confusion_matrix_df)

# Export confusion matrix to a CSV file inside OUTPUT_DIR (the original path
# used the literal text "OUTPUT_DIR" instead of the variable).
confusion_matrix_df.to_csv(os.path.join(
    OUTPUT_DIR,
    f"{EXPERIMENT_ID}_{ARCHIT}_{PRETRAINED_WEIGHTS}_{IMAGE_TYPE}_confusion_matrix_FINALEPOCH.csv"))

# Overall accuracy
overall_accuracy = accuracy_score(y_true_max_list, y_pred_max_list)
print(f"\nAccuracy: {overall_accuracy}")

# F1 score (weighted by class support)
f1score = f1_score(y_true_max_list, y_pred_max_list, average='weighted')
print(f"\nF1 score: {f1score}")

# Top 3 categorical accuracy, computed per image from the stored probabilities.
# The previous implementation only looked at the first image of each batch, so
# it was only correct for a batch size of 1.
top_k = min(3, y_prob.shape[1])   # guard against fewer than 3 classes
top_3_pred = np.argpartition(y_prob, -top_k, axis=1)[:, -top_k:]
top_3_accuracy_list = (top_3_pred == np.asarray(y_true_max_list)[:, None]).any(axis=1).tolist()

# Divide by the number of images actually evaluated (the generator may have
# skipped rows of EVAL_DF whose image file is invalid).
image_count = len(top_3_accuracy_list)
top_3_accuracy = sum(top_3_accuracy_list) / image_count
print(f"\nTop 3 accuracy: {top_3_accuracy}")

# Parameter counts (source: https://stackoverflow.com/questions/45046525/how-can-i-get-the-number-of-trainable-parameters-of-a-model-in-keras)
trainable_count = int(np.sum([K.count_params(w) for w in saved_model_2.trainable_weights]))
non_trainable_count = int(np.sum([K.count_params(w) for w in saved_model_2.non_trainable_weights]))

print('\nTotal params: {:,}'.format(trainable_count + non_trainable_count))
print('Trainable params: {:,}'.format(trainable_count))
print('Non-trainable params: {:,}'.format(non_trainable_count))

## Visual evaluation:
Plot all images in the evaluation dataset for which the predicted implant model was incorrect

In [None]:
# For best metric epoch model: plot every evaluation image it misclassified.

n_cols = 5      # images per row in the overview figure
min_rows = 12   # keeps the original 12 x 5 layout when there are few errors

# Class names ordered by class index, used for the subplot titles.
class_names = sorted(eval_generator.class_indices, key=eval_generator.class_indices.get)

# Collect the misclassified images first so the grid can be sized to fit them.
# Indexing the generator visits each batch exactly once; the previous
# `i_batch > n / batch_size` check ran one batch too many and revisited the
# first batch, which could plot the same image twice.
misclassified = []   # (image, true class index, predicted class index)
for i_batch in range(len(eval_generator)):
    x_batch, y_batch = eval_generator[i_batch]
    y_pred_max = np.argmax(saved_model.predict(x_batch), axis=1)   # a single predict call per batch
    y_true_max = np.argmax(y_batch, axis=1)
    for x_img, y_img, pred_img in zip(x_batch, y_true_max, y_pred_max):
        if y_img != pred_img:
            misclassified.append((x_img, y_img, pred_img))

# Grow the grid beyond 12 rows when there are more than 60 errors (a fixed
# 12 x 5 grid raised an error on the 61st image); the figure height scales with
# the row count, so 12 rows still gives the original 24 x 60 figure.
n_rows = max(min_rows, -(-len(misclassified) // n_cols))
plt.figure(figsize=(24, 5 * n_rows))

for subplot_idx, (x_img, y_img, pred_img) in enumerate(misclassified, start=1):
    plt.subplot(n_rows, n_cols, subplot_idx)   # the plot number to put the picture in
    plt.imshow(x_img)                          # draw the picture
    plt.title(f"Implant: {class_names[y_img]}\nPredicted: {class_names[pred_img]}", color='r')
    plt.axis('off')

# Save inside OUTPUT_DIR (the original path used the literal text "OUTPUT_DIR"
# instead of the variable).
plt.savefig(os.path.join(
                OUTPUT_DIR,
                f"{ARCHIT}_{PRETRAINED_WEIGHTS}_incorrectly_predicted_images_{EXPERIMENT_ID}_{IMAGE_TYPE}_BEST{MONITORED_METRIC}.png"),
            dpi=500)
plt.show()

In [None]:
# For final epoch model: plot every evaluation image it misclassified.

n_cols = 5      # images per row in the overview figure
min_rows = 12   # keeps the original 12 x 5 layout when there are few errors

# Class names ordered by class index, used for the subplot titles.
class_names = sorted(eval_generator.class_indices, key=eval_generator.class_indices.get)

# Collect the misclassified images first so the grid can be sized to fit them.
# Indexing the generator visits each batch exactly once; the previous
# `i_batch > n / batch_size` check ran one batch too many and revisited the
# first batch, which could plot the same image twice.
misclassified = []   # (image, true class index, predicted class index)
for i_batch in range(len(eval_generator)):
    x_batch, y_batch = eval_generator[i_batch]
    y_pred_max = np.argmax(saved_model_2.predict(x_batch), axis=1)   # a single predict call per batch
    y_true_max = np.argmax(y_batch, axis=1)
    for x_img, y_img, pred_img in zip(x_batch, y_true_max, y_pred_max):
        if y_img != pred_img:
            misclassified.append((x_img, y_img, pred_img))

# Grow the grid beyond 12 rows when there are more than 60 errors (a fixed
# 12 x 5 grid raised an error on the 61st image); the figure height scales with
# the row count, so 12 rows still gives the original 24 x 60 figure.
n_rows = max(min_rows, -(-len(misclassified) // n_cols))
plt.figure(figsize=(24, 5 * n_rows))

for subplot_idx, (x_img, y_img, pred_img) in enumerate(misclassified, start=1):
    plt.subplot(n_rows, n_cols, subplot_idx)   # the plot number to put the picture in
    plt.imshow(x_img)                          # draw the picture
    plt.title(f"Implant: {class_names[y_img]}\nPredicted: {class_names[pred_img]}", color='r')
    plt.axis('off')

# Save inside OUTPUT_DIR (the original path used the literal text "OUTPUT_DIR"
# instead of the variable).
plt.savefig(os.path.join(
                OUTPUT_DIR,
                f"{ARCHIT}_{PRETRAINED_WEIGHTS}_incorrectly_predicted_images_{EXPERIMENT_ID}_{IMAGE_TYPE}_FINALEPOCH.png"),
            dpi=500)
plt.show()