<a href="https://colab.research.google.com/github/mrinal054/gastrointestinal-video-classifier/blob/main/GI_video_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Gastrointestinal (GI) Tract Video Classifier

Dataset used: HyperKvasir (Link: https://osf.io/mh9sj/)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import sys

sys.path.append('/content/drive/MyDrive/Video_classification/my_utils')
sys.path.append('/content/drive/MyDrive/Video_classification')
sys.path.append('/content/drive/MyDrive/library')

In [None]:
import tqdm
import random
import itertools
import collections
import os
import random
import pathlib
from pathlib import PosixPath # for windows, import Path; for linux, import PosixPath

import cv2
import einops
import numpy as np
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
import keras
from keras import layers
from my_utils import ucf

# Import packages and libraries
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger, TerminateOnNaN
import pandas as pd # to read/write excel or csv file
from matplotlib import pyplot as plt # to plot or visualize data
from datetime import datetime
from copy import deepcopy


In [None]:
HEIGHT = 224
WIDTH = 224
n_classes = 2
seed = random.randint(1, 1000) # this seed will be used to randomly pick videos in the later section

print(f'seed: {seed}')

seed: 506


In [None]:
# Model name
model_name = 'ResNet_PscSE_seed' + str(seed) + '_' + datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
print(model_name)

ResNet_seed506_2023-11-17_15-04-51


(2 + 1)D convolution

In [None]:
class Conv2Plus1D(keras.layers.Layer):
  def __init__(self, filters, kernel_size, padding):
    """
      A sequence of convolutional layers that first apply the convolution operation over the
      spatial dimensions, and then the temporal dimension.
    """
    super().__init__()
    self.seq = keras.Sequential([
        # Spatial decomposition
        layers.Conv3D(filters=filters,
                      kernel_size=(1, kernel_size[1], kernel_size[2]),
                      padding=padding),
        # Temporal decomposition
        layers.Conv3D(filters=filters,
                      kernel_size=(kernel_size[0], 1, 1),
                      padding=padding)
        ])

  def call(self, x):
    return self.seq(x)

Create the main branch of the residual block with the following class. In contrast to the standard ResNet structure this uses the custom Conv2Plus1D layer instead of layers.Conv2D.

In [None]:
class ResidualMain(keras.layers.Layer):
  """
    Residual block of the model with convolution, layer normalization, and the
    activation function, ReLU.
  """
  def __init__(self, filters, kernel_size):
    super().__init__()
    self.seq = keras.Sequential([
        Conv2Plus1D(filters=filters,
                    kernel_size=kernel_size,
                    padding='same'),
        layers.LayerNormalization(),
        layers.ReLU(),
        Conv2Plus1D(filters=filters,
                    kernel_size=kernel_size,
                    padding='same'),
        layers.LayerNormalization()
    ])

  def call(self, x):
    return self.seq(x)

To add the residual branch to the main branch it needs to have the same size. The Project layer below deals with cases where the number of channels is changed on the branch. In particular, a sequence of densely-connected layer followed by normalization is added.

In [None]:
class Project(keras.layers.Layer):
  """
    Project certain dimensions of the tensor as the data is passed through different
    sized filters and downsampled.
  """
  def __init__(self, units):
    super().__init__()
    self.seq = keras.Sequential([
        layers.Dense(units),
        layers.LayerNormalization()
    ])

  def call(self, x):
    return self.seq(x)

Use add_residual_block to introduce a skip connection between the layers of the model.

In [None]:
def add_residual_block(input, filters, kernel_size):
  """
    Add residual blocks to the model. If the last dimensions of the input data
    and filter size does not match, project it such that last dimension matches.
  """
  out = ResidualMain(filters,
                     kernel_size)(input)

  res = input
  # Using the Keras functional APIs, project the last dimension of the tensor to
  # match the new filter size
  if out.shape[-1] != input.shape[-1]:
    res = Project(out.shape[-1])(res)

  return layers.add([res, out])

Resizing the video is necessary to perform downsampling of the data. In particular, downsampling the video frames allow for the model to examine specific parts of frames to detect patterns that may be specific to a certain action. Through downsampling, non-essential information can be discarded. Moreoever, resizing the video will allow for dimensionality reduction and therefore faster processing through the model.

In [None]:
class ResizeVideo(keras.layers.Layer):
  def __init__(self, height, width):
    super().__init__()
    self.height = height
    self.width = width
    self.resizing_layer = layers.Resizing(self.height, self.width)

  def call(self, video):
    """
      Use the einops library to resize the tensor.

      Args:
        video: Tensor representation of the video, in the form of a set of frames.

      Return:
        A downsampled size of the video according to the new height and width it should be resized to.
    """
    # b stands for batch size, t stands for time, h stands for height,
    # w stands for width, and c stands for the number of channels.
    old_shape = einops.parse_shape(video, 'b t h w c')
    images = einops.rearrange(video, 'b t h w c -> (b t) h w c')
    images = self.resizing_layer(images)
    videos = einops.rearrange(
        images, '(b t) h w c -> b t h w c',
        t = old_shape['t'])
    return videos

Build 3D P-scSE

In [None]:
import tensorflow as tf

class Conv3dReLU(keras.layers.Layer):
    def __init__(
        self,
        out_channels,
        kernel_size,
        padding='same',
        stride=1,
        use_batchnorm=True,
    ):
        super().__init__()

        self.seq = keras.Sequential([
            layers.Conv3D(
                out_channels,
                kernel_size,
                strides=stride,
                padding=padding,
                use_bias=not use_batchnorm,
            ),
            layers.BatchNormalization(),
            layers.ReLU(),
        ])

    def call(self, x):
      return self.seq(x)

#%%
class SCSEModule0(keras.layers.Layer):
    def __init__(self, in_channels, reduction=16, strategy='addition'):
        super().__init__()
        self.strategy = strategy

        self.cSE = keras.Sequential([
            layers.GlobalAveragePooling3D(keepdims=True),
            layers.Conv3D(in_channels // reduction, 1, activation='relu'),
            layers.Conv3D(in_channels, 1, activation='sigmoid'),
        ])

        self.sSE = keras.Sequential([
            layers.Conv3D(1, 1, activation='sigmoid'),
        ])

    def call(self, x):

        x_cSE = self.cSE(x)
        xc = x * x_cSE      # cSE attention

        x_sSE = self.sSE(x)
        x_sSE = layers.Concatenate(axis=-1)([x_sSE] * x.shape[-1])
        xs = x * x_sSE      # sSE

        if self.strategy == 'addition':
          return xc + xs

        elif self.strategy == 'maxout':
            return tf.maximum(xc, xs)

        else:
            raise ValueError("Wrong keyword for attention strategy. Choose from [addition, maxout]")

#%%
class PscSE3D(keras.layers.Layer):
    def __init__(
        self,
        channels, # in_channels and out_channels will be equal
        reduction,
        use_batchnorm=True,
    ):
        super().__init__()

        self.conv1 = Conv3dReLU(
            out_channels=channels, # out_channels will be equal to in_channels
            kernel_size=3,
            padding='same',
            use_batchnorm=use_batchnorm,
        )

        # P-scSE
        self.attention1 = SCSEModule0(in_channels=channels, reduction=reduction, strategy='maxout')
        self.attention2 = SCSEModule0(in_channels=channels, reduction=reduction, strategy='addition')

    def call(self, x):
        x1 = self.attention1(x)  # 1st attention
        x2 = self.attention2(x)  # 2nd attention

        x3 = x1 + x2

        x3 = self.conv1(x3)
        return x3

Use the Keras functional API to build the residual network.

In [None]:
input_shape = (None, 10, HEIGHT, WIDTH, 3)
input = layers.Input(shape=(input_shape[1:]))
x = input

x = Conv2Plus1D(filters=16, kernel_size=(3, 7, 7), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)

# Block 1
x = add_residual_block(x, 16, (3, 3, 3))
x = PscSE3D(channels=16, reduction=8)(x)
x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)

# Block 2
x = add_residual_block(x, 32, (3, 3, 3))
x = PscSE3D(channels=32, reduction=8)(x)
x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)

