In [1]:
!pip install opencv-python numpy tensorflow scikit-learn matplotlib wandb tdqm wurlitzer



In [2]:
#all the required dependencies of the project
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import random
import wandb

from typing import Tuple
from collections import Counter
import albumentations as A
from tqdm import tqdm
from typing import List
import json
import tensorflow.keras as keras 
import tensorflow as tf

In [3]:
class CONFIG:
    ROOT_DIRECTORY = os.path.join("..", "data", "WLASL")
    JSON_FILE = "WLASL_v0.3.json"
    NSLT_FILE = "nslt_100.json"
    VIDEO_FOLDER = "videos"

    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]
    DEBUG = True
    
    P_OF_TRANSFORM = 0.9
    P_OF_TRANSFORM_COLOR = 0.2
    
    SHIFT_LIMIT=0.1
    SCALE_LIMIT=0.1
    ROTATE_LIMIT=10
    
    # set to small, when prototyping, or 0 when deploying to cloud or PC with loads of RAM
    DATA_LIMIT = 100
    FRAME_SIZE = 20
    
    
    BATCH_SIZE = 4
    PORTION_OF_DATA_FOR_TRAINING = 0.8

    ROUND_DIGIT = 3
    #WANDB_RUN = "mediapipe-asl-dataset"

In [6]:
from sklearn.model_selection import train_test_split
from math import ceil

