# Import and Install General Dependencies

In [22]:
!pip3 install mediapipe==0.10.9



In [23]:
!pip install tensorflow opencv-python scikit-learn matplotlib



In [54]:
!pip install pytube

Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl.metadata (5.0 kB)
Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m[31m1.4 MB/s[0m eta [36m0:00:01[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-15.0.0


In [49]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import sys

# Get Videos from Microsoft Dataset

In [3]:
import json
import pandas as pd

In [4]:
# Load JSON at the given filepath
def load_data(filepath):
    with open(filepath, 'r') as file:
        return json.load(file)

In [5]:
# Load the train, test, and validation json files
train_data = load_data('Microsoft-ASL-dataset/MSASL_train.json')
test_data = load_data('Microsoft-ASL-dataset/MSASL_test.json')
val_data = load_data('Microsoft-ASL-dataset/MSASL_val.json')

In [6]:
# Convert to DataFrame for easier manipulation
train_df = pd.DataFrame(train_data)
test_df = pd.DataFrame(test_data)
val_df = pd.DataFrame(val_data)

# Download Videos from Microsoft Dataset

In [136]:
from pytube import YouTube, exceptions
import time
import shutil

In [140]:
def check_disk_space(directory, required_bytes):
    total, used, free = shutil.disk_usage(directory)
    return free > required_bytes

In [141]:
def download_video(url, data_path):
    # Ensure that the directory I will be saving the video to exists
    directory = f'{data_path}/Videos'
    if not os.path.exists(directory):
        print(f"Error: The directory {directory} does not exist.")
        sys.exit(1)

    try:
        yt = YouTube(url)
        stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
        
        # Estimate file size (in bytes)
        file_size = stream.filesize
        if not check_disk_space(directory, file_size):
            print(f"Not enough disk space to download video: {file_size} bytes needed.")
            return 0
            
        video_id = yt.video_id
        video_path = stream.download(output_path=directory, filename=f'{video_id}.mp4')
        
        return video_path
    except exceptions.VideoUnavailable:
        print(f"Video {url} for {action} is unavailable.")
        return None

In [210]:
# Define the all recognizable actions
DATA_PATH = os.path.join('MicrosoftData')
actions = []
train_action_freq_map = {}
test_action_freq_map = {}
unique_urls = set()
num_train_vids, num_test_vids = 0, 0

# Get all actions and the frequency of those actions in the training set
for i, row in train_df.iterrows():
    if row['label'] < 100:        
        if row['url'] not in unique_urls:
            num_train_vids += 1
            unique_urls.add(row['url'])
            
        # Check if we have seen the action before. If not add it to actions list
        if row['text'] not in train_action_freq_map:
            train_action_freq_map[row['text']] = 1
            actions.append(row['text'])
        else:
            train_action_freq_map[row['text']] += 1
unique_urls.clear()

# Add the frequencies in the testing set
for i, row in test_df.iterrows():
    if row['label'] < 100:
        if row['url'] not in unique_urls:
            num_test_vids += 1
            unique_urls.add(row['url'])
            
        if row['text'] not in test_action_freq_map:
            test_action_freq_map[row['text']] = 1
        else:
            test_action_freq_map[row['text']] += 1
unique_urls.clear()

num_train_data, num_test_data = sum(train_action_freq_map.values()), sum(test_action_freq_map.values())

In [302]:
print(sum(unavailable_videos.values()))
print()
print(max(unavailable_videos.values()))

486
10


In [223]:
# Create a directory structure
# Make directory to hold all the videos, train, and test data
os.makedirs(os.path.join(DATA_PATH, 'Videos'), exist_ok=True)
train_dir = os.path.join(DATA_PATH, 'Train')
test_dir = os.path.join(DATA_PATH, 'Test')
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Make directories that will hold all the extracted key points
for action in actions:
#    # Make all of the training dirs
#    for i in range(train_action_freq_map[action]):
#        try:
#            os.makedirs(os.path.join(train_dir, action, str(i)))
#        except:
#            pass
    # Make all of the testing dirs
    for i in range(test_action_freq_map[action]):
        try:
            os.makedirs(os.path.join(test_dir, action, str(i)))
        except:
            pass

In [143]:
# Download all unique videos
unavailable_videos = {action:0 for action in actions}
unique_urls = set()
urls_skipped_for_space = []

# Download the training data videos
dowload_start_time = time.time()
print('Downloading Training Videos------------------------------------------------------------------------------------------------------------------------')
c = 0
for i, row in train_df.iterrows():
    if row['label'] < 100 and row['url'] not in unique_urls:
        c += 1
        print(f'Downloading Training Video {c}/{num_train_vids}')
        unique_urls.add(row['url'])
        result = download_video(row['url'], DATA_PATH)
        if result is None:
            unavailable_videos[row['text']] += 1
        elif result == 0:
            urls_skipped_for_space.append(row['url'])
            
print('Finished Downloading Training Videos---------------------------------------------------------------------------------------------------------------------')
train_download_end_time = time.time()

# Download the testing data videos
print('Downloading Testing Videos------------------------------------------------------------------------------------------------------------------------')
c = 0
for i, row in test_df.iterrows():
    if row['label'] < 100 and row['url'] not in unique_urls:
        c += 1
        print(f'Downloading Testing Video for {action} ({c}/{num_test_vids})')
        unique_urls.add(row['url'])
        result = download_video(row['url'], DATA_PATH)
        if result is None:
            unavailable_videos[row['text']] += 1
        elif result == 0:
            urls_skipped_for_space.append(row['url'])
print('Finished Downloading Testing Videos------------------------------------------------------------------------------------------------------------------------')
download_end_time = time.time()
print(f'Unavailable videos: {unavailable_videos}')
print(f'Skipped these {len(urls_skipped_for_space)} Videos for space: {urls_skipped_for_space}')
total_time = download_end_time - dowload_start_time
print(f'Total time: {total_time}')


Downloading Training Videos------------------------------------------------------------------------------------------------------------------------
Downloading Training Video 1/1141
Downloading Training Video 2/1141
Video https://www.youtube.com/watch?v=1AyT77LqJzQ for go is unavailable.
Downloading Training Video 3/1141
Downloading Training Video 4/1141
Video www.youtube.com/watch?v=7y5Ye-2-ZBs for go is unavailable.
Downloading Training Video 5/1141
Downloading Training Video 6/1141
Downloading Training Video 7/1141
Video https://www.youtube.com/watch?v=0Beq_NIDj2c for go is unavailable.
Downloading Training Video 8/1141
Downloading Training Video 9/1141
Downloading Training Video 10/1141
Video www.youtube.com/watch?v=AoQAPgEUIAs for go is unavailable.
Downloading Training Video 11/1141
Downloading Training Video 12/1141
Downloading Training Video 13/1141
Video https://www.youtube.com/watch?v=U_nbv5Mq00c for go is unavailable.
Downloading Training Video 14/1141
Video www.youtube.com/

In [148]:
print(f'There were {sum(unavailable_videos.values())} out of {num_test_vids + num_train_vids} That were unable to be downloaded')
for key, val in unavailable_videos.items():
    if val == 10:
        print(f'Action {key} was unable to access its video {val} times')

There were 486 out of 1498 That were unable to be downloaded
Action eat was unable to access its video 10 times
Action hello was unable to access its video 10 times


# Detect and Extract Key Features

In [198]:
# Used to detect the features which will be extracted
mp_holistic = mp.solutions.holistic

In [199]:
# Find key features on the current frame
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Convert color from BGR->RGB
    image.flags.writeable = False
    results = model.process(image) # Make prediction on the current image/frame
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # Convert color from RGB->BGR
    return results

In [200]:
# Create an array of features when they are detected
def extract_keypoints(results):
    # If there is no data for the body part in that frame then we want to create an array of zeros of the same size
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])

In [201]:
# Experiment function get every single frame in the video
def extract_keypoints_from_video(video_dir, feature_data_file, video_name, start_time, end_time, detection_method):
    video_file = video_dir + '/' + video_name
    cap = cv2.VideoCapture(video_file)
    cap.set(cv2.CAP_PROP_POS_MSEC, start_time * 1000) # Set the start time to the correct millisecond
    frame_name = 0
    
    while cap.isOpened():
        # Read in the next frame
        frame_id = int(round(cap.get(cv2.CAP_PROP_POS_FRAMES)))
        ret, frame = cap.read()
        
        # Check if the frame was successfully read and is smaller than the end frame
        if ret and frame_id <= end_time * cap.get(cv2.CAP_PROP_FPS):
            # Detect our features in the image
            results = mediapipe_detection(frame, detection_method)

            # Save the frame keypoints to its corresponding file
            keypoints = extract_keypoints(results)
            npy_path = os.path.join(feature_data_file, str(frame_name))
            np.save(npy_path, keypoints)
            frame_name += 1
        else:
            break
    cap.release()
    return frames

In [224]:
# Loop through all of the training and testing data and extract the key points
action_seen = {action:0 for action in actions}
train_dir = os.path.join(DATA_PATH, 'Train')
test_dir = os.path.join(DATA_PATH, 'Test')
video_path = os.path.join(DATA_PATH, 'Videos')
c_train, c_test = 0, 0
video_errors = {}

with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    # Extract points for training data
    #print ('Extracting Key points from training data-----------------------------------------------------------------------------------------')
    #for i, row in train_df.iterrows():
    #    if row['label'] < 100:
    #        # Get the path to the video we are transcribing and its key feature destinations
            action = row['text']
            file_num = str(action_seen[action])
            feature_data_file = os.path.join(train_dir, action, file_num)
            
            # Display progress
            c_train += 1
            print(f'Training data {c_train}/{num_train_data}')

            # Get the video name
            yt = YouTube(row['url'])
            video_id = yt.video_id
            video_name = f'{video_id}.mp4'
            
            # Get video start and stop time
            st = row['start_time']
            et = row['end_time']

            if video_name in os.listdir(video_path):
                try:
                    extract_keypoints_from_video(video_path, feature_data_file, video_name, st, et, holistic)
                    action_seen[action] += 1
                except Exception as e:
                    print(f"Error processing {video_name} for Action {action}: {str(e)}")
                    if video_name not in video_errors:
                        video_errors[video_name] = 1
                    else:
                        video_errors[video_name] += 1
                

    # Extract points for test data
    s_time = time.time()
    print ('Extracting Key points from testing data-----------------------------------------------------------------------------------------')
    for i, row in test_df.iterrows():
        if row['label'] < 100:
            # Get the path to the video we are transcribing and its key feature destinations
            action = row['text']
            file_num = str(action_seen[action])
            feature_data_file = os.path.join(test_dir, action, file_num)

            # Display progress
            c_test += 1
            print(f'Testing data {c_test}/{num_test_data}')

            # Get the video name
            yt = YouTube(row['url'])
            video_id = yt.video_id
            video_name = f'{video_id}.mp4'
            
            # Get video start and stop time
            st = row['start_time']
            et = row['end_time']

            if video_name in os.listdir(video_path):
                try:
                    extract_keypoints_from_video(video_path, feature_data_file, video_name, st, et, holistic)
                    action_seen[action] += 1
                except Exception as e:
                    print(f"Error processing {video_name} for Action {action}: {str(e)}")
                    if video_name not in video_errors:
                        video_errors[video_name] = 1
                    else:
                        video_errors[video_name] += 1
    e_time = time.time()
    print(f'Runtime: {e_time-s_time}')

Extracting Key points from testing data-----------------------------------------------------------------------------------------
Testing data 1/757
Testing data 2/757
Testing data 3/757
Testing data 4/757
Testing data 5/757


I0000 00:00:1714535497.240549       1 gl_context.cc:344] GL version: 2.1 (2.1 Metal - 88), renderer: Apple M1 Pro


Testing data 6/757
Testing data 7/757
Testing data 8/757
Testing data 9/757
Testing data 10/757
Testing data 11/757
Testing data 12/757
Testing data 13/757
Testing data 14/757
Testing data 15/757
Testing data 16/757
Testing data 17/757
Testing data 18/757
Testing data 19/757
Testing data 20/757
Testing data 21/757
Testing data 22/757
Testing data 23/757
Testing data 24/757
Testing data 25/757
Testing data 26/757
Testing data 27/757
Testing data 28/757
Testing data 29/757
Testing data 30/757
Testing data 31/757
Testing data 32/757
Testing data 33/757
Testing data 34/757
Testing data 35/757
Testing data 36/757
Testing data 37/757
Testing data 38/757
Testing data 39/757
Testing data 40/757
Testing data 41/757
Testing data 42/757
Testing data 43/757
Testing data 44/757
Testing data 45/757
Testing data 46/757
Testing data 47/757
Testing data 48/757
Testing data 49/757
Testing data 50/757
Testing data 51/757
Testing data 52/757
Testing data 53/757
Testing data 54/757
Testing data 55/757
Test

# Preprocess Data, Create Labels, and Create Features

In [247]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from keras.preprocessing.sequence import pad_sequences

In [275]:
label_map = {label:num for num, label in enumerate(actions)}

In [278]:
def load_keypoint_features(base_directory, sequences, labels):
    for action_name in os.listdir(base_directory):
        action_path = os.path.join(base_directory, action_name)
        # Check if it's actually a directory
        if os.path.isdir(action_path):
            # Loop through each video directory within the action directory
            for video_name in os.listdir(action_path):
                window = []
                video_path = os.path.join(action_path, video_name)
                # Check if the current path is a directory and not empty
                if os.path.isdir(video_path) and os.listdir(video_path):
                    # For each video, go through load the extracted feature data at every frame
                    for frame in os.listdir(video_path):
                        if frame.endswith('.npy'):
                            res = np.load(os.path.join(video_path, frame))
                            window.append(res)
                        else:
                            print(f'Skipped file {frame}')
                    sequences.append(window)
                    labels.append(label_map[action_name])

    padded_sequences = pad_sequences(sequences, padding='post', dtype='float32')
    categorical_labels = to_categorical(labels).astype(int)
    return padded_sequences, categorical_labels

In [279]:
# NOTE: Sequences represent extracted features (x-data) and labels represent label/category (y-data)
train_sequences, train_labels = [], []
test_sequences, test_labels = [], []

train_base_directory = os.path.join(DATA_PATH, 'Train')
test_base_directory = os.path.join(DATA_PATH, 'Test')
X_train, y_train = load_keypoint_features(train_base_directory, train_sequences, train_labels)
X_test, y_test = load_keypoint_features(test_base_directory, test_sequences, test_labels)

In [280]:
print(X_test.shape)
print(y_test.shape)
print(X_train.shape)
print(y_train.shape)

(522, 168, 1662)
(522, 100)
(2813, 270, 1662)
(2813, 100)


# Build and Train the Nueural Network

In [286]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Masking, Bidirectional, Dropout
from tensorflow.keras.optimizers import Adam
# TODO: Decide if I want to remove the logs for final submission
from tensorflow.keras.callbacks import TensorBoard

In [282]:
# TODO: Remove this for final submission?
# Set up the log directory for TensorBoard
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [296]:
# Define the model architecture
model = Sequential()
model.add(Masking(mask_value=0., input_shape=(None, 1662)))
model.add(Bidirectional(LSTM(64, return_sequences=True, activation='tanh')))
model.add(Dropout(0.5))
model.add(LSTM(128, return_sequences=True, activation='tanh'))
model.add(Dropout(0.5))
model.add(LSTM(64, return_sequences=False, activation='tanh'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(len(actions), activation='softmax'))

In [299]:
#optimizer = Adam(learning_rate=0.01, clipnorm=1.0)
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
#model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [300]:
# Train the model on training data
# TODO: Figure out sweet in training for epochs GET GRAPHS SO I CAN JUSTIFY IN REPORT
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])

