# 1. Data Capture / Mediapipe setup

In [None]:
# Cell 1: Install libraries
!pip install mediapipe opencv-python pandas numpy matplotlib

In [None]:
# Cell 2: Libraries
import mediapipe as mp
import cv2
import numpy as np
import time
import pandas as pd
import csv
import os
from matplotlib import pyplot as plt

In [None]:
# Cell 3: Check that the camera and mediapipe are working
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Error: Unable to open video capture")
else:
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose, \
        mp.solutions.hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as hands:
        while cap.isOpened():
            ret, image = cap.read()
            if not ret:
                print("Error: Unable to read frame from video capture")
                break

            # Flip image to simulate mirror view
            image = cv2.flip(image, 1)

            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make detections
            pose_results = pose.process(image)
            hand_results = hands.process(image)

            # RGB 2 BGR
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # Draw points
            if pose_results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    image,
                    pose_results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4),
                    mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2),
                )

            if hand_results.multi_hand_landmarks:
                for hand_landmarks in hand_results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        image,
                        hand_landmarks,
                        mp.solutions.hands.HAND_CONNECTIONS,
                        mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=4),
                        mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2),
                    )

            cv2.imshow("Raw Webcam Feed", image)

            if cv2.waitKey(10) & 0xFF == ord("q"):
                break

    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Cell 4: Record video for data capture
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Error: Unable to open video capture")
else:
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30 
    print(f"Video Resolution: {width}x{height} at {fps} FPS")

    fourcc = cv2.VideoWriter_fourcc(*'XVID') 
    out = cv2.VideoWriter('output_2.avi', fourcc, fps, (width, height))

    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            print("Error: Unable to read frame from video capture")
            break
        frame = cv2.flip(frame, 1)

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_rgb.flags.writeable = False
        results = hands.process(frame_rgb)  
        frame_rgb.flags.writeable = True
        frame = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)

        # Write every frame
        out.write(frame)

        cv2.imshow('MediaPipe Hands', frame)  
        if cv2.waitKey(5) & 0xFF == ord('q'): 
            break

    cap.release()
    out.release()
    cv2.destroyAllWindows()

In [None]:
# Cell 5: Create CSV to store data
headers = ['class', 'accuracy', 'sequence'] 
headers.extend([f'pose_{coord}{i}' for i in range(33) for coord in ('x', 'y', 'z', 'v')])
headers.extend([f'{hand}_{coord}{i}' for hand in ('right_hand', 'left_hand') for i in range(21) for coord in ('x', 'y', 'z', 'v')])

with open('coordinates.csv', mode='w', newline='') as file:
    csv_writer = csv.writer(file)
    csv_writer.writerow(headers)

In [None]:
# Cell 6: Capture data for first movement
cap = cv2.VideoCapture('output_1.avi')
if not cap.isOpened():
    print("Error: Unable to open video capture")
