In [39]:
# !pip install torch
# !pip install opencv-python
# !pip install torchvision

In [40]:
import os
import cv2
import numpy as np
from glob import glob
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader


In [41]:
DATASET_DIR = "dataset"

# find all mp4 files
files = sorted(glob(os.path.join(DATASET_DIR, "*.mp4")))

# extract class label prefix (first 3 digits)
classes = sorted(list(set([os.path.basename(f)[:3] for f in files])))

print("Detected classes:", classes)
print("Num classes:", len(classes))

gloss_to_idx = {cls: i for i, cls in enumerate(classes)}

Detected classes: ['001', '002', '003', '004', '005', '006', '007', '008', '009', '010', '011', '012', '013', '014', '015', '016', '017', '018', '019', '020', '021', '022', '023', '024', '025', '026', '027', '028', '029', '030', '031', '032', '033', '034', '035', '036', '037', '038', '039', '040', '041', '042', '043', '044', '045', '046', '047', '048', '049', '050', '051', '052', '053', '054', '055', '056', '057', '058', '059', '060', '061', '062', '063', '064']
Num classes: 64


In [42]:
class SignDataset(Dataset):
    def __init__(self, video_folder, gloss_to_idx, num_frames=16, augment=False):
        self.folder = video_folder
        self.gloss_to_idx = gloss_to_idx
        self.num_frames = num_frames
        self.augment = augment

        self.transform = T.Compose([
            T.ToTensor(),
            T.Resize((112,112))
        ])

        # create samples list
        self.samples = []
        videos = sorted(glob(os.path.join(video_folder, "*.mp4")))

        for v in videos:
            basename = os.path.basename(v)
            cls = basename[:3]     # extract class
            if cls in gloss_to_idx:
                self.samples.append({
                    "path": v,
                    "label": gloss_to_idx[cls]
                })

        print("Loaded", len(self.samples), "videos")

    def __len__(self):
        return len(self.samples)

    def load_video(self, path):
        cap = cv2.VideoCapture(path)
        frames = []

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)

        cap.release()

        # handle broken videos
        if len(frames) == 0:
            return torch.zeros(self.num_frames, 3, 112, 112)

        # sample frames
        idxs = np.linspace(0, len(frames)-1, self.num_frames).astype(int)

        processed = []
        for i in idxs:
            frame = frames[i]

            # ðŸ”¥ Correct preprocessing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, (112,112))
            
            # Data augmentation for training
            if self.augment:
                # Random horizontal flip
                if np.random.rand() > 0.4:
                    frame = cv2.flip(frame, 1)
                
                # Random brightness/contrast adjustment
                brightness = np.random.uniform(0.7, 1.3)
                frame = np.clip(frame * brightness, 0, 255).astype(np.uint8)
                
                # Random rotation (small angle)
                if np.random.rand() > 0.5:
                    angle = np.random.uniform(-10, 10)
                    h, w = frame.shape[:2]
                    M = cv2.getRotationMatrix2D((w//2, h//2), angle, 1.0)
                    frame = cv2.warpAffine(frame, M, (w, h))
            
            frame = torch.from_numpy(frame).permute(2,0,1).float() / 255.0

            processed.append(frame)

        return torch.stack(processed)


    def __getitem__(self, idx):
        item = self.samples[idx]
        path = item["path"]

        if not os.path.exists(path):
            print("MISSING VIDEO:", path)
            raise FileNotFoundError(path)

        frames = self.load_video(path)
        return frames, item["label"]


In [43]:
from sklearn.model_selection import train_test_split

dataset = SignDataset(DATASET_DIR, gloss_to_idx)

indices = np.arange(len(dataset))

train_idx, temp_idx = train_test_split(indices, test_size=0.2, shuffle=True)
val_idx, test_idx   = train_test_split(temp_idx, test_size=0.5, shuffle=True)

# Create datasets with augmentation for training
train_ds = SignDataset(DATASET_DIR, gloss_to_idx, augment=True)
train_ds = torch.utils.data.Subset(train_ds, train_idx)

val_ds   = torch.utils.data.Subset(dataset, val_idx)
test_ds  = torch.utils.data.Subset(dataset, test_idx)

train_loader = DataLoader(
    train_ds,
    batch_size=4,
    shuffle=True,
    num_workers=0,   # <-- REQUIRED on Windows
    pin_memory=True
)
val_loader   = DataLoader(val_ds, batch_size=4, shuffle=False, num_workers=0)
test_loader  = DataLoader(test_ds, batch_size=4, shuffle=False, num_workers=0)

print("Train:", len(train_ds))
print("Val:", len(val_ds))
print("Test:", len(test_ds))


Loaded 3200 videos
Loaded 3200 videos
Train: 2560
Val: 320
Test: 320


In [44]:
from torchvision.models import mobilenet_v2

class CNN_LSTM(nn.Module):
    def __init__(self, num_classes):
        super().__init__()

        base = mobilenet_v2(weights="IMAGENET1K_V1")
        base.classifier = nn.Identity()
        self.cnn = base

        self.lstm = nn.LSTM(
            input_size=1280,
            hidden_size=512,
            batch_first=True,
            dropout=0.6
        )

        self.dropout = nn.Dropout(0.6)
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        B, T, C, H, W = x.shape
        x = x.reshape(B*T, C, H, W)

        feat = self.cnn(x)
        feat = feat.reshape(B, T, 1280)

        out, _ = self.lstm(feat)
        out = out[:, -1]
        out = self.dropout(out)

        return self.fc(out)


In [45]:
device = "cuda" if torch.cuda.is_available() else "cpu"

model = CNN_LSTM(num_classes=len(classes)).to(device)
optimizer = optim.Adam(model.parameters(), lr=5e-5, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()


In [47]:
EPOCHS = 5
PATIENCE = 2

best_val_loss = float('inf')
patience_counter = 0

import json
with open("class_names.json", "w") as f:
    json.dump(classes, f)

for epoch in range(EPOCHS):

    model.train()
    total, correct = 0, 0
    train_loss = 0

    for frames, labels in tqdm(train_loader, desc=f"Epoch {epoch+1} [Train]"):
        frames, labels = frames.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(frames)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        correct += (outputs.argmax(dim=1) == labels).sum().item()
        total += labels.size(0)

    train_acc = correct / total
    train_loss_avg = train_loss / len(train_loader)
    
    # Validation
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    
    with torch.no_grad():
        for frames, labels in tqdm(val_loader, desc=f"Epoch {epoch+1} [Val]"):
            frames, labels = frames.to(device), labels.to(device)
            outputs = model(frames)
            loss = criterion(outputs, labels)
            
            val_loss += loss.item()
            val_correct += (outputs.argmax(dim=1) == labels).sum().item()
            val_total += labels.size(0)
    
    val_loss_avg = val_loss / len(val_loader)
    val_acc = val_correct / val_total
    
    print(f"Train Loss: {train_loss_avg:.4f} | Acc: {train_acc:.4f} | Val Loss: {val_loss_avg:.4f} | Val Acc: {val_acc:.4f}")
    
    # Early stopping
    # --- Early stopping & Saving ---
    if val_loss_avg < best_val_loss:
        best_val_loss = val_loss_avg
        patience_counter = 0
        
        # ðŸ”¥ ADD THESE LINES TO SAVE THE DATA
        # 1. Save the model weights (the "brain")
        torch.save(model.state_dict(), "best_sign_model.pth")
        
        # 2. Save the class names (the "labels")
        import json
        with open("class_names.json", "w") as f:
            json.dump(classes, f)
            
        print(f"--> Saved new best model with Val Loss: {val_loss_avg:.4f}")
    else:
        patience_counter += 1
    
    if patience_counter >= PATIENCE:
        print(f"Early stopping at epoch {epoch+1}")
        break


Epoch 1 [Train]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 640/640 [28:51<00:00,  2.71s/it]
Epoch 1 [Val]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 80/80 [02:58<00:00,  2.24s/it]


Train Loss: 3.5545 | Acc: 0.1605 | Val Loss: 2.4993 | Val Acc: 0.4375
--> Saved new best model with Val Loss: 2.4993


Epoch 2 [Train]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 640/640 [28:58<00:00,  2.72s/it]
Epoch 2 [Val]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 80/80 [03:00<00:00,  2.26s/it]


Train Loss: 2.1046 | Acc: 0.5156 | Val Loss: 1.3094 | Val Acc: 0.7438
--> Saved new best model with Val Loss: 1.3094


Epoch 3 [Train]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 640/640 [30:08<00:00,  2.83s/it]
Epoch 3 [Val]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 80/80 [03:09<00:00,  2.37s/it]


Train Loss: 1.1622 | Acc: 0.7605 | Val Loss: 0.6880 | Val Acc: 0.8812
--> Saved new best model with Val Loss: 0.6880


Epoch 4 [Train]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 640/640 [30:31<00:00,  2.86s/it]
Epoch 4 [Val]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 80/80 [03:06<00:00,  2.33s/it]


Train Loss: 0.6531 | Acc: 0.8914 | Val Loss: 0.3382 | Val Acc: 0.9437
--> Saved new best model with Val Loss: 0.3382


Epoch 5 [Train]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 640/640 [31:15<00:00,  2.93s/it]
Epoch 5 [Val]: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 80/80 [03:11<00:00,  2.39s/it]

Train Loss: 0.4198 | Acc: 0.9336 | Val Loss: 0.2472 | Val Acc: 0.9656
--> Saved new best model with Val Loss: 0.2472





In [48]:
model.eval()
correct, total = 0, 0

with torch.no_grad():
    for frames, labels in tqdm(test_loader, desc="Testing"):
        frames, labels = frames.to(device), labels.to(device)

        outputs = model(frames)
        preds = outputs.argmax(dim=1)

        correct += (preds == labels).sum().item()
        total += labels.size(0)

test_acc = correct / total
print(f"Test Accuracy: {test_acc:.4f}")

Testing: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 80/80 [03:09<00:00,  2.37s/it]

Test Accuracy: 0.9656





In [None]:
import torch
import json

# 1. Load the classes first
with open("class_names.json", "r") as f:
    loaded_classes = json.load(f)

# 2. Re-create the model structure
device = "cuda" if torch.cuda.is_available() else "cpu"
model = CNN_LSTM(num_classes=len(loaded_classes)).to(device)

# 3. Load the saved "brain" (weights)
model.load_state_dict(torch.load("best_sign_model.pth", map_location=device))
model.eval()

print("Model loaded successfully! You can now use it for predictions.") 

FileNotFoundError: [Errno 2] No such file or directory: 'class_names.json'