<a href="https://colab.research.google.com/github/raquelcarmo/tropical_cyclones/blob/import-py-files/src/code/TC_Category_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tropical Cyclones Categorization
Script to train Deep Learning models to categorize tropical cyclones based on their topology patterns.

## Imports and configurations

In [None]:
# Insert your desired path to work on
import os
os.chdir('../data')

Run the following cell once per session. This cell links the code folder to the python exectution path.

In [None]:
# Path where the modules are stored
import sys
sys.path.append('../code')

# Import modules
import utils
from models import CategorizationCNN
from data_process import DataProcessor
from visualization import plot_history, plot_history_ft

The following cell allows Jupyter Notebooks to detect changes in external code and to automatically update it without restarting the runtime.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# General imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
from sklearn.model_selection import StratifiedKFold
from datetime import datetime

import tensorflow as tf
from tensorflow.data import Dataset
from tensorflow.keras import Input
from tensorflow.keras.applications import resnet50, mobilenet_v2, vgg16
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.layers import concatenate, Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.metrics import CategoricalAccuracy, TopKCategoricalAccuracy, Precision, Recall, TruePositives, FalsePositives, TrueNegatives, FalseNegatives
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

np.set_printoptions(precision=4)

## 1. Train on data according to csv split into train, val and test sets

### 1.1. Define settings (arguments)

In [None]:
args = {
    'main_dir':        "VV_VH_WS",
    'cnn':             "ResNet",     # choices: ["ResNet", "Mobile"]
    'loss':            "categorical_crossentropy",
    'height':          700,
    'width':           400,
    'eye_only':        True,
    'numerical_vars':  False,
    'normalise':       True,
    'norm_mode':       "model",     # choices=['z-norm', 'model', 'simple', 'none']
    'rotate':          False,
    'crop':            True,
    'crop_mode':       "uniform",   # choices=['uniform', 'weighted']
    'nb_crops':        1,
    'data_aug':        False,
    'batch_size':      8,
    'buffer_size':     100,
    'epochs':          10,
    'learning_rate':   0.0001,
    'nb_splits':       5,
    'dropout':         True,
    'drop_rate':       0.5,
    'finetune':        False
}
args['save_dir'] = os.path.join(args['main_dir'], "results", "cat", "R_700x400_nM_bs8_bf100_e10_lr0001_dr0.5")
args

### 1.2. Prepare the tf.data.Dataset instances to be fed to the model

In [None]:
# Load data
main_dir = args['main_dir']
train_images, train_labels, train_bbox = utils.load_data(args, f"{main_dir}/csv/training.csv")
val_images, val_labels, val_bbox = utils.load_data(args, f"{main_dir}/csv/val.csv")
test_images, test_labels, test_bbox = utils.load_data(args, f"{main_dir}/csv/test.csv")

class_weights = utils.compute_class_weights(f"{main_dir}/csv/full_dataset.csv", args)

# Create an instance of the DataProcessor
p = DataProcessor(args,
                  plot_light = False,          # plot only select_crop() images
                  plot_extensive = False,      # plot extensively all images
                  show_prints = False
                 )

# Generate datasets
# NOTE: utils.create_dataset() allows for further data augmentation
train_ds = utils.prepare_dataset(p, train_images, train_labels, train_bbox)
val_ds = utils.prepare_dataset(p, val_images, val_labels, val_bbox)
test_ds = utils.prepare_dataset(p, test_images, test_labels, test_bbox)

# Perform normalization
train_ds_norm, val_ds_norm = utils.normalisation(train_ds, val_ds, args)
_, test_ds_norm = utils.normalisation(train_ds, test_ds, args)

# Configure for performance
train_dataset = utils.config_performance(train_ds_norm, args, shuffle=True)
val_dataset = utils.config_performance(val_ds_norm, args, flag=True)
test_dataset = utils.config_performance(test_ds_norm, args, flag=True)

### 1.3. Perform end-to-end training of the model

In [None]:
# Directory to save results
save_dir = args['save_dir']
os.makedirs(save_dir, exist_ok=True)

# Create model
model = CategorizationCNN(args)

# Create callbacks
dt = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=5, min_lr=0.00001, verbose=1),
    ModelCheckpoint(os.path.join(save_dir, "best_model.h5"), verbose=1, save_best_only=True),
    TensorBoard(log_dir=f"{save_dir}/logs/{dt}")
]

