# Pose-based Sign Language Recognition


In [None]:
!pip install mediapipe

In [None]:

import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time , random
import mediapipe as mp
import copy
import glob
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import keras
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense,Dropout,BatchNormalization,Input,Conv1D,MaxPooling1D,\
                                    TimeDistributed,Activation,Lambda,ReLU,Conv1D,ConvLSTM1D,Flatten
from tensorflow.keras.callbacks import TensorBoard,ModelCheckpoint,EarlyStopping
from tensorflow.keras import layers
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score,confusion_matrix, classification_report
from tensorflow.keras.optimizers import Adam,RMSprop, SGD
from tensorflow.python.client import device_lib
from tensorflow.keras.utils import plot_model
import seaborn as sns
from PIL import ImageFont, ImageDraw, Image
import re # for preprocessing text
from tcn import TCN

# Download the KArSL-100 video dataset for Arabic sign language recognition from https://hamzah-luqman.github.io/KArSL/

# 1. Create Labels

In [None]:
# Read labels
import pandas as pd
labels = pd.read_csv('./KARSL_Labels2.csv')
print (labels)

     SignID        Sign
0         1           0
1         2           1
2         3           2
3         4           3
4         5           4
..      ...         ...
195     196  أم  mother
196     197  أخت sister
197     198  أخ brother
198     199   بنت  girl
199     200   رضيع baby

[200 rows x 2 columns]


In [None]:
labels['Sign']

0               0
1               1
2               2
3               3
4               4
          ...    
195    أم  mother
196    أخت sister
197    أخ brother
198     بنت  girl
199     رضيع baby
Name: Sign, Length: 200, dtype: object

In [None]:
# Use only 100 classes
actions = np.array(labels[70:170])
print(len(actions))

100


In [None]:
actions[0]

array([71, 'هيكل عظمي'], dtype=object)

In [None]:
actions[99]

array([170, 'يقفل ( يغلق ) close'], dtype=object)

# 2. Extract Keypoints using MP Holistic

## Setup Folders for Keypoint Collection

In [None]:
# Create folders to store the extracted keypoints

# 50 videos per sign per signer
no_sequences = 50

# Videos are going to be 30 frames in length
sequence_length = 30

DATA_PATH = "./MP_Data"

# NOTE(review): `number_of_actions` was used here without ever being assigned.
# It is set to the number of selected classes, which is the same bound the
# extraction cell uses to validate action codes -- confirm this is intended.
number_of_actions = len(actions)

# Layout: <DATA_PATH>/<signer>/<4-digit action code>/<video index>/
# e.g. ./MP_Data/signer1/0001/1
for signer_folder in ("signer1", "signer2", "signer3"):
    for action in range(1, number_of_actions + 1):
        # name of action folder is the code of the action e.g. 0001 for action 1
        action_folder = str(action).zfill(4)
        for sequence in range(1, no_sequences + 1):
            # exist_ok=True tolerates re-running the cell, while real failures
            # (e.g. permission errors) are no longer silently swallowed.
            os.makedirs(
                os.path.join(DATA_PATH, signer_folder, action_folder, str(sequence)),
                exist_ok=True,
            )

## Extract joint landmarks from video dataset (33 pose landmarks, 468 face landmarks, and 21 hand landmarks per hand).
Each landmark is represented by three coordinates (x, y and z); pose landmarks additionally carry a visibility value.

