# Imports

In [1]:
import cv2
import numpy as np
import os
import mediapipe as mp
import threading
import imageio

from functools import partial
from matplotlib import pyplot as plt
import matplotlib.animation as animation

from sklearn.model_selection import train_test_split

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

from scipy import stats
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

import feedforward

In [2]:
GIF_THREADS = {}
DESCRIPTOR_LEN = len(mp.solutions.holistic.PoseLandmark) * 4
DATA_PATH = os.path.join('dist')

In [3]:
ACTIONS = np.array(['1', '2', '3'])
NUM_VIDEOS_FOR_ACTION = 5
NUM_FRAMES_PER_VIDEO = 30

In [None]:
feedforward.create_folder_if_not_exists(DATA_PATH)

for action in ACTIONS:
    if not os.path.exists(os.path.join(DATA_PATH, action)):
        os.mkdir(os.path.join(DATA_PATH, action))

for action in ACTIONS:
    for idx_video in range(NUM_VIDEOS_FOR_ACTION):
        os.makedirs(os.path.join(DATA_PATH, action, str(idx_video)))

In [4]:
def display_gif(gif_path: str, window_name: str, display_time_ms: int = 100):
    gif = imageio.get_reader(gif_path)
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)

    for gif_frame in gif:
        cv2.imshow(window_name, gif_frame)
        # Display time of each frame in milliseconds
        cv2.waitKey(display_time_ms)

    cv2.destroyWindow(window_name)

def terminate_all_threads():
    global GIF_THREADS
    for action, thread in GIF_THREADS.items():
        thread.join()
    GIF_THREADS = {} 

In [5]:
def open_suggested_gif(actions, idx):
    gif_path = os.path.join(DATA_PATH, actions[idx], f'{actions[idx]}.gif')
    # Check if a GIF for this action is already running
    if actions[idx] not in GIF_THREADS or not GIF_THREADS[actions[idx]].is_alive():
        thread = threading.Thread(target=display_gif, args=(gif_path, actions[idx]))
        thread.start()
        GIF_THREADS[actions[idx]] = thread

def create_gif(landmarks_list, action):
    fig = plt.figure()
    ani = animation.FuncAnimation(fig, partial(feedforward.draw_frame, landmarks_list=landmarks_list), frames=len(landmarks_list), interval=100)
    output_gif = os.path.join(DATA_PATH, action, f'{action}.gif')
    ani.save(output_gif, writer='imagemagick', fps=10) # TODO: Capire se si puo usare la variabile NUM_FRAMES_PER_VIDEO
    return os.path.exists(output_gif)

In [None]:
cap = cv2.VideoCapture(feedforward.CAMERA_INDEX)

