<a href="https://www.kaggle.com/code/yashtrada/group9-enel645-project-notebook-2603?scriptVersionId=91407296" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [69]:
import numpy as np
import pandas as pd
import seaborn as sns
import itertools

from PIL import Image
import matplotlib.pyplot as plt
from skimage.transform import resize
from keras.models import load_model
import shutil

import tensorflow as tf
from keras.models import Sequential, Model
from keras.layers import Conv2D, MaxPooling2D,GlobalAveragePooling2D
from keras.layers import Dropout, MaxPool2D , Activation, Flatten, Dense, AvgPool2D, Lambda, BatchNormalization, Input

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

from sklearn.metrics import accuracy_score

from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array, array_to_img

In [70]:
!pip install openpyxl



# Data loading and inspection

In [None]:
# #Here, we will only try to build the model for Label prediction (Normal or Pnemonia). Not for the Label_1_Virus_category and Label_2_Virus_category because of highly unbalanced dataset for these sub-categories.
# summary_df = pd.read_csv("/kaggle/input/coronahack-chest-xraydataset/Chest_xray_Corona_dataset_Summary.csv")
# summary_df

In [None]:
# Load the per-image metadata and discard the two virus sub-category columns,
# which are not used for the Normal-vs-Pnemonia task.
metadata_csv_path = "/kaggle/input/coronahack-chest-xraydataset/Chest_xray_Corona_Metadata.csv"
metadata_df = pd.read_csv(metadata_csv_path).drop(
    columns=['Label_1_Virus_category', 'Label_2_Virus_category']
)
metadata_df.head()

In [None]:
sns.set(style="darkgrid")
# Pass the column explicitly as `x=`: seaborn only accepts `data` positionally
# in current releases, so a positional Series is deprecated / misinterpreted.
ax = sns.countplot(x=metadata_df["Label"])

In [None]:
# Class imbalance: number of Pnemonia rows divided by number of Normal rows.
pnemonia_count = (metadata_df["Label"] == "Pnemonia").sum()
normal_count = (metadata_df["Label"] == "Normal").sum()
print("Ratio of Pnemonia to Normal: ", pnemonia_count / normal_count)

In [None]:
# Keep only the TRAIN rows and drop the CSV's leftover index column.
# Building an independent frame (instead of an in-place drop on a filtered
# slice) avoids pandas' SettingWithCopyWarning here and on later column writes.
train_set = (
    metadata_df[metadata_df['Dataset_type']=='TRAIN']
    .drop(columns="Unnamed: 0")
    .copy()
)
train_set

In [None]:
# Keep only the TEST rows and drop the CSV's leftover index column.
# Building an independent frame (instead of an in-place drop on a filtered
# slice) avoids pandas' SettingWithCopyWarning here and on later column writes.
test_set = (
    metadata_df[metadata_df['Dataset_type']=='TEST']
    .drop(columns="Unnamed: 0")
    .copy()
)
test_set

**Above we can see that approximately 10% of the X-ray images are used for the test set.**

In [None]:
# Count missing values per column in the training metadata.
train_set.isna().sum()

In [None]:
# Count missing values per column in the test metadata.
test_set.isna().sum()

**So there are no null values in either set.**

In [None]:
# Folders that hold the train / test image files; the file names from the
# metadata's X_ray_image_name column are appended to these prefixes.
train_set_folder_path = "../input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train/"
test_set_folder_path = "../input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/test/"

In [None]:
# Add a full-path column to each frame. `assign` returns a new frame, so no
# column is written onto a slice of metadata_df (no SettingWithCopyWarning),
# and the string concatenation is vectorised instead of a per-row lambda.
train_set = train_set.assign(
    train_img_full_path=train_set_folder_path + train_set["X_ray_image_name"]
)
test_set = test_set.assign(
    test_img_full_path=test_set_folder_path + test_set["X_ray_image_name"]
)

In [None]:
# The images come in different dimensions: print the size of the first 15.
# The context manager closes each file handle as soon as its size is read.
for img_path in train_set.head(15)["train_img_full_path"]:
    with Image.open(img_path) as img:
        print(img.size)

**Having a look at a few training vs. testing images**

In [None]:
print("Training images:                                                   Testing images:")

fig, axs = plt.subplots(5, 2, figsize=(15,18))

# Left column: first five training images. Right column: first five test images.
# cmap="gray" only affects single-channel (grayscale) files, which imshow would
# otherwise render with its default false-colour map; RGB files ignore it.
for row, img_path in enumerate(train_set.head()["train_img_full_path"]):
    with Image.open(img_path) as img:
        axs[row, 0].imshow(img, cmap="gray")

print("\n\n\n")

for row, img_path in enumerate(test_set.head()["test_img_full_path"]):
    with Image.open(img_path) as img:
        axs[row, 1].imshow(img, cmap="gray")


**Having a look at Normal vs. Pneumonia X-rays**

In [None]:
print("Pnemonia images:                                                   Normal images:")

Pnemonia = train_set[train_set["Label"]=="Pnemonia"]
Normal = train_set[train_set["Label"]=="Normal"]

fig, axs = plt.subplots(5, 2, figsize=(15,18))

# Left column: first five Pnemonia images. Right column: first five Normal images.
# cmap="gray" only affects single-channel (grayscale) files, which imshow would
# otherwise render with its default false-colour map; RGB files ignore it.
for row, img_path in enumerate(Pnemonia.head()["train_img_full_path"]):
    with Image.open(img_path) as img:
        axs[row, 0].imshow(img, cmap="gray")

print("\n\n\n")

for row, img_path in enumerate(Normal.head()["train_img_full_path"]):
    with Image.open(img_path) as img:
        axs[row, 1].imshow(img, cmap="gray")

**Common parameters for all the models**

In [None]:
# Metrics passed to every model's compile(): raw confusion counts first,
# then accuracy, precision and recall.
keras_metrics = tf.keras.metrics
evaluation_metrics = [
    keras_metrics.TruePositives(name='tp'),
    keras_metrics.FalsePositives(name='fp'),
    keras_metrics.TrueNegatives(name='tn'),
    keras_metrics.FalseNegatives(name='fn'),
    keras_metrics.BinaryAccuracy(name='accuracy'),
    keras_metrics.Precision(name='precision'),
    keras_metrics.Recall(name='recall'),
]

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Draw a confusion matrix on the current matplotlib figure.

    Parameters
    ----------
    cm : array-like of shape (n_classes, n_classes)
        Confusion matrix; rows are drawn on the 'True label' axis and
        columns on the 'Predicted label' axis.
    classes : sequence of str
        Tick labels, in the same order as the rows/columns of `cm`.
    normalize : bool, default False
        If True, each row is divided by its sum before plotting. Rows that
        sum to zero are shown as zeros instead of NaN.
    title : str
        Title of the plot.
    cmap : matplotlib colormap
        Colormap used for the cells.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Leave rows without any samples at 0 rather than dividing by zero.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half the maximum get white text so they stay readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
