In [None]:
import numpy as np
import pandas as pd
import shutil 
import matplotlib.pyplot as plt
from keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array, array_to_img
import os,cv2
import numpy as np
from keras import backend as K
from keras.utils import np_utils
from keras.layers.normalization import BatchNormalization
import keras
from keras.models import Sequential
import tensorflow as tf
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
%matplotlib inline

In [None]:
import imutils
from scipy import ndimage as ndi
from skimage.feature import canny
from skimage.filters import roberts, farid, sobel, scharr, prewitt, laplace
from skimage.segmentation import clear_border
from skimage.measure import label, regionprops
from skimage.morphology import area_closing, disk, dilation, binary_erosion, remove_small_objects, erosion, opening, closing, reconstruction, binary_closing

In [None]:
# Define data path
PATH = os.getcwd()
data_path = PATH + '/CT'
data_dir_list = os.listdir(data_path)

In [None]:
img_rows=224
img_cols=224
num_channel=1


# Define the number of classes
num_classes = 2
labels_name={'Covid':0,'non-Covid':1}

img_data_list=[]
labels_list = []
moment_list = []

for dataset in data_dir_list:
    img_list=os.listdir(data_path+'/'+ dataset)
    print ('Loaded the images of dataset-'+'{}\n'.format(dataset))
    label_data = labels_name[dataset]
    for img in img_list:
        im3= cv2.imread(data_path + '/'+ dataset + '/'+ img , 0)
        im2= cv2.imread(data_path + '/'+ dataset + '/'+ img , 0)
        im= cv2.imread(data_path + '/'+ dataset + '/'+ img , 0)
        ret1,th1 = cv2.threshold(im, 127,255, cv2.THRESH_BINARY+cv2.THRESH_OTSU)
        h,w = im.shape[0:2]
        mask = np.zeros(im.shape, dtype=np.uint8)

        left = int(np.round(h/(2)))
        right = int(np.round(h/(2)))
        for y in range(left-int(h/5.2), left+int(h/3.9)):
            for x in range(0, 4):
                im[y][x] = 255

        for y in range(right-int(h/5.2), right+int(h/3.9)):
                for x in range(w-4, w):
                    im[y][x] = 255
        
        ret1,im2 = cv2.threshold(im2,180,180, cv2.THRESH_TOZERO)   
        ## Binary image ##
        binary = im < 150

        ## Remove the blobs connected to the border of the image ##
        cleared = clear_border(binary)

        ## Label image ##
        label_image = label(cleared)


        ## Keep the labels with 2 largest areas ##
        areas = [r.area for r in regionprops(label_image)]
        areas.sort()


        if len(areas) > 2:
            for region in regionprops(label_image):
                if region.area <= 5:
                    for coordinates in region.coords: 
                        label_image[coordinates[0], coordinates[1]] = 0
                        mask[coordinates[0], coordinates[1]] = 255


        large_binary = label_image > 0  

        ## Erosion operation with a disk of radius 2. This operation is seperate the lung nodules attached to the blood vessels ##
        selem = disk(0)
        binary_eroded = binary_erosion(large_binary, selem)


        ## Closure operation with a disk of radius 10. This operation is to keep nodules attached to the lung wall ##
        selem = disk(10)
        binary_closed = binary_closing(binary_eroded, selem)

        start_point = (0, 0) 
        end_point = (0,h) 
        color = (0,0,0) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point, end_point, color, 5)

        start_point2 = (w, 0) 
        end_point2 = (w,h) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point2, end_point2, color, 5)

        start_point2 = (0, h) 
        end_point2 = (w,h) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point2, end_point2, color, 5)

        start_point2 = (0, 0) 
        end_point2 = (w,0) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point2, end_point2, color, 5)


        ## Fill in the small holes inside the binary mask of lungs ##
        edges = roberts(binary_closed)
        hole_closed = ndi.binary_fill_holes(edges)

        ## Superimpose the binary mask on the input image. ##
        ZERO_VALUE = 0
        get_high_vals = (hole_closed == 0)
        im[get_high_vals] = ZERO_VALUE # minimum value
        
                
        input_img_resize=cv2.resize(im,(224,224))
        input_img_resize_gray = cv2.resize(im3,(224,224))
        moments = cv2.moments(input_img_resize_gray)
        huMoments = cv2.HuMoments(moments)   
        moment_list.append(huMoments)
        img_data_list.append(input_img_resize)
        labels_list.append(label_data)  


In [None]:
## Image normalization ##
img_data = np.array(img_data_list)
img_data = img_data.astype('float32')
img_data /= 255
moment_data = np.array(moment_list)
moment_data = moment_data.astype('float32')
moment_data /= 255


In [None]:
## Label one hot encoding ##
labels = np.array(labels_list)
print(np.unique(labels,return_counts=True))
Y = np_utils.to_categorical(labels, num_classes)

