# 1. Import and install dependencies

In [None]:
# NOTE(review): the next two imports look like accidental IDE auto-imports.
# Nothing visible in this notebook uses django or torch, and both names are
# re-bound to plain values (`sentence = []`, `threshold = 0.8`) in the
# real-time test cell before they are read. They also run before the
# requirements are installed below — confirm they can be dropped.
from django.utils.lorem_ipsum import sentence
from torch.ao.nn.quantized.functional import threshold
# Install the project requirements quietly (IPython shell escape).
!pip install -r requirements.txt -q

In [None]:
# Print tensorflow version
import tensorflow as tf
print(tf.__version__)

In [None]:
# Print mediapipe version
import mediapipe as mp
print(mp.__version__)

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import mediapipe as mp

# 2. Keypoint using MP Holistic

In [None]:
# Module-level aliases for the MediaPipe solutions used throughout the notebook
mp_holistic = mp.solutions.holistic # Holistic solution: provides Holistic and the *_CONNECTIONS sets used below
mp_drawing = mp.solutions.drawing_utils # Drawing utilities: draw_landmarks and DrawingSpec

Next, I will create a function that runs MediaPipe detection on a frame. The function needs to:
- Convert the image from BGR to RGB, because MediaPipe expects RGB input
- Mark the image as not writeable to save memory during processing
- Make the detection
- Convert the image back to BGR for rendering

In [None]:
# Run a MediaPipe model on a single OpenCV frame
def mediapipe_detection(image, model):
    """Run ``model`` on a BGR frame and return the frame together with the results.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)``, e.g. a Holistic instance.

    Returns:
        Tuple ``(bgr_image, results)``: the frame converted to RGB and back to
        BGR, and whatever ``model.process`` returned for it.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # The buffer is read-only only while the model is processing it.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True
    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [None]:
# Apply the function to the webcam
cap = cv2.VideoCapture(0)
try:
    # Set mediapipe models
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            # Read feed; stop instead of passing an empty frame to cvtColor
            ret, grabbed = cap.read()
            if not ret:
                break
            # Only overwrite `frame` with a valid image: later cells reuse the
            # last `frame` and `results` left behind by this loop.
            frame = grabbed

            # Make detections
            image, results = mediapipe_detection(frame, holistic)
            print(results)

            # Show to screen
            cv2.imshow('Raw Webcam Feed', frame)
            # Break if 'q' is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if a frame fails
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Print the results
len(results.pose_landmarks.landmark)

**Note**:<br>
The face and hand landmark models will return no values if nothing is detected. The pose model will return landmarks but the visibility value inside each landmark will be low.


In [None]:
# try to print other landmarks
# left_hand_landmarks is None when no left hand was detected in the last frame,
# so report 0 instead of raising AttributeError
len(results.left_hand_landmarks.landmark) if results.left_hand_landmarks else 0

Now I need to visualize the landmarks on the frame. I will create a function to draw the landmarks on the frame.

In [None]:
# Overlay the default-styled landmark skeletons on a frame
def draw_landmarks(image, results):
    """Draw the face, pose and both hand landmark sets onto ``image`` in place.

    Args:
        image: Frame to draw on; it is modified, nothing is returned.
        results: Detection result exposing ``face_landmarks``,
            ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``.
    """
    # (attribute on results, connection set on mp_holistic), drawn in this order
    overlays = (
        ('face_landmarks', 'FACEMESH_CONTOURS'),
        ('pose_landmarks', 'POSE_CONNECTIONS'),
        ('left_hand_landmarks', 'HAND_CONNECTIONS'),
        ('right_hand_landmarks', 'HAND_CONNECTIONS'),
    )
    for landmarks_attr, connections_attr in overlays:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmarks_attr),
            getattr(mp_holistic, connections_attr),
        )

In [None]:
# The frame before applying the function
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

In [None]:
# Take the result from the previous cell which is the media pipe result for the frame
results

# Draw landmarks to the last frame
draw_landmarks(frame, results)

**Note**:<br>
The `draw_landmarks` function does not return the image but rather applies the landmark visualizations to the current image in place.

In [None]:
# Apply the function to the frame and show to screen
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

By default the landmarks of every body part are drawn in the same style, so I will create a function that applies a custom style to each part.

In [None]:
def draw_styled_landmarks(image, results):
    """Draw every holistic landmark group on ``image`` with its own colours.

    Args:
        image: Frame to draw on; it is modified in place.
        results: Detection result exposing ``face_landmarks``,
            ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``.

    Returns:
        None.
    """
    spec = mp_drawing.DrawingSpec
    # (landmarks, connections, landmark style, connection style) per group.
    # Colour channels are 8-bit, so each value must stay within 0-255.
    styled_groups = (
        # Face
        (results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
         spec(color=(80, 110, 10), thickness=1, circle_radius=1),
         spec(color=(80, 255, 121), thickness=1, circle_radius=1)),  # was 256 (out of range)
        # Pose
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         spec(color=(80, 22, 10), thickness=2, circle_radius=4),
         spec(color=(80, 44, 121), thickness=2, circle_radius=2)),
        # Left hand
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(121, 22, 76), thickness=2, circle_radius=4),
         spec(color=(121, 44, 250), thickness=2, circle_radius=2)),
        # Right hand
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(245, 117, 66), thickness=2, circle_radius=4),
         spec(color=(245, 66, 230), thickness=2, circle_radius=2)),
    )
    for landmarks, connections, landmark_spec, connection_spec in styled_groups:
        mp_drawing.draw_landmarks(image, landmarks, connections, landmark_spec, connection_spec)

In [None]:
# Apply the function to the webcam
cap = cv2.VideoCapture(0)
try:
    # Set mediapipe models
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            # Read feed; stop instead of passing an empty frame to cvtColor
            ret, grabbed = cap.read()
            if not ret:
                break
            # Keep the last valid frame: later cells read `results` from this loop
            frame = grabbed

            # Make detections
            image, results = mediapipe_detection(frame, holistic)
            print(results)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # Show to screen
            cv2.imshow('Raw Webcam Feed', image)
            # Break if 'q' is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if a frame fails
    cap.release()
    cv2.destroyAllWindows()

# 3. Extract Keypoint Values

Now I will create a function to extract the keypoint values from the results and turn them into an array in numpy.

In [None]:
len(results.pose_landmarks.landmark)

In [None]:
# Get list of numpy array for each pose landmark
# pose_landmarks can be missing when no person is in the frame; use an empty
# list then instead of raising AttributeError on `.landmark`
pose_landmarks = [
    np.array([landmark.x, landmark.y, landmark.z, landmark.visibility])
    for landmark in (results.pose_landmarks.landmark if results.pose_landmarks else [])
]

In [None]:
# Handle the pose landmarks in a single array
# Guarded like the other groups so a frame without a detected pose yields zeros
if results.pose_landmarks:
    pose_landmarks = np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in results.pose_landmarks.landmark]).flatten() # Reshape to a single dimension array
else:
    pose_landmarks = np.zeros(33*4)  # 33 landmarks x (x, y, z, visibility)

# Face
if results.face_landmarks:
    face_landmarks = np.array([[landmark.x, landmark.y, landmark.z] for landmark in results.face_landmarks.landmark]).flatten()
else:
    face_landmarks = np.zeros(468*3)  # 468 landmarks x (x, y, z)

# Left Hand
if results.left_hand_landmarks:
    left_hand_landmarks = np.array([[landmark.x, landmark.y, landmark.z] for landmark in results.left_hand_landmarks.landmark]).flatten()
else:
    left_hand_landmarks = np.zeros(21*3)  # 21 landmarks x (x, y, z)

# Right Hand
if results.right_hand_landmarks:
    right_hand_landmarks = np.array([[landmark.x, landmark.y, landmark.z] for landmark in results.right_hand_landmarks.landmark]).flatten()
else:
    right_hand_landmarks = np.zeros(21*3)  # 21 landmarks x (x, y, z)

In [None]:
# Report the flattened length of each landmark group
for part_label, part_values in (
    ('pose shape:', pose_landmarks),
    ('face shape:', face_landmarks),
    ('left hand shape:', left_hand_landmarks),
    ('right hand shape:', right_hand_landmarks),
):
    print(part_label, part_values.shape)

In [None]:
right_hand_landmarks

In [None]:
def extract_keypoints(results):
    """Flatten one holistic detection result into a single feature vector.

    The groups are concatenated in the order pose, face, left hand, right
    hand. Pose landmarks contribute ``x, y, z, visibility``; the other groups
    contribute ``x, y, z``. A group that is missing (falsy) is replaced by
    zeros sized for 33 pose, 468 face or 21 hand landmarks.

    Args:
        results: Object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``; each is
            either falsy or has a ``landmark`` iterable.

    Returns:
        1-D NumPy array; 1662 values when every present group has the
        landmark counts listed above.
    """
    # (attribute on results, fields read per landmark, landmark count for the zero fallback)
    layout = (
        ('pose_landmarks', ('x', 'y', 'z', 'visibility'), 33),
        ('face_landmarks', ('x', 'y', 'z'), 468),
        ('left_hand_landmarks', ('x', 'y', 'z'), 21),
        ('right_hand_landmarks', ('x', 'y', 'z'), 21),
    )

    parts = []
    for attr_name, fields, fallback_count in layout:
        group = getattr(results, attr_name)
        if group:
            rows = [[getattr(point, field) for field in fields] for point in group.landmark]
            parts.append(np.array(rows).flatten())
        else:
            parts.append(np.zeros(fallback_count * len(fields)))

    return np.concatenate(parts)

In [None]:
# Test the function: compute the vector once and compare it with the expected size
keypoint_vector = extract_keypoints(results)
expected_size = 33*4 + 468*3 + 21*3 + 21*3
print('First 10 result: ', keypoint_vector[:10])
print('shape: ', keypoint_vector.shape)
print('shape = 33*4 + 468*3 + 21*3 + 21*3 = ', expected_size)

# 4. Setup Folders for Collection

In [None]:
# Root directory for the exported keypoint data (NumPy arrays)
# NOTE(review): this is an absolute path under the filesystem root — confirm it
# exists and is writable in the environment where the notebook runs
DATA_PATH = '/input/processed/MP_Data'

# Actions (sign labels) that the model is trained to detect
actions = np.array(['hello', 'thanks', 'iloveyou'])

# Number of videos (sequences) recorded per action
no_sequences = 30

# Number of frames in each video
sequence_length = 30

The main difference between **action detection** and other computer vision tasks is that a sequence of frames, rather than a single frame, is used for detection.

In [None]:
# Create a folder for each action
for action in actions:
    for sequence in range(no_sequences):
        # exist_ok=True keeps re-runs idempotent while still surfacing real
        # failures (e.g. permission errors) that a bare `except: pass` would hide
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

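The folder tree will look like this (one folder per entry in `actions`; extra labels such as `a`, `b`, `c` appear only if they are added to that list):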

```bash
MP_Data
|__ a
|   |____ 0
|   |____ 1
|   |____ ...
|__ b
|   |____ 0
|   |____ 1
|   |____ ...
|__ c
|   |____ 0
|   |____ 1
|   |____ ...
|__ hello
|   |____ 0
|   |____ 1
|   |____ ...
|__ thanks
|   |____ 0
|   |____ 1
|   |____ ...
|__ iloveyou
|   |____ 0
|   |____ 1
|   |____ ...
```
<br>
I am going to collect 30 videos per action. Then each one of those video sequences will contain 30 frames of data. Each frame will contain 1662 landmark values. 30*3 sequences, 30 frames, 1662 landmarks.

# 5. Collect Keypoint Values for Training and Testing
In this section, I will collect the keypoint values for training and testing. I will use the `cv2.putText` function to display the action and sequence number on the screen. I will also use the `cv2.waitKey` function to pause the screen for 3 seconds (`cv2.waitKey(3000)`) before each video starts, so there is time to get into position. <br>
For saving extracted data, I will use the `np.save` function to save the data.<br>
For loading the data, I will use the `np.load` function to load the data.

In [None]:
cap = cv2.VideoCapture(0)
# Set when the user presses 'q' or the camera stops delivering frames, so that
# all three nested loops unwind instead of only the innermost one.
stop_requested = False
try:
    # Set mediapipe models
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        # Loop through actions
        for action in actions:
            # Loop through sequences aka videos
            for sequence in range(no_sequences):
                # Loop through video length aka sequence length
                for frame_num in range(sequence_length):

                    # Read feed; an empty frame would crash the detection step
                    ret, frame = cap.read()
                    if not ret:
                        print('Camera returned no frame; stopping at {} video {} frame {}'.format(action, sequence, frame_num))
                        stop_requested = True
                        break

                    # Make detections
                    image, results = mediapipe_detection(frame, holistic)

                    # Draw landmarks
                    draw_styled_landmarks(image, results)

                    # Overlay the status text; the first frame of each video
                    # also gets a banner and a pause to get into position
                    is_first_frame = frame_num == 0
                    if is_first_frame:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    if is_first_frame:
                        cv2.waitKey(3000)

                    # Export keypoints (np.save appends the .npy extension)
                    keypoints = extract_keypoints(results)
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                    np.save(npy_path, keypoints)

                    # Break gracefully
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        stop_requested = True
                        break

                if stop_requested:
                    break
            if stop_requested:
                break
finally:
    # Always free the camera and close the window, even on errors
    cap.release()
    cv2.destroyAllWindows()

In [None]:
cap.release()
cv2.destroyAllWindows()

# 6. Preprocess Data and Create Labels and Features

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Map each action name to its integer class id (its position in `actions`)
label_map = dict(zip(actions, range(len(actions))))

In [None]:
label_map

In [None]:
# Load the saved frames back: one window of frames and one class id per video
sequences, labels = [], []
for action in actions:
    class_id = label_map[action]
    for sequence in range(no_sequences):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        # All frames of a single sequence, in frame order
        window = [
            np.load(os.path.join(sequence_dir, "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(class_id)

In [None]:
np.array(sequences).shape

The shape of the sequences is (90, 30, 1662). This means that there are 90 sequences (3 actions × 30 videos), each with 30 frames and 1662 keypoint values per frame. <br>

In [None]:
np.array(labels).shape

The shape of the labels is (90,). This means that there are 90 labels, one for each sequence.

In [None]:
X = np.array(sequences)

In [None]:
X.shape

In [None]:
y = to_categorical(labels).astype(int)

In [None]:
y

Next, I am going to create the training and testing sets using the `train_test_split` function from scikit-learn.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

In [None]:
# Print the shape of the training and testing sets
X_train.shape, X_test.shape

This means our model processes sequential data for sign language recognition.
Each input sample is a sequence of 30 frames, and each frame contains 1662 features (the pose, face and hand keypoints extracted by MediaPipe).
We have 81 training samples and 9 testing samples.

In [None]:
y_train.shape, y_test.shape

# 7. Build and Train LSTM Neural Network

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping

Next, I am going to create a log directory and set up for tensorBoard callback.

In [None]:
# TensorBoard logging: event files go to a `log` folder one level above the working directory
log_dir = '../log'
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# Early stopping to prevent overfitting
early_stop = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)

In [None]:
# Model Definition: three stacked LSTM layers followed by a dense classifier head
model = Sequential()
# The first two LSTMs emit the full sequence so the next LSTM can consume it
model.add(LSTM(64, return_sequences=True, activation='tanh', input_shape=(30, 1662)))
model.add(LSTM(128, return_sequences=True, activation='tanh'))
# The last LSTM emits only its final output, which feeds the dense layers
model.add(LSTM(64, return_sequences=False, activation='tanh'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
# One softmax output per action
model.add(Dense(len(actions), activation='softmax'))

In [None]:
# Compile Model
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
              loss='categorical_crossentropy',
              metrics=['categorical_accuracy'])

In [None]:
# Train Model
# Runs for up to 1000 epochs with the TensorBoard and early-stopping callbacks.
# NOTE(review): the test split doubles as validation data here, so early
# stopping (which monitors val_loss) picks weights using the same samples that
# are later used for evaluation — consider a separate validation split.
history = model.fit(X_train, y_train,
          validation_data=(X_test, y_test),
          epochs=1000,
          batch_size=32,
          callbacks=[tb_callback, early_stop])

As we can see, the model has achieved a pretty high accuracy.

In [None]:
model.summary()

Next, I will visualize the training progress.<br>
I will create a function to smooth the curves by reducing fluctuations in the values across epochs.
This helps create a clearer trend by averaging out sharp variations in loss and accuracy.

In [None]:
# Exponential moving average used to smooth the training curves
def smooth_curve(points, factor=0.8):
    """Return an exponentially smoothed copy of ``points``.

    The first value is kept as is; every later value is
    ``previous_smoothed * factor + point * (1 - factor)``.

    Args:
        points: Iterable of numbers, e.g. a per-epoch metric.
        factor: Weight given to the previous smoothed value.

    Returns:
        List of smoothed values, one per input point.
    """
    smoothed = []
    for value in points:
        if smoothed:
            value = smoothed[-1] * factor + value * (1 - factor)
        smoothed.append(value)
    return smoothed

In [None]:
# Pull the per-epoch metrics recorded during training
recorded = history.history
train_loss, val_loss = recorded['loss'], recorded['val_loss']
train_acc, val_acc = recorded['categorical_accuracy'], recorded['val_categorical_accuracy']

# Epoch axis, counted from 1
epochs = range(1, len(train_loss) + 1)

# Smoothed copies of every curve for plotting
train_loss_smooth, val_loss_smooth, train_acc_smooth, val_acc_smooth = (
    smooth_curve(series) for series in (train_loss, val_loss, train_acc, val_acc)
)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Limit epochs shown
max_epochs = min(200, len(epochs))  # Show only first 200 epochs
epochs = epochs[:max_epochs]

# One panel per metric: (name, smoothed training curve, smoothed validation curve)
panels = (
    ('Loss', train_loss_smooth, val_loss_smooth),
    ('Accuracy', train_acc_smooth, val_acc_smooth),
)

plt.figure(figsize=(12, 5))

for position, (metric, train_curve, val_curve) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(epochs, train_curve[:max_epochs], 'b', label=f'Training {metric}')
    plt.plot(epochs, val_curve[:max_epochs], 'r', label=f'Validation {metric}')
    plt.xlabel('Epochs')
    plt.ylabel(metric)
    plt.title(f'Training vs Validation {metric}')
    plt.legend()
    plt.grid(True)

plt.show()


# 8. Make Predictions


In [None]:
res = model.predict(X_test)

In [None]:
print(res)

In [None]:
print("Class Probabilities for Sample 3:", res[3])

In [None]:
actions[np.argmax(res[3])]

In [None]:
actions[np.argmax(y_test[3])]


# 9. Save Weights

In [None]:
model.save('/home/martinvalentine/Desktop/SignLanguageDetectionLSTM/exp_v1_no_dropout-0.0005/models.h5')

In [None]:
del model

In [None]:
# `model` was deleted in the previous cell, so calling model.load_weights()
# here would raise NameError. model.save() wrote the complete model to the
# HDF5 file, so rebuild the object from that file instead.
model = tf.keras.models.load_model('/home/martinvalentine/Desktop/SignLanguageDetectionLSTM/exp_v1_no_dropout-0.0005/models.h5')

# 10. Evaluation using Confusion Matrix and Accuracy

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [None]:
yhat = model.predict(X_test)

In [None]:
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

The **Confusion Matrix** is organised as follows:<br>
>[[TRUE N, FALSE P],<br>
>[FALSE N, TRUE P]]

In [None]:
cm = multilabel_confusion_matrix(ytrue, yhat)
print(cm)

In [None]:
# Visualized plot
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# One 2x2 heatmap per class, laid out side by side
num_classes = len(cm)
fig, axes = plt.subplots(1, num_classes, figsize=(4 * num_classes, 4))

for class_index, (axis, class_matrix) in enumerate(zip(axes, cm)):
    sns.heatmap(class_matrix, annot=True, fmt="d", cmap="Blues", ax=axis)
    axis.set_title(f"Class {class_index}")
    axis.set_xlabel("Predicted")
    axis.set_ylabel("True")

plt.tight_layout()
plt.show()

If you're still unsure about how to interpret a **confusion matrix**, here's a breakdown of what each value means:

---

| **True \ Predicted** | **Predicted 0** | **Predicted 1** |
|----------------------|----------------|----------------|
| **True 0** (Negative Class) | **TN** (True Negative) | **FP** (False Positive) |
| **True 1** (Positive Class) | **FN** (False Negative) | **TP** (True Positive) |

**What Each Term Means:**
- **TN (Top-Left Corner):** Correctly predicted **0** (Negative class).
- **FP (Top-Right Corner):** Incorrectly predicted **1** (but it was actually **0**).
- **FN (Bottom-Left Corner):** Incorrectly predicted **0** (but it was actually **1**).
- **TP (Bottom-Right Corner):** Correctly predicted **1** (Positive class).

---

#### Example: Confusion Matrix for **Class 0**
From the **first plot (Class 0):**
- **TN = 5** (Correctly predicted **"not class 0"**)
- **FP = 0** (Did not wrongly predict class 0)
- **FN = 0** (Did not wrongly ignore class 0)
- **TP = 4** (Correctly predicted **class 0**)

**➡ Model performance for Class 0:**
✔ **4 correct predictions and 0 mistakes!**

---

#### What we can do with this plot?
- If a class has **low TP (True Positives)**, consider **balancing the dataset**.
- If FP or FN are high, the model may need **hyperparameter tuning** or **better features**.
- Use this breakdown to **evaluate how well your model performs per class**.


In [None]:
accuracy_score(ytrue, yhat)

# 11. Test in Real Time

In [None]:
cap.release()
cv2.destroyAllWindows()

In [None]:
pred_res = model.predict(np.expand_dims(X_test[0], axis=0))[0]

In [None]:
pred_res[np.argmax(pred_res)]

In [None]:
# 1. NEW detection variables
sequence = [] # Rolling window of the latest frames' keypoints, oldest first
sentence = [] # Concatenate detections history
threshold = 0.8 # Minimum class probability for a prediction to be accepted

# Apply the function to the webcam
cap = cv2.VideoCapture(0)
try:
    # Set mediapipe models
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            # Read feed; stop instead of passing an empty frame to cvtColor
            ret, frame = cap.read()
            if not ret:
                break

            # Make detections
            image, results = mediapipe_detection(frame, holistic)
            print(results)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # 2. PREDICTION LOGIC
            keypoints = extract_keypoints(results)
            # Keep chronological order (oldest first), as in the training
            # windows; inserting at the front fed the model reversed sequences.
            sequence.append(keypoints)
            sequence = sequence[-sequence_length:]

            if len(sequence) == sequence_length:
                # Add the batch dimension: (frames, features) -> (1, frames, features)
                input_sequence = np.expand_dims(sequence, axis=0)
                print("Model input shape:", input_sequence.shape)

                # Predict sign language action; [0] takes the only batch item
                res = model.predict(input_sequence)[0]

                # Determine the most likely action
                best_index = np.argmax(res)
                predicted_action = actions[best_index]
                print("Predicted action:", predicted_action)

                # 3. VISUALIZATION LOGIC
                if res[best_index] > threshold:
                    # Avoid consecutive duplicates
                    if not sentence or predicted_action != sentence[-1]:
                        sentence.append(predicted_action)
                    # Keep only the last 5 words
                    sentence = sentence[-5:]

            cv2.rectangle(image, (0,0), (640,40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('Raw Webcam Feed', image)

            # Break if 'q' is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even on errors
    cap.release()
    cv2.destroyAllWindows()

### What is `np.expand_dims()` function?
If we run the code below to make a prediction on a single `X_test` item:
```python
model.predict(X_test[0])
```
We will get an error because the shape of this item is incorrect.


In [None]:
model.predict(X_test[0])

In [None]:
# Print the shape of X_test[0]
X_test[0].shape

The shape of this item is `(30, 1662)`, but the model expects input of shape `(batch_size, 30, 1662)`, where `batch_size` is the number of sequences passed in at once (here just 1) and 30 is the number of frames per sequence. So in this case we need to add a batch dimension to this item using the `np.expand_dims()` function:
```python
import numpy as np
np.expand_dims(X_test[0], axis=0)
```
with `axis=0` meaning we are adding a new dimension at the 0th axis (batch dimension).

This transforms the shape from **(30, 1662) → (1, 30, 1662)**, making it compatible with the model's expected input format.

In [None]:
import numpy as np
np.expand_dims(X_test[0], axis=0)

In [None]:
np.expand_dims(X_test[0], axis=0).shape

Now we can test with our model.

In [None]:
model.predict(np.expand_dims(X_test[0], axis=0))

In [None]:
actions[np.argmax(model.predict(np.expand_dims(X_test[0], axis=0)))]

In [None]:
actions[np.argmax(y_test[0])]