def make_gradcam_heatmap(img_array, model, last_conv_layer_name,
                         pred_index=0, output_size=(320, 256)):
    """
    Compute a Grad-CAM heatmap for the first image in `img_array`.

    Parameters
    ----------
    img_array : batch of model inputs; only the first sample is visualised.
    model : Keras model to explain.
    last_conv_layer_name : str
        Name of the layer whose activations are weighted into the heatmap.
    pred_index : int or None, default 0
        Output unit to explain. The default keeps the previous behaviour
        (unit 0); pass None to use the top predicted unit of the first image.
    output_size : tuple of int, default (320, 256)
        Size the heatmap is resized to before it is returned.

    Returns
    -------
    numpy.ndarray of shape `output_size`, scaled to [0, 1]; all zeros when the
    layer gives no positive evidence for the chosen unit.
    """
    # Model mapping the input image to the chosen layer's activations as well
    # as the output predictions. `model.inputs` is already a list.
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Gradient of the chosen output unit w.r.t. the layer activations.
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    grads = tape.gradient(class_channel, last_conv_layer_output)

    # Channel importance = mean gradient over the spatial positions of the
    # FIRST image only, so other images in the batch cannot leak into its map.
    pooled_grads = tf.reduce_mean(grads[0], axis=(0, 1))

    # Weight each feature-map channel by its importance and sum over channels.
    feature_maps = last_conv_layer_output[0]
    heatmap = tf.squeeze(feature_maps @ pooled_grads[..., tf.newaxis])

    # Keep positive evidence only and scale to [0, 1]; divide_no_nan returns 0
    # instead of NaN when the map has no positive values.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    heatmap = heatmap.numpy()
    heatmap_resized = resize(heatmap, output_size)
    return heatmap_resized

In [None]:
# %cp ../input/enel645project/best_inception_pnemonia_cnn.h5 ./best_inception_pnemonia_cnn.h5

**Train/validation split**

In [None]:
# Hold out 15% of the TRAIN metadata rows as a validation set; the fixed
# random_state makes the shuffled split repeatable.
train_dataset, val_dataset = train_test_split(train_set, test_size=0.15, shuffle=True, random_state=2022)

In [None]:
# Training generator: rescale to [0, 1] plus random augmentation.
# featurewise_center / featurewise_std_normalization are deliberately not set:
# they need dataset statistics from ImageDataGenerator.fit(), which this
# notebook never calls, so Keras only emitted warnings and skipped them.
train_generator = ImageDataGenerator(rescale=1./255,
                                  rotation_range=90,
                                  width_shift_range=0.15,
                                  height_shift_range=0.15,
                                  horizontal_flip=True,
                                  brightness_range=(0.8, 1.2),
                                  zoom_range=[0.75, 1])

# Evaluation generator: rescaling only, no augmentation.
test_generator = ImageDataGenerator(rescale=1./255)

In [None]:
# Shuffled training batches read from the image folder, using the settings of
# train_generator and one-hot ('categorical') labels.
train_flow_options = dict(
    dataframe=train_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=True,
    class_mode='categorical',
)
train_img_gen = train_generator.flow_from_dataframe(**train_flow_options)

In [None]:
# Validation batches must come from unmodified images, so they use the
# augmentation-free test_generator (not train_generator) in a fixed order;
# otherwise val_loss changes randomly between epochs.
val_img_gen = test_generator.flow_from_dataframe(
    dataframe=val_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
    class_mode='categorical'
)

In [None]:
# Test batches in file order (shuffle=False) so predictions can be compared
# position-by-position with test_img_gen.classes.
test_flow_options = dict(
    dataframe=test_set,
    directory=test_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
)
test_img_gen = test_generator.flow_from_dataframe(**test_flow_options)

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
# Each panel shows the first image of a freshly drawn training batch.
for panel in ax:
    image = next(train_img_gen)[0][0]
    panel.imshow(image)
    panel.axis('off')

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
# Each panel shows the first image of the next test batch.
for panel in ax:
    image = next(test_img_gen)[0][0]
    panel.imshow(image)
    panel.axis('off')

**Create a model from scratch**

In [None]:
#From professor's notebook

model_path = "/kaggle/working/best_pnemonia_cnn.h5"
# Stop once val_loss has not improved for 5 consecutive epochs.
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

# Checkpoint the full model rather than weights only: model_path is read back
# with load_model, which cannot open a weights-only HDF5 file (e.g. after an
# interrupted training run).
monitor = tf.keras.callbacks.ModelCheckpoint(
    model_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
    mode='min',
)
# Learning rate schedule
def schedule(epoch, lr):
    """Return half of `lr` on every 5th epoch (except epoch 0), else `lr` unchanged."""
    halve_now = epoch != 0 and epoch % 5 == 0
    return lr / 2 if halve_now else lr

# Wrap `schedule` in a Keras callback; verbose=1 logs the learning rate each epoch.
lr_schedule = tf.keras.callbacks.LearningRateScheduler(schedule,verbose = 1)

# Callbacks handed to model.fit: early stopping, LR schedule, checkpointing.
model_callbacks = [early_stop, lr_schedule, monitor]

In [None]:
# Reuse a previously saved model when one exists at model_path; otherwise
# build the from-scratch CNN, so the notebook also runs on a fresh kernel
# where /kaggle/working is still empty.
if tf.io.gfile.exists(model_path):
    model = load_model(model_path)
else:
    model = Sequential()
    model.add(Conv2D(64, (3, 3), input_shape=(320, 256, 3)))
    model.add(Activation('relu'))
    model.add(Conv2D(32, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(3, 3)))
    model.add(Conv2D(32, (3, 3)))
    model.add(Activation('relu'))
    model.add(Flatten())
    model.add(Dense(64))
    model.add(Activation('relu'))
    model.add(Dropout(0.25))
    model.add(Dense(16))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    # Two sigmoid units, one per class, matching the one-hot labels.
    model.add(Dense(2,activation='sigmoid'))

In [None]:
# #let's build model from scratch
# model = Sequential()
# model.add(Conv2D(16, (3, 3), input_shape=(320, 256, 3)))
# model.add(Activation('relu'))
# model.add(MaxPooling2D(pool_size=(2, 2)))

# model.add(Conv2D(32, (3, 3)))
# model.add(Activation('relu'))
# model.add(MaxPooling2D(pool_size=(2, 2)))
# model.add(Dropout(0.25))

# model.add(Conv2D(64,(3,3)))
# model.add(Activation("relu"))
# model.add(Conv2D(128,(3,3)))
# model.add(Activation("relu"))
# model.add(MaxPooling2D(pool_size=(load_weights)))
# model.add(Dropout(0.25))

# model.add(Conv2D(256,(3,3)))
# model.add(Activation("relu"))
# model.add(AvgPool2D(2,2))
# model.add(Dropout(0.25))

# model.add(Conv2D(512,(3,3)))
# model.add(Activation("relu"))
# model.add(AvgPool2D(2,2))
# model.add(Dropout(0.25))

