This code evaluates the sensitivity of the trained model to occlusion. The idea is to check if the trained model is truly identifying the location of the object in the image by systematically occluding different portions of the image with a mask and evaluating the net output. The code utilizes the trained model to detect salient regions in the image that helps in making the prediction. When the region of interest is occluded, you can witness the class probability decreases to a very low value signifying that that model will not be able to categorize the image to its appropriate category.

To begin with, let us define a few functions to load the data and convert them to Keras compatible targets. We will load the libraries to begin with.

In [None]:
# load libraries
import cv2
import numpy as np
import os
from keras.utils import np_utils
import matplotlib.pyplot as plt
import itertools
import time
import copy
import math
import pylab
import seaborn as sns
from keras.models import Sequential
from keras.models import Model
from keras import applications
from keras import optimizers
from keras.callbacks import ModelCheckpoint, TensorBoard
from keras.layers import Conv2D, Activation, Dense, MaxPooling2D, Flatten, Dropout, GlobalAveragePooling2D
from sklearn.metrics import log_loss
from sklearn.utils import class_weight
from keras.optimizers import SGD
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import classification_report,confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
from sklearn.metrics import average_precision_score
%matplotlib inline

We performed 5-fold cross validation at the patient level. we had train and test splits for each fold to ensure that none of the patienet information in the training data leaks into the test data. We randomly split 10% of the training data for validation. 

In [None]:
#define data directories
train_data_dir = 'f1_mal/train' #path to your data
valid_data_dir = 'f1_mal/valid'
test_data_dir = 'f1_mal/test'

# declare the number of samples in each category
nb_train_samples = 22284 #  modify for your dataset
nb_valid_samples = 2476 #  modify for your dataset
nb_test_samples = 2730 # modify for your dataset
num_classes = 2 # binary classification 
img_rows_orig = 100 # modify these values depending on your requirements
img_cols_orig = 100 

Lets define functions to load and resize the training, validation and test data.

In [None]:
def load_training_data():
    labels = os.listdir(train_data_dir)
    total = len(labels)
    X_train = np.ndarray((nb_train_samples, img_rows_orig, img_cols_orig, 3), dtype=np.uint8)
    Y_train = np.zeros((nb_train_samples,), dtype='uint8')
    i = 0
    print('-'*30)
    print('Creating training images...')
    print('-'*30)
    j = 0
    for label in labels:
        image_names_train = os.listdir(os.path.join(train_data_dir, label))
        total = len(image_names_train)
        print(label, total)
        for image_name in image_names_train:
            img = cv2.imread(os.path.join(train_data_dir, label, image_name), cv2.IMREAD_COLOR)
            img = np.array([img])
            X_train[i] = img
            Y_train[i] = j
            if i % 100 == 0:
                print('Done: {0}/{1} images'.format(i, total))
            i += 1
        j += 1    
    print(i)                
    print('Loading done.')
    print('Transform targets to keras compatible format.')
    Y_train = np_utils.to_categorical(Y_train[:nb_train_samples], num_classes)
    np.save('imgs_train.npy', X_train, Y_train) #save as numpy files
    return X_train, Y_train
    
def load_validation_data():
    # Load validation images
    labels = os.listdir(valid_data_dir)
    X_valid = np.ndarray((nb_valid_samples, img_rows_orig, img_cols_orig, 3), dtype=np.uint8)
    Y_valid = np.zeros((nb_valid_samples,), dtype='uint8')
    i = 0
    print('-'*30)
    print('Creating validation images...')
    print('-'*30)
    j = 0
    for label in labels:
        image_names_valid = os.listdir(os.path.join(valid_data_dir, label))
        total = len(image_names_valid)
        print(label, total)
        for image_name in image_names_valid:
            img = cv2.imread(os.path.join(valid_data_dir, label, image_name), cv2.IMREAD_COLOR)
            img = np.array([img])
            X_valid[i] = img
            Y_valid[i] = j
            if i % 100 == 0:
                print('Done: {0}/{1} images'.format(i, total))
            i += 1
        j += 1
    print(i)            
    print('Loading done.')
    print('Transform targets to keras compatible format.');
    Y_valid = np_utils.to_categorical(Y_valid[:nb_valid_samples], num_classes)
    np.save('imgs_valid.npy', X_valid, Y_valid) #save as numpy files
    return X_valid, Y_valid

