# Importing data



In [None]:
# To download the WLASL (Word-Level American Sign Language) video dataset from Kaggle:
# https://www.kaggle.com/datasets/risangbaskoro/wlasl-processed/data

In [None]:
from google.colab import drive
import json
# Mount Google Drive so that files stored there are reachable under /content/drive/.
drive.mount('/content/drive/')


Mounted at /content/drive/


In [None]:
# Root folder of the project data on Google Drive.
# NOTE: may be person-dependent depending on the Google Drive structure
# (an earlier alternative was '/content/drive/MyDrive/aps360 labs/').
data_path = '/content/drive/MyDrive/aps360 labs/APS360'
json_path = f"{data_path}/WLASL_v0.3.json"

# Parse the WLASL JSON index into `data`.
with open(json_path, mode='r') as file:
    data = json.loads(file.read())

# Data Processing

Convert from videos to images

In [None]:
import os
import cv2
import shutil
import time

def vid_to_img(vid_file, img_dir, frame_interval=1, max_frames=15, skip_frames=20):
  """ From a video, generate a series of frames (images) with a certain frame frequency.
      The images are written to `img_dir` as frame_1.jpg, frame_2.jpg, ...

    Args:
        vid_file: Path or URL of the video, passed straight to cv2.VideoCapture.
        img_dir: Directory where the images are stored (created if missing).
        frame_interval: Only frames whose index is a multiple of this value are saved.
        max_frames: Maximum number of frames saved per video (default 15, the
            previously hard-coded value).
        skip_frames: Frames with an index <= this value are never saved
            (default 20, the previously hard-coded value).

    Returns:
        False if the video could not be opened or reports zero frames, True otherwise.
        Note that True is also returned when the video is too short for any frame
        to pass the `skip_frames` threshold, in which case `img_dir` stays empty.

    Raises:
        ValueError: If `frame_interval` is smaller than 1.
  """
  if frame_interval < 1:
    raise ValueError("frame_interval must be >= 1")

  #open video file
  capture = cv2.VideoCapture(vid_file)
  try:
    num_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

    if not capture.isOpened() or num_frames == 0:
      print("Error opening video stream or file")
      return False

    #create img_dir if it does not exist
    os.makedirs(img_dir, exist_ok=True)

    frame_count = 0  # index of the frame currently being read
    saved_frame = 0  # number of frames written so far

    # Read frames until enough were saved or the video ends
    while saved_frame < max_frames:
      success, frame = capture.read()
      if not success:
        break

      # save every frame_interval-th frame once the leading frames are skipped
      if frame_count > skip_frames and frame_count % frame_interval == 0:
        frame_path = os.path.join(img_dir, f"frame_{saved_frame +1}.jpg")
        cv2.imwrite(frame_path, frame)
        saved_frame += 1

      frame_count += 1

    return True
  finally:
    # Always free the capture handle, including on the early error return.
    # (No cv2.destroyAllWindows(): this function never opens a window.)
    capture.release()





In [None]:
save_path = '/content/drive/MyDrive/APS360-data'

# Create the output root plus one sub-folder per split; folders that already exist are kept.
os.makedirs(save_path, exist_ok=True)
for split_folder in ('Train-48', 'Val-48', 'Test-48'):
  os.makedirs(os.path.join(save_path, split_folder), exist_ok=True)



In [None]:
# The dataset index already assigns every video instance to train, val or test.
# Only run this once: it extracts frames for the selected videos and writes them to Drive.
split_folders = {'train': 'Train-48', 'val': 'Val-48', 'test': 'Test-48'}
# A split stops receiving videos once its counter exceeds its limit (same thresholds as before).
split_limits = {'train': 2000, 'val': 1000, 'test': 500}
split_counts = {'train': 0, 'val': 0, 'test': 0}
num_processed = 0

for video_class in data:
  # Nothing left to do once every split is over its limit.
  if all(split_counts[name] > split_limits[name] for name in split_counts):
    break

  label = video_class['gloss']

  for instance in video_class['instances']:
    # Any split other than 'train'/'val' is treated as test, as before.
    split = instance['split'] if instance['split'] in ('train', 'val') else 'test'

    # Skip only this instance when its split is full. The previous `break` abandoned
    # the remaining instances of the gloss, including those of the other splits.
    if split_counts[split] > split_limits[split]:
      continue

    video_id = instance['video_id']  # not named `id`, which would shadow the builtin
    out_dir = os.path.join(save_path, split_folders[split], label, video_id)

    if vid_to_img(instance['url'], out_dir):
      split_counts[split] += 1
      num_processed += 1
      # Report progress once per 50 successfully processed videos.
      if num_processed % 50 == 0:
        print(num_processed)

num_train = split_counts['train']
num_val = split_counts['val']
num_test = split_counts['test']

print("total processed: ", num_processed)


0


KeyboardInterrupt: 

In [None]:
import numpy as np
# Convert the manually recorded videos of the extra gestures into frame folders.
vid_path = '/content/drive/MyDrive/aps360 labs/Project'
save_path = '/content/drive/MyDrive/aps360 labs/APS360/Train/frames'
extra_labels = ["help", "who", "what", "want", "computer", "family", "hot","yes", "time", "clothes"]

# Running counter used as the output folder name of each converted video.
num = 100000
for label in extra_labels:

  label_dir = os.path.join(vid_path, label)
  id_sequence = os.listdir(label_dir)  # one entry per video file of this gesture

  # The loop variable is deliberately not called `id`: at notebook scope that
  # would shadow the builtin for every later cell.
  for video_name in id_sequence:

    vid_dir = os.path.join(label_dir, video_name)
    save_dir = os.path.join(save_path, label, str(num))
    vid_to_img(vid_dir, save_dir, 2)  # keep every 2nd frame
    num += 1

# Data Augmentation

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision.transforms import v2