class SignRecognitionDataset(keras.utils.Sequence):

    def __init__(self, max_start : int, max_end) -> None:
        # setup the paths
        video_path = os.path.join(CONFIG.ROOT_DIRECTORY, CONFIG.VIDEO_FOLDER)
        dataset_description = os.path.join(CONFIG.ROOT_DIRECTORY)

        # load the filepaths for videos
        self.video_paths = [os.path.join(video_path, file) for file in os.listdir(video_path)]

        # load the dataset config json
        self.config_json = None
        with open(os.path.join(CONFIG.ROOT_DIRECTORY, CONFIG.JSON_FILE)) as f:
            self.config_json = json.load(f)

        # load the dataset json
        self.dataset_json = None
        with open(os.path.join(CONFIG.ROOT_DIRECTORY, CONFIG.NSLT_FILE)) as f:
            self.dataset_json = json.load(f)
        
        self.videos_paths = []
        self.paths_not_found = []
        self.labels = []
        self.start_frames = []
        self.end_frames = []
       
        self.dataset_json = None
        with open(os.path.join(CONFIG.ROOT_DIRECTORY, CONFIG.NSLT_FILE)) as f:
            self.dataset_json = json.load(f) 

        for el in tqdm(self.dataset_json.items()):
            video_id, properties = el[0], el[1]
            path = os.path.join(video_path, video_id + ".mp4")
            
            if not os.path.exists(path):
                self.paths_not_found.append(path)
                continue

            subset = properties["subset"]
            label, start, end = properties["action"]
            
            if start > max_start:
                continue
                
            if end > max_end:
                continue
            
            self.videos_paths.append(path)
            self.labels.append(label)
            self.start_frames.append(start)
            self.end_frames.append(end)
    
        self.videos_paths = np.array(self.video_paths)
        self.paths_not_found = np.array(self.paths_not_found)
        self.labels = np.array(self.labels)
        self.start_frames = np.array(self.start_frames)
        self.end_frames = np.array(self.end_frames)

        self.unique_labels = np.unique(self.labels)
        
    
    def preprocess_trajectory(self, traj : List[np.ndarray]):
        return traj
    
    def __len__(self):
        return len(self.videos_paths)

    def __getitem__(self, idx):
        path, label = self.videos_paths[idx], self.labels[idx]
        trajectory = SignRecognitionDataset.get_video(path)
        
        return self.preprocess_trajectory(trajectory), label

    def permutate(self):
        l = len(self.videos_paths)
        mask = np.arange(l)
        np.random.shuffle(mask)
        
        self.videos_paths = np.array(self.videos_paths)[mask]
        self.labels = np.array(self.labels)[mask]
        self.start_frames = np.array(self.start_frames)[mask]
        self.end_frames = np.array(self.end_frames)[mask]
        
    def sort_by_size(self):
        c = Counter(self.labels)
        _mask = sorted([ (10000 * c[l] + l, i) for i, l in enumerate(self.labels)])[::-1]
        mask_by_size = np.array([el[1] for el in _mask])

        self.video_paths        = np.array(self.video_paths)[mask_by_size]
        self.labels             = np.array(self.labels)[mask_by_size]
        self.start_frames       = np.array(self.start_frames)[mask_by_size]
        self.end_frames         = np.array(self.end_frames)[mask_by_size]

    def crop_video(self, trajectory : np.array) -> np.array:
        cropped = trajectory
        
        if self.FRAME_SIZE != 0:
            frame_size = len(trajectory)
            start = 0 
            
            if frame_size > self.FRAME_SIZE:
                start = np.random.randint(0, frame_size - self.FRAME_SIZE)
            cropped = trajectory[start: (start + self.FRAME_SIZE)]
                    
            if len(cropped) < self.FRAME_SIZE:
                necessary = self.FRAME_SIZE - len(cropped)
                t, h, w, c = trajectory.shape
                cropped = np.concatenate([cropped, np.zeros((necessary, h, w, c))], axis= 0)
                
            return cropped
                        
        return trajectory
                
    @staticmethod
    def get_video(video_path : str) -> List[np.ndarray]:

        if not os.path.exists(video_path):
            return None

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return None

        frames = []
        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                frame_array = np.array(frame)
                frames.append(cv2.cvtColor(frame_array, cv2.COLOR_BGR2RGB))
            else:
                break

        cap.release()
        return np.array(frames)

    @staticmethod
    def rescale_video(frames : np.ndarray, desired_shape) -> np.ndarray:
        refined = []
        for img in frames: 
            y, x, c = img.shape
            cropped = img[:, (x // 2 - y//2) : (x // 2 + y//2), :]
            scaled = cv2.resize(cropped, desired_shape)
            refined.append(scaled)
    
        return np.array(refined)

In [19]:
class SignRecognitionDatasetMHICachedV2(SignRecognitionDataset):

    def __init__(self, max_start: int, max_end, 
                 per_image_transform=None,
                 after_MHI_transform=None,
                 scaled_resolution : Tuple[int]= (224, 224),
                 frame_size:int=CONFIG.FRAME_SIZE,
                 data_limit : int = CONFIG.DATA_LIMIT,
                 decay : float = 0.7,
                 threshold_method : str = "regular",
                 threshold_val : float = 25/255.,
                 by_size=True,
                 train_val_split:float=CONFIG.PORTION_OF_DATA_FOR_TRAINING,
                 split:str="train") -> None:
        super().__init__(max_start, max_end)
        self.scaled_resolution = scaled_resolution
        self.DATA_LIMIT = data_limit
        self.by_size = by_size
        self.per_image_transform = per_image_transform
        self.after_MHI_transform = after_MHI_transform
        self.FRAME_SIZE = frame_size
        self.decay = decay
        self.threshold_method = threshold_method
        self.threshold_val = threshold_val
        
        self.keywords = ["image" ] + list(str(i) for i in range(frame_size-1))
        
        self.sort_by_size()
        
        if self.DATA_LIMIT > 0:
            self.video_paths        = self.video_paths[:self.DATA_LIMIT]
            self.labels             = self.labels[:self.DATA_LIMIT]
            self.start_frames       = self.start_frames[:self.DATA_LIMIT]
            self.end_frames         = self.end_frames[:self.DATA_LIMIT]
        
        
        self.label_2_id = { key : i for i, key in enumerate(np.unique(self.labels))}
        if data_limit < 0:
            train_ds_x, val_ds_x = train_test_split(self.videos_paths, train_size=train_val_split, random_state=42)
            train_ds_y, val_ds_y = train_test_split(self.labels, train_size=train_val_split, random_state=42)
            
        else:
            self.unique_labels = np.unique(self.labels[:data_limit])
            self.label_2_id = { key : i for i, key in enumerate(np.unique(self.labels))}
            
            train_ds_x, val_ds_x = train_test_split(self.videos_paths[:data_limit], train_size=train_val_split, random_state=42)
            train_ds_y, val_ds_y = train_test_split(self.labels[:data_limit], train_size=train_val_split, random_state=42)
                                    
        if split.lower() == "train":
            self.videos_paths = train_ds_x
            self.labels = train_ds_y
            
        elif split.lower() == "val":
            self.videos_paths = val_ds_x
            self.labels = val_ds_y
        
        else: 
            raise Exception("")

        self.cache_data()

    
    def cache_data(self):    
        self.cached_X = []
        self.cached_Y = []
        
        for i,path in tqdm(enumerate(self.videos_paths), desc="Cacheing"):
            
            trajectory = SignRecognitionDataset.get_video(path)  
            trajectory, label = self.preprocess_trajectory(trajectory), self.labels[i]     
            
            cached_X = []
            
            trajectory_length = trajectory.shape[0]
            if trajectory_length < self.FRAME_SIZE:
                cached_X.append(self.postprocess_trajectory(trajectory))
            else:
                for i in range(0, trajectory_length - self.FRAME_SIZE):
                    cached_X.append(self.postprocess_trajectory(trajectory[i: (i + self.FRAME_SIZE)]))

            
            onehotencoded = np.zeros(len(self.unique_labels))
            onehotencoded[self.label_2_id[label]] = 1.0
                    
            self.cached_X.append(cached_X)
            self.cached_Y.append(onehotencoded)
        
    def preprocess_trajectory(self, traj : List[np.ndarray]):
        return SignRecognitionDataset.rescale_video(traj, self.scaled_resolution)
        
    def __len__(self):
        return len(self.videos_paths)

    def postprocess_trajectory(self, traj : np.ndarray) -> np.ndarray:
        # timeframe, Width, height, channels
        ts, w, h, c = traj.shape
        
        mhi = np.zeros((w, h))
                
        for i in range(1, ts):
            frame_diff = np.abs(traj[i] - traj[i-1])
            gray_diff = np.mean(frame_diff, axis=2)
            
            _, binary_diff = cv2.threshold(gray_diff, self.threshold_val, 1.0, cv2.THRESH_BINARY)
    
    
            mhi = mhi * (1.0 - self.decay) + binary_diff

        mhi = mhi.reshape(mhi.shape[0], mhi.shape[1], 1)
        return np.concatenate([mhi,mhi,mhi], axis=2)


    def __getitem__(self, idx):
        
        trajectory, label = self.cached_X[idx], self.cached_Y[idx]
        
        # get random MHI from trajectory
        traj_len = len(trajectory)
        
        trajectory = trajectory[np.random.randint(traj_len)]        
        trajectory = self.per_image_transform(image=trajectory)["image"]

        return trajectory, label
    
    
import matplotlib.pyplot as plt 

transform = A.Compose(
    [
        A.Normalize(mean=CONFIG.mean, std=CONFIG.std),
        A.HorizontalFlip(p=CONFIG.P_OF_TRANSFORM),
        A.ShiftScaleRotate(p=CONFIG.P_OF_TRANSFORM, shift_limit=CONFIG.SHIFT_LIMIT, 
                           scale_limit=CONFIG.SCALE_LIMIT, rotate_limit=CONFIG.ROTATE_LIMIT),
        #A.RandomBrightnessContrast(p=CONFIG.P_OF_TRANSFORM_COLOR),
        #A.RGBShift(p=CONFIG.P_OF_TRANSFORM_COLOR),
    ]
)

train_ds = SignRecognitionDatasetMHICachedV2(1, 150, per_image_transform=transform, decay=0.09,
                                             data_limit=10, split="train")
val_ds = SignRecognitionDatasetMHICachedV2(1, 150, per_image_transform=transform, decay=0.09,
                                           data_limit=10, split="val")


print(f"size of train_ds = {len(train_ds)}, size of val_ds = {len(val_ds)}")
x, y = next(iter(train_ds))
x_shape = x.shape
y_shape = y.shape
print(x_shape, y_shape)

100%|██████████| 2038/2038 [00:00<00:00, 42621.05it/s]
Cacheing: 8it [00:09,  1.22s/it]
100%|██████████| 2038/2038 [00:00<00:00, 42861.04it/s]
Cacheing: 2it [00:01,  1.22it/s]

size of train_ds = 8, size of val_ds = 2
(224, 224, 3) (1,)





In [5]:
def dataset_train_generator():
    # Instantiate your existing dataset loader

    for i in range(len(train_ds)):
        X_batch, Y_batch = train_ds[i]
        yield X_batch, Y_batch
        
def dataset_val_generator():
    # Instantiate your existing dataset loader

    for i in range(len(val_ds)):
        X_batch, Y_batch = val_ds[i]
        yield X_batch, Y_batch        

train_dataset = tf.data.Dataset.from_generator(
    lambda: dataset_train_generator(),
    output_types=(tf.float32, tf.float32),  # Adjust types based on your actual data
    output_shapes=(x_shape, y_shape)
).prefetch(tf.data.AUTOTUNE).batch(CONFIG.BATCH_SIZE)

val_dataset = tf.data.Dataset.from_generator(
    lambda: dataset_val_generator(),
    output_types=(tf.float32, tf.float32),  # Adjust types based on your actual data
    output_shapes=(x_shape, y_shape)
).prefetch(tf.data.AUTOTUNE).batch(CONFIG.BATCH_SIZE)

NameError: name 'x_shape' is not defined

In [25]:
import time 

ls = [] 

start_t = time.time()
for x,y in tqdm(train_dataset):
    ls.append(y)

print(f"iteration through dataset took :  {round(time.time() - start_t, CONFIG.ROUND_DIGIT)} s")

10it [00:05,  1.73it/s]

iteration through dataset took :  5.791 s





## Training - Setup

In [23]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, LeakyReLU
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras import backend as K
from wandb.keras import WandbMetricsLogger
from keras.callbacks import Callback
from wandb.keras import WandbMetricsLogger
import tensorflow as tf
import wandb


class CosineAnnealingLearningRateScheduler(Callback):
    def __init__(self, max_lr, min_lr, T_max):
        super(CosineAnnealingLearningRateScheduler, self).__init__()
        self.max_lr = max_lr  # Maximum learning rate (i.e., start learning rate)
        self.min_lr = min_lr  # Minimum learning rate
        self.T_max = T_max    # Specifies the number of epochs per cycle
        self.t = 0            # Current epoch

    def on_epoch_begin(self, epoch, logs=None):
        self.t += 1
        cos = np.cos(np.pi * (self.t % self.T_max) / self.T_max)
        lr = self.min_lr + 0.5 * (self.max_lr - self.min_lr) * (1 + cos)

        keras.backend.set_value(self.model.optimizer.lr, lr)

def keras_train(model, filepath : str, run_name : str, max_lr = 1e-4, min_lr = 5e-5, T_max=50, epochs=100,
                decay = 0.09,USE_WANDB=True): 


    train_ds = SignRecognitionDatasetMHI(1, 150, per_image_transform=transform, decay=decay, split="train")
    val_ds = SignRecognitionDatasetMHI(1, 150, per_image_transform=transform, decay=decay, split="val")
    
    checkpoint = keras.callbacks.ModelCheckpoint(filepath,
                                                 monitor="val_categorical_accuracy",
                                                 verbose=0,
                                                 save_best_only=True,
                                                 mode="max",
                                                 save_freq="epoch")
    
    cosine_annealer = CosineAnnealingLearningRateScheduler(max_lr=max_lr,
                                                           min_lr=min_lr,
                                                           T_max=T_max)

    callbacks = [checkpoint, cosine_annealer]

    if USE_WANDB:
        wandb.init(project=CONFIG.WANDB_RUN,
                        name=run_name,
                        notes="Model summary : \n" + str(model),
                        config={"max_lr" : max_lr, 
                                "min_lr" : min_lr, 
                                "scheduler" : "cosineAnnealer", 
                                "epochs" : epochs, 
                                "T_max" : T_max, 
                                "train_size" : len(train_ds.labels),
                                "val_size" : len(val_ds.labels),
                                "unique_classes" : len(np.unique(train_ds.labels)), 
                                "video_length" : CONFIG.FRAME_SIZE,
                                "decay" : decay,
                                "SHIFT_LIMIT" : CONFIG.SHIFT_LIMIT,
                                "SCALE_LIMIT" : CONFIG.SCALE_LIMIT,
                                "ROTATE_LIMIT" : CONFIG.ROTATE_LIMIT
                               })
        callbacks.append(WandbMetricsLogger())
    
    #Adam Optimizer - fixed learning rate.
    adam_optimizer = tf.keras.optimizers.Adam(learning_rate=max_lr, clipnorm=1.)
    #lr_metric = get_lr_metric(adam_optimizer)

    model.compile(optimizer=adam_optimizer, loss='categorical_crossentropy', metrics=['categorical_accuracy'])
    
    
    #history = model.fit(train_dataset_parquet, epochs=epochs, validation_data = val_dataset_parquet, batch_size = 8, callbacks=[WandbMetricsLogger(), checkpoint, cosine_annealer])
    history = model.fit(train_ds, epochs=epochs, validation_data = val_ds, batch_size = 8, callbacks=callbacks)
    #wandb.finish()
    if USE_WANDB:      
        wandb.finish()
    
    return history

## Train MobileNetV3Small Transfer Learning with ImageNet

In [14]:
MHI_SHAPE = (224,224,3)
#reshaped_data = data.reshape(-1, 480, 640, 1)
# get the base model, exclude final dense layers - we will modify/output this
base_model = tf.keras.applications.MobileNetV3Small(input_shape = MHI_SHAPE,
                                               include_top = False,
                                               weights = 'imagenet',
                                               pooling='max')
# Freeze the convolutional base
base_model.trainable = False
#get the base model summary
base_model.summary()

Model: "MobilenetV3small"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 rescaling (Rescaling)       (None, 224, 224, 3)          0         ['input_2[0][0]']             
                                                                                                  
 Conv (Conv2D)               (None, 112, 112, 16)         432       ['rescaling[0][0]']           
                                                                                                  
 Conv/BatchNorm (BatchNorma  (None, 112, 112, 16)         64        ['Conv[0][0]']                
 lization)                                                                         

In [15]:
model = Sequential()
base = tf.keras.applications.ResNet50V2(include_top=False)
model.add(base)
model.add(Dense(len(train_ds.unique_labels), activation='softmax'))
model.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50v2_weights_tf_dim_ordering_tf_kernels_notop.h5
Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50v2 (Functional)     (None, None, None, 2048   23564800  
                             )                                   
                                                                 
 dense_1 (Dense)             (None, None, None, 7)     14343     
                                                                 
Total params: 23579143 (89.95 MB)
Trainable params: 23533703 (89.77 MB)
Non-trainable params: 45440 (177.50 KB)
_________________________________________________________________


In [33]:
#len(train_ds.unique_labels) outputs the length of the labels.
#MobileNetV3 without additional dense layer - add softmax classification layer
model = Sequential()
model.add(base_model)
model.add(Dense(len(train_ds.unique_labels), activation='softmax'))

keras_train(model, filepath=os.path.join("models", "MHI_MobileNetV3Small.tf"))

Epoch 1/100
     10/Unknown - 11s 587ms/step - loss: 2.7736 - categorical_accuracy: 0.1250INFO:tensorflow:Assets written to: models\MHI_MobileNetV3Small.tf\assets


INFO:tensorflow:Assets written to: models\MHI_MobileNetV3Small.tf\assets


Epoch 2/100


INFO:tensorflow:Assets written to: models\MHI_MobileNetV3Small.tf\assets


Epoch 3/100


INFO:tensorflow:Assets written to: models\MHI_MobileNetV3Small.tf\assets


Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100

KeyboardInterrupt: 