In [None]:
'''
!pip install mediapipe
!pip install seaborn
!pip install tensorflow
!pip install openpyxl
'''

'\n!pip install mediapipe\n!pip install seaborn\n!pip install tensorflow\n'

In [1]:
import mediapipe as mp
import tensorflow as tf
import keras
import numpy as np
import pandas as pd
import os
import shutil
import datetime as dt
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import glob
import cv2
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Bidirectional, LSTM, Dense
from keras.callbacks import EarlyStopping
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
np.random.seed(42)
#RUN

In [None]:
# Create a object from Holistic to detect (pose, face, and hands keypoints)
mp_holistic = mp.solutions.holistic
# Drawing utilities
mp_drawing = mp.solutions.drawing_utils

In [None]:
'''
The input image is converted from BGR to RGB because the MediaPipe model expects RGB images.
It sets image.flags.writeable to False before passing it to the model to prevent any unwanted modifications during the inference process.
The images will converted back from RGB to BGR for any further OpenCV operations,
this ensures the image can be processed further by OpenCV without any issues related to color formats.
'''

def mediapipe_detection(image, model):

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [None]:
'''
This function is responsible for drawing the detected landmarks on the image, allowing you to imagen the pose and hand landmarks detected by MediaPipe.

The function draws the pose landmarks, left hand landmarks, and right hand landmarks with different colors and styles.

color: Defines the color of the landmarks.
thickness: Defines how thick the lines connecting the landmarks will be.
circle_radius: Controls the radius of the circles around each landmark.

This will makes it easier to detected landmarks on the image for debugging or interpretation purposes.
'''

def draw_styled_landmarks(image, results):

    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             )
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             )
    # Draw right hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             )

In [None]:
'''
This function adjusts the landmarks by centering them around a specific point LikeTHE ( the nose or wrist)
that's to normalize the positions and remove any translation shift in the data.

Reshape>> for thhe landmark array is reshaped to handle 3D points (x, y, z).

Centering>> Like (nose or wrist) is subtracted from each landmark to "center" the landmarks around the specific point.
It's look like Normalization :)
'''

def adjust_landmarks(arr,center):

    # Reshape the array to have shape (n, 3)
    arr_reshaped = arr.reshape(-1, 3)

    # Repeat the center array to have shape (n, 3)
    center_repeated = np.tile(center, (len(arr_reshaped), 1))

    # Subtract the center array from the arr array
    arr_adjusted = arr_reshaped - center_repeated

    # Reshape arr_adjusted back to shape (n*3,)
    arr_adjusted = arr_adjusted.reshape(-1)
    return(arr_adjusted)

In [None]:
'''
This function extracts and adjusts keypoints:
Pose, Left Hand, Right Hand Keypoints: Each set of landmarks (pose, left hand, right hand) is flattened into a 1D array.
Also the landmarks are adjusted by centering around specific points (nose for pose, wrists for hands).
So. it's to extract and adjust the keypoints for each frame, making them ready for further analysis or machine learning models.
'''

