In [None]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.8.9.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (32.7 MB)
[K     |████████████████████████████████| 32.7 MB 173 kB/s 
Installing collected packages: mediapipe
Successfully installed mediapipe-0.8.9.1


In [None]:
# Mount Google Drive at /content/drive2; the dataset and saved weights used
# further down are read from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive2')

Mounted at /content/drive2


In [None]:
import mediapipe as mp
import numpy as np
import cv2
import pandas as pd
import os

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
from tensorflow.keras.utils import to_categorical

### MediaPipe Setup with Webcam

In [None]:
# Alias for MediaPipe's holistic solution module; it supplies the Holistic model
# class and the POSE_CONNECTIONS / HAND_CONNECTIONS constants used below.
mp_holistic = mp.solutions.holistic
# Alias for MediaPipe's drawing helpers (draw_landmarks, DrawingSpec).
mp_drawing = mp.solutions.drawing_utils

In [None]:
def mediapipe_detection(img, mp_holistic):
    """Run a holistic model on one BGR frame.

    Args:
        img: Frame in OpenCV's BGR channel order.
        mp_holistic: Object exposing ``process(rgb_image)``. Callers in this
            notebook pass a ``Holistic`` model instance (the parameter name
            shadows the module-level ``mp_holistic`` alias inside this function).

    Returns:
        A ``(bgr_image, result)`` tuple: the frame converted back to BGR and
        whatever ``process`` returned.
    """
    rgb_frame = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # The frame is read-only while the model looks at it, then writable again
    # so later drawing code can modify it.
    rgb_frame.flags.writeable = False
    detections = mp_holistic.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), detections

In [None]:
def holistic_drawing(img, holistic_res):
    """Overlay pose and hand landmarks from a holistic result onto ``img``.

    Args:
        img: Image handed to ``mp_drawing.draw_landmarks`` as the drawing target.
        holistic_res: Result object exposing ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.

    Returns:
        None.
    """
    # (first DrawingSpec colour, second DrawingSpec colour) per body part.
    pose_colors = ((200, 50, 50), (200, 25, 25))
    hand_colors = ((25, 25, 200), (50, 50, 200))

    layers = (
        # Face mesh drawing is intentionally disabled. The original call was:
        # mp_drawing.draw_landmarks(img, holistic_res.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
        #                          mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
        #                          mp_drawing.DrawingSpec(color=(80,256,120), thickness=1, circle_radius=1))
        (holistic_res.pose_landmarks, mp_holistic.POSE_CONNECTIONS, pose_colors),
        (holistic_res.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, hand_colors),
        (holistic_res.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, hand_colors),
    )

    for landmarks, connections, (first_color, second_color) in layers:
        mp_drawing.draw_landmarks(
            img,
            landmarks,
            connections,
            mp_drawing.DrawingSpec(color=first_color, thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=second_color, thickness=2, circle_radius=2),
        )

In [None]:
# Live preview: run the holistic model on the default webcam and show the
# annotated frames until 'q' is pressed or the camera stops delivering frames.
cap = cv2.VideoCapture(0)
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic_model:
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                # A failed read returns no frame; stop here rather than
                # passing None into cv2.cvtColor inside mediapipe_detection.
                break
            img, results = mediapipe_detection(frame, holistic_model)
            holistic_drawing(img, results)
            cv2.imshow("OpenCV Cam feed", img)
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()
    # Kept from the original cell; presumably gives the GUI loop one tick to
    # actually close the window on some platforms (confirm).
    cv2.waitKey(1)

-1

### Extract mediapipe landmark keypoints to numpy arrays