# Block 3
x = add_residual_block(x, 64, (3, 3, 3))
x = PscSE3D(channels=64, reduction=8)(x)
x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)

# Block 4
x = add_residual_block(x, 128, (3, 3, 3))
x = PscSE3D(channels=128, reduction=8)(x)

x = layers.GlobalAveragePooling3D()(x)
x = layers.Flatten()(x)
x = layers.Dense(n_classes)(x)

model = keras.Model(input, x)


In [None]:
model.summary()

### Helper functions

In [None]:
def walk_dirs(root_dir, posix_path=True, verbose=False):
    "List directories of each file"
    dirs = []
    for foldername, subfolders, filenames in os.walk(root_dir):
        for filename in filenames:
            file_path = os.path.join(foldername, filename)
            if posix_path: dirs.append(PosixPath(file_path))
            else: dirs.append(file_path)
            if verbose: print('File path:', file_path)

    return dirs


def prepare_dirs(root_lower_GI=str, root_upper_GI=str, posix_path=bool, shuffle=bool, seed=int, verbose=bool):

    "Returns pairs of (file_directory, class_value)"

    # Get complete directories of each file
    dirs_lower_GI = walk_dirs(root_lower_GI, posix_path, verbose)
    dirs_upper_GI = walk_dirs(root_upper_GI, posix_path, verbose)

    # Create class values
    class_lower_GI = [0 for i in range(len(dirs_lower_GI))] # [0,0, ....., 0]
    class_upper_GI = [1 for i in range(len(dirs_lower_GI))] # [1,1, ....., 1]

    # Create (dirs, class) pair
    lower_GI_pairs = list(zip(dirs_lower_GI, class_lower_GI))
    upper_GI_pairs = list(zip(dirs_upper_GI, class_upper_GI))

    # Shuffling
    if shuffle:
        random.seed(seed)
        random.shuffle(lower_GI_pairs)
        random.shuffle(upper_GI_pairs)

    return lower_GI_pairs, upper_GI_pairs