In [None]:
class ASLDataset(torch.utils.data.Dataset):
  """Map-style dataset that pairs already-loaded videos with their labels.

  Args:
      videos: Indexable collection of videos.
      labels: Indexable collection of labels, aligned index-by-index with
          `videos`. Its length defines the size of the dataset.
  """

  def __init__(self, videos, labels):
    self.videos, self.labels = videos, labels

  def __len__(self):
    # The dataset size is driven by the label collection.
    sample_count = len(self.labels)
    return sample_count

  def __getitem__(self, idx):
    # Return the (video, label) pair stored at position `idx`.
    video = self.videos[idx]
    label = self.labels[idx]
    return video, label


In [None]:
# NOTE: rebinding `datasets` hides the `torchvision.datasets` alias imported earlier.
datasets = ["Train", "Val", "Test"]

dataset_ASL = []      # one ASLDataset per split, in the order of `datasets`
dataset_loaders = []  # one DataLoader per split, in the same order


# For Data Augmentation
# NOTE(review): torchvision v2 transforms are documented for PIL images and tensors;
# the frames below are passed as numpy arrays -- confirm they are actually augmented.
transform = v2.Compose(
[
 v2.RandomHorizontalFlip(p=0.5), # horizontal flip
 v2.Resize(size=(255, 255)), # resize resolution
 v2.ColorJitter(brightness = 0.3, saturation=0.5), # +-30% change in brightness, +-50% change in saturation,
 v2.RandomRotation([-30,+30]), # random rotate +- 30 degrees
 v2.GaussianBlur(31)]
)

class_cnt = 0
class_map = {}  # class name -> integer label, shared across the three splits

for dataset in datasets:
  dataset_path = os.path.join(data_path, dataset)

  dataset_video_list = [] # one tensor per video
  dataset_label_list = [] # label tensor of the video at the same position

  for class_name in sorted(os.listdir(dataset_path)): # apple, africa, chair, ...
    class_path = os.path.join(dataset_path, class_name)

    # Get the mapped class map
    if class_name not in class_map:
      class_map[class_name] = class_cnt
      class_cnt += 1

    # tensor representing the "label" for all the videos in this class
    class_tensor = torch.tensor([class_map[class_name]])

    for video_id in os.listdir(class_path):
      video_path = os.path.join(class_path, video_id)
      if not os.path.isdir(video_path):
        continue  # stray files (not frame folders) cannot be listed

      # Keep only .jpg files. The previous test `x[-3] != ".jpg"` compared a single
      # character with a 4-character string and therefore never removed anything.
      # Sorting by (length, name) restores temporal order for names that differ only
      # in an unpadded number (frame_2.jpg before frame_10.jpg); os.listdir order is arbitrary.
      frames = sorted((x for x in os.listdir(video_path) if x.endswith(".jpg")),
                      key=lambda name: (len(name), name))
      if not frames:
        continue  # nothing to stack for this video

      video_frames = []

      # Must keep track of this so the SAME transform is applied to ALL frames in the video
      transform_state = None

      for frame_name in frames:
        frame_path = os.path.join(video_path, frame_name)
        frame_np = np.array(plt.imread(frame_path)) # read image/frame

        if transform_state is None:
          # First frame: run the transform once to advance the RNG (so this video differs
          # from the previous one), then remember the state used for the real transform.
          transform(frame_np)
          transform_state = torch.get_rng_state()
        else:
          # Later frames: replay the RNG state of the first frame
          torch.set_rng_state(transform_state)
        frame_np = transform(frame_np)

        # make copy of numpy version to prevent unwritable tensor conversion
        video_frames.append(torch.from_numpy(np.copy(frame_np)))

      video_tensor = torch.stack(video_frames) # (frames, ...) tensor for the whole video

      # Store EVERY video. These appends used to sit outside the video loop,
      # which kept only the last video of each class.
      dataset_video_list.append(video_tensor)
      dataset_label_list.append(class_tensor.detach().clone())

  # create dataset
  # The lists are kept as lists of tensors: ASLDataset only indexes them, and this also
  # works when videos have different numbers of frames (np.asarray would not).
  dataset_ASL.append(ASLDataset(dataset_video_list, dataset_label_list))
  dataset_loaders.append(torch.utils.data.DataLoader(dataset_ASL[-1], batch_size=1))


NameError: name 'v2' is not defined

# Baseline Model
Using 3d convolutional networks


