# Download the WLASL Dataset

In [None]:
# Upload a file from the local machine into the Colab working directory.
# The next cell moves `kaggle.json` out of the working directory, so that is
# the file expected here.
from google.colab import files
files.upload()

In [None]:
# Place the uploaded kaggle.json under ~/.kaggle and restrict it to
# owner read/write (mode 600).
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the `risangbaskoro/wlasl-processed` Kaggle dataset archive and
# unpack it into ./sign-language-dataset (the paths used by later cells
# point at /content/sign-language-dataset).
!kaggle datasets download -d risangbaskoro/wlasl-processed
!unzip wlasl-processed.zip -d ./sign-language-dataset

In [None]:
# Install MediaPipe; it is imported as `mp` by the keypoint-extraction cells.
!pip install mediapipe

#### Set the sequence length (the number of frames sampled from each video); adjust it as needed.

In [2]:
# Number of frames sampled from every video, and the number of per-frame
# keypoint arrays stored for each video (shorter videos are zero-padded).
sequence_length = 48

#### Number of words (gloss classes) you want to process

In [7]:
# Upper bound on how many entries (glosses) of the dataset index are processed;
# the extraction loop stops once this many classes have been handled.
num_words = 2000

# Video processing

#### Data augmentation

In [3]:
import os
import cv2
import json
import random

In [4]:
def apply_flip(image):
    """Return a mirrored copy of ``image``, produced by ``cv2.flip`` with flip code 1."""
    flip_code = 1  # positive flip code: flip around the vertical axis (left <-> right)
    mirrored = cv2.flip(image, flip_code)
    return mirrored

def apply_rotation(image, angle):
    """Rotate ``image`` by ``angle`` degrees about its centre.

    The output keeps the input's width and height and no scaling is applied
    (scale factor 1.0).
    """
    height, width = image.shape[:2]
    pivot = (width // 2, height // 2)
    rotation = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    output_size = (width, height)  # warpAffine expects (width, height)
    return cv2.warpAffine(image, rotation, output_size)

def apply_color_shift(image, value):
    """Shift the V (third) channel of the HSV representation of a BGR image.

    The image is converted BGR -> HSV, ``value`` is added to the third channel
    with ``cv2.add``, and the result is converted back to BGR. Hue and
    saturation channels are left untouched.
    """
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    v_channel = hsv_image[..., 2]
    hsv_image[..., 2] = cv2.add(v_channel, value)
    bgr_image = cv2.cvtColor(hsv_image, cv2.COLOR_HSV2BGR)
    return bgr_image

#### Extract frames from video

In [None]:
def vidToFrame(vid_file, base_out_dir, sequence_length):
    """Sample up to ``sequence_length`` frames from a video and save them as JPEGs.

    Four directories are written: ``base_out_dir`` (original frames) and the
    siblings ``{base_out_dir}_flip``, ``{base_out_dir}_rotation`` and
    ``{base_out_dir}_color_shift`` (augmented copies). Files are named
    ``frame_1.jpg`` ... ``frame_N.jpg``. One random rotation angle and one
    random colour-shift value are drawn per video and applied to all of its
    frames.

    Args:
        vid_file: Path of the video file to read.
        base_out_dir: Directory for the original frames; the augmentation
            directories are derived from it.
        sequence_length: Maximum number of frames to extract.

    Returns:
        None. All directories are created even when the video cannot be
        opened or yields no frames; they are simply left empty in that case.
    """
    # Create the output directories (originals + one per effect).
    os.makedirs(base_out_dir, exist_ok=True)
    effect_dirs = {
        "flip": f"{base_out_dir}_flip",
        "rotation": f"{base_out_dir}_rotation",
        "color_shift": f"{base_out_dir}_color_shift"
    }
    for effect_dir in effect_dirs.values():
        os.makedirs(effect_dir, exist_ok=True)

    # Parameters for effects, fixed for the whole video
    angle = random.randint(-30, 30)  # Random rotation angle
    color_shift_value = random.randint(-50, 50)  # Random color shift value

    print(f"Applying effects: flip, rotation (angle={angle}), color shift (value={color_shift_value})")

    video_capture = cv2.VideoCapture(vid_file)
    try:
        if not video_capture.isOpened():
            # Make the failure visible instead of silently producing empty folders.
            print(f"Warning: could not open video file {vid_file}")
            return

        # Spread the sampled frames over the video: take every
        # `frame_interval`-th frame (at least every frame for short videos).
        total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_interval = max(int(total_frames / sequence_length), 1)

        for frame_index in range(sequence_length):
            # Seek to the desired frame, then decode it.
            video_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_index * frame_interval)
            success, frame = video_capture.read()
            if not success or frame is None:  # Stop if the frame cannot be read
                break

            # File names are 1-based: frame_1.jpg, frame_2.jpg, ...
            frame_name = f"frame_{frame_index + 1}.jpg"

            # Save the original frame, then each augmented variant.
            cv2.imwrite(os.path.join(base_out_dir, frame_name), frame)
            cv2.imwrite(os.path.join(effect_dirs["flip"], frame_name),
                        apply_flip(frame))
            cv2.imwrite(os.path.join(effect_dirs["rotation"], frame_name),
                        apply_rotation(frame, angle))
            cv2.imwrite(os.path.join(effect_dirs["color_shift"], frame_name),
                        apply_color_shift(frame, color_shift_value))
    finally:
        # Always release the capture, even if decoding or writing raised.
        video_capture.release()