Data directory

In [None]:
root_lower_GI = '/content/drive/MyDrive/Video_classification/dataset/HyperKvasir/lower-gi-tract/'
root_upper_GI = '/content/drive/MyDrive/Video_classification/dataset/HyperKvasir/upper-gi-tract/'

n_sample_per_GI = 60 # no. of samples for each GI.
posix_path = True
shuffle = True
verbose = False

lower_GI_pairs, upper_GI_pairs = prepare_dirs(root_lower_GI, root_upper_GI, posix_path, shuffle, seed, verbose)

# Select no. of samples that will be used
selected_lower_GI_pairs = lower_GI_pairs[:n_sample_per_GI]
selected_upper_GI_pairs = upper_GI_pairs[:n_sample_per_GI]

# Split into train, val, and test. Will not do shuffling now. Shuffling will be
# done for training pairs in the training dataloader. Note that lower GI and
# upper GI are themselves already shuffled.
n_train = int(n_sample_per_GI * 0.70)    # 70% for training
n_val_test = n_sample_per_GI - n_train
n_val = int(n_val_test * 0.5)            # 15% for validation
n_test = n_val_test - n_val              # 15% for inference

train_pairs = selected_lower_GI_pairs[:n_train] + selected_upper_GI_pairs[:n_train]
val_pairs = selected_lower_GI_pairs[n_train:n_train+n_val] + selected_upper_GI_pairs[n_train:n_train+n_val]
test_pairs = selected_lower_GI_pairs[n_train+n_val:] + selected_upper_GI_pairs[n_train+n_val:]

print('Total lower GI pairs:', len(lower_GI_pairs))
print('Total upper GI pairs:', len(upper_GI_pairs))
print('Total training pairs:', len(train_pairs))
print('Total validation pairs:', len(val_pairs))
print('Total test pairs:', len(test_pairs))

### Dataloader

In [None]:
# Check the dataloader
fg = ucf.FrameGeneratorMKD(train_pairs, 10, training=True)

frames, label = next(fg())

print(f"Shape: {frames.shape}")
print(f"Label: {label}")

In [None]:
# Output signature
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))

# Create the training set
train_ds = tf.data.Dataset.from_generator(ucf.FrameGeneratorMKD(train_pairs, 10, training=True),
                                          output_signature = output_signature)

# Create the validation set
val_ds = tf.data.Dataset.from_generator(ucf.FrameGeneratorMKD(val_pairs, 10),
                                        output_signature = output_signature)

# Print the shapes of the data
train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')

val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')

Use buffered prefetching such that you can yield data from the disk without having I/O become blocking. Two important functions to use while loading data are:

Dataset.cache: keeps the sets of frames in memory after they're loaded off the disk during the first epoch. This function ensures that the dataset does not become a bottleneck while training your model. If your dataset is too large to fit into memory, you can also use this method to create a performant on-disk cache.

Dataset.prefetch: overlaps data preprocessing and model execution while training. Refer to Better performance with the tf.data for details.

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)
val_ds = val_ds.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)