else:
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter('processed_output.avi', fourcc, fps, (width, height))

    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    mp_drawing = mp.solutions.drawing_utils
    pose_drawing_spec = mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4)
    hand_drawing_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)

    record = False
    accuracy = None
    sequences = {'r': -1, 'w': -1}  
    recording_state = None

    with open('coordinates_1.csv', mode='a', newline='') as file:
        csv_writer = csv.writer(file)

        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                print("Error: Unable to read frame from video capture")
                break

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb.flags.writeable = False
            pose_results = pose.process(frame_rgb)
            hand_results = hands.process(frame_rgb)
            frame_rgb.flags.writeable = True
            frame = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)

            if pose_results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    frame,
                    pose_results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=pose_drawing_spec,
                    connection_drawing_spec=pose_drawing_spec)

            if hand_results.multi_hand_landmarks:
                for hand_landmarks in hand_results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        frame,
                        hand_landmarks,
                        mp_hands.HAND_CONNECTIONS,
                        landmark_drawing_spec=hand_drawing_spec,
                        connection_drawing_spec=hand_drawing_spec)

            # Select the sequence number based on the key pressed
            current_sequence = sequences[recording_state] if recording_state else -1
            row = ['', accuracy, current_sequence]

            if pose_results.pose_landmarks:
                for lm in pose_results.pose_landmarks.landmark:
                    visibility_binary = 1 if lm.visibility > 0.3 else 0
                    row.extend([lm.x, lm.y, lm.z, visibility_binary])
            else:
                row.extend([0] * 33 * 4)

            for hand in ('right_hand', 'left_hand'):
                found = False
                if hand_results.multi_hand_landmarks:
                    for hand_landmarks, handedness in zip(hand_results.multi_hand_landmarks, hand_results.multi_handedness):
                        if handedness.classification[0].label == ('Right' if hand == 'right_hand' else 'Left'):
                            for lm in hand_landmarks.landmark:
                                visibility_binary = 1 if lm.visibility > 0.2 else 0
                                row.extend([lm.x, lm.y, lm.z, visibility_binary])
                            found = True
                            break
                if not found:
                    row.extend([0] * 21 * 4)

            if record:
                csv_writer.writerow(row)

            out.write(frame)
            cv2.imshow('MediaPipe Pose', frame)
            key = cv2.waitKey(5) & 0xFF

            if key == ord('r') or key == ord('w'):
                new_state = chr(key)
                if new_state != recording_state:
                    sequences[new_state] += 1  
                    recording_state = new_state
                record = True
                accuracy = 1 if key == ord('r') else 0
            elif key == ord('s'):
                record = False
                recording_state = None
            elif key == ord('q'):
                break

    cap.release()
    out.release()
    cv2.destroyAllWindows()

df = pd.read_csv('coordinates_1.csv')
df.loc[df['accuracy'].notna(), 'class'] = 'both_hands'
df.to_csv('coordinates_1.csv', index=False)


In [None]:
# Cell 7: Capture data for second movement
cap = cv2.VideoCapture('output_2.avi')
if not cap.isOpened():
    print("Error: Unable to open video capture")
else:
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter('processed_output.avi', fourcc, fps, (width, height))

    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    mp_drawing = mp.solutions.drawing_utils
    pose_drawing_spec = mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4)
    hand_drawing_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)

    record = False
    accuracy = None
    sequences = {'r': -1, 'w': -1}  
    recording_state = None

    with open('coordinates_2.csv', mode='a', newline='') as file:
        csv_writer = csv.writer(file, lineterminator='\n')

        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                print("Error: Unable to read frame from video capture")
                break

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb.flags.writeable = False
            pose_results = pose.process(frame_rgb)
            hand_results = hands.process(frame_rgb)
            frame_rgb.flags.writeable = True
            frame = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)

            if pose_results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    frame,
                    pose_results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=pose_drawing_spec,
                    connection_drawing_spec=pose_drawing_spec)

            if hand_results.multi_hand_landmarks:
                for hand_landmarks in hand_results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        frame,
                        hand_landmarks,
                        mp_hands.HAND_CONNECTIONS,
                        landmark_drawing_spec=hand_drawing_spec,
                        connection_drawing_spec=hand_drawing_spec)

            current_sequence = sequences[recording_state] if recording_state else -1
            row = ['', accuracy, current_sequence]

            if pose_results.pose_landmarks:
                for lm in pose_results.pose_landmarks.landmark:
                    visibility_binary = 1 if lm.visibility > 0.3 else 0
                    row.extend([lm.x, lm.y, lm.z, visibility_binary])
            else:
                row.extend([0] * 33 * 4)

            for hand in ('right_hand', 'left_hand'):
                found = False
                if hand_results.multi_hand_landmarks:
                    for hand_landmarks, handedness in zip(hand_results.multi_hand_landmarks, hand_results.multi_handedness):
                        if handedness.classification[0].label == ('Right' if hand == 'right_hand' else 'Left'):
                            for lm in hand_landmarks.landmark:
                                visibility_binary = 1 if lm.visibility > 0.2 else 0
                                row.extend([lm.x, lm.y, lm.z, visibility_binary])
                            found = True
                            break
                if not found:
                    row.extend([0] * 21 * 4)

            if record:
                csv_writer.writerow(row)

            out.write(frame)
            cv2.imshow('MediaPipe Pose', frame)
            key = cv2.waitKey(5) & 0xFF

            if key == ord('r') or key == ord('w'):
                new_state = chr(key)
                if new_state != recording_state:
                    sequences[new_state] += 1  
                    recording_state = new_state
                record = True
                accuracy = 1 if key == ord('r') else 0
            elif key == ord('s'):
                record = False
                recording_state = None
            elif key == ord('q'):
                break

    cap.release()
    out.release()
    cv2.destroyAllWindows()

