In [1]:
import cv2
import mediapipe as mp
import numpy as np
import os
import tensorflow as tf
import pickle

# Short aliases for the MediaPipe solution modules.
# NOTE(review): only mp_pose and mp_holistic are referenced by the cells visible
# in this notebook; the drawing/hands aliases may be used elsewhere — confirm.
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose
mp_holistic = mp.solutions.holistic
mp_hands = mp.solutions.hands


# Directory that is scanned for input videos (relative to the working directory).
video_dir = 'videos/'


# with open('processed_files.pkl', 'wb') as f:
#     pickle.dump(processed_files_data, f)

# with open('X_train.pkl', 'wb') as f: 
#     pickle.dump(X_train_data, f)

# with open('y_train.pkl', 'wb') as f:
#     pickle.dump(y_train_data, f)

2024-11-28 18:17:29.145874: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-11-28 18:17:29.146306: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-11-28 18:17:29.149499: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-11-28 18:17:29.190045: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# NOTE(review): the functions defined below take their own `frame_count`
# parameter and no visible cell reads this global — confirm it is still needed.
frame_count = 0

In [4]:
def calculate_angle(a,b,c):
    """Return the angle at vertex ``b`` formed by the points ``a``-``b``-``c``.

    Only the first two components (x, y) of each point are used. The result is
    in degrees and always lies in the range [0, 180].
    """
    first, mid, end = (np.array(point) for point in (a, b, c))

    # Direction of each arm of the angle, measured from the vertex.
    heading_to_end = np.arctan2(end[1]-mid[1], end[0]-mid[0])
    heading_to_first = np.arctan2(first[1]-mid[1], first[0]-mid[0])

    degrees = np.abs((heading_to_end - heading_to_first)*180.0/np.pi)

    # Fold reflex angles back so the smaller of the two possible angles is returned.
    return 360-degrees if degrees > 180.0 else degrees

In [5]:
def process_pose_landmarks(results, frame_number):
    """Compute arm angles for one frame from MediaPipe pose landmarks.

    Args:
        results: object exposing ``pose_landmarks`` (``None`` when no pose was
            found, otherwise something with a ``landmark`` sequence indexed by
            ``mp_pose.PoseLandmark`` values, each entry having ``x`` and ``y``).
        frame_number: zero-based frame index, used only for log messages.

    Returns:
        ``None`` when no pose was detected, otherwise a dict with the keys
        "Left Arm", "Right Arm", "Right Arm to Hip" and "Left Arm to Hip",
        each an angle in degrees rounded to 3 decimals. When the hip midpoint
        is unavailable the two "... to Hip" angles are reported as 0, the same
        placeholder the processing loop uses for undetected values.
    """
    if results.pose_landmarks is None:
        print(f"Frame {frame_number}: No pose landmarks detected")
        return None

    landmarks = results.pose_landmarks.landmark

    def xy(landmark_id):
        """Return [x, y] of the given PoseLandmark enum member."""
        point = landmarks[landmark_id.value]
        return [point.x, point.y]

    left_shoulder = xy(mp_pose.PoseLandmark.LEFT_SHOULDER)    # 11
    left_elbow = xy(mp_pose.PoseLandmark.LEFT_ELBOW)          # 13
    left_wrist = xy(mp_pose.PoseLandmark.LEFT_WRIST)          # 15

    right_shoulder = xy(mp_pose.PoseLandmark.RIGHT_SHOULDER)  # 12
    right_elbow = xy(mp_pose.PoseLandmark.RIGHT_ELBOW)        # 14
    right_wrist = xy(mp_pose.PoseLandmark.RIGHT_WRIST)        # 16

    left_hip = xy(mp_pose.PoseLandmark.LEFT_HIP)              # 23
    right_hip = xy(mp_pose.PoseLandmark.RIGHT_HIP)            # 24

    if left_hip[0] >= 0 and right_hip[0] >= 0:
        # Midpoint of the two hips is the *average* of their coordinates.
        # (The previous code halved the difference, which is not a point
        # between the hips. Features cached before this fix used that value.)
        middle_hip = [(left_hip[0] + right_hip[0]) / 2, (left_hip[1] + right_hip[1]) / 2]
    else:
        middle_hip = None
        print("Hip coordinates are negative")

    angle_left_arm = calculate_angle(left_shoulder, left_elbow, left_wrist)
    angle_right_arm = calculate_angle(right_wrist, right_elbow, right_shoulder)

    if middle_hip is not None:
        angle_right_arm_to_hip = calculate_angle(right_elbow, right_shoulder, middle_hip)
        angle_left_arm_to_hip = calculate_angle(middle_hip, left_shoulder, left_elbow)
    else:
        # calculate_angle cannot index a None point; fall back to the
        # "undetected" placeholder instead of crashing the whole run.
        angle_right_arm_to_hip = 0.0
        angle_left_arm_to_hip = 0.0

    frame_results = {
        "Left Arm": round(angle_left_arm, 3),
        "Right Arm": round(angle_right_arm, 3),
        "Right Arm to Hip": round(angle_right_arm_to_hip, 3),
        "Left Arm to Hip": round(angle_left_arm_to_hip, 3)
    }

    print(f"Frame {frame_number+1} pose:", frame_results)

    return frame_results


