## PLAYGROUND TEST

In [1]:
import pandas as pd
import numpy as np
from google.protobuf.json_format import MessageToJson
# FEATURE EXTRACTOR


def distance_between(p1_loc, p2_loc):
    jsonObj = MessageToJson(results.multi_hand_landmarks[0])
    lmk = json.loads(jsonObj)['landmark']
    p1 = pd.DataFrame(lmk).to_numpy()[p1_loc]
    p2 = pd.DataFrame(lmk).to_numpy()[p2_loc]
    squared_dist = np.sum((p1-p2)**2, axis=0)
    return np.sqrt(squared_dist)


def landmark_to_dist_emb(results):
    jsonObj = MessageToJson(results.multi_hand_landmarks[0])
    lmk = json.loads(jsonObj)['landmark']

    emb = np.array([
        # thumb to finger tip
        distance_between(4, 8),
        distance_between(4, 12),
        distance_between(4, 16),
        distance_between(4, 20),
        # wrist to finger tip
        distance_between(4, 0),
        distance_between(8, 0),
        distance_between(12, 0),
        distance_between(16, 0),
        distance_between(20, 0),
        # tip to tip (specific to this application)
        distance_between(8, 12),
        distance_between(12, 16),
        # within finger joint (detect bending)
        distance_between(1, 4),
        distance_between(8, 5),
        distance_between(12, 9),
        distance_between(16, 13),
        distance_between(20, 17),
        # distance from each tip to thumb joint
        distance_between(2, 8),
        distance_between(2, 12),
        distance_between(2, 16),
        distance_between(2, 20)
    ])
    # use np normalize, as min_max may create confusion that the closest fingers has 0 distance
    emb_norm = emb / np.linalg.norm(emb)
    return emb_norm


In [2]:
import mediapipe as mp
import cv2
#pip install mediapipe - -user

# LANDMARK
import json

In [3]:
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands
# For webcam input:
#cap = cv2.VideoCapture(0)

# For Video Input
# video = "./dataset/clips/85/0.MOV"
# cap = cv2.VideoCapture(video)

In [None]:
with mp_hands.Hands(max_num_hands=2,
                    min_detection_confidence=0.5,
                    min_tracking_confidence=0.5) as hands:
  print(cap.isOpened())
  while cap.isOpened():
    success, image = cap.read()
    if not success:
      print("Ignoring empty camera frame.")

    image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = hands.process(image)

    # DETECT LANDMARKS
    if results.multi_hand_landmarks:

      #print(landmark_to_dist_emb(results).shape) process input
      jsonObj = MessageToJson(results.multi_hand_landmarks[0])
      lmk = json.loads(jsonObj)['landmark']
      
      #print(len(lmk))  # lmk = hand's landmark

# Draw the hand annotations on the image.
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    if results.multi_hand_landmarks:
      for hand_landmarks in results.multi_hand_landmarks:
        mp_drawing.draw_landmarks(
            image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
    cv2.imshow('MediaPipe Hands', image)
    if cv2.waitKey(5) & 0xFF == 27:
      break
cap.release()


In [3]:
# SCRIPT FILE FORMATTER
import random
import string
import shutil
import os

def generate_random_uid():
    """Generate a random UID."""
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))


def rename_videos(directory_path):
    """Rename videos in the specified directory."""
    for root, dirs, files in os.walk(directory_path):
        for file in files:
            if file.endswith(".MOV"):
                old_path = os.path.join(root, file)
                new_uid = generate_random_uid()
                new_name = f"{new_uid}_0.MOV"
                new_path = os.path.join(root, new_name)

                shutil.move(old_path, new_path)
                print(f"Renamed: {old_path} to {new_path}")


# Specify the directory containing the videos
directory_path = './dataset/clips/85'

# Call the function to rename the videos
rename_videos(directory_path)