In [None]:
import torch.nn as nn
import torch
import os
class cnn3d(nn.Module):
    """Baseline 3D-convolutional classifier.

    Three Conv3d + ReLU + MaxPool3d stages followed by two fully connected
    layers. `fc1` is sized for 256 * 3 * 4 * 4 features, which is what the three
    stages produce for inputs of shape (batch, in_channels, 3, 240, 240).

    Args:
        in_channels: Number of input channels of the first convolution (default 48).
        num_classes: Number of output scores (default 44).
    """

    def __init__(self, in_channels=48, num_classes=44):
        super(cnn3d, self).__init__()
        self.name = "CNN3"
        # kernel 3x3x3, stride (1,2,2), padding 1: depth is preserved, height/width halved
        self.conv1 = nn.Conv3d(in_channels,64,(3,3,3),(1,2,2),(1,1,1))
        self.pool = nn.MaxPool3d((1,2,2))  # halves height/width again, keeps depth
        self.conv2 = nn.Conv3d(64,128,(3,3,3),(1,2,2),(1,1,1))
        self.conv3 = nn.Conv3d(128,256,(3,3,3),(1,2,2),(1,1,1))
        self.fc1 = nn.Linear(256 * 3 * 4 * 4,1024)
        self.fc2 = nn.Linear(1024,num_classes)


    def forward(self,x):
        """Return unnormalised class scores of shape (batch, num_classes)."""
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        # The third stage was defined but never applied, so the flattened
        # feature size did not match fc1 and the forward pass raised.
        x = self.pool(torch.relu(self.conv3(x)))
        x = torch.flatten(x,1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
class getDataset(Dataset):
  """Dataset of frame folders laid out as root_dir/<class>/<video>/<frame>.jpg.

  Every item is a (frames, label) pair where `label` is the index of the class in
  the alphabetically sorted list of class folders. Sorting makes the indices
  independent of the arbitrary os.listdir order, so separate train and
  validation folders with the same class names get the same labels.

  Args:
      root_dir: Folder containing one sub-folder per class.
      transform: Optional callable applied to every loaded RGB image.
          Stacking and padding require it to return tensors.
      num_frames: Number of frames returned per video.

  NOTE(review): `Image` is presumably `PIL.Image`; it is not imported in the
  visible part of the notebook -- confirm it is in scope before use.
  """

  def __init__(self, root_dir, transform=None, num_frames=15): # Add num_frames parameter
    self.root_dir = root_dir
    self.transform = transform
    self.num_frames = num_frames # Store the desired number of frames
    # Only real class folders count; notebook checkpoint folders and stray files are ignored.
    self.classes = sorted(
        entry for entry in os.listdir(root_dir)
        if entry != '.ipynb_checkpoints' and os.path.isdir(os.path.join(root_dir, entry)))
    self.files = [] #Holds tuples of path to video folder and class label

    for index, label in enumerate(self.classes):
      class_path = os.path.join(root_dir, label)
      for video_folder in sorted(os.listdir(class_path)):
        video_path = os.path.join(class_path, video_folder)
        # Skip checkpoint folders and anything that is not a video folder
        if video_folder == '.ipynb_checkpoints' or not os.path.isdir(video_path):
          continue
        # store the class index (not the name) so no conversion is needed later
        self.files.append((video_path, index))

  @staticmethod
  def _frame_sort_key(filename):
    """Sort key that orders embedded numbers numerically (frame_2 before frame_10)."""
    import re
    return [int(part) if part.isdigit() else part for part in re.split(r'(\d+)', filename)]

  def __len__(self):
    return len(self.files)

  def __getitem__(self, idx):
    video_path, label = self.files[idx]
    # Plain lexicographic sorting would put frame_10.jpg before frame_2.jpg.
    frame_files = sorted(
        (f for f in os.listdir(video_path)
         if f.endswith('.jpg') and not os.path.isdir(os.path.join(video_path, f))),
        key=self._frame_sort_key)

    # Sample a fixed number of evenly spaced frames, never more than available
    num_frames_to_sample = min(self.num_frames, len(frame_files))
    selected_frames = np.linspace(0, len(frame_files) - 1, num_frames_to_sample, dtype=int)

    images = []
    for i in selected_frames:
      frame_path = os.path.join(video_path, frame_files[i])
      try:
        image = Image.open(frame_path).convert('RGB')
        if self.transform:
          image = self.transform(image)
        images.append(image)
      except Exception as e:
        # Best effort: an unreadable frame is reported and replaced by padding below
        print(f"Error loading image {frame_path}: {e}")

    if not images:
      # No frame could be loaded: return an all-zero clip.
      # The 3x240x240 frame shape is fixed here and independent of `transform`.
      return torch.zeros((self.num_frames, 3, 240, 240)), label

    # Pad with zero frames when the video has fewer frames than num_frames
    while len(images) < self.num_frames:
      images.append(torch.zeros_like(images[0]))

    images = torch.stack(images, dim=0)
    return images, label


In [None]:
def plot_training_curve(path):
  """ Plots the training curve for a model run, given the csv files
  containing the train/validation error.

  Reads "<path>_train_err.csv" (required) and, when it exists,
  "<path>_val_err.csv", each holding one value per epoch.

  Args:
  path: The base path of the csv files produced during training
  """
  import matplotlib.pyplot as plt
  # atleast_1d: a single-epoch file is loaded as a 0-d array, which has no len()
  train_err = np.atleast_1d(np.loadtxt("{}_train_err.csv".format(path)))
  n = len(train_err) # number of epochs

  val_file = "{}_val_err.csv".format(path)
  has_val = os.path.exists(val_file)

  # The training curve used to be titled and labelled "Validation"
  plt.title("Train vs Validation Error" if has_val else "Training Error")
  plt.plot(range(1,n+1), train_err, label="Train")
  if has_val:
    val_err = np.atleast_1d(np.loadtxt(val_file))
    plt.plot(range(1,len(val_err)+1), val_err, label="Validation")
  plt.xlabel("Epoch")
  plt.ylabel("Error")
  plt.legend(loc='best')
  plt.show()

In [None]:
def train_baseline(model, num_epochs=20, lr = 0.001, batch_size = 64, root_dir=None, num_frames=48):
  """Train `model` on the frame-folder dataset and record per-epoch metrics.

  The training set is read from <root_dir>/small_train and the validation set
  from <root_dir>/small_val. After the last epoch the model weights and the
  per-epoch accuracy/error/loss curves are written next to the notebook and the
  error curves are plotted with plot_training_curve.

  Args:
      model: Network to train. It is moved to the GPU when one is available.
          Its `name` attribute is used in the output file names.
      num_epochs: Number of passes over the training set.
      lr: Learning rate of the Adam optimizer.
      batch_size: Batch size of both data loaders.
      root_dir: Folder containing 'small_train' and 'small_val'. Defaults to the
          notebook-level `data_path` (presumably where those folders live -- confirm).
      num_frames: Frames per video handed to getDataset. The default 48 matches
          the input channel count of the default cnn3d model.

  Raises:
      ValueError: If the training or validation folder yields no videos.
  """
  torch.manual_seed(1000)

  if root_dir is None:
    root_dir = data_path

  # Model and batches must live on the same device
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model = model.to(device)

  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr=lr)
  transform = transforms.Compose([transforms.Resize((240,240), interpolation=transforms.InterpolationMode.BILINEAR),transforms.ToTensor()])

  # Build both datasets once, instead of relying on notebook globals for the
  # loaders and re-creating the validation set in every epoch.
  train_dataset = getDataset(root_dir=os.path.join(root_dir, 'small_train'), transform=transform, num_frames=num_frames)
  val_dataset = getDataset(root_dir=os.path.join(root_dir, 'small_val'), transform=transform, num_frames=num_frames)
  if len(train_dataset) == 0 or len(val_dataset) == 0:
    raise ValueError("No videos found under {} (small_train / small_val)".format(root_dir))
  train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
  val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

  #define overall loss and accuracy
  total_train_acc = np.zeros(num_epochs)
  total_train_loss = np.zeros(num_epochs)
  total_val_acc = np.zeros(num_epochs)
  total_val_loss = np.zeros(num_epochs)

  #train
  for epoch in range(num_epochs):

    model.train() #training mode enables dropout and batch normalization
    print("in training")
    train_loss = 0.0
    train_corr = 0
    total_train = 0
    for inputs, labels in train_loader:
      inputs = inputs.to(device)
      labels = labels.to(device)

      optimizer.zero_grad()
      outputs = model(inputs) #forward pass
      loss = criterion(outputs, labels) #calculate loss
      loss.backward() #backward pass
      optimizer.step() #update weights

      # loss.item() is the batch mean, so weight it by the batch size
      train_loss += loss.item() * inputs.size(0)
      total_train += labels.size(0)
      train_corr += (outputs.argmax(1) == labels).sum().item()

    train_acc = train_corr /total_train
    total_train_acc[epoch] = train_acc
    total_train_loss[epoch] = train_loss / total_train

    model.eval()
    val_loss = 0.0
    val_corr = 0
    total_val = 0

    with torch.no_grad():  # This ensures no gradients are computed during validation
        for v_inputs, v_labels in val_loader:
            v_inputs = v_inputs.to(device)  # Move inputs to the correct device
            v_labels = v_labels.to(device)  # Move labels to the correct device

            v_outputs = model(v_inputs)
            v_loss = criterion(v_outputs, v_labels)
            val_loss += v_loss.item() * v_inputs.size(0)
            val_corr += (v_outputs.argmax(1) == v_labels).sum().item()
            total_val += v_labels.size(0)

    val_acc = val_corr / total_val
    total_val_acc[epoch]= val_acc
    total_val_loss[epoch] = val_loss / total_val

    print(f'Epoch {epoch+1}:')
    print(f'Train Loss: {train_loss / total_train:.4f}, Train Accuracy: {train_acc:.4f}')
    print(f'Validation Loss: {val_loss / total_val:.4f}, Validation Accuracy: {val_acc:.4f}')

  path = "model_{0}_bs{1}_lr{2}_epoch{3}".format(model.name,batch_size,lr,num_epochs+1)
  torch.save(model.state_dict(), path)

  np.savetxt("{}_train_acc.csv".format(path), total_train_acc)
  np.savetxt("{}_val_acc.csv".format(path), total_val_acc)
  np.savetxt("{}_train_loss.csv".format(path), total_train_loss)
  np.savetxt("{}_val_loss.csv".format(path), total_val_loss)
  # Error = 1 - accuracy. The *_err files are the ones plot_training_curve reads;
  # previously _train_err.csv was never written and _val_err.csv held accuracies.
  np.savetxt("{}_train_err.csv".format(path), 1.0 - total_train_acc)
  np.savetxt("{}_val_err.csv".format(path), 1.0 - total_val_acc)

  plot_training_curve(path)


