# Sign Language Translator

#### Imports

In [None]:
# pip install tensorflow==2.15

%matplotlib widget
import concurrent
import cv2
import json
import mediapipe as mp
import numpy as np
import os
import seaborn as sns
import tensorflow as tf
import time

from collections import namedtuple
from keras.callbacks import TensorBoard, LearningRateScheduler, EarlyStopping, ModelCheckpoint
from keras.layers import BatchNormalization, LSTM, Dense
from keras.metrics import categorical_crossentropy
from keras.models import Sequential, load_model
from keras.optimizers import Adam
from keras.utils import to_categorical
from matplotlib import pyplot as plt
from mediapipe import solutions
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from translate_inv.library.skeleton import Skeleton
from translate_inv.library.calc_R import calc_R

#### Globals

In [None]:
hands_base_options = python.BaseOptions(model_asset_path='hand_landmarker.task')
hands_options = vision.HandLandmarkerOptions(base_options=hands_base_options,
                                       running_mode=mp.tasks.vision.RunningMode.VIDEO,
                                       num_hands=2,
                                       min_hand_detection_confidence=0.6,
                                       min_hand_presence_confidence=0.6,
                                       min_tracking_confidence=0.5)

pose_base_options = python.BaseOptions(model_asset_path='pose_landmarker_lite.task')
pose_options = vision.PoseLandmarkerOptions(base_options=pose_base_options,
                                       running_mode=mp.tasks.vision.RunningMode.VIDEO,
                                       min_pose_detection_confidence=0.7,
                                       min_pose_presence_confidence=0.7,
                                       min_tracking_confidence=0.5)

# Reference skeleton
skeleton = Skeleton(os.path.join('translate_inv', 'assets', 'chars', 'Xbot.glb'))

# Path for dataset
dataset_path = os.path.join('dataset')

# Path for processed dataset
processed_dataset_path = os.path.join('processed_dataset')

# Classes
classes = sorted(os.listdir(processed_dataset_path))

# Path for test dataset
test_dataset_path = os.path.join('test_dataset')

# List of gestures in the dataset
gestures = ['W']

# Number of records for one gesture
num_records = 30

# USB webcam
cam_idx = 0

# Recording is done on Ubuntu
api_pref = cv2.CAP_V4L2

# Target frame resolution
target_width = 640
target_height = 480

# Record FPS and dt
record_fps = 30
record_dt = 1 / record_fps

# Inference FPS
inference_fps = int(record_fps / 2)

# Tolerance between dt and record_dt
dt_tol = 0.002

# Delay for getting into position
record_delay = 1

# Path where we save the gesture classifier weights
gestures_weights_path = 'gestures'
converted_gestures_weights_path = 'gestures.tflite'

# Num workers
num_workers = 6

# No hand landmarks
no_hand_landmarks_list = [[0] * 3 for _ in range(21)]
no_hand_landmarks_ndarray = np.array(no_hand_landmarks_list)

# No arm landmarks
no_pose_landmarks_ndarray = np.array([[0] * 3 for _ in range(33)])

# Inference parameters
min_unchanged_ms = 600
mostly_empty_percentage = 0.7
num_hand_features = 21 * 3
prediction_threshold = 0.9

HandLandmark = solutions.hands.HandLandmark

#### Helper functions