In [6]:
def process_left_hand_landmarks(left_hand_landmarks, frame_number):
    """Compute 15 finger-joint angles (degrees) for the left hand.

    Args:
        left_hand_landmarks: object with a ``landmark`` sequence of 21 entries,
            each having ``x``, ``y`` and ``z``; falsy when the hand was not
            detected.
        frame_number: zero-based frame index, used only for log messages.

    Returns:
        ``None`` when no landmarks were given, otherwise
        ``{"Left Hand": ndarray of shape (15,)}`` with angles in degrees.
    """
    if not left_hand_landmarks:
        print(f"Frame {frame_number}: No left hand landmarks detected")
        return None

    joint = np.zeros((21, 3))
    # Read from the argument, not from a notebook-level `results` variable, so
    # the function works regardless of what the calling cell named its result.
    for i, landmark in enumerate(left_hand_landmarks.landmark):
        joint[i] = [landmark.x, landmark.y, landmark.z]

    # 20 bone vectors: each goes from the joint in v1 to the joint in v2.
    v1 = joint[[0, 1, 2, 3, 0, 5, 6, 7, 0, 9, 10, 11, 0, 13, 14, 15, 0, 17, 18, 19], :]
    v2 = joint[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20], :]
    v = v2 - v1

    # Normalise to unit length so the dot product below is the cosine.
    v = v / np.linalg.norm(v, axis=1)[:, np.newaxis]

    # 15 pairs of bone vectors whose mutual angle is measured.
    compareL1 = v[[0, 1, 2, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 16, 17], :]
    compareL2 = v[[1, 2, 3, 5, 6, 7, 9, 10, 11, 13, 14, 15, 17, 18, 19], :]

    # Rounding can push a cosine marginally outside [-1, 1], which would make
    # arccos return NaN; clip before taking the inverse.
    cosines = np.clip(np.einsum('nt,nt->n', compareL1, compareL2), -1.0, 1.0)
    L_angle = np.degrees(np.arccos(cosines))

    frame_results = {"Left Hand": L_angle}
    print(f"Frame {frame_number + 1} left hand:", frame_results)

    return frame_results