In [None]:
# Instantiate the 3D-CNN baseline and train it for 10 epochs.
baseline =cnn3d()
train_baseline(baseline, num_epochs=10, lr = 0.001, batch_size = 64)


Sample loaded successfully: torch.Size([48, 3, 240, 240]) 0
in training


KeyboardInterrupt: 

# Build and Train Neural Network

In [None]:
!pip install opencv-python mediapipe scikit-learn matplotlib

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [None]:
!pip install torch==2.0.0 opencv-python mediapipe scikit-learn matplotlib



---


Run mediapipe holistic to extract keypoints












In [None]:
# Initialise MediaPipe components.
# mp_holistic provides the Holistic model and the landmark connection sets used below.
mp_holistic = mp.solutions.holistic
# mp_drawing provides draw_landmarks / DrawingSpec for rendering landmarks on a frame.
mp_drawing = mp.solutions.drawing_utils

In [None]:
def process_image_with_mediapipe(frame, model):
    """Run `model.process` on one BGR frame.

    Args:
        frame: Image in BGR channel order (as returned by cv2.imread).
        model: Object exposing `process(rgb_image)`.

    Returns:
        Tuple (bgr_image, detections): the frame converted to RGB and back to
        BGR, and whatever `model.process` returned.
    """
    rgb_view = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # The RGB image is flagged read-only only while the model runs
    rgb_view.flags.writeable = False
    detections = model.process(rgb_view)
    rgb_view.flags.writeable = True

    return cv2.cvtColor(rgb_view, cv2.COLOR_RGB2BGR), detections

In [None]:
def render_landmarks(frame, detections):
    """Draw the face, pose and hand landmarks of `detections` onto `frame`.

    Nothing is returned; the drawing is done by mp_drawing.draw_landmarks on the
    `frame` passed in.

    Args:
        frame: Image to draw on.
        detections: Result object exposing face_landmarks, pose_landmarks,
            left_hand_landmarks and right_hand_landmarks.
    """
    spec = mp_drawing.DrawingSpec

    # (landmarks, connections, landmark style, connection style), in drawing order
    layers = (
        # Facial landmarks: contours first, then the full tesselation on top
        (detections.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
         spec(color=(80,110,10), thickness=1, circle_radius=1),
         # channel value was 256, which is outside the 0-255 range of an 8-bit channel
         spec(color=(80,255,121), thickness=1, circle_radius=1)),
        (detections.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
         spec(color=(255, 200, 200), thickness=1, circle_radius=1),
         spec(color=(200, 255, 200), thickness=1, circle_radius=1)),
        # Pose landmarks
        (detections.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         spec(color=(80,22,10), thickness=2, circle_radius=4),
         spec(color=(80,44,121), thickness=2, circle_radius=2)),
        # Left hand landmarks
        (detections.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(121,22,76), thickness=2, circle_radius=4),
         spec(color=(121,44,250), thickness=2, circle_radius=2)),
        # Right hand landmarks
        (detections.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(245,117,66), thickness=2, circle_radius=4),
         spec(color=(245,66,230), thickness=2, circle_radius=2)),
    )

    for landmarks, connections, landmark_style, connection_style in layers:
        mp_drawing.draw_landmarks(frame, landmarks, connections,
                                  landmark_style, connection_style)