# model.add(Conv2D(1024,(2,2)))
# model.add(Activation("relu"))
# model.add(MaxPool2D(2,2))
    
# model.add(Flatten())
# model.add(Dense(512))
# model.add(Dropout(0.20))
# model.add(Dense(256))
# model.add(Dropout(0.20))
# model.add(Dense(64))
# model.add(Dropout(0.20))
# model.add(Dense(16))
# model.add(Dense(2,activation = 'sigmoid'))
# #model.add(Activation("sigmoid"))

In [None]:
# Compile with binary cross-entropy, the Adam optimizer and the shared evaluation_metrics list.
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=evaluation_metrics)

In [None]:
# Print Keras' textual summary of the model.
model.summary()

In [None]:
# train_img_gen already yields batches of 64 images, so `batch_size` is not
# passed to fit(): Keras ignores or rejects it for generator input.
model_progress = model.fit(
    train_img_gen,
    steps_per_epoch = 64,
    validation_data = val_img_gen,
    epochs = 30,
    callbacks=model_callbacks
)

# Reload the best-val_loss checkpoint before exporting; saving directly would
# overwrite the best model at model_path with the last-epoch weights.
if tf.io.gfile.exists(model_path):
    model.load_weights(model_path)
model.save(model_path)

In [None]:
# Training vs. validation loss per epoch.
plt.figure(figsize=(10,8))
for history_key, line_color, line_label in (
    ('loss', 'r', "Training loss"),
    ('val_loss', 'b', "Validation loss"),
):
    plt.plot(model_progress.history[history_key], color=line_color, label=line_label)
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy per epoch.
plt.figure(figsize=(10,8))
for history_key, line_color, line_label in (
    ('accuracy', 'r', "Training accuracy"),
    ('val_accuracy', 'b', "Validation accuracy"),
):
    plt.plot(model_progress.history[history_key], color=line_color, label=line_label)
plt.legend()
plt.show()

In [None]:
# Predict the test set and take the highest-scoring output unit as the class.
test_pred = model.predict(test_img_gen)
predicted_class = np.argmax(test_pred, axis=1)
class_labels = test_img_gen.class_indices
# Invert the label -> index mapping once instead of rebuilding it per prediction.
index_to_label = {index: label for label, index in class_labels.items()}
prediction = [index_to_label[index] for index in predicted_class]
print("Predicted :", predicted_class[:5])
actual_label = test_img_gen.classes
print("actual_label: ", actual_label[:5])

In [None]:
# Fraction of test images whose predicted class equals the true class.
accuracy = accuracy_score(y_true=actual_label, y_pred=predicted_class)
print("Accuracy: %.2f%%" % (accuracy * 100.0))

In [None]:
# confusion_matrix expects (y_true, y_pred): rows must be the true labels so
# the matrix agrees with the 'True label' / 'Predicted label' axes of the plot.
cnf_matrix = confusion_matrix(actual_label, predicted_class)
np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plt.figure(figsize=(8,8))
plot_confusion_matrix(cnf_matrix, classes=['Normal', 'Pnemonia'],
                      title='Confusion matrix')
plt.show()

**Grad-CAM to visualize what the model has learned**

In [None]:
# # Remove last layer's sigmoid
# model.layers[-1].activation = None

# for i in range(10):
#     img_array = next(test_img_gen)[0]
#     # Generate class activation heatmap
#     heatmap = make_gradcam_heatmap(img_array, model, "conv2d_1")
#     print("Grad Cam Heatmap:")
#     plt.imshow(img_array[0])
#     plt.imshow(heatmap, alpha = 0.65, cmap = "inferno")
#     plt.show()
#     print("X-ray images:")
#     plt.imshow(img_array[0])
#     plt.show()
#     print("\n\n\n\n")

In [None]:
# # Remove last layer's sigmoid
# model.layers[-1].activation = None

# for i in range(10):
#     img_array = next(test_img_gen)[0]
#     # Generate class activation heatmap
#     heatmap = make_gradcam_heatmap(img_array, model, "conv2d_2")
#     print("Grad Cam Heatmap:")
#     plt.imshow(img_array[0])
#     plt.imshow(heatmap, alpha = 0.65, cmap = "inferno")
#     plt.show()
#     print("X-ray images:")
#     plt.imshow(img_array[0])
#     plt.show()
#     print("\n\n\n\n")

In [None]:
# Remove last layer's sigmoid
model.layers[-1].activation = None

for i in range(10):
    img_array = next(test_img_gen)[0]
    # Generate class activation heatmap for the displayed image only: passing
    # just the first sample keeps the other images of the batch from
    # influencing the gradients behind the heatmap.
    heatmap = make_gradcam_heatmap(img_array[:1], model, "activation_2")
    print("Grad Cam Heatmap:")
    plt.imshow(img_array[0])
    plt.imshow(heatmap, alpha = 0.65, cmap = "inferno")
    plt.show()
    print("X-ray images:")
    plt.imshow(img_array[0])
    plt.show()
    print("\n\n\n\n")

**Transfer Learning Xception model**

In [None]:
#Now let's do data augmentation
# Training generator: Xception preprocessing plus random augmentation.
# featurewise_center / featurewise_std_normalization are deliberately not set:
# they need dataset statistics from ImageDataGenerator.fit(), which this
# notebook never calls, so Keras only emitted warnings and skipped them.
train_generator = ImageDataGenerator(preprocessing_function=tf.keras.applications.xception.preprocess_input,
                                   zoom_range=[0.75, 1],
                                   brightness_range=(0.8, 1.2),
                                   width_shift_range=0.15,
                                   height_shift_range=0.15)
# Evaluation generator: Xception preprocessing only, no augmentation.
test_generator = ImageDataGenerator(preprocessing_function=tf.keras.applications.xception.preprocess_input)

In [None]:
# Shuffled training batches read from the image folder, using the settings of
# train_generator and one-hot ('categorical') labels.
train_flow_options = dict(
    dataframe=train_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=True,
    class_mode='categorical',
)
train_img_gen = train_generator.flow_from_dataframe(**train_flow_options)

In [None]:
# Validation batches must come from unmodified images, so they use the
# augmentation-free test_generator (not train_generator) in a fixed order;
# otherwise val_loss changes randomly between epochs.
val_img_gen = test_generator.flow_from_dataframe(
    dataframe=val_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
    class_mode='categorical'
)

In [None]:
# Test batches in file order (shuffle=False) so predictions can be compared
# position-by-position with test_img_gen.classes.
test_flow_options = dict(
    dataframe=test_set,
    directory=test_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
)
test_img_gen = test_generator.flow_from_dataframe(**test_flow_options)

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for i in range(5):
    image = next(train_img_gen)[0][0]
    # xception.preprocess_input scales pixels to [-1, 1]; imshow clips float
    # images to [0, 1], so map back to that range for display only.
    ax[i].imshow(np.clip((image + 1.0) / 2.0, 0.0, 1.0))
    ax[i].axis('off')

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for i in range(5):
    image = next(test_img_gen)[0][0]
    # xception.preprocess_input scales pixels to [-1, 1]; imshow clips float
    # images to [0, 1], so map back to that range for display only.
    ax[i].imshow(np.clip((image + 1.0) / 2.0, 0.0, 1.0))
    ax[i].axis('off')

