In [2]:
import cv2 
import numpy as np 
import os 
import matplotlib.pyplot as plt 
import time 
import mediapipe as mp 
import torch 
import torch.nn as nn 
from tqdm import tqdm

2025-06-06 14:56:03.814133: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1749221764.064340      13 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1749221764.145951      13 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [3]:
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [4]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False                  
    results = model.process(image)                
    image.flags.writeable = True                   
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) 
    return image, results

def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION)
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS) 
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS) 
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS) 

def draw_styled_landmarks(image, results):
    # Draw face connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, 
                             mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1), 
                             mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                             ) 
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             ) 
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 

In [5]:
def normalize_keypoint_block(block,dim = 3):
    if np.all(block == 0):
        return block.flatten()

    block = block.reshape(-1,dim)
    mean = np.mean(block,axis = 0)
    std = np.std(block,axis = 0) + 1e-6
    normed = (block - mean)/std 
    return normed.flatten()

def normalize_frames(frame):
    pose = frame[:132].reshape(33,4)
    left = frame[132:195].reshape(21,3)
    right = frame[195:258].reshape(21,3)

    pose_norm = normalize_keypoint_block(pose,dim = 4)
    #face_norm = normalize_keypoint_block(face,dim = 3)
    left_norm = normalize_keypoint_block(left,dim = 3)
    right_norm = normalize_keypoint_block(right,dim = 3)

    return np.concatenate([pose_norm,left_norm,right_norm])

def normalize_keypoints(keypoints):
    return np.array([normalize_frames(frame) for frame in keypoints])

In [6]:
from sklearn.model_selection import train_test_split
from shutil import copy2

In [7]:
file_paths = []
labels = []
data_dir = '/kaggle/input/keypoint-lstm/cc/'
DEST_DIR = '/kaggle/working/keypoint/'
for class_name in os.listdir(data_dir):
    class_dir = os.path.join(data_dir,class_name)
    for file in os.listdir(class_dir):
        if file.endswith('.npy'):
            file_paths.append(os.path.join(class_dir,file))
            labels.append(class_name)
train_files,valid_files,train_labels, valid_labels = train_test_split(file_paths,labels,test_size = 0.2,stratify=labels,random_state = 11)
def copy_split(files, labels, split_name):
    for fpath, label in tqdm(zip(files, labels), total=len(files), desc=split_name):
        out_dir = os.path.join(DEST_DIR, split_name, label)
        os.makedirs(out_dir, exist_ok=True)
        copy2(fpath, os.path.join(out_dir, os.path.basename(fpath)))

copy_split(train_files, train_labels, 'train')
copy_split(valid_files, valid_labels, 'valid')

train: 100%|██████████| 403/403 [00:02<00:00, 183.07it/s]
valid: 100%|██████████| 101/101 [00:00<00:00, 198.50it/s]


In [8]:
allowed_action = [name for name in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, name))]
len(allowed_action)
label_map = {i : name for name,i in enumerate(allowed_action)}
label_map

{'nice': 0,
 'name': 1,
 'please': 2,
 'yes': 3,
 'sit': 4,
 'hello': 5,
 'meet': 6,
 'you': 7,
 'my': 8}

In [9]:
sequences_train = []
labels_train = []
DATA_PATH = '/kaggle/working/keypoint/train'
expected_len = 80
for action in sorted(allowed_action):
    action_path = os.path.join(DATA_PATH,action)
    if not os.path.exists(action_path):
        continue
        
    for file_name in os.listdir(action_path):
        file_path = os.path.join(action_path,file_name)
        sequence = np.load(file_path)

        if sequence.shape[0] < expected_len:
            pad_len = expected_len - sequence.shape[0]
            pad = np.zeros((pad_len,sequence.shape[1]))
            sequence = np.concatenate((sequence,pad),axis = 0)

        if sequence.shape[0] > expected_len:
            start = (sequence.shape[0] - expected_len)//2
            sequence = sequence[start:start + expected_len]

        if sequence.shape == (expected_len,258): 
            sequences_train.append(sequence)
            labels_train.append(label_map[action])
    

In [10]:
sequences_valid = []
labels_valid = []
DATA_PATH = '/kaggle/working/keypoint/valid'
expected_len = 80
for action in sorted(allowed_action):
    action_path = os.path.join(DATA_PATH,action)
    if not os.path.exists(action_path):
        continue
        
    for file_name in os.listdir(action_path):
        file_path = os.path.join(action_path,file_name)
        sequence = np.load(file_path)

        if sequence.shape[0] < expected_len:
            pad_len = expected_len - sequence.shape[0]
            pad = np.zeros((pad_len,sequence.shape[1]))
            sequence = np.concatenate((sequence,pad),axis = 0)

        if sequence.shape[0] > expected_len:
            start = (sequence.shape[0] - expected_len)//2
            sequence = sequence[start:start + expected_len]

        if sequence.shape == (expected_len,258): 
            sequences_valid.append(sequence)
            labels_valid.append(label_map[action])

In [11]:
import torch.nn as nn 
from torch.utils.data import Dataset
class CustomDataset(Dataset):
    def __init__(self,X,y):
        self.X = torch.tensor(X)
        self.y = torch.tensor(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self,idx):
        return self.X[idx], self.y[idx]
    