def load_test_data():
    labels = os.listdir(test_data_dir)
    X_test = np.ndarray((nb_test_samples, img_rows_orig, img_cols_orig, 3), dtype=np.uint8)
    Y_test = np.zeros((nb_test_samples,), dtype='uint8')
    i = 0
    print('-'*30)
    print('Creating test images...')
    print('-'*30)
    j = 0
    for label in labels:
        image_names_test = os.listdir(os.path.join(test_data_dir, label))
        total = len(image_names_test)
        print(label, total)
        for image_name in image_names_test:
            img = cv2.imread(os.path.join(test_data_dir, label, image_name), cv2.IMREAD_COLOR)
            img = np.array([img])
            X_test[i] = img
            Y_test[i] = j
            if i % 100 == 0:
                print('Done: {0}/{1} images'.format(i, total))
            i += 1
        j += 1
    print(i)            
    print('Loading done.')
    print('Transform targets to keras compatible format.');
    Y_test = np_utils.to_categorical(Y_test[:nb_test_samples], num_classes)
    np.save('imgs_test.npy', X_test, Y_test) #save as numpy files
    return X_test, Y_test

We will define functions to resize the original images to that dimensions required for the pretrained models using the functions defined below.

In [None]:
def load_resized_training_data(img_rows, img_cols):

    X_train, Y_train = load_training_data()
    X_train = np.array([cv2.resize(img, (img_rows,img_cols)) for img in X_train[:nb_train_samples,:,:,:]])
    
    return X_train, Y_train
    
def load_resized_validation_data(img_rows, img_cols):

    X_valid, Y_valid = load_validation_data()
    X_valid = np.array([cv2.resize(img, (img_rows,img_cols)) for img in X_valid[:nb_valid_samples,:,:,:]])
        
    return X_valid, Y_valid   

def load_resized_test_data(img_rows, img_cols):

    X_test, Y_test = load_test_data()
    X_test = np.array([cv2.resize(img, (img_rows,img_cols)) for img in X_test[:nb_test_samples,:,:,:]])
    
    return X_test, Y_test

An evaluation script has been written to compute the confusion matrix for the performance of the trained model. This function prints and plots the confusion matrix. Normalization can be applied by setting 'normalize=True'.

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False, #if true all values in confusion matrix is between 0 and 1
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

We will now proceed to extract the features from our dataset using the pretrained models.

In [None]:
img_rows=224 
img_cols=224
channel = 3 #RGB
num_classes = 2 #binary classification
batch_size = 32 # modify based on the GPUs in your system
num_epoch = 100 # modify depending on the model's convergence with your data

#load data
X_train, Y_train = load_resized_training_data(img_rows, img_cols)
X_valid, Y_valid = load_resized_validation_data(img_rows, img_cols)
X_test, Y_test = load_resized_test_data(img_rows, img_cols)


#print the shape of the data
print(X_train.shape, Y_train.shape)
print(X_valid.shape, Y_valid.shape)
print(X_test.shape, Y_test.shape)

Let us now configure our pretrained model. This code uses VGG16 as a feature extractor.

you can use the rest of the models like:

ResNet50:

feature_model = applications.ResNet50((weights='imagenet', include_top=False, input_shape=(img_rows, img_cols, 3)) 
feature_model = Model(input=feature_model.input, output=feature_model.get_layer('res5c_branch2c').output) 

Xception:

feature_model = applications.Xception((weights='imagenet', include_top=False, input_shape=(img_rows, img_cols, 3))
feature_model = Model(input=feature_model.input, output=feature_model.get_layer('block14_sepconv1').output) 

DenseNet121:

For DenseNet, the main file densenet121_model is included to this repository. The model can be used as :
feature_model = densenet121_model(img_rows=img_rows, img_cols=img_cols, color_type=channel, num_classes=num_classes)

In [None]:
base_model = applications.VGG16(weights='imagenet', include_top=False, input_shape=(img_rows, img_cols, channel))

#extract feature from the optimal layer for your data
base_model = Model(input=base_model.input, output=base_model.get_layer('block5_conv2').output) 

#get the model summary
base_model.summary()

Lets modify the architecture by adding a global spatial average pooling layer and a fully-connected layer with a dropout ratio of 0.5 to prevent overfitting and help model generalization. We will train only the top layers which are randomly initialized, freeze all the convolutional layers to prevent large gradient updates wrecking the learned weights. 

In [None]:
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(num_classes, activation='softmax', name='predictions')(x)

# this is the model we will train
model = Model(inputs=base_model.input, outputs=predictions)

# Train only the top layers (which were randomly initialized)
# i.e. freeze all convolutional layers to prevent large gradient updates wrecking the learned weights
for layer in base_model.layers:
    layer.trainable = False

#fix the optimizer
sgd = SGD(lr=0.00001, decay=1e-6, momentum=0.9, nesterov=True) 

#compile the model
model.compile(optimizer=sgd,
              loss='categorical_crossentropy',
              metrics=['accuracy'])

Its time to train the model. We will store only the best model weights by initializing callbacks. Also we can view the performance of our model during run-time by visualizing the performance graphs with Tensorboard. Create a log directory named 'logs' to store the training logs and a separate folder named 'weights' to store the model weights. You can visualize tensorboard graphs simply by navigating to your working directory and do:

$tensorboard --logdir=path/to/log-directory/ --port 6006

Then open localhost:6006 in your browser to view the performance graphs, model architecture and other parameters of your interest.

In [None]:
filepath = 'weights/' + model.name + '.{epoch:02d}-{val_acc:.4f}.hdf5'
checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, 
                             save_weights_only=True, save_best_only=True, mode='max', period=1)
tensor_board = TensorBoard(log_dir='logs/', histogram_freq=0, batch_size=batch_size)
callbacks_list = [checkpoint, tensor_board]

#compute training time
t=time.time()
hist = model.fit(X_train, Y_train, batch_size=batch_size, 
                 callbacks=callbacks_list,
                 epochs=num_epoch, verbose=1, 
                 shuffle=True, validation_data=[X_valid, Y_valid])

#compute the training time
print('Training time: %s' % (time.time()-t))

If you want to visualize the performance of the model in the console other than with Tensorboard, you can use the following script.

In [None]:
train_loss=hist.history['loss']
val_loss=hist.history['val_loss']
train_acc=hist.history['acc']
val_acc=hist.history['val_acc']
xc=range(num_epoch)

plt.figure(1,figsize=(20,10), dpi=100)
plt.plot(xc,train_loss)
plt.plot(xc,val_loss)
plt.xlabel('num of Epochs')
plt.ylabel('loss')
plt.title('train_loss vs val_loss')
plt.grid(True)
plt.legend(['train','val'])
plt.style.use(['classic'])

plt.figure(2,figsize=(20,10), dpi=100)
plt.plot(xc,train_acc)
plt.plot(xc,val_acc)
plt.xlabel('num of Epochs')
plt.ylabel('accuracy')
plt.title('train_acc vs val_acc')
plt.grid(True)
plt.legend(['train','val'])
plt.style.use(['classic'])

Once the model is trained, load the best model weights to predict on the test data.

In [None]:
model.load_weights('weights/model_2.01-0.8546.hdf5') #modify for your own model

In [None]:
#predict on the test data
X_test, Y_test = load_resized_test_data(img_rows, img_cols)
print(X_test.shape, Y_test.shape)
print('-'*30)
print('Predicting on the test data...')
print('-'*30)
y_pred = model.predict(X_test, batch_size=batch_size, verbose=1)

# compute the accuracy
Test_accuracy = accuracy_score(Y_test.argmax(axis=-1),y_pred.argmax(axis=-1))
print("Test_Accuracy = ",Test_accuracy)

Let us now compute the performance metrics for the pretrained VGG16 model with the test data. The performance metrics involve computing the ROC-AUC values, cross-entropy loss score, average precision score, prediction probabilities and storing these values and plotting the ROC curves.

In [None]:
#compute the ROC-AUC values
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(num_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test[:, i], y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
    
# Compute micro-average ROC curve and ROC area
fpr["micro"], tpr["micro"], _ = roc_curve(Y_test.ravel(), y_pred.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

#Plot ROC curves
plt.figure(figsize=(20,10), dpi=100)
lw = 1
plt.plot(fpr[1], tpr[1], color='red',
         lw=lw, label='ROC curve (area = %0.4f)' % roc_auc[1])
plt.plot([0, 1], [0, 1], color='black', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristics')
plt.legend(loc="lower right")
plt.show()

# computhe the cross-entropy loss score
score = log_loss(Y_test,y_pred)
print(score)

# compute the average precision score
prec_score = average_precision_score(Y_test,y_pred)  
print(prec_score)

# transfer it back
y_pred = np.argmax(y_pred, axis=1)
Y_test = np.argmax(Y_test, axis=1)
print(y_pred)
print(Y_test)

#save the predictions as a CSV file for further analysis
np.savetxt('vgg16_model_y_pred.csv',y_pred,fmt='%i',delimiter = ",")
np.savetxt('vgg16_model_Y_test.csv',Y_test,fmt='%i',delimiter = ",")

Now, let us plot the confusion matrix of the model's performance.

In [None]:
target_names = ['class 0(abnormal)', 'class 1(normal)'] #decide the labels for your own data
print(classification_report(Y_test,y_pred,target_names=target_names))
print(confusion_matrix(Y_test,y_pred))
cnf_matrix = (confusion_matrix(Y_test,y_pred))
np.set_printoptions(precision=4)

# Plot non-normalized confusion matrix
plt.figure(figsize=(20,10), dpi=100)
plot_confusion_matrix(cnf_matrix, classes=target_names,
                  title='Confusion matrix')
plt.show()

Let us begin performing the occlusion sensitivity measurements.

In [None]:
# Enter here the path to the image:
image_path = 'C48P9thinF_IMG_20150721_164304_cell_4.png' #the image to visualize

We need to specify the parameters of the occluding window. These parameters can be varied to observe the effect on the input image

In [None]:
occluding_size = 15
occluding_pixel = 0
occluding_stride = 5 

In [None]:
#fix the optimizer
sgd = SGD(lr=1e-6, decay=1e-6, momentum=0.9, nesterov=True) 

#compile the model
model.compile(optimizer=sgd,
              loss='categorical_crossentropy', metrics=['accuracy'])

Lets begin writing the definition file to perform the occlusion sensitivity measurement. We will read the image, find the index of the winning class through model prediction. We will then create a mask with the specified occlusion parameters and tile it through the image. Whenever the mask obstrcuts the region of interest in the image used by the model to make the predictions to categorize the image to its approprite category, we can witness a drastic drop in the class probability for the true class. 

In [None]:
def Occlusion_exp(image_path, occluding_size, occluding_pixel, occluding_stride):
    
    image = cv2.imread(image_path)
    im = cv2.resize(image, (100,100)).astype(np.float32)
    im = np.expand_dims(im, axis=0)
    out = model.predict(im)
    out = out[0]
    
    # Getting the index of the winning class:
    m = max(out)
    index_object = [i for i, j in enumerate(out) if j == m]
    height, width, _ = image.shape
    output_height = int(math.ceil((height-occluding_size) / occluding_stride + 1))
    output_width = int(math.ceil((width-occluding_size) / occluding_stride + 1))
    heatmap = np.zeros((output_height, output_width))
    
    for h in range(output_height):
        for w in range(output_width):
            
            # Occluder region: varies based on the occlusion stride
            h_start = h * occluding_stride
            w_start = w * occluding_stride
            h_end = min(height, h_start + occluding_size)
            w_end = min(width, w_start + occluding_size)
            
            # Getting the image copy, applying the occluding window and classifying it again:
            input_image = copy.copy(image)
            input_image[h_start:h_end, w_start:w_end,:] =  occluding_pixel            
            im = cv2.resize(input_image, (100,100)).astype(np.float32)
            im = np.expand_dims(im, axis=0)
            out = model.predict(im)
            out = out[0]
            print('scanning position (%s, %s)'%(h,w))
            
            # It's possible to evaluate the trained model sensitivity to a specific category.
            # To do so, you have to change the variable "index_object" by the index of the class of interest.
            prob = (out[0]) # here 0 for abnormal and 1 for normal cell 
            heatmap[h,w] = prob 
    
    fig = pylab.figure()
    params = {'legend.fontsize': 'xx-large',
          'figure.figsize': (10, 10),
         'figure.dpi': 100,
         'axes.labelsize': 'xx-large',
         'axes.titlesize':'xx-large',
         'xtick.labelsize':'xx-large',
         'ytick.labelsize':'xx-large'}
    pylab.rcParams.update(params)
    fig.add_subplot(2, 1, 1)  # this line outputs images side-by-side    
    sns.heatmap(heatmap,xticklabels=False, yticklabels=False)
    fig.add_subplot(2, 1, 2)
    plt.imshow(image)
    plt.show()
    print('Object index is %s'%index_object)
    
Occlusion_exp(image_path, occluding_size, occluding_pixel, occluding_stride)