In [None]:
# Walk every signer's video folder, run MediaPipe Holistic on the first
# `sequence_length` frames of each video and store one .npy file per frame.
signers = ["01",  "02", "03"]
for signer in signers:
    Videos_Data_path = f"/Volumes/kfupm/KArSL/{signer}"
    # Number of videos seen so far for each action code (index = code - 1)
    countVideosPerAction = [0] * len(actions)
    # Output folder uses the same naming that load_features() reads back:
    # "signer1" (no leading zero) / zero-padded 4-digit action code / video index
    signer_dir = os.path.join(DATA_PATH, f"signer{int(signer)}")

    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        for root, dirs, files in os.walk(Videos_Data_path):
            for file in files:
                filename, extension = os.path.splitext(file)
                # Only real .mp4 files; names starting with '.' are hidden files
                if extension != '.mp4' or filename.startswith('.'):
                    continue

                # The action code is the third "_"-separated field of the file name
                try:
                    actionCode = int(filename.split("_")[2])
                except (IndexError, ValueError):
                    print(f"Skipping file due to filename format: {file}")
                    continue

                if actionCode < 1 or actionCode > len(actions):
                    print(f"Invalid action code: {actionCode} in file {file}")
                    continue

                # Keep at most `no_sequences` videos per action
                countVideosPerAction[actionCode - 1] += 1
                if countVideosPerAction[actionCode - 1] > no_sequences:
                    continue

                save_dir = os.path.join(
                    signer_dir,
                    str(actionCode).zfill(4),
                    str(countVideosPerAction[actionCode - 1]),
                )
                os.makedirs(save_dir, exist_ok=True)

                # Open the video only once the file is known to be needed, and
                # always release it, so no capture handle is leaked.
                cap = cv2.VideoCapture(os.path.join(root, file))
                try:
                    # Used if the very first frame cannot be read
                    keypoints = np.zeros(1662,)  # or appropriate default
                    for frame_num in range(1, sequence_length + 1):
                        ret, frame = cap.read()
                        if ret:
                            image, results = mediapipe_detection(frame, holistic)
                            draw_styled_landmarks(image, results)
                            keypoints = extract_keypoints(results)
                        else:
                            # Short video: the previous frame's keypoints are repeated
                            print(f"Missing frame in {file}, frame #{frame_num}")

                        # Save keypoints
                        np.save(os.path.join(save_dir, f"{frame_num}.npy"), keypoints)

                        if cv2.waitKey(10) & 0xFF == ord('q'):
                            break
                finally:
                    cap.release()
        cv2.destroyAllWindows()

## Load data for signer 1, 2 and 3

In [None]:
def load_features(signer, include_nonManual= False, actions=range(70, 170),
                  no_sequences=50, sequence_length=30, data_path='./MP_Data/'):
    """Load the per-frame keypoint files of one signer as 2-D (x, y) features.

    Files are read from
    ``<data_path>/<signer>/<4-digit action>/<sequence>/<frame>.npy``.

    Each stored vector is assumed to start with 132 pose values laid out as
    33 x (x, y, z, visibility) and to end, from index 1536 on, with the hand
    landmarks laid out as (x, y, z) triples. Only x and y are kept, so a frame
    yields 66 pose values plus two thirds of the hand values. Values are
    passed through unchanged: NaN entries are not filtered out.

    Args:
        signer: Name of the signer folder, e.g. ``"signer1"``.
        include_nonManual: If True, pose (x, y) features are prepended to the
            hand features; otherwise only the hand features are returned.
        actions: Action codes to load. A sequence's label is the position of
            its action code in this iterable.
            NOTE(review): the default starts at 70 although the classes
            selected earlier in the notebook have SignID 71-170 -- confirm.
        no_sequences: Number of sequences (videos) per action, numbered from 1.
        sequence_length: Number of frames per sequence, numbered from 1.
        data_path: Root folder that contains the signer folders.

    Returns:
        ``(sequences, labels)``: ``sequences`` is a list (one entry per video)
        of lists (one entry per frame) of 1-D feature arrays; ``labels`` is
        the list of integer class indices, aligned with ``sequences``.
    """
    # Load extracted keypoints from saved npy files
    DATA_PATH_KEYPOINTS = os.path.join(data_path, signer)
    sequences, labels = [], []
    for label, action in enumerate(actions):
        action_dir = os.path.join(DATA_PATH_KEYPOINTS, str(action).zfill(4))
        for sequence in range(1, no_sequences + 1):
            window = []
            for frame_num in range(1, sequence_length + 1):
                # Load frame keypoints
                res1 = np.load(os.path.join(action_dir, str(sequence), "{}.npy".format(frame_num)))

                # Hands: (x, y, z) per landmark -> keep x, y
                lh_rh = res1[1536:].reshape(-1, 3)[:, :2].ravel()
                # Pose: (x, y, z, visibility) per landmark -> keep x, y.
                # The stride must be 4 here; dropping every third value would
                # discard y coordinates and keep visibility scores instead.
                pose = res1[0:132].reshape(-1, 4)[:, :2].ravel()

                if include_nonManual:
                    features = np.concatenate([pose, lh_rh])  # manual + non-manual features
                else:
                    features = lh_rh
                window.append(features)
            sequences.append(window)
            labels.append(label)
    return sequences, labels

# Load extracted features for each signer
sequences1, labels1 = load_features("signer1", True)
sequences2, labels2 = load_features("signer2", True)
sequences3, labels3 = load_features("signer3", True)