**Let's take the already pre-trained Xception model and apply the transfer learning on our problem set**

In [None]:
#From professor's notebook, need to implement below callback part

model_path = "/kaggle/working/best_xception_pnemonia_cnn.h5"
# Stop once val_loss has not improved for 5 consecutive epochs.
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

# Checkpoint the full model rather than weights only: model_path is read back
# with load_model, which cannot open a weights-only HDF5 file (e.g. after an
# interrupted training run).
monitor = tf.keras.callbacks.ModelCheckpoint(
    model_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
    mode='min',
)
# Learning rate schedule
def schedule(epoch, lr):
    """Return half of `lr` on every 3rd epoch (except epoch 0), else `lr` unchanged."""
    halve_now = epoch != 0 and epoch % 3 == 0
    return lr / 2 if halve_now else lr

# Wrap `schedule` in a Keras callback; verbose=1 logs the learning rate each epoch.
lr_schedule = tf.keras.callbacks.LearningRateScheduler(schedule,verbose = 1)

# Callbacks handed to fit: early stopping, LR schedule, checkpointing.
model_callbacks = [early_stop, lr_schedule, monitor]

In [None]:
# Reuse a previously saved model when one exists at model_path; otherwise
# build the Xception transfer-learning model, so the notebook also runs on a
# fresh kernel where /kaggle/working is still empty.
if tf.io.gfile.exists(model_path):
    Xception_transfer_learning_model = load_model(model_path)
else:
    inputs = tf.keras.layers.Input((320,256,3))
    Xception_model = tf.keras.applications.xception.Xception(include_top=False, weights="imagenet", input_shape=(320,256,3), pooling='avg')
    # Cut the pretrained network at its last separable-conv activation and
    # use that sub-model as the feature extractor.
    last_layer = Xception_model.get_layer('block14_sepconv2_act')
    last_output = last_layer.output
    input_l = Xception_model.input
    base_model1 = tf.keras.Model(input_l, last_output)

    x = base_model1(inputs)
    x = GlobalAveragePooling2D()(x)
    x = Dropout(0.25)(x)
    x = Dense(2048, activation = "relu")(x)
    x = Dropout(0.25)(x)
    x = Dense(512, activation = "relu")(x)
    x = Dropout(0.25)(x)
    x = Dense(64, activation = "relu")(x)
    x = Dropout(0.25)(x)
    outputs = Dense(2, activation = "sigmoid")(x)

    Xception_transfer_learning_model = tf.keras.Model(inputs, outputs)

In [None]:
# Print Keras' textual summary of the Xception-based model.
Xception_transfer_learning_model.summary()

In [None]:
# Compile with binary cross-entropy, the Adam optimizer and the shared evaluation_metrics list.
Xception_transfer_learning_model.compile(loss='binary_crossentropy', optimizer='adam', metrics=evaluation_metrics)

In [None]:
# train_img_gen already yields batches of 64 images, so `batch_size` is not
# passed to fit(): Keras ignores or rejects it for generator input.
model_progress = Xception_transfer_learning_model.fit(
    train_img_gen,
    steps_per_epoch = 64,
    validation_data = val_img_gen,
    epochs = 10,
    callbacks=model_callbacks
)

# Reload the best-val_loss checkpoint before exporting; saving directly would
# overwrite the best model at model_path with the last-epoch weights.
if tf.io.gfile.exists(model_path):
    Xception_transfer_learning_model.load_weights(model_path)
Xception_transfer_learning_model.save(model_path)

In [None]:
# Training vs. validation loss per epoch.
plt.figure(figsize=(10,8))
for history_key, line_color, line_label in (
    ('loss', 'r', "Training loss"),
    ('val_loss', 'b', "Validation loss"),
):
    plt.plot(model_progress.history[history_key], color=line_color, label=line_label)
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy per epoch.
plt.figure(figsize=(10,8))
for history_key, line_color, line_label in (
    ('accuracy', 'r', "Training accuracy"),
    ('val_accuracy', 'b', "Validation accuracy"),
):
    plt.plot(model_progress.history[history_key], color=line_color, label=line_label)
plt.legend()
plt.show()

In [None]:
# Predict the test set and take the highest-scoring output unit as the class.
test_pred = Xception_transfer_learning_model.predict(test_img_gen)
predicted_class = np.argmax(test_pred, axis=1)
class_labels = test_img_gen.class_indices
# Invert the label -> index mapping once instead of rebuilding it per prediction.
index_to_label = {index: label for label, index in class_labels.items()}
prediction = [index_to_label[index] for index in predicted_class]
print("Predicted :", predicted_class[:5])
actual_label = test_img_gen.classes
print("actual_label: ", actual_label[:5])

In [None]:
# Fraction of test images whose predicted class equals the true class.
accuracy = accuracy_score(y_true=actual_label, y_pred=predicted_class)
print("Accuracy: %.2f%%" % (accuracy * 100.0))

In [None]:
# confusion_matrix expects (y_true, y_pred): rows must be the true labels so
# the matrix agrees with the 'True label' / 'Predicted label' axes of the plot.
cnf_matrix = confusion_matrix(actual_label, predicted_class)
np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plt.figure(figsize=(8,8))
plot_confusion_matrix(cnf_matrix, classes=['Normal', 'Pnemonia'],
                      title='Confusion matrix')
plt.show()

**Transfer Learning InceptionV3 model**

In [None]:
#Now let's do data augmentation
# Training generator: InceptionV3 preprocessing plus random augmentation.
# featurewise_center / featurewise_std_normalization are deliberately not set:
# they need dataset statistics from ImageDataGenerator.fit(), which this
# notebook never calls, so Keras only emitted warnings and skipped them.
train_generator = ImageDataGenerator(preprocessing_function=tf.keras.applications.inception_v3.preprocess_input,
                                   zoom_range=[0.75, 1],
                                   brightness_range=(0.8, 1.2),
                                   width_shift_range=0.15,
                                   height_shift_range=0.15)
# Evaluation generator: InceptionV3 preprocessing only, no augmentation.
test_generator = ImageDataGenerator(preprocessing_function=tf.keras.applications.inception_v3.preprocess_input)

In [None]:
# Shuffled training batches read from the image folder, using the settings of
# train_generator and one-hot ('categorical') labels.
train_flow_options = dict(
    dataframe=train_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=True,
    class_mode='categorical',
)
train_img_gen = train_generator.flow_from_dataframe(**train_flow_options)

In [None]:
# Validation batches must come from unmodified images, so they use the
# augmentation-free test_generator (not train_generator) in a fixed order;
# otherwise val_loss changes randomly between epochs.
val_img_gen = test_generator.flow_from_dataframe(
    dataframe=val_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
    class_mode='categorical'
)