In [12]:
X_train = np.array(sequences_train)
y_train = np.array(labels_train)
X_valid = np.array(sequences_valid)
y_valid = np.array(labels_valid)

In [13]:
len(X_train),len(y_train),len(X_valid),len(y_valid)

(403, 403, 101, 101)

In [14]:
from torch.utils.data import DataLoader
keypoint_dataset_train = CustomDataset(X_train,y_train)
keypoint_dataset_val = CustomDataset(X_valid,y_valid)
train_loader = DataLoader(keypoint_dataset_train,batch_size = 32,shuffle = True)
valid_loader = DataLoader(keypoint_dataset_val, batch_size = 32, shuffle = False)

In [15]:
x,y = next(iter(train_loader))
x,y

(tensor([[[ 0.4779,  0.5879, -0.5576,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.4776,  0.5878, -0.5527,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.4773,  0.5878, -0.5524,  ...,  0.0000,  0.0000,  0.0000],
          ...,
          [ 0.4880,  0.5864, -0.4655,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.4879,  0.5888, -0.4986,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.4880,  0.5889, -0.5051,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 0.4716,  0.5698, -0.3971,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.4747,  0.5710, -0.4802,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.4765,  0.5718, -0.5269,  ...,  0.0000,  0.0000,  0.0000],
          ...,
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 0.4331,  0.5807, -0.3807,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.4347,  0.5913,

In [None]:
class LSTMModel(nn.Module):
    def __init__(self,input_size,hidden_size,output_size):
        super(LSTMModel,self).__init__()
        self.lstm = nn.LSTM(input_size,hidden_size,num_layers = 1,batch_first = True)
        self.dropout = nn.Dropout(0.6)
        self.fc1 = nn.Linear(hidden_size*2,hidden_size)
        self.bn = nn.BatchNorm1d(hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size,output_size)
    
    def forward(self,x):
        x, _ = self.lstm(x)
        x = torch.max(x,dim = 1).values
        x = self.dropout(x)
        x = self.fc1(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x 

In [17]:
input_size = 258
hidden_size = 160
output_size = len(allowed_action)
model = LSTMModel(input_size,hidden_size,output_size)

In [18]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [19]:
model = model.to(device)
print(model)

LSTMModel(
  (lstm): LSTM(258, 160, batch_first=True, bidirectional=True)
  (dropout): Dropout(p=0.6, inplace=False)
  (fc1): Linear(in_features=320, out_features=160, bias=True)
  (bn): BatchNorm1d(160, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
  (fc2): Linear(in_features=160, out_features=9, bias=True)
)


In [20]:
import torch.optim as optim
optimizer = optim.Adam(model.parameters(),lr = 1e-3,weight_decay = 1e-5)
loss_function = nn.CrossEntropyLoss()

In [21]:
def train():
    correct = 0
    total = 0
    running_loss = 0
    model.train()
    for x,y in train_loader:
        x = x.float().to(device)
        y = y.long().to(device)
        output = model(x)
        optimizer.zero_grad()
        loss = loss_function(output,y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, predicted = torch.max(output,1)
        correct += (predicted == y).sum().item()
        total += y.size(0)
    accuracy = (100*correct)/total
    avg_loss = running_loss/total
    print(f'Loss:{avg_loss: .4f}  Accuracy: {accuracy: .2f}%')

In [22]:
def valid():
    running_loss = 0
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for x,y in valid_loader:
            x = x.float().to(device)
            y = y.long().to(device)
            output = model(x)
            loss = loss_function(output,y)
            running_loss += loss.item() * y.size(0)
            _, predicted = torch.max(output,1)
            correct += (predicted == y).sum().item()
            total += y.size(0)
    avg_loss = running_loss / total
    accuracy = (correct*100)/total
    print(f'Loss:{avg_loss: .4f}  Accuracy:{accuracy: .2f}%')

In [23]:
num_epochs = 10
for epoch in range(num_epochs):
    print(f'Epoch: {epoch+1}/{num_epochs}:')
    train()
    valid()

Epoch: 1/10:
Loss: 0.0626  Accuracy:  31.76%
Loss: 1.9205  Accuracy: 62.38%
Epoch: 2/10:
Loss: 0.0474  Accuracy:  55.09%
Loss: 1.3163  Accuracy: 74.26%
Epoch: 3/10:
Loss: 0.0339  Accuracy:  74.19%
Loss: 0.8319  Accuracy: 87.13%
Epoch: 4/10:
Loss: 0.0259  Accuracy:  83.87%
Loss: 0.5004  Accuracy: 91.09%
Epoch: 5/10:
Loss: 0.0187  Accuracy:  88.34%
Loss: 0.3394  Accuracy: 94.06%
Epoch: 6/10:
Loss: 0.0146  Accuracy:  90.32%
Loss: 0.2610  Accuracy: 94.06%
Epoch: 7/10:
Loss: 0.0113  Accuracy:  93.55%
Loss: 0.2147  Accuracy: 97.03%
Epoch: 8/10:
Loss: 0.0096  Accuracy:  94.04%
Loss: 0.1710  Accuracy: 98.02%
Epoch: 9/10:
Loss: 0.0087  Accuracy:  94.04%
Loss: 0.1923  Accuracy: 96.04%
Epoch: 10/10:
Loss: 0.0088  Accuracy:  93.05%
Loss: 0.1262  Accuracy: 97.03%