df = pd.read_csv('coordinates_2.csv', header=None)
df.loc[df[1].notna(), 0] = 'one_hand'
df.to_csv('coordinates_2.csv', header=False, index=False)


In [None]:
# Cell 7.2: Capture data for third movement
cap = cv2.VideoCapture('output_scissors.avi')
if not cap.isOpened():
    print("Error: Unable to open video capture")
else:
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter('processed_output.avi', fourcc, fps, (width, height))

    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    mp_drawing = mp.solutions.drawing_utils
    pose_drawing_spec = mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4)
    hand_drawing_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)

    record = False
    accuracy = None
    sequences = {'r': -1, 'w': -1}  
    recording_state = None

    with open('coordinates_3.csv', mode='a', newline='') as file:
        csv_writer = csv.writer(file, lineterminator='\n')

        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                print("Error: Unable to read frame from video capture")
                break

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb.flags.writeable = False
            pose_results = pose.process(frame_rgb)
            hand_results = hands.process(frame_rgb)
            frame_rgb.flags.writeable = True
            frame = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)

            if pose_results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    frame,
                    pose_results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=pose_drawing_spec,
                    connection_drawing_spec=pose_drawing_spec)

            if hand_results.multi_hand_landmarks:
                for hand_landmarks in hand_results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        frame,
                        hand_landmarks,
                        mp_hands.HAND_CONNECTIONS,
                        landmark_drawing_spec=hand_drawing_spec,
                        connection_drawing_spec=hand_drawing_spec)

            current_sequence = sequences[recording_state] if recording_state else -1
            row = ['', accuracy, current_sequence]

            if pose_results.pose_landmarks:
                for lm in pose_results.pose_landmarks.landmark:
                    visibility_binary = 1 if lm.visibility > 0.3 else 0
                    row.extend([lm.x, lm.y, lm.z, visibility_binary])
            else:
                row.extend([0] * 33 * 4)

            for hand in ('right_hand', 'left_hand'):
                found = False
                if hand_results.multi_hand_landmarks:
                    for hand_landmarks, handedness in zip(hand_results.multi_hand_landmarks, hand_results.multi_handedness):
                        if handedness.classification[0].label == ('Right' if hand == 'right_hand' else 'Left'):
                            for lm in hand_landmarks.landmark:
                                visibility_binary = 1 if lm.visibility > 0.2 else 0
                                row.extend([lm.x, lm.y, lm.z, visibility_binary])
                            found = True
                            break
                if not found:
                    row.extend([0] * 21 * 4)

            if record:
                csv_writer.writerow(row)

            out.write(frame)
            cv2.imshow('MediaPipe Pose', frame)
            key = cv2.waitKey(5) & 0xFF

            if key == ord('r') or key == ord('w'):
                new_state = chr(key)
                if new_state != recording_state:
                    sequences[new_state] += 1  
                    recording_state = new_state
                record = True
                accuracy = 1 if key == ord('r') else 0
            elif key == ord('s'):
                record = False
                recording_state = None
            elif key == ord('q'):
                break

    cap.release()
    out.release()
    cv2.destroyAllWindows()

df = pd.read_csv('coordinates_3.csv', header=None)
df.loc[df[1].notna(), 0] = ''
df.to_csv('coordinates_3.csv', header=False, index=False)


# 2. Data Processing

In [None]:
# Cell 8: Join all the CSVs of the different movements
df1 = pd.read_csv('coordinates_1.csv', header=None)
df2 = pd.read_csv('coordinates_2.csv', header=None)
# df3 = pd.read_csv('coordinates_3.csv', header=None)


combined_df = pd.concat([df1, df2], axis=0, ignore_index=True)
combined_path = 'combined_coordinates.csv'
combined_df.to_csv(combined_path, header=False, index=False)

In [None]:
# Cell 9: Delete not important data
data = pd.read_csv('combined_coordinates.csv')