In [None]:
def get_hands_landmarks(results, relative_to_hand_center):
    left = [None]
    right = [None]

    results_hand_landmarks = results.hand_world_landmarks if relative_to_hand_center else results.hand_landmarks
    LandmarkListType = landmark_pb2.LandmarkList if relative_to_hand_center else landmark_pb2.NormalizedLandmarkList
    LandmarkType = landmark_pb2.Landmark if relative_to_hand_center else landmark_pb2.NormalizedLandmark

    # handedness list and hand_landmarks list match by index
    for handedness, hand_landmarks in zip(results.handedness, results_hand_landmarks):
        handedness = handedness[0]

        hand_landmarks_proto = LandmarkListType()
        hand_landmarks_proto.landmark.extend(
            [LandmarkType(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in hand_landmarks]
            )
        
        # Assume we are pointing the camera to a person with one left and one right hand.
        # That means that the only correct predicted handednesses are { 'Left' AND 'Right' }

        # If predicted handedness is left
        if handedness.category_name == 'Left':
            # If left was NOT predicted before
            if left[0] is None:
                # Just assign the landmarks and handedness probability
                left = [hand_landmarks_proto, handedness.score]
            # Else, if we have a collision
            else:
                # If current prediction is more accurate:
                if handedness.score > left[1]:
                    # That means that the previous left value is actually right
                    right = left
                    left = [hand_landmarks_proto, handedness.score]
                # Else, if current prediction is less accurate
                else:
                    # That means that the current prediction is actually right
                    right = [hand_landmarks_proto, handedness.score]
        
        # Same exact thing for right
        if handedness.category_name == 'Right':
            if right[0] is None:
                right = [hand_landmarks_proto, handedness.score]
            else:
                if handedness.score > right[1]:
                    left = right
                    right = [hand_landmarks_proto, handedness.score]
                else:
                    left = [hand_landmarks_proto, handedness.score]

    return left[0], right[0]


def get_pose_landmarks(results, relative_to_hips_center):
    pose = None

    results_pose_landmarks = results.pose_world_landmarks if relative_to_hips_center else results.pose_landmarks
    LandmarkListType = landmark_pb2.LandmarkList if relative_to_hips_center else landmark_pb2.NormalizedLandmarkList
    LandmarkType = landmark_pb2.Landmark if relative_to_hips_center else landmark_pb2.NormalizedLandmark

    for pose_landmarks in results_pose_landmarks:
        pose_landmarks_proto = LandmarkListType()
        pose_landmarks_proto.landmark.extend(
            [LandmarkType(x=landmark.x, y=landmark.y, z=landmark.z, visibility=landmark.visibility) for landmark in pose_landmarks]
            )
        
        pose = pose_landmarks_proto

    return pose


def draw_hands_landmarks(image, left_hand_landmarks, right_hand_landmarks):
    if left_hand_landmarks is not None:
        solutions.drawing_utils.draw_landmarks(
            image,
            left_hand_landmarks,
            solutions.hands.HAND_CONNECTIONS,
            solutions.drawing_utils.DrawingSpec((0, 0, 128), thickness=2, circle_radius=4), 
            solutions.drawing_utils.DrawingSpec((65, 105, 225), thickness=2, circle_radius=2))
    
    if right_hand_landmarks is not None:
        solutions.drawing_utils.draw_landmarks(
            image,
            right_hand_landmarks,
            solutions.hands.HAND_CONNECTIONS,
            solutions.drawing_utils.DrawingSpec((128, 0, 0), thickness=2, circle_radius=4), 
            solutions.drawing_utils.DrawingSpec((225, 105, 65), thickness=2, circle_radius=2))
        
    return image


def draw_pose_landmarks(image, pose_landmarks):
    if pose_landmarks is not None:
        solutions.drawing_utils.draw_landmarks(
            image,
            pose_landmarks,
            solutions.pose.POSE_CONNECTIONS,
            solutions.drawing_utils.DrawingSpec((0, 128, 0), thickness=2, circle_radius=4), 
            solutions.drawing_utils.DrawingSpec((105, 225, 65), thickness=2, circle_radius=2))
        
    return image
  

def get_landmarks_dict(
        pose_landmarks,
        pose_world_landmarks,
        left_hand_landmarks,
        left_hand_world_landmarks,
        right_hand_landmarks,
        right_hand_world_landmarks):
    # 33 pose landmarks => array with 3 * 33 elements
    pose = [[p.x, p.y, p.z] for p in pose_landmarks.landmark] if pose_landmarks is not None else [[0] * 3 for _ in range(33)]
    # 33 pose landmarks => array with 3 * 33 elements
    pose_world = [[p.x, p.y, p.z] for p in pose_world_landmarks.landmark] if pose_world_landmarks is not None else [[0] * 3 for _ in range(33)]
    # 21 hand landmarks => array with 3 * 21 elements
    lh = [[l.x, l.y, l.z] for l in left_hand_landmarks.landmark] if left_hand_landmarks is not None else no_hand_landmarks_list
    # 21 hand landmarks => array with 3 * 21 elements
    lh_world = [[l.x, l.y, l.z] for l in left_hand_world_landmarks.landmark] if left_hand_world_landmarks is not None else no_hand_landmarks_list
    # 21 hand landmarks => array with 3 * 21 elements
    rh = [[r.x, r.y, r.z] for r in right_hand_landmarks.landmark] if right_hand_landmarks is not None else no_hand_landmarks_list
    # 21 hand landmarks => array with 3 * 21 elements
    rh_world = [[r.x, r.y, r.z] for r in right_hand_world_landmarks.landmark] if right_hand_world_landmarks is not None else no_hand_landmarks_list
    return {
        'pose': pose,
        'pose_world': pose_world,
        'lh': lh,
        'lh_world': lh_world,
        'rh': rh,
        'rh_world': rh_world
        }


def plot_landmarks(ax, landmarks, connections, landmarks_color=(1, 0, 0), connections_color=(0, 0, 0), marker='o'):
  plotted_landmarks = {}

  for idx, landmark in enumerate(landmarks):
    ax.scatter(
        xs=[landmark[0]],
        ys=[landmark[1]],
        zs=[landmark[2]],
        color=landmarks_color,
        marker=marker,
        linewidth=2)
    plotted_landmarks[idx] = (landmark[0], landmark[1], landmark[2])

  if connections:
    num_landmarks = len(landmarks)

    # Draws the connections if the start and end landmarks are both visible.
    for connection in connections:
      start_idx = connection[0]
      end_idx = connection[1]

      if not (0 <= start_idx < num_landmarks and 0 <= end_idx < num_landmarks):
        raise ValueError(f'Landmark index is out of range. Invalid connection '
                         f'from landmark #{start_idx} to landmark #{end_idx}.')
      
      if start_idx in plotted_landmarks and end_idx in plotted_landmarks:
        landmark_pair = [
            plotted_landmarks[start_idx], plotted_landmarks[end_idx]
        ]
        ax.plot(
            xs=[landmark_pair[0][0], landmark_pair[1][0]],
            ys=[landmark_pair[0][1], landmark_pair[1][1]],
            zs=[landmark_pair[0][2], landmark_pair[1][2]],
            color=connections_color,
            linewidth=2)


def get_normalized_by_depth(v, z_origin):
    if z_origin == 0:
        return np.zeros(v.shape)

    return v / z_origin


def _get_normalized_displacement(prev, curr, z_origin_prev, z_origin_curr):    
    prev = get_normalized_by_depth(prev, z_origin_prev)
    curr = get_normalized_by_depth(curr, z_origin_curr)

    d = curr - prev

    if not d.any():
        return d

    return d / np.linalg.norm(d)


def get_pose_normalized_displacement(prev, curr):
    if not prev.any():
        return prev
    if not curr.any():
        return curr
    
    mid_hips_prev = (prev[23] + prev[24]) / 2
    mid_hips_curr = (curr[23] + curr[24]) / 2

    return _get_normalized_displacement(prev, curr, mid_hips_prev[2], mid_hips_curr[2])


def get_hand_normalized_displacement(prev, curr):
    if not prev.any():
        return prev
    if not curr.any():
        return curr
    
    wrist_prev = prev[0]
    wrist_curr = curr[0]

    return _get_normalized_displacement(prev, curr, wrist_prev[2], wrist_curr[2])


def tflite_predict(interpreter, input):
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    interpreter.set_tensor(input_details[0]['index'], input)
    interpreter.invoke()
    return interpreter.get_tensor(output_details[0]['index'])


def lerp(first, second, t=0.5):
    return (1 - t) * first + t * second


def create_normalized_landmark_list(points):
    landmark_list = landmark_pb2.NormalizedLandmarkList()
    
    for point in points:
        landmark = landmark_list.landmark.add()
        landmark.x, landmark.y, landmark.z = point
    
    return landmark_list

## Create dataset

#### Create dataset folder structure

In [None]:
# Create dataset folder
if not os.path.isdir(dataset_path):
    os.makedirs(dataset_path)

# Create gesture subfolders
for gesture in gestures:
    gesture_dir = os.path.join(dataset_path, gesture)
    if not os.path.isdir(gesture_dir):
        os.makedirs(gesture_dir)

#### Record data

In [None]:
cap = cv2.VideoCapture(cam_idx, api_pref)

# Set target resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, target_width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, target_height)

# Make sure OpenCV set it right
print(f'width: {cap.get(cv2.CAP_PROP_FRAME_WIDTH)}')
print(f'height: {cap.get(cv2.CAP_PROP_FRAME_HEIGHT)}')

# Compute dt helpers
prev_time = 0
cur_time = 0

# Flag for gracefully exiting
running = True

# Flag for starting recording
start = False


def get_dt():
    global prev_time, cur_time

    cur_time = time.time()
    dt = cur_time - prev_time
    prev_time = cur_time

    return dt


def wait_quit():
    global running

    if cv2.waitKey(25) & 0xFF == ord('q'):
        running = False


# Record for 1 second at record_fps
def record_gesture(gesture, record_idx):
    global running

    record_dir = os.path.join(dataset_path, gesture, str(record_idx))
    if not os.path.isdir(record_dir):
        os.makedirs(record_dir)
    
    frame_num = 0
    while running and frame_num < record_fps:
        # Read frame
        _, frame = cap.read()

        dt = get_dt()

        file_path = os.path.join(record_dir, f'{frame_num}.jpg')
        cv2.imwrite(file_path, frame)
        frame_num += 1

        # Mirror the frame
        frame = cv2.flip(frame, 1)

        # Show FPS on frame
        fps = round(1 / dt)
        cv2.putText(frame, str(fps), (5, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (100, 255, 0))

        # Show frame on screen
        cv2.imshow('Camera', frame)
        wait_quit()


def wait_start_or_quit():
    global start, running

    key_pressed = cv2.waitKey(1) & 0xFF

    if key_pressed == ord('s'):
        start = True
    elif key_pressed == ord('q'):
        running = False
       
        
gesture = 'Test'
record_num = 0

while running:
    start = False

    # Read frame
    _, frame = cap.read()

    # Mirror the frame
    frame = cv2.flip(frame, 1)

    # Show helper text on frame
    cv2.putText(frame, f'Now recording `{gesture}`', (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.4, (0, 0, 0), 2)
    cv2.putText(frame, f'{record_num} / {num_records - 1}', (5, 85), cv2.FONT_HERSHEY_SIMPLEX, 1.4, (0, 0, 0), 2)
    cv2.putText(frame, 'Press `s` to start recording', (10, 135), cv2.FONT_HERSHEY_SIMPLEX, 1.4, (0, 0, 0), 2)

    # Show frame
    cv2.imshow('Camera', frame)
    wait_start_or_quit()

    if start:
        t1 = time.time()

        while True:
            t2 = time.time()
            
            # Read frame
            _, frame = cap.read()
            
            # Mirror the frame
            frame = cv2.flip(frame, 1)

            # Show helper text on frame
            delay_left = round(record_delay - (t2 - t1), 2)
            cv2.putText(frame, f'Starting in {delay_left} seconds', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.4, (0, 0, 0), 2)

            # Show frame on screen
            cv2.imshow('Camera', frame)
            wait_quit()

            if not running or t2 - t1 >= record_delay:
                break

        if running:
            record_gesture(gesture, record_num)
            record_num += 1

    if record_num == num_records:
        break


cap.release()
cv2.destroyAllWindows()

In [None]:
cap.release()
cv2.destroyAllWindows()

## Process data

#### Create processed dataset folder structure

In [None]:
# Create processed dataset folder
if not os.path.isdir(processed_dataset_path):
    os.makedirs(processed_dataset_path)

# Create gesture subfolders
for gesture in os.listdir(dataset_path):
    gesture_dir = os.path.join(processed_dataset_path, gesture)
    if not os.path.isdir(gesture_dir):
        os.makedirs(gesture_dir)

    # Create recording sub-subfolder
    for record_num in range(num_records):
        record_dir = os.path.join(gesture_dir, str(record_num))
        if not os.path.isdir(record_dir):
            os.makedirs(record_dir)

#### Extract and save landmarks

In [None]:
def process_gesture(gesture):
    gesture_dir = os.path.join(dataset_path, gesture)
    processed_gesture_dir = os.path.join(processed_dataset_path, gesture)

    for record_num in os.listdir(gesture_dir):
        record_dir = os.path.join(gesture_dir, record_num)
        processed_record_dir = os.path.join(processed_gesture_dir, record_num)

        hands_model = vision.HandLandmarker.create_from_options(hands_options)
        pose_model = vision.PoseLandmarker.create_from_options(pose_options)
        timestamp = 0

        for frame_num in range(record_fps):
            frame_name = f'{frame_num}.jpg'

            # Read frame from file
            frame = cv2.imread(os.path.join(record_dir, frame_name))

            # Convert to RGB
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            frame.flags.writeable = False
            
            # Perform detection
            hands_results = hands_model.detect_for_video(mp.Image(image_format=mp.ImageFormat.SRGB, data=frame), timestamp)
            pose_results = pose_model.detect_for_video(mp.Image(image_format=mp.ImageFormat.SRGB, data=frame), timestamp)
            timestamp += int(record_dt * 1000)

            frame.flags.writeable = True

            # Get hands landmarks
            left_hand_landmarks, right_hand_landmarks = get_hands_landmarks(hands_results, False)
            left_hand_world_landmarks, right_hand_world_landmarks = get_hands_landmarks(hands_results, True)

            # Draw hands landmarks on frame
            draw_hands_landmarks(frame, left_hand_landmarks, right_hand_landmarks)

            # Get pose landmarks
            pose_landmarks = get_pose_landmarks(pose_results, False)
            pose_world_landmarks = get_pose_landmarks(pose_results, True)

            # Draw arms landmarks on frame
            draw_pose_landmarks(frame, pose_landmarks)

            # Convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

            # Save frame with landmarks
            cv2.imwrite(os.path.join(processed_record_dir, frame_name), frame)

            # Extract landmarks
            landmarks = get_landmarks_dict(
                pose_landmarks,
                pose_world_landmarks,
                left_hand_landmarks,
                left_hand_world_landmarks,
                right_hand_landmarks,
                right_hand_world_landmarks)

            with open(os.path.join(processed_record_dir, f'{frame_num}.json'), 'w') as fout:
                json.dump(landmarks, fout)


with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
    futures = [executor.submit(process_gesture, gesture) for gesture in gestures]

    for future in concurrent.futures.as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f'Exception: {e}')

#### Check for wrongly processed data

In [None]:
two_handed = ['A avea', 'A da', 'A face', 'A găsi', 'A simți', 'A trăi', 'Casă', 'Q', 'W', 'X']

for gesture in os.listdir(processed_dataset_path):
    for i in range(num_records):
        record_path = os.path.join(dataset_path, gesture, str(i))
        processed_record_path = os.path.join(processed_dataset_path, gesture, str(i))
        bad_frames = []

        for j in range(record_fps):
            frame_path = os.path.join(processed_record_path, f'{j}.json')

            with open(frame_path) as fin:
                frame_dict = json.load(fin)

                if gesture in two_handed:
                    if frame_dict['lh'] == no_hand_landmarks_list or frame_dict['rh'] == no_hand_landmarks_list:
                        bad_frames.append(j)
                    elif frame_dict['lh_world'] == no_hand_landmarks_list or frame_dict['rh_world'] == no_hand_landmarks_list:
                        bad_frames.append(j)
                    else:
                        if j == 0:
                            first = 1
                            second = 2
                        elif j == record_fps - 1:
                            first = record_fps - 3
                            second = record_fps - 2
                        else:
                            first = j - 1
                            second = j + 1

                        first = os.path.join(processed_record_path, f'{first}.json')
                        with open(first) as fin:
                            first = json.load(fin)

                        second = os.path.join(processed_record_path, f'{second}.json')
                        with open(second) as fin:
                            second = json.load(fin)
                            
                        rr_dist_first = np.sum(np.linalg.norm(np.array(frame_dict['rh']) - np.array(first['rh']), axis=1))
                        rl_dist_first = np.sum(np.linalg.norm(np.array(frame_dict['rh']) - np.array(first['lh']), axis=1))
                            
                        rr_dist_second = np.sum(np.linalg.norm(np.array(frame_dict['rh']) - np.array(second['rh']), axis=1))
                        rl_dist_second = np.sum(np.linalg.norm(np.array(frame_dict['rh']) - np.array(second['lh']), axis=1))

                        if rr_dist_first > rl_dist_first and rr_dist_second > rl_dist_second:
                            bad_frames.append(j)


                else:
                    if frame_dict['lh'] != no_hand_landmarks_list:
                        bad_frames.append(j)
                    elif frame_dict['rh'] == no_hand_landmarks_list:
                        bad_frames.append(j)
                    elif frame_dict['lh_world'] != no_hand_landmarks_list:
                        bad_frames.append(j)
                    elif frame_dict['rh_world'] == no_hand_landmarks_list:
                        bad_frames.append(j)

        if len(bad_frames) != 0:
            print(f'Fixing: {processed_record_path}')
            print(f'Fixing: {bad_frames}')
            consec = any(bad_frames[j] + 1 == bad_frames[j + 1] for j in range(len(bad_frames) - 1))
            if consec or 0 in bad_frames and 2 in bad_frames or (record_fps - 3) in bad_frames and (record_fps - 1) in bad_frames:
                print('Could not fix\n')
            else:
                if 0 in bad_frames:
                    next = os.path.join(processed_record_path, '1.json')
                    with open(next) as fin:
                        next = json.load(fin)
                        
                    next_next = os.path.join(processed_record_path, '2.json')
                    with open(next_next) as fin:
                        next_next = json.load(fin)

                    cur = {
                        k: lerp(np.array(next[k]), np.array(next_next[k]), -0.5).tolist() for k in next.keys()
                    }
                    
                    frame = cv2.imread(os.path.join(record_path, '0.jpg'))
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    draw_hands_landmarks(frame, create_normalized_landmark_list(cur['lh']), create_normalized_landmark_list(cur['rh']))
                    draw_pose_landmarks(frame, create_normalized_landmark_list(cur['pose']))
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    cv2.imwrite(os.path.join(processed_record_path, '0.jpg'), frame)

                    with open(os.path.join(processed_record_path, '0.json'), 'w') as fout:
                        json.dump(cur, fout)

                if record_fps - 1 in bad_frames:
                    prev_prev = os.path.join(processed_record_path, f'{record_fps - 3}.json')
                    with open(prev_prev) as fin:
                        prev_prev = json.load(fin)
                        
                    prev = os.path.join(processed_record_path, f'{record_fps - 2}.json')
                    with open(prev) as fin:
                        prev = json.load(fin)

                    cur = {
                        k: lerp(np.array(prev_prev[k]), np.array(prev[k]), 2).tolist() for k in prev.keys()
                    }
                    
                    frame = cv2.imread(os.path.join(record_path, f'{record_fps - 1}.jpg'))
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    draw_hands_landmarks(frame, create_normalized_landmark_list(cur['lh']), create_normalized_landmark_list(cur['rh']))
                    draw_pose_landmarks(frame, create_normalized_landmark_list(cur['pose']))
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    cv2.imwrite(os.path.join(processed_record_path, f'{record_fps - 1}.jpg'), frame)

                    with open(os.path.join(processed_record_path, f'{record_fps - 1}.json'), 'w') as fout:
                        json.dump(cur, fout)

                bad_frames = [j for j in bad_frames if j != 0 and j != record_fps - 1]

                for j in bad_frames:
                    prev = os.path.join(processed_record_path, f'{j - 1}.json')
                    with open(prev) as fin:
                        prev = json.load(fin)
                        
                    next = os.path.join(processed_record_path, f'{j + 1}.json')
                    with open(next) as fin:
                        next = json.load(fin)

                    cur = {
                        k: lerp(np.array(prev[k]), np.array(next[k])).tolist() for k in prev.keys()
                    }
                    
                    frame = cv2.imread(os.path.join(record_path, f'{j}.jpg'))
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    draw_hands_landmarks(frame, create_normalized_landmark_list(cur['lh']), create_normalized_landmark_list(cur['rh']))
                    draw_pose_landmarks(frame, create_normalized_landmark_list(cur['pose']))
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    cv2.imwrite(os.path.join(processed_record_path, f'{j}.jpg'), frame)

                    with open(os.path.join(processed_record_path, f'{j}.json'), 'w') as fout:
                        json.dump(cur, fout)



def manual_fix(record_path, first, second):
    t_values = np.linspace(0, 1, second - first + 1)

    first_frame = os.path.join(record_path, f'{first}.json')
    with open(first_frame) as fin:
        first_frame = json.load(fin)
        
    second_frame = os.path.join(record_path, f'{second}.json')
    with open(second_frame) as fin:
        second_frame = json.load(fin)

    lerp_lh = [lerp(np.array(first_frame['lh']), np.array(second_frame['lh']), t) for t in t_values]
    lerp_lh_world = [lerp(np.array(first_frame['lh_world']), np.array(second_frame['lh_world']), t) for t in t_values]
    lerp_rh = [lerp(np.array(first_frame['rh']), np.array(second_frame['rh']), t) for t in t_values]
    lerp_rh_world = [lerp(np.array(first_frame['rh_world']), np.array(second_frame['rh_world']), t) for t in t_values]
    lerp_pose = [lerp(np.array(first_frame['pose']), np.array(second_frame['pose']), t) for t in t_values]
    lerp_pose_world = [lerp(np.array(first_frame['pose_world']), np.array(second_frame['pose_world']), t) for t in t_values]

    for i, (l, lw, r, rw, p, pw) in enumerate(zip(lerp_lh, lerp_lh_world, lerp_rh, lerp_rh_world, lerp_pose, lerp_pose_world)):
        dictionary = dict()
        dictionary['lh'] = l.tolist()
        dictionary['lh_world'] = lw.tolist()
        dictionary['rh'] = r.tolist()
        dictionary['rh_world'] = rw.tolist()
        dictionary['pose'] = p.tolist()
        dictionary['pose_world'] = pw.tolist()
        with open(os.path.join(record_path, f'{first + i}.json'), 'w') as fout:
            json.dump(dictionary, fout)

        frame = cv2.imread(os.path.join(record_path.replace('processed_', ''), f'{first + i}.jpg'))
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        draw_hands_landmarks(frame, create_normalized_landmark_list(dictionary['lh']), create_normalized_landmark_list(dictionary['rh']))
        draw_pose_landmarks(frame, create_normalized_landmark_list(dictionary['pose']))
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        cv2.imwrite(os.path.join(record_path, f'{first + i}.jpg'), frame)


# manual_fix(os.path.join(processed_dataset_path, 'Z', '28'), 19, 24)

#### Preview processed data

In [None]:
running = True

for gesture in sorted(os.listdir(processed_dataset_path)):
    if not running:
        break

    for i in range(num_records):
        if not running:
            break

        for j in range(record_fps):
            if not running:
                break

            frame_path = os.path.join(processed_dataset_path, gesture, str(i), f'{j}.jpg')
            print(frame_path)

            # Read frame from file
            frame = cv2.imread(frame_path)
            cv2.imshow('Camera', frame)
            if cv2.waitKey(int(record_dt * 250)) & 0xFF == ord('q'):
                running = False
        
        if cv2.waitKey(250) & 0xFF == ord('q'):
            running = False

cv2.destroyAllWindows()

In [None]:
cv2.destroyAllWindows()

In [None]:
running = True

for i in range(num_records):
    if not running:
        break

    for gesture in os.listdir(processed_dataset_path):
        if not running:
            break

        for j in range(record_fps):
            if not running:
                break

            frame_path = os.path.join(dataset_path, gesture, str(i), f'{j}.jpg')
            print(frame_path)

            # Read frame from file
            frame = cv2.imread(frame_path)
            cv2.imshow('Camera', frame)
            if cv2.waitKey(int(record_dt * 1000)) & 0xFF == ord('q'):
                running = False

            cv2.waitKey(20)
        
        if cv2.waitKey(500) & 0xFF == ord('q'):
            running = False

cv2.destroyAllWindows()

## Feature extraction

#### Extract some sample data

In [None]:
hands_base_options = python.BaseOptions(model_asset_path='hand_landmarker.task')
hands_options = vision.HandLandmarkerOptions(base_options=hands_base_options,
                                       running_mode=mp.tasks.vision.RunningMode.IMAGE,
                                       num_hands=2,
                                       min_hand_detection_confidence=0.4,
                                       min_hand_presence_confidence=0.4,
                                       min_tracking_confidence=0.4)
hands_model = vision.HandLandmarker.create_from_options(hands_options)

directory = 'casa'
names = ['0', '0_close', '0_far', '1', '1_close', '1_far']

for name in names:
    file = os.path.join(directory, name)

    frame = cv2.imread(file + '.jpg')

    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    result = hands_model.detect(mp.Image(image_format=mp.ImageFormat.SRGB, data=frame))
    left_hand_landmarks, right_hand_landmarks = get_hands_landmarks(result, False)
    left_hand_world_landmarks, right_hand_world_landmarks = get_hands_landmarks(result, True)

    # Draw hands landmarks on frame
    draw_hands_landmarks(frame, left_hand_landmarks, right_hand_landmarks)

    # Convert back to BGR
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Save frame with landmarks
    cv2.imwrite(file + '_out.jpg', frame)

    # Extract landmarks
    landmarks = get_landmarks_dict(
        None,
        None,
        left_hand_landmarks,
        left_hand_world_landmarks,
        right_hand_landmarks,
        right_hand_world_landmarks)

    with open(file + '.json', 'w') as fout:
        json.dump(landmarks, fout)

#### Plot landmarks and world landmarks

In [None]:
landmarks = None

with open(os.path.join('urca', '17_close.json')) as fin:
    landmarks = json.load(fin)

fig = plt.figure(figsize=(10, 10))

ax = fig.add_subplot(121, projection='3d')
ax.view_init(elev=240, azim=270)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
ax.set_title('Landmarks')
plot_landmarks(ax, landmarks['rh'], solutions.hands_connections.HAND_CONNECTIONS)

ax = fig.add_subplot(122, projection='3d')
ax.view_init(elev=240, azim=270)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
ax.set_title('World landmarks')
plot_landmarks(ax, landmarks['rh_world'], solutions.hands_connections.HAND_CONNECTIONS)

plt.tight_layout()
plt.show()

#### Plot hand movement

In [None]:
def plot_helper(title, start, finish, rows, columns, idx):
    ax = fig.add_subplot(rows, columns, idx, projection='3d')
    ax.view_init(elev=240, azim=270)
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_zlabel('z')
    ax.set_title(title)
    plot_landmarks(ax, start, solutions.hands_connections.HAND_CONNECTIONS, (1, 0, 0), marker='o')
    plot_landmarks(ax, finish, solutions.hands_connections.HAND_CONNECTIONS, (0, 0, 1), marker='s')
    for s, f in zip(start, finish):
        ax.plot([s[0], f[0]], [s[1], f[1]], [s[2], f[2]], 'k:')

    print(title + f'\n{np.linalg.norm(finish - start)}\n')

In [None]:
with open(os.path.join('urca', '17.json')) as fin:
    start = np.array(json.load(fin)['rh'])
    start_norm = get_normalized_by_depth(start, start[0][2])

with open(os.path.join('urca', '18.json')) as fin:
    finish = np.array(json.load(fin)['rh'])
    finish_norm = get_normalized_by_depth(finish, finish[0][2])

with open(os.path.join('urca', '17_close.json')) as fin:
    start_close = np.array(json.load(fin)['rh'])
    start_close_norm = get_normalized_by_depth(start_close, start_close[0][2])

with open(os.path.join('urca', '18_close.json')) as fin:
    finish_close = np.array(json.load(fin)['rh'])
    finish_close_norm = get_normalized_by_depth(finish_close, finish_close[0][2])

with open(os.path.join('urca', '17_far.json')) as fin:
    start_far = np.array(json.load(fin)['rh'])
    start_far_norm = get_normalized_by_depth(start_far, start_far[0][2])

with open(os.path.join('urca', '18_far.json')) as fin:
    finish_far = np.array(json.load(fin)['rh'])
    finish_far_norm = get_normalized_by_depth(finish_far, finish_far[0][2])


rows = 9
columns = 1
fig = plt.figure(figsize=(5 * columns, 5 * rows))
        
plot_helper('Distance between start and finish', start_norm, finish_norm, rows, columns, 1)
plot_helper('Distance between start_close and finish_close', start_close_norm, finish_close_norm, rows, columns, 2)
plot_helper('Distance between start_far and finish_far', start_far_norm, finish_far_norm, rows, columns, 3)

plot_helper('Distance between start_close and finish_far norm', start_close_norm, finish_far_norm, rows, columns, 5)
        
plot_helper('Distance between start and finish', start, finish, rows, columns, 7)
plot_helper('Distance between start_close and finish_close', start_close, finish_close, rows, columns, 8)
plot_helper('Distance between start_far and finish_far', start_far, finish_far, rows, columns, 9)

plt.tight_layout()
plt.show()

#### Plot thumb and palm normal

In [None]:
landmarks = None

with open(os.path.join(sample_dir, '0_close.json')) as fin:
    landmarks = np.array(json.load(fin)['rh'])

palm_normal = get_palm_normal(landmarks)
thumb_normal = get_thumb_normal(landmarks)

fig = plt.figure(figsize=(15, 15))

ax = fig.add_subplot(121, projection='3d')
ax.view_init(elev=270, azim=90)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
ax.set_title('Landmarks with palm normal')
plot_landmarks(ax, landmarks, solutions.hands_connections.HAND_CONNECTIONS)
ax.quiver(*landmarks[HandLandmark.WRIST], *palm_normal)
ax.quiver(*landmarks[HandLandmark.WRIST], *thumb_normal, color='red')

for point, angle in zip([landmarks[HandLandmark.THUMB_TIP], landmarks[HandLandmark.INDEX_FINGER_TIP], landmarks[HandLandmark.MIDDLE_FINGER_TIP], landmarks[HandLandmark.RING_FINGER_TIP], landmarks[HandLandmark.PINKY_TIP]], get_hand_angles(landmarks)):
    ax.text(*point, str(round(angle, 2)))

plt.show()

## Create a gesture classification model

#### Data preprocessing

In [None]:
# (15, 33, 3)
def get_pose_record_displacements(record, prev):
    displacements = []

    for frame in record:
        displacements.append(get_pose_normalized_displacement(prev, frame))
        prev = frame

    return np.array(displacements)


# (15, 21, 3)
def get_hand_record_displacements(record, prev):
    displacements = []

    for frame in record:
        displacements.append(get_hand_normalized_displacement(prev, frame))
        prev = frame

    return np.array(displacements)


# (15, 19, 4), (15, 19, 4)
def get_record_rotations(pose, left_hand, right_hand):
    lh_result = []
    rh_result = []

    for p, lh, rh in zip(pose, left_hand, right_hand):
        rotations = calc_R(skeleton, p, lh, rh)

        lh_result.append(rotations[3:22])
        rh_result.append(rotations[22:41])

    return np.array(lh_result), np.array(rh_result)


# (21, 3), (21, 3), (21, 3), (21, 3), (33, 3), (33, 3)
def extract_landmarks(path):
    with open(path) as fin:
        landmarks_dict = json.load(fin)
        return landmarks_dict['lh_world'],\
               landmarks_dict['lh'],\
               landmarks_dict['rh_world'],\
               landmarks_dict['rh'],\
               landmarks_dict['pose_world'],\
               landmarks_dict['pose']


# { h: (15, 21, 3) },  { p: (15, 33, 3) }
def get_two_records(dir):
    lh_wld_one = []
    lh_one = []
    rh_wld_one = []
    rh_one = []
    p_wld_one = []
    p_one = []

    lh_wld_two = []
    lh_two = []
    rh_wld_two = []
    rh_two = []
    p_wld_two = []
    p_two = []

    for frame_num in range(0, record_fps, 2):
        lh_wld, lh, rh_wld, rh, p_wld, p = extract_landmarks(os.path.join(dir, f'{frame_num}.json'))
        lh_wld_one.append(lh_wld)
        lh_one.append(lh)
        rh_wld_one.append(rh_wld)
        rh_one.append(rh)
        p_wld_one.append(p_wld)
        p_one.append(p)

        lh_wld, lh, rh_wld, rh, p_wld, p = extract_landmarks(os.path.join(dir, f'{frame_num + 1}.json'))
        lh_wld_two.append(lh_wld)
        lh_two.append(lh)
        rh_wld_two.append(rh_wld)
        rh_two.append(rh)
        p_wld_two.append(p_wld)
        p_two.append(p)

    return {
        'lh_wld_one': np.array(lh_wld_one),
        'lh_one': np.array(lh_one),
        'rh_wld_one': np.array(rh_wld_one),
        'rh_one': np.array(rh_one),
        'p_wld_one': np.array(p_wld_one),
        'p_one': np.array(p_one),
        'lh_wld_two': np.array(lh_wld_two),
        'lh_two': np.array(lh_two),
        'rh_wld_two': np.array(rh_wld_two),
        'rh_two': np.array(rh_two),
        'p_wld_two': np.array(p_wld_two),
        'p_two': np.array(p_two)
    }


def preprocess_data():
    # Encode text label to integer helper map
    labels_map = { label : num for num, label in enumerate(classes) }

    labels = []
    # (N, 15, 440)
    features = []

    for gesture in classes:
        processed_gesture_dir = os.path.join(processed_dataset_path, gesture)

        for record_num in range(num_records):
            labels.append(labels_map[gesture])
            labels.append(labels_map[gesture])
            
            # { hands: (15, 21, 3) },  { pose: (15, 33, 3) }
            records = get_two_records(os.path.join(processed_gesture_dir, str(record_num)))

            # (15, 21, 3)
            lh_wld_one = records['lh_wld_one']
            # (15, 21, 3)
            rh_wld_one = records['rh_wld_one']
            # (15, 3, 3)
            la_wld_one = records['p_wld_one'][:, [11, 13, 15], :]
            # (15, 3, 3)
            ra_wld_one = records['p_wld_one'][:, [12, 14, 16], :]
            # (15, 21, 3)
            lh_wld_two = records['lh_wld_two']
            # (15, 21, 3)
            rh_wld_two = records['rh_wld_two']
            # (15, 3, 3)
            la_wld_two = records['p_wld_two'][:, [11, 13, 15], :]
            # (15, 3, 3)
            ra_wld_two = records['p_wld_two'][:, [12, 14, 16], :]

            # (15, 21, 3)
            lh_one_diffs = get_hand_record_displacements(records['lh_one'], no_hand_landmarks_ndarray)
            # (15, 21, 3)
            rh_one_diffs = get_hand_record_displacements(records['rh_one'], no_hand_landmarks_ndarray)
            # (15, 3, 3)
            la_one_diffs = get_pose_record_displacements(records['p_one'], no_pose_landmarks_ndarray)[:, [11, 13, 15], :]
            # (15, 3, 3)
            ra_one_diffs = get_pose_record_displacements(records['p_one'], no_pose_landmarks_ndarray)[:, [12, 14, 16], :]
            # (15, 21, 3)
            lh_two_diffs = get_hand_record_displacements(records['lh_two'], no_hand_landmarks_ndarray)
            # (15, 21, 3)
            rh_two_diffs = get_hand_record_displacements(records['rh_two'], no_hand_landmarks_ndarray)
            # (15, 3, 3)
            la_two_diffs = get_pose_record_displacements(records['p_two'], no_pose_landmarks_ndarray)[:, [11, 13, 15], :]
            # (15, 3, 3)
            ra_two_diffs = get_pose_record_displacements(records['p_two'], no_pose_landmarks_ndarray)[:, [12, 14, 16], :]

            # (15, 19, 4)
            lh_one_rotations, rh_one_rotations = get_record_rotations(records['p_one'], records['lh_one'], records['rh_one'])
            lh_two_rotations, rh_two_rotations = get_record_rotations(records['p_two'], records['lh_two'], records['rh_two'])

            # (15, 440)
            f = [np.concatenate([
                    lh_wld_one[i].reshape(-1),
                    rh_wld_one[i].reshape(-1),
                    la_wld_one[i].reshape(-1),
                    ra_wld_one[i].reshape(-1),
                    lh_one_diffs[i].reshape(-1),
                    rh_one_diffs[i].reshape(-1),
                    la_one_diffs[i].reshape(-1),
                    ra_one_diffs[i].reshape(-1),
                    lh_one_rotations[i].reshape(-1),
                    rh_one_rotations[i].reshape(-1)
                ]) for i in range(inference_fps)]
            features.append(f)

            # (15, 440)
            f = [np.concatenate([
                    lh_wld_two[i].reshape(-1),
                    rh_wld_two[i].reshape(-1),
                    la_wld_two[i].reshape(-1),
                    ra_wld_two[i].reshape(-1),
                    lh_two_diffs[i].reshape(-1),
                    rh_two_diffs[i].reshape(-1),
                    la_two_diffs[i].reshape(-1),
                    ra_two_diffs[i].reshape(-1),
                    lh_two_rotations[i].reshape(-1),
                    rh_two_rotations[i].reshape(-1)
                ]) for i in range(inference_fps)]
            features.append(f)

    X = np.array(features)
    print(np.shape(X))

    # One hot encoding
    y = to_categorical(labels).astype(int)
    print(np.shape(y))

    return X, y

#### Create labels, split train and test

In [None]:
X, y = preprocess_data()

# 80% train, 20% rest
X_train, X_rest, y_train, y_rest = train_test_split(X, y, test_size=0.2)
print(np.shape(X_train))
# 50% test, 50% validate
X_test, X_validate, y_test, y_validate = train_test_split(X_rest, y_rest, test_size=0.5)

#### Create and train model

In [None]:
def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return lr * tf.math.exp(-0.1)

callbacks = [
    TensorBoard(log_dir=os.path.join(os.path.join('Logs', 'gestures'))),
    LearningRateScheduler(scheduler),
    EarlyStopping(monitor='val_loss', patience=15, verbose=1),
    ModelCheckpoint(
        filepath=gestures_weights_path,
        save_weights_only=False,
        monitor='val_loss',
        save_best_only=True,
        verbose=1
    )
]

gestures_model = Sequential()
gestures_model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(X.shape[1], X.shape[2]), recurrent_dropout=0.2))
gestures_model.add(BatchNormalization())
gestures_model.add(LSTM(128, return_sequences=True, activation='relu', recurrent_dropout=0.2))
gestures_model.add(BatchNormalization())
gestures_model.add(LSTM(64, return_sequences=False, activation='relu', recurrent_dropout=0.2))
gestures_model.add(BatchNormalization())
gestures_model.add(Dense(64, activation='relu'))
gestures_model.add(BatchNormalization())
gestures_model.add(Dense(32, activation='relu'))
gestures_model.add(BatchNormalization())
gestures_model.add(Dense(y.shape[-1], activation='softmax'))