In [None]:
# Test batches in file order (shuffle=False) so predictions can be compared
# position-by-position with test_img_gen.classes.
test_flow_options = dict(
    dataframe=test_set,
    directory=test_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
)
test_img_gen = test_generator.flow_from_dataframe(**test_flow_options)

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for i in range(5):
    image = next(train_img_gen)[0][0]
    # inception_v3.preprocess_input scales pixels to [-1, 1]; imshow clips float
    # images to [0, 1], so map back to that range for display only.
    ax[i].imshow(np.clip((image + 1.0) / 2.0, 0.0, 1.0))
    ax[i].axis('off')

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for i in range(5):
    image = next(test_img_gen)[0][0]
    # inception_v3.preprocess_input scales pixels to [-1, 1]; imshow clips float
    # images to [0, 1], so map back to that range for display only.
    ax[i].imshow(np.clip((image + 1.0) / 2.0, 0.0, 1.0))
    ax[i].axis('off')

In [None]:
#From professor's notebook, need to implement below callback part

model_path = "/kaggle/working/best_inception_pnemonia_cnn.h5"
# Stop once val_loss has not improved for 5 consecutive epochs.
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

# Checkpoint the full model rather than weights only: model_path is read back
# with load_model, which cannot open a weights-only HDF5 file (e.g. after an
# interrupted training run).
monitor = tf.keras.callbacks.ModelCheckpoint(
    model_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
    mode='min',
)
# Learning rate schedule
def schedule(epoch, lr):
    """Return half of `lr` on every 3rd epoch (except epoch 0), else `lr` unchanged."""
    halve_now = epoch != 0 and epoch % 3 == 0
    return lr / 2 if halve_now else lr

# Wrap `schedule` in a Keras callback; verbose=1 logs the learning rate each epoch.
lr_schedule = tf.keras.callbacks.LearningRateScheduler(schedule,verbose = 1)

# Callbacks handed to fit: early stopping, LR schedule, checkpointing.
model_callbacks = [early_stop, lr_schedule, monitor]

In [None]:
# Reuse a previously saved model when one exists at model_path; otherwise
# build the InceptionV3 transfer-learning model, so the notebook also runs on
# a fresh kernel where /kaggle/working is still empty.
if tf.io.gfile.exists(model_path):
    Inception_transfer_learning_model = load_model(model_path)
else:
    inputs = tf.keras.layers.Input((320,256,3))
    Inception_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet', pooling='max', input_shape=(320, 256, 3))

    x = Inception_model(inputs)
    x = Dropout(0.25)(x)
    x = Dense(2048, activation = "relu")(x)
    x = Dropout(0.25)(x)
    x = Dense(512, activation = "relu")(x)
    x = Dropout(0.25)(x)
    x = Dense(64, activation = "relu")(x)
    x = Dropout(0.25)(x)
    outputs = Dense(2, activation = "sigmoid")(x)

    Inception_transfer_learning_model = tf.keras.Model(inputs, outputs)

In [None]:
# Compile with binary cross-entropy, the Adam optimizer and the shared evaluation_metrics list.
Inception_transfer_learning_model.compile(loss='binary_crossentropy', optimizer='adam', metrics=evaluation_metrics)

In [None]:
# train_img_gen already yields batches of 64 images, so `batch_size` is not
# passed to fit(): Keras ignores or rejects it for generator input.
model_progress = Inception_transfer_learning_model.fit(
    train_img_gen,
    steps_per_epoch = 64,
    validation_data = val_img_gen,
    epochs = 10,
    callbacks=model_callbacks
)

# Reload the best-val_loss checkpoint before exporting; saving directly would
# overwrite the best model at model_path with the last-epoch weights.
if tf.io.gfile.exists(model_path):
    Inception_transfer_learning_model.load_weights(model_path)
Inception_transfer_learning_model.save(model_path)

In [None]:
# Training vs. validation loss per epoch.
plt.figure(figsize=(10,8))
for history_key, line_color, line_label in (
    ('loss', 'r', "Training loss"),
    ('val_loss', 'b', "Validation loss"),
):
    plt.plot(model_progress.history[history_key], color=line_color, label=line_label)
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy per epoch.
plt.figure(figsize=(10,8))
for history_key, line_color, line_label in (
    ('accuracy', 'r', "Training accuracy"),
    ('val_accuracy', 'b', "Validation accuracy"),
):
    plt.plot(model_progress.history[history_key], color=line_color, label=line_label)
plt.legend()
plt.show()

In [None]:
# Predict the test set and take the highest-scoring output unit as the class.
test_pred = Inception_transfer_learning_model.predict(test_img_gen)
predicted_class = np.argmax(test_pred, axis=1)
class_labels = test_img_gen.class_indices
# Invert the label -> index mapping once instead of rebuilding it per prediction.
index_to_label = {index: label for label, index in class_labels.items()}
prediction = [index_to_label[index] for index in predicted_class]
print("Predicted :", predicted_class[:5])
actual_label = test_img_gen.classes
print("actual_label: ", actual_label[:5])

In [None]:
# Fraction of test images whose predicted class equals the true class.
accuracy = accuracy_score(y_true=actual_label, y_pred=predicted_class)
print("Accuracy: %.2f%%" % (accuracy * 100.0))

In [None]:
# confusion_matrix expects (y_true, y_pred): rows must be the true labels so
# the matrix agrees with the 'True label' / 'Predicted label' axes of the plot.
cnf_matrix = confusion_matrix(actual_label, predicted_class)
np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plt.figure(figsize=(8,8))
plot_confusion_matrix(cnf_matrix, classes=['Normal', 'Pnemonia'],
                      title='Confusion matrix')
plt.show()

**Transfer Learning MobileNet model**

In [None]:
#Now let's do data augmentation
# Training generator: MobileNetV2 preprocessing plus random augmentation.
# featurewise_center / featurewise_std_normalization are deliberately not set:
# they need dataset statistics from ImageDataGenerator.fit(), which this
# notebook never calls, so Keras only emitted warnings and skipped them.
train_generator = ImageDataGenerator(preprocessing_function=tf.keras.applications.mobilenet_v2.preprocess_input,
                                   zoom_range=[0.75, 1],
                                   brightness_range=(0.8, 1.2),
                                   width_shift_range=0.15,
                                   height_shift_range=0.15)
# Evaluation generator: MobileNetV2 preprocessing only, no augmentation.
test_generator = ImageDataGenerator(preprocessing_function=tf.keras.applications.mobilenet_v2.preprocess_input)

In [None]:
# Shuffled training batches read from the image folder, using the settings of
# train_generator and one-hot ('categorical') labels.
train_flow_options = dict(
    dataframe=train_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=True,
    class_mode='categorical',
)
train_img_gen = train_generator.flow_from_dataframe(**train_flow_options)

In [None]:
# Validation batches must come from unmodified images, so they use the
# augmentation-free test_generator (not train_generator) in a fixed order;
# otherwise val_loss changes randomly between epochs.
val_img_gen = test_generator.flow_from_dataframe(
    dataframe=val_dataset,
    directory=train_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
    class_mode='categorical'
)