In [None]:
## Colour channel expansion ##
if num_channel==1:
    if K.common.image_dim_ordering()=='th':
        img_data= np.expand_dims(img_data, axis=1) 
        moment_data= np.expand_dims(moment_data, axis=1) 
        print (img_data.shape)
    else:
        img_data= np.expand_dims(img_data, axis=3) 
        moment_data= np.expand_dims(moment_data, axis=3) 
        print (img_data.shape)
        print (moment_data.shape)

In [None]:
#Shuffle the dataset
x,y = shuffle(img_data,Y, random_state=2)
x2,y2 = shuffle(moment_data,Y, random_state=2)

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.1, random_state=2)
X_train2, X_test2, y_train2, y_test2 = train_test_split(x2, y2, test_size=0.1, random_state=2)


In [None]:
## Convert train and test set to RGB
X_train = tf.convert_to_tensor(X_train)
X_train = tf.image.grayscale_to_rgb(X_train)

X_test = tf.convert_to_tensor(X_test)
X_test = tf.image.grayscale_to_rgb(X_test)

In [None]:
proto_tensor_train = tf.make_tensor_proto(X_train)
X_train = tf.make_ndarray(proto_tensor_train)
print(X_train.shape)

proto_tensor_val = tf.make_tensor_proto(X_test)
X_test = tf.make_ndarray(proto_tensor_val)
print(X_test.shape)

In [None]:
## Initialization of pre-trained ResNet50
from keras.applications import ResNet50

# include top should be False to remove the softmax layer
pretrained_model = ResNet50(include_top=False, weights='imagenet')

output = pretrained_model.layers[-1].output
output = keras.layers.GlobalAveragePooling2D()(output)
pretrained_model = keras.Model(pretrained_model.input, output)
pretrained_model.summary()

In [None]:
## Features extraction ##
from keras.utils import to_categorical

vgg_features_train = pretrained_model.predict(X_train)
vgg_features_val = pretrained_model.predict(X_test)

vgg_features_train = vgg_features_train.reshape(2052,2048,1)
vgg_features_val = vgg_features_val.reshape(228,2048,1)

In [None]:
# Reshape moment invariant (training) values into 1 dimension
X_train2_1 = X_train2
Moment_Inv = X_train2_1.reshape(2052,7,1)
Resnet_MI = K.concatenate([vgg_features_train, Moment_Inv],axis=1)

# Reshape moment invariant (testing) values into 1 dimension
X_test2_1 = X_test2
Moment_Inv_val = X_test2_1.reshape(228,7,1)
Resnet_MI_val = K.concatenate([vgg_features_val, Moment_Inv_val],axis=1)



In [None]:
import tensorflow as tf
proto_tensor = tf.make_tensor_proto(Resnet_MI)
input_array = tf.make_ndarray(proto_tensor)
input_array = input_array.reshape(2052, 2055,1)
input_array_shape = input_array[0].shape
print(input_array_shape.shape)

proto_tensor_val = tf.make_tensor_proto(Resnet_MI_val)
input_array_val = tf.make_ndarray(proto_tensor_val)
input_array_val = input_array_val.reshape(228, 2055,1)



In [None]:
from keras.layers import Conv2D, MaxPool2D, Flatten, Dense, InputLayer, BatchNormalization, Dropout, GlobalAveragePooling2D, Activation

# Fully Connected layer
model_fusion = Sequential()
model_fusion.add(Flatten(input_shape=(2055,1)))
model_fusion.add(Dense(256, activation='relu'))
model_fusion.add(Dropout(0.5))
model_fusion.add(BatchNormalization())
model_fusion.add(Dense(2, activation='sigmoid'))


# sgd = SGD(lr=0.01, decay= 1e-8, momentum= 0.4, nesterov=True)
# model_fusion.compile(loss='binary_crossentropy', optimizer=sgd, metrics=["accuracy"])

# sgd = SGD(lr=0.00004, decay= 1e-7, momentum= 0.4, nesterov=True)
# model_fusion.compile(loss='binary_crossentropy', optimizer=sgd, metrics=["accuracy"])

anne = ReduceLROnPlateau(monitor='val_accuracy', factor=0.3, patience=5, verbose=1, min_lr=1e-3)
checkpoint = ModelCheckpoint('model/Resnet_Segment_MI.h5', verbose=1, save_best_only=True)

model_fusion.compile(loss='binary_crossentropy', optimizer=keras.optimizers.adam(lr=0.0004, decay= 1e-5), metrics=['accuracy'])
model_fusion.summary()
Fusion = model_fusion.fit(input_array, y_train, batch_size = 224 , epochs=100, verbose=1, validation_data=(input_array_val, y_test),callbacks=[anne, checkpoint])

In [None]:
## Saving training acc and loss to CSV
hist_df = pd.DataFrame(hist.history) 