gestures_model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
gestures_model.fit(X_train, y_train, epochs=50, validation_data=(X_validate, y_validate), callbacks=callbacks)

In [None]:
gestures_model = load_model(gestures_weights_path)

run_model = tf.function(lambda x: gestures_model(x))
concrete_func = run_model.get_concrete_function(
    tf.TensorSpec((None, X.shape[1], X.shape[2]), gestures_model.inputs[0].dtype))

gestures_model.summary()
gestures_model.save(gestures_weights_path, save_format="tf", signatures=concrete_func)

with open('gestures.json', 'w') as fout:
    json.dump(classes, fout, ensure_ascii=False)

#### Load model

In [None]:
gestures_model = load_model(gestures_weights_path)

#### Test model

In [None]:
results = gestures_model.evaluate(X_test, y_test)
print('test loss, test acc:', results)

#### Confusion matrix

In [None]:
predictions = gestures_model.predict(X_test)
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(y_test, axis=1)

cm = confusion_matrix(true_classes, predicted_classes)

plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

#### Convert to TFLite

In [None]:
converter = tf.lite.TFLiteConverter.from_saved_model(gestures_weights_path)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
converter._experimental_lower_tensor_list_ops = False
tflite_model = converter.convert()
open(converted_gestures_weights_path, 'wb').write(tflite_model)