In [None]:
# Test batches in file order (shuffle=False) so predictions can be compared
# position-by-position with test_img_gen.classes.
test_flow_options = dict(
    dataframe=test_set,
    directory=test_set_folder_path,
    x_col='X_ray_image_name',
    y_col='Label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=False,
)
test_img_gen = test_generator.flow_from_dataframe(**test_flow_options)

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for i in range(5):
    image = next(train_img_gen)[0][0]
    # mobilenet_v2.preprocess_input scales pixels to [-1, 1]; imshow clips float
    # images to [0, 1], so map back to that range for display only.
    ax[i].imshow(np.clip((image + 1.0) / 2.0, 0.0, 1.0))
    ax[i].axis('off')

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for i in range(5):
    image = next(test_img_gen)[0][0]
    # mobilenet_v2.preprocess_input scales pixels to [-1, 1]; imshow clips float
    # images to [0, 1], so map back to that range for display only.
    ax[i].imshow(np.clip((image + 1.0) / 2.0, 0.0, 1.0))
    ax[i].axis('off')

In [None]:
#From professor's notebook, need to implement below callback part

model_path = "/kaggle/working/best_mobilenetv2_pnemonia_cnn.h5"
# Stop once val_loss has not improved for 5 consecutive epochs.
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

# Checkpoint the full model rather than weights only, so the file at
# model_path can always be reopened with load_model (a weights-only HDF5
# file cannot).
monitor = tf.keras.callbacks.ModelCheckpoint(
    model_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
    mode='min',
)
# Learning rate schedule
def schedule(epoch, lr):
    """Return the learning rate to use for `epoch`.

    The incoming rate `lr` is halved on every third epoch (3, 6, 9, ...);
    epoch 0 and all other epochs keep `lr` unchanged.
    """
    is_halving_epoch = epoch != 0 and epoch % 3 == 0
    return lr / 2 if is_halving_epoch else lr

# Wrap `schedule` as a Keras callback; verbose=1 makes it report the rate it sets.
lr_schedule = tf.keras.callbacks.LearningRateScheduler(schedule=schedule, verbose=1)

# Callbacks handed to `fit`: early stopping, LR schedule, checkpointing.
model_callbacks = [
    early_stop,
    lr_schedule,
    monitor,
]

In [None]:
# Transfer-learning classifier: an ImageNet-initialised MobileNetV2 backbone
# (no classification top, global max pooling) followed by a dense head.
inputs = tf.keras.layers.Input((320,256,3))
MobileNet_model = tf.keras.applications.MobileNetV2(
    input_shape=(320, 256, 3),
    include_top=False,
    weights='imagenet',
    pooling='max',
)

x = MobileNet_model(inputs)
# Hidden head: Dropout(0.25) in front of each ReLU Dense layer, widths 2048 -> 512 -> 64.
for hidden_units in (2048, 512, 64):
    x = Dropout(0.25)(x)
    x = Dense(hidden_units, activation = "relu")(x)
x = Dropout(0.25)(x)
# Two sigmoid output units, matching the two-column one-hot labels from the generators.
outputs = Dense(2, activation = "sigmoid")(x)

MobileNet_transfer_learning_model = tf.keras.Model(inputs, outputs)

# MobileNet_transfer_learning_model = load_model(model_path)

In [None]:
# Adam optimiser with binary cross-entropy over the two sigmoid outputs;
# `evaluation_metrics` is the metric list defined earlier in the notebook.
MobileNet_transfer_learning_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=evaluation_metrics,
)

In [None]:
# Train on the generator. `batch_size` is deliberately not passed: the
# generator already yields batches of 64 and Keras documents that batch_size
# must not be specified for generator / Sequence input.
model_progress = MobileNet_transfer_learning_model.fit(
    train_img_gen,
    steps_per_epoch=64,             # 64 batches of 64 images per epoch
    validation_data=val_img_gen,
    epochs=10,
    callbacks=model_callbacks,
)

# ModelCheckpoint stored the best-val_loss *weights* at model_path. Saving the
# model straight away would overwrite that checkpoint with the last-epoch
# weights, so restore the best weights first and then save the full model
# (architecture + weights) to the same path.
if tf.io.gfile.exists(model_path):
    MobileNet_transfer_learning_model.load_weights(model_path)
MobileNet_transfer_learning_model.save(model_path)

In [None]:
# Training vs. validation loss for each epoch recorded by `fit`.
plt.figure(figsize=(10,8))
plt.plot(model_progress.history['loss'], color='r', label="Training loss")
plt.plot(model_progress.history['val_loss'], color='b', label="Validation loss")
# Label the figure so it can be read on its own when the notebook is skimmed.
plt.title("Loss per epoch")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy for each epoch recorded by `fit`.
plt.figure(figsize=(10,8))
plt.plot(model_progress.history['accuracy'], color='r', label="Training accuracy")
plt.plot(model_progress.history['val_accuracy'], color='b', label="Validation accuracy")
# Label the figure so it can be read on its own when the notebook is skimmed.
plt.title("Accuracy per epoch")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
# Per-class scores for every test image; the generator was built with
# shuffle=False, so rows follow the order of `test_img_gen.classes`.
test_pred = MobileNet_transfer_learning_model.predict(test_img_gen)
predicted_class = np.argmax(test_pred, axis=1)

# class_indices maps label name -> class index. Invert it once up front
# instead of rebuilding the inverted dict for every single prediction.
class_labels = test_img_gen.class_indices
index_to_label = {index: label for label, index in class_labels.items()}
prediction = [index_to_label[index] for index in predicted_class]
print("Predicted :", predicted_class[:5])

# Integer class index of each test image, as assigned by the generator.
actual_label = test_img_gen.classes
print("actual_label: ", actual_label[:5])

In [None]:
# scikit-learn metrics take (y_true, y_pred) in that order. Accuracy is
# symmetric, so the value is the same either way, but the documented order
# keeps this call consistent with the other metric calls.
accuracy = accuracy_score(actual_label, predicted_class)
print("Accuracy: %.2f%%" % (accuracy * 100.0))

In [None]:
# confusion_matrix takes (y_true, y_pred): rows are the true classes and
# columns the predicted ones. Passing them the other way round transposes
# the matrix, i.e. swaps false positives with false negatives.
# NOTE(review): assumes plot_confusion_matrix labels rows as "true" and
# columns as "predicted" -- confirm against its definition.
cnf_matrix = confusion_matrix(actual_label, predicted_class)
np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plt.figure(figsize=(8,8))
plot_confusion_matrix(cnf_matrix, classes=['Normal', 'Pnemonia'],
                      title='Confusion matrix')
plt.show()

**Applying transfer learning with our best-performing model to a separate but similar dataset**

