In [3]:
#@title Environment setup and imports
#%matplotlib inline
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# from celluloid import Camera  # getting the camera
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.applications import VGG16, ResNet50, VGG19, DenseNet121, InceptionV3, Xception
from tensorflow.keras.layers import Dense, Dropout, Flatten, BatchNormalization, Activation
from keras.constraints import maxnorm
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from keras.utils import np_utils, to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras_preprocessing.image.dataframe_iterator import DataFrameIterator
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, TensorBoard
from matplotlib.offsetbox import (OffsetImage,
                                  AnnotationBbox)
from IPython.display import HTML  # to show the animation in Jupyter
from sklearn.utils import class_weight
from sklearn.metrics import confusion_matrix
from mlxtend.plotting import plot_confusion_matrix
from sklearn.metrics import roc_curve
from sklearn.metrics import auc
import seaborn as sns
import itertools
import os
import shutil
import random
from google.colab import files
import cv2
import glob
from PIL import Image
#we are importing custom classes from project files
from eda import EDA
from preprocessing import Preprocessing
from modelling import Modelling
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [16]:
#@title  We download the dataset from kaggle
files.upload()
!mkdir ~/.kaggle 
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d tawsifurrahman/covid19-radiography-database

mkdir: cannot create directory ‘/root/.kaggle’: File exists
covid19-radiography-database.zip: Skipping, found more recently modified local copy (use --force to force download)


In [None]:
# Extract the downloaded archive into the current working directory.
! unzip covid19-radiography-database.zip 

In [5]:
# Root folder of the extracted dataset (Colab-specific absolute path).
base = '/content/COVID-19_Radiography_Dataset'
# Names of the entries directly under the dataset root.
paths   = os.listdir(base)

