Import libraries

In [1]:
import os
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from torch.autograd import Variable
import torch.optim as optim
from PIL import Image
import pandas as pd
import cv2
import zipfile
print(torch.__version__)

  from .autonotebook import tqdm as notebook_tqdm


1.13.1


Set up the environment

In [2]:
# Switch the working directory to the parent of the directory the kernel
# started in (presumably the notebook's own folder - confirm), so relative
# paths such as 'data/' resolve from there.
# The starting directory is remembered the first time this cell runs; without
# that, every re-run of the cell would climb one more level up the tree.
if 'NOTEBOOK_START_DIR' not in globals():
    NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.dirname(NOTEBOOK_START_DIR))
current_dir = os.getcwd()
print('Current directory: ', current_dir)

Current directory:  /Users/oksanaerm/gesture_recognition


In [3]:
# Dataset locations: a 'data/' folder under the current directory, with one
# sub-folder per split.
data_dir = os.path.join(current_dir, 'data/')
train_dir, val_dir, test_dir = (
    os.path.join(data_dir, split_folder)
    for split_folder in ('train/', 'val/', 'test/')
)
print('Data directory: ', data_dir)

Data directory:  /Users/oksanaerm/gesture_recognition/data/


Import data

In [None]:
! pip install kaggle

In [6]:
import json

# Read the Kaggle credentials from kaggle.json (resolved against the current
# working directory) and publish them as the KAGGLE_USERNAME / KAGGLE_KEY
# environment variables.
with open("kaggle.json", "r") as token_file:
    api_token = json.load(token_file)

for env_name, token_field in (('KAGGLE_USERNAME', "username"), ('KAGGLE_KEY', "key")):
    os.environ[env_name] = api_token[token_field]
print('Kaggle API token loaded.')

Kaggle API token loaded.


In [9]:
! kaggle datasets download -d imsparsh/gesture-recognition -p data

# Check for the presence of the dataset file
if any(file.endswith('.zip') for file in data_dir):
    print("Dataset downloaded successfully.")
else:
    print("Dataset not found.")

Downloading gesture-recognition.zip to data
100%|█████████████████████████████████████▉| 1.60G/1.60G [09:15<00:00, 5.82MB/s]
100%|██████████████████████████████████████| 1.60G/1.60G [09:15<00:00, 3.09MB/s]
Dataset not found.


In [12]:
# Unpack the downloaded archive into data_dir, then delete the archive.
# Because this cell removes its own input, it checks that the archive is still
# there first: a second run reports the situation instead of raising
# FileNotFoundError.
archive_path = os.path.join(data_dir, 'gesture-recognition.zip')
if os.path.exists(archive_path):
    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
        zip_ref.extractall(data_dir)
        print("Dataset extracted successfully.")
    os.remove(archive_path)
else:
    print("Archive not found, nothing to extract.")

Dataset extracted successfully.


In [4]:
def get_transform(image_size: tuple) -> transforms.Compose:
    """Build the per-frame preprocessing pipeline.

    Args:
        image_size: Target size handed to ``transforms.Resize``.

    Returns:
        A ``transforms.Compose`` applying ``ToPILImage``, ``Resize`` and
        ``ToTensor``, in that order.
    """
    steps = [
        transforms.ToPILImage(),
        transforms.Resize(image_size),
        transforms.ToTensor(),
    ]
    return transforms.Compose(steps)

