In [None]:
import tensorflow as tf
import plotly.graph_objects as go
import json
from tensorflow.keras import layers
import cv2
import numpy as np
from PIL import Image
import tensorflow.keras as K
import matplotlib.pyplot as plt
from skimage.transform import resize
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from keras.preprocessing import image

In [None]:
class MyThresholdCallback(tf.keras.callbacks.Callback):
    def __init__(self, threshold):
        super(MyThresholdCallback, self).__init__()
        self.threshold = threshold

    def on_epoch_end(self, epoch, logs=None):
        val_acc = logs["val_accuracy"]
        train_acc = logs["accuracy"]
        if val_acc >= self.threshold and train_acc >= self.threshold:
            self.model.stop_training = True

def plot_training(history_dict, model_name, dataset_name):
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=[x for x in range(NUM_EPOCHS)],
                             y=history_dict.history['accuracy'],
                             mode='lines',
                             name='Training',
                             ))
#     fig.add_trace(go.Scatter(x=[x for x in range(NUM_EPOCHS)],
#                              y=history_dict.history['val_accuracy'],
#                              mode='lines',
#                              name='Validation'))
    fig.update_layout(title=model_name, xaxis_title='Epoch', yaxis_title='Accuracy')
    fig.write_html(f"history_plot_{model_name}_{dataset_name}.html")