In [7]:
def process_right_hand_landmarks(right_hand_landmarks, frame_number):
    """Compute 15 finger-joint angles (degrees) for the right hand.

    Args:
        right_hand_landmarks: object with a ``landmark`` sequence of 21
            entries, each having ``x``, ``y`` and ``z``; falsy when the hand
            was not detected.
        frame_number: zero-based frame index, used only for log messages.

    Returns:
        ``None`` when no landmarks were given, otherwise
        ``{"Right Hand": ndarray of shape (15,)}`` with angles in degrees.
    """
    if not right_hand_landmarks:
        print(f"Frame {frame_number}: No right hand landmarks detected")
        return None

    joint = np.zeros((21, 3))
    # Read from the argument, not from a notebook-level `results` variable, so
    # the function works regardless of what the calling cell named its result.
    for i, landmark in enumerate(right_hand_landmarks.landmark):
        joint[i] = [landmark.x, landmark.y, landmark.z]

    # 20 bone vectors: each goes from the joint in v1 to the joint in v2.
    v1 = joint[[0, 1, 2, 3, 0, 5, 6, 7, 0, 9, 10, 11, 0, 13, 14, 15, 0, 17, 18, 19], :]
    v2 = joint[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20], :]
    v = v2 - v1

    # Normalise to unit length so the dot product below is the cosine.
    v = v / np.linalg.norm(v, axis=1)[:, np.newaxis]

    # 15 pairs of bone vectors whose mutual angle is measured.
    compareR1 = v[[0, 1, 2, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 16, 17], :]
    compareR2 = v[[1, 2, 3, 5, 6, 7, 9, 10, 11, 13, 14, 15, 17, 18, 19], :]

    # Rounding can push a cosine marginally outside [-1, 1], which would make
    # arccos return NaN; clip before taking the inverse.
    cosines = np.clip(np.einsum('nt,nt->n', compareR1, compareR2), -1.0, 1.0)
    R_angle = np.degrees(np.arccos(cosines))

    frame_results = {"Right Hand": R_angle}
    print(f"Frame {frame_number + 1} right hand:", frame_results)

    return frame_results


In [8]:
def get_video_frame_properties(video_path):
    """Return the shape and dtype of the first frame of a video.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.

    Returns:
        ``(height, width, channels, dtype)`` taken from the first frame, or
        ``None`` when the video cannot be opened or its first frame cannot be
        read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Cannot open video: {video_path}")
            return None

        # Read the first frame; `ret` is False (and `frame` is None) for an
        # empty or undecodable stream, so check before touching frame.shape.
        ret, frame = cap.read()
        if not ret or frame is None:
            print(f"Cannot read first frame: {video_path}")
            return None

        height, width, channels = frame.shape
        return height, width, channels, frame.dtype
    finally:
        # Release on every path, including the early returns above.
        cap.release()

In [10]:
# Extract frames starting at start_frame for frame_count frames
def extract_frames(video_path, start_frame, frame_count, frame_interval):
    """Sample ``frame_count`` frames from a video at a fixed stride.

    Frame ``start_frame + i * frame_interval`` is read for ``i`` in
    ``range(frame_count)``. Reading stops at the first frame that cannot be
    read, and the remainder of the list is filled with all-zero frames of the
    same shape and dtype as the video's first frame.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.
        start_frame: index of the first frame to read.
        frame_count: number of frames to return.
        frame_interval: stride, in frames, between consecutive samples.

    Returns:
        A list of ``frame_count`` frames, or an empty list when the video's
        frame properties cannot be determined (e.g. it cannot be opened).
    """
    properties = get_video_frame_properties(video_path)
    if properties is None:
        # Without a frame shape there is nothing sensible to pad with; let the
        # caller see that fewer than frame_count frames came back.
        return []
    height, width, channels, dtype = properties

    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        for i in range(frame_count):
            frame_number = start_frame + i * frame_interval
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            success, image = cap.read()
            if not success:
                print(f"Cannot read frame {frame_number}.")
                break

            frames.append(image)
    finally:
        cap.release()

    # Pad with blank frames that match the real frames' shape and dtype.
    missing = frame_count - len(frames)
    frames.extend(np.zeros((height, width, channels), dtype=dtype) for _ in range(missing))

    return frames


In [11]:
def extract_continuous_frames(video_path, label, frame_interval=5, frame_count=20): 
    """Cut a video into fixed-length frame sequences.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.
        label: class label of the video; used only in log messages.
        frame_interval: step between window start frames, also passed to
            ``extract_frames`` as the sampling stride inside each window.
        frame_count: number of frames per sequence.

    Returns:
        A list of sequences (each a list of ``frame_count`` frames). Empty when
        the video cannot be opened. Videos shorter than ``frame_count`` frames
        yield exactly one sequence sampled with stride 1.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        print(f"Cannot open video: {video_path}")
        return []

    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    capture.release()

    # Short video: a single sample, topped up with None if it comes back short.
    if total_frames < frame_count:
        print(f"{video_path} (Label: {label}): Not enough frames ({total_frames} frames, less than {frame_count}), generating a single sample.")
        single_sample = extract_frames(video_path, 0, frame_count, 1)
        shortfall = frame_count - len(single_sample)
        if shortfall > 0:
            single_sample.extend([None] * shortfall)
        return [single_sample]

    # Long enough: slide a window over the video.
    print(f"{video_path} (Label: {label}): Total Frame: {total_frames}")
    sequences = []
    last_start = total_frames - frame_count
    for start_frame in range(0, last_start + 1, frame_interval):
        window = extract_frames(video_path, start_frame, frame_count, frame_interval)
        if len(window) != frame_count:
            print(f"Cannot extract {frame_count} continuous frames starting at frame {start_frame} from {video_path}.")
            continue
        sequences.append(window)
    print(f"Extracted {len(sequences)} sequences of {frame_count} frames from {video_path}. - moving sample")

    return sequences

