In [None]:
import os
import random
import time

import numpy as np
import pandas as pd
import matplotlib.pylab as plt
import albumentations as A

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_addons as tfa

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential

from sklearn.metrics import *
import scikitplot as skplt

# Sanity check: list the GPU devices TensorFlow can see (an empty list means none were found).
print(tf.config.list_physical_devices('GPU'))

In [None]:
def seed_all(s):
    """Seed every random number generator this notebook relies on.

    Args:
        s: Seed value handed to Python's ``random``, NumPy and TensorFlow.

    Side effects:
        Writes ``TF_CUDNN_DETERMINISTIC='1'`` and ``PYTHONHASHSEED=str(s)``
        into ``os.environ``.
    """
    # Same seed for each library-level generator, in a fixed order.
    for seeder in (random.seed, np.random.seed, tf.random.set_seed):
        seeder(s)

    # NOTE(review): PYTHONHASHSEED is read at interpreter start-up, so this
    # presumably only affects child processes -- confirm that is sufficient.
    os.environ.update({
        'TF_CUDNN_DETERMINISTIC': '1',
        'PYTHONHASHSEED': str(s),
    })

In [None]:
# Run identifier; the history CSV and the weights file saved at the end are named after it.
TRAINING_NAME = "CL_VGG16"
# Single seed shared by seed_all() and both image_dataset_from_directory calls.
SEED = 124 
# Lets tf.data choose the parallelism / prefetch depth at runtime.
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Side length in pixels of the square images fed to the network.
IMAGE_SIZE = 224
# Number of images per training / validation batch.
BATCH_SIZE = 16
# Maximum number of epochs passed to model.fit (an EarlyStopping callback is also attached).
EPOCHS = 20
# Class order matters: it is passed to the dataset loaders and indexed by argmax results.
CLASS_NAMES = ['normal', 'pneumonia', 'COVID-19']

In [None]:
# Define the augmentation policies
# Every transform below is applied independently with probability p=0.5.
# interpolation=0 is OpenCV's nearest-neighbour flag (cv2.INTER_NEAREST).
# NOTE(review): the RandomResizedCrop ranges look unusual -- the upper `scale` bound (1.1)
# exceeds the full image area and `ratio=(0.05, 1.1)` admits extremely narrow aspect ratios.
# Confirm these values are intentional (a range such as (0.95, 1.1) may have been meant).
transforms = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.Rotate(p=0.5, limit=15),
    A.RandomBrightnessContrast(p=0.5, brightness_limit=(-0.2, 0.2), contrast_limit=(-0.1, 0.1), brightness_by_max=True),
    A.RandomResizedCrop(p=0.5, height=IMAGE_SIZE, width=IMAGE_SIZE, scale=(0.9, 1.1), ratio=(0.05, 1.1), interpolation=0),
])

# Apply augmentation policies.
def aug_fn(image):
    """Run one image through the module-level ``transforms`` pipeline.

    Args:
        image: Image array, passed to the pipeline under the ``image`` key.

    Returns:
        The ``"image"`` entry of the pipeline's result.
    """
    return transforms(image=image)["image"]

# Augmentation policies
def apply_augmentation(image, label):
    """Dataset ``map`` function: augment ``image`` and pass ``label`` through.

    Args:
        image: Image tensor forwarded to ``aug_fn`` via ``tf.numpy_function``.
        label: Label tensor, returned unchanged.

    Returns:
        ``(augmented_image, label)`` where the image is declared as float32
        with static shape ``(IMAGE_SIZE, IMAGE_SIZE, 3)``.
    """
    augmented = tf.numpy_function(aug_fn, [image], tf.float32)
    # Re-attach the static shape explicitly after the Python call.
    augmented.set_shape([IMAGE_SIZE, IMAGE_SIZE, 3])
    return augmented, label

# Preprocess image
def preprocess_data(image, label):
    """Scale pixel values to [0, 1] and drop the leading size-1 batch axis.

    Args:
        image: Image tensor with a leading axis of length 1 and pixel values
            on a 0-255 scale (integer or floating point).
        label: Label tensor with a leading axis of length 1.

    Returns:
        ``(image, label)`` with the leading axis removed from both; the image
        is float32.
    """
    # A plain cast is used instead of tf.image.convert_image_dtype: that helper
    # already rescales integer inputs to [0, 1], so following it with /255
    # would shrink uint8 images twice. For float inputs the result is unchanged.
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.squeeze(image, 0)
    label = tf.squeeze(label, 0)
    return image, label