Renamed: ./dataset/clips/85\0f0lzm9c_0.MOV to ./dataset/clips/85\p5ga19wa_0.MOV
Renamed: ./dataset/clips/85\0mb6to35_0.MOV to ./dataset/clips/85\mato6gvs_0.MOV
Renamed: ./dataset/clips/85\0om5g1it_0.MOV to ./dataset/clips/85\fdpnld0t_0.MOV
Renamed: ./dataset/clips/85\394m2pcf_0.MOV to ./dataset/clips/85\r4znhltx_0.MOV
Renamed: ./dataset/clips/85\3r4pmpw0_0.MOV to ./dataset/clips/85\inl5n07c_0.MOV
Renamed: ./dataset/clips/85\5r1fmott_0.MOV to ./dataset/clips/85\f9r9bsva_0.MOV
Renamed: ./dataset/clips/85\77o8wpno_0.MOV to ./dataset/clips/85\qygo7g5s_0.MOV
Renamed: ./dataset/clips/85\801rxjav_0.MOV to ./dataset/clips/85\4asdlwz1_0.MOV
Renamed: ./dataset/clips/85\9xu0jpcs_0.MOV to ./dataset/clips/85\11taoceh_0.MOV
Renamed: ./dataset/clips/85\aklxt2hx_0.MOV to ./dataset/clips/85\940bqr4z_0.MOV
Renamed: ./dataset/clips/85\d8hlk6ts_0.MOV to ./dataset/clips/85\i6641z1q_0.MOV
Renamed: ./dataset/clips/85\gnyv9lo8_0.MOV to ./dataset/clips/85\vvzajti5_0.MOV
Renamed: ./dataset/clips/85\hhc0oi3e_0.M

In [4]:
import os
import cv2
import mediapipe as mp

arr = os.listdir('./dataset/clips/85')
video_class_all = []
landmark_npy_all = []
handnn = mp.solutions.hands.Hands(
    max_num_hands=2, min_detection_confidence=0.6, min_tracking_confidence=0.6)

for idx, eachVideo in enumerate(arr):
    landmark_npy_single = []  # Reset for each video
    video = './dataset/clips/85/' + eachVideo
    cap = cv2.VideoCapture(video)
    video_class_all.append(int(video.split('_')[1].split('.')[0]))
    print(video)
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            break

        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = handnn.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                landmark_npy_single.append(landmark_to_dist_emb(results))

    # Append landmarks for each video only once
    landmark_npy_all.append(landmark_npy_single)

    cap.release()

    if ((idx + 1) % 10) == 0:
        print(f'Finished for {(idx + 1)} videos')

print(f'Finished for total {len(arr)} videos. Completed.')

./dataset/clips/85/11taoceh_0.MOV
./dataset/clips/85/4asdlwz1_0.MOV
./dataset/clips/85/4wjsa5wq_0.MOV
./dataset/clips/85/940bqr4z_0.MOV
./dataset/clips/85/9dppbec3_0.MOV
./dataset/clips/85/f9r9bsva_0.MOV
./dataset/clips/85/fdpnld0t_0.MOV
./dataset/clips/85/fx5jgfby_0.MOV
./dataset/clips/85/gwa534to_0.MOV
./dataset/clips/85/i6641z1q_0.MOV
Finished for 10 videos
./dataset/clips/85/inl5n07c_0.MOV
./dataset/clips/85/mato6gvs_0.MOV
./dataset/clips/85/nz7q913t_0.MOV
./dataset/clips/85/opunh4zs_0.MOV
./dataset/clips/85/p5ga19wa_0.MOV
./dataset/clips/85/qygo7g5s_0.MOV
./dataset/clips/85/r4znhltx_0.MOV
./dataset/clips/85/titl2dvs_0.MOV
./dataset/clips/85/tzd3o8eu_0.MOV
./dataset/clips/85/vvzajti5_0.MOV
Finished for 20 videos
Finished for total 20 videos. Completed.


In [5]:
from keras.preprocessing.sequence import pad_sequences
import math


def skip_frame(landmark_npy_all, frame=50):
    new_lmk_array = []


    for each in landmark_npy_all:
        if len(each) <= frame:
        # if its less than frame, dont need to skip
            new_lmk_array.append(each)
        else:
        # skip frame by ceiling
            to_round = math.ceil(len(each)/frame)
            new_lmk_array.append(each[::to_round])
    return new_lmk_array