# Identify columns to remove: include specific ranges and all visibility points
#columns_to_remove_1 = [f"pose_{c}{i}" for c in ['x', 'y', 'z', 'v'] for i in range(0, 11)]
#columns_to_remove_2 = [f"pose_{c}{i}" for c in ['x', 'y', 'z', 'v'] for i in range(23, 33)]
visibility_columns_pose = [col for col in data.columns if 'pose_v' in col]
visibility_columns_left_hand = [col for col in data.columns if 'left_hand_v' in col]
visibility_columns_right_hand = [col for col in data.columns if 'right_hand_v' in col]

# Combine all columns to remove
columns_to_remove = visibility_columns_pose + visibility_columns_left_hand + visibility_columns_right_hand
columns_to_remove = [col for col in columns_to_remove if col in data.columns]

# Drop the selected columns from the dataframe
data_filtered = data.drop(columns=columns_to_remove)

data_filtered.to_csv('filtered_coordinates.csv', index=False)

# Display the first few rows of the filtered dataset
data_filtered.head()

In [None]:
# Cell 10: Re-label CSV
def transform_and_merge_columns(file_path, output_file_path):
    # Load the CSV file
    data = pd.read_csv(file_path)
    
    data['accuracy'] = data['accuracy'].map({0: 'W', 1: 'R'})
    data['label'] = data.iloc[:, 0].astype(str) + '_' + data['accuracy']
    
    label_column = data.pop('label') 
    data.insert(0, 'label', label_column)
    
    data.drop(columns=['class', 'accuracy'], inplace=True)
    data.to_csv(output_file_path, index=False)

input_file_path = 'filtered_coordinates.csv'  
output_file_path = 'modified_file.csv'  
transform_and_merge_columns(input_file_path, output_file_path)

In [None]:
# Cell 11: Knowing the size of the CSV
data = pd.read_csv('modified_file.csv')

rows, columns = data.shape
print("Rows:", rows)
print("Columns:", columns) # We will take out two columns from here to exclude the labeling
21*3+21*3+33*3 # Make sure the points you want to collect are the same. amount as columns you have (lh_lm*3 + rh_lm*3 + pose_lm*3)

In [None]:
# Cell 12: Capture the different labels of the dataset
labels = data.iloc[:, 0]
actions = list(set(labels))
print(actions)

In [None]:
# Cell 13: Create directory to save Data arrays for training
def create_subfolders_from_labels_and_sequences(csv_file, base_folder):
    # Load the data
    data = pd.read_csv(csv_file)
    
    if not os.path.exists(base_folder): 
        os.makedirs(base_folder)
    
    unique_labels = data['label'].unique()     
    
    for label in unique_labels:    
        label_folder_path = os.path.join(base_folder, label)
        if not os.path.exists(label_folder_path):
            os.makedirs(label_folder_path)
        
        label_data = data[data['label'] == label]       
        
        unique_sequences = label_data['sequence'].unique()
        
        for sequence in unique_sequences:
            sequence_folder_path = os.path.join(label_folder_path, str(sequence))
            if not os.path.exists(sequence_folder_path):
                os.makedirs(sequence_folder_path)
            else:
                print(f"Subfolder for sequence {sequence} in label {label} already exists.")

file_path = 'modified_file.csv'
DATA_PATH = os.path.join('DataBase')
create_subfolders_from_labels_and_sequences(file_path, DATA_PATH)

In [None]:
# Cell 14: Save arrays in directory
def save_data_as_arrays(csv_file, base_folder):
    # Load the data
    data = pd.read_csv(csv_file)
    
    for (label, sequence), group in data.groupby(['label', 'sequence']):
        folder_path = os.path.join(base_folder, label, str(sequence))
        
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
        
        file_counter = 0
        
        for index, row in group.iterrows():
            data_array = row.drop(['label', 'sequence']).to_numpy(dtype=np.float32)
            
            file_name = f"{file_counter}.npy"
            file_path = os.path.join(folder_path, file_name)
            
            np.save(file_path, data_array)
            file_counter += 1

save_data_as_arrays(file_path, DATA_PATH)

In [None]:
# Cell 15: Test new arrays are created in proper format (x, )
test = np.load('DataBase/one_hand_R/1/14.npy', allow_pickle=True)
test.shape