# Train the model
history = model.fit(
    x = train_dataset,
    steps_per_epoch = len(train_dataset),
    validation_data = val_dataset,
    validation_steps = len(val_dataset),
    epochs = args['epochs'],
    callbacks = callbacks,
    verbose = 1,
    class_weight = class_weights,
    shuffle = True
)

Load the TensorBoard notebook extension.

In [None]:
#%load_ext tensorboard
%reload_ext tensorboard
%tensorboard --logdir VV_VH_WS/results/cat/R_700x400_nM_bs8_bf100_e10_lr0001_dr0.5/logs

Evaluate model on test dataset.

In [None]:
print("Loading best weights from training...")
model.load_weights(os.path.join(save_dir, "best_model.h5"))

results = model.evaluate(
    test_dataset,
    steps = len(test_dataset),
    verbose = 1
)

Make predictions.

In [None]:
# Retrieve a batch of images from the test set
predictions = model.predict(test_dataset)
predictions = tf.where(predictions < 0.5, 0, 1)

print('Predictions:\n', predictions.numpy())
print('Labels:\n')
test_labels = test_dataset.map(lambda x, y: y)
for label in test_labels:
    print(label)
# class_names = {0: "No eye", 1: "Eye"}
# plt.figure(figsize=(10, 10))
# for i in range(9):
#     ax = plt.subplot(3, 3, i + 1)
#     plt.imshow(image_batch[i].astype("uint8"))
#     plt.title(class_names[predictions[i]])
#     plt.axis("off")

## 2. Train on data using the Stratified K-Fold

### 2.1. Perform Stratified 5-fold

In [None]:
def stratified_cv(args):
    dataset_path = f"{main_dir}/csv/full_dataset.csv"
    # Compute class weights
    Y, df, class_weights = utils.compute_class_weights(dataset_path, args)

    # Create an instance of the DataProcessor
    p = DataProcessor(args,
                      plot_light = False,              # plot only select_crop() images
                      plot_extensive = False,          # plot extensively all images
                      show_prints = False
                     )

    # Create and build model
    catNet = CategorizationCNN(args)

    print("Entering in K-fold Cross Validation...")
    stratified_k_fold = StratifiedKFold(n_splits=args['nb_splits'], random_state=42, shuffle=False)
    fold_var = 1

    for train_index, val_index in stratified_k_fold.split(np.zeros(len(df)), Y):
        training_data = df.iloc[train_index]
        validation_data = df.iloc[val_index]

        # Load data
        train_images, train_labels, train_bbox = utils.load_data(args, df=training_data)
        val_images, val_labels, val_bbox = utils.load_data(args, df=validation_data)

        # Generate datasets
        #train_ds = utils.prepare_dataset(p, train_images, train_labels, train_bbox)
        #val_ds = utils.prepare_dataset(p, val_images, val_labels, val_bbox)
        train_ds = utils.create_dataset(p, train_images, train_labels, train_bbox, args)
        val_ds = utils.create_dataset(p, val_images, val_labels, val_bbox, args, flag=True)

        # Perform normalisation
        train_ds_norm, val_ds_norm = utils.normalisation(train_ds, val_ds, args)

        # Configure for performance
        train_ds_perf = utils.config_performance(train_ds_norm, args, shuffle=True)
        val_ds_perf = utils.config_performance(val_ds_norm, args, flag=True)

        # Train the model
        history = catNet.train(train_ds_perf, val_ds_perf, class_weights, fold_var)
        plot_history(history, fold_var, save_dir)
        #print(history)
        
        # Guarantee time for weights to be saved and loaded again
        time.sleep(10) 
        
        # Load best model & predict
        print("Loading best weights from training...")
        catNet.get_eval(val_ds_perf, fold_var)
        catNet.get_preds(val_ds_perf, val_ds.map(lambda x, y: y), fold_var)

        ### Grad-CAM analysis ###
        # Train dataset
        train_gradcam_path = f"{save_dir}/train_gradcam_heatmaps_{fold_var}"
        os.makedirs(train_gradcam_path, exist_ok=True)
        gradcam_train_ds = utils.config_performance(train_ds_norm, args, flag=True)
        utils.grad_cam(catNet.model, train_ds, train_ds_norm, catNet.model.predict(gradcam_train_ds), train_gradcam_path, args)
        # Validation dataset
        gradcam_path = f"{save_dir}/gradcam_heatmaps_{fold_var}"
        os.makedirs(gradcam_path, exist_ok=True)
        utils.grad_cam(catNet.model, val_ds, val_ds_norm, catNet.model.predict(val_ds_perf), gradcam_path, args)

        tf.keras.backend.clear_session()
        catNet.__reset()
        fold_var += 1

    # Save the values of each fold
    catNet.save_metrics()
    return