In [None]:
np.array(sequences3).shape

(5000, 30, 150)

In [None]:
#Convert Lists To Array
X1 = np.array(sequences1)
#Convert Labels to OHE
y1 = to_categorical(labels1).astype(int)
X1.shape, y1.shape

((5000, 30, 150), (5000, 100))

In [None]:
#Convert Lists To Array
X2 = np.array(sequences2)
#Convert Labels to OHE
y2 = to_categorical(labels2).astype(int)
X2.shape, y2.shape

((5000, 30, 150), (5000, 100))

In [None]:
#Convert Lists To Array
X3 = np.array(sequences3)
#Convert Labels to OHE
y3 = to_categorical(labels3).astype(int)
X3.shape, y3.shape

((5000, 30, 150), (5000, 100))

In [None]:
# You can save the data in one npy file, instead of reading from the folders everytime.
np.save("X1_p.npy", X1)
np.save("X2_p.npy", X2)
np.save("X3_p.npy", X3)
np.save("y1_p.npy", y1)
np.save("y2_p.npy", y2)
np.save("y3_p.npy", y3)

In [None]:
# Reload previously saved feature/label arrays instead of re-reading the folders.
# NOTE(review): these file names have no "_p" suffix, whereas the np.save cell
# writes "X1_p.npy", "y1_p.npy", ... -- confirm which set of files is intended.
X1 = np.load("X1.npy")
X2 = np.load("X2.npy")
X3 = np.load("X3.npy")
y1 = np.load("y1.npy")
y2 = np.load("y2.npy")
y3 = np.load("y3.npy")

## Split data for signer-independent training and testing

In [None]:
# Signer-independent split: signers 2 and 3 form the training/validation pool
# here; signer 1 (X1, y1) is assigned as the held-out test set further below.
X = np.concatenate([X2,X3])
y = np.concatenate([y2,y3])

In [None]:
# Split Data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.20,shuffle=True,stratify=y,random_state=42)
X_test = X1
y_test = y1

In [None]:
X_test = X1
y_test = y1

# 3. Data Augmentation

In [None]:
# Rotation Augmentation
def augment_data_rotataion(X,y):
    '''
    Rotate the 2-D keypoints of every sample about the origin by a random angle.

    One integer angle in [-5, 5] degrees is drawn per sample with
    random.randint and applied to all frames of that sample. The last axis is
    read as interleaved (x, y) pairs: x0, y0, x1, y1, ...

    input: X,y  as numpy array Shape: [Samples,Timesteps,Features]
           (Features must be even; y holds one row per sample)
    output: Augmented X,y as numpy array Shape:[Samples,Timesteps,Features]
            (X keeps the shape and dtype of the input, y is a copy)
    raises: ValueError if Features is odd
    '''
    X = np.asarray(X)
    y = np.asarray(y)
    if X.shape[2] % 2:
        raise ValueError(
            "Features must be interleaved (x, y) pairs; got an odd count: "
            "{}".format(X.shape[2])
        )

    # One angle per sample, drawn in sample order so that seeding `random`
    # yields the same angles as drawing them one by one inside a loop.
    angles = np.radians([random.randint(-5, 5) for _ in range(X.shape[0])])
    # Shape (Samples, 1, 1) so the angle broadcasts over frames and keypoints
    c = np.cos(angles).reshape(-1, 1, 1)
    s = np.sin(angles).reshape(-1, 1, 1)

    xs = X[..., 0::2]
    ys = X[..., 1::2]

    # [[c, -s], [s, c]] applied to every (x, y) pair at once
    augmented_X = np.empty_like(X)
    augmented_X[..., 0::2] = c * xs - s * ys
    augmented_X[..., 1::2] = s * xs + c * ys

    # Labels are unchanged by the augmentation
    augmented_y = y.copy()
    return augmented_X,augmented_y

In [None]:
# Scale Augmentation
def augment_data_scale(X,y):
    '''
    Multiply every sample by its own random scale factor.

    One factor per sample is drawn as round(random.random(), 2) and applied to
    all frames and features of that sample. A factor that rounds to 0.0 is
    raised to 0.01 so that no sample collapses to all zeros while keeping its
    label.

    input: X,y  as numpy array Shape: [Samples,Timesteps,Features]
    output: Augmented X,y as numpy array Shape: [Samples,Timesteps,Features]
            (X keeps the shape and dtype of the input, y is a copy)
    '''
    X = np.asarray(X)
    y = np.asarray(y)

    # One factor per sample, drawn in sample order
    scales = np.array(
        [max(round(random.random(), 2), 0.01) for _ in range(X.shape[0])],
        dtype=float,
    )
    # Reshape to (Samples, 1, ..., 1) so each factor broadcasts over its sample
    scales = scales.reshape((-1,) + (1,) * (X.ndim - 1))

    augmented_X = np.empty_like(X)
    augmented_X[...] = X * scales

    # Labels are unchanged by the augmentation
    augmented_y = y.copy()
    return augmented_X,augmented_y

