In [1]:
# General libraries
import os
import numpy as np
import pandas as pd 
import random
import cv2
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.metrics import accuracy_score, confusion_matrix
# Deep learning libraries
from tensorflow.keras.models import Model,load_model, Sequential
from tensorflow.keras.layers import Input, Dense, Flatten, Dropout, BatchNormalization, ZeroPadding2D
from tensorflow.keras.layers import Conv2D, SeparableConv2D, MaxPooling2D, LeakyReLU, Activation, MaxPool2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from sklearn.utils import class_weight 
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras import layers
from tensorflow.compat.v1 import ConfigProto,InteractiveSession

config = ConfigProto()
config.gpu_options.allow_growth = True 
config.gpu_options.per_process_gpu_memory_fraction = 0.9
session = InteractiveSession(config=config)
#tf.keras.backend.tensorflow_backend.set_session(tf.Session(config=config))
import pickle

In [4]:
from tqdm import tqdm
from keras.utils import to_categorical
input_path = '../proc_input/covid_data/'
def process_data(img_dims, batch_size):
    #Data generation objects
    x_train, y_train = list(), list()
    x_test, y_test = list(), list()
    x_val, y_val = list(), list()
    # I will be making predictions off of the test set in one batch size
    # This is useful to be able to get the confusion matrix
    for cond in tqdm(['/NORMAL/', '/PNEUMONIA/', '/COVID/']):
        ipath = input_path + 'train' + cond
        #print(ipath)
        for img in (os.listdir(input_path + 'train' + cond)):
            impath = input_path+'train'+cond+img
            try:
                imga = cv2.imread(impath) #, cv2.COLOR_BGR2GRAY)
                imga = cv2.resize(imga, (img_dims, img_dims))
                #imga = img_to_array(imga, data_format=None)
                imga = np.dstack([imga])
                if cond=='/NORMAL/':
                    label = 0
                elif cond=='/PNEUMONIA/':
                    label = 1
                else:
                    label = 2
                x_train.append(imga)
                y_train.append(label)
            except:
                print('e')
   
    for cond in tqdm(['/NORMAL/', '/PNEUMONIA/',"/COVID/"]):
        for img in (os.listdir(input_path + 'test' + cond)):
            impath = input_path+'test'+cond+img
            try:
                imga = cv2.imread(impath) #, cv2.COLOR_BGR2GRAY)
                imga = cv2.resize(imga, (img_dims, img_dims))
                #imga = img_to_array(imga, data_format=None)
                imga = np.dstack([imga])
                if cond=='/NORMAL/':
                    label = 0
                elif cond=='/PNEUMONIA/':
                    label = 1
                else:
                    label = 2
                x_test.append(imga)
                y_test.append(label)
            except:
                print('f')
    len_xtrain = len(x_train)
    len_xtest = len(x_test)
   
   
    x_test  = np.asarray(x_test)/255.0
    x_train = np.asarray(x_train)/255.0
    y_train = to_categorical(y_train, 3)
    y_test = to_categorical(y_test, 3)
   
    # data genarators
    #train_datagen =ImageDataGenerator(rescale=1./255, zoom_range=0.3, vertical_flip=True,
    #train_datagen =ImageDataGenerator(zoom_range=0.3, vertical_flip=True,
    #    height_shift_range=0.1,
    #    width_shift_range=0.1,
    #    shear_range=0.2)
    train_datagen = ImageDataGenerator(
        featurewise_center=False,
        featurewise_std_normalization=False,
        horizontal_flip=True,
        height_shift_range=0.11,
        width_shift_range=0.11,
        rotation_range=8,
        shear_range=0.2,
        zoom_range=0.3,
        #brightness_range=(0.9, 1.1),
        fill_mode='constant')
    #test_datagen=ImageDataGenerator(rescale=1./255)
    test_datagen=ImageDataGenerator()
    val_datagen=ImageDataGenerator()
    train_gen = train_datagen.flow(
        x_train, y_train,
        batch_size=batch_size)
    test_gen = test_datagen.flow(
        x_test, y_test,
        batch_size=16)
    return train_gen, test_gen, x_test, y_test, len_xtrain, len_xtest

In [5]:
# Hyperparameters
img_dims = 150
batch_size = 128