def extract_keypoints(results):

    pose = np.array([[res.x, res.y, res.z] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    nose=pose[:3]
    lh_wrist=lh[:3]
    rh_wrist=rh[:3]
    pose_adjusted = adjust_landmarks(pose,nose)
    lh_adjusted = adjust_landmarks(lh,lh_wrist)
    rh_adjusted = adjust_landmarks(rh,rh_wrist)
    return pose_adjusted, lh_adjusted, rh_adjusted

In [8]:
'''
I have to adjust the range to fit the words and numbers :) #############
'''
# RUN
# Define the different ranges that needed
ranges = [(1, 32), (71, 503)]  # (1, 32) represents numbers, (71, 503) represents Words

# Initialize an empty list to store the results
selected_words = []

# Iterate over each range
for start, end in ranges:
    # Extend the list with zero-padded numbers in the current range
    selected_words.extend([str(num).zfill(4) for num in range(start, end)])

# Print the result
print(selected_words)

['0001', '0002', '0003', '0004', '0005', '0006', '0007', '0008', '0009', '0010', '0011', '0012', '0013', '0014', '0015', '0016', '0017', '0018', '0019', '0020', '0021', '0022', '0023', '0024', '0025', '0026', '0027', '0028', '0029', '0030', '0031', '0071', '0072', '0073', '0074', '0075', '0076', '0077', '0078', '0079', '0080', '0081', '0082', '0083', '0084', '0085', '0086', '0087', '0088', '0089', '0090', '0091', '0092', '0093', '0094', '0095', '0096', '0097', '0098', '0099', '0100', '0101', '0102', '0103', '0104', '0105', '0106', '0107', '0108', '0109', '0110', '0111', '0112', '0113', '0114', '0115', '0116', '0117', '0118', '0119', '0120', '0121', '0122', '0123', '0124', '0125', '0126', '0127', '0128', '0129', '0130', '0131', '0132', '0133', '0134', '0135', '0136', '0137', '0138', '0139', '0140', '0141', '0142', '0143', '0144', '0145', '0146', '0147', '0148', '0149', '0150', '0151', '0152', '0153', '0154', '0155', '0156', '0157', '0158', '0159', '0160', '0161', '0162', '0163', '0164',

In [None]:
'''
This function processes all the video frames of sign language sings for a particular signer and split (train, test, or val).
It does the following:

For each word, it processes video files.

For each frame in the video, the function:
>> Reads the frame.

>> Uses mediapipe_detection to detect landmarks.

>> Extracts the keypoints (pose, left hand, right hand).

>> Appends the keypoints to the respective lists.

>> After processing a video, the keypoints are saved as .npy files.


This function generates numpy arrays of keypoints for each video in the specified folder location.
Args:
    signer(int): the signer of interest. Could be 01 or 02 or 03
    split(str): can be 'train', 'test' or 'val'
'''
def make_keypoint_arrays(path,signer,split):

    os.makedirs('karsl-502',exist_ok = True)
    os.makedirs(f'karsl-502/{signer}',exist_ok = True)
    os.makedirs(f'karsl-502/{signer}/{split}',exist_ok = True)
    working_path = f'karsl-502/{signer}/{split}'
    words_folder = os.path.join(path,str(signer),str(signer), split)

    # Loop through all the subfolders in the folder
    for word in tqdm(selected_words):

        video_files = os.listdir(os.path.join(words_folder, word))
          # Loop through the video files
        for video_file in video_files:
                # Open the video file
            video = sorted(os.listdir(os.path.join(words_folder, word, video_file)))

            # Initialize the list of keypoints for this video
            pose_keypoints, lh_keypoints, rh_keypoints = [], [], []
            with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
              # Loop through the video frames
              for frame in video:
                  # Perform any necessary preprocessing on the frame (e.g., resizing, normalization)
                frame = os.path.join(words_folder, word, video_file,frame)
                frame = cv2.imread(frame)
#                 frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                  # Normalize pixel values to the range [0, 1]
                # Make detections
                image, results = mediapipe_detection(frame, holistic)

                # Extract keypoints
                pose, lh, rh = extract_keypoints(results)
                # Add the keypoints to the list for this video
                pose_keypoints.append(pose)
                lh_keypoints.append(lh)
                rh_keypoints.append(rh)
                # Save the keypoints for this video to a numpy array
                pose_directory = os.path.join(working_path, word,'pose_keypoints')
                lh_directory = os.path.join(working_path, word,'lh_keypoints')
                rh_directory = os.path.join(working_path, word,'rh_keypoints')

                if not os.path.exists(pose_directory):
                    os.makedirs(pose_directory)

                if not os.path.exists(lh_directory):
                    os.makedirs(lh_directory)

                if not os.path.exists(rh_directory):
                    os.makedirs(rh_directory)

                pose_path = os.path.join(pose_directory, video_file)
                np.save(pose_path, pose_keypoints)

                lh_path = os.path.join(lh_directory, video_file)
                np.save(lh_path, lh_keypoints)

                rh_path = os.path.join(rh_directory, video_file)
                np.save(rh_path, rh_keypoints)

In [None]:
#make_keypoint_arrays('G:\Capstone data\karsl-502','01','train')
#make_keypoint_arrays('G:\Capstone data\karsl-502','01','test')
#make_keypoint_arrays('G:\Capstone data\karsl-502','02','train')
#make_keypoint_arrays('G:\Capstone data\karsl-502','02','test')
#make_keypoint_arrays('G:\Capstone data\karsl-502','03','train')
#make_keypoint_arrays('G:\Capstone data\karsl-502','03','test')

100%|████████████████████████████████████████████████████████████████████████████| 463/463 [14:59:06<00:00, 116.52s/it]
100%|██████████████████████████████████████████████████████████████████████████████| 463/463 [5:03:00<00:00, 39.27s/it]
100%|████████████████████████████████████████████████████████████████████████████| 463/463 [21:05:57<00:00, 164.05s/it]
100%|███████████████████████████████████████████████████████████████████████████████████████████████| 463/463 [4:05:33<00:00, 31.82s/it]


In [28]:
# Load the sentences data
sentences_df = pd.read_excel('G:/Capstone data/Sentences.xlsx', sheet_name=None)

# Extract the sentences from each sheet
sentences = {}
for sheet_name, df in sentences_df.items():
    sentences[sheet_name] = df['Sentence'].tolist()

sentences

KeyError: 'Sentence'

To handle multiple words in a single instance, we’ll create a function to process multiple words (video files) and concatenate the sequences for each word in a sentence.

To collect sequences of words and combine them into a sentence,
it's return sequences of keypoints for each sentence.

Sentence Construction: We split each sentence into words, process the video files for each word, and then concatenate them to form a complete sentence sequence.

Return Sentence Sequences: We return the sentence sequences, where each sentence is represented by the concatenation of its word sequences.

**Sentence Construction**: We split each sentence into words, process the video files for each word, and then concatenate them to form a complete sentence sequence.

**Return Sentence Sequences:** We return the sentence sequences, where each sentence is represented by the concatenation of its word sequences.

In [None]:
# def make_sentence_sequences(path, signer, split):
#     # This function now returns sequences for multiple words in a sentence
#     sentence_sequences = []  # Store sequences of keypoints for the entire sentence

#     words_folder = os.path.join(path, str(signer), str(signer), split)

#     for sentence in tqdm(karsl_6['Sign-Arabic']):
#         # Split sentence into individual words
#         words_in_sentence = sentence.split()

#         word_sequences = []  # To hold sequences for each word in the sentence

#         for word in words_in_sentence:
#             # Get word ID and find corresponding video files
#             word_id = w2id.get(word)
#             word_video_dir = os.path.join(words_folder, str(word_id).zfill(4))

#             if os.path.exists(word_video_dir):
#                 video_files = os.listdir(word_video_dir)

#                 for video_file in video_files:
#                     video_path = os.path.join(word_video_dir, video_file)
#                     keypoints_sequence = extract_keypoints_from_video(video_path)
#                     word_sequences.append(keypoints_sequence)

#         # Concatenate word sequences to form a sentence
#         sentence_sequence = np.concatenate(word_sequences, axis=0)
#         sentence_sequences.append(sentence_sequence)

#     return np.array(sentence_sequences)

Modifying the LSTM model slightly to accommodate sequences of multiple words for each sentence.

Input Shape: The model takes a sequence of keypoints (from multiple words), processes them through two LSTM layers, and then outputs a prediction for the sentence.

In [16]:
def build_lstm_model(input_shape, num_classes):
    model = Sequential()
    model.add(Bidirectional(LSTM(128, return_sequences=True), input_shape=input_shape))
    model.add(LSTM(128))  # Process the entire sentence as a sequence of words
    model.add(Dense(64, activation='relu'))  # Dense layer for hidden representation
    model.add(Dense(num_classes, activation='softmax'))  # Output layer for sentence prediction

    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

Now need to format the input data (X) as sequences of keypoints for sentences, and output data (Y) as the corresponding one-hot encoded sentence labels.

X_data: Each sentence is represented by the concatenation of its word sequences.

y_data: The sentence label is assigned using the label_map, which maps each sentence to an ID.

In [25]:
def prepare_data_for_lstm(data_path, signers, split, frame_interval=10):
    X_data, y_data = [], []

    for sentence in tqdm(karsl_6['Sign-Arabic']):
        # Skip non-string sentences
        if not isinstance(sentence, str):
            print(f"Skipping non-string value: {sentence}")
            continue  # Skip this iteration if sentence is not a string
        
        sentence_sequence = []  # Store sequences for this sentence
        sentence_labels = []  # Store labels for this sentence
        words_in_sentence = sentence.split()

        for word in words_in_sentence:
            word_id = w2id.get(word)
            if word_id is None:
                print(f"Word '{word}' not found in w2id dictionary. Skipping this word.")
                continue  # Skip if word is not in w2id
            
            word_video_dir = os.path.join(data_path, str(word_id).zfill(4))

            if os.path.exists(word_video_dir):
                word_sequences = []
                video_files = os.listdir(word_video_dir)

                for video_file in video_files:
                    video_path = os.path.join(word_video_dir, video_file)
                    keypoints_sequence = extract_keypoints_from_video(video_path, frame_interval)
                    word_sequences.append(keypoints_sequence)

                # Concatenate all word sequences into one word sequence
                sentence_sequence.append(np.concatenate(word_sequences, axis=0))
                
                # Retrieve and append the label for this word to the sentence's label list
                label = label_map.get(word)
                if label is not None:
                    sentence_labels.append(label)
                else:
                    print(f"Label for word '{word}' not found. Skipping this word's label.")
            else:
                print(f"Video data for word '{word}' not found at {word_video_dir}. Skipping this word.")

        # Only append to X_data and y_data if there are valid sequences and labels
        if sentence_sequence and sentence_labels:
            X_data.append(np.concatenate(sentence_sequence, axis=0))
            y_data.append(sentence_labels)  # Append the sentence's labels

    # Check if X_data or y_data are empty after processing
    if len(X_data) == 0 or len(y_data) == 0:
        print("Error: No valid data found after processing!")
        return np.array([]), np.array([])  # Return empty arrays if no data

    # Convert y_data to a numpy array with one-hot encoding
    y_data = np.array([to_categorical(labels) for labels in y_data])

    return np.array(X_data), y_data


Training: We split the data into training and validation sets to evaluate the model's performance.

Testing: We also prepare a test set to evaluate the final performance after training.

In [26]:
X_train, y_train = prepare_data_for_lstm(data_path, ['01', '02'], 'train', frame_interval=10)

# Ensure data is not empty before proceeding with the split
if X_train.size == 0 or y_train.size == 0:
    print("Error: Empty training data. Please check the input data.")
else:
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
    print(f'X_train shape: {X_train.shape}')
    print(f'y_train shape: {y_train.shape}')
    print(f'X_val shape: {X_val.shape}')
    print(f'y_val shape: {y_val.shape}')


100%|██████████████████████████████████████████████████████████████████████████████| 463/463 [00:00<00:00, 8111.15it/s]

Skipping non-string value: 0
Skipping non-string value: 1
Skipping non-string value: 2
Skipping non-string value: 3
Skipping non-string value: 4
Skipping non-string value: 5
Skipping non-string value: 6
Skipping non-string value: 7
Skipping non-string value: 8
Skipping non-string value: 9
Skipping non-string value: 10
Skipping non-string value: 20
Skipping non-string value: 30
Skipping non-string value: 40
Skipping non-string value: 50
Skipping non-string value: 60
Skipping non-string value: 70
Skipping non-string value: 80
Skipping non-string value: 90
Skipping non-string value: 100
Skipping non-string value: 200
Skipping non-string value: 300
Skipping non-string value: 400
Skipping non-string value: 500
Skipping non-string value: 600
Skipping non-string value: 700
Skipping non-string value: 800
Skipping non-string value: 900
Skipping non-string value: 1000
Skipping non-string value: 1000000
Skipping non-string value: 10000000
Word 'هيكل' not found in w2id dictionary. Skipping this wo




train the LSTM model using the prepared training data (X_train, y_train) and validation data (X_val, y_val).

The model is trained with a specified number of epochs and batch size.
NOTE THAT FOR >> Early Stopping: We use early stopping to prevent overfitting by monitoring the validation loss.

In [27]:
# Define input shape based on the data
input_shape = X_train.shape[1:]

# Define the number of classes (unique sentences)
num_classes = len(label_map)

# Build and compile the LSTM model
model = build_lstm_model(input_shape, num_classes)

# Train the model
history = model.fit(X_train, y_train, epochs=30, batch_size=32,
                    validation_data=(X_val, y_val),
                    callbacks=[EarlyStopping(monitor='val_loss', patience=5)])

  super().__init__(**kwargs)


ValueError: Input 0 of layer "bidirectional" is incompatible with the layer: expected ndim=3, found ndim=1. Full shape received: (None,)

Model Evaluation

In [None]:
# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f'Test Loss: {test_loss}')
print(f'Test Accuracy: {test_accuracy}')

**>> eypoint Extraction:**
Use extract_keypoints_from_video to process the videos and return sequences of keypoints for each word in a sentence.

**>> Sentence Sequences:**
Create sequences for multiple words and concatenate them to form a full sentence sequence.

**>> LSTM Model:**
Building an LSTM model to handle sentence-level sequences.

**>> Data Preparation:**
Format the data as sequences of keypoints and sentence labels.

**>> Model Training:**
Train the model using the training set and validate it using the validation set.

**>> Evaluation:**
Evaluating the model's performance on the test set.

In [None]:
# Define the Bidirectional LSTM model with Attention


model = tf.keras.Sequential([
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64)),
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.Dense(len(words), activation='softmax')
])

# Compile the model

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

# Set up early stopping
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',  # Metric to monitor for early stopping
    mode='min',  # Set mode to 'min' for minimizing the metric
    patience=5,  # Number of epochs with no improvement before stopping
    restore_best_weights=True,  # Restore the best model weights
    verbose=1
)


