In [None]:
from os import chdir,listdir
import glob
%tensorflow_version 2.x
import tensorflow as tf
import cv2
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils.class_weight import compute_class_weight
from skimage.color import rgb2gray
from sklearn.metrics import accuracy_score
import pickle
from sklearn.preprocessing import LabelEncoder
from google.colab import drive
import tensorflow.keras.backend as K
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Flatten, Dropout, BatchNormalization,Concatenate
from tensorflow.keras.layers import Conv2D, SeparableConv2D, MaxPool2D, LeakyReLU, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.models import load_model
from sklearn.metrics import plot_roc_curve, roc_curve, auc
from scikitplot.metrics import plot_confusion_matrix,plot_roc
from scipy import interp
from itertools import cycle
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score,classification_report
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score,classification_report



In [None]:
def rgb2gray(rgb):
    return np.dot(rgb[...,:3], [0.2989, 0.5870, 0.1140])

def transform_images(images: np.ndarray):
    """
    Transform images to [-1, 1]
    """
    images = 2*images.astype(np.float32) -1
    return images

def load_covid_data(image_size=150, path='../Data/train',shuffle=False,class_frequency=False):
    
    size = image_size
    files = listdir(path)
    X = []
    Y = []
    
    for direct in files:
        files_in_folder = glob.glob(path+'/'+direct+'/*.jpg')
        for file in files_in_folder:
            data = plt.imread(file)
            data = cv2.resize(data, (size, size))
            data = data.astype('float32') / 255
            if len(data.shape)>2 and data.shape[2] == 3:
                data = rgb2gray(data)
            if len(data.shape)>2 and data.shape[2] == 4:
                data = cv2.cvtColor(data, cv2.COLOR_BGRA2BGR)
                data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
                data = rgb2gray(data)
            X.append(data)       
            Y.append(direct)
    
    print(len(X))
    X = np.array(X).astype(float)
    X = transform_images(X)
    X = X[:, :, :, None]
    
    le=LabelEncoder()
    Y=le.fit_transform(Y)
    Y=np.array(Y).astype(float)
    Y=to_categorical(Y,len(files))

    if shuffle:
       idx = np.random.choice(len(X),size=len(X), replace=False)
       X=X[idx,:,:]
       Y=Y[idx,:]
    if class_frequency:
       classes =le.inverse_transform(np.argmax(Y,axis=1).astype(int))
       unique, counts = np.unique(classes, return_counts=True)
       counts=np.array(counts)
       plt.bar(unique,counts)
       plt.title('Class Frequency(Percent)')
       plt.xlabel('Class')
       plt.ylabel('Frequency')
       plt.show()
    
    return X, Y

image_size=150
x_train, y_train=load_covid_data(image_size, path='Data/train',shuffle=True,class_frequency=True)
x_test,y_test=load_covid_data(image_size, path='Data/test')

#x_train_gan,y_train_gan=load_covid_data(image_size, path='drive/My Drive/Chest_Covid/Data_Gan')

class_weights = compute_class_weight('balanced',np.unique(np.argmax(y_train,axis=1)),np.argmax(y_train,axis=1))
class_weights={0:class_weights[0],
               1:class_weights[1],
               2:class_weights[2]}
#x_train_augmented=np.concatenate((x_train,x_train_gan),axis=0)
#y_train_augmented=np.concatenate((y_train,y_train_gan),axis=0)

#unique, counts = np.unique(np.argmax(y_train_augmented,axis=1).astype(int), return_counts=True)
#counts=np.array(counts)
#plt.bar(unique,counts)
#plt.title('Class Frequency(Percent)')
#plt.xlabel('Class')
#plt.ylabel('Frequency')
#plt.show()


def create_dataset(augmentation,BATCH_SIZE=32):
  if augmentation:
     train_dataset = tf.data.Dataset.from_tensor_slices((x_train_augmented, y_train_augmented))
     train_dataset = train_dataset.batch(BATCH_SIZE)
     train_dataset = train_dataset.cache()
     train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)

  else:
     train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
     train_dataset = train_dataset.batch(BATCH_SIZE)
     train_dataset = train_dataset.cache()
     train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)

  validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
  validation_dataset = validation_dataset.batch(BATCH_SIZE)
  validation_dataset = validation_dataset.cache()
  validation_dataset = validation_dataset.prefetch(tf.data.experimental.AUTOTUNE)

  return train_dataset,validation_dataset

train_dataset,validation_dataset=create_dataset(augmentation=False,BATCH_SIZE=128)
#train_dataset_augmented,validation_dataset_augmented=create_dataset(augmentation=True,BATCH_SIZE=32)




