## IMPORT LIBRARIES

In [None]:
import os
import tqdm
from tqdm import tqdm
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow import keras

print('TensorFlow version: {}'.format(tf.__version__))
print('Keras version: {}'.format(keras.__version__))

In [None]:
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
DATA_DIR = '../input/chest-xray-pneumonia/chest_xray/'
DATASETS = ['train', 'val', 'test']
CLASSES_DICT = {'NORMAL':0, 'PNEUMONIA':1}

## EDA

In [None]:
def generate_bar_chart(labels, values, dataset):
    plt.figure(figsize=(5,3), dpi=100)
    plt.bar(labels, values)
    plt.title('Distribution of classes in {} dataset'.format(dataset))
    plt.show()

for dataset in DATASETS:
    num_normal = len(os.listdir(os.path.join(DATA_DIR, dataset, 'NORMAL')))
    num_pneumonia = len(os.listdir(os.path.join(DATA_DIR, dataset, 'PNEUMONIA')))
    generate_bar_chart(['NORMAL', 'PNEUMONIA'], [num_normal, num_pneumonia], dataset)
    print('{} set: {} normal images and {} pneumonia images'.format(dataset.upper(), num_normal, num_pneumonia))
    

## MODEL TRAINING

In [None]:
def img_loader(data_dir, img_height, img_width, batch_size):
    
    train_datagen = keras.preprocessing.image.ImageDataGenerator(rescale=1/255.0, zoom_range=0.3)
    test_datagen = keras.preprocessing.image.ImageDataGenerator(rescale=1/255.0)
    
    train_generator = train_datagen.flow_from_directory(directory=os.path.join(data_dir,'train'), target_size=(img_height,img_width), batch_size=batch_size, shuffle=True, class_mode='binary')
    test_generator = test_datagen.flow_from_directory(directory=os.path.join(data_dir,'test'), target_size=(img_height,img_width), batch_size=batch_size, shuffle=True, class_mode='binary')
    
    return train_generator, test_generator

def create_dataset(data_dir, dataset, classes, img_height, img_width):
    data = []
    target_index = []
    for class_name, class_idx in classes.items():
        path = os.path.join(data_dir, dataset, class_name)
        for img_file in tqdm(os.listdir(path)):
            img_array = cv2.resize(cv2.imread(os.path.join(path, img_file)),(img_height, img_width))
            data.append(img_array)
            target_index.append(class_idx)
        print('{} class loaded successfully'.format(class_name))
    return data, target_index
    

In [None]:
IMG_HEIGHT = 224
IMG_WIDTH = 224
BATCH_SIZE = 32
NUM_EPOCHS = 15

## CNN MODEL 1: Custom CNN

In [None]:
train_generator, valid_generator = img_loader(DATA_DIR, IMG_HEIGHT, IMG_WIDTH, BATCH_SIZE)

