Creating a ResNet50 to model the Chest X-Ray Images (Pneumonia) dataset from Kaggle. This model is taken from the Residual_Networks_v2a Notebook from week 2 of Andrew Ng's Convolutional Neural Networks course on Coursera.

In [2]:
import numpy as np
from keras import layers
from keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D
from keras.models import Model, load_model
from keras.preprocessing import image
from keras.utils import layer_utils
from keras.utils.data_utils import get_file
from keras.applications.imagenet_utils import preprocess_input
import pydot
from IPython.display import SVG
from keras.utils.vis_utils import model_to_dot
from keras.utils import plot_model
from keras.initializers import glorot_uniform
import scipy.misc
from matplotlib.pyplot import imshow
%matplotlib inline

import keras.backend as K
K.set_image_data_format('channels_last')
K.set_learning_phase(1)

Using TensorFlow backend.


In [3]:
#Files are separated into separate folders for train, test, and validation, and also split
#into separate folders for + and - pneumonia.

# Put all the image filepaths into lists

test_neg_filepaths = []
for dirname, _, filenames in os.walk("../input/chest-xray-pneumonia/chest_xray/test/NORMAL"):
    for filename in filenames:
        test_neg_filepaths.append(os.path.join(dirname, filename))
        
test_pos_filepaths = []
for dirname, _, filenames in os.walk("../input/chest-xray-pneumonia/chest_xray/test/PNEUMONIA"):
    for filename in filenames:
        test_pos_filepaths.append(os.path.join(dirname, filename))
        
train_neg_filepaths = []
for dirname, _, filenames in os.walk("../input/chest-xray-pneumonia/chest_xray/train/NORMAL"):
    for filename in filenames:
        train_neg_filepaths.append(os.path.join(dirname, filename))
        
train_pos_filepaths = []
for dirname, _, filenames in os.walk("../input/chest-xray-pneumonia/chest_xray/train/PNEUMONIA"):
    for filename in filenames:
        train_pos_filepaths.append(os.path.join(dirname, filename))
        
val_neg_filepaths = []
for dirname, _, filenames in os.walk("../input/chest-xray-pneumonia/chest_xray/val/NORMAL"):
    for filename in filenames:
        val_neg_filepaths.append(os.path.join(dirname, filename))
        
val_pos_filepaths = []
for dirname, _, filenames in os.walk("../input/chest-xray-pneumonia/chest_xray/val/PNEUMONIA"):
    for filename in filenames:
        val_pos_filepaths.append(os.path.join(dirname, filename))
        
print("Number of training +: " + str(len(train_pos_filepaths)))
print("Number of training -: " + str(len(train_neg_filepaths)))
print("Number of validation +: " + str(len(val_pos_filepaths)))
print("Number of validation -: " + str(len(val_neg_filepaths)))
print("Number of test +: " + str(len(test_pos_filepaths)))
print("Number of test -: " + str(len(test_neg_filepaths)))
m_train_p=len(train_pos_filepaths)
m_train_n=len(train_neg_filepaths)
m_val_p=len(val_pos_filepaths)
m_val_n=len(val_neg_filepaths)
m_test_p=len(test_pos_filepaths)
m_test_n=len(test_neg_filepaths)


Number of training +: 3875
Number of training -: 1341
Number of validation +: 8
Number of validation -: 8
Number of test +: 390
Number of test -: 234


In [4]:
# Create the y data vectors, first column is no pneumonia, second column is neumonia.

y_train_pos = np.ones((3875, 1))
y_train_neg = np.zeros((1341, 1))
y_val_pos = np.ones((8, 1))
y_val_neg = np.zeros((8, 1))
y_test_pos = np.ones((390, 1))
y_test_neg = np.zeros((234, 1))

y_train = np.concatenate((y_train_pos, y_train_neg))
y_val = np.concatenate((y_val_pos, y_val_neg))
y_test = np.concatenate((y_test_pos, y_test_neg))

def convert_to_one_hot(Y, C):
    Y = np.eye(C)[Y.reshape(-1)].T
    return Y

y_train_oh = convert_to_one_hot(y_train.astype(int), 2).T
y_val_oh = convert_to_one_hot(y_val.astype(int), 2).T
y_test_oh = convert_to_one_hot(y_test.astype(int), 2).T

In [5]:
# Load the image data into arrays.

target_size = 128
n_H_max = target_size
n_W_max = target_size
x_train_pos = np.zeros((3875, n_H_max, n_W_max, 3))
x_train_neg = np.zeros((1341, n_H_max, n_W_max, 3))
x_val_pos = np.zeros((8, n_H_max, n_W_max, 3))
x_val_neg = np.zeros((8, n_H_max, n_W_max, 3))
x_test_pos = np.zeros((390, n_H_max, n_W_max, 3))
x_test_neg = np.zeros((234, n_H_max, n_W_max, 3))


