# LSTM-CNN Model for ASL

This notebook implements a collaborative pipeline for data acquisition, preprocessing, model definition, training, and evaluation to recognize American Sign Language (ASL) gestures.


## Setup

Install the necessary packages, mount Google Drive, and import all required libraries and frameworks.

In [None]:
# Make Google Drive visible inside the Colab VM; later cells read and write
# files under the mounted "MyDrive" folder.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Kaggle API client (presumably used to fetch the WLASL archive; no download
# command is visible in this notebook -- TODO confirm).
!pip install kaggle

In [None]:
from tensorflow import keras
import tensorflow as tf
import torch
import json
import os
import pathlib
import cv2
import numpy as np
import subprocess
import zipfile
import shutil
import random as rnd
from tqdm import tqdm
from torch.utils.data import Dataset
import secrets
import matplotlib.pyplot as plt
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F


## Data Acquisition

Download and extract the dataset from Kaggle (for Google Colab development, place `kaggle.json` in Google Drive).


In [None]:
# Install the Kaggle API token that the user placed in Google Drive into the
# location used below (/root/.kaggle/kaggle.json).
KAGGLE_DIR = "/root/.kaggle"
KAGGLE_TOKEN = os.path.join(KAGGLE_DIR, "kaggle.json")

os.makedirs(KAGGLE_DIR, exist_ok=True)

# Copy FROM Drive INTO the Kaggle config folder. The previous code moved a
# (non-existent) relative "root/.kaggle/kaggle.json" to Drive, i.e. in the
# wrong direction, so the chmod below could never find the token.
# Copying rather than moving keeps the Drive copy for future sessions.
shutil.copy("drive/MyDrive/kaggle.json", KAGGLE_TOKEN)

# Owner read/write only: the token is a credential.
os.chmod(KAGGLE_TOKEN, 0o600)


In [None]:
# Unpack the WLASL archive into a sibling folder of the same name.
archive_path = "artifact/wlasl-processed.zip"
extract_dir = "artifact/wlasl-processed"

with zipfile.ZipFile(archive_path, mode='r') as archive:
    archive.extractall(path=extract_dir)


In [None]:
def videos_process(output_folder, json_fp, videos_folder):
    """Copy videos into ``<output_folder>/<split>/<gloss>/<video_id>.mp4``.

    The JSON file at ``json_fp`` is read as a list of entries, each with a
    ``"gloss"`` name and a list of ``"instances"``; every instance supplies a
    ``"video_id"`` and a ``"split"``.

    Args:
        output_folder: Root of the sorted copy; created if missing.
        json_fp: Path to the metadata JSON described above.
        videos_folder: Folder holding the source ``<video_id>.mp4`` files.

    The destination class folder is created for every instance, even when
    the source video is missing (in which case a message is printed and
    nothing is copied).
    """
    os.makedirs(output_folder, exist_ok=True)

    with open(json_fp, "r") as handle:
        glosses = json.load(handle)

    for entry in glosses:
        label = entry["gloss"]

        for item in entry["instances"]:
            clip_name = f"{item['video_id']}.mp4"
            target_dir = os.path.join(output_folder, item["split"], label)

            src = os.path.join(videos_folder, clip_name)
            dst = os.path.join(target_dir, clip_name)

            # Created unconditionally, so missing clips still leave an
            # (empty) class folder behind.
            os.makedirs(target_dir, exist_ok=True)

            if not os.path.exists(src):
                print(f"Video not found: {src}")
                continue

            shutil.copy(src, dst)
            print(f"Copied {src} to {dst}")


In [None]:
# Sort the extracted clips into per-split / per-gloss folders on Drive.
json_file_path = "artifact/wlasl-processed/WLASL_v0.3.json"
videos_folder = "artifact/wlasl-processed/videos"
output_folder = "drive/MyDrive/dataset_split"

# The third argument must be the folder that holds the source clips. Passing
# output_folder here (as before) made every lookup report "Video not found".
videos_process(output_folder, json_file_path, videos_folder)

In [None]:
def bbox_file(info, output_json_path):
    """Write a flat ``{video_id: bbox}`` JSON lookup from the metadata file.

    Args:
        info: Path to a JSON list whose entries each carry an ``"instances"``
            list; every instance provides ``"video_id"`` and ``"bbox"``.
        output_json_path: Destination of the generated lookup file.

    If a video id occurs more than once, the last occurrence wins.
    """
    with open(info, "r") as src:
        glosses = json.load(src)

    lookup = {
        clip["video_id"]: clip["bbox"]
        for entry in glosses
        for clip in entry["instances"]
    }

    with open(output_json_path, "w") as dst:
        json.dump(lookup, dst)

