Google colab libaray imports

In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/Colab\ Notebooks/ML4SCI

Install libaray

In [None]:
!pip install tensorflow-addons

Import libaray

In [None]:
from functools import partial

import numpy as np
import pandas as pd
import os
import random
import time
import tensorflow as tf, re, math
from pathlib import Path
import math
from tensorflow.keras import applications, layers, losses, optimizers, metrics, Model, backend
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras.applications import ResNet50, EfficientNetB0, EfficientNetB3, EfficientNetB4, EfficientNetB6
from tensorflow.keras import backend as K 
import tensorflow_addons as tfa
from sklearn.model_selection import train_test_split
from scipy.spatial.distance import euclidean
from matplotlib import pyplot as plt
import plotly.express as px
import tqdm.notebook
import gc
from sklearn.utils.class_weight import compute_class_weight

In [None]:
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
    print("Running on TPU ", tpu.cluster_spec().as_dict()["worker"])
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except ValueError:
    print("Not connected to a TPU runtime. Using CPU/GPU strategy")
    strategy = tf.distribute.MirroredStrategy()
    
BATCH_SIZE = 16
!nvidia-smi

define constant

In [None]:
SEED = int(time.time())

DEV = False
# Training filenames directory
if DEV:
    TRAINING_FILENAME = 'tfrecord_train_shuffle.tfrec'
    TRAINING_VAL_FILENAME = 'tfrecord_train_val_shuffle.tfrec'
else:
    TRAINING_FILENAME = 'tfrecord_train_full_shuffle.tfrec'
    TRAINING_VAL_FILENAME = 'tfrecord_val.tfrec'

TARGET_SHAPE = (150, 150)
N_CLASSES = 3
if DEV:
    NUM_TRAINING_IMAGES = 24000
else:
    NUM_TRAINING_IMAGES = 30000

def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    tf.random.set_seed(seed)
    
seed_everything(SEED)

data augmentation

In [None]:
def random_apply(func, x, p):
    """Randomly apply function func to x with probability p."""
    return tf.cond(
        tf.less(tf.random.uniform([], minval=0, maxval=1, dtype=tf.float32),
                tf.cast(p, tf.float32)),
        lambda: func(x),
        lambda: x)

def random_flip(image):
    image = random_apply(tf.image.flip_left_right, image, p=0.5)
    image = random_apply(tf.image.flip_up_down, image, p=0.5)
    return image

def random_rotate(image):
    angle = tf.random.uniform([], minval=0, maxval=2.*math.pi, dtype=tf.float32)
    image = tfa.image.rotate(image, angle)
    return image

def data_augmentation(image, label):
    image = random_flip(image)
    image = random_apply(random_rotate, image, 0.5)
    return image, label

Load training dataset

In [None]:
def read_labeled_tfrecord(example):
    LABELED_TFREC_FORMAT = {
        "image": tf.io.FixedLenFeature(list(TARGET_SHAPE), tf.float32),
        "label": tf.io.FixedLenFeature([], tf.int64),
    }

    example = tf.io.parse_single_example(example, LABELED_TFREC_FORMAT)
    image = example['image']
    # convert shape from (150, 150) to (150, 150, 3)
    image = image * 255.0
    image = tf.expand_dims(image, -1)
    image = tf.image.grayscale_to_rgb(image)
    label = tf.cast(example['label'], tf.int32)
    label = tf.one_hot(label, N_CLASSES)
    return image, label

# This function loads TF Records and parse them into tensors
def load_dataset(filenames):       
    dataset = tf.data.TFRecordDataset(filenames, num_parallel_reads = tf.data.experimental.AUTOTUNE)
    dataset = dataset.map(read_labeled_tfrecord, num_parallel_calls = tf.data.experimental.AUTOTUNE) 
    return dataset