To prepare the data to be fed into the model, use batching. Notice that when working with video data, such as AVI files, the data should be shaped as a five dimensional object. These dimensions are as follows: [batch_size, number_of_frames, height, width, channels]. In comparison, an image would have four dimensions: [batch_size, height, width, channels]. The image below is an illustration of how the shape of video data is represented.

In [None]:
train_ds = train_ds.batch(2)
val_ds = val_ds.batch(2)

train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')

val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')

In [None]:
# Build the model
model.build(train_frames)

In [None]:
# Visualize the model
keras.utils.plot_model(model, expand_nested=True, dpi=60, show_shapes=True)

Train the model

In [None]:
# Create checkpoint directory
checkpoint_loc = os.path.join('/content/drive/MyDrive/Video_classification/checkpoints', model_name)
os.makedirs(checkpoint_loc, exist_ok=True) # if the directory does not exist, then create it.

# checkpoint_path = os.path.join(checkpoint_loc, "cp-{epoch:04d}.ckpt") # use this checkpoint path when save_best_only is False
# checkpoint_path = os.path.join(checkpoint_loc, "best_model.ckpt") # # use this checkpoint path when save_best_only is True

checkpoint_path = os.path.join(checkpoint_loc, "best_model.h5") # # use this checkpoint path when save_best_only is True

In [None]:
# Callbacks
callbacks = [
    # EarlyStopping(monitor='', patience=5, verbose=1),
    ReduceLROnPlateau(factor=0.1,
                      monitor='val_loss',
                      patience=10,
                      min_lr=0.00001,
                      verbose=1,
                      mode='auto'),
    ModelCheckpoint(checkpoint_path,
                      monitor = 'val_loss',
                      verbose = 1,
                      save_best_only=True,
                      save_weights_only=True,
                      period=1),
    # CSVLogger(os.path.join(log_path, datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.csv'), separator=',', append=True),
    # TerminateOnNaN()
]



In [None]:
model.compile(loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              optimizer = keras.optimizers.Adam(learning_rate = 0.0001),
              metrics = ['accuracy'])

In [None]:
history = model.fit(x = train_ds,
                    validation_data = val_ds,
                    epochs = 50,
                    callbacks=callbacks,
                    )

Create plots of the loss and accuracy on the training and validation sets:

In [None]:
def plot_history(history, full_dir):
  """
    Plotting training and validation learning curves.

    Args:
      history: model history with all the metric measures
  """
  fig, (ax1, ax2) = plt.subplots(2,1, figsize=(5, 8))

  # Plot loss
  ax1.set_title('Loss')
  ax1.plot(history.history['loss'], label = 'train')
  ax1.plot(history.history['val_loss'], label = 'test')
  ax1.set_ylabel('Loss')

  # Determine upper bound of y-axis
  max_loss = max(history.history['loss'] + history.history['val_loss'])

  ax1.set_ylim([0, np.ceil(max_loss)])
  ax1.set_xlabel('Epoch')
  ax1.legend(['Train', 'Validation'])

  # Plot accuracy
  ax2.set_title('Accuracy')
  ax2.plot(history.history['accuracy'],  label = 'train')
  ax2.plot(history.history['val_accuracy'], label = 'test')
  ax2.set_ylabel('Accuracy')
  ax2.set_ylim([0, 1])
  ax2.set_xlabel('Epoch')
  ax2.legend(['Train', 'Validation'])

  plt.savefig(full_dir)
  # plt.show()


root = '/content/drive/MyDrive/Video_classification/'
save_fig_dir = os.path.join(root, "plots")
os.makedirs(save_fig_dir, exist_ok=True)
full_dir = os.path.join(save_fig_dir, model_name + '.png')

plot_history(history, full_dir)

Save the model

In [None]:
# model.save_weights(os.path.join(checkpoint_loc, "last_model.h5"))

### Inference

Load model

In [None]:
# # Load model from checkpoint
# from keras.models import load_model

# # Checkpoint directory
# # checkpoint_path = os.path.join('/content/drive/MyDrive/MS_sdsmt/checkpoints/LSTM_2023-10-09_01-06-20', "best_model.ckpt")

# model = load_model(checkpoint_path, compile=False)

# Uncomment to load from model weights
model.load_weights(os.path.join(checkpoint_loc, "best_model.h5"))

In [None]:
output_signature = (tf.TensorSpec(shape=(None, None, None, 3), dtype=tf.float32),
                    tf.TensorSpec(shape=(), dtype=tf.int16))