In [5]:
class GestureDataset(Dataset):
    """Dataset of gesture clips stored as one folder of frame images per clip.

    Every sub-directory of ``root_dir`` is one sample. Its label is derived
    from the folder name by searching it for one of the known gesture names
    (case-insensitive, underscores treated as spaces).

    Args:
        root_dir: Directory whose sub-directories each hold the frames of one clip.
        transform: Optional callable applied to every frame; it receives the
            frame as an RGB ``numpy`` array. ``torch.stack`` is applied to the
            results, so the callable is expected to return tensors.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        # Keep sub-directories only: stray files inside root_dir (for example
        # a '.DS_Store' created by macOS) are not clips and would otherwise be
        # counted as samples and fail later in __getitem__.
        self.gesture_folders = sorted(
            folder for folder in os.listdir(self.root_dir)
            if os.path.isdir(os.path.join(self.root_dir, folder)))
        self.class_to_label = {
            'Thumbs_Down': 0, 'Right_Swipe': 1, 'Thumbs_Up': 2, 'Left_Swipe': 3, 'Stop': 4}

    def __len__(self):
        """Return the number of clip folders found under ``root_dir``."""
        return len(self.gesture_folders)

    def _label_for(self, folder_name):
        """Map a clip folder name to its integer class label.

        Raises:
            ValueError: If no known gesture name occurs in ``folder_name``.
        """
        # Normalize folder name by replacing underscores and converting to lower case
        normalized_folder_name = folder_name.replace('_', ' ').lower()

        # The first dictionary key whose normalized form occurs in the folder name wins
        for gesture_type, label in self.class_to_label.items():
            if gesture_type.replace('_', ' ').lower() in normalized_folder_name:
                return label
        raise ValueError(
            f"Unknown gesture type in folder name: {folder_name}")

    def __getitem__(self, idx):
        """Load one clip.

        Returns:
            A ``(frames_tensor, gesture_class)`` pair: the clip's frames stacked
            along a new leading dimension in sorted file-name order, and the
            integer label.

        Raises:
            ValueError: If the folder name contains no known gesture name.
            RuntimeError: If the clip folder contains no frame files.
        """
        folder_name = self.gesture_folders[idx]
        gesture_path = os.path.join(self.root_dir, folder_name)
        gesture_class = self._label_for(folder_name)

        frames = []
        for img_name in sorted(os.listdir(gesture_path)):
            img_path = os.path.join(gesture_path, img_name)
            # Hidden files and nested directories are not frames
            if img_name.startswith('.') or not os.path.isfile(img_path):
                continue
            # The context manager closes the image file as soon as it is decoded
            with Image.open(img_path) as img:
                image = np.array(img.convert('RGB'))  # RGB ndarray
            if self.transform:
                image = self.transform(image)
            frames.append(image)

        if not frames:
            # torch.stack on an empty list fails with an unhelpful message
            raise RuntimeError(f"No frames found in {gesture_path}")

        frames_tensor = torch.stack(frames, dim=0)

        return frames_tensor, gesture_class

In [6]:
# Data-loading configuration
BATCH_SIZE = 4
IMAGE_SIZE = (120, 160)
SHUFFLE = True

# Frame preprocessing shared by both splits
transform = get_transform(IMAGE_SIZE)

# Both loaders are created with the same batch size and shuffle flag
loader_options = {'batch_size': BATCH_SIZE, 'shuffle': SHUFFLE}

# Training split
train_dataset = GestureDataset(train_dir, transform)
train_loader = DataLoader(train_dataset, **loader_options)

# Validation split
val_dataset = GestureDataset(val_dir, transform)
val_loader = DataLoader(val_dataset, **loader_options)


Model architecture

In [10]:
import torch.nn.init as init

# Squeeze-and-Excitation channel attention
class SEBlock(nn.Module):
    """Channel re-weighting block for 5-D feature maps.

    Each channel is averaged down to a single value, the resulting vector is
    passed through a two-layer bottleneck ending in a sigmoid, and the input
    is multiplied channel-wise by the resulting gates.

    Args:
        in_channels: Number of channels of the input feature map.
        reduction: Divisor applied to ``in_channels`` to size the bottleneck.
    """

    def __init__(self, in_channels, reduction=16):
        super(SEBlock, self).__init__()
        # Keep at least one hidden unit: with in_channels < reduction the
        # integer division yields 0 and the bottleneck would be empty.
        hidden_channels = max(1, in_channels // reduction)
        self.avg_pool = nn.AdaptiveAvgPool3d(1)
        self.fc = nn.Sequential(
            nn.Linear(in_channels, hidden_channels, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_channels, in_channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Scale every channel of the 5-D tensor ``x`` by its learned gate.

        Returns:
            A tensor with the same shape as ``x``.
        """
        b, c, _, _, _ = x.size()
        # Squeeze: one average per (sample, channel)
        y = self.avg_pool(x).view(b, c)
        # Excite: per-channel gates, reshaped so they broadcast over the
        # three trailing dimensions
        y = self.fc(y).view(b, c, 1, 1, 1)
        return x * y.expand_as(x)

