In [None]:
import warnings
warnings.filterwarnings('ignore')

import re
import os
import cv2

import pandas as pd       
import matplotlib as mat
import matplotlib.pyplot as plt    
import numpy as np
import seaborn as sns
%matplotlib inline
import random
import os
import tensorflow as tf
from tensorflow import keras

from sklearn.model_selection import train_test_split
from mlxtend.plotting import plot_confusion_matrix
from sklearn import metrics
from sklearn.metrics import confusion_matrix

import glob

In [None]:
# Ask TensorFlow which GPU devices it can see and report how many were found.
physical_devices = tf.config.list_physical_devices(device_type='GPU')
print("Num GPUs:", len(physical_devices))

In [None]:
# Single seed value shared by every random number generator seeded in this cell.
SEED = 42

# Seed NumPy, Python's `random` module and TensorFlow one by one.
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# Keras convenience helper called with the same seed.
tf.keras.utils.set_random_seed(SEED)
# this may not work with old tensorflow versions

# NOTE(review): PYTHONHASHSEED is normally read when the interpreter starts, so setting it
# here presumably only affects processes spawned from this kernel — confirm.
os.environ['PYTHONHASHSEED'] = str(SEED)
# NOTE(review): presumably asks TensorFlow for deterministic op implementations; it is set
# after TensorFlow has already been imported — confirm it is still picked up.
os.environ['TF_DETERMINISTIC_OPS'] = '1'

In [None]:
# Dataset location; the train/ and test/ folders are globbed as <split>/<class>/<file>.
data_dir = "scaled_chest_xray"
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

# Let tf.data pick parallelism and prefetch buffer sizes at runtime.
# `tf.data.AUTOTUNE` is the stable name of the deprecated `tf.data.experimental.AUTOTUNE` alias.
AUTOTUNE = tf.data.AUTOTUNE
IMAGE_SIZE = [180, 180]  # size handed to tf.image.resize and used for the model input shape
BATCH_SIZE = 32          # batch size of every input pipeline
EPOCHS = 10              # epochs per model.fit call
IMAGE_CROP = 1           # central fraction handed to tf.image.central_crop
RANDOM_STATE = 0         # random_state of the train/validation/test splits
    

In [None]:
# Collect every image path from both the train and test folders (<split>/<class>/<file>).
# The list is sorted because the order returned by glob is not documented as guaranteed,
# and the seeded splits below are only reproducible when their input order is.
filenames_CNN = sorted(
    tf.io.gfile.glob(train_dir + '/*/*') + tf.io.gfile.glob(test_dir + '/*/*')
)

# Class of each file = name of its parent directory (the same rule `get_label` applies).
_labels_CNN = [os.path.basename(os.path.dirname(path)) for path in filenames_CNN]

# 80/20 split into (train + validation) / test, then 80/20 again into train / validation.
# Both splits are stratified so every subset keeps the class ratio of the full dataset.
t_filenames_CNN, test_filenames_CNN, _t_labels_CNN, _test_labels_CNN = train_test_split(
    filenames_CNN,
    _labels_CNN,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=_labels_CNN,
)
train_filenames_CNN, val_filenames_CNN = train_test_split(
    t_filenames_CNN,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=_t_labels_CNN,
)

In [None]:
def _count_class(filenames, class_name):
    """Return how many paths in `filenames` sit directly inside a directory named `class_name`.

    The class is read from the parent directory (the rule `get_label` applies) rather than
    from a substring search over the whole path, so a class name that happens to appear
    elsewhere in a path is not miscounted.
    """
    return sum(
        1 for path in filenames
        if os.path.basename(os.path.dirname(path)) == class_name
    )


# Per-class image counts of the training split (also used for the class weights).
COUNT_NORMAL_train_CNN = _count_class(train_filenames_CNN, "NORMAL")
COUNT_PNEUMONIA_train_CNN = _count_class(train_filenames_CNN, "PNEUMONIA")
print("Normal images count in training set: " + str(COUNT_NORMAL_train_CNN))
print("Pneumonia images count in training set: " + str(COUNT_PNEUMONIA_train_CNN))
print("Sum: " + str(len(train_filenames_CNN)))
print('---------------------------')