In [None]:
def CNN1():
    cnn_model = keras.models.Sequential([
        keras.layers.Conv2D(32, 5, activation='relu', padding='same', input_shape=[IMG_HEIGHT,IMG_WIDTH,3]),
        keras.layers.BatchNormalization(),
        keras.layers.Conv2D(32, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.MaxPooling2D(2),
        keras.layers.Conv2D(32, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.Conv2D(32, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.MaxPooling2D(2),
        keras.layers.Conv2D(64, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.Conv2D(64, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.MaxPooling2D(2),
        keras.layers.Conv2D(128, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.Conv2D(128, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.MaxPooling2D(2),
        keras.layers.Conv2D(128, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.Conv2D(128, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.MaxPooling2D(2),
        keras.layers.Conv2D(256, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.Conv2D(256, 3, activation='relu', padding='same'),
        keras.layers.BatchNormalization(),
        keras.layers.MaxPooling2D(2),
        keras.layers.Flatten(),
        keras.layers.Dense(512, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.3),
        keras.layers.Dense(1, activation='sigmoid')

    ])

    cnn_model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return cnn_model

cnn_model = CNN1()

In [None]:
with open('modelsummary.txt', 'w') as f:

    cnn_model.summary(print_fn=lambda x: f.write(x + '\n'))

In [None]:
checkpoint = keras.callbacks.ModelCheckpoint(filepath='best_model.h5', monitor = 'val_accuracy', save_best_only=True, save_weights_only=False)
lr_scheduler = keras.callbacks.ReduceLROnPlateau(factor=0.3,patience=1)
early_stopping_cb = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)

In [None]:
hist = cnn_model.fit_generator(
           train_generator, steps_per_epoch=train_generator.samples // BATCH_SIZE, 
           epochs=NUM_EPOCHS, validation_data=valid_generator, 
           validation_steps=valid_generator.samples // BATCH_SIZE, verbose=2, callbacks=[early_stopping_cb, lr_scheduler])

In [None]:
test_dataset, test_labels = create_dataset(DATA_DIR, 'test', CLASSES_DICT, IMG_HEIGHT, IMG_WIDTH)

shuffled_indices3 = np.random.permutation(len(test_dataset))
X_test = np.array(test_dataset)[shuffled_indices3]/255.0
Y_test = np.array(test_labels)[shuffled_indices3]

In [None]:
cnn_model.evaluate(X_test, Y_test)

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(10, 3))
ax = ax.ravel()

for i, met in enumerate(['accuracy', 'loss']):
    ax[i].plot(hist.history[met])
    ax[i].plot(hist.history['val_' + met])
    ax[i].set_title('Model {}'.format(met))
    ax[i].set_xlabel('epochs')
    ax[i].set_ylabel(met)
    ax[i].legend(['train', 'val'])

In [None]:
#cnn_model.save('./CNN1.h5')

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix
from mlxtend.plotting import plot_confusion_matrix

def get_metrics(Y, Y_predictions):
    
    accuracy = accuracy_score(Y, np.round(Y_predictions))
    cm = confusion_matrix(Y, np.round(Y_predictions))
    tn, fp, fn, tp = cm.ravel()
    precision = tp/(tp+fp)
    recall = tp/(tp+fn)
    f1_score = (2*precision*recall)/(precision+recall)

    return accuracy, cm, precision, recall, f1_score
    



In [None]:
Y_test_preds = cnn_model.predict(X_test)
accuracy, cm, precision, recall, f1_score = get_metrics(Y_test, Y_test_preds)

print('CONFUSION MATRIX')
plt.figure()
plot_confusion_matrix(cm,figsize=(12,8), hide_ticks=True, cmap=plt.cm.Blues)
plt.xticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.yticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.show()

print('\nTESTING METRICS')
print('Accuracy: {}'.format(accuracy))
print('Precision: {}'.format(precision))
print('Recall: {}'.format(recall))
print('F1-Score: {}'.format(f1_score))


print('\nTRAINING METRICS')
train_accuracy = np.round((hist.history['accuracy'][-1]), 2)
print('Train accuracy: {}'.format(train_accuracy))


## CNN MODEL 2: VGG16

In [None]:
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input

In [None]:
vgg16 = VGG16(weights='imagenet', include_top=False, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3))

for layer in vgg16.layers:
    layer.trainable = False

In [None]:
flatten_layer = keras.layers.Flatten()(vgg16.output)
fc1 = keras.layers.Dense(512, activation='selu')(flatten_layer)
output = keras.layers.Dense(1, activation='sigmoid')(fc1)

vgg_model = keras.Model(inputs=vgg16.input, outputs=output)
vgg_model.summary()

In [None]:
vgg_model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
train_generator, valid_generator = img_loader(DATA_DIR, IMG_HEIGHT, IMG_WIDTH, BATCH_SIZE)

In [None]:
hist = vgg_model.fit_generator(
           train_generator, steps_per_epoch=train_generator.samples // BATCH_SIZE, 
           epochs=NUM_EPOCHS, validation_data=valid_generator, 
           validation_steps=valid_generator.samples // BATCH_SIZE, verbose=2, callbacks=[early_stopping_cb, lr_scheduler])

In [None]:
for layer in vgg16.layers[-7:]:
    layer.trainable = True
    
vgg_model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
hist = vgg_model.fit_generator(
           train_generator, steps_per_epoch=train_generator.samples // BATCH_SIZE, 
           epochs=NUM_EPOCHS, validation_data=valid_generator, 
           validation_steps=valid_generator.samples // BATCH_SIZE, verbose=2, callbacks=[early_stopping_cb, lr_scheduler])

In [None]:
test_dataset, test_labels = create_dataset(DATA_DIR, 'test', CLASSES_DICT, IMG_HEIGHT, IMG_WIDTH)

shuffled_indices2 = np.random.permutation(len(test_dataset))
X_test = np.array(test_dataset)[shuffled_indices2]/255.0
Y_test = np.array(test_labels)[shuffled_indices2]

In [None]:
vgg_model.evaluate(X_test, Y_test)

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(10, 3))
ax = ax.ravel()

for i, met in enumerate(['accuracy', 'loss']):
    ax[i].plot(hist.history[met])
    ax[i].plot(hist.history['val_' + met])
    ax[i].set_title('Model {}'.format(met))
    ax[i].set_xlabel('epochs')
    ax[i].set_ylabel(met)
    ax[i].legend(['train', 'val'])

In [None]:


Y_test_preds = vgg_model.predict(X_test)
accuracy, cm, precision, recall, f1_score = get_metrics(Y_test, Y_test_preds)

print('CONFUSION MATRIX')
plt.figure()
plot_confusion_matrix(cm,figsize=(12,8), hide_ticks=True, cmap=plt.cm.Blues)
plt.xticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.yticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.show()

print('\nTESTING METRICS')
print('Accuracy: {}'.format(accuracy))
print('Precision: {}'.format(precision))
print('Recall: {}'.format(recall))
print('F1-Score: {}'.format(f1_score))


print('\nTRAINING METRICS')
train_accuracy = np.round((hist.history['accuracy'][-1]), 2)
print('Train accuracy: {}'.format(train_accuracy))

In [None]:
#vgg_model.save('./VGG16.h5')

## Autoencoder + ANN

In [None]:
X_train, Y_train = create_dataset(DATA_DIR, 'train', CLASSES_DICT, 64, 64)
X_test, Y_test = create_dataset(DATA_DIR, 'test', CLASSES_DICT, 64, 64)

In [None]:
shuffled_indices1 = np.random.permutation(len(X_train))
X_train = np.array(X_train)[shuffled_indices1]/255.0
Y_train = np.array(Y_train)[shuffled_indices1]

shuffled_indices2 = np.random.permutation(len(X_test))
X_test = np.array(X_test)[shuffled_indices2]/255.0
Y_test = np.array(Y_test)[shuffled_indices2]


In [None]:
print(Y_train[0])
plt.imshow(X_train[0,:,:])
plt.show()

print(Y_test[0])
plt.imshow(X_test[0,:,:])
plt.show()

In [None]:
print(X_train.shape, Y_train.shape)
print(X_test.shape, Y_test.shape)

In [None]:
encoder = keras.models.Sequential([
    keras.layers.Conv2D(16, 3, activation='selu', padding='same', input_shape=[64,64,3]),
    keras.layers.MaxPooling2D(2),
    keras.layers.Conv2D(32, 3, activation='selu', padding='same'),
    keras.layers.MaxPooling2D(2),
    keras.layers.Conv2D(64, 3, activation='selu', padding='same'),
    keras.layers.MaxPooling2D(2),
    keras.layers.Conv2D(64, 3, activation='selu', padding='same'),
    keras.layers.MaxPooling2D(2),
    keras.layers.Conv2D(128, 3, activation='selu', padding='same'),
    keras.layers.MaxPooling2D(2)
])

decoder = keras.models.Sequential([
    keras.layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='selu', input_shape=[2,2,128]),
    keras.layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='selu'),
    keras.layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='selu'),
    keras.layers.Conv2DTranspose(16, 3, strides=2, padding='same', activation='selu'),
    keras.layers.Conv2DTranspose(3, 3, strides=2, padding='same', activation='selu'),
    keras.layers.Reshape([64,64,3])
])


autoencoder = keras.models.Sequential([encoder, decoder])

autoencoder.compile(loss='mean_squared_error', optimizer='adam')

In [None]:
autoencoder.summary()

In [None]:
lr_scheduler = keras.callbacks.ReduceLROnPlateau(factor=0.3,patience=3)
early_stopping_cb = keras.callbacks.EarlyStopping(patience=8, restore_best_weights=True)

In [None]:
hist = autoencoder.fit(X_train, X_train, epochs=200, batch_size=128, shuffle=True, validation_data=(X_test, X_test), callbacks=[lr_scheduler, early_stopping_cb])

In [None]:
print('Original Image')
plt.imshow(X_test[0,:,:])
plt.show()

In [None]:
print('Reconstructed Image')
plt.imshow(autoencoder.predict(X_test[0,:,:].reshape([1,64,64,3])).squeeze(0))
plt.show()

In [None]:
X_train_latent = encoder.predict(X_train)
X_test_latent = encoder.predict(X_test)

Y_train = Y_train.reshape(-1, 1)
Y_test = Y_test.reshape(-1,1)

print(X_train_latent.shape, X_test_latent.shape)
print(Y_train.shape, Y_test.shape)

In [None]:
ann = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[2,2,128]),
    keras.layers.Dense(500, activation='selu'),
    keras.layers.BatchNormalization(),
    keras.layers.Dense(500, activation='selu'),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(300, activation='selu'),
    keras.layers.BatchNormalization(),
    keras.layers.Dense(300, activation='selu'),
    keras.layers.Dropout(0.3),
    keras.layers.Dense(100, activation='selu'),
    keras.layers.BatchNormalization(),
    keras.layers.Dense(100, activation='selu'),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(30, activation='selu'),
    keras.layers.BatchNormalization(),
    keras.layers.Dense(30, activation='selu')
    keras.layers.Dense(5, activation='selu'),
    keras.layers.Dense(1, activation='sigmoid'),
])

ann.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
ann.summary()

In [None]:
checkpoint = keras.callbacks.ModelCheckpoint(filepath='best_model.h5', monitor = 'val_accuracy', save_best_only=True, save_weights_only=False)
lr_scheduler = keras.callbacks.ReduceLROnPlateau(factor=0.3,patience=3)
early_stopping_cb = keras.callbacks.EarlyStopping(patience=8, restore_best_weights=True)

In [None]:
hist = ann.fit(X_train_latent, Y_train, epochs=200, batch_size=128, shuffle=True, validation_data=(X_test_latent, Y_test), callbacks=[lr_scheduler, early_stopping_cb])

In [None]:
ann.evaluate(X_test_latent, Y_test)

In [None]:
Y_test_preds = ann.predict(X_test_latent)
accuracy, cm, precision, recall, f1_score = get_metrics(Y_test, Y_test_preds)

print('CONFUSION MATRIX')
plt.figure()
plot_confusion_matrix(cm,figsize=(12,8), hide_ticks=True, cmap=plt.cm.Blues)
plt.xticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.yticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.show()

print('\nTESTING METRICS')
print('Accuracy: {}'.format(accuracy))
print('Precision: {}'.format(precision))
print('Recall: {}'.format(recall))
print('F1-Score: {}'.format(f1_score))


print('\nTRAINING METRICS')
train_accuracy = np.round((hist.history['accuracy'][-1]), 2)
print('Train accuracy: {}'.format(train_accuracy))

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(10, 3))
ax = ax.ravel()

for i, met in enumerate(['accuracy', 'loss']):
    ax[i].plot(hist.history[met])
    ax[i].plot(hist.history['val_' + met])
    ax[i].set_title('Model {}'.format(met))
    ax[i].set_xlabel('epochs')
    ax[i].set_ylabel(met)
    ax[i].legend(['train', 'val'])

In [None]:
def create_preprocessed_dataset(data_dir, dataset_type, classes, img_height, img_width):
    data = []
    target_index = []
    for class_name, class_idx in classes.items():
        path = os.path.join(data_dir, dataset_type, class_name)
        for img_file in tqdm(os.listdir(path)):
            img_array = cv2.resize(cv2.imread(os.path.join(path, img_file)),(img_height, img_width))
            data.append(img_array)
            target_index.append(class_idx)
        print('{} class loaded successfully'.format(class_name))
        
    shuffled_indices = np.random.permutation(len(data))
    dataset = tf.image.resize(np.array(data)[shuffled_indices], [224,224])
    labels = np.array(target_index)[shuffled_indices]
    print('shuffled {} set'.format(dataset_type))
    preprocessed_dataset = keras.applications.resnet50.preprocess_input(dataset)
    print('preprocessed {} set'.format(dataset_type))
    
    return preprocessed_dataset, labels
    
    