In [None]:
#new_idea
image_size=150
def get_dropout(input_tensor, rate, mc=False):
    if mc:
        return Dropout(rate=rate)(input_tensor, training=True)
    else:
        return Dropout(rate=rate)(input_tensor)

def get_model(mc,lr=0.00005):
    inputs = Input(shape=(image_size, image_size, 1))
    input2=tf.stack([inputs, inputs, inputs],axis=3)[:,:,:,:,0]
    vgg_model = tf.keras.applications.VGG16(weights='imagenet',
                  include_top=False,
                  input_shape=(image_size, image_size, 3))
    vgg_model.trainable=False

    Vgg_feature=vgg_model(input2)
    # First conv block
    conv1 = Conv2D(filters=16, kernel_size=(3, 3), activation='relu', padding='same')(inputs)
    conv1 = Conv2D(filters=16, kernel_size=(3, 3), activation='relu', padding='same')(conv1)
    conv1 = MaxPool2D(pool_size=(2, 2))(conv1)

   #Second conv block
    conv2 = SeparableConv2D(filters=32, kernel_size=(3, 3), activation='relu', padding='same')(conv1)
    conv2 = SeparableConv2D(filters=32, kernel_size=(3, 3), activation='relu', padding='same')(conv2)
    conv2 = BatchNormalization()(conv2)
    conv2 = MaxPool2D(pool_size=(2, 2))(conv2)
   # Third conv block
    conv3 = SeparableConv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same')(conv2)
    conv3 = SeparableConv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same')(conv3)
    conv3 = BatchNormalization()(conv3)
    conv3 = MaxPool2D(pool_size=(2, 2))(conv3)

  # Fourth conv block
    conv4 = SeparableConv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same')(conv3)
    conv4 = SeparableConv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same',name='target_layer')(conv4)
    conv4 = BatchNormalization()(conv4)
    conv4 = MaxPool2D(pool_size=(2, 2))(conv4)
    conv4=get_dropout(conv4, rate=0.2, mc=mc)

   # Fifth conv block
    conv5 = SeparableConv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same')(conv4)
    conv5 = SeparableConv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same')(conv5)
    conv5 = BatchNormalization()(conv5)
    conv5 = MaxPool2D(pool_size=(2, 2))(conv5)
    conv5=get_dropout(conv5, rate=0.2, mc=mc)

    concatenated_tensor = Concatenate(axis=1)([Flatten()(conv3), Flatten()(conv4), Flatten()(conv5),Flatten()(Vgg_feature)])

    # FC layer
    x = Flatten()(concatenated_tensor)
    x = Dense(units=512, activation='relu')(x)

    x=get_dropout(x, rate=0.7, mc=mc)
    x = Dense(units=128, activation='relu')(x)
    x=get_dropout(x, rate=0.5, mc=mc)
    x = Dense(units=64, activation='relu')(x)
    x=get_dropout(x, rate=0.3, mc=mc)
   # Output layer
    output = Dense(3, activation='softmax')(x)
    METRICS = [
      tf.keras.metrics.Precision(name='precision'),
      tf.keras.metrics.Recall(name='recall'),
      tf.keras.metrics.AUC(name='auc')]
   # Creating model and compiling
    model = Model(inputs=inputs, outputs=output)
    adam = tf.keras.optimizers.Adam(lr=lr)
    model.compile(optimizer=adam, loss='categorical_crossentropy', metrics=['accuracy',METRICS])
    # Callbacks
    if mc:
      mcheck = ModelCheckpoint('model_covid_mc.h5', monitor='val_accuracy', mode='max', verbose=1, save_best_only=True)
    else:
      mcheck = ModelCheckpoint('model_covid_simple.h5', monitor='val_accuracy', mode='max', verbose=1, save_best_only=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_accuracy', factor=0.8,verbose=1,patience=5)
    es = EarlyStopping(monitor='val_accuracy', mode='max', verbose=0, patience=30)
    callbacks=[reduce_lr,es,mcheck]

    return model,callbacks

mc_model,mc_callbacks=get_model(mc=True)
simple_model,simple_callbacks=get_model(mc=False)



In [None]:
train_dataset,validation_dataset=create_dataset(augmentation=False,BATCH_SIZE=32)
train_dataset_augmented,validation_dataset=create_dataset(augmentation=False,BATCH_SIZE=128)
mc_model,mc_callbacks=get_model(mc=True,lr=5e-4)
hist_mc = mc_model.fit(train_dataset_augmented,epochs=200,validation_data=validation_dataset,
                          class_weight=class_weights,callbacks=mc_callbacks)

In [None]:
mc_model=load_model('Chest_Covid/model_covid_mc.h5')

mean_coef=[1,1,1,1,1,1,1,1]
std_coef=[1e-4,1e-3,1e-2,1e-1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1,1.1,
          1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,2]

def mode_robustness(x_test,y_test,model,std_coef):
  result=[]
  for std in std_coef:
    #for i in range(len(np.unique(y_test.argmax(axis=1)))):
        #idx=np.where(np.argmax(y_test,axis=1)==i)[0]
        #y_class=y_test[idx,:]
        #x_class=x_test[idx,:,:,:]
        y_class=y_test
        x_class=x_test
        mean=np.mean(x_class)*0
        sigma=np.std(x_class)*std
        gaussian = np.random.normal(mean, sigma, x_class.shape)
        x_class=x_class+gaussian
        y_p_class=model.predict(x_class)
        acc = accuracy_score(np.argmax(y_class,axis=1), np.argmax(y_p_class,axis=1))
        precision = precision_score(np.argmax(y_class,axis=1), np.argmax(y_p_class,axis=1),average='weighted')
        recall = recall_score(np.argmax(y_class,axis=1), np.argmax(y_p_class,axis=1),average='weighted')
        F1=(2*precision*recall)/(precision+recall)
        result.append([std,acc,precision,recall,F1])
  
  return result    

result=mode_robustness(x_test,y_test,mc_model,std_coef)
print(result)

In [None]:
import matplotlib 
matplotlib.rc('xtick', labelsize=22) 
matplotlib.rc('ytick', labelsize=22)
plt.rcParams.update({'font.size': 22})

def plot_roc_handy(y_test,y_score,lw=2,Name='Roc',calss_name=['COVID19','Normal','Pneumonia'],zoom=False):
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    #y_score=np.array(mc_predictions).mean(axis=0)
    for i in range(int(y_test.shape[1])):
        fpr[i], tpr[i], _ = roc_curve(y_test[:, i], y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
    fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), y_score.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(int(y_test.shape[1]))]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(int(y_test.shape[1])):
        mean_tpr += interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= int(y_test.shape[1])
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])
    
    f, ax = plt.subplots(figsize=[15, 15])
    #f =plt.figure(figsize=(15, 15))
    ax.plot(fpr["micro"], tpr["micro"],
         label='micro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["micro"]),
         color='deeppink', linestyle=':', linewidth=4)
    ax.plot(fpr["macro"], tpr["macro"],
         label='macro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["macro"]),
         color='navy', linestyle=':', linewidth=4)
    colors = cycle(['aqua', 'darkorange', 'cornflowerblue'])
    for i, color in zip(range(int(y_test.shape[1])), colors):
       ax.plot(fpr[i], tpr[i], color=color, lw=lw,
             label='ROC curve of class {0} (area = {1:0.2f})'
             ''.format(calss_name[i], roc_auc[i]))
    ax.plot([0, 1], [0, 1], 'k--', lw=lw)
    ax.set_xlim([0.001, 1.0])
    ax.set_ylim([0, 1.05])
    ax.set_xlabel('False Positive Rate',fontsize=22)
    ax.set_ylabel('True Positive Rate',fontsize=22)
    ax.set_title(Name,fontsize=22)
    ax.legend(loc="lower right")
    
    # inset axes....
    if zoom:
       axins = ax.inset_axes([0.3, 0.5, 0.4, 0.4])
       for i, color in zip(range(int(y_test.shape[1])), colors):
           if fpr[i].all()<0.2 and tpr[i].all()<0.95:
              axins.plot(fpr[i], tpr[i], color=color, lw=lw,
               label='ROC curve of class {0} (area = {1:0.2f})'
                ''.format(calss_name[i], roc_auc[i]))
               
              axins.plot(fpr["micro"], tpr["micro"],
                label='micro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["micro"]),
               color='deeppink', linestyle=':', linewidth=4)
              
              axins.plot(fpr["macro"], tpr["macro"],
                 label='macro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["macro"]),
                color='navy', linestyle=':', linewidth=4)
               
       #x1, x2, y1, y2 = 0, 0.2, 0.96, 0.96
       x1, x2, y1, y2 = 0.01, 0.3, 0.9, 1.01
       axins.set_xlim(x1, x2)
       axins.set_ylim(y1, y2)
       axins.set_xticklabels('')
       axins.set_yticklabels('')
       ax.indicate_inset_zoom(axins)

    plt.show()
    f.savefig('{}.pdf'.format(Name))
    ax.figure.savefig("{}.pdf".format(Name), bbox_inches='tight')