In [None]:
def extract_keypoints(results):
    """Flatten pose and hand landmarks into a single 1-D feature vector.

    The vector is the concatenation, in order, of:
      * pose landmarks as ``[x, y, z, visibility]`` per landmark,
      * left-hand landmarks as ``[x, y, z]`` per landmark,
      * right-hand landmarks as ``[x, y, z]`` per landmark.

    A falsy landmark group is replaced by zeros sized for 33 pose landmarks
    or 21 hand landmarks, which gives 258 values when every group is either
    missing or has those counts.

    Args:
        results: Object exposing ``pose_landmarks``, ``left_hand_landmarks``
            and ``right_hand_landmarks``; each is falsy or has a ``landmark``
            iterable of points.

    Returns:
        1-D ``numpy.ndarray`` of keypoint values.
    """
    def _flatten(group, fields, expected_points):
        # Missing detection -> zero block of the expected size.
        if not group:
            return np.zeros(expected_points * len(fields))
        rows = [[getattr(point, name) for name in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ("x", "y", "z")
    parts = [
        _flatten(results.pose_landmarks, xyz + ("visibility",), 33),
        # Face landmarks (468 x [x, y, z]) are deliberately left out.
        _flatten(results.left_hand_landmarks, xyz, 21),
        _flatten(results.right_hand_landmarks, xyz, 21),
    ]
    return np.concatenate(parts)

### Directory Setup

In [None]:
# Dataset root on the mounted Drive; the loader below expects
# <DATA_PATH>/<label>/<sequence number>/<frame number>.npy
DATA_PATH = "/content/drive2/MyDrive/Sign Language Detection/Data3"

# Signs to recognise; a label's position in this array is its class index.
labels = np.array(["Open", "to", "Work"])

# Recorded sequences per label, and frames per sequence.
num_seqs, seq_len = 40, 30

In [None]:
# for label in labels:
#     for seq in range(num_seqs):
#         try:
#             os.makedirs(os.path.join(DATA_PATH, label, str(seq)))
#         except:
#             pass

### Collecting Data through webcam

In [None]:
# cap = cv2.VideoCapture(0)
# with mp_holistic.Holistic(min_detection_confidence = 0.5, min_tracking_confidence = 0.5) as holistic_model:
#     break_all = 0
#     for label in labels:
#         for seq in range(num_seqs):
#             for frame_num in range(seq_len):
#                 _, frame = cap.read()
#                 img, results = mediapipe_detection(frame, holistic_model)
#                 holistic_drawing(img, results)
                
#                 # NEW Apply wait logic
#                 if frame_num == 0:
#                     if seq == 0:
#                         cv2.putText(img, 'Press a Key to Start Collection for {} Video Number {}'.format(label, seq), (120,200), 
#                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
#                         cv2.imshow('OpenCV Cam Feed', img)
#                         cv2.waitKey(0)
#                     cv2.putText(img, 'STARTING COLLECTION', (120,300), 
#                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
#                     cv2.putText(img, 'Collecting frames for {} Video Number {}'.format(label, seq), (15,12), 
#                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
#                     cv2.imshow('OpenCV Cam Feed', img)
#                     cv2.waitKey(2000)
#                     # Show to screen
#                 else: 
#                     cv2.putText(img, 'Collecting frames for {} Video Number {}'.format(label, seq), (15,12), 
#                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
#                     # Show to screen
#                     cv2.imshow('OpenCV Cam Feed', img)
                
#                 # NEW Export keypoints
#                 keypoints = extract_keypoints(results)
#                 npy_path = os.path.join(DATA_PATH, label, str(seq), str(frame_num))
#                 np.save(npy_path, keypoints)
                
#                 if cv2.waitKey(10) & 0xFF == ord('q'):
#                     break_all = 1
#                     break
#             if break_all:
#                 break
#         if break_all:
#                 break
# cap.release()
# cv2.destroyAllWindows()
# cv2.waitKey(1)

### Load, Preprocess, and Split Data

In [None]:
# Label encoding: map each label to its position in `labels` (class index).
label_map = dict(zip(labels, range(len(labels))))

In [None]:
# Load every recorded sequence from disk.
# `sequences` collects one list of per-frame keypoint arrays per recording;
# `words` collects the matching integer class index.
sequences, words = [], []

for label in labels:
    label_dir = os.path.join(DATA_PATH, label)
    # Keep only the numbered sequence folders. Stray entries such as
    # .DS_Store or .ipynb_checkpoints would otherwise crash the int conversion.
    # Sorting makes the load order independent of the filesystem listing order.
    seq_ids = sorted(int(entry) for entry in os.listdir(label_dir) if entry.isdigit())
    for seq in seq_ids:
        window = [
            np.load(os.path.join(label_dir, str(seq), "{}.npy".format(frame_num)))
            for frame_num in range(seq_len)
        ]
        sequences.append(window)
        words.append(label_map[label])
    # One summary line per label instead of one line per sequence.
    print(label, len(seq_ids), "sequences")

Open 20
Open 11
Open 33
Open 35
Open 27
Open 0
Open 32
Open 9
Open 18
Open 34
Open 19
Open 1
Open 7
Open 28
Open 29
Open 6
Open 26
Open 10
Open 17
Open 16
Open 31
Open 39
Open 23
Open 24
Open 37
Open 38
Open 21
Open 36
Open 8
Open 30
Open 4
Open 13
Open 25
Open 22
Open 15
Open 14
Open 5
Open 2
Open 12
Open 3
to 0
to 11
to 35
to 27
to 20
to 9
to 32
to 18
to 34
to 33
to 29
to 19
to 17
to 7
to 28
to 10
to 1
to 6
to 26
to 16
to 39
to 30
to 38
to 21
to 8
to 31
to 37
to 23
to 24
to 36
to 5
to 25
to 22
to 2
to 12
to 13
to 15
to 4
to 3
to 14
Work 35
Work 9
Work 20
Work 32
Work 33
Work 34
Work 0
Work 18
Work 11
Work 27
Work 17
Work 16
Work 19
Work 29
Work 7
Work 26
Work 1
Work 10
Work 28
Work 6
Work 31
Work 39
Work 36
Work 38
Work 23
Work 37
Work 24
Work 30
Work 21
Work 8
Work 3
Work 2
Work 14
Work 15
Work 4
Work 13
Work 22
Work 25
Work 5
Work 12


In [None]:
# Spot-check one recorded sequence folder; the loader above reads one
# "<frame number>.npy" file per frame from folders like this one.
sorted(os.listdir(os.path.join(DATA_PATH, 'Open', '33')))

['4.npy',
 '8.npy',
 '9.npy',
 '6.npy',
 '3.npy',
 '5.npy',
 '0.npy',
 '2.npy',
 '7.npy',
 '1.npy',
 '19.npy',
 '16.npy',
 '10.npy',
 '13.npy',
 '17.npy',
 '14.npy',
 '11.npy',
 '15.npy',
 '12.npy',
 '18.npy',
 '29.npy',
 '26.npy',
 '27.npy',
 '21.npy',
 '23.npy',
 '20.npy',
 '28.npy',
 '22.npy',
 '25.npy',
 '24.npy']

In [None]:
# Stack the loaded sequences into a single array.
X = np.array(sequences)
# Print summaries only. Printing `sequences` itself dumps every keypoint of
# every frame and trips the notebook's IOPub data-rate limit.
print(words)
print(label_map)
print(X.shape)
print(DATA_PATH)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [None]:
# One-hot encode the integer class indices and hold out 20% of the sequences.
SPLIT_SEED = 42  # fixed so the split, and therefore the results, are reproducible

# num_classes is given explicitly so the one-hot width always matches the
# model's output layer, even if the highest class index has no samples.
y = to_categorical(words, num_classes=len(labels)).astype(int)

# Stratify on the integer labels so every sign keeps the same share in
# both the training and the held-out split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=words, random_state=SPLIT_SEED
)

### Build and Train Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import ModelCheckpoint

In [None]:
# Stacked-LSTM classifier over keypoint sequences.
# Features per frame mirror extract_keypoints:
# 33 pose landmarks x 4 values + 2 hands x 21 landmarks x 3 values = 258.
num_features = 33 * 4 + 2 * 21 * 3

model = Sequential([
    # Input is (frames per sequence, features per frame); the frame count comes
    # from seq_len so the model stays in sync with the dataset configuration.
    LSTM(64, return_sequences=True, activation='tanh', input_shape=(seq_len, num_features)),
    LSTM(128, return_sequences=True, activation='tanh'),
    # return_sequences=False: only the final step's output feeds the dense head.
    LSTM(64, return_sequences=False, activation='tanh'),
    Dense(64, activation='tanh'),
    Dense(32, activation='tanh'),
    # One softmax unit per label.
    Dense(labels.shape[0], activation='softmax'),
])

In [None]:
# Adam with categorical cross-entropy, matching the one-hot targets and the
# softmax output layer; categorical accuracy is tracked as the metric.
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_6 (LSTM)               (None, 30, 64)            82688     
                                                                 
 lstm_7 (LSTM)               (None, 30, 128)           98816     
                                                                 
 lstm_8 (LSTM)               (None, 64)                49408     
                                                                 
 dense_6 (Dense)             (None, 64)                4160      
                                                                 
 dense_7 (Dense)             (None, 32)                2080      
                                                                 
 dense_8 (Dense)             (None, 3)                 99        
                                                                 
Total params: 237,251
Trainable params: 237,251
Non-tr

In [None]:
# Two weight-only checkpoints, each overwritten only when its monitored
# validation metric improves: one tracks lowest loss, one highest accuracy.
_checkpoint_common = dict(save_weights_only=True, save_best_only=True, verbose=1)

cp_best_val_loss = ModelCheckpoint("SLD_val_loss", monitor='val_loss', mode='min', **_checkpoint_common)
cp_best_val_acc = ModelCheckpoint("SLD_val_acc", monitor='val_categorical_accuracy', mode='max', **_checkpoint_common)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop automatically once validation loss has stopped improving, instead of
# relying on a manual interrupt of the 2000-epoch budget. Restoring the best
# weights means a later model.save() stores the best epoch, not the last one.
early_stop = EarlyStopping(
    monitor='val_loss', mode='min', patience=50, restore_best_weights=True, verbose=1
)

# NOTE(review): the held-out split doubles as the validation set, so checkpoint
# selection and early stopping both see it; accuracy on it is optimistic.
history = model.fit(
    X_train, y_train,
    epochs=2000,
    validation_data=(X_test, y_test),
    callbacks=[cp_best_val_loss, cp_best_val_acc, early_stop],
)

Epoch 1/2000
Epoch 00001: val_loss improved from inf to 0.75865, saving model to SLD_val_loss

Epoch 00001: val_categorical_accuracy improved from -inf to 0.62500, saving model to SLD_val_acc
Epoch 2/2000
Epoch 00002: val_loss improved from 0.75865 to 0.38037, saving model to SLD_val_loss

Epoch 00002: val_categorical_accuracy improved from 0.62500 to 1.00000, saving model to SLD_val_acc
Epoch 3/2000
Epoch 00003: val_loss improved from 0.38037 to 0.28693, saving model to SLD_val_loss

Epoch 00003: val_categorical_accuracy did not improve from 1.00000
Epoch 4/2000
Epoch 00004: val_loss improved from 0.28693 to 0.07746, saving model to SLD_val_loss

Epoch 00004: val_categorical_accuracy did not improve from 1.00000
Epoch 5/2000
Epoch 00005: val_loss improved from 0.07746 to 0.03308, saving model to SLD_val_loss

Epoch 00005: val_categorical_accuracy did not improve from 1.00000
Epoch 6/2000
Epoch 00006: val_loss did not improve from 0.03308

Epoch 00006: val_categorical_accuracy did not 

KeyboardInterrupt: ignored

In [None]:
# Save the model in its current state to an .h5 file in the working directory.
model.save('sign3_lang_model.h5')

In [None]:
# Load the weights of the best-validation-loss checkpoint from Drive.
# NOTE(review): the ModelCheckpoint callback above writes "SLD_val_loss" to the
# working directory, while this reads it from Drive. Presumably the checkpoint
# files were copied there by hand; confirm.
model.load_weights('/content/drive2/MyDrive/Sign Language Detection/SLD_val_loss')
# /content/drive2/MyDrive/Sign Language Detection/

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f5c5e17ef10>

In [None]:
# Compile again with the same optimizer, loss and metric used for training,
# then print the architecture of the model that now holds the loaded weights.
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 30, 64)            82688     
                                                                 
 lstm_1 (LSTM)               (None, 30, 128)           98816     
                                                                 
 lstm_2 (LSTM)               (None, 64)                49408     
                                                                 
 dense (Dense)               (None, 64)                4160      
                                                                 
 dense_1 (Dense)             (None, 32)                2080      
                                                                 
 dense_2 (Dense)             (None, 3)                 99        
                                                                 