In [None]:
stratified_cv(args)

## 3. Perform fine-tuning

### 3.1. Define settings (arguments)

In [None]:
args = {
    'main_dir':        "VV_VH_WS",
    'cnn':             "ResNet",     # choices: ["ResNet", "Mobile"]
    'loss':            "categorical_crossentropy",
    'height':          700,
    'width':           400,
    'eye_only':        True,
    'numerical_vars':  False,
    'normalise':       True,
    'norm_mode':       "model",     # choices=['z-norm', 'model', 'simple', 'none']
    'rotate':          False,
    'crop':            True,
    'crop_mode':       "uniform",   # choices=['uniform', 'weighted']
    'nb_crops':        1,
    'data_aug':        False,
    'batch_size':      8,
    'buffer_size':     100,
    'epochs':          10,
    'learning_rate':   0.0001,
    'nb_splits':       5,
    'dropout':         True,
    'drop_rate':       0.5,
    'finetune':        True,
    'finetune_at':     -5,
    'initial_epochs':  20,
    'finetune_epochs': 10
}
args['save_dir'] = os.path.join(args['main_dir'], "results", "cat", "R_700x400_nM_bs8_bf100_lr0001_dr0.5_ft-5_ie20_fe10")
args

### 3.2. Using the csv split into train, val and test sets

In [None]:
# Load data
main_dir = args['main_dir']
train_images, train_labels, train_bbox = utils.load_data(args, f"{main_dir}/csv/training.csv")
val_images, val_labels, val_bbox = utils.load_data(args, f"{main_dir}/csv/val.csv")
test_images, test_labels, test_bbox = utils.load_data(args, f"{main_dir}/csv/test.csv")

class_weights = utils.compute_class_weights(f"{main_dir}/csv/full_dataset.csv", args)

# Create an instance of the DataProcessor
p = DataProcessor(args,
                  plot_light = False,          # plot only select_crop() images
                  plot_extensive = False,      # plot extensively all images
                  show_prints = False
                 )

# Generate datasets
# NOTE: utils.create_dataset() allows for further data augmentation
train_ds = utils.prepare_dataset(p, train_images, train_labels, train_bbox)
val_ds = utils.prepare_dataset(p, val_images, val_labels, val_bbox)
test_ds = utils.prepare_dataset(p, test_images, test_labels, test_bbox)

# Perform normalization
train_ds_norm, val_ds_norm = utils.normalisation(train_ds, val_ds, args)
_, test_ds_norm = utils.normalisation(train_ds, test_ds, args)

# Configure for performance
train_dataset = utils.config_performance(train_ds_norm, args, shuffle=True)
val_dataset = utils.config_performance(val_ds_norm, args, flag=True)
test_dataset = utils.config_performance(test_ds_norm, args, flag=True)

In [None]:
# Directory to save results
save_dir = args['save_dir']
os.makedirs(save_dir, exist_ok=True)

In [None]:
if args['cnn'] == "ResNet":
    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(args['width'], args['height'], 3))
elif args['cnn'] == "Mobile":
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(args['width'], args['height'], 3))
elif args['cnn'] == "VGG":
    base_model = VGG16(weights='imagenet', include_top=False, input_shape=(args['width'], args['height'], 3))
else:
    sys.exit("Incert valid cnn model. Options: Mobile, ResNet or VGG (case sensitive)")

# Freeze the base_model
base_model.trainable = False

inputs = Input(shape=(args['width'], args['height'], 3))
x = base_model(inputs, training=False)
x = GlobalAveragePooling2D()(x)
x = Dropout(args['drop_rate'])(x) if args['dropout'] else x

classes = 6 if not args['eye_only'] else 5
outputs = Dense(classes, activation="softmax")(x)
model = Model(inputs, outputs)

# Compile the model (should be done *after* setting layers to non-trainable)
eval_metrics = [CategoricalAccuracy(name="accuracy"), TopKCategoricalAccuracy(k=2, name="top2_accuracy"),
                Precision(name="precision"), Recall(name="recall"), 
                TruePositives(name='tp'), FalsePositives(name='fp'),
                TrueNegatives(name='tn'), FalseNegatives(name='fn')]

