<a href="https://colab.research.google.com/github/mustafa-mohamedz/covidNet_CT_Scans/blob/main/CovidNet_CTscans_v10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the --zip_path and --outputpath values
# used later in this notebook point to locations beneath this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


# Parameters

In [None]:
import os, argparse, random, cv2
import numpy as np
import matplotlib.pyplot as plt

def _build_parser():
    """Create the argument parser for the COVID-Net training notebook.

    Every option is declared as a ``(flag, default, type, help)`` row and
    registered in table order.
    """
    option_table = (
        ('--lr', 0.001, float, 'Learning rate'),
        ('--bs', 8, int, 'Batch size'),
        ('--epochs', 15, int, 'Number of epochs'),
        ('--outputpath', '/content/outputs', str,
         "Path to model output, e.g. weights, logs, etc., defaults to '/content/outputs'"),
        ('--n_classes', 3, int, 'Number of detected classes, defaults to 3'),
        ('--trainfile', 'train_COVIDx_CT-2A.txt', str, 'Path to train file'),
        ('--valfile', 'val_COVIDx_CT-2A.txt', str, 'Path to validation file'),
        ('--testfile', 'test_COVIDx_CT-2A.txt', str, 'Path to test file'),
        ('--data_format', 1, int,
         'the type of the data folder, 1 = normal folder, 2 = zip compressed'),
        ('--datadir', '/content/covidxct/2A_images', str,
         'Path to data folder used in case of data_format equal 1'),
        ('--zip_path', '/content/drive/MyDrive/GP/Graduation Project/covidxct.zip', str,
         'path to the zip file data used in case of data_format equal 2'),
        ('--input_size', 480, int, 'Size of input (ex: if 480x480, --input_size 480)'),
        ('--num_channels', 3, int, 'Number of channels, defaults to 3'),
    )

    built = argparse.ArgumentParser(description='COVID-Net Training Script')
    for flag, default_value, value_type, help_text in option_table:
        built.add_argument(flag, default=default_value, type=value_type, help=help_text)
    return built


parser = _build_parser()

# Settings for this run, given as an explicit argv list rather than read from sys.argv.
args = parser.parse_args([
    '--lr', '0.0002',
    '--bs', '32',
    '--epochs', '15',
    '--data_format', '2',
    '--zip_path', '/content/drive/MyDrive/GP/Graduation Project/covidxct.zip',
    '--outputpath', '/content/drive/MyDrive/GP/Graduation Project/training/v10'])

# Our implementation of the model

In [None]:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D , SeparableConv2D , Activation , BatchNormalization, Add, GlobalAveragePooling2D, concatenate, Flatten, Dense, Dropout, DepthwiseConv2D, MaxPooling2D

def batch_relu(input):
    """Apply batch normalisation over axis 3, then a ReLU activation.

    Returns the activated tensor.
    """
    normalized = BatchNormalization(axis=3)(input)
    return Activation('relu')(normalized)

def PEPX_MODULE(input , filters, strides = 1):
    """Build one PEPX module on top of ``input`` and return its output tensor.

    The module chains five stages, each followed by batch-norm + ReLU:
    1x1 projection, 1x1 expansion, 3x3 depthwise convolution,
    1x1 projection and 1x1 extension.

    Args:
        input: Tensor the module is applied to.
        filters: Sequence of exactly five widths. Entries 0, 1, 3 and 4 size
            the four 1x1 convolutions; entry 2 is not used because the
            depthwise convolution takes no filter count.
        strides: Stride of the depthwise convolution only.
    """
    project_in, expand_in, _depthwise_unused, project_out, extend_out = filters

    def pointwise(tensor, width):
        # 1x1 convolution followed by batch-norm + ReLU
        return batch_relu(Conv2D(filters=width, kernel_size=1, strides=1)(tensor))

    out = pointwise(input, project_in)    # first-stage projection
    out = pointwise(out, expand_in)       # first-stage expansion

    # depthwise 3x3 convolution; the only stage that honours `strides`
    out = DepthwiseConv2D(kernel_size=3, strides=strides, padding='same')(out)
    out = batch_relu(out)

    out = pointwise(out, project_out)     # second-stage projection
    out = pointwise(out, extend_out)      # second-stage extension
    return out