# Getting the data
train_gen, val_gen, x_test, y_test, len_xtrain, len_xval = process_data(img_dims, batch_size)

100%|██████████| 3/3 [01:26<00:00, 28.86s/it]
100%|██████████| 3/3 [00:10<00:00,  3.52s/it]


In [6]:
from tensorflow.keras.regularizers import l1
from tensorflow.keras.applications import ResNet152V2
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras import backend as K

def resnet_model(img_dims):
    input_tensor = Input(shape=(img_dims, img_dims, 3))
    base_model = ResNet152V2(include_top=False, input_tensor=input_tensor)

    x = base_model.output

    x = GlobalAveragePooling2D()(x)
    x = Dropout(0.5)(x)
    x = Dense(4096, activation='relu',activity_regularizer=l1(0.0001))(x)
    x = Dropout(0.1)(x)
    x = Dense(2048, activation='relu',activity_regularizer=l1(0.0001))(x)
    x = Dropout(0.03)(x)
    x = Dense(1024, activation='relu',activity_regularizer=l1(0.0001))(x)
    x = Dropout(0.03)(x)
    x = Dense(512, activation='relu')(x)
    x = Dropout(0.01)(x)
    x = Dense(64, activation='relu')(x)

    predictions = Dense(3, activation='softmax')(x)


    # this is the model we will train
    model = Model(inputs=base_model.input, outputs=predictions)

    model.summary()
    return model

In [8]:
checkpoint = ModelCheckpoint(filepath='../outputs/resnet.hdf5', save_best_only=True, save_weights_only=True, monitor='val_accuracy')
lr_reduce = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, verbose=2, mode='max',min_lr=0.0001)
early_stop = EarlyStopping(monitor='val_accuracy', patience=15, mode='min')

model = resnet_model(img_dims)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
weights = {0:.7, 1:1.53, 2:17}
print(weights)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 150, 150, 3) 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 156, 156, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 75, 75, 64)   9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
pool1_pad (ZeroPadding2D)       (None, 77, 77, 64)   0           conv1_conv[0][0]                 
_____________

In [None]:
hist = model.fit_generator(
               train_gen, steps_per_epoch=len_xtrain// batch_size, 
               epochs=200, validation_data=val_gen, 
               validation_steps=len_xval// batch_size,callbacks=[checkpoint,lr_reduce],class_weight = weights)

In [10]:
model.load_weights('resnet_weights.hdf5')
model.save("../outputs/resnet")

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(10, 3))
ax = ax.ravel()

for i, met in enumerate(['accuracy', 'loss']):
    ax[i].plot(hist.history[met])
    ax[i].plot(hist.history['val_' + met])
    ax[i].set_title('Model {}'.format(met))
    ax[i].set_xlabel('epochs')
    ax[i].set_ylabel(met)
    ax[i].legend(['train', 'val'])

In [9]:
#model.load_weights('../outputs/resnet_weights.hdf5')


In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix
preds = model.predict(x_test)

In [None]:
output1 = []
out_test = []
for i in range(len(preds)):
    output1.append(list(preds[i]).index(max(preds[i])))
for i in range(len(y_test)):
    out_test.append(list(y_test[i]).index(max(y_test[i])))

print(list(np.round(output1)).count(0)/len(output1))
print(list(out_test).count(0)/len(out_test))

In [None]:
acc = accuracy_score(out_test, output1)*100
print(acc)

In [None]:
cm = confusion_matrix(out_test, output1)
print("normal_sense= ", cm[0][0]/(cm[0][0]+cm[0][1]+cm[0][2]))
print("pneumonia_sense= ", cm[1][1]/(cm[1][0]+cm[1][1]+cm[1][2]))
print("covid_sense= ", cm[2][2]/(cm[2][0]+cm[2][1]+cm[2][2]))

In [None]:
import seaborn as sn
df_cm = pd.DataFrame(cm, index = [i for i in ["NORMAL","PNEUMONIA","COVID"]],
                  columns = [i for i in ["NORMAL","PNEUMONIA","COVID"]])
plt.figure(figsize = (10,7))
x = sn.heatmap(df_cm, annot=True,fmt="d", cmap = "Blues")