In [2]:
import cv2
import mediapipe as mp
import numpy as np
import os
import mediapipe as mp
from sklearn.cluster import KMeans
from scipy.spatial.distance import cdist

# Landmark layout of one frame.
# Hand landmarker reference: https://ai.google.dev/edge/mediapipe/solutions/vision/hand_landmarker
# Pose landmarker reference: https://ai.google.dev/edge/mediapipe/solutions/vision/pose_landmarker

# All landmarks of a hand are used.
N_HAND_LANDMARKS = 21

# Pose landmark indices that are kept (upper body only, hand landmarks excluded):
# 0-14 plus 23 and 24; indices 15-22 are left out.
UPPER_BODY_CONNECTIONS = [*range(15), 23, 24]
N_POSE_LANDMARKS = len(UPPER_BODY_CONNECTIONS)

# Total number of landmarks per frame: upper body plus two hands (left + right).
N_LANDMARKS = N_POSE_LANDMARKS + 2 * N_HAND_LANDMARKS

# Number of clusters given to KMeans when write_data filters frames.
K = 20

def mediapipe_detection(image, model):
    """Run a MediaPipe model on one BGR frame.

    The frame is converted from BGR to RGB, handed to ``model.process`` and
    then converted back to BGR.

    Args:
        image: Frame in BGR channel order (as produced by OpenCV).
        model: Object exposing ``process(rgb_image)``.

    Returns:
        ``(bgr_image, results)`` where ``results`` is whatever
        ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The frame is read-only while the model runs.
    # NOTE(review): presumably this mirrors the MediaPipe sample code, where it
    # lets the frame be passed by reference - confirm.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

def extract_keypoints(results, visibility_thres=0.5):
    """Extract upper-body and hand keypoints from one frame's detection results.

    Args:
        results: Detection output exposing ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``. Each attribute
            is either falsy (nothing detected) or has a ``.landmark`` sequence
            whose items carry ``x``, ``y`` and ``z`` (pose items also carry
            ``visibility``).
        visibility_thres: Pose landmarks whose ``visibility`` is below this
            value are replaced by ``[0.0, 0.0, 0.0]``.

    Returns:
        ``(pose_landmarks, left_hand_landmarks, right_hand_landmarks)``, each a
        list of ``[x, y, z]`` lists. The pose list always has
        ``N_POSE_LANDMARKS`` entries, ordered like ``UPPER_BODY_CONNECTIONS``.
        An undetected hand yields ``N_HAND_LANDMARKS`` zero triples.
    """

    def zero_points(count):
        # Build a fresh inner list per point; ``[[0.0, 0.0, 0.0]] * count``
        # would repeat one shared list, so mutating a point would change all.
        return [[0.0, 0.0, 0.0] for _ in range(count)]

    # Upper pose: start from all-zero slots so the length is fixed even when a
    # wanted index is missing from the detected landmark list.
    pose_landmarks = zero_points(N_POSE_LANDMARKS)
    if results.pose_landmarks:
        landmarks = results.pose_landmarks.landmark
        for slot, landmark_idx in enumerate(UPPER_BODY_CONNECTIONS):
            if landmark_idx >= len(landmarks):
                continue  # index not available: keep the zero placeholder
            res = landmarks[landmark_idx]
            if not (res.visibility < visibility_thres):
                pose_landmarks[slot] = [res.x, res.y, res.z]

    # Left hand
    if results.left_hand_landmarks:
        left_hand_landmarks = [[lm.x, lm.y, lm.z] for lm in results.left_hand_landmarks.landmark]
    else:
        left_hand_landmarks = zero_points(N_HAND_LANDMARKS)

    # Right hand
    if results.right_hand_landmarks:
        right_hand_landmarks = [[lm.x, lm.y, lm.z] for lm in results.right_hand_landmarks.landmark]
    else:
        right_hand_landmarks = zero_points(N_HAND_LANDMARKS)

    return pose_landmarks, left_hand_landmarks, right_hand_landmarks


def plot_keypoints(list_landmarks):
    """Plot an image together with its keypoints.

    Placeholder: nothing is drawn yet. ``list_landmarks`` is ignored and the
    function returns ``None``.
    """
    # TODO: implement the actual plotting.
    return None

In [None]:
# Shared MediaPipe Holistic instance; get_list_frame passes it to the detection
# helper for every frame. Both confidence thresholds are set to 0.5.
# NOTE(review): the instance is created when this cell runs and is not closed
# anywhere in the visible cells - confirm whether it should be released.
mp_holistic = mp.solutions.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)

def get_list_frame(source_path):
    """Extract the per-frame keypoints of one video.

    Frames are read until the capture reports a failed read. Each frame is run
    through the module-level ``mp_holistic`` model and reduced to its landmarks.

    Args:
        source_path: Path of the video, passed to ``cv2.VideoCapture``.

    Returns:
        A list with one entry per frame read. Each entry is
        ``[left_hand_landmarks, right_hand_landmarks, pose_landmarks]`` as
        returned by ``extract_keypoints`` (note the order: hands first). The
        list is empty when no frame could be read.
    """
    frames_keypoints = []
    cap = cv2.VideoCapture(source_path)
    try:
        while cap.isOpened():
            success, image = cap.read()

            # A failed read ends the loop (end of stream or unreadable frame).
            if not success:
                break

            # MediaPipe Holistic processing, using the helpers defined in this
            # notebook. The previous ``md.`` prefix referred to a name that is
            # never imported or defined here.
            _, results = mediapipe_detection(image, mp_holistic)
            pose_landmarks, left_hand_landmarks, right_hand_landmarks = extract_keypoints(results)

            frames_keypoints.append([left_hand_landmarks, right_hand_landmarks, pose_landmarks])
    finally:
        # Release the capture even if detection raises part-way through.
        cap.release()
    return frames_keypoints
    
def concate_array(left_hand_landmarks, right_hand_landmarks, pose_landmarks):
    """Flatten the three landmark groups into one 1-D feature vector.

    The output holds the flattened left-hand values first, then the right-hand
    values, then the pose values.
    """
    flattened_groups = [
        np.asarray(group).ravel()
        for group in (left_hand_landmarks, right_hand_landmarks, pose_landmarks)
    ]
    return np.concatenate(flattened_groups, axis=None)

def check_zeros(list_landmarks):
    """Return True when every value in ``list_landmarks`` equals zero, else False."""
    return bool(np.all(np.asarray(list_landmarks) == 0))
    
def write_data(output_dir, source_path, file_name, n_clusters=None, random_state=0):
    """Write a reduced keypoint sequence of one video to a ``.npy`` file.

    Steps:
      1. Extract the per-frame landmarks with ``get_list_frame``.
      2. Drop frames in which both hands are all zeros.
      3. If more valid frames remain than ``n_clusters``, run KMeans on the
         flattened frames and keep, for each cluster centre, the frame closest
         to it. Otherwise keep every valid frame.
      4. Save the kept frames, in their original order, as an object array.

    Any exception is caught and reported with ``print`` so that one bad video
    does not interrupt a batch run.

    Args:
        output_dir: Directory the file is written to.
        source_path: Path of the source video.
        file_name: Name of the output file inside ``output_dir``.
        n_clusters: Maximum number of frames to keep; defaults to the
            module-level ``K``.
        random_state: Seed passed to KMeans so the selection is repeatable.

    Returns:
        None.
    """
    try:
        list_fr = get_list_frame(source_path)
        n_keep = K if n_clusters is None else n_clusters

        # Indices of frames where at least one hand has non-zero landmarks.
        list_idx = [
            i
            for i, frame in enumerate(list_fr)
            if not (check_zeros(frame[0]) and check_zeros(frame[1]))
        ]
        if len(list_idx) == 0:
            print("no valid frame to save in: " + source_path)
            return

        # filtering frame
        if len(list_idx) <= n_keep:
            # KMeans raises when there are fewer samples than clusters, which
            # used to make short clips fail. Keep all valid frames instead;
            # the saved sequence is then shorter than ``n_keep``.
            index = np.arange(len(list_idx))
        else:
            X_new = np.array([concate_array(*list_fr[i]) for i in list_idx])
            kmeans = KMeans(n_clusters=n_keep, random_state=random_state)
            kmeans.fit(X_new)
            cluster_centers = kmeans.cluster_centers_
            distances = cdist(X_new, cluster_centers, 'euclidean')
            # Row index of the closest frame per cluster centre, sorted so the
            # frames stay in temporal order.
            nearest_indices = np.argmin(distances, axis=0)
            index = np.sort(nearest_indices)

        data = [list_fr[list_idx[i]] for i in index]
        # Object dtype because the hand and pose groups differ in length.
        data_save = np.asarray(data, dtype="object")
        np.save(os.path.join(output_dir, file_name), data_save)
        print("ok write npy from file: " + source_path) 
    except Exception as e:
        print(f"error write: {source_path} with {e}")

I0000 00:00:1761324934.465622  435233 gl_context.cc:369] GL version: 2.1 (2.1 INTEL-20.7.3), renderer: Intel(R) Iris(TM) Plus Graphics 640


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1761324935.522528  435788 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761324935.709793  435788 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761324935.722274  435788 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761324935.724263  435787 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761324935.819915  435790 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761324935.871013  435

In [4]:
def create_directory(path, directory_name):
    """Create ``directory_name`` inside ``path`` and report the outcome.

    Prints a success message, an "already exists" message, or the error text
    of any other failure. Nothing is raised for those cases and nothing is
    returned.
    """
    target = os.path.join(path, directory_name)

    try:
        os.mkdir(target)
        print("Directory created successfully!")
    except FileExistsError:
        print("Directory already exists!")
    except Exception as err:
        print(f"Error: {err}")

In [5]:
# Dataset locations, given relative to the current working directory.
# NOTE(review): input_dir is not referenced by any active code in the visible cells.
input_dir = "../notebooks/dataset/videos"
output_dir = "../notebooks/dataset/keypoints"
# Create the keypoint output root; no error if it already exists.
os.makedirs(output_dir, exist_ok=True)

import json
# Metadata file read line by line below; each line is parsed as one JSON object
# with a "word" field.
# NOTE(review): this path is under ../dataset while input_dir/output_dir are
# under ../notebooks/dataset - confirm which location is intended.
METADATA_PATH = "../dataset/metadata_v2.jsonl"

# Create one sub-directory of output_dir per word, for the first 20 lines only.
with open(METADATA_PATH, "r", encoding="utf-8") as f:
    i = 0  # number of lines processed so far
    for line in f:
        # NOTE(review): 20 is a hard-coded limit; consider a named constant.
        if i == 20:
            break
        # A blank line would make json.loads raise here.
        data = json.loads(line.strip())
        word = data["word"].strip()
        dir_path = os.path.join(output_dir, str(word))
        # exist_ok=True makes repeated words and re-runs harmless.
        os.makedirs(dir_path, exist_ok=True)
        i += 1

In [None]:
# for sub_dir in os.listdir(output_dir):
#     path = os.path.join(output_dir, sub_dir)
#     if os.path.isdir(path):
#         new_path = os.path.join(output_dir, sub_dir)
#         list_file = list_files(path)
#         for i in range(len(list_file)):
#             path_to_file = os.path.join(path, list_file[i])
#             write_data(new_path, path_to_file, str(i) + ".npy")
                
# write_data(output_dir, source_path, file_name)

https://www.kaggle.com/code/nauxqouh/mediapipeprocessing