class CovidNet:
    """Builder for the COVID-Net CT classification model.

    The network is a 7x7 stem convolution followed by four stages of PEPX
    modules. Inside a stage, each module receives the element-wise sum of all
    earlier outputs of that stage plus a 1x1-convolution shortcut
    (``upperN``). Every stage halves the spatial resolution with 2x2 max
    pooling. The outputs of the last stage are concatenated, globally
    average-pooled and passed to a softmax classifier.

    Args:
        input_shape: Shape of one input image; channels are on the last axis
            (batch normalisation is applied over axis 3 of the batched tensor).
        n_classes: Width of the final softmax layer. Defaults to 3, the value
            that used to be hard-coded.
    """
    def __init__(self, input_shape = (480,480,3), n_classes = 3):
        self.input_shape = input_shape
        self.n_classes = n_classes

    def get_model(self):
        """Return a newly built, uncompiled Keras ``Model``."""
        return self.__build_model()

    def __build_model(self):
        """Assemble the layer graph.

        NOTE: statement order is kept exactly as in the original
        implementation so that layer creation order, and therefore loading
        of previously saved weights, is not disturbed.
        """
        #input tensor with input shape
        x_input = Input(self.input_shape)

        #conv 7*7 layer (stride 2 halves the resolution)
        x = Conv2D(filters =56 , kernel_size=7 , strides=2, padding = 'same')(x_input)
        x = batch_relu(x)

        #first block
        filter1 = (28,56,56,28,56)
        PEPX1_1 = PEPX_MODULE(x , filters = filter1)
        PEPX1_1 = MaxPooling2D(pool_size=(2, 2), strides=2)(PEPX1_1)
        # 1x1-conv shortcut, pooled the same way so it can be summed with the PEPX outputs
        upper1 = Conv2D(filters = 56 , kernel_size= 1)(x)
        upper1 = batch_relu(upper1)
        upper1 = MaxPooling2D(pool_size=(2, 2), strides=2)(upper1)
        PEPX1_2 = PEPX_MODULE(Add()([PEPX1_1, upper1]) , filters = filter1)
        PEPX1_3 = PEPX_MODULE(Add()([PEPX1_1, PEPX1_2, upper1]) , filters = filter1)

        #second block (its first module uses a narrower first projection than filter2)
        filter2 = (56,112,112,56,112)
        PEPX2_1 = PEPX_MODULE(Add()([PEPX1_1, PEPX1_2, PEPX1_3, upper1]) , filters = (28,112,112,56,112))
        PEPX2_1 = MaxPooling2D(pool_size=(2, 2), strides=2)(PEPX2_1)
        upper2 = Conv2D(filters = 112 , kernel_size= 1 )(Add()([PEPX1_1, PEPX1_2, PEPX1_3, upper1]))
        upper2 = batch_relu(upper2)
        upper2 = MaxPooling2D(pool_size=(2, 2), strides=2)(upper2)
        PEPX2_2 = PEPX_MODULE(Add()([PEPX2_1, upper2]) , filters =filter2)
        PEPX2_3 = PEPX_MODULE(Add()([PEPX2_1, PEPX2_2, upper2]) , filters = filter2)
        PEPX2_4 = PEPX_MODULE(Add()([PEPX2_1, PEPX2_2, PEPX2_3, upper2]),filters = filter2)

        #third block (its first module uses a narrower first projection than filter3)
        filter3 = (112,224,224,112,224)
        PEPX3_1 = PEPX_MODULE(Add()([PEPX2_1, PEPX2_2, PEPX2_3, PEPX2_4, upper2]),filters = (56,224,224,112,224))
        PEPX3_1 = MaxPooling2D(pool_size=(2, 2), strides=2)(PEPX3_1)
        upper3 = Conv2D(filters = 224, kernel_size = 1)(Add()([PEPX2_1, PEPX2_2, PEPX2_3, PEPX2_4, upper2]))
        upper3 = batch_relu(upper3)
        upper3 = MaxPooling2D(pool_size=(2, 2), strides=2)(upper3)
        PEPX3_2 = PEPX_MODULE(Add()([PEPX3_1, upper3]), filters = filter3)
        PEPX3_3 = PEPX_MODULE(Add()([PEPX3_1, PEPX3_2, upper3]), filters = filter3)
        PEPX3_4 = PEPX_MODULE(Add()([PEPX3_1,PEPX3_2, PEPX3_3, upper3]), filters = filter3)
        PEPX3_5 = PEPX_MODULE(Add()([PEPX3_1,PEPX3_2, PEPX3_3, PEPX3_4, upper3]),filters = filter3)
        PEPX3_6 = PEPX_MODULE(Add()([PEPX3_1,PEPX3_2, PEPX3_3, PEPX3_4, PEPX3_5, upper3]),filters = filter3)

        #fourth block (its first module uses a narrower first projection than filter4)
        filter4 = (212,424,424,212,424)
        PEPX4_1 = PEPX_MODULE(Add()([PEPX3_1, PEPX3_2, PEPX3_3, PEPX3_4, PEPX3_5, PEPX3_6, upper3]),filters = (112,424,424,212,424))
        PEPX4_1 = MaxPooling2D(pool_size=(2, 2), strides=2)(PEPX4_1)
        upper4 = Conv2D(filters = 424, kernel_size = 1)(Add()([PEPX3_1, PEPX3_2, PEPX3_3, PEPX3_4, PEPX3_5, PEPX3_6, upper3]))
        upper4 = batch_relu(upper4)
        upper4 = MaxPooling2D(pool_size=(2, 2), strides=2)(upper4)
        PEPX4_2 = PEPX_MODULE(Add()([PEPX4_1, upper4]), filters = filter4)
        PEPX4_3 = PEPX_MODULE(Add()([PEPX4_1, PEPX4_2, upper4]), filters = filter4)

        #Global average pooling over the channel-wise concatenation of the last stage
        x = GlobalAveragePooling2D()(concatenate([PEPX4_1, PEPX4_2, PEPX4_3, upper4], axis=3))

        # classifier width follows n_classes instead of a hard-coded 3
        x = Dense(self.n_classes, activation='softmax')(x)
        return Model(inputs = x_input, outputs=x)