# Per-class image counts of the validation split.
COUNT_NORMAL_val_CNN = _count_class(val_filenames_CNN, "NORMAL")
COUNT_PNEUMONIA_val_CNN = _count_class(val_filenames_CNN, "PNEUMONIA")
print("Normal images count in validation set: " + str(COUNT_NORMAL_val_CNN))
print("Pneumonia images count in validation set: " + str(COUNT_PNEUMONIA_val_CNN))
print("Sum: " + str(len(val_filenames_CNN)))
print('---------------------------')

# Per-class image counts of the test split.
COUNT_NORMAL_test_CNN = _count_class(test_filenames_CNN, "NORMAL")
COUNT_PNEUMONIA_test_CNN = _count_class(test_filenames_CNN, "PNEUMONIA")
print("Normal images count in test set: " + str(COUNT_NORMAL_test_CNN))
print("Pneumonia images count in test set: " + str(COUNT_PNEUMONIA_test_CNN))
print("Sum: " + str(len(test_filenames_CNN)) + '\n\n')

In [None]:
def plot_dataset(n,p,name):
    """Show a grouped bar chart of class counts per dataset split.

    Args:
        n: one "Normal" count per split.
        p: one "Pneumonia" count per split, in the same order as `n`.
        name: split names used as x tick labels; its length fixes the number of groups.
    """
    bar_width = 0.4
    offset = bar_width / 2
    positions = np.arange(len(name))

    fig, axis = plt.subplots(figsize=(8, 6), dpi=80)

    # Two bars per split, placed left and right of the tick position.
    axis.bar(positions - offset, n, bar_width, label='Normal(0)')
    axis.bar(positions + offset, p, bar_width, label='Pneumonia(1)')

    axis.set_xticks(positions)
    axis.set_xticklabels(name)
    axis.set_xlabel('Sets', fontsize=12)
    axis.set_ylabel('Count', fontsize=12)
    axis.set_title('Number of cases in sets', fontsize=14)
    axis.legend()

    plt.show()

In [None]:
# Plot the per-class image counts of the three splits next to each other.
plot_dataset(
    [COUNT_NORMAL_train_CNN, COUNT_NORMAL_val_CNN, COUNT_NORMAL_test_CNN],
    [COUNT_PNEUMONIA_train_CNN, COUNT_PNEUMONIA_val_CNN, COUNT_PNEUMONIA_test_CNN],
    ['train', 'validation', 'test'],
)

In [None]:
# Wrap each list of file paths in a tf.data.Dataset (one string element per path).
train_list_ds_CNN, val_list_ds_CNN, test_list_ds_CNN = map(
    tf.data.Dataset.from_tensor_slices,
    (train_filenames_CNN, val_filenames_CNN, test_filenames_CNN),
)

# Show the first five training paths as a sanity check.
print('Some example filenames: \n')
for path_tensor in train_list_ds_CNN.take(5):
    print(path_tensor.numpy())

In [None]:
# Number of elements in each path dataset; used afterwards to derive steps per epoch.
TRAIN_IMG_COUNT_CNN, VAL_IMG_COUNT_CNN, TEST_IMG_COUNT_CNN = (
    tf.data.experimental.cardinality(paths_ds).numpy()
    for paths_ds in (train_list_ds_CNN, val_list_ds_CNN, test_list_ds_CNN)
)

for caption, image_count in (
    ("Training images count: ", TRAIN_IMG_COUNT_CNN),
    ("Validating images count: ", VAL_IMG_COUNT_CNN),
    ("Testing images count: ", TEST_IMG_COUNT_CNN),
):
    print(caption + str(image_count))

In [None]:
# Class directory names. The list index lines up with the label `get_label` produces:
# False/0 -> "NORMAL", True/1 -> "PNEUMONIA".
CLASS_NAMES = ["NORMAL", "PNEUMONIA"]
# Bare expression so the notebook shows the list as the cell output.
CLASS_NAMES