In [None]:
# Load the four per-class metadata spreadsheets of the COVID-19 Radiography
# dataset, in the order COVID, Lung_Opacity, Normal, Viral Pneumonia.
COVID_df, Lung_Opacity_df, Normal_df, Pneumonia_df = [
    pd.read_excel(
        '../input/covid19-radiography-database/COVID-19_Radiography_Dataset/'
        + sheet_file
    )
    for sheet_file in (
        'COVID.metadata.xlsx',
        'Lung_Opacity.metadata.xlsx',
        'Normal.metadata.xlsx',
        'Viral Pneumonia.metadata.xlsx',
    )
]

In [None]:
# Preview the first rows of the Normal metadata as loaded from the spreadsheet.
Normal_df.head()

In [None]:
# Upper-case the first character of each file name and lower-case the rest.
# str.capitalize already lower-cases the remainder, so no separate lower()
# pass is needed, and the vectorised .str accessor replaces apply + lambda.
# NOTE(review): presumably this makes the names match the image files on
# disk -- confirm against the dataset folder.
Normal_df["FILE NAME"] = Normal_df["FILE NAME"].str.capitalize()

In [None]:
# Preview again to check the "FILE NAME" column after it was rewritten in the previous cell.
Normal_df.head()

In [None]:
# Give every metadata frame a constant class `label` and a `full_img_path`
# built as <dataset root>/<class folder>/images/<FILE NAME>.png.
image_root = "../input/covid19-radiography-database/COVID-19_Radiography_Dataset/"
for frame, class_label, class_folder in (
    (COVID_df, "COVID", "COVID"),
    (Lung_Opacity_df, "Lung_Opacity", "Lung_Opacity"),
    (Normal_df, "Normal", "Normal"),
    (Pneumonia_df, "Pneumonia", "Viral Pneumonia"),   # folder name differs from the label
):
    frame["label"] = class_label
    frame["full_img_path"] = image_root + class_folder + "/images/" + frame["FILE NAME"] + ".png"

In [None]:
# Stack the four per-class frames into one table, in the order
# COVID, Lung_Opacity, Normal, Pneumonia; each frame keeps its own row index.
all_data = pd.concat(
    [COVID_df, Lung_Opacity_df, Normal_df, Pneumonia_df]
)

In [None]:
# Image file name including its extension; the generators below read this
# column as `x_col`. Vectorised string concatenation replaces apply + lambda.
all_data["filename_cleaned"] = all_data["FILE NAME"] + '.png'

In [None]:
# Number of images per class.
sns.set(style="darkgrid")
# Pass the column as `x=`: handing it over positionally is deprecated in
# seaborn 0.11 and is interpreted as `data` from seaborn 0.12 onwards.
ax = sns.countplot(x=all_data["label"])

In [None]:
# 5x4 grid of sample images: one column per class, showing the first five
# rows of that class.
fig, axs = plt.subplots(5, 4, figsize=(15,15))

# Per-class subsets of the combined table.
COVID = all_data[all_data["label"]=="COVID"]
Lung_Opacity = all_data[all_data["label"]=="Lung_Opacity"]
Normal = all_data[all_data["label"]=="Normal"]
Pneumonia = all_data[all_data["label"]=="Pneumonia"]

class_samples = (
    ("COVID", COVID),
    ("Lung_Opacity", Lung_Opacity),
    ("Normal", Normal),
    ("Pneumonia", Pneumonia),
)
for column, (class_name, class_frame) in enumerate(class_samples):
    # Title the column on the figure itself rather than relying on a printed
    # text header happening to line up with the plot.
    axs[0, column].set_title(class_name + " images")
    for row, img_path in enumerate(class_frame.head()["full_img_path"]):
        # The context manager closes the image file once it has been drawn;
        # imshow converts the image to an array when it is called.
        with Image.open(img_path) as sample_image:
            axs[row, column].imshow(sample_image)

In [None]:
# %ls ./train/

In [None]:
%rm -r ./train/
%rm -r ./test/
%rm -r ./val/

In [None]:
%mkdir train
%mkdir test
%mkdir val

In [None]:
# Hold out 10% of all rows as the test set; the fixed random_state makes the
# shuffled split repeatable.
train_val_dataset, test_dataset = train_test_split(
    all_data,
    test_size=0.1,
    random_state=42,
    shuffle=True,
)

In [None]:
# Split the remaining rows again: 10% of them become the validation set,
# the rest the training set.
train_dataset, val_dataset = train_test_split(
    train_val_dataset,
    test_size=0.1,
    random_state=42,
    shuffle=True,
)

In [None]:
# Copy every image of each split into that split's folder
# (train first, then test, then val).
for split_frame, split_folder in (
    (train_dataset, "./train/"),
    (test_dataset, "./test/"),
    (val_dataset, "./val/"),
):
    for img_path in split_frame["full_img_path"]:
        shutil.copy2(img_path, split_folder)

In [None]:
# Training generator: InceptionV3-style input preprocessing plus random zoom,
# brightness and shift augmentation.
# featurewise_center / featurewise_std_normalization are not set: they only
# take effect after ImageDataGenerator.fit() has computed dataset statistics,
# and no such call is made before training, so enabling them would do nothing
# except emit a warning for every image.
train_generator = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.inception_v3.preprocess_input,
    zoom_range=[0.75, 1],
    brightness_range=(0.8, 1.2),
    width_shift_range=0.15,
    height_shift_range=0.15,
)
# Evaluation generator: the same preprocessing function, no augmentation.
test_generator = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.inception_v3.preprocess_input,
)

In [None]:
# Augmented training batches read from ./train: 64 images per batch, shuffled,
# with one-hot ('categorical') labels taken from the `label` column.
train_img_gen = train_generator.flow_from_dataframe(
    dataframe=train_dataset,
    directory="./train",
    x_col='filename_cleaned',   # file name including the .png extension
    y_col='label',
    class_mode='categorical',
    target_size=(320, 256),
    batch_size=64,
    shuffle=True,
    seed=2022,
)

In [None]:
# Validation batches read from ./val. They are produced by `test_generator`
# rather than `train_generator`: the training generator applies random zoom,
# brightness and shift augmentation, which would make val_loss (the quantity
# EarlyStopping and ModelCheckpoint monitor) noisy and not comparable with
# the un-augmented test evaluation.
val_img_gen = test_generator.flow_from_dataframe(
    dataframe=val_dataset,
    directory="./val",
    x_col='filename_cleaned',
    y_col='label',
    target_size=(320, 256),
    batch_size=64,
    seed=2022,
    shuffle=True,
    class_mode='categorical'
)

In [None]:
# Un-augmented test batches read from ./test, in dataframe order
# (shuffle=False) so predictions line up with `test_img_gen.classes`.
test_img_gen = test_generator.flow_from_dataframe(
    dataframe=test_dataset,
    directory="./test",
    x_col='filename_cleaned',
    y_col='label',
    target_size=(320, 256),
    batch_size=64,
    shuffle=False,
    seed=2022,
)

In [None]:
# Preview: the first image of five consecutive (augmented) training batches.
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for panel in ax:
    # each next(...) pulls a fresh batch; [0] is the image array, [0][0] its first image
    image = next(train_img_gen)[0][0]
    panel.imshow(image)
    panel.axis('off')