# Data Loader

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` serving ``(images, one_hot_labels)`` batches from an image folder.

    Each non-blank line of the label file holds six whitespace-separated
    fields: ``filename class xmin ymin xmax ymax``. Images are cropped to the
    bounding box, resized to ``input_dim``, replicated across channels and
    scaled to [0, 1].

    Args:
        info_path: Path to the label file.
        data_path: Folder that contains the image files.
        batch_size: Number of samples per batch (the last batch may be shorter).
        input_dim: ``(height, width, channels)`` of the produced images.
        num_classes: Number of columns of the one-hot labels.
        shuffle: If true, sample order is shuffled once at load time with the
            fixed seed 7, so the order is the same on every run and epoch.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    def __init__(self, info_path, data_path, batch_size=8, input_dim = (480, 480, 3), num_classes = 3, shuffle=True):
        super().__init__()
        if batch_size < 1:
            raise ValueError('batch_size must be a positive integer, got {}'.format(batch_size))
        self.fnames, self.classes, self.bboxes = self.__load_data_info(info_path, shuffle)
        self.data_path = data_path
        self.batch_size = batch_size
        self.input_dim = input_dim
        self.num_classes = num_classes

    def __load_data_info(self, label_file, shuffle):
        """Load image filenames, classes and bounding boxes from ``label_file``.

        Returns three parallel lists; each bounding box is stored as
        ``(ymin, ymax, xmin, xmax)`` so it can be used directly for slicing.
        """
        with open(label_file, 'r') as f:
            # blank lines carry no sample and would break the six-field unpack below
            line_list = [line for line in f if line.strip()]

        if shuffle:
            random.Random(7).shuffle(line_list)

        fnames, classes, bboxes = [], [], []
        for line in line_list:
            fname, cls, xmin, ymin, xmax, ymax = line.split()
            fnames.append(fname)
            classes.append(int(cls))
            bboxes.append((int(ymin), int(ymax), int(xmin), int(xmax)))

        return fnames, classes, bboxes

    def __preproc_image(self, image, bbox):
        """Crop, resize, channel-replicate and rescale one image.

        NOTE(review): stacking along a new last axis assumes ``image`` is a
        single-channel 2-D array - confirm for the dataset in use.
        """
        image = image[bbox[0]:bbox[1], bbox[2]:bbox[3]]  # crop image using boundaries
        # cv2.resize expects dsize as (width, height), i.e. (input_dim[1], input_dim[0])
        image = cv2.resize(image, (self.input_dim[1], self.input_dim[0]))
        image = np.stack([image] * self.input_dim[2], axis=-1)  # make image channels
        image = image / 255  # change range to [0, 1]
        return image

    def __load_images_batch(self, start, end):
        """Load the images with indices in ``[start, end)`` into one array.

        Raises:
            FileNotFoundError: If an image cannot be read from disk.
        """
        # size the buffer to the real batch length so a short final batch
        # never exposes uninitialised rows
        image_list = np.empty((end - start, self.input_dim[0], self.input_dim[1], self.input_dim[2]))
        for row, index in enumerate(range(start, end)):
            path = os.path.join(self.data_path, self.fnames[index])
            image = cv2.imread(path, cv2.IMREAD_UNCHANGED)
            if image is None:  # cv2.imread signals failure by returning None
                raise FileNotFoundError('Could not read image: {}'.format(path))
            image_list[row] = self.__preproc_image(image, self.bboxes[index])
        return image_list

    def __load_labels_batch(self, start, end):
        """Load the one-hot labels with indices in ``[start, end)``.

        Classes mapping(Normal: 0, Pneumonia: 1, COVID-19: 2)
        """
        label_list = np.array([self.classes[index] for index in range(start, end)], dtype=int)
        return tf.keras.utils.to_categorical(label_list, num_classes=self.num_classes)

    def __len__(self):
        """Return the number of batches per epoch, counting a short final batch."""
        return int(np.ceil(len(self.fnames) / self.batch_size))

    def __getitem__(self, index):
        """Generate batch number ``index`` as ``(X, y)``.

        Raises:
            IndexError: If ``index`` lies past the last batch.
        """
        batch_start = index * self.batch_size
        if batch_start >= len(self.fnames):
            raise IndexError('batch index {} out of range'.format(index))
        batch_end = min((index + 1) * self.batch_size, len(self.fnames))

        X = self.__load_images_batch(batch_start, batch_end)
        y = self.__load_labels_batch(batch_start, batch_end)
        return X, y

In [None]:
import posixpath
from zipfile import ZipFile

class ZipDataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` serving ``(images, one_hot_labels)`` batches from a zip archive.

    The label file and the images are read directly from the archive. Each
    non-blank line of the label file holds six whitespace-separated fields:
    ``filename class xmin ymin xmax ymax``. Images are cropped to the bounding
    box, resized to ``input_dim``, replicated across channels and scaled to
    [0, 1].

    Args:
        zip_path: Path to the zip archive.
        zip_info_path: Member name of the label file inside the archive.
        zip_data_path: Folder inside the archive that contains the images.
        batch_size: Number of samples per batch (the last batch may be shorter).
        input_dim: ``(height, width, channels)`` of the produced images.
        num_classes: Number of columns of the one-hot labels.
        shuffle: If true, sample order is shuffled once at load time with the
            fixed seed 7, so the order is the same on every run and epoch.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        KeyError: If ``zip_info_path`` is not a member of the archive.
    """
    def __init__(self, zip_path, zip_info_path, zip_data_path, batch_size=8, input_dim = (480, 480, 3), num_classes = 3, shuffle=True):
        super().__init__()
        if batch_size < 1:
            raise ValueError('batch_size must be a positive integer, got {}'.format(batch_size))
        self.zf = ZipFile(zip_path)
        # member name -> ZipInfo, so members are found without rescanning the archive
        self.files_dict = {file.filename: file for file in self.zf.infolist()}

        self.fnames, self.classes, self.bboxes = self.__load_data_info(self.files_dict[zip_info_path], shuffle)
        self.zip_data_path = zip_data_path
        self.batch_size = batch_size
        self.input_dim = input_dim
        self.num_classes = num_classes

    def __load_data_info(self, label_file, shuffle):
        """Load image filenames, classes and bounding boxes from an archive member.

        Returns three parallel lists; each bounding box is stored as
        ``(ymin, ymax, xmin, xmax)`` so it can be used directly for slicing.
        """
        with self.zf.open(label_file) as f:
            decoded = (str(line, encoding='UTF-8') for line in f)
            # blank lines carry no sample and would break the six-field unpack below
            line_list = [line for line in decoded if line.strip()]

        if shuffle:
            random.Random(7).shuffle(line_list)

        fnames, classes, bboxes = [], [], []
        for line in line_list:
            fname, cls, xmin, ymin, xmax, ymax = line.split()
            fnames.append(fname)
            classes.append(int(cls))
            bboxes.append((int(ymin), int(ymax), int(xmin), int(xmax)))

        return fnames, classes, bboxes

    def __preproc_image(self, image, bbox):
        """Crop, resize, channel-replicate and rescale one image.

        NOTE(review): stacking along a new last axis assumes ``image`` is a
        single-channel 2-D array - confirm for the dataset in use.
        """
        image = image[bbox[0]:bbox[1], bbox[2]:bbox[3]]  # crop image using boundaries
        # cv2.resize expects dsize as (width, height), i.e. (input_dim[1], input_dim[0])
        image = cv2.resize(image, (self.input_dim[1], self.input_dim[0]))
        image = np.stack([image] * self.input_dim[2], axis=-1)  # make image channels
        image = image / 255  # change range to [0, 1]
        return image

    def __load_images_batch(self, start, end):
        """Load the images with indices in ``[start, end)`` into one array.

        Raises:
            KeyError: If an image is not a member of the archive.
            ValueError: If an archive member cannot be decoded as an image.
        """
        # size the buffer to the real batch length so a short final batch
        # never exposes uninitialised rows
        image_list = np.empty((end - start, self.input_dim[0], self.input_dim[1], self.input_dim[2]))
        for row, index in enumerate(range(start, end)):
            # zip member names always use '/', whatever the host OS separator is
            member = posixpath.join(self.zip_data_path, self.fnames[index])
            file = self.files_dict[member]
            image = cv2.imdecode(np.frombuffer(self.zf.read(file), np.uint8), cv2.IMREAD_UNCHANGED)
            if image is None:  # cv2.imdecode signals failure by returning None
                raise ValueError('Could not decode image: {}'.format(member))
            image_list[row] = self.__preproc_image(image, self.bboxes[index])
        return image_list

    def __load_labels_batch(self, start, end):
        """Load the one-hot labels with indices in ``[start, end)``.

        Classes mapping(Normal: 0, Pneumonia: 1, COVID-19: 2)
        """
        label_list = np.array([self.classes[index] for index in range(start, end)], dtype=int)
        return tf.keras.utils.to_categorical(label_list, num_classes=self.num_classes)

    def __len__(self):
        """Return the number of batches per epoch, counting a short final batch."""
        return int(np.ceil(len(self.fnames) / self.batch_size))

    def __getitem__(self, index):
        """Generate batch number ``index`` as ``(X, y)``.

        Raises:
            IndexError: If ``index`` lies past the last batch.
        """
        batch_start = index * self.batch_size
        if batch_start >= len(self.fnames):
            raise IndexError('batch index {} out of range'.format(index))
        batch_end = min((index + 1) * self.batch_size, len(self.fnames))

        X = self.__load_images_batch(batch_start, batch_end)
        y = self.__load_labels_batch(batch_start, batch_end)
        return X, y

# Configurations

In [None]:
def _make_generator(info_file):
    """Build the batch generator selected by ``args.data_format`` for one split file.

    Args:
        info_file: Name of the label file of the split (train or validation).

    Raises:
        ValueError: If ``args.data_format`` is neither 1 (folder) nor 2 (zip).
    """
    input_dim = (args.input_size, args.input_size, args.num_channels)
    if args.data_format == 1:
        # plain folder: the label file sits next to the image folder
        return DataGenerator(info_path=args.datadir + '/../' + info_file,
                             data_path=args.datadir,
                             batch_size=args.bs,
                             input_dim=input_dim,
                             num_classes=args.n_classes)
    if args.data_format == 2:
        # zip archive: label file at the archive root, images under "2A_images"
        return ZipDataGenerator(zip_path=args.zip_path,
                                zip_info_path=info_file,
                                zip_data_path="2A_images",
                                batch_size=args.bs,
                                input_dim=input_dim,
                                num_classes=args.n_classes)
    # fail here instead of with a NameError on the first use of the generators
    raise ValueError('Unsupported data_format {!r}; expected 1 (folder) or 2 (zip)'.format(args.data_format))


train_generator = _make_generator(args.trainfile)
val_generator = _make_generator(args.valfile)
  
def learning_rate_scheduler(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    The rate is multiplied by 0.1 at every 10th epoch (10, 20, ...) and left
    unchanged otherwise; epoch 0 never triggers a decay.
    """
    decay_step = 10
    decay_rate = 0.1
    on_decay_boundary = epoch != 0 and epoch % decay_step == 0
    return lr * decay_rate if on_decay_boundary else lr
      