# This function is to get our training tensors
def get_training_dataset(filenames):
    dataset = load_dataset(filenames)
    dataset = dataset.map(lambda image, label: (image, label), num_parallel_calls = tf.data.experimental.AUTOTUNE)
    dataset = dataset.repeat()
    dataset = dataset.map(data_augmentation, num_parallel_calls = tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(1024)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    return dataset

def get_validation_dataset(filenames):
    dataset = load_dataset(filenames)
    dataset = dataset.map(lambda image, label: (image, label), num_parallel_calls = tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=False)
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    return dataset

In [None]:
train_dataset = get_training_dataset(TRAINING_FILENAME)
train_val_dataset = get_validation_dataset(TRAINING_VAL_FILENAME)

Inspect training data

In [None]:
def batch_to_numpy_images_and_labels(data):
    images, labels = data
    numpy_images = images.numpy()
    numpy_labels = labels.numpy()
    return numpy_images, numpy_labels

def display_one_image(image, title, subplot, red=False, titlesize=16):
    plt.subplot(*subplot)
    plt.axis('off')
    plt.imshow(image[:,:,0]/255.0, cmap='gray')
    plt.title(title, color='r')
    return (subplot[0], subplot[1], subplot[2]+1)

def display_batch_of_images(databatch, predictions=None):
    # data
    images, labels = batch_to_numpy_images_and_labels(databatch)
        
    # auto-squaring: this will drop data that does not fit into square or square-ish rectangle
    rows = int(math.sqrt(len(images)))
    cols = len(images)//rows
        
    # size and spacing
    FIGSIZE = 13.0
    SPACING = 0.1
    subplot=(rows,cols,1)
    if rows < cols:
        plt.figure(figsize=(FIGSIZE,FIGSIZE/cols*rows))
    else:
        plt.figure(figsize=(FIGSIZE/rows*cols,FIGSIZE))
    
    # display
    for i, (image, label) in enumerate(zip(images[:rows*cols], labels[:rows*cols])):
        #if i > 20: break
        title = label
        dynamic_titlesize = FIGSIZE*SPACING/max(rows,cols)*40+3 # magic formula tested to work from 1x1 to 10x10 images
        subplot = display_one_image(image, title, subplot, titlesize=dynamic_titlesize)
    
    #layout
    plt.tight_layout()
    plt.subplots_adjust(wspace=0, hspace=0)
    plt.show()

train_batch = train_dataset.unbatch().batch(20)
train_batch = iter(train_batch)
display_batch_of_images(next(train_batch))

build model

In [None]:
class GeMPoolingLayer(tf.keras.layers.Layer):
    def __init__(self, p=1., train_p=False):
        super().__init__()
        if train_p:
            self.p = tf.Variable(p, dtype=tf.float32)
        else:
            self.p = p
        self.eps = 1e-6

    def call(self, inputs: tf.Tensor, **kwargs):
        inputs = tf.clip_by_value(inputs, clip_value_min=1e-6, clip_value_max=tf.reduce_max(inputs))
        inputs = tf.pow(inputs, self.p)
        inputs = tf.reduce_mean(inputs, axis=[1, 2], keepdims=False)
        inputs = tf.pow(inputs, 1./self.p)
        return inputs

In [None]:
def get_model():
    base_cnn = EfficientNetB3(
        weights='imagenet', input_shape=TARGET_SHAPE + (3,), include_top=False, drop_connect_rate=0.4
    )
    outputs = GeMPoolingLayer(train_p=True)(base_cnn.output)
    outputs = layers.Dense(N_CLASSES, activation="softmax", name="pred")(outputs)
    model = Model(base_cnn.input, outputs, name="EfficientNet")
    return model

LR Scheduler

Cosine annealing learning rate scheduler with periodic restarts

In [None]:
class SGDRScheduler(tf.keras.callbacks.Callback):
    '''Cosine annealing learning rate scheduler with periodic restarts.
    # Usage
        ```python
            schedule = SGDRScheduler(min_lr=1e-5,
                                     max_lr=1e-2,
                                     steps_per_epoch=np.ceil(epoch_size/batch_size),
                                     lr_decay=0.9,
                                     cycle_length=5,
                                     mult_factor=1.5)
            model.fit(X_train, Y_train, epochs=100, callbacks=[schedule])
        ```
    # Arguments
        min_lr: The lower bound of the learning rate range for the experiment.
        max_lr: The upper bound of the learning rate range for the experiment.
        steps_per_epoch: Number of mini-batches in the dataset. Calculated as `np.ceil(epoch_size/batch_size)`. 
        lr_decay: Reduce the max_lr after the completion of each cycle.
                  Ex. To reduce the max_lr by 20% after each cycle, set this value to 0.8.
        cycle_length: Initial number of epochs in a cycle.
        mult_factor: Scale epochs_to_restart after each full cycle completion.
    # References
        Blog post: jeremyjordan.me/nn-learning-rate
        Original paper: http://arxiv.org/abs/1608.03983
    '''
    def __init__(self,
                 min_lr,
                 max_lr,
                 steps_per_epoch,
                 lr_decay=1,
                 cycle_length=10,
                 mult_factor=2):

        self.min_lr = min_lr
        self.max_lr = max_lr
        self.lr_decay = lr_decay

        self.batch_since_restart = 0
        self.next_restart = cycle_length

        self.steps_per_epoch = steps_per_epoch

        self.cycle_length = cycle_length
        self.mult_factor = mult_factor

        self.history = {}

    def clr(self):
        '''Calculate the learning rate.'''
        fraction_to_restart = self.batch_since_restart / (self.steps_per_epoch * self.cycle_length)
        lr = self.min_lr + 0.5 * (self.max_lr - self.min_lr) * (1 + np.cos(fraction_to_restart * np.pi))
        return lr

    def on_train_begin(self, logs={}):
        '''Initialize the learning rate to the minimum value at the start of training.'''
        logs = logs or {}
        K.set_value(self.model.optimizer.lr, self.max_lr)

    def on_batch_end(self, batch, logs={}):
        '''Record previous batch statistics and update the learning rate.'''
        logs = logs or {}
        self.history.setdefault('lr', []).append(K.get_value(self.model.optimizer.lr))
        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)

        self.batch_since_restart += 1
        K.set_value(self.model.optimizer.lr, self.clr())

    def on_epoch_end(self, epoch, logs={}):
        '''Check for end of current cycle, apply restarts when necessary.'''
        if epoch + 1 == self.next_restart:
            self.batch_since_restart = 0
            self.cycle_length = np.ceil(self.cycle_length * self.mult_factor)
            self.next_restart += self.cycle_length
            self.max_lr *= self.lr_decay
            self.best_weights = self.model.get_weights()

    def on_train_end(self, logs={}):
        '''Set weights to the values from the end of the most recent cycle for best performance.'''
        self.model.set_weights(self.best_weights)