In [None]:
# Cell 16: Create new amount of folders so all the sequences and movements have the same amount of data
def equalize_subfolders(base_folder):
    subfolder_counts = {}

    labels = [d for d in os.listdir(base_folder) if os.path.isdir(os.path.join(base_folder, d))]
    for label in labels:
        label_path = os.path.join(base_folder, label)
        sequences = [d for d in os.listdir(label_path) if os.path.isdir(os.path.join(label_path, d))]
        subfolder_counts[label] = len(sequences)
    
    max_subfolders = max(subfolder_counts.values())
    
    for label, count in subfolder_counts.items():
        label_path = os.path.join(base_folder, label)
        if count < max_subfolders:
            for i in range(count, max_subfolders):
                new_folder_path = os.path.join(label_path, str(i))
                if not os.path.exists(new_folder_path):
                    os.makedirs(new_folder_path)
                print(f"Created folder {new_folder_path}")

equalize_subfolders(DATA_PATH)

In [None]:
# Cell 17: Create np.arrays filled with 0s to have the same amount of data per movement and sequence
def equalize_array_counts(base_folder):
    max_files = 0

    # Dictionary to hold all subfolder paths
    subfolder_paths = []

    for label in os.listdir(base_folder):
        label_path = os.path.join(base_folder, label)
        if os.path.isdir(label_path):
            for sequence in os.listdir(label_path):
                sequence_path = os.path.join(label_path, sequence)
                if os.path.isdir(sequence_path):
                    subfolder_paths.append(sequence_path)
                    num_files = len([f for f in os.listdir(sequence_path) if f.endswith('.npy')])
                    if num_files > max_files:
                        max_files = num_files

    for folder in subfolder_paths:
        current_count = len([f for f in os.listdir(folder) if f.endswith('.npy')])
        if current_count < max_files:
            for i in range(current_count, max_files):
                new_file_path = os.path.join(folder, f"{i}.npy")
                zero_array = np.zeros(225, dtype=np.float32)  
                np.save(new_file_path, zero_array)
                print(f"Created {new_file_path} with zeros")

equalize_array_counts(DATA_PATH)

In [None]:
# Cell 18: Re-test new arrays
test = np.load('DataBase/one_hand_W/24/15.npy')
test.shape

In [None]:
# Cell 19: count the amount of subfolders and arrays
def count_subfolders_and_arrays(base_folder):
    # Find all label folders and sort them to consistently pick the first
    label_folders = sorted([d for d in os.listdir(base_folder) if os.path.isdir(os.path.join(base_folder, d))])

    if not label_folders:
        print("No folders found in the base directory.")
        return None

    first_label_path = os.path.join(base_folder, label_folders[0])
    
    subfolders = sorted([d for d in os.listdir(first_label_path) if os.path.isdir(os.path.join(first_label_path, d))])   
    no_sequences = len(subfolders)

    if not subfolders:
        print("No subfolders found in the first label folder.")
        return None
    first_subfolder_path = os.path.join(first_label_path, subfolders[0])

    sequence_length = len([f for f in os.listdir(first_subfolder_path) if f.endswith('.npy')])

    return no_sequences, sequence_length

results = count_subfolders_and_arrays(DATA_PATH)

if results:
    no_sequences, sequence_length = results
    print(f"no_sequences= {no_sequences} / sequence_length= {sequence_length}")

In [None]:
# Cell 20: Create labelmap to asign every action
label_map = {label:num for num, label in enumerate(actions)}
label_map

In [None]:
# Cell 21: Assign sequences and labels list
sequences, labels = [], []
for action in actions:
    for sequence in range(no_sequences):
        window = []
        for frame_num in range(sequence_length):
            file_path = os.path.join(DATA_PATH, action, str(sequence), f"{frame_num}.npy")
            res = np.load(file_path, allow_pickle=True)  
            window.append(res)
        sequences.append(window)  
        labels.append(label_map[action])

# 3. Set up Tensorflow (Apple silicon step if you use nviadia GPUs check [Tensorflow GPU Documentation](https://www.tensorflow.org/guide/gpu))