In [6]:
#@title Exploratory Data Analysis Class
class EDA:
    """Exploratory data analysis for a folder-per-class image dataset.

    ``root`` is scanned one level deep: every sub-directory of ``root`` is
    listed and each readable image inside it becomes one row of the data
    frame. The class label of a row is taken from the *file name* (the text
    before the first ``.``, cut at the first ``-``), not from the folder name.

    All figures are written to ``<current working directory>/figures``.
    """

    # DataFrame built by load_data(); (re)populated by visualize().
    data = None
    # Not used by any method of this class; kept for backward compatibility.
    plots = {}
    # Dataset root directory, set in __init__.
    root = None

    def __init__(self, root):
        """Remember the dataset root; nothing is read from disk here."""
        self.root = root

    def load_data(self):
        """Read every image below ``root`` and collect per-image statistics.

        Returns:
            pandas.DataFrame with the columns ``path``, ``class``, ``mean``,
            ``std``, ``min``, ``max``, ``image`` (the array returned by
            ``cv2.imread``) and ``image_r`` (the image re-opened with PIL and
            resized to 75x75). The frame is empty, but still has all columns,
            when ``root`` is not a directory or holds no readable images.
        """
        columns = ('path', 'class', 'mean', 'std', 'min', 'max', 'image')
        records = {name: [] for name in columns}

        # Only list the root once we know it is a directory; listing first
        # would raise before the guard could take effect.
        if os.path.isdir(self.root):
            for entry in os.listdir(self.root):
                class_dir = os.path.join(self.root, entry)
                if not os.path.isdir(class_dir):
                    continue
                for fname in os.listdir(class_dir):
                    file = os.path.join(class_dir, fname)
                    arr_img = cv2.imread(file)
                    if arr_img is None:
                        # cv2.imread signals "not a readable image" by
                        # returning None instead of raising; skip the entry.
                        continue
                    stem = fname.split('/')[-1].split('.')[0]
                    records['path'].append(file)
                    records['class'].append(stem.split('-')[0])
                    records['mean'].append(arr_img.mean())
                    records['std'].append(arr_img.std())
                    records['min'].append(arr_img.min())
                    records['max'].append(arr_img.max())
                    records['image'].append(arr_img)

        df = pd.DataFrame(records)
        df['image_r'] = df['path'].map(lambda x: np.asarray(Image.open(x).resize((75, 75))))
        return df

    @staticmethod
    def _figures_dir():
        """Return ``<cwd>/figures``, creating the directory when missing."""
        figures = os.path.join(os.getcwd(), 'figures')
        os.makedirs(figures, exist_ok=True)
        return figures

    def visualize(self):
        """Reload the dataset and write all EDA figures to ``./figures``.

        Raises:
            ValueError: if no readable image was found below ``root``.
        """
        self.data = self.load_data()
        if self.data.empty:
            raise ValueError('No readable images found under {!r}'.format(self.root))
        samples = len(self.data)
        figures = self._figures_dir()

        # Samples per class
        plt.figure(figsize=(10, 6))
        sns.set(style="ticks", font_scale=1)
        ax = sns.countplot(data=self.data, x='class', order=self.data['class'].value_counts().index, palette="flare")
        sns.despine(top=True, right=True, left=True, bottom=False)
        plt.xticks(rotation=0, fontsize=12)
        ax.set_xlabel('Class Type - Diagnosis', fontsize=14, weight='bold')
        ax.set(yticklabels=[])
        ax.axes.get_yaxis().set_visible(False)
        plt.title('Number of Sample X-Ray Images per Class', fontsize=12, weight='bold')

        # Annotate every bar with its share of the whole dataset.
        for p in ax.patches:
            ax.annotate("%.1f%%" % (100 * float(p.get_height() / samples)),
                        (p.get_x() + p.get_width() / 2., abs(p.get_height())),
                        ha='center', va='bottom', color='black', xytext=(0, 10), rotation='horizontal',
                        textcoords='offset points')
        plt.savefig(os.path.join(figures, 'image_class_distribution.png'), dpi=300)

        self.plot_sample_images(3, "sample_images_all_channels", True)
        self.plot_sample_images(3, "sample_images_single_channels", False)

        # Per-class distribution of the image mean / max / min pixel value.
        sns.displot(data=self.data, x='mean', kind="kde", hue='class', fill=False)
        plt.title('Images Colour Mean Value Distribution by Class', fontsize=12, weight='bold')
        plt.savefig(os.path.join(figures, 'image_mean_color.png'), dpi=300)

        sns.displot(data=self.data, x='max', kind="kde", hue='class')
        plt.title('Images Colour Max Value Distribution by Class', fontsize=12, weight='bold')
        plt.savefig(os.path.join(figures, 'image_max_color.png'), dpi=300)

        sns.displot(data=self.data, x='min', kind="kde", hue='class')
        plt.title('Images Colour Min Value Distribution by Class', fontsize=12, weight='bold')
        plt.savefig(os.path.join(figures, 'image_min_color.png'), dpi=300)

        # Mean versus standard deviation of every image, coloured by class.
        plt.figure(figsize=(10, 6))
        sns.set(style="ticks", font_scale=1)
        ax = sns.scatterplot(data=self.data, x="mean", y=self.data['std'], hue='class', alpha=0.8)
        sns.despine(top=True, right=True, left=False, bottom=False)
        plt.xticks(rotation=0, fontsize=12)
        ax.set_xlabel('Image Channel Colour Mean', fontsize=12, weight='bold')
        ax.set_ylabel('Image Channel Colour Standard Deviation', fontsize=12, weight='bold')
        plt.title('Mean and Standard Deviation of Image Samples', fontsize=12, weight='bold')
        plt.savefig(os.path.join(figures, 'image_distribution_scatter.png'), dpi=300)

        # Same scatter, one panel per class. FacetGrid creates its own figure,
        # so no plt.figure() call is needed (it would only leave an empty one).
        g = sns.FacetGrid(self.data, col="class", height=5)
        g.map_dataframe(sns.scatterplot, x='mean', y='std', hue='class')
        g.set_titles(col_template="{col_name}", row_template="{row_name}", size=16)
        g.fig.subplots_adjust(top=.7)
        axes = g.axes.flatten()
        axes[0].set_ylabel('Standard Deviation')
        for ax in axes:
            ax.set_xlabel('Mean')
        g.fig.tight_layout()
        plt.savefig(os.path.join(figures, 'image_distribution_4_plots.png'), dpi=300)

        # Draw a 10% sample of the images themselves at their (mean, std) position.
        DF_sample = self.data.sample(frac=0.1, replace=False, random_state=1)

        fig, ax = plt.subplots(figsize=(10, 6))
        sns.scatterplot(data=DF_sample, x="mean", y='std')
        sns.despine(top=True, right=True, left=False, bottom=False)
        ax.set_xlabel('Image Channel Colour Mean', fontsize=12, weight='bold')
        ax.set_ylabel('Image Channel Colour Standard Deviation', fontsize=14, weight='bold')
        plt.title('Mean and Standard Deviation of Image Samples - 10% of Data', fontsize=12, weight='bold')

        for x0, y0, path in zip(DF_sample['mean'], DF_sample['std'], DF_sample['path']):
            ab = AnnotationBbox(self.getImage(path), (x0, y0), frameon=False)
            ax.add_artist(ab)
        plt.savefig(os.path.join(figures, 'image_bounding_box.png'), dpi=300)

    def getImage(self, path):
        """Return the image at ``path`` as an OffsetImage scaled to 10%."""
        imdata = plt.imread(path)
        return OffsetImage(imdata, zoom=0.1)

    def plot_sample_images(self, n_samples=3, pltname="sample_images_all_channels", channels=True):
        """Save a grid with ``n_samples`` example images per class.

        Args:
            n_samples: images drawn per class (capped at the class size).
            pltname: file name, without extension, of the PNG in ``./figures``.
            channels: when True images are read with ``cv2.imread``, otherwise
                with ``plt.imread``.

        Raises:
            ValueError: if the dataset contains no readable image.
        """
        # Allow calling this method on its own, before visualize().
        if self.data is None:
            self.data = self.load_data()
        if self.data.empty:
            raise ValueError('No readable images found under {!r}'.format(self.root))
        figures = self._figures_dir()

        n_classes = self.data['class'].nunique()
        # squeeze=False keeps a 2-D axes array even for a single row or column.
        fig, m_axs = plt.subplots(n_classes, n_samples, squeeze=False,
                                  figsize=(4 * n_samples, 3 * n_classes))
        title_col = min(1, n_samples - 1)
        for n_axs, (type_name, type_rows) in zip(m_axs, self.data.sort_values(['class']).groupby('class')):
            n_axs[title_col].set_title(type_name, fontsize=12, weight='bold')
            picked = type_rows.sample(min(n_samples, len(type_rows)), random_state=1234)
            for c_ax, (_, c_row) in zip(n_axs, picked.iterrows()):
                picture = c_row['path']
                if channels:
                    image = cv2.imread(picture)
                else:
                    image = plt.imread(picture)
                c_ax.imshow(image)
                c_ax.axis('off')

        plt.savefig(os.path.join(figures, '{}.png'.format(pltname)), dpi=300)