Total params: 237,251
Trainable params: 237,251
Non-trai

### Prediction / Inference on Videos

In [None]:
from google.colab.patches import cv2_imshow
from tensorflow.python.training.tracking.util import capture_dependencies
# Rolling state for sliding-window inference over the video.
sequence = []      # keypoints of the most recent `seq_len` frames
sentence = []      # accepted words, rendered as an overlay
predictions = []   # argmax class index of every evaluated window
threshold = 0.999  # minimum softmax confidence for accepting a word

# Named video_path rather than `input` so the builtin input() is not shadowed
# for the rest of the session.
video_path = '/content/drive2/MyDrive/Sign Language Detection/open2work41.mov'

cap = cv2.VideoCapture(video_path)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# Write the output at the source frame rate; fall back to 30 if it is unknown (0).
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
out = cv2.VideoWriter('output.avi', cv2.CAP_ANY, cv2.VideoWriter_fourcc(*'MJPG'), fps, (width, height))

try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            # Read feed; stop at end of video or on a failed read.
            ret, frame = cap.read()
            if not ret:
                break

            # 1. Detect landmarks and draw them on the frame.
            image, results = mediapipe_detection(frame, holistic)
            holistic_drawing(image, results)

            # 2. Prediction logic: keep a sliding window of the last seq_len frames.
            sequence.append(extract_keypoints(results))
            sequence = sequence[-seq_len:]

            if len(sequence) == seq_len:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = int(np.argmax(res))
                print(labels[best])
                predictions.append(best)
                print(res[best])

                # 3. Viz logic: accept a word only when all of the last 10
                # windows agree on it and the confidence clears the threshold.
                # (np.unique(...)[0] only compared against the smallest recent
                # class id, so the check was not a real stability test.)
                stable = all(p == best for p in predictions[-10:])
                if stable and res[best] > threshold:
                    word = labels[best]
                    # Do not repeat the word that was just appended.
                    if not sentence or word != sentence[-1]:
                        sentence.append(word)

                # Show at most the 10 most recent words.
                sentence = sentence[-10:]

            # Sentence banner (top-left) and note banner (bottom).
            cv2.rectangle(image, (0, 0), (int(width / 2.5), 100), (20, 105, 60), -1)
            cv2.putText(image, ' '.join(sentence), (3, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 6, cv2.LINE_AA)
            cv2.rectangle(image, (275, height - 15), (width - 275, height - 35), (20, 105, 60), -1)
            cv2.putText(image, "PS: I'm not differently-abled. It's a Sign Language Detection project I worked on", (310, height - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

            # Show the frame in the notebook and write it to the output video.
            cv2_imshow(image)
            out.write(image)
finally:
    # Release the reader AND the writer; an unreleased VideoWriter can leave
    # output.avi incomplete.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

### Disclaimer

Colab cannot access a local webcam, so MediaPipe detection and dataset collection through the webcam were done locally; training and inference with TensorFlow were then performed on Colab.

You can uncomment the commented-out cells if you wish to run those steps locally.
In my case, MediaPipe and TensorFlow clashed on my ARM-based M1 Mac.

The notebook uses the [approach to Sign Language Detection](https://www.youtube.com/watch?v=doDUihpj6ro&t=7087s) by Nicholas Renotte, of course with a whole bunch of tweaks to suit my use case 🙂

**Tweaks:**
- Input and output in the form of videos to work with colab.
- Remove face landmarks as they end up just being noise.
- Use tanh activation as it works way better with LSTMs compared to relu.
- Colors and Cosmetics.
- Disclaimer at bottom.
- Different threshold value for inference.