model.compile(
    optimizer = Adam(learning_rate=args['learning_rate']), 
    loss = args['loss'],
    metrics = eval_metrics
)

model.summary()

In [None]:
# Create callbacks
dt = datetime.now().strftime("%d-%m-%Y_%H:%M:%S")
callbacks = [
    ModelCheckpoint(f"{save_dir}/best_model_frozen.h5", verbose=1, save_best_only=True),
    TensorBoard(log_dir = f"{save_dir}/logs/{dt}")
]

# Train the top layer of the model on the dataset, the weights
# of the pre-trained network will not be updated during training
history = model.fit(
    train_dataset,
    steps_per_epoch = len(train_dataset),
    validation_data = val_dataset,
    validation_steps = len(val_dataset),
    epochs = args['initial_epochs'],
    callbacks = callbacks,
    class_weight = class_weights,
    shuffle = True
)

acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

Plot train/val losses.

In [None]:
fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(10,10))
ax1.plot(acc, label='Training Accuracy')
ax1.plot(val_acc, label='Validation Accuracy')
ax1.legend(loc='lower right')
ax1.set(ylabel = "Accuracy",
        title = 'Training and Validation Accuracy')
#plt.ylim([min(plt.ylim()),1])

ax2.plot(loss, label='Training Loss')
ax2.plot(val_loss, label='Validation Loss')
ax2.legend(loc='upper right')
ax2.set(xlabel = 'epoch', 
        ylabel = 'Cross Entropy',
        title = 'Training and Validation Loss')
#plt.ylim([0,max(plt.ylim())])

plt.show()

Fine-tuning step.

In [None]:
# Unfreeze the whole base model
base_model.trainable = True

# Try to fine-tune a small number of top layers rather than the whole model
# Let's take a look to see how many layers are in the base model
print("Number of layers in the base model: ", len(base_model.layers))

if args['finetune_at'] > 0:
    # fine-tune from this layer onwards, freezing all layers before
    for layer in base_model.layers[:args['finetune_at']]:
        layer.trainable = False
else:
    # input is given as number of FINAL layers to finetune
    nb_layers2ft = abs(args['finetune_at'])
    total2freeze = len(base_model.layers) - nb_layers2ft
    for layer in base_model.layers[:total2freeze]:
        layer.trainable = False

# Recompile the model for the modifications to take effect, with a low learning rate
model.compile(
    optimizer = Adam(learning_rate = 1e-5),
    loss = args['loss'],
    metrics = eval_metrics
)

model.summary()

In [None]:
# Adjust callbacks
dt = datetime.now().strftime("%d-%m-%Y_%H:%M:%S")
callbacks = [
    #EarlyStopping(patience=5, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=5, min_lr=0.000001, verbose=1),
    ModelCheckpoint(f"{save_dir}/best_model_fine_tuned.h5", verbose=1, save_best_only=True),
    TensorBoard(log_dir = f"{save_dir}/logs/{dt}")
]

total_epochs = args['initial_epochs'] + args['finetune_epochs']

# Train the entire model end-to-end
history_fine = model.fit(
    train_dataset,
    steps_per_epoch = len(train_dataset),
    validation_data = val_dataset,
    validation_steps = len(val_dataset),
    epochs = total_epochs,
    initial_epoch = history.epoch[-1],
    callbacks = callbacks,
    class_weight = class_weights,
    shuffle = True
)

Add history of the fine-tuning step to the previous history.

In [None]:
acc += history_fine.history['accuracy']
val_acc += history_fine.history['val_accuracy']

loss += history_fine.history['loss']
val_loss += history_fine.history['val_loss']

Plot train/val losses.

In [None]:
fig, (ax1, ax2) = plt.subplots(nrows = 2, ncols = 1, figsize=(10, 10))
ax1.plot(acc, label='Training Accuracy')
ax1.plot(val_acc, label='Validation Accuracy')
#plt.ylim([min(plt.ylim()),1])
ax1.plot([args['initial_epochs']-1, args['initial_epochs']-1],
          plt.ylim(), label='Start Fine Tuning')
ax1.legend(loc='lower right')
ax1.set(ylabel = "Accuracy",
        title = 'Training and Validation Accuracy')

ax2.plot(loss, label='Training Loss')
ax2.plot(val_loss, label='Validation Loss')
#plt.ylim([0, 1.0])
ax2.plot([args['initial_epochs']-1, args['initial_epochs']-1],
         plt.ylim(), label='Start Fine Tuning')