Training

In [None]:
import numpy as np 
from tensorflow import keras
from matplotlib import pyplot as plt
from IPython.display import clear_output

class PlotLearning(keras.callbacks.Callback):
    """
    Callback to plot the learning curves of the model during training.
    """
    def on_train_begin(self, logs={}):
        self.metrics = {}
        for metric in logs:
            self.metrics[metric] = []
            

    def on_epoch_end(self, epoch, logs={}):
        # Storing metrics
        for metric in logs:
            if metric in self.metrics:
                self.metrics[metric].append(logs.get(metric))
            else:
                self.metrics[metric] = [logs.get(metric)]
        
        # Plotting
        metrics = [x for x in logs if 'val' not in x]
        
        f, axs = plt.subplots(1, len(metrics), figsize=(15,5))
        clear_output(wait=True)

        for i, metric in enumerate(metrics):
            axs[i].plot(range(1, epoch + 2), 
                        self.metrics[metric], 
                        label=metric)
            if logs['val_' + metric]:
                axs[i].plot(range(1, epoch + 2), 
                            self.metrics['val_' + metric], 
                            label='val_' + metric)
                
            axs[i].legend()
            axs[i].grid()

        plt.tight_layout()
        plt.show()

In [None]:
checkpoint_path = "weights_effnetb3_final.{epoch:05d}.hdf5"
# Create a callback that saves the model's weights
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 monitor = 'val_loss',
                                                 save_weights_only=True,
                                                 save_best_only=False,
                                                 mode = 'min',
                                                 verbose=1)