model_training_history = model.fit(X_train, y_train, batch_size=32, validation_data=(X_val,y_val), validation_batch_size=32, epochs=50, callbacks=[early_stopping])


# Evaluate the model on train data
model_evaluation_history = model.evaluate(X_train, y_train)


# Evaluate the model on test data
model_evaluation_history = model.evaluate(X_test, y_test)



def plot_metric(model_training_history, metric_name_1, metric_name_2, plot_name):
    '''
    This function will plot the metrics passed to it in a graph.
    Args:
        model_training_history: A history object containing a record of training and validation
                                loss values and metrics values at successive epochs
        metric_name_1:          The name of the first metric that needs to be plotted in the graph.
        metric_name_2:          The name of the second metric that needs to be plotted in the graph.
        plot_name:              The title of the graph.
    '''

    # Get metric values using metric names as identifiers.
    metric_value_1 = model_training_history.history[metric_name_1]
    metric_value_2 = model_training_history.history[metric_name_2]

    # Construct a range object which will be used as x-axis (horizontal plane) of the graph.
    epochs = range(len(metric_value_1))

    # Plot the Graph.
    plt.plot(epochs, metric_value_1, 'blue', label = metric_name_1)
    plt.plot(epochs, metric_value_2, 'red', label = metric_name_2)

    # Add title to the plot.
    plt.title(str(plot_name))

    # Add legend to the plot.
    plt.legend()