In [None]:
def extract_keypoints(detections):
    """Flatten the landmarks of one detection result into a single 1-D vector.

    Layout of the returned array (1662 values when every part uses its
    expected landmark count):
        pose        33 landmarks x (x, y, z, visibility)
        face       468 landmarks x (x, y, z)
        left hand   21 landmarks x (x, y, z)
        right hand  21 landmarks x (x, y, z)
    A part that was not detected (falsy attribute) is replaced by zeros.
    """
    def _flatten(landmark_set, fields, missing_size):
        # Zeros stand in for a body part that was not detected
        if not landmark_set:
            return np.zeros(missing_size)
        rows = [[getattr(point, field) for field in fields]
                for point in landmark_set.landmark]
        return np.array(rows).flatten()

    xyz = ("x", "y", "z")
    parts = [
        _flatten(detections.pose_landmarks, xyz + ("visibility",), 33*4),
        _flatten(detections.face_landmarks, xyz, 468*3),
        _flatten(detections.left_hand_landmarks, xyz, 21*3),
        _flatten(detections.right_hand_landmarks, xyz, 21*3),
    ]
    return np.concatenate(parts)

In [None]:
# Directory whose gesture folders are processed by the cells below.
# Alternatives kept from earlier runs:
#   '/content/drive/MyDrive/aps360 labs/APS360/Val'
#   '/content/drive/MyDrive/aps360 labs/APS360/Train/frames'
DATA_DIRECTORY = '/content/drive/MyDrive/aps360 labs/APS360/test/frames'

# Obtain labels: every sub-folder of DATA_DIRECTORY is one gesture, sorted by name
gesture_labels = np.sort(np.array(list(filter(
    lambda entry: os.path.isdir(os.path.join(DATA_DIRECTORY, entry)),
    os.listdir(DATA_DIRECTORY)))))

# First ten gestures only
small_labels = gesture_labels[0:10]
print(small_labels)

# Number of frames read per video sequence
frame_count = 15

# Words with manual videos
extra_labels = ["help", "want", "family","yes", "time","clothes"]


In [None]:
#RUN THIS ONCE
# For every stored frame of every gesture in extra_labels, run MediaPipe Holistic and
# save the flattened keypoints next to the frame as "<frame_number>.npy".
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic_model:

    # Iterate through gestures
    for gesture in extra_labels:
        vid_dir = os.path.join(DATA_DIRECTORY, gesture)

        # One sub-folder per video sequence (the loop variable is not named `id`,
        # which would shadow the builtin)
        id_sequence = [entry for entry in os.listdir(vid_dir)
                       if os.path.isdir(os.path.join(vid_dir, entry))]

        for sequence_id in id_sequence:
            sequence_dir = os.path.join(vid_dir, sequence_id)

            # Iterate through frames in each video sequence
            for frame_number in range(frame_count):

                # Frames are expected to be named "<gesture>_<frame_number>.jpg"
                frame_file = os.path.join(sequence_dir, str(gesture)+"_"+str(frame_number)+'.jpg')
                frame = cv2.imread(frame_file)

                if frame is None:
                    # Missing/unreadable frame: report the sequence and move on to the next one
                    print(sequence_id)
                    break

                # Process frame with MediaPipe. The converted image is not needed here:
                # nothing is displayed, so no landmarks are rendered and no GUI
                # calls (waitKey / destroyAllWindows) are made.
                _, detections = process_image_with_mediapipe(frame, holistic_model)

                # Export keypoints
                keypoints = extract_keypoints(detections)
                np.save(os.path.join(sequence_dir, str(frame_number)+'.npy'), keypoints)

In [None]:
from sklearn.model_selection import train_test_split
import torch
import torch.nn.functional as F
# Map every gesture name to its integer class index (its position in extra_labels).
gesture_to_index = dict(zip(extra_labels, range(len(extra_labels))))
gesture_to_index

In [None]:

import re


def _natural_sort_key(name):
    """Sort key that orders embedded numbers numerically ('2.npy' before '10.npy')."""
    return [int(token) if token.isdigit() else token for token in re.split(r'(\d+)', name)]


data_sequences, target_labels = [], []
val_sequences, val_labels =[],[]
test_sequences, test_labels = [],[]
train_dir = '/content/drive/MyDrive/aps360 labs/APS360/Train/frames'
val_dir = '/content/drive/MyDrive/aps360 labs/APS360/Val'
test_dir = '/content/drive/MyDrive/aps360 labs/APS360/test/frames'

for gesture in extra_labels:
    # ---- training sequences: the first `frame_count` keypoint files in natural order
    for sequence_id in os.listdir(os.path.join(train_dir, gesture)):
        sequence_dir = os.path.join(train_dir, gesture, sequence_id)
        if not os.path.isdir(sequence_dir):
            continue
        # Only keypoint files: the folder may also hold other files, which np.load cannot read
        frames = sorted((f for f in os.listdir(sequence_dir) if f.endswith('.npy')),
                        key=_natural_sort_key)
        if len(frames) < frame_count:
            print(f"Skipping {sequence_dir}: only {len(frames)} keypoint files")
            continue

        frame_data = [np.load(os.path.join(sequence_dir, frame_name))
                      for frame_name in frames[:frame_count]]
        data_sequences.append(frame_data)
        target_labels.append(gesture_to_index[gesture])

    # ---- validation sequences: files named "<frame_number>.npy"
    for vid_id in os.listdir(os.path.join(val_dir, gesture)):
        val_frame = []
        for frame_number in range(frame_count):
            frame_file = os.path.join(val_dir, gesture, str(vid_id), f"{frame_number}.npy")
            if not os.path.exists(frame_file):
                # Fallback naming. np.load raises on a missing file instead of returning
                # None, so the previous `is None` check could never select it.
                # NOTE(review): the fixed "00" prefix is kept as written -- confirm it
                # matches the real file names for two-digit frame numbers.
                frame_file = os.path.join(val_dir, gesture, str(vid_id), "ezgif-frame-00"+str(frame_number)+".npy")
            val_frame.append(np.load(frame_file))
        val_sequences.append(val_frame)
        val_labels.append(gesture_to_index[gesture])

    # ---- test sequences: files named "<frame_number>.npy"
    for test_id in os.listdir(os.path.join(test_dir, gesture)):
        test_frame = []
        for frame_number in range(frame_count):
            test_result = np.load(os.path.join(test_dir, gesture, str(test_id), f"{frame_number}.npy"))
            test_frame.append(test_result)
        test_sequences.append(test_frame)
        test_labels.append(gesture_to_index[gesture])

