This code is to initialize the pretrained VGG16 model with ImageNet weights and retrain the model from end to end on the current task of detecting Pneumonia and further differentiate Bacterial and Viral Pneumonia types. 

In [None]:
#load libraries
from __future__ import print_function

from keras.utils import plot_model
from keras.models import model_from_json
from keras import applications
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense, GlobalAveragePooling2D
import numpy as np
import time
from keras.optimizers import SGD
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
from scipy import interp
from itertools import cycle
from sklearn.metrics import classification_report,confusion_matrix,accuracy_score
from keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
from evaluation import plot_confusion_matrix
from keras.callbacks import ModelCheckpoint
from keras.callbacks import TensorBoard
%matplotlib inline

In [None]:
# Loading the data
train_data_dir = 'cxr_normal_pneumonia_1024/train'
test_data_dir = 'cxr_normal_pneumonia_1024/test'
img_width = 1024
img_height = 1024
channel = 3
input_img = Input(shape = (img_width, img_height, channel))
epochs = 60
batch_size = 8 #vary this parameter depending on your GPU capacity
num_classes= 2 #[pneumonia, normal] [bacterial, viral]

In [None]:
#define the VGG16 and customize
feature_model = applications.VGG16(weights='imagenet', include_top=False, input_shape=(img_width,img_height,channel))
feature_model=Model(inputs=feature_model.input,outputs=feature_model.get_layer('block5_conv3').output)

#addind the top layers
x = feature_model.output
x = GlobalAveragePooling2D()(x) 
predictions = Dense(num_classes, activation='softmax', name='predictions')(x)
model = Model(inputs=feature_model.input, outputs=predictions)
model.summary()

#plot the model
plot_model(model, to_file='custom_VGG16_model.png')

We performed five-fold cross validation in this study. We have however shown running the script with a sample data fold. We performed no augmentation other than rescaling.We allocated 20% of the training data at random for validation.

In [None]:
#declaring image data generators
train_datagen = ImageDataGenerator(
        rescale=1./255,
        validation_split=0.2) #taking 20% of training for validation 

test_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
        train_data_dir,
        target_size=(1024, 1024),
        batch_size=batch_size,
        class_mode='categorical',
        subset='training')

validation_generator = train_datagen.flow_from_directory(
        train_data_dir,
        target_size=(1024, 1024),
        batch_size=batch_size,
        class_mode='categorical',
        subset='validation')

test_generator = test_datagen.flow_from_directory(
        test_data_dir,
        target_size=(1024, 1024),
        batch_size=batch_size,
        class_mode='categorical',shuffle=False)

#count the number of samples
nb_train_samples = len(train_generator.filenames)
nb_validation_samples = len(validation_generator.filenames)
nb_test_samples = len(test_generator.filenames)

#check the class indices
train_generator.class_indices
validation_generator.class_indices
test_generator.class_indices

Allocate balanced weights to penalize over-represented classes

In [None]:
from sklearn.utils import class_weight
class_weights = class_weight.compute_class_weight(
               'balanced',
                np.unique(train_generator.classes), 
                train_generator.classes)
print(class_weights)

In [None]:
#fix the optimizer
sgd = SGD(lr=0.0001, decay=1e-6, momentum=0.9, nesterov=True) #optimize to your requirements

#compile the model
model.compile(optimizer=sgd,
              loss='categorical_crossentropy',
              metrics=['accuracy'])

#compute the training time
start = time.time()

#give the path to store the model weights
filepath = 'weights/' + model.name + '.{epoch:02d}-{val_acc:.4f}.hdf5'

#save only the best weights
checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, 
                             save_weights_only=True, save_best_only=True, mode='max', period=1)

#visualize model performance using TensorBoard
tensor_board = TensorBoard(log_dir='logs/', histogram_freq=0, batch_size=batch_size)
callbacks_list = [checkpoint, tensor_board]

#train model
custom_vgg16_history = model.fit_generator(
      train_generator,
      steps_per_epoch=nb_train_samples // batch_size, 
      epochs=epochs,
      validation_data=validation_generator,
      callbacks=callbacks_list,
      class_weight = class_weights,
      validation_steps=nb_validation_samples // batch_size, 
      verbose=1)

#print the total time taken for training
print(time.time()-start)

In [None]:
#Testing the model's performance by loading the model weights

model.load_weights('weights/weights-improvement-39-0.9615.hdf5') #change this to your path and model weights
custom_vgg16_y_pred = model.predict_generator(test_generator, nb_test_samples/batch_size, workers=1)

#true labels
Y_test=test_generator.classes

#print the shape of y_pred and Y_test
print(custom_vgg16_y_pred.shape)
print(Y_test.shape)

#measure accuracy
custom_vgg16_model_accuracy=accuracy_score(Y_test,custom_vgg16_y_pred.argmax(axis=-1))
print('The accuracy of custom VGG16 model is: ', custom_vgg16_model_accuracy)

In [None]:
#print classification report
target_names = ['class 0(normal)', 'class 1(pneumonia)'] #from the generator.class_indices
print(classification_report(Y_test,custom_vgg16_y_pred.argmax(axis=-1),
                                                              target_names=target_names, digits=4))

In [None]:
# Compute confusion matrix
cnf_matrix = confusion_matrix(Y_test,custom_vgg16_y_pred.argmax(axis=-1))
np.set_printoptions(precision=4)
plt.figure(figsize=(20,10), dpi=100)
plot_confusion_matrix(cnf_matrix, classes=target_names,
                      title='Confusion matrix for custom VGG16 model without normalization')
plt.show()

In [None]:
#store the predictions in .csv files
print(custom_vgg16_y_pred.argmax(axis=-1))
print(Y_test)

np.savetxt('custom_VGG16_y_pred.csv',custom_vgg16_y_pred.argmax(axis=-1),fmt='%i',delimiter = ",")
np.savetxt('Y_test.csv',Y_test,fmt='%i',delimiter = ",")

In [None]:
#compute the ROC-AUC values
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(num_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test, custom_vgg16_y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
fig=plt.figure(figsize=(15,10), dpi=100)
ax = fig.add_subplot(1, 1, 1)
major_ticks = np.arange(0.0, 1.0, 0.10)
minor_ticks = np.arange(0.0, 1.0, 0.10)
ax.set_xticks(major_ticks)
ax.set_xticks(minor_ticks, minor=True)
ax.set_yticks(major_ticks)
ax.set_yticks(minor_ticks, minor=True)
ax.grid(which='both')
lw = 1 
plt.plot(fpr[1], tpr[1], color='red',
         lw=lw, label='ROC curve (area = %0.4f)' % roc_auc[1])
plt.plot([0, 1], [0, 1], color='black', lw=lw, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristics')
plt.legend(loc="lower right")
plt.show()