In [None]:
# Generate the video_id -> bbox lookup file on Drive from the WLASL metadata.
json_file_path = "artifact/wlasl-processed/WLASL_v0.3.json"
newjson_file_path = "drive/MyDrive/newfile.json"

bbox_file(info=json_file_path, output_json_path=newjson_file_path)

In [None]:
import json


def obtain_bbox(path_to_file: str, name: str) -> list:
    """Return the entry stored under ``name`` in a JSON lookup file.

    Args:
        path_to_file: Path to a JSON object keyed by video id.
        name: Key to look up.

    Raises:
        KeyError: If ``name`` is not present in the file.

    The file is opened and parsed on every call.
    """
    with open(path_to_file, 'r') as handle:
        lookup = json.load(handle)
    return lookup[name]

In [None]:
def crop_frame(bbox: list, frame):
    """Return the sub-region of ``frame`` selected by ``bbox``.

    ``bbox[1]:bbox[3]`` slices the first axis and ``bbox[0]:bbox[2]`` the
    second, so bbox is presumably ``[x_min, y_min, x_max, y_max]`` in pixels
    for a row-major image -- TODO confirm against the dataset.
    """
    rows = slice(bbox[1], bbox[3])
    cols = slice(bbox[0], bbox[2])
    return frame[rows, cols]



In [None]:
import cv2
import torch


def capture_frames(video_path, output_file, bbox, start_frame=5, end_frame=35):
    """Save a window of frames from a video as JPEG files.

    Frames whose zero-based index lies in ``[start_frame, end_frame]``
    (inclusive) are optionally cropped and written to ``output_file`` as
    ``video<name>_frame_<index>.jpg``, where ``<name>`` is the video's
    basename with ``.mp4`` removed.

    Args:
        video_path: Path (``str`` or ``os.PathLike``) of the video to read.
        output_file: Existing directory that receives the JPEG files.
        bbox: Crop box forwarded to ``crop_frame``; a falsy value disables
            cropping.
        start_frame: Index of the first frame to keep (default 5).
        end_frame: Index of the last frame to keep (default 35).
    """
    stem = os.path.basename(video_path).replace('.mp4', '')

    # os.fspath: callers pass pathlib.Path objects, which not every OpenCV
    # build accepts as a filename.
    cap = cv2.VideoCapture(os.fspath(video_path))
    try:
        frame_count = 0
        # Stop decoding once the window is exhausted; later frames were
        # previously read and discarded.
        while cap.isOpened() and frame_count <= end_frame:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count >= start_frame:
                # Crop only the frames that are actually written.
                if bbox:
                    frame = crop_frame(bbox, frame)
                frame_filename = f"video{stem}_frame_{frame_count}.jpg"
                cv2.imwrite(os.path.join(output_file, frame_filename), frame)

            frame_count += 1
    finally:
        # Always free the decoder handle, even if cropping or writing fails.
        cap.release()






##if len(frames) < max_frames:
        ##pad_frames = [frames[-1]] * (max_frames - len(frames))
        ##frames.extend(pad_frames)


    ##video_tensor = torch.stack(frames)



In [None]:
# Walk <dataset_split>/<split>/<word>/<video>.mp4 and dump the selected frames
# of every video into <frames>/<split>/<word>/<n>/.
bbox_path = "drive/MyDrive/newfile.json"
videosfilepath = "drive/MyDrive/dataset_split"
output_folder = "drive/MyDrive/frames"
os.makedirs(output_folder, exist_ok=True)

splits = ['test','train','val']

# Parse the bbox lookup once instead of re-reading the file for every video.
with open(bbox_path, "r") as fh:
    bboxes = json.load(fh)

for split in splits:
    for file in sorted(pathlib.Path(os.path.join(videosfilepath, split)).iterdir()):
        # Check before creating anything, so stray files do not produce
        # empty class folders in the output tree.
        if not file.is_dir():
            print(f"⚠️ Skipping non-directory: {file}")
            continue

        word = os.path.basename(file)
        output_file = os.path.join(output_folder, split, word)
        os.makedirs(output_file, exist_ok=True)

        # Sorted so that folder number i maps to the same video on every run;
        # with exist_ok=True a changed listing order would otherwise mix
        # frames of different videos in one folder.
        for i, video_path in enumerate(sorted(file.iterdir())):
            video_id = os.path.basename(video_path).replace(".mp4", "")
            print(video_id, video_path)

            # A video without a bbox entry is processed uncropped
            # (capture_frames skips cropping for a falsy bbox) instead of
            # aborting the whole run with KeyError.
            bbox = bboxes.get(video_id)

            try:
                vid_output = os.path.join(output_file, f"{i}")
                os.makedirs(vid_output, exist_ok=True)

                capture_frames(video_path, vid_output, bbox)
            except Exception as e:
                # Best-effort batch job: report the failing video and continue.
                print(f"Failed to extract frames from {video_path} (bbox={bbox}): {e}")