In [7]:
#@title We are performing EDA
# Create the EDA helper for the dataset root; no files are read at this point.
eda =  EDA(base)

In [None]:
# Load every image under `base` and write the EDA figures to ./figures.
eda.visualize()

In [9]:
#@title We proceed with the data preprocessing step
class Preprocessing:
    """Carve a held-out test set out of a folder-per-class image dataset.

    Files are *moved* (not copied) from ``<basepath>/<class>/`` to
    ``<root>/dataset/test/<class>/``, so the source folder afterwards holds
    only the remaining images. Calling ``testset`` again moves a further share.
    """

    base = None
    test_path = None
    # Working directory at the time the class is defined; the output folders
    # are created below it.
    root = os.getcwd()
    basepath = None
    dataset = None
    paths = None

    def __init__(self, basepath):
        """Create ``<root>/dataset/test`` and list the entries of ``basepath``.

        Args:
            basepath: dataset root containing one sub-directory per class.
        """
        self.dataset = os.path.join(self.root, 'dataset')
        self.test_path = os.path.join(self.dataset, 'test')
        os.makedirs(self.test_path, exist_ok=True)
        self.basepath = basepath
        self.paths = os.listdir(self.basepath)
        self.path = self.dataset

    def make_testdir(self):
        """Create one test sub-directory per class directory of ``basepath``."""
        for name in self.paths:
            if os.path.isdir(os.path.join(self.basepath, name)):
                os.makedirs(os.path.join(self.test_path, name), exist_ok=True)

    def testset(self, pr=0.1, seed=None):
        """Move a random share of every class into the test directory.

        Only files whose name starts with the class directory name are
        candidates for the move.

        Args:
            pr: fraction of the files in each class directory to move.
            seed: optional seed for a reproducible selection; ``None`` keeps
                the previous behaviour of using the global ``random`` state.
        """
        if not os.path.isdir(self.path):
            return
        rng = random if seed is None else random.Random(seed)
        for name in self.paths:
            class_dir = os.path.join(self.basepath, name)
            if not os.path.isdir(class_dir):
                continue
            # int() instead of np.int, which was removed from NumPy.
            n = int(len(os.listdir(class_dir)) * pr)
            # Escape glob metacharacters in the names; sort so that a given
            # seed always selects the same files.
            pattern = os.path.join(glob.escape(class_dir), glob.escape(name) + '*')
            candidates = sorted(glob.glob(pattern))
            # The directory may hold files that do not match the pattern;
            # never ask for more samples than there are candidates.
            n = min(n, len(candidates))
            target_dir = os.path.join(self.test_path, name)
            # Without an existing target directory shutil.move would rename
            # the image to a *file* called like the class.
            os.makedirs(target_dir, exist_ok=True)
            for c in rng.sample(candidates, n):
                shutil.move(c, target_dir)

    def make_testset(self):
        """Create the test folders and move 10% of every class into them."""
        self.make_testdir()
        self.testset(0.1)