In [None]:
def get_label(file_path):
    """Return whether the file lives in a "PNEUMONIA" class directory.

    The path is split on `os.path.sep`; the second-to-last component (the directory that
    directly contains the file) is compared with "PNEUMONIA".

    Args:
        file_path: path of an image file.

    Returns:
        Result of the comparison: true for pneumonia images, false otherwise.
    """
    path_components = tf.strings.split(file_path, os.path.sep)
    class_dir = path_components[-2]
    return class_dir == "PNEUMONIA"

In [None]:
def add_gaussian_noise(img, *, stddev=200 / 255):
    """Add zero-mean Gaussian noise to an image and clip the result to [0, 1].

    Args:
        img: image tensor; it must already be scaled to [0, 1].
        stddev: standard deviation of the noise, on the same [0, 1] scale. Keyword-only;
            defaults to 200/255, the value that used to be hard-coded.

    Returns:
        The noisy image, same shape as `img`, with values clipped to [0.0, 1.0].
    """
    with tf.name_scope('Add_gaussian_noise'):
        noise = tf.random.normal(shape=tf.shape(img), mean=0.0, stddev=stddev, dtype=tf.float32)
        # Clip so the noisy image stays inside the valid intensity range.
        noise_img = tf.clip_by_value(img + noise, 0.0, 1.0)
    return noise_img

In [None]:
def decode_img(img):
    """Turn the raw bytes of a JPEG file into a resized float32 image.

    Steps: decode to a 3-channel uint8 tensor, convert to float32 in [0, 1], keep the
    central `IMAGE_CROP` fraction, resize to `IMAGE_SIZE`.

    Args:
        img: encoded JPEG contents, as returned by `tf.io.read_file`.

    Returns:
        The resized image tensor.
    """
    decoded = tf.image.decode_jpeg(img, channels=3)
    scaled = tf.image.convert_image_dtype(decoded, tf.float32)
    cropped = tf.image.central_crop(scaled, IMAGE_CROP)
    # Optional noise augmentation, currently disabled:
    # cropped = add_gaussian_noise(cropped)
    return tf.image.resize(cropped, IMAGE_SIZE)

In [None]:
def process_path(file_path):
    """Map one file path to an `(image, label)` pair.

    The label comes from the path (`get_label`); the image is read from disk and
    decoded / resized by `decode_img`.
    """
    label = get_label(file_path)
    image = decode_img(tf.io.read_file(file_path))
    return image, label

In [None]:
# Turn every path dataset into a dataset of (image, label) pairs, processing elements in parallel.
train_ds_CNN, val_ds_CNN, test_ds_CNN = (
    paths_ds.map(process_path, num_parallel_calls=AUTOTUNE)
    for paths_ds in (train_list_ds_CNN, val_list_ds_CNN, test_list_ds_CNN)
)

In [None]:
# Peek at the first training example: print its shape and label, then display it.
for sample_image, sample_label in train_ds_CNN.take(1):
    print("Image shape: ", sample_image.numpy().shape)
    print("Label: ", sample_label.numpy())
    plt.imshow(sample_image)

In [None]:
def prepare_for_training(ds, cache=True, shuffle_buffer_size=1000):
    """Build the input pipeline: cache -> shuffle -> repeat -> batch -> prefetch.

    Args:
        ds: dataset to wrap.
        cache: falsy to skip caching; a non-empty string is forwarded to `ds.cache(...)`
            (a cache file name, for data that does not fit in memory); any other truthy
            value caches with a plain `ds.cache()`.
        shuffle_buffer_size: buffer size handed to `ds.shuffle`.

    Returns:
        The dataset, repeated indefinitely, batched with `BATCH_SIZE` and prefetched
        with an `AUTOTUNE` buffer so batches are prepared while the model trains.
    """
    if isinstance(cache, str) and cache:
        ds = ds.cache(cache)
    elif cache:
        ds = ds.cache()

    return (
        ds.shuffle(buffer_size=shuffle_buffer_size)
        .repeat()
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=AUTOTUNE)
    )