In [None]:
class FrameSequencer(Dataset):
    """Dataset of fixed-length frame sequences under ``root/<class>/<video>/``.

    Every video folder is cut into consecutive, non-overlapping windows of
    ``sample_size`` ``.jpg`` frames; a trailing partial window is dropped.
    Each item is ``(images, label)`` where ``images`` is a float tensor of
    shape ``(sample_size, 3, height, width)`` and ``label`` is a scalar long
    tensor holding the class index.

    Class indices are assigned from the sorted sub-folder names of *this*
    ``root_dir``; two roots with different sub-folders get different mappings.

    Args:
        root_dir: Directory containing one sub-folder per class.
        sample_size: Number of frames per sequence (must be >= 1).
        batch_size: Stored for backward compatibility only; batching is left
            to ``torch.utils.data.DataLoader``.
        target_size: ``(width, height)`` passed to ``cv2.resize``.
        shuffle: Shuffle the sample list once at construction time.
    """

    def __init__(self, root_dir, sample_size = 5, batch_size = 35, target_size=(224, 224), shuffle=True):
        if sample_size < 1:
            raise ValueError(f"sample_size must be >= 1, got {sample_size}")

        self.root_dir = root_dir
        self.sample_size = sample_size
        self.batch_size = batch_size
        self.target_size = target_size
        self.shuffle = shuffle

        self.classes = sorted(d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d)))
        self.class_indicies = {clss: i for i, clss in enumerate(self.classes)}

        self.samples = []

        for cls in self.classes:
            cls_dir = os.path.join(root_dir, cls)
            for vid in sorted(os.listdir(cls_dir)):
                vid_dir = os.path.join(cls_dir, vid)
                # Stray files next to the video folders would make
                # os.listdir() raise.
                if not os.path.isdir(vid_dir):
                    continue

                # Numeric ordering: a plain string sort puts "..._frame_10"
                # before "..._frame_5" and scrambles the temporal order.
                frame_files = sorted(
                    (os.path.join(vid_dir, fname)
                     for fname in os.listdir(vid_dir)
                     if fname.lower().endswith('.jpg')),
                    key=self._frame_order_key,
                )

                for j in range(0, len(frame_files) - self.sample_size + 1, self.sample_size):
                    sequence = frame_files[j: j + self.sample_size]
                    self.samples.append((sequence, self.class_indicies[cls]))

        self.shuffler()

    @staticmethod
    def _frame_order_key(path):
        """Sort key ordering ``*_<number>.jpg`` files by their number."""
        stem = os.path.splitext(os.path.basename(path))[0]
        suffix = stem.rsplit('_', 1)[-1]
        if suffix.isdecimal():
            return (0, int(suffix), path)
        # Names without a numeric suffix go last, in name order.
        return (1, 0, path)

    def __len__(self):
        # One item per sequence: __getitem__ indexes self.samples directly,
        # so dividing by batch_size hid most of the data from a DataLoader.
        return len(self.samples)

    def __getitem__(self, index):
        seq_paths, label = self.samples[index]
        width, height = self.target_size
        sequence_imgs = []

        for fp in seq_paths:
            img = cv2.imread(fp)
            if img is None:
                # Unreadable frame: substitute a black frame so that every
                # sequence keeps the same shape and can still be batched.
                sequence_imgs.append(np.zeros((3, height, width), dtype=np.uint8))
                continue

            img = cv2.resize(img, self.target_size)
            # HWC (OpenCV layout) -> CHW (PyTorch layout).
            sequence_imgs.append(np.transpose(img, (2, 0, 1)))

        images = torch.from_numpy(np.array(sequence_imgs)).float()
        label = torch.tensor(label).long()

        return images, label

    def shuffler(self):
        """Shuffle the sample list in place when ``shuffle`` is enabled."""
        if self.shuffle:
            rnd.shuffle(self.samples)



In [None]:

def show_sample(g):
    """Plot the frames of the first item of ``g``, one figure per sequence.

    ``g[0]`` may be a single sample (``images`` of shape ``(seq, C, H, W)``
    with a scalar label) or a batch (``(batch, seq, C, H, W)`` with one label
    per sequence). ``g`` must expose ``class_indicies`` mapping class name to
    index. Frames are treated as BGR and converted to RGB for display.
    """
    images, labels = g[0]
    labels = torch.as_tensor(labels).reshape(-1)

    # Promote a single sample to a batch of one so both layouts share a path.
    if images.dim() == 4:
        images = images.unsqueeze(0)

    index_to_class = {idx: name for name, idx in g.class_indicies.items()}

    for sequence, label in zip(images, labels):
        classname = index_to_class[int(label)]
        cols = len(sequence)
        for j, image in enumerate(sequence):
            frame = image.numpy()
            # cvtColor/imshow need channel-last 8-bit images; the tensors are
            # channel-first floats.
            if frame.ndim == 3 and frame.shape[0] == 3:
                frame = np.transpose(frame, (1, 2, 0))
            frame = np.ascontiguousarray(frame).astype(np.uint8)

            plt.subplot(1, cols, j + 1)
            plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            plt.axis("off")
            plt.title(classname)
        plt.show()