#### Load converted model

In [None]:
interpreter = tf.lite.Interpreter(model_path=converted_gestures_weights_path)
interpreter.allocate_tensors()

#### Test converted model

In [None]:
total_loss = 0
correct_predictions = 0

for i in range(len(X_test)):
    # Preprocess the input data
    test_input = np.expand_dims(X_test[i], axis=0).astype(np.float32)

    # Run inference
    test_output = tflite_predict(interpreter, test_input)[0]

    # Categorical crossentropy loss
    total_loss += categorical_crossentropy(y_test[i], test_output)

    # Calculate accuracy (for classification tasks)
    if np.argmax(test_output) == np.argmax(y_test[i]):
        correct_predictions += 1

# Compute the final metrics
average_loss = total_loss / len(X_test)
accuracy = correct_predictions / len(X_test)

print(f'Test loss: {average_loss}, Test accuracy: {accuracy}')


#### Test live stream

In [None]:
TimeLandmarks = namedtuple('TimeLandmarks', 'landmarks timestamp')
TimePredictions = namedtuple('TimePredictions', 'prediction timestamp')

cap = cv2.VideoCapture(cam_idx, api_pref)

# Set target resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, target_width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, target_height)

# Compute dt helpers
prev_time = 0
cur_time = 0

# Timestamp in ms
timestamp = 0