hist_csv_file = 'history/ResNet_MI_Seg_history.csv'
with open(hist_csv_file, mode='w') as f:
    hist_df.to_csv(f)
    
history = pd.read_csv("history/ResNet_MI_Seg_history.csv")
val_loss = history['val_loss'].to_numpy()
val_acc = history['val_accuracy'].to_numpy()
loss = history['loss'].to_numpy()
acc = history['accuracy'].to_numpy()

In [None]:
## Plot Acc vs Loss ##
plt.plot(acc)
plt.plot(loss)
plt.plot(val_acc)
plt.plot(val_loss)
plt.title('Training Accuracy vs Epochs (ResNet+MI+Segmentation) ', fontsize=20)
plt.yticks(fontsize=20)
plt.ylabel('Accuracy (%)', fontsize=20)
plt.xticks(fontsize=20)
plt.xlabel('Epoch',fontsize=20)
plt.legend(['Train_acc', 'Train_loss', 'Val_acc', 'Val_loss'], loc='best', fontsize=22)
plt.gcf().set_size_inches(12,10) 
#plt.savefig('C:/Users/CooL/Desktop/Result/All Training vs Loss/ResNet_MI_train_graph.jpg')
plt.show()

In [None]:
### Testing phase ###

PATH = os.getcwd()
# Define data path
data_path = PATH + '/Testing_Set'
data_dir_list = os.listdir(data_path)

import seaborn as sns
import matplotlib.pyplot as plt   
from sklearn.metrics import confusion_matrix, roc_curve

moment_list2 = []
test_list = []
y_pred = []
labels_name ={'Covid':0,'non-Covid':1}

#ret,thresh3 = cv2.threshold(input_img,200,255,cv2.THRESH_TRUNC)

for dataset in data_dir_list:
    img_list=os.listdir(data_path+'/'+ dataset)
    label_data = labels_name[dataset]
    for img in img_list:
        im = cv2.imread(data_path + '/'+ dataset + '/'+ img, 0 )
        im3 = cv2.imread(data_path + '/'+ dataset + '/'+ img, 0 )
        h,w = im.shape[0:2]
        mask = np.zeros(im.shape, dtype=np.uint8)

        left = int(np.round(h/(2)))
        right = int(np.round(h/(2)))
        for y in range(left-int(h/5.2), left+int(h/3.9)):
            for x in range(0, 4):
                im[y][x] = 255

        for y in range(right-int(h/5.2), right+int(h/3.9)):
                for x in range(w-4, w):
                    im[y][x] = 255

        ## Binary image ##
        binary = im < 150

        ## Remove the blobs connected to the border of the image ##
        cleared = clear_border(binary)

        ## Label image ##
        label_image = label(cleared)


        ## Keep the labels with 2 largest areas ##
        areas = [r.area for r in regionprops(label_image)]
        areas.sort()


        if len(areas) > 2:
            for region in regionprops(label_image):
                if region.area <= 5:
                    for coordinates in region.coords: 
                        label_image[coordinates[0], coordinates[1]] = 0
                        mask[coordinates[0], coordinates[1]] = 255


        large_binary = label_image > 0  

        ## Erosion operation with a disk of radius 2. This operation is seperate the lung nodules attached to the blood vessels ##
        selem = disk(0)
        binary_eroded = binary_erosion(large_binary, selem)


        ## Closure operation with a disk of radius 10. This operation is to keep nodules attached to the lung wall ##
        selem = disk(10)
        binary_closed = binary_closing(binary_eroded, selem)

        start_point = (0, 0) 
        end_point = (0,h) 
        color = (0,0,0) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point, end_point, color, 5)

        start_point2 = (w, 0) 
        end_point2 = (w,h) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point2, end_point2, color, 5)

        start_point2 = (0, h) 
        end_point2 = (w,h) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point2, end_point2, color, 5)

        start_point2 = (0, 0) 
        end_point2 = (w,0) 
        binary_closed = cv2.line(np.float32(binary_closed), start_point2, end_point2, color, 5)


        ## Fill in the small holes inside the binary mask of lungs ##
        edges = roberts(binary_closed)
        hole_closed = ndi.binary_fill_holes(edges)

        ## Superimpose the binary mask on the input image. ##
        ZERO_VALUE = 0
        get_high_vals = (hole_closed == 0)
        im[get_high_vals] = ZERO_VALUE # minimum value
        
        input_img_resize = cv2.resize(im,(224,224))
        input_img_resize_gray = cv2.resize(im3,(224,224))
        moments = cv2.moments(input_img_resize_gray)
        huMoments = cv2.HuMoments(moments)   
        moment_list2.append(huMoments)
        test_list.append(input_img_resize)
        y_pred.append(label_data)