with mp.solutions.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    for action in ACTIONS:
        gif_created = False
        landmarks_list = []

        for idx_video in range(NUM_VIDEOS_FOR_ACTION):
            for frame_num in range(NUM_FRAMES_PER_VIDEO):
                ret, frame = cap.read()
                image, results = feedforward.mediapipe_detection(frame, holistic)

                if not gif_created:
                    body_landmarks = results.pose_landmarks
                    if body_landmarks:
                        landmarks_list.append(body_landmarks)

                feedforward.draw_pose_landmarks(image, results)

                if frame_num == 0:
                    cv2.putText(image, 'STARTING COLLECTION', (120, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, f'Action: {action}. Video #{idx_video}', (15, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(2000)
                else:
                    cv2.putText(image, f'Action: {action}. Video #{idx_video}', (15, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    cv2.imshow('OpenCV Feed', image)

                npy_path = os.path.join(DATA_PATH, action, str(idx_video), str(frame_num))
                keypoints = feedforward.extract_descriptor(results, DESCRIPTOR_LEN)
                np.save(npy_path, keypoints)

                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break

            if not gif_created and landmarks_list:
                gif_created = create_gif(landmarks_list, action)

    cap.release()
    cv2.destroyAllWindows()

## Build and Train LSTM Neural Network

In [6]:
label_map = {label: num for num, label in enumerate(ACTIONS)}
sequences, labels = [], []

for action in ACTIONS:
    for idx_video in range(NUM_VIDEOS_FOR_ACTION):
        window = []
        for frame_num in range(NUM_FRAMES_PER_VIDEO):
            res = np.load(os.path.join(DATA_PATH, action, str(idx_video), f"{frame_num}.npy"))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [7]:
X = np.array(sequences)
y = to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [8]:
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(NUM_FRAMES_PER_VIDEO, DESCRIPTOR_LEN)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(ACTIONS.shape[0], activation='softmax'))

log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.fit(X_train, y_train, epochs=50, callbacks=[tb_callback])

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x1719fa100>

# Evaluation using Confusion Matrix and Accuracy



In [9]:
yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()



In [10]:
multilabel_confusion_matrix(ytrue, yhat)

array([[[0, 0],
        [0, 1]]])

In [11]:
accuracy_score(ytrue, yhat)
#Define precision_score
precision_score = lambda ytrue, yhat: np.sum([1 for yt, yp in zip(ytrue, yhat) if yt == yp]) / len(yhat)
#define recall_score
recall_score = lambda ytrue, yhat: np.sum([1 for yt, yp in zip(ytrue, yhat) if yt == yp]) / len(ytrue)
#Calculate the f score
f_score = stats.hmean([precision_score(ytrue, yhat), recall_score(ytrue, yhat)])
recall_score(ytrue, yhat)

1.0

# Test in Real Time v1
Predict only one combination at a time

In [None]:
idx_video = []
sentence = ['','']
predictions = []

num_frames_for_stability = 10
threshold = 0.75

# Define known combinations and their corresponding suggested next gestures
known_combinations = {
    '123': ['1', '2', '3'],
    '212': ['2', '1', '2'],
    '312': ['3', '1', '2'],
    # Add more combinations and their suggested next gestures here
}

actual_combination = ""
idx = 0

# Initialize the suggested next gesture variable
suggested_next_gesture = ""

# Update the positions for displaying correct prediction and suggestion
left_text_position = (120, 100)
right_text_position = (120, 200)


cap = cv2.VideoCapture(feedforward.CAMERA_INDEX)

with mp.solutions.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()
        image, results = feedforward.mediapipe_detection(frame, holistic)

        feedforward.draw_pose_landmarks(image, results)

        keypoints = feedforward.extract_descriptor(results, DESCRIPTOR_LEN)
        idx_video.append(keypoints)
        idx_video = idx_video[-NUM_FRAMES_PER_VIDEO:]

        if len(idx_video) == NUM_FRAMES_PER_VIDEO:
            res = model.predict(np.expand_dims(idx_video, axis=0))[0]
            predictions.append(np.argmax(res))
             
            is_stable_prediction = np.unique(predictions[-num_frames_for_stability:])[0]==np.argmax(res)
            if is_stable_prediction: 
                if res[np.argmax(res)] > threshold:            
                    if len(sentence) > 0: 
                        if ACTIONS[np.argmax(res)] != sentence[-1]:
                            sentence.append(ACTIONS[np.argmax(res)])
                    else:
                        sentence.append(ACTIONS[np.argmax(res)])
                    
                    #Case 1: If the actual combination is empty, then check if the last gesture is the start of a known combination
                    if actual_combination == "":
                        for combination in known_combinations:
                            if combination.startswith(''.join(sentence[-1])):
                                actual_combination = combination
                                idx = 1
                                suggested_next_gesture = known_combinations[combination][idx]
                                break
                    else:
                        #If the actual combination is not empty:
                        #Case 2: If the idx is ok and the last gesture is the next gesture of the actual combination
                        if idx != 0 and idx < len(known_combinations[actual_combination]) and sentence[-1] == known_combinations[actual_combination][idx]:
                            idx += 1
                            if idx < len(known_combinations[actual_combination]):   
                                suggested_next_gesture = known_combinations[actual_combination][idx]
                            print('next', idx)
                        #Case 3: If the idx is ok and the last gesture is still the same as the previous one
                        elif idx != 0 and idx < len(known_combinations[actual_combination]) and sentence[-1] == known_combinations[actual_combination][idx-1]:
                            suggested_next_gesture = known_combinations[actual_combination][idx]
                            print('still', idx)
                        #Case 4: If the idx is over the length of the actual combination, then combination is done
                        elif idx >= len(known_combinations[actual_combination]):
                            print('done', idx)
                            idx = 0
                            actual_combination = ""
                            suggested_next_gesture = "done"
                        #Case 5: If the last gesture is not the next gesture of the actual combination, then reset the combination
                        else:
                            print('reset', idx)
                            idx = 0
                            actual_combination = ""
                            suggested_next_gesture = ""

            if len(sentence) > 5: sentence = sentence[-5:]

            image = feedforward.prob_viz(res, ACTIONS, image)

        # Display the correct prediction on the left
        if sentence:
            cv2.putText(image, f'Correct Prediction: {sentence[-1]}', left_text_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

        cv2.putText(image, f'Next Gesture: {suggested_next_gesture}', right_text_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)


        cv2.imshow('OpenCV Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# Test in Real Time v2
Predict more combinations at a time

In [12]:
idx_video = []
sentence = []
predictions = []

num_frames_for_stability = 10
threshold = 0.75

# Define known combinations and their corresponding suggested next gestures
known_combinations = {
    '123': ['1', '2', '3'],
    '212': ['2', '1', '2'],
    '312': ['3', '1', '2'],
    '3212': ['3', '2', '1', '2'],
    '213': ['2', '1', '3'],
    '2312': ['2', '3', '1', '2'],
    # Add more combinations and their suggested next gestures here
}

actual_combination = ""
idx = 0

# Initialize the suggested next gesture variable
suggested_next_gesture = ""
next_gestures = {}

# Update the positions for displaying correct prediction and suggestion
#left_text_position = (100, 560)
#right_text_position = (600, 560)
left_text_position = (120, 400)
right_text_position = (120, 300)


cap = cv2.VideoCapture(feedforward.CAMERA_INDEX)

with mp.solutions.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()
        image, results = feedforward.mediapipe_detection(frame, holistic)

        feedforward.draw_pose_landmarks(image, results)

        keypoints = feedforward.extract_descriptor(results, DESCRIPTOR_LEN)
        idx_video.append(keypoints)
        idx_video = idx_video[-NUM_FRAMES_PER_VIDEO:]

        if len(idx_video) == NUM_FRAMES_PER_VIDEO:
            res = model.predict(np.expand_dims(idx_video, axis=0))[0]
            predictions.append(np.argmax(res))
             
            is_stable_prediction = np.unique(predictions[-num_frames_for_stability:])[0]==np.argmax(res)
            if is_stable_prediction: 
                if res[np.argmax(res)] > threshold:            
                    if len(sentence) > 0: 
                        if ACTIONS[np.argmax(res)] != sentence[-1]:
                            sentence.append(ACTIONS[np.argmax(res)])
                    else:
                        sentence.append(ACTIONS[np.argmax(res)])
                    
                    #Case 1: If the actual combination is empty, then check if the last gesture is the start of a known combination
                    if next_gestures == {}:
                        for combination in known_combinations:
                            if combination.startswith(''.join(sentence[-1])):
                                idx = 1
                                next_gestures[combination] = idx
                    #If the actual combination is not empty:
                    else:
                        for combo in list(next_gestures.keys()):
                            idx = next_gestures[combo]
                            #Case 2: If the idx is ok and the last gesture is the next gesture of the actual combination
                            if idx != 0 and idx < len(known_combinations[combo]) and sentence[-1] == known_combinations[combo][idx]:
                                idx += 1
                                next_gestures[combo] = idx
                            #Case 3: If the idx is ok and the last gesture is still the same as the previous one
                            elif idx != 0 and idx < len(known_combinations[combo]) and sentence[-1] == known_combinations[combo][idx-1]:
                                next_gestures[combo] = idx
                            #Case 4: If the idx is over the length of the actual combination, then combination is done
                            elif idx >= len(known_combinations[combo]):
                                next_gestures.pop(combo)
                            #Case 5: If the last gesture is not the next gesture of the actual combination, then reset the combination
                            elif sentence[-1] != known_combinations[combo][idx]:
                                next_gestures.pop(combo)

            if len(sentence) > 5: sentence = sentence[-5:]

            image = feedforward.prob_viz(res, ACTIONS, image)

        # Display the correct prediction on the left
        if sentence:
            cv2.putText(image, f'Correct Prediction: {sentence[-1]}', left_text_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

        offset = 0
        
        for combo in next_gestures:
            if next_gestures[combo] < len(known_combinations[combo]):
                suggested_next_gesture = known_combinations[combo][next_gestures[combo]]
            else: 
                suggested_next_gesture = "done"

            cv2.putText(image, f'{known_combinations[combo]} Next Gesture: {suggested_next_gesture}', right_text_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
            
            try:
                flag = int(suggested_next_gesture)
                index = np.where(ACTIONS == suggested_next_gesture)
                open_suggested_gif(ACTIONS, int(index[0][0]))
            except ValueError:
                terminate_all_threads()

            offset += 30
            right_text_position = (120, 200 + offset)

        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow('OpenCV Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            terminate_all_threads()
            break

    cap.release()
    cv2.destroyAllWindows()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.




KeyboardInterrupt: 