In [None]:
# Augmented Rotated Data
rot_x,rot_y = augment_data_rotataion(X_train,y_train)
# Augmented Scaled Data
scaled_x,scaled_y = augment_data_scale(X_train,y_train)

In [None]:
# Concatenate all data [Original and Augmented]
X_train = np.concatenate([X_train,rot_x,scaled_x])
y_train = np.concatenate([y_train,rot_y,scaled_y])
X_train.shape,y_train.shape

((24000, 30, 84), (24000, 100))

In [None]:
X_train.shape, X_test.shape,X_val.shape, y_val.shape, y_train.shape, y_test.shape

((24000, 30, 84),
 (5000, 30, 84),
 (2000, 30, 84),
 (2000, 100),
 (24000, 100),
 (5000, 100))

# 4. Build Model

In [None]:
# Define time steps
timesteps = X.shape[1]

# Define number of features
features = X.shape[2]

def model_predict(model, X_pred, y_pred):
    """Print the accuracy of `model` on `X_pred`.

    `y_pred` holds the reference labels as one row of class scores per sample
    (e.g. one-hot); both it and the model output are reduced to class indices
    with argmax over axis 1 before being compared.
    """
    class_scores = model.predict(X_pred)
    predicted_classes = np.argmax(class_scores, axis=1).tolist()
    reference_classes = np.argmax(y_pred, axis=1).tolist()
    print(accuracy_score(reference_classes, predicted_classes))

## 4.1 LSTM

In [None]:
# LSTM classifier: three stacked LSTM layers followed by a dense head.
# Input is (timesteps, features); output is one softmax score per action.
model_lstm3 = Sequential([
    LSTM(64, return_sequences=True, activation='relu', input_shape=(timesteps,features)),
    LSTM(128, return_sequences=True, activation='relu'),
    # Last recurrent layer returns only the final state
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(actions.shape[0], activation='softmax'),
])
model_lstm3.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_3 (LSTM)               (None, 30, 64)            55040     
                                                                 
 lstm_4 (LSTM)               (None, 30, 128)           98816     
                                                                 
 lstm_5 (LSTM)               (None, 64)                49408     
                                                                 
 dense_6 (Dense)             (None, 64)                4160      
                                                                 
 dense_7 (Dense)             (None, 32)                2080      
                                                                 
 dense_8 (Dense)             (None, 100)               3300      
                                                                 
Total params: 212,804
Trainable params: 212,804
Non-tr