num_epochs = 100
STEPS_PER_EPOCH = NUM_TRAINING_IMAGES // BATCH_SIZE #// 10

# cosine annealing learning rate scheduler with periodic restarts
lr_sched = SGDRScheduler(min_lr=1.e-6,
                        max_lr=5.e-4,
                        steps_per_epoch=np.ceil(NUM_TRAINING_IMAGES / BATCH_SIZE),
                        lr_decay=0.85,
                        cycle_length=20,
                        mult_factor=1.5)

optimizer = tfa.optimizers.AdamW(learning_rate=1.e-4, weight_decay=1e-5, clipvalue=700)
loss = tf.keras.losses.CategoricalCrossentropy()

with strategy.scope():
    model = get_model()
    model.summary()

    model.compile(
        loss = loss,
        metrics = [tf.keras.metrics.CategoricalAccuracy(),
                   ],
        optimizer = optimizer,  
    )


In [None]:
with strategy.scope():
    history = model.fit(
        train_dataset, steps_per_epoch=STEPS_PER_EPOCH, epochs=num_epochs, validation_data=train_val_dataset, callbacks=[cp_callback, lr_sched, PlotLearning()]
    )

In [None]:
hist_df = pd.DataFrame(history.history) 

# or save to csv: 
hist_csv_file = 'history_effnetb3_final.csv'
with open(hist_csv_file, mode='w') as f:
    hist_df.to_csv(f)

Testing the CNN Model on Validation Data

In [None]:
VALIDATION_FILENAME = 'tfrecord_val.tfrec'
val_dataset = get_validation_dataset(VALIDATION_FILENAME)

with strategy.scope():
    trained_model = get_model()
    trained_model.load_weights('weights_effnetb3_final.00089.hdf5')


Get the prediction

In [None]:
# Get the true label in one-hot e.g. [[0, 1, 0], [0, 0, 1], ...]
y_val = np.concatenate([y for x, y in val_dataset], axis=0)

# Get the prodicted label e.g. [[0.999, 0.001, 0.01], [0.023, 0.982, 0.001], ...]
with strategy.scope():
    y_score = trained_model.predict(val_dataset)

Plot ROC curve

In [None]:
from sklearn.metrics import roc_auc_score, roc_curve, auc
from scipy import interp
from itertools import cycle

n_classes = y_val.shape[1]
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_val[:, i], y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

fpr["micro"], tpr["micro"], _ = roc_curve(y_val.ravel(), y_score.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))

mean_tpr = np.zeros_like(all_fpr)
for i in range(n_classes):
    mean_tpr += interp(all_fpr, fpr[i], tpr[i])

mean_tpr /= n_classes

fpr["macro"] = all_fpr
tpr["macro"] = mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

plt.rcParams['figure.figsize'] = [7, 5]
lw = 2
plt.figure()
plt.plot(fpr["micro"], tpr["micro"],
         label='micro-average (area = {})'
               ''.format(round(roc_auc["micro"],5)),
         color='deeppink', linestyle=':', linewidth=4)

plt.plot(fpr["macro"], tpr["macro"],
         label='macro-average (area = {})'
               ''.format(round(roc_auc["macro"],5)),
         color='navy', linestyle=':', linewidth=4)

labels = ['no sub', 'spherical', 'vortex']
colors = cycle(['aqua', 'darkorange', 'cornflowerblue'])
for i, color in zip(range(n_classes), colors):
    plt.plot(fpr[i], tpr[i], color=color, lw=lw,
             label='{} (area = {})'
             ''.format(labels[i], round(roc_auc[i],5)))

# Plot the ROC 
plt.plot([0, 1], [0, 1], 'k--', lw=lw)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right", prop={"size":10})