In [None]:
# Training and validation data go through the full cache/shuffle/repeat/batch pipeline;
# the test data is only batched, so it keeps its order and ends after a single pass.
# Note: the names are rebound to the wrapped datasets, so re-running this cell wraps them again.
train_ds_CNN, val_ds_CNN = (
    prepare_for_training(split_ds) for split_ds in (train_ds_CNN, val_ds_CNN)
)
test_ds_CNN = test_ds_CNN.batch(BATCH_SIZE)

# Pull one training batch (images and labels).
image_batch_CNN, label_batch_CNN = next(iter(train_ds_CNN))

In [None]:
def build_model():
    """Build the (uncompiled) fully connected binary classifier.

    Architecture: flatten the IMAGE_SIZE x 3 input, then Dense(512)/Dropout(0.7),
    Dense(128)/Dropout(0.5), Dense(64)/Dropout(0.3) with ReLU activations, and a final
    single-unit sigmoid layer.

    Returns:
        A `tf.keras.Sequential` model.
    """
    # (units, dropout rate) of each hidden block, in order.
    hidden_blocks = ((512, 0.7), (128, 0.5), (64, 0.3))

    stack = [
        tf.keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),
        tf.keras.layers.Flatten(),
    ]
    for units, drop_rate in hidden_blocks:
        stack.append(tf.keras.layers.Dense(units, activation='relu'))
        stack.append(tf.keras.layers.Dropout(drop_rate))
    stack.append(tf.keras.layers.Dense(1, activation='sigmoid'))

    return tf.keras.Sequential(stack)

In [None]:
# Weight each class inversely to its frequency in the training split:
# weight = (1 / class_count) * (total_training_images / 2).
class_weights = {
    0: (1 / COUNT_NORMAL_train_CNN) * (TRAIN_IMG_COUNT_CNN / 2.0),
    1: (1 / COUNT_PNEUMONIA_train_CNN) * (TRAIN_IMG_COUNT_CNN / 2.0),
}
weight_for_0, weight_for_1 = class_weights[0], class_weights[1]

print('Weight for class 0: {:.2f}'.format(weight_for_0))
print('Weight for class 1: {:.2f}'.format(weight_for_1))

In [None]:
model = build_model()

# Metrics reported during training and evaluation; the explicit names are the keys later
# looked up in `history.history`.
METRICS = [
    'accuracy',
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
]

# Binary classification with a single sigmoid output -> binary cross-entropy loss.
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=METRICS)
model.summary()

In [None]:
# First training run: EPOCHS epochs with class weighting and no callbacks.
# The training/validation datasets repeat indefinitely, so the number of batches per epoch
# and per validation pass is fixed explicitly from the image counts.
# NOTE(review): a later cell calls `model.fit` again on this same `model` and rebinds
# `history`, so this run's history is discarded — confirm the extra run is intended.
history = model.fit(
    train_ds_CNN,
    steps_per_epoch=TRAIN_IMG_COUNT_CNN // BATCH_SIZE,
    epochs=EPOCHS,
    validation_data=val_ds_CNN,
    validation_steps=VAL_IMG_COUNT_CNN // BATCH_SIZE,
    class_weight=class_weights,
)

In [None]:
# Write the model to "xRayNetNN.h5" each time the validation loss reaches a new minimum.
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    "xRayNetNN.h5",
    monitor="val_loss",
    mode="min",
    save_best_only=True,
    verbose=1,
)

In [None]:
# Early stopping with a patience of 10 epochs, rolling back to the best weights seen.
# No `monitor` is given, so the callback's default monitored quantity applies.
# NOTE(review): patience equals EPOCHS (10), so within a single fit call this presumably
# can never trigger — confirm the intended patience.
earlyStopping_cb = tf.keras.callbacks.EarlyStopping(restore_best_weights=True, patience=10)

In [None]:
# Learning rate at epoch 0 of the exponential-decay schedule below.
# NOTE(review): 0.1 looks high for the 'adam' optimizer the model is compiled with — confirm.
initial_learning_rate = 0.1


def lr_scheduler(epoch):
    """Return the exponentially decayed learning rate for `epoch`.

    Computes `initial_learning_rate * exp(-0.1 * epoch)`.

    Args:
        epoch: epoch index; 0 yields `initial_learning_rate` itself.

    Returns:
        The learning rate to use for that epoch.
    """
    decay_rate = 0.1
    return initial_learning_rate * np.exp(-decay_rate * epoch)