In [None]:
# Compilation Configuration
model_lstm3.compile(optimizer=Adam(learning_rate=1e-4), loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# Define Callbacks
log_dir = os.path.join('Logs/LSTM_test_on_2_p')
tb_callback = TensorBoard(log_dir=log_dir,histogram_freq=1,
                          update_freq='epoch',
                          profile_batch=0) ## !tensorboard --logdir=.
mc = ModelCheckpoint('Models/LSTM_test_on_2_p.h5', monitor='val_categorical_accuracy', mode='max', verbose=1,save_best_only=True)
es = EarlyStopping(monitor='val_categorical_accuracy', mode='max', verbose=1,patience=10)
callbacks = [tb_callback,mc,es]

In [None]:
model_lstm3.fit(X_train, y_train, epochs=1000, callbacks=[callbacks],batch_size=32,validation_data=(X_val, y_val))

In [None]:
# Load the best model written by the ModelCheckpoint callback
# (save_best_only=True, monitored on val_categorical_accuracy)
loaded_model = tf.keras.models.load_model('Models/LSTM_test_on_2_p.h5')

In [None]:
model_predict(loaded_model, X_test, y_test)

## 4.2 Temporal Convolutional Network (TCN)

In [None]:
# from https://github.com/philipperemy/keras-tcn
!pip install keras-tcn


In [None]:
# TCN classifier: three stacked TCN layers followed by a dense head.
# Input is (timesteps, features); output is one softmax score per action.
model_TCN3 = Sequential([
    TCN(64, return_sequences=True, activation='relu', input_shape=(timesteps,features)),
    TCN(128, return_sequences=True, activation='relu'),
    # Last TCN layer is created with the library's default arguments
    TCN(64),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(actions.shape[0], activation='softmax'),
])

model_TCN3.compile(
    optimizer=Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)

model_TCN3.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 tcn (TCN)                   (None, 30, 64)            174400    
                                                                 
 tcn_1 (TCN)                 (None, 30, 128)           575104    
                                                                 
 tcn_2 (TCN)                 (None, 64)                168768    
                                                                 
 dense_3 (Dense)             (None, 64)                4160      
                                                                 
 dense_4 (Dense)             (None, 32)                2080      
                                                                 
 dense_5 (Dense)             (None, 100)               3300      
                                                                 
Total params: 927,812
Trainable params: 927,812
Non-tr

In [None]:
# Define Callbacks
# Paths are relative to the working directory, like the other training cells;
# a leading "/" would point at the filesystem root.
log_dir = os.path.join('.', 'Logs', 'TCN_test_on_1_p')
# TensorBoard logging once per epoch, with profiling disabled
tb_callback = TensorBoard(log_dir=log_dir,histogram_freq=1,
                          update_freq='epoch',
                          profile_batch=0) ## !tensorboard --logdir=.
# Keep only the weights with the best validation accuracy
mc = ModelCheckpoint('./Models/TCN_test_on_1_p.h5', monitor='val_categorical_accuracy', mode='max', verbose=1,save_best_only=True)
# Stop once validation loss has not improved for 10 epochs
es = EarlyStopping(monitor='val_loss', verbose=1,patience=10)
callbacks = [tb_callback,mc,es]

In [None]:
model_TCN3.fit(X_train, y_train, epochs=1000, callbacks=[callbacks],batch_size=32,validation_data=(X_val, y_val))

In [None]:
# Load a saved TCN model; the custom TCN layer must be passed via custom_objects.
# NOTE(review): this file name ("TCN_test_on_1_11d.h5") differs from the one the
# ModelCheckpoint callback writes ("TCN_test_on_1_p.h5") -- confirm which is intended.
loaded_model = tf.keras.models.load_model('./Models/TCN_test_on_1_11d.h5',custom_objects={'TCN': TCN})

In [None]:
model_predict(loaded_model, X_test, y_test)

In [None]:
loaded_model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 tcn_6 (TCN)                 (None, 30, 64)            174400    
                                                                 
 tcn_7 (TCN)                 (None, 30, 128)           575104    
                                                                 
 tcn_8 (TCN)                 (None, 64)                168768    
                                                                 
 dense_6 (Dense)             (None, 64)                4160      
                                                                 
 dense_7 (Dense)             (None, 32)                2080      
                                                                 
 dense_8 (Dense)             (None, 100)               3300      
                                                                 
Total params: 927,812
Trainable params: 927,812
Non-tr

## 4.3 Transformer Encoder

In [None]:
# Transformer Encoder from: https://keras.io/examples/timeseries/timeseries_transformer_classification/
# Create transformer encoder
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one pre-norm transformer encoder block to `inputs`.

    The block is self-attention followed by a position-wise feed-forward
    network built from two kernel-size-1 convolutions, each wrapped in a
    residual connection. The output has the same last dimension as `inputs`.

    Args:
        inputs: Input tensor of the block.
        head_size: `key_dim` of the multi-head attention layer.
        num_heads: Number of attention heads.
        ff_dim: Number of filters of the hidden feed-forward convolution.
        dropout: Dropout rate used inside attention and after each sub-layer.
    """
    # --- Self-attention sub-block ---
    normed_inputs = layers.LayerNormalization(epsilon=1e-6)(inputs)
    self_attention = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )
    attended = self_attention(normed_inputs, normed_inputs)
    attended = layers.Dropout(dropout)(attended)
    attention_out = attended + inputs

    # --- Feed-forward sub-block ---
    hidden = layers.LayerNormalization(epsilon=1e-6)(attention_out)
    hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    # Project back to the input width so the residual sum is well-defined
    projected = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(hidden)
    return projected + attention_out

class PositionalEmbedding(layers.Layer):
    """Add a learned per-frame position embedding to a sequence of features.

    Args:
        sequence_length: Number of positions the embedding table holds.
        output_dim: Size of each position vector.
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.output_dim = output_dim
        # One trainable vector per frame index
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )

    def call(self, inputs):
        """Return `inputs` plus the embedding of each frame index."""
        # The inputs are of shape: `(batch_size, frames, num_features)`
        num_frames = tf.shape(inputs)[1]
        frame_indices = tf.range(start=0, limit=num_frames, delta=1)
        return inputs + self.position_embeddings(frame_indices)

    def compute_mask(self, inputs, mask=None):
        """Mark a frame True when at least one of its features is non-zero."""
        return tf.reduce_any(tf.cast(inputs, "bool"), axis=-1)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            'sequence_length': self.sequence_length,
            'output_dim': self.output_dim,
        }