for path in train_pos_filepaths:
    img = image.load_img(path, target_size=(target_size, target_size))
    x = image.img_to_array(img)
    #x = np.expand_dims(x, axis=0)
    x = x/255.0
    x_train_pos[0] = x
    
for path in train_neg_filepaths:
    img = image.load_img(path, target_size=(target_size, target_size))
    x = image.img_to_array(img)
    #x = np.expand_dims(x, axis=0)
    x = x/255.0
    x_train_neg[0] = x   
for path in val_pos_filepaths:
    img = image.load_img(path, target_size=(target_size, target_size))
    x = image.img_to_array(img)
    #x = np.expand_dims(x, axis=0)
    x = x/255.0
    x_val_pos[0] = x
    
for path in val_neg_filepaths:
    img = image.load_img(path, target_size=(target_size, target_size))
    x = image.img_to_array(img)
    #x = np.expand_dims(x, axis=0)
    x = x/255.0
    x_val_neg[0] = x
    
for path in test_pos_filepaths:
    img = image.load_img(path, target_size=(target_size, target_size))
    x = image.img_to_array(img)
    #x = np.expand_dims(x, axis=0)
    x = x/255.0
    x_test_pos[0] = x
    
for path in train_neg_filepaths:
    img = image.load_img(path, target_size=(target_size, target_size))
    x = image.img_to_array(img)
    #x = np.expand_dims(x, axis=0)
    x = x/255.0
    x_test_neg[0] = x

x_train = np.concatenate((x_train_pos, x_train_neg), axis = 0)
x_test = np.concatenate((x_test_pos, x_test_neg), axis = 0)
x_val = np.concatenate((x_val_pos, x_val_neg), axis = 0)

In [6]:
print("x_train: " + str(x_train.shape))
print("y_train: " + str(y_train.shape))
print("x_val: " + str(x_val.shape))
print("y_val: " + str(y_val.shape))
print("x_test: " + str(x_test.shape))
print("y_test: " + str(x_test.shape))

x_train: (5216, 128, 128, 3)
y_train: (5216, 1)
x_val: (16, 128, 128, 3)
y_val: (16, 1)
x_test: (624, 128, 128, 3)
y_test: (624, 128, 128, 3)


In [7]:
def identity_block(X, f, filters, stage, block):
    """
    Implementation of the identity block
    
    Arguments:
    X -- input tensor of shape (m, n_H_prev, n_W_prev, n_C_prev)
    f -- integer, specifying the shape of the middle CONV's window for the main path
    filters -- python list of integers, defining the number of filters in the CONV layers of the main path
    stage -- integer, used to name the layers, depending on their position in the network
    block -- string/character, used to name the layers, depending on their position in the network
    
    Returns:
    X -- output of the identity block, tensor of shape (n_H, n_W, n_C)
    """
    
    # defining name basis
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'
    
    # Retrieve Filters
    F1, F2, F3 = filters
    
    # Save the input value. We need this later to add back to the main path. 
    X_shortcut = X
    
    # First component of main path
    X = Conv2D(filters = F1, kernel_size = (1, 1), strides = (1,1), padding = 'valid', name = conv_name_base + '2a', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 3, name = bn_name_base + '2a')(X)
    X = Activation('relu')(X)
    
    # Second component of main path
    X = Conv2D(filters = F2, kernel_size = (f, f), strides = (1,1), padding = 'same', name = conv_name_base + '2b', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 3, name = bn_name_base + '2b')(X)
    X = Activation('relu')(X)

    # Third component of main path
    X = Conv2D(filters = F3, kernel_size = (1, 1), strides = (1,1), padding = 'valid', name = conv_name_base + '2c', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 3, name = bn_name_base + '2c')(X)

    # Add shortcut value to main path, and pass it through a RELU activation
    X = Add()([X,X_shortcut])
    X = Activation('relu')(X)
    
    
    return X