# Paths to dataset and videos
file_path = '/content/sign-language-dataset/WLASL_v0.3.json'
missing_file_path = '/content/sign-language-dataset/missing.txt'
videos_dir = '/content/sign-language-dataset/videos/'

# Load the WLASL dataset index: a list of entries, each with a 'gloss'
# (class name) and a list of 'instances' (videos).
with open(file_path) as file:
    wlasl = json.load(file)

# Read the list of missing videos (one video id per line)
with open(missing_file_path, 'r') as file:
    missing_videos = file.read().splitlines()
# Set copy for O(1) membership tests; the list above is scanned once here
# instead of once per video in the loop below.
missing_video_ids = set(missing_videos)

# Create the dataset directory
dataset_dir = '/content/datasets'
os.makedirs(dataset_dir, exist_ok=True)

# Process each class in the dataset
for i, class_data in enumerate(wlasl):
    if i >= num_words:  # Limit processing to a certain number of classes
        break
    class_name = class_data['gloss']
    print(f"Processing class: {class_name}, {i}")

    # Process each instance (video) for the current class
    for instance in class_data['instances']:
        video_id = instance['video_id']
        if video_id in missing_video_ids:  # Skip missing videos
            continue
        video_file = os.path.join(videos_dir, f"{video_id}.mp4")
        # Every split other than 'train' is stored under 'Test'.
        split_dir = 'Train' if instance['split'] == 'train' else 'Test'
        output_dir = os.path.join(dataset_dir, split_dir, class_name, video_id)
        vidToFrame(video_file, output_dir, sequence_length)  # Convert video to frames
        print(f"Processed video {video_id} ({split_dir})")


#### Keypoint extraction

In [10]:
import cv2
import mediapipe as mp
import os
import numpy as np
import json
from tqdm import tqdm

# Mediapipe models and utilities
# NOTE(review): in the cells visible here only `mp_holistic` is referenced;
# the two drawing helpers appear unused — confirm before removing.
mp_holistic = mp.solutions.holistic  # Mediapipe holistic model for detecting pose, face, and hands
mp_drawing = mp.solutions.drawing_utils  # Utility for drawing landmarks
mp_drawing_styles = mp.solutions.drawing_styles  # Utility for styling landmarks