In [None]:
# =============================================== Prepare train/val/test dataset

def _build_split(sequences, class_indices, n_classes, batch_size=32):
    """Turn one split into tensors, a TensorDataset and a shuffling DataLoader.

    Args:
        sequences: Nested list (sequence -> frame -> keypoint vector).
        class_indices: 1-D integer tensor with one class index per sequence.
        n_classes: Width of the one-hot label vectors.
        batch_size: Batch size of the returned loader.

    Returns:
        (x, y, dataset, loader) where x is float32 and y is the float32 one-hot
        encoding of `class_indices`.
    """
    x = torch.tensor(np.array(sequences), dtype=torch.float32)
    y = F.one_hot(class_indices, n_classes).float()
    split_dataset = torch.utils.data.TensorDataset(x, y)
    loader = torch.utils.data.DataLoader(split_dataset, batch_size=batch_size, shuffle=True)
    return x, y, split_dataset, loader


num_classes = len(extra_labels)

# Gesture names of every sequence (numerical labels converted back to names)
gesture_names = [extra_labels[i] for i in target_labels]
val_gestures = [extra_labels[i] for i in val_labels]
test_gestures = [extra_labels[i] for i in test_labels]

# Class indices as tensors. The stored labels already are class indices, so the
# detour through gesture names is not needed to build them.
label_indices = torch.tensor(target_labels)
val_indices = torch.tensor(val_labels)
test_indices = torch.tensor(test_labels)

# One helper call per split replaces three copy-pasted sections and the repeated
# tensor -> numpy -> list -> tensor conversions (including torch.tensor() calls on
# objects that already were tensors).
x_train, y_train, dataset, dataloader = _build_split(data_sequences, label_indices, num_classes)
x_val, y_val, val_dataset, val_dataloader = _build_split(val_sequences, val_indices, num_classes)
x_test, y_test, test_dataset, test_dataloader = _build_split(test_sequences, test_indices, num_classes)

print(len(x_val))

test_indices

Building Model

In [None]:
# Define LSTM model
import torch.nn as nn
import torch.optim as optim