In [8]:
def convolutional_block(X, f, filters, stage, block, s = 2):
    """
    Implementation of the convolutional block
    
    Arguments:
    X -- input tensor of shape (m, n_H_prev, n_W_prev, n_C_prev)
    f -- integer, specifying the shape of the middle CONV's window for the main path
    filters -- python list of integers, defining the number of filters in the CONV layers of the main path
    stage -- integer, used to name the layers, depending on their position in the network
    block -- string/character, used to name the layers, depending on their position in the network
    s -- Integer, specifying the stride to be used
    
    Returns:
    X -- output of the convolutional block, tensor of shape (n_H, n_W, n_C)
    """
    
    # defining name basis
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'
    
    # Retrieve Filters
    F1, F2, F3 = filters
    
    # Save the input value
    X_shortcut = X


    ##### MAIN PATH #####
    # First component of main path 
    X = Conv2D(filters = F1, kernel_size = (1, 1), strides = (s,s), padding = 'valid', name = conv_name_base + '2a', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 3, name = bn_name_base + '2a')(X)
    X = Activation('relu')(X)
    

    # Second component of main path
    X = Conv2D(filters = F2, kernel_size = (f, f), strides = (1,1), padding = 'same', name = conv_name_base + '2b', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 3, name = bn_name_base + '2b')(X)
    X = Activation('relu')(X)

    # Third component of main path
    X = Conv2D(filters = F3, kernel_size = (1, 1), strides = (1,1), padding = 'valid', name = conv_name_base + '2c', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 3, name = bn_name_base + '2c')(X)

    ##### SHORTCUT PATH ####
    X_shortcut = Conv2D(filters = F3, kernel_size = (1, 1), strides = (s,s), padding = 'valid', name = conv_name_base + '1', kernel_initializer = glorot_uniform(seed=0))(X_shortcut)
    X_shortcut = BatchNormalization(axis = 3, name = bn_name_base + '1')(X_shortcut)

    # Final step: Add shortcut value to main path, and pass it through a RELU activation
    X = Add()([X,X_shortcut])
    X = Activation('relu')(X)
    
    return X

In [9]:
def ResNet50(input_shape = (target_size, target_size, 3), classes = 2):
    """
    Implementation of the popular ResNet50 the following architecture:
    CONV2D -> BATCHNORM -> RELU -> MAXPOOL -> CONVBLOCK -> IDBLOCK*2 -> CONVBLOCK -> IDBLOCK*3
    -> CONVBLOCK -> IDBLOCK*5 -> CONVBLOCK -> IDBLOCK*2 -> AVGPOOL -> TOPLAYER

    Arguments:
    input_shape -- shape of the images of the dataset
    classes -- integer, number of classes

    Returns:
    model -- a Model() instance in Keras
    """
    
    # Define the input as a tensor with shape input_shape
    X_input = Input(input_shape)

    
    # Zero-Padding
    X = ZeroPadding2D((3, 3))(X_input)
    
    # Stage 1
    X = Conv2D(64, (7, 7), strides = (2, 2), name = 'conv1', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 3, name = 'bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling2D((3, 3), strides=(2, 2))(X)

    # Stage 2
    X = convolutional_block(X, f = 3, filters = [64, 64, 256], stage = 2, block='a', s = 1)
    X = identity_block(X, 3, [64, 64, 256], stage=2, block='b')
    X = identity_block(X, 3, [64, 64, 256], stage=2, block='c')


    # Stage 3 
    X = convolutional_block(X, f = 3, filters = [128, 128, 512], stage = 3, block='a', s = 2)
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='b')
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='c')
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='d')

    # Stage 4
    X = convolutional_block(X, f = 3, filters = [256, 256, 1024], stage = 4, block='a', s = 2)
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='b')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='c')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='d')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='e')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='f')

    # Stage 5
    X = convolutional_block(X, f = 3, filters = [512, 512, 2048], stage = 5, block='a', s = 2)
    X = identity_block(X, 3, [512, 512, 2048], stage=5, block='b')
    X = identity_block(X, 3, [512, 512, 2048], stage=5, block='c')

    # AVGPOOL 
    X = AveragePooling2D(pool_size=(2, 2), strides=None, padding='valid', data_format=None)(X)

    # output layer
    X = Flatten()(X)
    X = Dense(classes, activation='softmax', name='fc' + str(classes), kernel_initializer = glorot_uniform(seed=0))(X)
    
    # Create model
    model = Model(inputs = X_input, outputs = X, name='ResNet50')

    return model

In [10]:
model = ResNet50(input_shape = (target_size, target_size, 3), classes = 2)

In [11]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [12]:
model.fit(x_train, y_train_oh, epochs = 2, batch_size = 16)

Epoch 1/2
Epoch 2/2


<keras.callbacks.callbacks.History at 0x7fe4e1f9de10>

In [13]:
val_preds = model.evaluate(x_val, y_val_oh)
print ("Loss = " + str(val_preds[0]))
print ("Val Accuracy = " + str(val_preds[1]))

Loss = 2.2170112133026123
Val Accuracy = 0.4375


In [14]:
test_preds = model.evaluate(x_test, y_test_oh)
print ("Loss = " + str(test_preds[0]))
print ("Test Accuracy = " + str(test_preds[1]))

Loss = 0.9911226336008463
Test Accuracy = 0.6266025900840759