# Run a Mediapipe model on a single BGR frame (detection only, nothing is drawn)
def mediapipe_detection(image, model):
    """Run ``model.process`` on a BGR frame and return ``(image, results)``.

    The frame is converted to RGB before being handed to the model and back
    to BGR afterwards, so the returned image is in OpenCV's channel order.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The frame is flagged read-only only while the model processes it.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

# Flatten Mediapipe detection results into one fixed-length feature vector
def extract_keypoints(results):
    """Return a 1-D array of 1662 values built from ``results``.

    Layout, in order: pose (33 landmarks x [x, y, z, visibility]),
    face (468 x [x, y, z]), left hand (21 x [x, y, z]) and right hand
    (21 x [x, y, z]). Any group that is missing (falsy) is replaced by zeros
    of the matching length.
    """
    def flatten_group(group, landmark_count, with_visibility=False):
        # Missing detections contribute an all-zero segment of the right size.
        if not group:
            values_per_landmark = 4 if with_visibility else 3
            return np.zeros(landmark_count * values_per_landmark)
        if with_visibility:
            rows = [[lm.x, lm.y, lm.z, lm.visibility] for lm in group.landmark]
        else:
            rows = [[lm.x, lm.y, lm.z] for lm in group.landmark]
        return np.array(rows).flatten()

    segments = [
        flatten_group(results.pose_landmarks, 33, with_visibility=True),
        flatten_group(results.face_landmarks, 468),
        flatten_group(results.left_hand_landmarks, 21),
        flatten_group(results.right_hand_landmarks, 21),
    ]
    return np.concatenate(segments)


In [11]:
import mediapipe as mp
import tensorflow as tf
import keras
import numpy as np
import pandas as pd
import os
import shutil
import datetime as dt
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import glob
import cv2
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Bidirectional, LSTM, Dense
from keras.callbacks import EarlyStopping
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
np.random.seed(42)

#### Loop through all frames

In [26]:
def make_keypoint_arrays(path, split):
    """Extract per-frame keypoints for every video of a split and save them as .npy.

    Input layout:  ``{path}/{split}/{word}/{video}/<frame images>``
    Output layout: ``/content/datasets/npy_arrays/{split}/{word}/{video}/{i}.npy``
    with ``i`` counting frames from 0. Each video is padded with zero vectors
    until it has ``sequence_length`` (module-level) arrays.

    Words whose output directory already exists are skipped. The check is per
    word, so a word that was only partially processed in an earlier run is
    NOT resumed; delete its output directory to reprocess it.

    Args:
        path: Root directory that contains the split folders.
        split: Name of the split folder to process (e.g. ``'Train'``).

    Returns:
        None. Results are written to disk.
    """
    import re  # local import: only needed for the numeric-aware sort key

    keypoint_size = 1662  # length of the vector produced by extract_keypoints

    def frame_order(file_name):
        # Numeric-aware key so "frame_2.jpg" sorts before "frame_10.jpg".
        # A plain lexicographic sort would scramble the temporal order.
        return [int(part) if part.isdigit() else part
                for part in re.split(r'(\d+)', file_name)]

    # Path to the folder containing word subfolders
    words_folder = os.path.join(path, split)
    selected_words = os.listdir(words_folder)

    # Create base directory for storing npy arrays (parents are created too)
    working_path = f'/content/datasets/npy_arrays/{split}'
    os.makedirs(working_path, exist_ok=True)

    # Keep only words that have no output folder yet; list the output
    # directory once instead of once per word.
    already_processed = set(os.listdir(working_path))
    pending_words = [word for word in selected_words if word not in already_processed]

    for word in tqdm(pending_words):
        word_path = os.path.join(working_path, word)
        os.makedirs(word_path, exist_ok=True)

        for video_file in os.listdir(os.path.join(words_folder, word)):
            frames_dir = os.path.join(words_folder, word, video_file)
            video_path = os.path.join(word_path, video_file)
            os.makedirs(video_path, exist_ok=True)
            frame_names = sorted(os.listdir(frames_dir), key=frame_order)

            frame_count = 0
            # A fresh Holistic instance per video keeps tracking state from
            # leaking between unrelated videos.
            with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                for frame_name in frame_names:
                    frame = cv2.imread(os.path.join(frames_dir, frame_name))
                    if frame is None:
                        # Unreadable / non-image file: keep the slot but store
                        # zeros instead of crashing inside cvtColor.
                        keypoints = np.zeros(keypoint_size)
                    else:
                        _, results = mediapipe_detection(frame, holistic)
                        keypoints = extract_keypoints(results)
                    np.save(os.path.join(video_path, f'{frame_count}.npy'), keypoints)
                    frame_count += 1

            # Pad the remaining frames with zero arrays if the sequence length is not reached
            while frame_count < sequence_length:
                np.save(os.path.join(video_path, f'{frame_count}.npy'), np.zeros(keypoint_size))
                frame_count += 1

In [None]:
# Extract keypoints for the training split. The split name carries no trailing
# slash, matching the 'Test' call and the 'Train' directory name read later.
make_keypoint_arrays(dataset_dir, 'Train')

In [None]:
# Extract keypoints for the test split.
make_keypoint_arrays(dataset_dir, 'Test')

# 6. Preprocess Data and Create Labels and Features

In [None]:
import numpy as np
import os
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Class names are the sub-folder names of the training split. Sorting makes
# the word -> index mapping reproducible (os.listdir order is arbitrary).
words = np.array(sorted(os.listdir('/content/datasets/Train')))
print(words)
label_map = {label: num for num, label in enumerate(words)}
print(label_map)

sequences = []
labels = []

# Loop through each word in the list of words
for word in words:
    DATA_PATH = os.path.join('/content/datasets/npy_arrays/Train', word)

    # Each video folder yields exactly ONE sample. (Iterating over the files
    # inside the folder as well would append the same window once per .npy
    # file, duplicating every video and leaking copies into the test split.)
    for video in os.listdir(DATA_PATH):
        # Load frames 0 .. sequence_length-1 in temporal order.
        window = [
            np.load(os.path.join(DATA_PATH, video, "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(label_map[word])

# Convert sequences and labels into numpy arrays for compatibility with machine learning libraries
X = np.array(sequences)
# Fix the number of classes explicitly so the one-hot width always equals
# len(words), even if the highest-indexed word has no videos.
y = to_categorical(labels, num_classes=len(words)).astype(int)
print(X.shape, y.shape)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)
y_test.shape


# 7. Build and Train LSTM Neural Network

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

# TensorBoard logs are written to ./Logs
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

# Stacked LSTM classifier over keypoint sequences.
model = Sequential()
# Take the input shape (frames per sequence, features per frame) from the
# training data instead of hard-coding 30 frames, which does not match the
# configured sequence_length.
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=X_train.shape[1:]))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
# One softmax output per word class
model.add(Dense(words.shape[0], activation='softmax'))

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.fit(X_train, y_train, epochs=200, callbacks=[tb_callback])
model.summary()