# Keras callback built from `lr_scheduler`, which maps an epoch index to a learning rate.
lr_schedule_cb = tf.keras.callbacks.LearningRateScheduler(lr_scheduler)

In [None]:
# Training run with checkpointing, early stopping and the learning-rate schedule attached.
# It continues from whatever weights `model` currently holds and rebinds `history`, so the
# curves plotted from `history` describe this run only.
history = model.fit(
    train_ds_CNN,
    steps_per_epoch=TRAIN_IMG_COUNT_CNN // BATCH_SIZE,
    epochs=EPOCHS,
    validation_data=val_ds_CNN,
    validation_steps=VAL_IMG_COUNT_CNN // BATCH_SIZE,
    class_weight=class_weights,
    callbacks=[checkpoint_cb, earlyStopping_cb, lr_schedule_cb],
)

In [None]:
# One panel per metric, each showing the training curve and the validation curve.
fig, ax = plt.subplots(1, 4, figsize=(20, 3))
ax = ax.ravel()

for panel, met in zip(ax, ('precision', 'recall', 'accuracy', 'loss')):
    panel.plot(history.history[met])
    panel.plot(history.history['val_' + met])
    panel.set_title('Model {}'.format(met))
    panel.set_xlabel('epochs')
    panel.set_ylabel(met)
    panel.legend(['train', 'val'])

In [None]:
# Threshold the sigmoid outputs at 0.5 to obtain hard 0/1 predictions for the test set.
# `test_ds_CNN` is already batched, so no `batch_size` is passed: Keras documents that
# `batch_size` must not be specified for dataset inputs (the dataset generates the batches).
preds_CNN = (model.predict(test_ds_CNN) > 0.5).astype("int32")

In [None]:
# Collect the true label of every test image by flattening the label batches, in dataset order.
orig_test_labels = [
    single_label
    for _batch_images, batch_labels in test_ds_CNN.as_numpy_iterator()
    for single_label in batch_labels
]
# The two shapes should describe the same number of images.
print(np.array(orig_test_labels).shape)
print(np.array(preds_CNN).shape)

In [None]:
# Print the true labels and the flattened predictions for a quick visual comparison.
for values in (np.array(orig_test_labels), np.array(preds_CNN).flatten()):
    print(values)

In [None]:
# Confusion matrix of the test set (rows: true class, columns: predicted class).
# Both inputs are normalised to 1-D integer arrays so they share shape and dtype.
cm_CNN = confusion_matrix(
    np.asarray(orig_test_labels).astype("int32").ravel(),
    np.asarray(preds_CNN).ravel(),
)
# plot_confusion_matrix creates its own figure, so no separate plt.figure() call is made
# (the extra call only left an empty figure behind).
plot_confusion_matrix(cm_CNN, figsize=(10, 6), hide_ticks=True, cmap=plt.cm.Blues)
plt.xticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.yticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.show()

In [None]:
# Evaluate on the test set. The unpacking presumably follows the compile configuration:
# the loss first, then the METRICS entries in order (accuracy, precision, recall) — confirm
# if METRICS is ever reordered.
loss_CNN, acc_CNN, prec_CNN, rec_CNN = model.evaluate(test_ds_CNN)

In [None]:
# Values reported by model.evaluate.
print('Evaluate function calculating...')
for template, value in (
    ("Recall of the model is {:.3f}", rec_CNN),
    ("Precision of the model is {:.3f}", prec_CNN),
):
    print(template.format(value))

# Cross-check: recompute precision and recall by hand from the confusion matrix cells.
print('\nManually calculating...')
tn, fp, fn, tp = cm_CNN.ravel()

precision_CNN = tp / (tp + fp)  # share of predicted positives that are real positives
recall_CNN = tp / (tp + fn)     # share of real positives that were predicted positive

for template, value in (
    ("Recall of the model is {:.3f}", recall_CNN),
    ("Precision of the model is {:.3f}", precision_CNN),
):
    print(template.format(value))