# Visualize the training and validation loss metrices.
plot_metric(model_training_history, 'loss', 'val_loss', 'Total Loss vs Total Validation Loss')


#Predicted sign
res = model.predict(X_test)
words[np.argmax(res[1])]


#Real sign
words[np.argmax(y_test[1])]



# Get the loss and accuracy from model_evaluation_history.
model_evaluation_loss, model_evaluation_accuracy = model_evaluation_history

# Define the string date format.
# Get the current Date and Time in a DateTime Object.
# Convert the DateTime object to string according to the style mentioned in date_time_format string.
date_time_format = '%Y_%m_%d__%H_%M_%S'
current_date_time_dt = dt.datetime.now()
current_date_time_string = dt.datetime.strftime(current_date_time_dt, date_time_format)

# Define a useful name for our model to make it easy for us while navigating through multiple saved models.
model_file_name = f'Kaleem_model_2_signers___Date_Time_{current_date_time_string}___Loss_{model_evaluation_loss}___Accuracy_{model_evaluation_accuracy}.h5'

# Save your Model.
model.save(model_file_name)


def get_key_by_value(dictionary, value):
    for key, val in dictionary.items():
        if val == value:
            return key
    return None



ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

y = []
for v in ytrue:
    y.append(get_key_by_value(label_map, v))
print(y)


y = [karsl_6[karsl_6['Sign-Arabic'] == v]['Sign-English'].values[0] for v in y]
print(y)



ypred = []
for v in yhat:
    ypred.append(get_key_by_value(label_map, v))
print(ypred)



ypred = [karsl_6[karsl_6['Sign-Arabic'] == v]['Sign-English'].values[0] for v in ypred]
print(ypred)





from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Assuming y and ypred are your target labels and predicted labels, respectively

# Select the first 20 classes
y_subset = y[:200]
ypred_subset = ypred[:200]

# Get unique class labels
class_labels = np.unique(y_subset)

# Compute confusion matrix
cm = confusion_matrix(y_subset, ypred_subset, labels=class_labels)

# Create a DataFrame from the confusion matrix
df_cm = pd.DataFrame(cm, index=class_labels, columns=class_labels)
df_cm.index.name = 'Actual'
df_cm.columns.name = 'Predicted'

# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.set(font_scale=1.3)  # for label size
sns.heatmap(df_cm, cmap="Blues", annot=True, fmt="d", annot_kws={"size": 12})
plt.title("Confusion Matrix - First 20 Classes")
plt.show()