In [None]:
def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    """Build the transformer-encoder classifier.

    Pipeline: input -> positional embedding -> `num_transformer_blocks`
    encoder blocks -> global average pooling -> MLP head -> softmax over
    `actions.shape[0]` classes.

    Args:
        input_shape: `(frames, features)` of one sample.
        head_size: `key_dim` of each attention layer.
        num_heads: Number of attention heads per block.
        ff_dim: Hidden width of each block's feed-forward part.
        num_transformer_blocks: Number of stacked encoder blocks.
        mlp_units: Widths of the dense layers in the classification head.
        dropout: Dropout rate inside the encoder blocks.
        mlp_dropout: Dropout rate after each dense layer of the head.

    Returns:
        An uncompiled `keras.Model`.
    """
    n_frames, n_features = input_shape[0], input_shape[1]

    model_input = keras.Input(shape=input_shape)
    encoded = PositionalEmbedding(
        n_frames, n_features, name="frame_position_embedding"
    )(model_input)

    # Stack of identical encoder blocks
    for _block in range(num_transformer_blocks):
        encoded = transformer_encoder(encoded, head_size, num_heads, ff_dim, dropout)

    pooled = layers.GlobalAveragePooling1D(data_format="channels_first")(encoded)

    # Classification head
    head = pooled
    for units in mlp_units:
        head = layers.Dense(units, activation="relu")(head)
        head = layers.Dropout(mlp_dropout)(head)
    class_scores = layers.Dense(actions.shape[0], activation="softmax")(head)

    return keras.Model(model_input, class_scores)

In [None]:
input_shape = X_train.shape[1:]
model_transformer1 = build_model(
    input_shape,
    head_size=256,
    num_heads=9,
    ff_dim=2048,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.4,
    dropout=0.25,
)
model_transformer1.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 30, 84)]     0           []                               
                                                                                                  
 frame_position_embedding (Posi  (None, 30, 84)      2520        ['input_4[0][0]']                
 tionalEmbedding)                                                                                 
                                                                                                  
 layer_normalization_16 (LayerN  (None, 30, 84)      168         ['frame_position_embedding[0][0]'
 ormalization)                                                   ]                                
                                                                                            

In [None]:
model_transformer1.compile(
    loss="categorical_crossentropy",
    optimizer=Adam(learning_rate=1e-5),
    metrics=["categorical_accuracy"],
)

In [None]:
log_dir = os.path.join('./Logs/transformer_encoder_test_on_1_positional')
tb_callback = TensorBoard(log_dir=log_dir,histogram_freq=1,
                          update_freq='epoch',
                          profile_batch=0) ## !tensorboard --logdir=.
mc = ModelCheckpoint('./Models/transformer_encoder_test_on_1_positional.h5', monitor='val_categorical_accuracy', mode='max', verbose=1,save_best_only=True)
es = EarlyStopping(monitor='val_categorical_accuracy', mode='max', verbose=1,patience=5)
callbacks = [tb_callback,mc,es]

In [None]:
model_transformer1.fit(
    X_train,
    y_train,
    validation_data=(X_val,y_val),
    epochs=1000,
    batch_size=64, #8
    #initial_epoch = 124,
    callbacks=callbacks,
)

In [None]:
# Load the saved model; the custom PositionalEmbedding layer must be passed via custom_objects.
# NOTE(review): this file name ("..._test_on_2_positional.h5") differs from the one the
# ModelCheckpoint callback writes ("..._test_on_1_positional.h5") -- confirm which is intended.
loaded_model = tf.keras.models.load_model("./Models/transformer_encoder_test_on_2_positional.h5",custom_objects={'PositionalEmbedding': PositionalEmbedding})

In [None]:
model_predict(loaded_model, X_test, y_test)