In [12]:
# Creates <cwd>/dataset/test if needed and lists the entries of `base`.
pp =  Preprocessing(base)

In [13]:
# Moves 10% of each class's images out of `base` into dataset/test.
# NOTE: the files are moved, so running this cell again moves a further 10%.
pp.make_testset()

In [15]:
#@title Model training, validation and testing
class Modelling:
    """Train, validate and test a ResNet50-based image classifier.

    Typical use: ``get_model()`` (build the network, create the data
    generators, compile and prepare callbacks / class weights), then
    ``train()`` and finally ``test()``.

    Output files are written below the current working directory: plots and
    the training history to ``figures/``, saved models to ``models/``.
    """

    datapath = None
    testpath = None
    # Augmentation applied to the training images only.
    train_augmentation_parameters = dict(
        rescale=1.0 / 255.0,
        rotation_range=45,
        width_shift_range=0.15,
        height_shift_range=0.2,
        fill_mode='nearest',
        shear_range=16.0,
        validation_split=0.2
    )
    # Validation and test images are only rescaled.
    test_augmentation_parameters = dict(
        rescale=1.0 / 255.0
    )
    NUM_CLASSES = 4
    BATCH_SIZE = 32
    CLASS_MODE = 'categorical'
    # Not passed to the generators: they use the Keras default colour mode,
    # which provides the 3 channels that build() declares as input shape.
    COLOR_MODE = 'grayscale'
    TARGET_SIZE = (128, 128)
    EPOCHS = 100
    SEED = 214
    train_datagen = None
    train_generator = None
    test_datagen = None
    test_generator = None
    valid_generator = None
    model = None
    optim_params = None
    reduce_lr = None
    check_point = None
    early_stop = None
    y_labels = None
    classweight_dict = None
    hist = None
    y_pred = None
    y_pred_classes = None
    score = None
    classweight = None
    # Display names used for plot ticks and legends.
    # NOTE(review): must be in the order of the generators' class indices -
    # presumably alphabetical by folder name; verify against class_indices.
    classes = ["COVID", "Lung_Opacity", "Normal", "Viral Pneumonia"]

    def __init__(self, datapath, testpath):
        """Store the training/validation folder and the test folder."""
        self.datapath = datapath
        self.testpath = testpath

    @staticmethod
    def _output_path(subdir, filename):
        """Return ``<cwd>/<subdir>/<filename>``, creating ``<subdir>`` if needed."""
        directory = os.path.join(os.getcwd(), subdir)
        os.makedirs(directory, exist_ok=True)
        return os.path.join(directory, filename)

    def augment_data(self):
        """Create the training, validation and test generators."""
        self.train_datagen = ImageDataGenerator(**self.train_augmentation_parameters)

        self.train_generator = self.train_datagen.flow_from_directory(
            self.datapath,
            target_size=self.TARGET_SIZE,
            batch_size=self.BATCH_SIZE,
            class_mode=self.CLASS_MODE,
            subset="training",
            seed=self.SEED
        )

        # Validation images must not be augmented, otherwise val_loss (which
        # drives the callbacks) is measured on distorted data. A second
        # generator with the same validation_split but only the rescaling is
        # used for the "validation" subset.
        valid_params = dict(self.test_augmentation_parameters)
        valid_params['validation_split'] = self.train_augmentation_parameters.get('validation_split', 0.0)
        self.valid_datagen = ImageDataGenerator(**valid_params)
        self.valid_generator = self.valid_datagen.flow_from_directory(
            self.datapath,
            target_size=self.TARGET_SIZE,
            batch_size=self.BATCH_SIZE,
            class_mode=self.CLASS_MODE,
            subset="validation",
            seed=self.SEED
        )

        # Test data generator; shuffle=False keeps predictions aligned with
        # test_generator.classes.
        self.test_datagen = ImageDataGenerator(**self.test_augmentation_parameters)
        self.test_generator = self.test_datagen.flow_from_directory(
            self.testpath,
            target_size=self.TARGET_SIZE,
            batch_size=self.BATCH_SIZE,
            class_mode=self.CLASS_MODE,
            shuffle=False,
        )

        print()
        print(np.bincount(self.train_generator.classes))
        print(np.bincount(self.valid_generator.classes))

    def build(self):
        """Build ResNet50 (ImageNet weights, no top) with a softmax head."""
        K.clear_session()

        base_model = ResNet50(weights='imagenet', include_top=False, input_shape=self.TARGET_SIZE + (3,))

        x = base_model.output
        x = Flatten()(x)
        x = Dense(self.NUM_CLASSES, activation='softmax')(x)

        self.model = Model(inputs=base_model.input, outputs=x)

    def printmodel(self):
        """Print index, name and trainable flag of every layer."""
        for idx, layer in enumerate(self.model.layers):
            print("layer {}: {}, trainable: {}".format(idx, layer.name, layer.trainable))

    def get_model(self):
        """Build the model, create the generators, compile; return the model."""
        self.build()
        self.augment_data()
        self.configuremodel()
        return self.model

    def configuremodel(self):
        """Compile the model and prepare callbacks and class weights.

        Requires ``build()`` and ``augment_data()`` to have run.
        """
        # NOTE(review): recent TensorFlow/Keras releases may reject the
        # `decay` argument of SGD - confirm against the installed version.
        self.optim_params = dict(
            learning_rate=0.0001,
            momentum=0.9394867962846013,
            decay=0.0001
        )

        self.model.compile(
            loss='categorical_crossentropy',
            optimizer=SGD(**self.optim_params),
            metrics=['accuracy'])

        # save_freq='epoch': val_loss only exists at the end of an epoch, so
        # an integer (per-batch) frequency cannot track the best model.
        self.check_point = ModelCheckpoint('weighted_model_v2.best.h5', monitor='val_loss', verbose=1,
                                           save_best_only=True, save_weights_only=False, save_freq='epoch')

        self.reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, verbose=1, min_lr=0.00001)

        self.early_stop = EarlyStopping(monitor='val_loss', min_delta=0.001, patience=11, verbose=1,
                                        restore_best_weights=True)

        self.y_labels = self.train_generator.classes
        present_classes = np.unique(self.y_labels)
        # Keyword arguments: recent scikit-learn no longer accepts these
        # parameters positionally.
        self.classweight = class_weight.compute_class_weight(
            class_weight='balanced', classes=present_classes, y=self.y_labels)

        # Key by the actual class index so a class missing from the training
        # subset cannot shift the weights of the others.
        self.classweight_dict = {int(c): float(w) for c, w in zip(present_classes, self.classweight)}

        print(self.classweight_dict)
        print(np.bincount(self.train_generator.classes))

    def train(self):
        """Fit the model, then save history, plots, model and weights."""
        # Use the callbacks and class weights prepared by configuremodel();
        # entries that were never created are left out.
        callbacks = [cb for cb in (self.check_point, self.reduce_lr, self.early_stop) if cb is not None]
        self.hist = self.model.fit(
            self.train_generator,
            validation_data=self.valid_generator,
            epochs=self.EPOCHS,
            callbacks=callbacks,
            class_weight=self.classweight_dict
        )
        train_hist = pd.DataFrame(self.hist.history)
        train_hist.to_csv(self._output_path('figures', 'train_history.csv'), index=False)
        self.plot_acc_loss()
        self.model.save(self._output_path('models', "ResNet50.model.h5"))
        self.model.save_weights(self._output_path('models', "ResNet50.weights.model..h5"))

    def plot_acc_loss(self):
        """Plot training/validation accuracy and loss from ``self.hist``."""
        plt.figure(figsize=(8, 6), dpi=150)
        plt.plot(self.hist.history['accuracy'])
        plt.plot(self.hist.history['val_accuracy'])
        plt.title('model accuracy')
        plt.ylabel('accuracy')
        plt.xlabel('epoch')
        plt.legend(['train', 'validation'], loc='lower right')
        # No leading slash in the file part: os.path.join drops everything
        # before an absolute component, which sent the file to /figures.
        plt.savefig(self._output_path('figures', 'train_val_accuracy.png'), dpi=300)

        plt.figure(figsize=(8, 6))
        plt.plot(self.hist.history['loss'])
        plt.plot(self.hist.history['val_loss'])
        plt.title('model loss')
        plt.ylabel('loss')
        plt.xlabel('epoch')
        # The second curve is val_loss, i.e. validation data, not test data.
        plt.legend(['train', 'validation'], loc='upper right')
        plt.savefig(self._output_path('figures', 'train_val_loss.png'), dpi=300)

    def test(self):
        """Evaluate on the test generator; save confusion matrix and ROC plot."""
        print(np.bincount(self.test_generator.classes), "\n")
        self.score = self.model.evaluate(self.test_generator, verbose=0)
        # Model.predict accepts generators; predict_generator is deprecated.
        self.y_pred = self.model.predict(self.test_generator)
        # The predicted class is the one with the highest probability.
        self.y_pred_classes = np.argmax(self.y_pred, axis=1)
        y_true = self.test_generator.classes

        # Fix the label set so the matrix always has NUM_CLASSES rows/columns.
        labels = list(range(self.NUM_CLASSES))
        conf_mtx = confusion_matrix(y_true, self.y_pred_classes, labels=labels)
        plot_confusion_matrix(conf_mtx, figsize=(12, 8), hide_ticks=True, cmap=plt.cm.Blues, colorbar=True)
        plt.xticks(labels, self.classes, fontsize=16)
        plt.yticks(labels, self.classes, fontsize=16)
        plt.savefig(self._output_path('figures', 'test_confusion_matrix.png'), dpi=300)
        print('Model Loss: {}, Accuracy: {}'.format(self.score[0], self.score[1]))

        # One-vs-rest ROC curves for every class.
        self.plot_roc()

    def plot_roc(self):
        """Plot one-vs-rest ROC curves from ``self.y_pred`` (set by ``test``)."""
        fpr = dict()
        tpr = dict()
        roc_auc = dict()

        # num_classes keeps the one-hot width fixed even when the highest
        # class index does not occur in the test set.
        y_test = to_categorical(self.test_generator.classes, num_classes=self.NUM_CLASSES)

        for i in range(self.NUM_CLASSES):
            fpr[i], tpr[i], _ = roc_curve(y_test[:, i], self.y_pred[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])

        plt.figure(figsize=(10, 6))

        for i in range(self.NUM_CLASSES):
            plt.plot(fpr[i], tpr[i], lw=2,
                     label='ROC curve of {0} (area = {1:0.3f})'.format(self.classes[i], roc_auc[i]))

        # Diagonal reference line of a random classifier.
        plt.plot([0, 1], [0, 1], 'k-', label='random guessing')

        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC curve')
        plt.legend(loc="lower right")

        plt.tight_layout()
        plt.savefig(self._output_path('figures', 'roc_curve.png'), dpi=300)

In [14]:
# Held-out images created by the preprocessing step (Colab-specific path).
test_path =  '/content/dataset/test'
# Training and validation images are read from the remaining dataset files.
datapath  =  base
# The following cells call methods on `modelling`; create the instance here
# so the notebook also runs on a fresh kernel.
modelling = Modelling(datapath, test_path)

In [None]:
# Build the network, create the data generators and compile the model.
# Relies on a `modelling` object (a Modelling instance) from an earlier cell.
model = modelling.get_model()

In [None]:
# Fit the model, then write the training history, the accuracy/loss plots
# and the saved model files.
modelling.train()

In [None]:
# Evaluate on the test generator and save the confusion matrix and ROC plots.
modelling.test()