class LSTMNetwork(nn.Module):
    """Sequence classifier: one LSTM layer followed by two linear layers.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Hidden size of the LSTM whose output is classified.
        num_classes: Number of output classes.

    `forward` expects batch-first input (batch, time, input_dim) and returns
    softmax probabilities of shape (batch, num_classes). `lstm_layer2` and
    `lstm_layer3` are created but not used by `forward`.

    Note: nn.CrossEntropyLoss expects unnormalised scores; feeding it these
    probabilities applies a softmax twice.
    """

    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        # Recurrent layers (only the first one takes part in the forward pass)
        self.lstm_layer1 = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.lstm_layer2 = nn.LSTM(hidden_dim, 128, batch_first=True)
        self.lstm_layer3 = nn.LSTM(128, hidden_dim, batch_first=True)
        # Classification head
        self.fc_layer1 = nn.Linear(hidden_dim, 18)
        self.fc_layer3 = nn.Linear(18, num_classes)
        # Parameter-free building blocks
        self.activation = nn.ReLU()
        self.output_layer = nn.Softmax(dim=1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        sequence_out, _ = self.lstm_layer1(x)
        sequence_out = self.activation(self.dropout(sequence_out))
        # Classify from the representation of the last time step only
        last_step = sequence_out[:, -1, :]
        hidden = self.activation(self.fc_layer1(last_step))
        return self.output_layer(self.fc_layer3(hidden))


Model training

In [None]:
# Model configuration
input_dim = 1662  # values per frame: 33*4 (pose) + 468*3 (face) + 2 * 21*3 (hands)
hidden_dim = 256

# Seed BEFORE building the model so the weight initialisation is reproducible too
torch.manual_seed(1000)

# Initialize model
model = LSTMNetwork(input_dim, hidden_dim,num_classes)
print(num_classes)

# Define loss and optimizer
_nll_loss = nn.NLLLoss()

def loss_fn(probabilities, targets):
    """Cross-entropy for a model that already outputs softmax probabilities.

    nn.CrossEntropyLoss applies its own (log-)softmax and expects raw scores;
    using it on probabilities would apply softmax twice. Here the negative
    log-likelihood of the given probabilities is taken directly.

    Args:
        probabilities: (batch, num_classes) tensor of class probabilities.
        targets: (batch,) tensor of class indices.
    """
    # clamp keeps log() finite when a probability underflows to 0
    return _nll_loss(torch.log(probabilities.clamp_min(1e-12)), targets)

optimizer = optim.Adam(model.parameters(),lr = 0.0009)

# Training loop
num_epochs = 700

train_acc_plot, train_loss_plot, val_acc_plot, val_loss_plot = [],[],[],[]
val_top2_plot = []

# Early stopping: training ends the second time a 10-epoch checkpoint sees
# validation accuracy above 90%. The flag must live outside the epoch loop;
# it used to be reset every epoch, so the stop could never trigger.
stop = False

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_corr = 0
    total_train = 0

    for inputs, labels in dataloader:
      optimizer.zero_grad()

      predictions = model(inputs)
      true_labels = torch.argmax(labels, dim=1)  # one-hot -> class index
      loss = loss_fn(predictions, true_labels)

      loss.backward()
      optimizer.step()

      # loss.item() is the batch mean, so weight it by the batch size
      train_loss += loss.item() * inputs.size(0)
      total_train += labels.size(0)
      train_corr += (predictions.argmax(dim=1) == true_labels).sum().item()

    train_acc = train_corr / total_train
    train_loss = train_loss / total_train

    train_acc_plot.append(train_acc)
    train_loss_plot.append(train_loss)

    # Compute Validation
    model.eval()
    val_loss = 0.0
    val_corr = 0
    total_val = 0
    top2_corr = 0
    with torch.no_grad():  # This ensures no gradients are computed during validation
        for v_inputs, v_labels in val_dataloader:

            v_outputs = model(v_inputs)
            v_targets = v_labels.argmax(dim=1)

            # A sample counts for top-2 accuracy if its class is among the two best scores
            top2_preds = v_outputs.topk(2, dim=1).indices
            in_top2 = (top2_preds == v_targets.unsqueeze(1)).any(dim=1)

            v_loss = loss_fn(v_outputs, v_targets)
            val_loss += v_loss.item() * v_inputs.size(0)
            val_corr += (v_outputs.argmax(dim=1) == v_targets).sum().item()
            total_val += v_targets.size(0)
            top2_corr += in_top2.sum().item()

    val_acc = val_corr / total_val
    val_loss = val_loss / total_val
    val_top2 = top2_corr / total_val

    val_acc_plot.append(val_acc)
    val_loss_plot.append(val_loss)
    val_top2_plot.append(val_top2)

    # Print every 10 epochs
    if (epoch + 1) % 10 == 0:
        # Report the epoch average, not the loss of the last batch only
        print(f'Epoch {epoch+1}/{num_epochs},Train Loss: {train_loss},Train Accuracy: {train_acc * 100:.2f}%')
        print(f'Validation Loss: {val_loss}, Validation Accuracy: {val_acc * 100:.2f}%, top2 val acc: {val_top2 * 100:.2f}%')
        if (val_acc > 0.9):
          if(stop):
            break
          stop = True

# Model summary
print(model)

In [None]:
import matplotlib.pyplot as plt

# Plot the learning curves recorded during training.
# The x-axis is derived from the recorded history instead of a hard-coded epoch
# count, so the plots stay valid when training ran for a different number of
# epochs or stopped early.
n = len(train_loss_plot) # number of epochs actually recorded
epochs = range(1, n + 1)

plt.title("Train vs Validation Loss")
plt.plot(epochs, train_loss_plot, label="Train")
plt.plot(epochs, val_loss_plot, label="Validation")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend(loc='best')
plt.show()

plt.title("Train vs Validation Accuracy ")
plt.plot(epochs, train_acc_plot, label="Train")
plt.plot(epochs, val_acc_plot, label="Validation")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend(loc='best')
plt.show()

plt.title("Validation Top 2 Accuracy ")
plt.plot(epochs, val_top2_plot, label="Validation")
plt.xlabel("Epoch")
plt.ylabel("Top-2 Accuracy")
plt.legend(loc='best')
plt.show()

# Testing

In [None]:
# Testing: evaluate the trained model on the test set.
# Reports top-1 accuracy, top-2 accuracy and the sample-weighted mean loss.

model.eval()  # do not rely on the mode left behind by the training cell

test_loss = 0.0
test_corr = 0
total_test = 0
top2_corr = 0  # number of samples whose true class is among the top-2 predictions
test_acc_plot, test_loss_plot = [],[]
with torch.no_grad():  # no gradients are needed during evaluation
    for t_inputs, t_labels in test_dataloader:

        t_outputs = model(t_inputs)

        # Convert the per-class label rows to class indices.
        test_true_labels = torch.argmax(t_labels, dim=1)
        t_labels = test_true_labels
        test_pred_labels = t_outputs.argmax(dim=1)

        # A sample counts as top-2 correct when its true class is one of the
        # two highest-scoring outputs.
        top2_preds = t_outputs.topk(2, dim=1).indices
        top2_labels = t_labels.unsqueeze(1).expand_as(top2_preds)
        correct = (top2_preds == top2_labels).any(dim=1)

        t_loss = loss_fn(t_outputs, t_labels)
        # Weight the batch loss by the batch size so the final mean is per sample.
        test_loss += t_loss.item() * t_inputs.size(0)
        test_corr += (test_pred_labels == test_true_labels).sum().item()
        total_test += t_labels.size(0)
        # Accumulate a scalar count (same as the validation loop); adding the
        # raw boolean tensor breaks as soon as batch sizes differ.
        top2_corr += correct.sum().item()

if total_test == 0:
    raise ValueError("test_dataloader yielded no samples; cannot compute test metrics")

test_acc = test_corr / total_test
test_loss = test_loss / total_test
test_top2 = top2_corr / total_test
print(f'Test Accuracy: {test_acc * 100: .2f}%, Test Loss: {test_loss},Top 2 accuracy: {test_top2 *100 :.2f}%')

Collecting gtts
  Downloading gTTS-2.5.2-py3-none-any.whl.metadata (4.1 kB)
Downloading gTTS-2.5.2-py3-none-any.whl (29 kB)
Installing collected packages: gtts
Successfully installed gtts-2.5.2


In [None]:
from sklearn.metrics import confusion_matrix
import torchmetrics
model.eval()

# Disable gradient calculation
with torch.no_grad():
    # Predicting the outputs
    test_predictions = model(x_test)

    # Get the predicted class labels
    _, predicted_labels = torch.max(test_predictions, 1)

    # Convert y_test to class indices
    true_labels = torch.argmax(y_test, dim=1)

# Take the number of classes from the model output width (dim 1) instead of a
# hard-coded value, so the metric and the matrix always match the model.
num_classes = test_predictions.shape[1]

# Compute accuracy
accuracy_metric = torchmetrics.Accuracy(task='multiclass', num_classes=num_classes)
accuracy = accuracy_metric(predicted_labels, true_labels)
print(f'Accuracy: {accuracy.item()}')

# Convert tensors to numpy arrays for scikit-learn functions
true_labels_np = true_labels.cpu().numpy()
predicted_labels_np = predicted_labels.cpu().numpy()

# Calculate the multiclass confusion matrix (rows: true class, columns: predicted class).
# `labels` is passed explicitly so that row/column i always corresponds to
# class i, even when some class never appears in the test labels or predictions.
# NOTE: this rebinds the imported function name to the resulting array; the
# per-class accuracy cell below reads `confusion_matrix` as that array.
confusion_matrix = confusion_matrix(true_labels_np, predicted_labels_np, labels=list(range(num_classes)))
print(confusion_matrix)

In [None]:
# Per-class accuracy: correctly classified samples of each true class (matrix
# diagonal) divided by the number of samples of that class (row sums).
samples_per_class = confusion_matrix.sum(axis=1)
# A class without any true samples gives 0/0; keep the NaN entry but silence
# the runtime warning and report it explicitly below.
with np.errstate(divide='ignore', invalid='ignore'):
    class_accuracies = confusion_matrix.diagonal() / samples_per_class

for i, class_accuracy in enumerate(class_accuracies):
    if samples_per_class[i] == 0:
        print(f'Accuracy for class {i}: n/a (no samples of this class)')
        continue
    print(f'Accuracy for class {i}: {class_accuracy * 100:.2f}%')

# Demonstration and Evaluation on New Data

In [None]:
!pip install pyttsx3

In [None]:
import pyttsx3

In [None]:
colors = [(255, 0, 0),(0, 255, 0),(0, 0, 255),(255, 255, 0), (128, 0, 128),(0, 128, 128) ]
#(255, 0, 0),(0, 255, 0),(0, 0, 255),(255, 255, 0), (0, 255, 255), (255, 0, 255),(128, 128, 0),(128, 0, 128),(0, 128, 128),(128, 128, 128)
#Blue, Green, Red, Cyan, Yellow, Magenta, Olive, Purple, Teal, Grey
def probability_visual(result, gestures, in_frame, colors):
    """Draw one horizontal bar per class on a copy of the frame.

    Args:
        result: Iterable of per-class scores; each bar is int(score * 100) pixels wide.
        gestures: Sequence of class names, indexed like ``result``; used as bar captions.
        in_frame: Image to annotate. It is copied, the input is not modified.
        colors: Sequence of colour tuples for the bars. Colours are reused
            cyclically when there are more classes than colours.

    Returns:
        The annotated copy of ``in_frame``.
    """
    out_frame = in_frame.copy()
    for num, prob in enumerate(result):
        top = 60 + num * 40  # each class gets its own 40-pixel row
        bar_color = colors[num % len(colors)]
        cv2.rectangle(out_frame, (0, top), (int(prob*100), top + 30), bar_color, -1)
        # Caption comes from the `gestures` argument rather than a global defined elsewhere.
        cv2.putText(out_frame, str(gestures[num]), (0, top + 25), cv2.FONT_HERSHEY_PLAIN, 1, (0,0,0), 2, cv2.LINE_AA)

    return out_frame

In [None]:
# Text-to-speech engine; the webcam demo cell below calls engine.say() and
# engine.runAndWait() on it to speak each newly recognised word.
engine = pyttsx3.init()

In [None]:
#can run this code only on jupyter notebook as it tries to access the webcam
import torch

sqn = []      # sliding window of per-frame keypoint vectors fed to the model
phrase = []   # recognised words rendered at the bottom of the frame
guess = []    # most recent predicted class indices (used for the stability check)
probChosen = 0.9   # minimum model score required before a word is accepted
SEQ_LEN = 20       # number of consecutive frames the model receives
STABLE_FRAMES = 10 # recent predictions that must agree before a word is accepted

extra_labels = np.array(["help", "want", 'family', 'yes', 'time', 'clothes'])

# Inference only: switch the model to evaluation mode once, outside the loop.
model.eval()

cap = cv2.VideoCapture(0)
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            det, frame = cap.read()
            if not det:
                # No frame could be grabbed (camera unplugged / stream ended);
                # stop instead of passing None to the processing helpers.
                break

            framebgr, detection = process_image_with_mediapipe(frame, holistic)

            render_landmarks(framebgr, detection)

            keypoints = extract_keypoints(detection)
            sqn.append(keypoints)
            sqn = sqn[-SEQ_LEN:]

            if len(sqn) == SEQ_LEN:
                # Convert sqn to a PyTorch tensor with a leading batch dimension
                input_tensor = torch.tensor(np.expand_dims(sqn, axis=0)).float()

                # Make predictions
                with torch.no_grad():
                    res = model(input_tensor)[0]
                predicted_label = torch.argmax(res).item()
                predicted_word = extra_labels[predicted_label]
                print(predicted_word)

                # Keep only the window needed for the stability check so the
                # list does not grow for the whole session.
                guess.append(predicted_label)
                guess = guess[-STABLE_FRAMES:]

                # Accept a word only when every recent prediction agrees with the
                # current one. (np.unique(...)[0] would merely compare against the
                # smallest label present in the window.)
                stable = all(g == predicted_label for g in guess)
                # NOTE(review): assumes the model outputs probabilities in [0, 1];
                # if it returns raw logits this threshold and the bar widths in
                # probability_visual need a softmax first - confirm.
                confident = res[predicted_label].item() > probChosen

                # Speak and append the word unless it repeats the last one.
                if stable and confident and (len(phrase) == 0 or predicted_word != phrase[-1]):
                    phrase.append(predicted_word)
                    engine.say(predicted_word)
                    engine.runAndWait()

                if len(phrase) > 5:
                    phrase = phrase[-5:]

                framebgr = probability_visual(res.cpu().numpy(), extra_labels, framebgr, colors)

            cv2.rectangle(framebgr, (0,430), (640, 500), (0, 0, 0), -1)
            cv2.putText(framebgr, ' '.join(phrase), (2,460),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            cv2.imshow('render', framebgr)

            # Press 'v' to quit the demo
            if cv2.waitKey(10) & 0xFF == ord('v'):
                break
finally:
    # Always free the camera and close the preview window, even after an error.
    cap.release()
    cv2.destroyAllWindows()