# Output locations, both beneath args.outputpath.
_checkpoint_pattern = args.outputpath + '/weights' + '/CovidNetCT.{epoch:02d}-{val_loss:.2f}.h5'
_tensorboard_dir = args.outputpath + '/logs'

# Callbacks handed to model.fit, in this order:
#   1. stop early with a patience of 5 epochs,
#   2. apply the step-decay learning-rate schedule (verbose),
#   3. save a checkpoint whose file name carries the epoch and val_loss,
#   4. write TensorBoard logs.
my_callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5),
    tf.keras.callbacks.LearningRateScheduler(learning_rate_scheduler, verbose=1),
    tf.keras.callbacks.ModelCheckpoint(filepath=_checkpoint_pattern),
    tf.keras.callbacks.TensorBoard(log_dir=_tensorboard_dir),
]

# Training

In [None]:
# Build the network with the same input shape the data generators are
# configured with, so changing --input_size / --num_channels cannot leave the
# model and the data out of sync.
instance = CovidNet(input_shape=(args.input_size, args.input_size, args.num_channels))
model = instance.get_model()

# Labels are one-hot encoded by the generators, hence categorical cross-entropy.
model.compile(loss="categorical_crossentropy",
              optimizer=tf.keras.optimizers.Adam(learning_rate=args.lr),
              metrics=["accuracy"])