In [13]:
# Standardize the shape and structure of each video's data in X_train
def resolve_array(X_train, n_frames=None):
    """Flatten every sequence into a dense ``(n_frames, 34)`` float array.

    Each input frame is expected to be indexable as
    ``[pose0, pose1, pose2, pose3, left_hand, right_hand]``. Columns 0-3 of the
    output hold the four pose values, columns 4-18 the first 15 left-hand
    values and columns 19-33 the first 15 right-hand values. A hand entry that
    is a scalar, or a list/array containing only zeros, leaves its 15 columns
    at zero.

    Args:
        X_train: iterable of sequences, each with at least ``n_frames`` frames.
        n_frames: number of frames taken from every sequence. When omitted,
            the notebook-level global ``max_frames`` is used.

    Returns:
        A list with one ``(n_frames, 34)`` ndarray per input sequence.
    """
    if n_frames is None:
        # Backward-compatible fallback to the global the notebook defines.
        n_frames = max_frames

    def hand_columns(values):
        """Return the 15 values to store for one hand entry (0 means all-zero)."""
        if isinstance(values, (list, np.ndarray)) and not np.all(np.array(values) == 0):
            return values[:15]
        return 0

    new_X_train = []

    for video_data in X_train:  # Iterate through each video
        U_X_train = np.zeros((n_frames, 34))

        for a in range(n_frames):
            frame = video_data[a]
            U_X_train[a][:4] = frame[:4]  # Copy first 4 values directly
            # Each hand block is written once per frame; the result does not
            # depend on any inner index, so no per-angle loop is needed.
            U_X_train[a][4:19] = hand_columns(frame[4])
            U_X_train[a][19:34] = hand_columns(frame[5])

        new_X_train.append(U_X_train)

    return new_X_train

In [14]:
def pad_sequence(sequence, target_length=20):
    """Grow ``sequence`` in place to ``target_length`` entries and return it.

    Every added entry is a blank frame: four zero pose values followed by two
    separate lists of 15 zeros (left hand, right hand). Sequences that are
    already long enough are returned untouched.
    """
    shortfall = target_length - len(sequence)
    sequence.extend(
        [0, 0, 0, 0, [0] * 15, [0] * 15]  # pose x4, left hand, right hand
        for _ in range(shortfall)
    )
    return sequence

In [None]:
import json

# Annotation file that maps each gloss (label) to its video instances.
json_file = 'WLASL_v0.3.json'

with open(json_file, 'r') as f:
    data = json.load(f)

# Build video_id -> gloss. The code below reads each item as a dict with a
# "gloss" string and an "instances" list whose entries carry a "video_id".
video_label_map = {}
for item in data:
    gloss = item["gloss"]
    instances = item["instances"]
    for instance in instances:
        video_id = instance["video_id"]
        # Keep the first gloss seen for a video_id; later duplicates are ignored.
        if video_id not in video_label_map:
            video_label_map[video_id] = gloss