#y_p
#np.array(mc_predictions).mean(axis=0)
plot_roc_handy(y_test,y_pred ,zoom=True,lw=2,Name='ROC of Random Forest (X-Ray)',
               calss_name=['COVID19','Normal','Pneumonia'])


In [None]:
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

def plot_CM_handy(y_test,y_score,lw=2,Name='Confusion Matrix of Fusion Model without Uncertainty (X-Ray)',classes=['COVID19','Normal','Pneumonia']):
    CM=confusion_matrix(np.argmax(y_test,axis=1),np.argmax(y_score,axis=1))
    cm=CM
    cmap=plt.cm.Blues
    fig, ax = plt.subplots(figsize=(6,6),dpi=80, facecolor='w', edgecolor='k')
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.set_title(Name)
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=classes, yticklabels=classes,
           ylabel='True Label',
           xlabel='Predicted Label')
    ax.set_xticklabels(classes,fontsize=15)
    ax.set_yticklabels(classes,fontsize=15)
    ax.set_ylabel('True Label',fontsize=15)
    ax.set_xlabel('Predicted Label',fontsize=15)

    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")
    fmt = '.1f' 
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
           figure(num=None, figsize=(5, 5), dpi=80, facecolor='w', edgecolor='k')
           ax.text(j, i, format(cm[i, j], fmt),fontsize=12,
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")

    fig.tight_layout()
    fig.savefig('{}.pdf'.format(Name),dpi=300)
    ax.figure.savefig("{}.pdf".format(Name), bbox_inches='tight')
  #fig.savefig('CM.png', dpi=300)

plot_CM_handy(y_test,y_pred,
              lw=2,Name='Confusion Matrix of Decision Tree (X-Ray)',
              classes=['COVID19','Normal','Pneumonia'])

In [None]:

def evaluation(y_test,y_pred):
     acc = accuracy_score(y_test, y_pred)
     precision = precision_score(y_test, y_pred,average='weighted')
     recall = recall_score(y_test, y_pred,average='weighted')
     F1=(2*precision*recall)/(precision+recall)
     return acc,precision,recall,F1



x_train_statistical=x_train.reshape(x_train.shape[0],x_train.shape[1]*x_train.shape[2]*x_train.shape[3])
y_train_statistical=np.argmax(y_train,axis=1)
x_test_statistical=x_test.reshape(x_test.shape[0],x_test.shape[1]*x_test.shape[2]*x_test.shape[3])
y_test_statistical=np.argmax(y_test,axis=1)

classifier1=DecisionTreeClassifier(max_depth=50,class_weight='balanced')
param_grid_dt = {'max_depth':[50,100,150,200,300]}
grid = GridSearchCV(DecisionTreeClassifier(class_weight='balanced',random_state=0) ,param_grid_dt,cv=3)
classifier1.fit(x_train_statistical, y_train_statistical)
print('Best params of dt is:',grid.get_params)
classifier1=grid.best_estimator_
y_pred_dt=classifier1.predict(x_test_statistical)
acc,precision,recall,F1=evaluation(y_test_statistical,y_pred_dt)
print('Dt test results are:',evaluation(y_test_statistical,y_pred_dt))
print('Dt train results are:',evaluation(y_train_statistical,classifier1.predict(x_train_statistical)))

In [None]:
classifier2=RandomForestClassifier(max_depth=50,n_estimators=200,class_weight='balanced',random_state=0)
classifier2.fit(x_train_statistical, y_train_statistical)
#print('Best params of dt is:',grid.get_params)
#classifier1=grid.best_estimator_
y_pred_rf=classifier2.predict(x_test_statistical)
acc,precision,recall,F1=evaluation(y_test_statistical,y_pred_rf)
print('Rf test results are:',evaluation(y_test_statistical,y_pred_rf))
print('Rf train results are:',evaluation(y_train_statistical,classifier1.predict(x_train_statistical)))



In [None]:
from sklearn.metrics import plot_roc_curve, roc_curve, auc
from scikitplot.metrics import plot_confusion_matrix,plot_roc
from scipy import interp
from itertools import cycle

#plot_roc(y_test_statistical, classifier1.predict_proba(x_test_statistical),title="Decision Tree (X-Ray)")
#plot_roc(y_test_statistical, classifier2.predict_proba(x_test_statistical),title="Random Forest (X-Ray)")
plot_roc_handy(y_test, y_p,lw=2,Name='Roc of fusion model without uncertainty (X-Ray)',
               calss_name=['COVID19','Normal','Pneumonia'])

In [None]:
import tqdm

mc_model=load_model('Chest_Covid/model_covid_mc.h5')
number_prediction=200
mc_predictions = []
for i in tqdm.tqdm(range(number_prediction)):
    y_p = mc_model.predict(x_test,batch_size=256)
    mc_predictions.append(y_p)