In [None]:
class CNN_LSTM(nn.Module):
    """Frozen MobileNetV2 feature extractor followed by an LSTM classifier.

    Each frame goes through the CNN (without gradients), is average-pooled
    to a ``feature_size`` vector, and the per-frame vectors are fed to an
    LSTM; the last time step is mapped to ``num_classes`` logits.

    Args:
        feature_size: Size of the pooled per-frame CNN feature vector.
        hidden_size: LSTM hidden size.
        num_classes: Number of output logits.
        num_layers: Number of stacked LSTM layers.
        device: Device the CNN is moved to at construction; the LSTM and
            linear layers stay where ``nn.Module`` creates them until the
            caller moves the whole model.
    """

    def __init__(self, feature_size=1280, hidden_size=256, num_classes=10, num_layers=1, device="cpu"):
        super().__init__()
        # NOTE(review): newer torchvision releases prefer ``weights=...`` over
        # ``pretrained=True`` -- confirm against the installed version.
        # NOTE(review): FrameSequencer appears to yield raw 0-255 BGR frames;
        # the pretrained weights may expect normalized RGB -- confirm.
        mobilenet = models.mobilenet_v2(pretrained=True)
        self.cnn = mobilenet.features.to(device)
        self.cnn.eval()
        self.lstm = nn.LSTM(input_size=feature_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def train(self, mode=True):
        """Switch train/eval mode while keeping the frozen CNN in eval mode.

        The CNN runs under ``torch.no_grad()`` in ``forward``, but in train
        mode its BatchNorm layers would still update their running
        statistics, so the "frozen" backbone would drift during training.
        """
        super().train(mode)
        self.cnn.eval()
        return self

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            x: Tensor of shape ``(batch, seq_len, channels, height, width)``.

        Raises:
            ValueError: If ``x`` is not 5-dimensional.
        """
        # Fail loudly: the previous blanket try/except printed the whole
        # tensor and returned None, moving the crash into the loss function.
        if x.dim() != 5:
            raise ValueError(
                f"expected input of shape (batch, seq_len, C, H, W), got {tuple(x.shape)}"
            )

        batch_size, seq_len, C, H, W = x.shape
        # reshape (not view) also accepts non-contiguous inputs.
        x = x.reshape(batch_size * seq_len, C, H, W)

        with torch.no_grad():
            features = self.cnn(x)
            features = F.adaptive_avg_pool2d(features, (1, 1))

        lstm_input = features.reshape(batch_size, seq_len, -1)

        output, _ = self.lstm(lstm_input)
        last_hidden = output[:, -1, :]
        return self.fc(last_hidden)





In [None]:
from torch.utils.data import DataLoader

# All three splits use the same sequencing options.
_sequencer_options = {"batch_size": 5, "sample_size": 5}

dataset = FrameSequencer(r"drive/MyDrive/frames/train", **_sequencer_options)
val_dataset = FrameSequencer(r"drive/MyDrive/frames/val", **_sequencer_options)
test_dataset = FrameSequencer(r"drive/MyDrive/frames/test", **_sequencer_options)





In [None]:

# Options shared by both loaders; only the training loader reshuffles.
_loader_options = dict(batch_size=128, num_workers=2, pin_memory=True)

train_dataloader = DataLoader(dataset, shuffle=True, **_loader_options)
val_dataloader = DataLoader(val_dataset, **_loader_options)

In [None]:
# Use the GPU when the runtime has one; fall back to the CPU instead of
# crashing on a CPU-only runtime.
device = "cuda" if torch.cuda.is_available() else "cpu"

# One output logit per class folder found in the training set.
model = CNN_LSTM(num_classes=len(dataset.classes), device=device).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

In [None]:
from copy import deepcopy


class EarlyStopping:
    """Stop training once the validation loss stops improving.

    Call the instance after every epoch with the model and its validation
    loss; ``early_stop`` becomes ``True`` after ``patience`` consecutive
    epochs without an improvement larger than ``min_delta``.

    Args:
        patience: Non-improving epochs tolerated before stopping.
        min_delta: Minimum decrease of the loss that counts as improvement.
        restore_best_weights: Keep a copy of the best ``state_dict`` so that
            ``restore_model`` can load it back.
    """

    def __init__(self, patience=5, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.early_stop = False
        self.best_loss = float('inf')
        self.best_model_state = None
        self.restore_best_weights = restore_best_weights

    def __call__(self, model, validation_loss):
        """Record one epoch's validation loss and update the stop flag."""
        if validation_loss < self.best_loss - self.min_delta:
            self.best_loss = validation_loss
            self.counter = 0
            if self.restore_best_weights:
                # Deep copy: state_dict() returns references to the live
                # tensors, which later optimizer steps keep modifying.
                self.best_model_state = deepcopy(model.state_dict())
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

    def restore_model(self, model):
        """Load the best recorded weights into ``model`` (if any) and return it."""
        if self.best_model_state is not None:
            model.load_state_dict(self.best_model_state)
        return model

In [None]:
def validate(model, dataloader, criterion, device="cpu"):
    """Return the mean per-batch loss of ``model`` over ``dataloader``.

    The model is put in eval mode and evaluated without gradient tracking.
    The result is the unweighted average of the batch losses, so an empty
    ``dataloader`` raises ``ZeroDivisionError``.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            batch_loss = criterion(model(inputs), targets)
            batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(batch_losses)

def train_one_epoch(model, train_loader, criterion, optimizer, device="cpu"):
    """Run one optimization pass over ``train_loader``.

    Puts the model in train mode, performs one optimizer step per batch
    (with a tqdm progress bar) and returns the summed batch losses divided
    by ``len(train_loader)``.
    """
    model.train()
    running_loss = 0.0
    progress = tqdm(train_loader, desc="Training", leave=False)
    for inputs, targets in progress:
        inputs, targets = inputs.to(device), targets.to(device)
        loss = criterion(model(inputs), targets)

        # Clear stale gradients, backpropagate, then update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    return running_loss / len(train_loader)

def training_loop(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    early_stopper=None,
    epochs=10,
    device="cpu"
):
    """Train ``model`` for up to ``epochs`` epochs with optional early stopping.

    Each epoch runs ``train_one_epoch`` followed by ``validate`` and prints
    both losses. When ``early_stopper`` is given it is called with the model
    and the validation loss after every epoch; training stops as soon as its
    ``early_stop`` flag is set, and its best weights are loaded back when
    ``restore_best_weights`` is enabled.

    Returns:
        The trained (and possibly restored) model.
    """
    for epoch in range(epochs):
        # train_one_epoch switches the model to train mode itself.
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        val_loss = validate(model, val_loader, criterion, device)

        # Log before the stop check so the final epoch's losses are reported
        # too (previously the break skipped this line).
        print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

        if early_stopper:
            early_stopper(model, val_loss)
            if early_stopper.early_stop:
                print(f"Early stopping triggered at epoch {epoch+1}")
                break

    # Restore best model if early stopping was used
    if early_stopper and early_stopper.restore_best_weights:
        model = early_stopper.restore_model(model)

    return model

In [None]:
# Same values as the class defaults, spelled out for readability.
early_stopping = EarlyStopping(patience=5, min_delta=0, restore_best_weights=True)


In [None]:
# Train on whichever device the model already lives on, rather than a
# hard-coded "cuda" that fails on a CPU-only runtime.
train_device = next(model.parameters()).device

model = training_loop(
    model=model,
    train_loader=train_dataloader,
    val_loader=val_dataloader,
    criterion=criterion,
    optimizer=optimizer,
    early_stopper=early_stopping,
    epochs=10,
    device=train_device
)

In [None]:
# Persist only the learned parameters (state_dict), not the module object.
trained_weights = model.state_dict()
torch.save(trained_weights, "asl_model.pth")