new_lmk_array = skip_frame(landmark_npy_all)
train_x = pad_sequences(new_lmk_array, padding='post', maxlen=50, dtype='float32')


In [6]:
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split

In [10]:
classes = len(set(video_class_all))
feature_len = 20
max_len = 50
# hot encode output
train_y = to_categorical([i-1 for i in video_class_all], num_classes=classes)
print('Training y with shape of: ', train_y.shape)
print('Training x with shape of: ', train_x.shape)
X_train, X_test, y_train, y_test = train_test_split(train_x, train_y, test_size=0.2)
print("----")
print("Shape of X_train: ", X_train.shape)
print("Shape of y_train: ", y_train.shape)
print("Shape of X_test: ", X_train.shape)
print("Shape of y_test: ", y_train.shape)

#print(X_train[0][0])

Training y with shape of:  (20, 1)
Training x with shape of:  (20, 50, 20)
----
Shape of X_train:  (16, 50, 20)
Shape of y_train:  (16, 1)
Shape of X_test:  (16, 50, 20)
Shape of y_test:  (16, 1)


In [11]:
from keras.models import Sequential
from keras.layers import LSTM, Dropout, Dense, BatchNormalization, Activation
from keras.callbacks import LearningRateScheduler

In [12]:
model = Sequential()
model.add(LSTM(256, return_sequences=True, input_shape=(max_len, feature_len)))
model.add(Dropout(0.25))
model.add(LSTM(256, return_sequences=True))
model.add(Dropout(0.25))
model.add(LSTM(128, return_sequences=False))
model.add(Dense(64))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Dense(classes, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()


def lrSchedule(epoch):
    lr = 0.001
    if epoch > 200:
        lr *= 0.0005
    elif epoch > 120:
        lr *= 0.005
    elif epoch > 50:
        lr *= 0.01
    elif epoch > 30:
        lr *= 0.1

    print('Learning rate: ', lr)
    return lr


LRScheduler = LearningRateScheduler(lrSchedule)
callbacks_list = [LRScheduler]

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 50, 256)           283648    
                                                                 
 dropout (Dropout)           (None, 50, 256)           0         
                                                                 
 lstm_1 (LSTM)               (None, 50, 256)           525312    
                                                                 
 dropout_1 (Dropout)         (None, 50, 256)           0         
                                                                 
 lstm_2 (LSTM)               (None, 128)               197120    
                                                                 
 dense (Dense)               (None, 64)                8256      
                                                                 
 batch_normalization (Batch  (None, 64)                2

In [None]:
verbose, epochs, batch_size = 1, 300, 8
model.fit(X_train, y_train, validation_data = (X_test, y_test), epochs=epochs, batch_size=batch_size, verbose=verbose, shuffle=True)

In [116]:
model.save("fsl-pi.h5")

  saving_api.save_model(


In [3]:
from keras.models import load_model

# Replace 'your_model_path' with the actual path to your trained model
trained_model = load_model('./fsl-pi.h5')


In [4]:
# Create a sample input with shape (50, 20)
sample_input = np.zeros(shape=(50, 20))

# Add an extra dimension to match the model's expected input shape
sample_input = np.expand_dims(sample_input, axis=0)

# Check the shape before making predictions
print(sample_input.shape)

# Make the prediction
result = trained_model.predict(sample_input)

# Print the result
print("Prediction:", result)

(1, 50, 20)
Prediction: [[1.]]


In [17]:
print(sample_input[0])


[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.

In [20]:
import numpy as np
import cv2
import mediapipe as mp
from google.protobuf.json_format import MessageToJson
import json

mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands
cap = cv2.VideoCapture(0)

# Use a list to store individual 1D arrays
sequential_list = []
with mp_hands.Hands(max_num_hands=2,
                    min_detection_confidence=0.5,
                    min_tracking_confidence=0.5) as hands:
    print(cap.isOpened())
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            print("Ignoring empty camera frame.")

        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = hands.process(image)

        # DETECT LANDMARKS
        if results.multi_hand_landmarks:
            # Append the values to the list
            sequential_list.append(landmark_to_dist_emb(results))
            print(np.array(sequential_list).shape)

            jsonObj = MessageToJson(results.multi_hand_landmarks[0])
            lmk = json.loads(jsonObj)['landmark']
        else:
            print("Hand Gesture Recognition Interrupted.")
            sequential_list.clear()
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                mp_drawing.draw_landmarks(
                    image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
        cv2.imshow('MediaPipe Hands', image)

        # Check if the length of the list exceeds 50
        if len(sequential_list) >= 50:
            # Convert the list to a 3D NumPy array
            sequential = np.expand_dims(np.array(sequential_list), axis=0)
            print(trained_model.predict(sequential))
            # Clear the list
            sequential_list.clear()

        if cv2.waitKey(5) & 0xFF == 27:
            break

cap.release()
cv2.destroyAllWindows()

True
(1, 20)
(2, 20)
(3, 20)
(4, 20)
(5, 20)
(6, 20)
(7, 20)
(8, 20)
(9, 20)
(10, 20)
(11, 20)
(12, 20)
(13, 20)
(14, 20)
(15, 20)
(16, 20)
(17, 20)
(18, 20)
(19, 20)
(20, 20)
(21, 20)
(22, 20)
(23, 20)
(24, 20)
(25, 20)
(26, 20)
(27, 20)
(28, 20)
(29, 20)
(30, 20)
(31, 20)
(32, 20)
(33, 20)
(34, 20)
(35, 20)
(36, 20)
(37, 20)
(38, 20)
(39, 20)
(40, 20)
(41, 20)
(42, 20)
(43, 20)
(44, 20)
(45, 20)
(46, 20)
(47, 20)
(48, 20)
(49, 20)
(50, 20)
(1, 50, 20)
(1, 20)
(2, 20)
(3, 20)
(4, 20)
(5, 20)
(6, 20)
(7, 20)
(8, 20)
(9, 20)
(10, 20)
(11, 20)
(12, 20)
(13, 20)
(14, 20)
(15, 20)
(16, 20)
(17, 20)
(18, 20)
(19, 20)
(20, 20)
(21, 20)
(22, 20)
(23, 20)
(24, 20)
(25, 20)
(26, 20)
(27, 20)
(28, 20)
(29, 20)
(30, 20)
(31, 20)
(32, 20)
(33, 20)
(34, 20)
(35, 20)
(36, 20)
(37, 20)
(38, 20)
(39, 20)
(40, 20)
(41, 20)
(42, 20)
(43, 20)
(44, 20)
(45, 20)
(46, 20)
(47, 20)
(48, 20)
(49, 20)
(50, 20)
(1, 50, 20)
(1, 20)
(2, 20)
(3, 20)
(4, 20)
(5, 20)
(6, 20)
(7, 20)
(8, 20)
(9, 20)
(10, 20)
(11, 20)

## IMAGE DATA SCRAPING

In [None]:
!pip install simple_image_download

In [None]:
from simple_image_download import simple_image_download as simp
response = simp.simple_image_download
keywords = ["something here", "something here"]

# keyword, n images
for kw in keywords:
    response().download(kw, 300)

## LABEL DATASET

In [None]:
!pip install labelImg

In [None]:
!labelImg

## .MOV TO IMAGES

In [None]:
import cv2


def convert_video_to_images(video_path, output_path, start_time_seconds=2):
    # Open the video file
    cap = cv2.VideoCapture(video_path)

    # Get the frames per second (fps) and total number of frames
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the starting frame based on the start time
    start_frame = int(start_time_seconds * fps)

    # Set the video capture object to start from the calculated frame
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    # Loop through the frames and save them as images
    current_frame = start_frame
    while current_frame < total_frames:
        ret, frame = cap.read()
        if not ret:
            break

        # Save the frame as an image
        image_path = f"{output_path}/frame_{current_frame:04d}.png"
        cv2.imwrite(image_path, frame)

        # Increment the current frame
        current_frame += 1

    # Release the video capture object
    cap.release()


# Example usage:
video_path = 'path/to/your/video.mov'
output_path = 'path/to/output/folder'
convert_video_to_images(video_path, output_path, start_time_seconds=2)