# Create the test set using a separate generator or method
test_ds = tf.data.Dataset.from_generator(ucf.FrameGeneratorMKD(test_pairs, 10, training=False),
                                        output_signature=output_signature)

# Apply similar processing steps as the training set
test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE) # omitting shuffle

test_ds = test_ds.batch(2)

Evaluate

In [None]:
model.evaluate(test_ds, return_dict=True)

To visualize model performance further, use a confusion matrix. The confusion matrix allows you to assess the performance of the classification model beyond accuracy. In order to build the confusion matrix for this multi-class classification problem, get the actual values in the test set and the predicted values.

In [None]:
def get_actual_predicted_labels(dataset):
  """
    Create a list of actual ground truth values and the predictions from the model.

    Args:
      dataset: An iterable data structure, such as a TensorFlow Dataset, with features and labels.

    Return:
      Ground truth and predicted values for a particular dataset.
  """
  actual = [labels for _, labels in dataset.unbatch()]
  predicted = model.predict(dataset)

  actual = tf.stack(actual, axis=0)
  predicted = tf.concat(predicted, axis=0)
  predicted = tf.argmax(predicted, axis=1)

  return actual, predicted

In [None]:
actual, predicted = get_actual_predicted_labels(test_ds)

In [None]:
print('Actual:\n', actual)
print('Predicted:\n', predicted)

In [None]:
# Create test directory to save test plots
test_save_dir = os.path.join(root, "predictions")
os.makedirs(test_save_dir, exist_ok=True)

### Plot confusion matrix

In [None]:
def plot_confusion_matrix(actual, predicted, full_dir):
  plt.figure(figsize=(8, 6))
  cm = tf.math.confusion_matrix(actual, predicted)
  sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
              xticklabels=['Predicted 0', 'Predicted 1'],
              yticklabels=['Actual 0', 'Actual 1'])
  plt.xlabel('Predicted label')
  plt.ylabel('True label')
  plt.title('Confusion Matrix')
  plt.savefig(full_dir)
  # plt.show()

full_dir = os.path.join(test_save_dir, 'conf_mat_' + model_name + '.png')
plot_confusion_matrix(actual, predicted, full_dir)

### Classification report

In [None]:
actual = actual.cpu().numpy().tolist()
predicted = predicted.cpu().numpy().tolist()

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.metrics import roc_curve, RocCurveDisplay, auc
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_auc_score
import pandas as pd

# ============ CM ======================
acc = accuracy_score(actual, predicted)
print('Accuracy:', acc*100)

cm = confusion_matrix(actual, predicted)

#---------------Generate confusion matrix---------------#
cr = classification_report(actual, predicted,
                           labels=[0, 1], output_dict=True)

df_cm = pd.DataFrame(cr).transpose() # convert confusion matrix to dataframe
print(df_cm)
df_cm.to_excel(os.path.join(test_save_dir, 'classification_report_' + model_name + ".xlsx"))

In [None]:
# Compute ROC curve and ROC area for each class
fpr, tpr, _ = roc_curve(actual, predicted)
roc_auc = roc_auc_score(actual, predicted)

# Plot ROC curve with AUC value annotation
plt.figure(figsize=(8, 8))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')

plt.savefig(os.path.join(test_save_dir, 'roc_' + model_name + ".png" ))
# plt.show()

Calculate precision and recall using tensorflow. The precision and recall values for each class can also be calculated using a confusion matrix.

In [None]:
def calculate_classification_metrics(y_actual, y_pred, labels):
  """
    Calculate the precision and recall of a classification model using the ground truth and
    predicted values.

    Args:
      y_actual: Ground truth labels.
      y_pred: Predicted labels.
      labels: List of classification labels.

    Return:
      Precision and recall measures.
  """
  cm = tf.math.confusion_matrix(y_actual, y_pred)
  tp = np.diag(cm) # Diagonal represents true positives
  precision = dict()
  recall = dict()
  for i in range(len(labels)):
    col = cm[:, i]
    fp = np.sum(col) - tp[i] # Sum of column minus true positive is false negative

    row = cm[i, :]
    fn = np.sum(row) - tp[i] # Sum of row minus true positive, is false negative

    precision[labels[i]] = tp[i] / (tp[i] + fp) # Precision

    recall[labels[i]] = tp[i] / (tp[i] + fn) # Recall

  return precision, recall

In [None]:
precision, recall = calculate_classification_metrics(actual, predicted, labels=[0,1]) # Test dataset

Reference: https://www.tensorflow.org/tutorials/video/video_classification