# NOTE(review): `y` is rebound from 'y_train.pkl' by the data-initialization
# cell further down, so this empty list only matters if that cell is skipped.
y = []



In [16]:
def safe_load_pickle(file_path, default_value):
    """Load pickled data, returning ``default_value`` if it is missing or corrupted.

    NOTE(review): the next cell defines ``safe_load_pickle`` again, and that
    later definition is the one in effect after a top-to-bottom run. Consider
    keeping a single definition.

    Args:
        file_path: path of the pickle file to read.
        default_value: value returned when the file does not exist or cannot
            be unpickled.

    Returns:
        The unpickled object, or ``default_value``.
    """
    try:
        with open(file_path, 'rb') as f:
            # pickle can execute arbitrary code on load; only use this on
            # files written by this notebook.
            return pickle.load(f)
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return default_value
    except (pickle.UnpicklingError, EOFError) as e:
        print(f"Error: {file_path}, {e}")
        return default_value

In [17]:
def safe_load_pickle(file_path, default):
    """Load pickled data, returning ``default`` if the file is missing or unreadable.

    Args:
        file_path: path of the pickle file to read.
        default: value returned when the file does not exist or its contents
            cannot be unpickled (for example a checkpoint truncated by an
            interrupted run).

    Returns:
        The unpickled object, or ``default``.
    """
    if not os.path.exists(file_path):
        return default
    try:
        with open(file_path, 'rb') as f:
            # pickle can execute arbitrary code on load; only use this on
            # files written by this notebook.
            return pickle.load(f)
    except (pickle.UnpicklingError, EOFError) as e:
        # A corrupted checkpoint should not abort the run; start from default.
        print(f"Error: {file_path}, {e}")
        return default


In [18]:
def save_pickle(file_path, data):
    """Pickle ``data`` to ``file_path`` without ever leaving a partial file.

    The data is written to ``<file_path>.tmp`` first and then moved over the
    target with ``os.replace``, so an interrupted save (e.g. stopping the
    kernel mid-write) keeps the previous checkpoint intact.

    Args:
        file_path: destination path of the pickle file.
        data: any picklable object.
    """
    tmp_path = f"{file_path}.tmp"
    try:
        with open(tmp_path, 'wb') as f:
            pickle.dump(data, f)
        os.replace(tmp_path, file_path)
    except BaseException:
        # Also covers KeyboardInterrupt: drop the partial temp file, then
        # let the original error propagate.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

In [19]:
# Data Initialization
# Resume from earlier checkpoints when present; otherwise start empty.
# processed_files: names of videos already handled (skipped by the loop below).
# X_train / y: accumulated sequences and their labels, extended in parallel.
processed_files = safe_load_pickle('processed_files.pkl', set())
X_train = safe_load_pickle('X_train.pkl', [])
y = safe_load_pickle('y_train.pkl', [])

In [None]:
# Frames -> Sequences
#
# For every video in video_dir that has not been processed yet:
#   1. look up its label and cut it into fixed-length frame sequences,
#   2. run MediaPipe Holistic on each frame and turn landmarks into angles,
#   3. append the resulting sequences/labels to X_train / y,
#   4. checkpoint the pickles every SAVE_INTERVAL videos.
#
# NOTE: frames are now converted BGR -> RGB before detection (MediaPipe expects
# RGB). Sequences cached before this change were produced from BGR input;
# regenerate the pickles if a consistent dataset is required.

processed_count = 0
SAVE_INTERVAL = 10    # checkpoint after this many newly processed videos
SEQUENCE_LENGTH = 20  # frames per sample