# Train from epoch 0; `history` keeps the object returned by fit.
history = model.fit(x = train_generator,
          epochs=args.epochs,
          callbacks=my_callbacks,
          validation_data=val_generator,
          initial_epoch=0)

# Testing

In [None]:
# Rebuild the network with the input shape taken from the parsed arguments so
# that it matches what the test generator produces.
test_input_dim = (args.input_size, args.input_size, args.num_channels)
instance = CovidNet(input_shape=test_input_dim)
model = instance.get_model()

model.compile(loss="categorical_crossentropy",
              optimizer=tf.keras.optimizers.Adam(learning_rate=args.lr),
              metrics=["accuracy"])

# Checkpoint written by the training run; the directory is derived from
# --outputpath instead of being repeated as a literal.
model.load_weights(args.outputpath + '/weights' + '/CovidNetCT.15-0.12.h5')

# Honour --data_format for the test split, as is done for train/validation.
if args.data_format == 1:
    test_generator = DataGenerator(info_path = args.datadir + '/../' + args.testfile,
                                data_path = args.datadir,
                                batch_size=args.bs,
                                input_dim = test_input_dim,
                                num_classes = args.n_classes)
elif args.data_format == 2:
    test_generator = ZipDataGenerator(zip_path = args.zip_path,
                                zip_info_path = args.testfile,
                                zip_data_path = "2A_images",
                                batch_size=args.bs,
                                input_dim = test_input_dim,
                                num_classes = args.n_classes)
else:
    raise ValueError('Unsupported data_format {!r}; expected 1 (folder) or 2 (zip)'.format(args.data_format))

# evaluate returns the loss followed by the compiled metrics (here: accuracy)
output=model.evaluate(
    x=test_generator,
    verbose=1,
)
print(output)

[0.13897405564785004, 0.9895443320274353]


# Model Summary

In [None]:
# Print the layer-by-layer table of the model (layer type, output shape,
# parameter count and the layers each one is connected to).
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 480, 480, 3) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 240, 240, 56) 8288        input_1[0][0]                    
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 240, 240, 56) 224         conv2d[0][0]                     
__________________________________________________________________________________________________
activation (Activation)         (None, 240, 240, 56) 0           batch_normalization[0][0]        
______________________________________________________________________________________________