predicted_gesture = ''
prediction_score = 0

prev_prediction = None
saved_predictions = []

# Prev landmarks
prev_lh = no_hand_landmarks_ndarray
prev_rh = no_hand_landmarks_ndarray
prev_la = no_arm_landmarks_ndarray
prev_ra = no_arm_landmarks_ndarray

# Landmarks one second queue
landmarks_queue = []


def get_dt():
    global prev_time, cur_time

    if prev_time == 0:
        prev_time = time.time()
        return 0

    cur_time = time.time()
    dt = cur_time - prev_time
    prev_time = cur_time

    return dt


# If no hands for most (70%) of the recorded second, drop the entire recorded second
def mostly_empty(seq):
    cnt_empty = sum(1 for i in seq if not i[num_hand_features:2*num_hand_features].any())

    return cnt_empty / len(seq) > mostly_empty_percentage


# Duplicate or sample the recorded second to exactly the number of needed frames, linearly
def duplicate_sample(seq, n):
    idx = np.round(np.linspace(0, len(seq) - 1, n)).astype(int)
    return np.array(seq)[idx]


hands_model = vision.HandLandmarker.create_from_options(hands_options)
pose_model = vision.PoseLandmarker.create_from_options(pose_options)
while cap.isOpened():
    # Read frame
    _, frame = cap.read()

    dt = get_dt()
    timestamp += int(dt * 1000)

    # Convert to RGB
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    frame.flags.writeable = False
    
    # Perform detection
    hands_results = hands_model.detect_for_video(mp.Image(image_format=mp.ImageFormat.SRGB, data=frame), timestamp)
    pose_results = pose_model.detect_for_video(mp.Image(image_format=mp.ImageFormat.SRGB, data=frame), timestamp)

    frame.flags.writeable = True

    # Get hands landmarks
    left_hand_landmarks, right_hand_landmarks = get_hands_landmarks(hands_results, False)
    left_hand_world_landmarks, right_hand_world_landmarks = get_hands_landmarks(hands_results, True)

    # Get pose landmarks
    pose_landmarks = get_pose_landmarks(pose_results, False)
    pose_world_landmarks = get_pose_landmarks(pose_results, True)
    
    # Get frame landmarks
    frame_landmarks = get_landmarks_dict(
        pose_landmarks,
        pose_world_landmarks,
        left_hand_landmarks,
        left_hand_world_landmarks,
        right_hand_landmarks,
        right_hand_world_landmarks
    )

    # Get features from frame landmarks
    curr_lh = np.array(frame_landmarks['lh'])
    curr_rh = np.array(frame_landmarks['rh'])
    curr_la = np.array([frame_landmarks['pose'][11], frame_landmarks['pose'][13], frame_landmarks['pose'][15]])
    curr_ra = np.array([frame_landmarks['pose'][12], frame_landmarks['pose'][14], frame_landmarks['pose'][16]])
    
    rotations = calc_R(skeleton, frame_landmarks['pose'], frame_landmarks['lh'], frame_landmarks['rh'])

    features = np.concatenate([
        np.array(frame_landmarks['lh_world']).reshape(-1),
        np.array(frame_landmarks['rh_world']).reshape(-1),
        np.array(np.array([frame_landmarks['pose_world'][11], frame_landmarks['pose_world'][13], frame_landmarks['pose_world'][15]])).reshape(-1),
        np.array(np.array([frame_landmarks['pose_world'][12], frame_landmarks['pose_world'][14], frame_landmarks['pose_world'][16]])).reshape(-1),
        get_normalized_displacement(prev_lh, curr_lh).reshape(-1),
        get_normalized_displacement(prev_rh, curr_rh).reshape(-1),
        get_normalized_displacement(prev_la, curr_la).reshape(-1),
        get_normalized_displacement(prev_ra, curr_ra).reshape(-1),
        rotations[3:22].reshape(-1),
        rotations[22:41].reshape(-1)
    ])
    prev_lh = curr_lh
    prev_rh = curr_rh
    prev_la = curr_la
    prev_ra = curr_ra

    # Add new landmarks to queue
    landmarks_queue.append(TimeLandmarks(features, timestamp))
    
    # If we have at least one second
    if timestamp - landmarks_queue[0].timestamp >= 1000:
        # Remove from queue those older than one second
        landmarks_queue = [l for l in landmarks_queue if timestamp - l.timestamp < 1000]

        input = [l.landmarks for l in landmarks_queue]

        # If no hands were detected for most of the second just drop everything
        if mostly_empty(input):
            print('empty')
            landmarks_queue = []
            prediction_score = 0
            prev_prediction = None
            prev_lh = no_hand_landmarks_ndarray
            prev_rh = no_hand_landmarks_ndarray
            prev_la = no_arm_landmarks_ndarray
            prev_ra = no_arm_landmarks_ndarray
        
        else:
            # Duplicate or sample the frames to the desired fps
            input = duplicate_sample(input, inference_fps)

            # Make predictions
            res = tflite_predict(interpreter, np.expand_dims(input, axis=0).astype(np.float32))[0]
            prediction_score = max(res)
            predicted_gesture = classes[np.argmax(res)]
    
    if prediction_score >= prediction_threshold:
        # New prediction
        if prev_prediction is None or prev_prediction.prediction != predicted_gesture:
            prev_prediction = TimePredictions(predicted_gesture, timestamp)

        else:
            # If the same prediction is consistent for some amount of time, then record it
            if timestamp - prev_prediction.timestamp >= min_unchanged_ms:
                # Once in this branch, the same prediction would be saved each frame.
                # We don't want that, so don't save consecutive duplicates.
                # Another approach would be to only record consecutive duplicate predictions once a second.
                if not saved_predictions or saved_predictions[-1].prediction != predicted_gesture: #or timestamp - saved_predictions[-1].timestamp >= 1000:
                    saved_predictions.append(TimePredictions(predicted_gesture, timestamp))
                    print([f'{p.prediction}@{p.timestamp}ms' for p in saved_predictions])

    draw_hands_landmarks(frame, left_hand_landmarks, right_hand_landmarks)
    draw_pose_landmarks(frame, pose_landmarks)
    
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Mirror frame
    frame = cv2.flip(frame, 1)

    # Rectangle for FPS and prediction
    x, y = 0, 0
    w, h = 250, 80
    cv2.rectangle(frame, (x, x), (x + w, y + h), (0, 0, 0), -1)
    
    # Show FPS on screen
    if dt:
        fps = round(1 / dt)
        cv2.putText(frame, str(fps), (5, 35), cv2.FONT_HERSHEY_SIMPLEX, 1, (100, 255, 0))

    # Show prediction on screen
    if saved_predictions:
        cv2.putText(
            frame,
            f'{saved_predictions[-1].prediction}@{saved_predictions[-1].timestamp}ms',
            (5, 70),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (255, 255, 255),
            2
            )

    # Show frame to screen
    cv2.imshow('Camera', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break


cap.release()
cv2.destroyAllWindows()

In [None]:
cap.release()
cv2.destroyAllWindows()