print(len(moment_list2))
test_image = np.array(test_list)
test_image = test_image.astype('float32')
test_image /= 255
moment_data = np.array(moment_list2)
moment_data = moment_data.astype('float32')
moment_data /= 255


if num_channel==1:
    if K.common.image_dim_ordering()=='th':
        test_image= np.expand_dims(test_image, axis=1) 
        moment_data= np.expand_dims(moment_data, axis=1) 
        print (test_image.shape)
    else:
        test_image= np.expand_dims(test_image, axis=3) 
        moment_data= np.expand_dims(moment_data, axis=3) 
        print (test_image.shape)
        print (moment_data.shape)



## Convert train and test set to RGB
test_image = tf.convert_to_tensor(test_image)
test_image = tf.image.grayscale_to_rgb(test_image)

proto_tensor_val = tf.make_tensor_proto(test_image)
test_image = tf.make_ndarray(proto_tensor_val)

test_features = pretrained_model.predict(test_image)
test_features = test_features.reshape(200,2048,1)

# Reshape moment invariant (testing) values into 1 dimension
test_data = moment_data
test_data = test_data.reshape(200,7,1)
Test_MI = K.concatenate([test_features, test_data],axis=1)

proto_tensor = tf.make_tensor_proto(Test_MI)
input_array_test = tf.make_ndarray(proto_tensor)
input_array_test = input_array_test.reshape(200, 2055, 1)



predict_list = []
predict_1 = model_fusion.predict(input_array_test)

fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_pred, predict_1[:,1])

from sklearn.metrics import auc
import pandas as pd
auc_keras = auc(fpr_keras, tpr_keras)
optimum = thresholds_keras[np.argmax(tpr_keras - fpr_keras)]
print('Optimal cut off point = ',optimum)
print('AUC = ', auc_keras)

gmeans = np.sqrt(tpr_keras * (1-fpr_keras))
ix = np.argmax(gmeans)
optimal = thresholds_keras[ix]
print('Optimal cut off point = ',thresholds_keras[ix])

x = optimal # Best => x = 0.98892

for j in range(0,1):
    for i in range(0, len(predict_1)):
        if predict_1[i][1] >= x:
            predict_list.append(1)
        else:
            predict_list.append(0)
    print(confusion_matrix(y_pred, predict_list), 'x =',"%.5f" % x, '\n')
    

#### Confusion matrix
cf_matrix = confusion_matrix(y_pred, predict_list)

tp = cf_matrix[0][0]
fn = cf_matrix[0][1]
fp = cf_matrix[1][0]
tn = cf_matrix[1][1]

TrueP = '{0:.2%}'.format(tp/(tp+fn))
FalseN = '{0:.2%}'.format(fn/(tp+fn))
FalseP = '{0:.2%}'.format(fp/(fp+tn))
TrueN = '{0:.2%}'.format(tn/(fp+tn))
print(TrueP, FalseN, FalseP, TrueN)

xy_Labels = ['Covid', 'non-Covid']
group_names = ['True Positive', 'False Negative', 'False Positive','True Negative']
group_counts = ['{0:0.0f}'.format(value) for value in cf_matrix.flatten()]
group_percentage = [TrueP, FalseN, FalseP, TrueN]
labels = [f"{v1}\n{v2}\n{v3}" for v1, v2, v3 in zip(group_names,group_counts,group_percentage)]
labels = np.asarray(labels).reshape(2,2)

### Confusion Matrix (ResNet + MI + Segmentation) ###
plt.title('(Segmentation + Moment Invariant + ResNet-50)', fontsize = 15)
sns.heatmap(cf_matrix, fmt='', annot=labels, annot_kws={'size':18} ,xticklabels=xy_Labels, yticklabels=xy_Labels, cmap='RdPu')
plt.xlabel('Predicted label', fontsize = 15) # x-axis label with fontsize 15
plt.ylabel('True label', fontsize = 15) # y-axis label with fontsize 15
plt.savefig('C:/Users/CooL/Desktop/Result/Resnet_segmented/Moment+ Segmentation/Resnet_Seg_MI_CF.jpg')
plt.show()

### ROC Curves (ResNet + MI + Segmentation) ###
plt.figure(figsize=(6,6))
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_keras, tpr_keras, label='Keras (area = {:.3f})'.format(auc_keras))
plt.scatter(fpr_keras[ix],tpr_keras[ix], marker='o', color='black', label='Optimal Threshold = {:.4f})'.format(optimal))
plt.xlabel('False positive rate (FPR)', fontsize=15)
plt.ylabel('True positive rate (TPR)', fontsize=15)
plt.title('ROC (ResNet-50 + Segmentaion + MI)',fontsize=15)
plt.legend(loc='best', fontsize=15)
plt.savefig('C:/Users/CooL/Desktop/Result/Resnet_segmented/Moment+ Segmentation/Resnet_Seg_MI_graph.jpg')