ax2.legend(loc='upper right')
ax2.set(xlabel = 'epoch', 
      ylabel = 'Cross Entropy',
      title = 'Training and Validation Loss')
plt.show()

fig.savefig(f'{save_dir}/learning_curves.png', bbox_inches='tight')

Evaluate model on test dataset.

In [None]:
print("Loading best weights from training...")
model.load_weights(f"{save_dir}/best_model_fine_tuned.h5")

results = model.evaluate(
    test_dataset, 
    steps = len(test_dataset),
    verbose = 1
)

### 3.3. Using Stratified-K Fold

In [None]:
def stratified_cv_ft(args):
    dataset_path = f"{main_dir}/csv/full_dataset.csv"
    # Compute class weights
    Y, df, class_weights = utils.compute_class_weights(dataset_path, args)

    # Create an instance of the DataProcessor
    p = DataProcessor(args,
                      plot_light = False,              # plot only select_crop() images
                      plot_extensive = False,          # plot extensively all images
                      show_prints = False
                     )

    # Create and build model
    catNet = CategorizationCNN(args)

    print("Entering in K-fold Cross Validation...")
    stratified_k_fold = StratifiedKFold(n_splits=args['nb_splits'], random_state=42, shuffle=False)
    fold_var = 1

    for train_index, val_index in stratified_k_fold.split(np.zeros(len(df)), Y):
        training_data = df.iloc[train_index]
        validation_data = df.iloc[val_index]

        # Load data
        train_images, train_labels, train_bbox = utils.load_data(args, df=training_data)
        val_images, val_labels, val_bbox = utils.load_data(args, df=validation_data)

        # Generate datasets
        #train_ds = utils.prepare_dataset(p, train_images, train_labels, train_bbox)
        #val_ds = utils.prepare_dataset(p, val_images, val_labels, val_bbox)
        train_ds = utils.create_dataset(p, train_images, train_labels, train_bbox, args)
        val_ds = utils.create_dataset(p, val_images, val_labels, val_bbox, args, flag=True)

        # Perform normalisation
        train_ds_norm, val_ds_norm = utils.normalisation(train_ds, val_ds, args)

        # Configure for performance
        train_ds_perf = utils.config_performance(train_ds_norm, args, shuffle=True)
        val_ds_perf = utils.config_performance(val_ds_norm, args, flag=True)

        # Train classifier with frozen base model
        history1 = catNet.trainftStage1(train_ds_perf, val_ds_perf, class_weights, fold_var)

        acc = history1.history['accuracy']
        validation_acc = history1.history['val_accuracy']
        loss = history1.history['loss']
        validation_loss = history1.history['val_loss']

        # Build and train unfrozen model
        catNet.__buildftStage2()
        history2 = catNet.trainftStage2(train_ds_perf, val_ds_perf, class_weights, history1, fold_var)

        acc += history2.history['accuracy']
        validation_acc += history2.history['val_accuracy']
        loss += history2.history['loss']
        validation_loss += history2.history['val_loss']

        # Plot train/val losses
        hDict = {'acc':acc, 'validation_acc':validation_acc, 
                'loss':loss, 'validation_loss':validation_loss}
        plot_history_ft(args, hDict, fold_var, save_dir)

        # Guarantee time for weights to be saved and loaded again
        time.sleep(10)

        print("Loading best weights from training...")
        catNet.get_eval(val_ds_perf, fold_var)
        catNet.get_preds(val_ds_perf, val_ds.map(lambda x, y: y), fold_var)

        ### Grad-CAM analysis ###
        # Train dataset
        train_gradcam_path = f"{save_dir}/train_gradcam_heatmaps_{fold_var}"
        os.makedirs(train_gradcam_path, exist_ok=True)
        gradcam_train_ds_norm = utils.config_performance(train_ds_norm, args, flag=True)
        utils.grad_cam(catNet.model, train_ds, train_ds_norm, catNet.model.predict(gradcam_train_ds_norm), train_gradcam_path, args)
        # Validation dataset
        gradcam_path = f"{save_dir}/gradcam_heatmaps_{fold_var}"
        os.makedirs(gradcam_path, exist_ok=True)
        utils.grad_cam(catNet.model, val_ds, val_ds_norm, catNet.model.predict(val_ds_perf), gradcam_path, args)

        # Reset model and clear session
        tf.keras.backend.clear_session()
        catNet.__reset()
        fold_var += 1

    # Save the values of each fold
    catNet.save_metrics()
    return

In [None]:
stratified_cv_ft(args)