Epoch 1/2000
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m56s[0m 585ms/step - categorical_accuracy: 0.0175 - loss: 4.5950
Epoch 2/2000
[1m31/88[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m38s[0m 672ms/step - categorical_accuracy: 0.0131 - loss: 4.5797

KeyboardInterrupt: 

# Save and Load Model TODO: Remove this Section for Final Submission

In [46]:
# TODO: Figure out submission format and delete, move, or change load_weights correspondingly
# Save the model
model.save('CustomData_375Epochs.keras')
#model.load_weights('action.keras')

# Evaluate Model Performance with Testing Data

In [47]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [303]:
print(X_test.shape)

(522, 168, 1662)


In [48]:
# Make predictions on the testing data
yhat = model.predict(X_test)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 190ms/step


In [49]:
# Convert the predictions from their one-hot encoded (e.g. [1,0,0], [0,1,0]) prediction to corresponding categorical label (e.g. 0, 1, 2)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [52]:
# Use these metrics to evaluate performance of model
print(multilabel_confusion_matrix(ytrue, yhat))
print(accuracy_score(ytrue, yhat))

[[[3 1]
  [1 0]]

 [[3 1]
  [0 1]]

 [[2 0]
  [1 2]]]
0.6


# Test the Model in Real Time

In [21]:
from scipy import stats

In [None]:
# Delclare variables that are used for making detections
sequence = []
sentence = []
predictions = []
threshold = 0.5

# Set up videocapture and loop through frames
cap = cv2.VideoCapture(0) #Device value 0 should correspond to the webcam
# Check if the camera opened successfully
if not cap.isOpened():
    print("Error: Could not open video capture device.")
    exit()

# Set the mediapipe model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    try:
        while cap.isOpened():
            ret, frame = cap.read() # Capture the return value and frame at each point in camera
            # Make sure that the frame is read correctly
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break
    
            # Make detections
            image, results = mediapipe_detection(frame, holistic)

            # Draw landmarks on frame
            draw_styled_landmarks(image, results)

            # Prediction logic keeps the most recent 30 frames for each prediction
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-30:]

            # We only want to start making predictions once we have 30 frames of data to use
            if len(sequence) == 30:
                res = model.predict(np.expand_dims(sequence, axis=0))[0] # Need to use expand_dims because our model is expecting a row indicating number of sequences
                predictions.append(np.argmax(res))

                # Visualization Logic
                # Only make predictions if it has been confident in that prediction for 10 slides in a row
                if np.unique(predictions[-10:])[0] == np.argmax(res):
                    # We only want to make a prediction if the model is confident in that sign
                    if res[np.argmax(res)] > threshold:
                        if len(sentence) > 0:
                            # We don't want to add an action to the sentence until it is a new action (not double counting actions)
                            if actions[np.argmax(res)] != sentence[-1]:
                                sentence.append(actions[np.argmax(res)])
                        else:
                            sentence.append(actions[np.argmax(res)])
    
                # Make sure that we do not end with giant sentences
                if len(sentence) > 5:
                    sentence = sentence[-5:]

            # Render rectangle around prediction
            cv2.rectangle(image, (0,0), (2000, 40), (500, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3,30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
            cv2.imshow('OpenCV Feed', image) # Show the current frame to the user and names the feed "OpenCV Feed"
        
            # Break out of the capture feed gracefully if the q key is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break     
    finally:
        cap.release()
        cv2.destroyAllWindows()
        # There is a known bug on Mac where destroyAllWindows doesn't work unless a certain amount of time is spent waiting after
        for i in range (1,5):
            cv2.waitKey(1)