for filename in os.listdir(video_dir):
    if filename in processed_files:
        print(f"{filename} has already been processed.")
        continue

    video_path = os.path.join(video_dir, filename)
    video_id = filename.split('.')[0]
    label = video_label_map.get(video_id)
    if label is None:
        # Without a label the sequences would be appended to y as None.
        # Not marked as processed, so the file is retried once a label exists.
        print(f"No label found for {filename}: skipping.")
        continue

    # Probe the file once and release the handle immediately; frame reading is
    # done by extract_continuous_frames, which opens the video itself.
    cap = cv2.VideoCapture(video_path)
    video_opened = cap.isOpened()
    cap.release()
    if not video_opened:
        print(f"Cannot open video: {video_path}")
        continue

    frame_sequences = extract_continuous_frames(video_path, label, frame_count=SEQUENCE_LENGTH)

    all_sequences = []  # Store the final sequences
    prev_pose_results = None
    prev_left_hand_results = None
    prev_right_hand_results = None
    hand_detected = False  # Variable to track when hands are first detected

    with mp_holistic.Holistic(min_detection_confidence=0.8, min_tracking_confidence=0.8) as holistic:
        frame_number = 0
        for frames in frame_sequences:
            sequence_angles = []  # List to store data for each sequence
            for image in frames:
                if image is None:
                    print(f"No frame available: {video_path}, Frame number: {frame_number}")
                    continue

                # OpenCV decodes frames as BGR; MediaPipe expects RGB input.
                rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                rgb_image.flags.writeable = False  # lets MediaPipe use the buffer without copying
                results = holistic.process(rgb_image)

                pose_results = process_pose_landmarks(results, frame_number)
                left_hand_results = process_left_hand_landmarks(results.left_hand_landmarks, frame_number)
                right_hand_results = process_right_hand_landmarks(results.right_hand_landmarks, frame_number)

                # Check if hands are detected
                if (left_hand_results or right_hand_results) and not hand_detected:
                    hand_detected = True
                    print(f"Hands first detected: {filename}")

                # Only create sequences after hands are detected
                if hand_detected:
                    if not (pose_results or left_hand_results or right_hand_results):
                        # If pose, left hand, and right hand are all undetected, exclude the frame
                        print("Unable to detect any angles: excluding frame")
                        frame_number += 1
                        continue

                    # Use previous frame's angles if current frame angles are undetected
                    pose_results = (
                        pose_results
                        if pose_results is not None
                        else prev_pose_results or {"Left Arm": 0, "Right Arm": 0, "Right Arm to Hip": 0, "Left Arm to Hip": 0}
                    )

                    left_hand_results = (
                        left_hand_results
                        if left_hand_results is not None
                        else prev_left_hand_results or {"Left Hand": [0] * 15}
                    )

                    right_hand_results = (
                        right_hand_results
                        if right_hand_results is not None
                        else prev_right_hand_results or {"Right Hand": [0] * 15}
                    )

                    frame_data = [
                        pose_results["Left Arm"], pose_results["Right Arm"],
                        pose_results["Right Arm to Hip"], pose_results["Left Arm to Hip"],
                        left_hand_results["Left Hand"], right_hand_results["Right Hand"]
                    ]

                    sequence_angles.append(frame_data)

                frame_number += 1
                prev_pose_results = pose_results
                prev_left_hand_results = left_hand_results
                prev_right_hand_results = right_hand_results

            # Add sequence if complete; pad if insufficient
            if len(sequence_angles) > 0:
                if len(sequence_angles) < SEQUENCE_LENGTH:
                    sequence_angles = pad_sequence(sequence_angles, SEQUENCE_LENGTH)
                all_sequences.append(sequence_angles)

        # Add all sequences to X_train and y
        if len(all_sequences) > 0:
            X_train.extend(all_sequences)  # Add sequences
            y.extend([label] * len(all_sequences))  # Map labels to sequences
            print(f"{filename}: {len(all_sequences)} sequences added.")
        else:
            print(f"Not enough frame data from {filename}.")

    processed_files.add(filename)
    processed_count += 1

    # Periodic checkpoint so an interrupted run can resume where it stopped.
    if processed_count % SAVE_INTERVAL == 0:
        save_pickle('processed_files.pkl', processed_files)
        save_pickle('X_train.pkl', X_train)
        save_pickle('y_train.pkl', y)

# Final data save
save_pickle('processed_files.pkl', processed_files)
save_pickle('X_train.pkl', X_train)
save_pickle('y_train.pkl', y)

cv2.destroyAllWindows()