In [None]:
# Preview: the first image of five consecutive test batches.
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(15,15))
for column in range(len(ax)):
    batch = next(test_img_gen)   # advances the test iterator by one batch
    image = batch[0][0]          # first image of that batch
    ax[column].imshow(image)
    ax[column].axis('off')

In [None]:
# Copy the saved .h5 model file from the ../input folder into the current directory.
# NOTE(review): model_path in the next cell is /kaggle/working/<same file name>,
# so this presumably runs with /kaggle/working as the working directory -- confirm.
%cp ../input/enel645-project-models-2603/best_transfer_learning_on_new_dataset_cnn.h5 ./best_transfer_learning_on_new_dataset_cnn.h5

In [None]:
# Callback setup, adapted from the professor's notebook.

# File the model is loaded from and that receives the checkpointed weights.
model_path = "/kaggle/working/best_transfer_learning_on_new_dataset_cnn.h5"

# Stop training once val_loss has gone 5 epochs without improving.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
)

# Each time val_loss reaches a new minimum, write the weights (weights only,
# not the full model) to model_path.
monitor = tf.keras.callbacks.ModelCheckpoint(
    model_path,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)
# Learning rate schedule
def schedule(epoch, lr):
    """Learning-rate schedule: halve `lr` on epochs 3, 6, 9, ...

    Epoch 0 and every epoch that is not a multiple of 3 return `lr` unchanged.
    """
    # Guard clause: nothing to do on the first epoch or off-cycle epochs.
    if epoch == 0 or epoch % 3 != 0:
        return lr
    return lr / 2

# Wrap `schedule` as a Keras callback; verbose=1 makes it report the rate it sets.
lr_schedule = tf.keras.callbacks.LearningRateScheduler(schedule=schedule, verbose=1)

# Callbacks handed to `fit`: early stopping, LR schedule, checkpointing.
model_callbacks = [
    early_stop,
    lr_schedule,
    monitor,
]

In [None]:
# The commented-out code below shows how a 4-class softmax head was stacked on
# an earlier model cut at its 'dense_13' layer; presumably it is kept as a
# record of how the saved model was first assembled.
# The live code at the bottom of this cell instead loads a complete saved
# model from model_path.
# best_performing_transfer_learning_model = load_model(model_path)

# inputs = tf.keras.layers.Input((320,256,3))
# last_layer = best_performing_transfer_learning_model.get_layer('dense_13')
# last_output = last_layer.output
# input_l = best_performing_transfer_learning_model.input
# best_performing_base_model = tf.keras.Model(input_l, last_output)

# x = best_performing_base_model(inputs)
# x = Dropout(0.20)(x)
# outputs = Dense(4, activation = "softmax")(x)

# best_performing_model_with_transfer_learning_on_new_dataset = tf.keras.Model(inputs, outputs)

# Load the full model (architecture + weights) stored at model_path.
best_performing_model_with_transfer_learning_on_new_dataset = load_model(model_path)

In [None]:
# Print the layer-by-layer summary of the loaded model.
best_performing_model_with_transfer_learning_on_new_dataset.summary()

In [None]:
# Adam optimiser with categorical cross-entropy over the one-hot class labels;
# `evaluation_metrics` is the metric list defined earlier in the notebook.
best_performing_model_with_transfer_learning_on_new_dataset.compile(
    optimizer='adam',
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=evaluation_metrics,
)

In [None]:
# Train on the generator. `batch_size` is deliberately not passed: the
# generator already yields batches of 64 and Keras documents that batch_size
# must not be specified for generator / Sequence input.
model_progress = best_performing_model_with_transfer_learning_on_new_dataset.fit(
    train_img_gen,
    steps_per_epoch=64,             # 64 batches of 64 images per epoch
    validation_data=val_img_gen,
    epochs=30,
    callbacks=model_callbacks,
)

# ModelCheckpoint stored the best-val_loss *weights* at model_path. Saving the
# model straight away would overwrite that checkpoint with the last-epoch
# weights, so restore the best weights first and then save the full model
# (architecture + weights) to the same path.
if tf.io.gfile.exists(model_path):
    best_performing_model_with_transfer_learning_on_new_dataset.load_weights(model_path)
best_performing_model_with_transfer_learning_on_new_dataset.save(model_path)

In [None]:
# Training vs. validation loss for each epoch recorded by `fit`.
plt.figure(figsize=(10,8))
plt.plot(model_progress.history['loss'], color='r', label="Training loss")
plt.plot(model_progress.history['val_loss'], color='b', label="Validation loss")
# Label the figure so it can be read on its own when the notebook is skimmed.
plt.title("Loss per epoch")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy for each epoch recorded by `fit`.
plt.figure(figsize=(10,8))
plt.plot(model_progress.history['accuracy'], color='r', label="Training accuracy")
plt.plot(model_progress.history['val_accuracy'], color='b', label="Validation accuracy")
# Label the figure so it can be read on its own when the notebook is skimmed.
plt.title("Accuracy per epoch")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
# Per-class scores for every test image; the generator was built with
# shuffle=False, so rows follow the order of `test_img_gen.classes`.
test_pred = best_performing_model_with_transfer_learning_on_new_dataset.predict(test_img_gen)
predicted_class = np.argmax(test_pred, axis=1)

# class_indices maps label name -> class index. Invert it once up front
# instead of rebuilding the inverted dict for every single prediction.
class_labels = test_img_gen.class_indices
index_to_label = {index: label for label, index in class_labels.items()}
prediction = [index_to_label[index] for index in predicted_class]
print("Predicted :", predicted_class[:5])

# Integer class index of each test image, as assigned by the generator.
actual_label = test_img_gen.classes
print("actual_label: ", actual_label[:5])

In [None]:
# scikit-learn metrics take (y_true, y_pred) in that order. Accuracy is
# symmetric, so the value is the same either way, but the documented order
# keeps this call consistent with the other metric calls.
accuracy = accuracy_score(actual_label, predicted_class)
print("Accuracy: %.2f%%" % (accuracy * 100.0))

In [None]:
# Class names ordered by the integer index the generator assigned to them.
# This dataset has four classes (COVID, Lung_Opacity, Normal, Pneumonia), so
# the tick labels are derived from the generator instead of being hard-coded.
class_names = sorted(test_img_gen.class_indices, key=test_img_gen.class_indices.get)

# confusion_matrix takes (y_true, y_pred): rows are the true classes and
# columns the predicted ones. `labels` pins the matrix to one row/column per
# class even if some class never occurs in the test predictions.
# NOTE(review): assumes plot_confusion_matrix labels rows as "true" and
# columns as "predicted" -- confirm against its definition.
cnf_matrix = confusion_matrix(actual_label, predicted_class,
                              labels=list(range(len(class_names))))
np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plt.figure(figsize=(8,8))
plot_confusion_matrix(cnf_matrix, classes=class_names,
                      title='Confusion matrix')
plt.show()