In [None]:
# Cell 22: Install new libraries for processing and training
!pip install tensorflow 
!pip install tensorflow-macos
!pip install tensorflow-metal
!pip install scikit-learn --upgrade
!pip install scipy --upgrade
!pip install plotly --upgrade

In [None]:
# Cell 23: Conda environment dependencies installation (I only managed to get the Tensorflow GPU working under a conda environment)
!conda install -y apple tensorflow-deps
!conda install notebook -y

In [None]:
# Cell 24: Check if your GPU works (code from fotiecodes: (https://blog.fotiecodes.com/install-tensorflow-on-your-mac-m1m2m3-with-gpu-support-clqs92bzl000308l8a3i35479))
import sys
import keras
import pandas as pd
import sklearn as sk
import scipy as sp
import tensorflow as tf
import platform

print (f"Python Platform: {platform.platform ()}")
print (f"Tensor Flow Version: {tf.__version__}")
print(f"Keras Version: {keras.__version__}")
print ()

print (f"Python {sys.version}")
print (f"Pandas {pd.__version__}")
print (f"Scikit-Learn {sk.__version__}")
print (f"SciPy {sp.__version__}")
gpu = len (tf.config.list_physical_devices ('GPU'))>0
print ("GPU is", "available" if gpu else "NOT AVAILABLE")

# 4. Training

In [None]:
# Cell 25: Import training dependencies
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

In [None]:
# Cell 26: Set X and y
X = np.array(sequences)
y = to_categorical(labels).astype(int)
y

In [None]:
# Cell 27: Check size of array
np.array(sequences).shape

In [None]:
# Cell 28: Split values for testing and training
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)
y_train.shape

In [None]:
# Cell 29: To web monitor live training later
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# Cell 30: Build the neural network (All about experimenting. This one is probably not the most efficient way)
actions
actions_array = np.array(actions)
model = Sequential()

# Adding a Bidirectional LSTM layer with input shape
model.add(Bidirectional(LSTM(64, return_sequences=True, activation='tanh'), input_shape=(68, 225)))

# Dropout for regularization
model.add(Dropout(0.2))

# Another Bidirectional LSTM layer with regularization
model.add(Bidirectional(LSTM(128, return_sequences=True, activation='tanh', kernel_regularizer=l2(0.01))))

# Additional dropout layer
model.add(Dropout(0.2))

# Final Bidirectional LSTM layer, does not return sequences
model.add(Bidirectional(LSTM(64, activation='tanh')))

# Dense layers following the final LSTM output
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(actions_array.shape[0], activation='softmax'))


In [None]:
# Cell 31: Compile the model
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Cell 32: Early stoping to get the best result of the training
early_stopping = EarlyStopping(monitor='val_accuracy', patience=20, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_accuracy', mode='max')
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)

In [None]:
# Cell 33: Training
model.fit(X_train, y_train, validation_split=0.2, epochs=2000, batch_size=32,
          callbacks=[tb_callback, early_stopping, checkpoint, reduce_lr])

# 5. Tests

In [None]:
# Cell 34: Save model
model.save('actions.h5')

In [None]:
# Cell 35: Load model: In case you have your own
model = load_model('actions.h5')

In [None]:
# Cell 36: Check the structure of the model
model.summary()

In [None]:
# Cell 37: Prediction test
res = model.predict(X_test)

In [None]:
# Cell 38: Prediction test
actions[np.argmax(res[3])]

In [None]:
# Cell 39: Prediction test
actions[np.argmax(y_test[3])]

# 6. Metrics

In [None]:
# Cell 40: New libraries for seeing the preformance of the model
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [None]:
# Cell 41: yhat
yhat = model.predict(X_test)

In [None]:
# Cell 42: ytrue
ytrue = np.argmax(y_test, axis=1).tolist()  
yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
# Cell 43: Confusion matrix
multilabel_confusion_matrix(ytrue, yhat)

In [None]:
# Cell 44: Accuracy score
accuracy_score(ytrue, yhat)

# 7. Real Time Test

In [None]:
# Cell 45: Import libraries just for real time testing
import cv2
import numpy as np
import mediapipe as mp
from tensorflow.keras.models import load_model
import pandas as pd