def densenet201():
    base_model = tf.keras.applications.DenseNet201(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    outputs = base_model.output
    outputs = tf.keras.layers.GlobalAveragePooling2D()(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    outputs = tf.keras.layers.Dense(256, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(l1=1e-4, l2=1e-4))(outputs)
    outputs = tf.keras.layers.Dropout(0.5)(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    outputs = tf.keras.layers.Dense(128, activation='relu', kernel_initializer='he_uniform')(outputs)
    outputs = tf.keras.layers.Dropout(0.3)(outputs)
    predicts = layers.Dense(2, activation="softmax")(outputs)

    final_model = tf.keras.Model(inputs=base_model.input, outputs=predicts, name='densenet201')
    return final_model

def vgg16():
    base_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    outputs = base_model.output
    outputs = layers.AveragePooling2D((2, 2), strides=(2, 2))(outputs)
    outputs = layers.Flatten(name="flatten")(outputs)
    outputs = layers.Dense(1024, activation="relu")(outputs)
    outputs = layers.Dropout(0.2)(outputs)
    outputs = layers.Dense(512, activation="relu")(outputs)
    outputs = layers.Dropout(0.2)(outputs)
    outputs = layers.Dense(64, activation="relu")(outputs)
    predicts = layers.Dense(2, activation="softmax")(outputs)

    final_model = tf.keras.Model(inputs=base_model.input, outputs=predicts, name='vgg16')
    return final_model

def vgg19():
    base_model = tf.keras.applications.VGG19(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    outputs = base_model.output
    outputs = layers.AveragePooling2D((2, 2), strides=(2, 2))(outputs)
    outputs = layers.Flatten(name="flatten")(outputs)
    outputs = layers.Dense(1024, activation="relu")(outputs)
    outputs = layers.Dropout(0.2)(outputs)
    outputs = layers.Dense(512, activation="relu")(outputs)
    outputs = layers.Dropout(0.2)(outputs)
    outputs = layers.Dense(64, activation="relu")(outputs)
    predicts = layers.Dense(2, activation="softmax")(outputs)

    final_model = tf.keras.Model(inputs=base_model.input, outputs=predicts, name='vgg19')
    return final_model

def xception():
    base_model = tf.keras.applications.Xception(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    outputs = base_model.output
    outputs = layers.experimental.preprocessing.RandomRotation((-0.15, 0.15))(outputs)
    outputs = layers.experimental.preprocessing.RandomFlip('horizontal')(outputs)
    outputs = layers.GlobalMaxPooling2D()(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    outputs = tf.keras.layers.Dense(units=512, activation='relu', kernel_initializer='he_uniform', kernel_regularizer=tf.keras.regularizers.L1L2())(outputs)
    outputs = tf.keras.layers.GaussianDropout(0.5)(outputs)
    outputs = tf.keras.layers.Dense(units=128, activation='relu')(outputs)
    outputs = tf.keras.layers.GaussianDropout(0.5)(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    predicts = layers.Dense(2, activation="softmax")(outputs)

    final_model = tf.keras.Model(inputs=base_model.input, outputs=predicts, name='xception')
    return final_model

def inceptionv3():
    base_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    outputs = base_model.output
    outputs = tf.keras.layers.GlobalAveragePooling2D()(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    outputs = tf.keras.layers.Dense(312, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(l1=2e-5, l2=1e-4))(outputs)
    outputs = tf.keras.layers.Dropout(0.5)(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    outputs = tf.keras.layers.Dense(128, activation='relu', kernel_initializer='he_uniform')(outputs)
    outputs = tf.keras.layers.Dropout(0.3)(outputs)
    predicts = layers.Dense(2, activation="softmax")(outputs)

    final_model = tf.keras.Model(inputs=base_model.input, outputs=predicts, name='inceptionv3')
    return final_model

def mobilenetv2():
    base_model = tf.keras.applications.MobileNetV2(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    outputs = base_model.output
    outputs = layers.AveragePooling2D((2, 2), strides=(2, 2))(outputs)
    outputs = layers.Flatten(name="flatten")(outputs)
    outputs = layers.Dense(1024, activation="relu")(outputs)
    outputs = layers.Dropout(0.2)(outputs)
    outputs = layers.Dense(512, activation="relu")(outputs)
    outputs = layers.Dropout(0.2)(outputs)
    outputs = layers.Dense(64, activation="relu")(outputs)
    predicts = layers.Dense(2, activation="softmax")(outputs)

    final_model = tf.keras.Model(inputs=base_model.input, outputs=predicts, name='mobilenetv2')
    return final_model

def resnet50v2():
    base_model = tf.keras.applications.ResNet50V2(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    outputs = base_model.output
    outputs = tf.keras.layers.GlobalAveragePooling2D()(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    outputs = tf.keras.layers.Dense(512, activation='relu', kernel_regularizer=tf.keras.regularizers.L1L2(l1=1e-4, l2=1e-4))(outputs)
    outputs = tf.keras.layers.Dropout(0.5)(outputs)
    outputs = tf.keras.layers.BatchNormalization()(outputs)
    outputs = tf.keras.layers.Dense(128, activation='relu', kernel_initializer='he_uniform')(outputs)
    outputs = tf.keras.layers.Dropout(0.3)(outputs)
    predicts = layers.Dense(2, activation="softmax")(outputs)

    final_model = tf.keras.Model(inputs=base_model.input, outputs=predicts, name='resnet50v2')
    return final_model


In [None]:
# Constants and Parameters
IMG_SIZE = (224, 224)
NUM_EPOCHS = 15
SPLIT = 0.15
ACCURACY_THRESHOLD = 0.99
DATASET_NAME = ''
ROOT_DATASETS_PATH = r'C:\Users\504\Documents'
TRAIN_DATASET_PATH = fr'{ROOT_DATASETS_PATH}\{DATASET_NAME}\train'
VAL_DATASET_PATH = fr'{ROOT_DATASETS_PATH}\{DATASET_NAME}\val'
TEST_DATASET_PATH = fr'{ROOT_DATASETS_PATH}\{DATASET_NAME}\test'

#acc_callback = MyThresholdCallback(ACCURACY_THRESHOLD)

In [None]:
# Data pipeline
def make_generators(dataset_name):
    DATASET_NAME = dataset_name
    TRAIN_DATASET_PATH = fr'{ROOT_DATASETS_PATH}\{DATASET_NAME}\train'
    #VAL_DATASET_PATH = fr'{ROOT_DATASETS_PATH}\{DATASET_NAME}\val'
    train_dataset = tf.keras.preprocessing.image_dataset_from_directory(directory=TRAIN_DATASET_PATH,
                                                                    shuffle=True,
                                                                    validation_split=SPLIT,
                                                                    image_size=IMG_SIZE,
                                                                    color_mode="rgb", label_mode="categorical",
                                                                    seed=15,
                                                                    subset="training", batch_size=32)

    val_dataset = tf.keras.preprocessing.image_dataset_from_directory(directory=TRAIN_DATASET_PATH,
                                                                        shuffle=True,
                                                                        validation_split=SPLIT,
                                                                        image_size=IMG_SIZE,
                                                                        color_mode="rgb", label_mode="categorical",
                                                                        seed=15,
                                                                        subset="validation", batch_size=32)

    DATASET_NAME = ''
    return (train_dataset, val_dataset)

In [None]:
# Compilation and training
models_list = [densenet201(), vgg16(), vgg19(), xception(), inceptionv3(), mobilenetv2(), resnet50v2()]
datasets_list = ['CovidX', 'SaoPaulo', 'UCSD']
# trained_models_list = dict()
final_pred = 0.0
i = 0
for dataset in datasets_list:
  
    train_datagen, val_datagen = make_generators(dataset)
    for model in models_list:
        model.compile(optimizer='nadam',
                      loss='categorical_crossentropy',
                      metrics=['accuracy',
                               tf.keras.metrics.Precision(name='prec'),
                               tf.keras.metrics.Recall(name='recall'),
                               tf.keras.metrics.TruePositives(name='tp'),
                               tf.keras.metrics.TrueNegatives(name='tn'),
                               tf.keras.metrics.FalsePositives(name='fp'),
                               tf.keras.metrics.FalseNegatives(name='fn')])
        with tf.device('/device:GPU:0'):
            history = model.fit(train_datagen.take(10),
                                validation_data=val_datagen.take(10),
                                epochs=NUM_EPOCHS,
                                batch_size=32)
        final_pred += model.predict_proba(val_datagen)
        i+=1
        print(final_pred)
        #plot_training(history, model.name, dataset)
        #model.save_weights(fr'/saved_weights/{model.name}' + f'[{dataset}]')
        #model.save(fr'/saved_models/{model.name}' + f'[{dataset}]')
        print('\n\n\n')
        print(model.name + f'[{dataset}]')
        print('\n\n\n')
        model_enc = model.name + f'[{dataset}]'
        trained_models[model_enc] = [model, dataset]
final_pred /= i
print(final_pred)

In [None]:
def make_test_generator(dataset_name):
    DATASET_NAME = dataset_name
    TEST_DATASET_PATH = fr'{ROOT_DATASETS_PATH}/{DATASET_NAME}/test'
    test_dataset = tf.keras.preprocessing.image_dataset_from_directory(directory=TEST_DATASET_PATH,
                                                                    shuffle=True,
                                                                    image_size=IMG_SIZE,
                                                                    color_mode="rgb", label_mode="categorical",
                                                                    batch_size=32)
    return test_dataset

In [None]:
# Evaluation
metrics_dictionary = dict()
for model in trained_models:
    if trained_models[model][1] not in datasets_list:
        continue
    test_datagen = make_test_generator(trained_models[model][1])
    metrics = trained_models[model][0].evaluate(test_datagen, return_dict=True)
    metrics_dictionary[model] = metrics
json.dump(metrics_dictionary, open(r'logs_dictionary.json', 'w'))

In [None]:
assert tf.test.is_gpu_available()
assert tf.test.is_built_with_cuda()
tf.config.list_physical_devices()