# View image from dataset
def view_image(ds, col=8, row=2, size=(25,7)):
    """Display a grid of images taken from the first batch of ``ds``.

    Args:
        ds: Batched dataset yielding ``(images, labels)``; labels are indexed
            with ``argmax`` to pick a name from ``CLASS_NAMES``.
        col: Number of grid columns.
        row: Number of grid rows.
        size: Figure size passed to ``plt.figure``.

    Returns:
        None. At most ``col * row`` images are drawn; fewer when the batch is
        smaller than the grid.
    """
    plt.figure(figsize=size)
    plt.subplots_adjust(wspace=0.05, hspace=0.15)
    for images, labels in ds.take(1):
        # Never index past the end of the batch (small or short final batches).
        n_shown = min(col * row, int(images.shape[0]))
        for i in range(n_shown):
            plt.subplot(row, col, i + 1)
            plt.imshow(images[i].numpy())
            plt.title(CLASS_NAMES[np.argmax(labels[i].numpy())])
            plt.axis("off")
    # Must be *called*; a bare `plt.tight_layout` reference does nothing.
    plt.tight_layout()
    return None

# Plot training history
def training_history(history):
    """Draw accuracy and loss curves for training and validation side by side.

    Args:
        history: Mapping with the keys ``accuracy``, ``val_accuracy``,
            ``loss`` and ``val_loss``, each holding one value per epoch.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # (training series, validation series, legend location) per panel, left to right.
    curves = {
        'Accuracy': (history['accuracy'], history['val_accuracy'], 'lower right'),
        'Loss': (history['loss'], history['val_loss'], 'upper right'),
    }
    epochs_range = range(len(history['loss']))

    plt.figure(figsize=(32, 8))
    for position, (caption, (train_series, valid_series, legend_loc)) in enumerate(curves.items(), start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs_range, train_series, label=f'Training {caption}')
        plt.plot(epochs_range, valid_series, label=f'Validation {caption}')
        plt.legend(loc=legend_loc)
        plt.title(f'Training and Validation {caption}')

    plt.show()
    return None

# Parse test images
def decode_test(path):
    """Load one test image from disk as a float32 tensor scaled by 1/255.

    Args:
        path: Path of the image file to read.

    Returns:
        Tensor resized to ``IMAGE_SIZE x IMAGE_SIZE`` with 3 channels.
    """
    raw_bytes = tf.io.read_file(path)
    pixels = tf.cast(tf.image.decode_png(raw_bytes, channels=3), tf.float32)
    # NOTE(review): the train/valid loader is configured only with
    # interpolation='bilinear'; confirm it resizes the same way as this
    # antialiased resize so test inputs match what the model was trained on.
    resized = tf.image.resize(pixels, [IMAGE_SIZE, IMAGE_SIZE], antialias=True)
    return resized / 255

In [None]:
seed_all(SEED)


def _load_split(directory):
    """Read one dataset split from ``directory`` as shuffled single-image batches.

    Images are resized to ``IMAGE_SIZE x IMAGE_SIZE`` with bilinear
    interpolation and labels are one-hot encoded in ``CLASS_NAMES`` order.
    """
    return tf.keras.preprocessing.image_dataset_from_directory(
        directory,
        label_mode='categorical',
        class_names=CLASS_NAMES,
        batch_size=1,
        image_size=(IMAGE_SIZE, IMAGE_SIZE),
        shuffle=True,
        seed=SEED,
        interpolation='bilinear',
    )


train_raw = _load_split('../dataset/clahe/train/')
valid_raw = _load_split('../dataset/clahe/valid/')

# Training pipeline: rescale -> augment -> re-batch -> prefetch.
train_ds = train_raw.map(preprocess_data, num_parallel_calls=AUTOTUNE)
train_ds = train_ds.map(apply_augmentation, num_parallel_calls=AUTOTUNE)
train_ds = train_ds.batch(BATCH_SIZE).prefetch(AUTOTUNE)

# Validation pipeline: identical except that no augmentation is applied.
valid_ds = valid_raw.map(preprocess_data, num_parallel_calls=AUTOTUNE)
valid_ds = valid_ds.batch(BATCH_SIZE).prefetch(AUTOTUNE)

In [None]:
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout

# Create the ImageNet-pretrained VGG16 convolutional base (without its dense classifier head).
# No layer is frozen here, so the whole network is fine-tuned during training.
# NOTE(review): inputs are scaled to [0, 1] by the data pipeline rather than passed through
# keras.applications.vgg16.preprocess_input -- confirm this is the intended input convention.
base_model = VGG16(weights='imagenet', include_top=False)

# Collapse the final feature maps into one feature vector per image.
x = base_model.output
x = GlobalAveragePooling2D()(x)
# Dropout (rate 0.2) ahead of the classifier.
x = Dropout(0.2)(x)
# Softmax classifier with one output unit per entry in CLASS_NAMES.
predictions = Dense(len(CLASS_NAMES), activation='softmax')(x)
# This is the model we will train.
model = Model(inputs=base_model.input, outputs=predictions)

model.summary()

In [None]:
# Cyclical learning-rate schedule oscillating between 3e-7 and 7e-3.
# NOTE(review): 20994 is presumably the number of training images, which would make
# step_size three epochs' worth of optimizer steps -- confirm against the dataset.
clr_scheduler = tfa.optimizers.CyclicalLearningRate(
    initial_learning_rate=3e-7,
    maximal_learning_rate=7e-3,
    step_size=3 * (20994 // BATCH_SIZE),
    # For inputs 1, 2, 3, ... this multiplier is 1, 1/2, 1/4, ...
    scale_fn=lambda x: 1 / (2.0 ** (x - 1)),
    scale_mode='cycle',
)

# Metrics reported for both the training and the validation set.
METRICS = [
    'accuracy',
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
]

# NOTE(review): this monitors the *training* loss even though validation data is supplied;
# confirm whether 'val_loss' was intended, since restore_best_weights follows this metric.
earlyStopping = tf.keras.callbacks.EarlyStopping(patience=5, monitor='loss', verbose=1, restore_best_weights=True)

model.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=clr_scheduler),
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=METRICS,
)

start_training = time.time()

# Train the model. `batch_size` is deliberately not passed: train_ds is already batched,
# and Keras documents that the argument must not be given for dataset inputs.
history = model.fit(
    train_ds,
    epochs=EPOCHS,
    verbose=1,
    callbacks=[earlyStopping],
    validation_data=valid_ds,
)

end_training = time.time()
print(f"Time taken to train model : {end_training-start_training} sec")

training_history(history.history)

In [None]:
# Build the list of test image paths from the CSV's `label` and `filename` columns.
tests_df = pd.read_csv('../dataset/test.csv')
tests_df['path'] = '../dataset/clahe/test/'+ tests_df.label + '/' + tests_df.filename

# Feed the test images in BATCH_SIZE chunks rather than as one giant batch, so memory use
# does not grow with the size of the test set; predict() still returns every row in order.
test_ds = tf.data.Dataset.from_tensor_slices(tests_df.path)
test_ds = test_ds.map(decode_test, num_parallel_calls=AUTOTUNE).batch(BATCH_SIZE)

# Ground truth both as class indices and as class-name strings.
# Assumes test.csv has one indicator column per class name -- TODO confirm.
test_index = np.argmax(tests_df[CLASS_NAMES].values, axis=1)
test_label = tests_df.label.values

start_predict = time.time()

test_pred = model.predict(test_ds)
pred_index = np.argmax(test_pred, axis=1)
pred_label = np.array(CLASS_NAMES)[pred_index]

end_predict = time.time()
print(f"Inference time : {end_predict-start_predict} sec")

print(classification_report(test_index, pred_index, target_names=CLASS_NAMES, zero_division=0, digits=4))
# For single-label multiclass predictions, micro-averaged F1 equals accuracy.
print('f1_score        :', f1_score(test_index, pred_index, average='micro'))
print('accuracy_score  :', accuracy_score(test_index, pred_index))

# Confusion matrix on class names; ROC curves on the predicted probabilities.
cm = skplt.metrics.plot_confusion_matrix(test_label, pred_label, figsize=(8, 8), normalize=False)
roc = skplt.metrics.plot_roc(test_index, test_pred, figsize=(10,8))

In [None]:
# Persist the per-epoch metrics and the trained weights under names derived from TRAINING_NAME.
history_path = "../history/"+TRAINING_NAME+".csv"
weights_path = "../model_weight/"+TRAINING_NAME+"_weights.h5"

# Create the output folders first so a missing directory cannot discard a finished run.
for out_path in (history_path, weights_path):
    os.makedirs(os.path.dirname(out_path), exist_ok=True)

hist_df = pd.DataFrame(history.history)
hist_df.to_csv(history_path, index=False)
model.save_weights(weights_path)