In [None]:
# Cell 46: Set up everything again. In case you just want to make the predictions
data = pd.read_csv('modified_file.csv')
labels = data.iloc[:, 0]
actions = list(set(labels))
print(actions)
label_map = {label:num for num, label in enumerate(actions)}
label_map

In [None]:
# Cell 47: Initialize MediaPipe Holistic and Drawing utilities
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [None]:
# Cell 48: Load the model for action recognition
model_path = 'actions.h5'
model = load_model(model_path)

In [None]:
# Cell 49: Reverse the label map
reverse_label_map = {value: key for key, value in label_map.items()}

In [None]:
# Cell 50: Mediapipe detections
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = model.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

In [None]:
# Cell 51: Styles for landmarks
def draw_styled_landmarks(image, results, incorrect_movement):
    if incorrect_movement:
        color = (0, 0, 255)  # Red for incorrect movement
    else:
        color = (0, 255, 0)  # Green for right movement

    pose_spec = mp_drawing.DrawingSpec(color=color, thickness=2, circle_radius=4)
    hand_spec = mp_drawing.DrawingSpec(color=color, thickness=2, circle_radius=2)

    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
            pose_spec, pose_spec
        )
    if results.left_hand_landmarks:
        mp_drawing.draw_landmarks(
            image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
            hand_spec, hand_spec
        )
    if results.right_hand_landmarks:
        mp_drawing.draw_landmarks(
            image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
            hand_spec, hand_spec
        )

In [None]:
# Cell 52: Extract keypoints from the holistic model's results
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, lh, rh])

In [None]:
# Cell 53: Real time predictions
sequence = []
sentence = []
threshold = 0.5

cap = cv2.VideoCapture(0)
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("Error: Frame could not be read.")
            break

        frame = cv2.flip(frame, 1)  

        image, results = mediapipe_detection(frame, holistic)

        if results.pose_landmarks or results.left_hand_landmarks or results.right_hand_landmarks:
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-30:]  # Keep the last 30 sequences

            if len(sequence) == 30:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                action = actions[np.argmax(res)]
                incorrect_movement = action.endswith("_W")

                draw_styled_landmarks(image, results, incorrect_movement)

                color = (0, 0, 255) if incorrect_movement else (0, 255, 0)
                cv2.putText(image, f'Action: {action}', (15, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)

        cv2.imshow('OpenCV Feed', image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()


In [None]:
import cv2
import numpy as np
import mediapipe as mp
from tensorflow.keras.models import load_model

# Initialize MediaPipe Holistic and Drawing utilities
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Load the model for action recognition
model_path = 'actions.h5'
model = load_model(model_path)

# Define action labels and create mapping dictionaries
# Assuming label_map is defined elsewhere in your code
reverse_label_map = {value: key for key, value in label_map.items()}

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = model.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

def draw_styled_landmarks(image, results):
    # Draw pose, hand, and face landmarks on the image.
    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
        )
    if results.left_hand_landmarks:
        mp_drawing.draw_landmarks(
            image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
        )
    if results.right_hand_landmarks:
        mp_drawing.draw_landmarks(
            image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
        )

def extract_keypoints(results):
    # Extract keypoints from the holistic model's results
    pose = np.array([[res.x, res.y, res.z] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, lh, rh])

sequence = []
sentence = []
threshold = 0.5

cap = cv2.VideoCapture(0)
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("Error: Frame could not be read.")
            break

        # Flip the frame horizontally
        frame = cv2.flip(frame, 1)  # Horizontal flip

        image, results = mediapipe_detection(frame, holistic)
        draw_styled_landmarks(image, results)

        if results.pose_landmarks or results.left_hand_landmarks or results.right_hand_landmarks:
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-30:]  # Keep the last 30 sequences

            if len(sequence) == 30:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                action = actions[np.argmax(res)]

                # Determine color based on the action suffix
                if action.endswith("_W"):
                    color = (0, 0, 255)  # Red color
                elif action.endswith("_R"):
                    color = (0, 255, 0)  # Green color
                else:
                    color = (255, 255, 255)  # White color for other actions

                # Display the predicted action on the screen
                cv2.putText(image, f'Action: {action}', (15, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)

        cv2.imshow('OpenCV Feed', image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