# Define the model
class Deep3DCNN(nn.Module):
    """3D-convolutional classifier for gesture clips.

    Three grouped ``Conv3d`` stages (each followed by batch norm, ReLU, an
    ``SEBlock`` and spatial max pooling), global average pooling, and a
    three-layer fully connected head.

    The input must be a 5-D tensor whose second dimension has ``in_channels``
    entries. In this notebook ``GestureDataset`` stacks a clip's frames along
    the first dimension, so after batching the frame axis occupies the channel
    dimension - the default of 30 presumably matches the number of frames per
    clip (confirm against the data).

    Args:
        num_classes: Number of output logits.
        in_channels: Size of the input's channel dimension. Must be divisible
            by 2 because the first convolution uses ``groups=2``.
    """

    def __init__(self, num_classes=5, in_channels=30):
        super(Deep3DCNN, self).__init__()

        # Convolutional layers. Kernels are (1, 3, 3): they convolve over the
        # two trailing spatial dimensions only and never across the depth axis.
        self.conv_layer = nn.Sequential(
            nn.Conv3d(in_channels=in_channels, out_channels=64, kernel_size=(1, 3, 3), padding=(0, 1, 1), groups=2),
            nn.BatchNorm3d(64),
            nn.ReLU(inplace=True),
            SEBlock(64),
            nn.MaxPool3d((1, 2, 2), stride=(1, 2, 2)),

            nn.Conv3d(in_channels=64, out_channels=128, kernel_size=(1, 3, 3), padding=(0, 1, 1), groups=2),
            nn.BatchNorm3d(128),
            nn.ReLU(inplace=True),
            SEBlock(128),
            nn.MaxPool3d((1, 2, 2), stride=(1, 2, 2)),
            nn.Dropout3d(0.2),

            nn.Conv3d(in_channels=128, out_channels=256, kernel_size=(1, 3, 3), padding=(0, 1, 1), groups=2),
            nn.BatchNorm3d(256),
            nn.ReLU(inplace=True),
            SEBlock(256),
            nn.MaxPool3d((1, 2, 2), stride=(1, 2, 2)),
            nn.Dropout3d(0.2),
        )
        # Adaptive pool to make output size (batch_size, channels, 1, 1, 1)
        self.adaptive_pool = nn.AdaptiveAvgPool3d((1, 1, 1))

        # Fully connected layer
        self.fc_layer = nn.Sequential(
            nn.Linear(256, 2048),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            nn.Linear(2048, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, num_classes)
        )

        self._initialize_weights()

    def _initialize_weights(self):
        """Apply Xavier-normal initialization to every Conv3d and Linear weight.

        Biases keep the initialization they were created with.
        """
        for m in self.modules():
            if isinstance(m, (nn.Conv3d, nn.Linear)):
                init.xavier_normal_(m.weight)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for a 5-D input."""
        x = self.conv_layer(x)
        x = self.adaptive_pool(x)
        # Collapse (batch, 256, 1, 1, 1) to (batch, 256) for the linear head
        x = torch.flatten(x, 1)
        x = self.fc_layer(x)
        return x

Initialize the model

In [12]:
from torch.optim import lr_scheduler

# Build the network and move it to the GPU when one is available
num_classes = 5
model = Deep3DCNN(num_classes=num_classes)
if torch.cuda.is_available():
    model = model.cuda()

# Adam with weight decay; the convolutional stack trains with a smaller
# learning rate than the fully connected head
param_groups = [
    {'params': model.conv_layer.parameters(), 'lr': 1e-4},
    {'params': model.fc_layer.parameters(), 'lr': 1e-3},
]
optimizer = torch.optim.Adam(param_groups, lr=1e-3, weight_decay=1e-5)

# Classification loss
criterion = nn.CrossEntropyLoss()

# Lower the learning rate once the monitored value has stopped decreasing
# for 5 scheduler steps
scheduler = lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', patience=5, verbose=True)

# Names under which the training loop looks up the loaders
train_dataloader, val_dataloader = train_loader, val_loader

Training pipeline

In [18]:
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from torch.cuda.amp import autocast, GradScaler

# Hyperparameter Configuration
# NOTE: the loop below reads only 'num_epochs' and 'log_interval'. The batch
# size and learning rates in effect are the ones the DataLoaders and the
# optimizer were created with, not the values stored here.
config = {
    'num_epochs': 15,
    'log_interval': 10,
    'val_interval': 1,
    'batch_size': 32,
    'lr': 0.001,
}

# torch.cuda.amp targets CUDA; request mixed precision only when a GPU is
# present so CPU-only runs do not ask for a feature they cannot use.
use_cuda = torch.cuda.is_available()

writer = SummaryWriter()
scaler = GradScaler(enabled=use_cuda)
best_val_loss = float('inf')

# try/finally: close the TensorBoard writer even when training is interrupted
# part-way through an epoch.
try:
    # Main Loop
    for epoch in range(config['num_epochs']):
        model.train()
        running_loss = 0.0

        # Progress tracking with tqdm
        for i, (videos, labels) in enumerate(tqdm(train_dataloader, desc=f"Epoch {epoch + 1}")):

            if use_cuda:
                videos, labels = videos.cuda(), labels.cuda()

            optimizer.zero_grad()

            # Forward pass under autocast (a no-op when disabled)
            with autocast(enabled=use_cuda):
                outputs = model(videos)
                loss = criterion(outputs, labels)

            # Backward pass and optimization through the gradient scaler
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Accumulate the batch loss for the next log entry
            running_loss += loss.item()

            # Every `log_interval` batches: log the mean loss, then reset
            if i % config['log_interval'] == (config['log_interval'] - 1):
                writer.add_scalar('Training Loss', running_loss / config['log_interval'], epoch * len(train_dataloader) + i)
                running_loss = 0.0

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for videos, labels in tqdm(val_dataloader, desc=f"Validation Epoch {epoch + 1}"):

                if use_cuda:
                    videos, labels = videos.cuda(), labels.cuda()

                outputs = model(videos)
                loss = criterion(outputs, labels)

                # Sum of per-batch losses; averaged after the loop
                val_loss += loss.item()

                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Guard both averages against an empty validation loader
        val_loss /= max(len(val_dataloader), 1)
        writer.add_scalar('Validation Loss', val_loss, epoch)
        accuracy = 100 * correct / total if total else 0.0
        writer.add_scalar('Validation Accuracy', accuracy, epoch)

        print(
            f"[Validation] Epoch {epoch + 1} | Loss: {val_loss:.3f} | Accuracy: {accuracy:.2f}%")

        # Scheduler step based on the validation loss
        scheduler.step(val_loss)

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_gesture_recog.pth')
finally:
    writer.close()

Epoch 1: 100%|██████████| 166/166 [04:28<00:00,  1.62s/it]
Validation Epoch 1: 100%|██████████| 25/25 [00:21<00:00,  1.17it/s]


[Validation] Epoch 1 | Loss: 1.204 | Accuracy: 41.00%


Epoch 2: 100%|██████████| 166/166 [04:25<00:00,  1.60s/it]
Validation Epoch 2: 100%|██████████| 25/25 [00:20<00:00,  1.19it/s]


[Validation] Epoch 2 | Loss: 1.207 | Accuracy: 45.00%


Epoch 3: 100%|██████████| 166/166 [04:25<00:00,  1.60s/it]
Validation Epoch 3: 100%|██████████| 25/25 [00:21<00:00,  1.18it/s]


[Validation] Epoch 3 | Loss: 1.066 | Accuracy: 53.00%


Epoch 4:  93%|█████████▎| 154/166 [04:05<00:19,  1.66s/it]

Test pipeline

In [99]:
# Preprocessing for the inference cells below: resize to (120, 160), then
# convert to a tensor.
# NOTE: this rebinds the name `transform` that was assigned earlier in the
# notebook.
inference_steps = [transforms.Resize((120, 160)), transforms.ToTensor()]
transform = transforms.Compose(inference_steps)

In [None]:
from collections import deque

# Initialize the camera
camera = cv2.VideoCapture(0)

# Build the model, restore the trained weights and switch to evaluation mode.
# - The file name is the one the training loop passes to torch.save.
# - map_location lets a checkpoint written on a GPU machine load on a CPU-only one.
# - The inference loop moves its input to CUDA whenever CUDA is available, so
#   the model has to be placed on the same device.
inference_device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = Deep3DCNN(num_classes=5)
model.load_state_dict(
    torch.load('best_gesture_recog.pth', map_location=inference_device))
model.to(inference_device)
model.eval()

# Sliding window over the 30 most recent frames
recent_frames = deque(maxlen=30)

In [None]:
# Live inference: once 30 frames have been collected, classify the window of
# the most recent 30 frames on every new frame, until 'q' is pressed or the
# camera stops delivering frames.
while True:
    grabbed, frame = camera.read()
    if not grabbed:
        print("Failed to grab frame")
        break

    # BGR camera frame -> RGB PIL image -> preprocessed tensor
    pil_frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    recent_frames.append(transform(pil_frame))

    if len(recent_frames) == 30:
        # Stack the window and add a leading batch dimension of size 1
        clip = torch.stack(list(recent_frames), dim=0).unsqueeze(0)
        if torch.cuda.is_available():
            clip = clip.cuda()

        with torch.no_grad():
            logits = model(clip)
            gesture_id = torch.max(logits, dim=1).indices.item()

        print(f"Predicted Gesture: {gesture_id}")

        # Overlay the predicted class index on the preview frame
        cv2.putText(frame, f"Gesture: {gesture_id}", (10, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

    # Show the (possibly annotated) frame
    cv2.imshow("Gesture Recognition", frame)

    pressed_key = cv2.waitKey(1) & 0xFF
    if pressed_key == ord('q'):
        break

In [None]:
# Clean up: release the capture device opened for inference, then close every
# OpenCV window created by cv2.imshow.
camera.release()
cv2.destroyAllWindows()