In [6]:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import cv2

# Mediapipe on Video

In [None]:
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode


model_path = 'pose_landmarker_lite.task'
video_path = "Walking.mp4"

options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.IMAGE  
)

landmarker = PoseLandmarker.create_from_options(options)


cap = cv2.VideoCapture(video_path)

fps = cap.get(cv2.CAP_PROP_FPS)
frame_idx = 0


fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter("output_video.mp4", fourcc, fps,
                      (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                       int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Convert BGR â†’ RGB
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Convert to MediaPipe Image
    mp_image = mp.Image(
        image_format=mp.ImageFormat.SRGB,
        data=rgb_frame
    )

    # Call the landmarker
    result = landmarker.detect(mp_image)


    if result.pose_landmarks:
        for i, landmark in enumerate(result.pose_landmarks[0]):
            h, w, _ = frame.shape
            cx = int(landmark.x * w)
            cy = int(landmark.y * h)
            cv2.circle(frame, (cx, cy), 3, (0,255,0), -1)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

    frame_idx += 1
    out.write(frame)

out.release()
# cap.release()
cv2.destroyAllWindows()


I0000 00:00:1763759147.780436 4728761 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M1
W0000 00:00:1763759147.940998 4802425 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1763759147.960704 4802425 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


# Analysis Model

In [None]:
import numpy as np
import pandas as pd
from pandas import Series

def extract_features(file_data, kernel, win, feature):
    data = np.load(file_data)
    T = data.shape[0]

    # Flatten the last two dimensions (33 keypoints * 2 coordinates)
    data = data.reshape(T, 33 * 2)

    #preprocess
    for ch in range(data.shape[1]):
        kps_seq_ch = data[:,ch]
        # print(kps_seq_ch.shape)
        kps_seq_ch=Series(kps_seq_ch).rolling(kernel,min_periods=1,center=True).mean().to_numpy()
        data[:,ch]=kps_seq_ch

    #sliding window parameters
    win_len=int(30*win)
    win_step=int(30*0.5)
    sample_windows=[]
    for start_time in range(0,data.shape[0], win_step):
        end_time=start_time+win_len
        if end_time>data.shape[0]:
            end_time=data.shape[0]
            start_time=end_time-win_len
        frame=data[start_time:end_time]
        assert frame.shape[0]==win_len, (start_time, end_time, data.shape[0])
        sample_windows.append(frame)
    sample_windows=np.array(sample_windows)
    sample_windows.shape

    #extract features from each frame
    N, T, D =sample_windows.shape
    feats=[]
    for i in range(N):
        frame=sample_windows[i]
        feat=[]
        for ch in range(D):
            frame_ch=frame[:,ch]
            if 'mean' in feature:
                mean_ch=np.mean(frame_ch)
                feat.append(mean_ch)
            if 'std' in feature:
                std_ch=np.std(frame_ch)
                feat.append(std_ch)
            if 'min' in feature:
                min_ch=np.min(frame_ch)
                feat.append(min_ch)
            if 'max' in feature:
                max_ch=np.max(frame_ch)
                feat.append(max_ch)
            if 'median' in feature:
                med_ch=np.median(frame_ch)
                feat.append(med_ch)
        feats.append(feat)
    feats=np.array(feats)

    return np.array(feats)

def get_label(file):
    l=file.split('_')
    label=int(l[0][1:])-1
    subject_id=int(l[1][1:])
    return label, subject_id

In [41]:
def prepareData(train_max, val_max, kernel, win, feature):
    X_train_list = []
    y_train_list = []
    X_validation_list = []
    y_validation_list = []
    X_test_list = []
    y_test_list = []

    import os
    all_video_files = os.listdir('pose/')

    for file in all_video_files:
        
        label, subject_id=get_label(file)
        
        features = extract_features(f'pose/{file}',kernel, win, feature) 
        frame_labels = np.full(features.shape[0], label) 

        if subject_id <= train_max:
            X_train_list.append(features)
            y_train_list.append(frame_labels)
        elif subject_id<=val_max:
            X_validation_list.append(features)
            y_validation_list.append(frame_labels)
        else:
            X_test_list.append(features)
            y_test_list.append(frame_labels)


    X_train = np.concatenate(X_train_list, axis=0)
    y_train = np.concatenate(y_train_list, axis=0)
    X_validation = np.concatenate(X_test_list, axis=0)
    y_validation = np.concatenate(y_test_list, axis=0)
    X_test = np.concatenate(X_test_list, axis=0)
    y_test = np.concatenate(y_test_list, axis=0)
    return X_train,y_train,X_validation, y_validation, X_test,y_test

## MLP

In [59]:
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score

best_val_accuracy = 0
best_clf = None
best_params = {}

# Example parameter combinations to check using the Validation set
param_grid = [
    {'hls': (50,), 'alpha': 1e-5},      
    {'hls': (100, 10), 'alpha': 1e-5}, 
    {'hls': (100, 50, 20), 'alpha': 1e-4}, 
]

kernel_size=5
window_size=0.5
train_max=5
val_max=7
feature=['min','max','median','mean','std']
X_train,y_train,X_validation, y_validation, X_test,y_test=prepareData(train_max, val_max,kernel_size, window_size, feature)

s=f'{"="*20}Settings{"="*20}\n'
s+=f'Kernel Size: {kernel_size}\n'
s+=f'Window Size: {window_size}\n'
s+=f'Train: Subject 1-{train_max}\n'
s+=f'Validation: Subject {train_max+1}-{val_max}\n'
s+=f'Test: Subject {val_max+1}-10\n'
s+=f'Feature: {",".join(feature)}\n'
s+=f'{"="*40}\n'

for params in param_grid:
    # 1. Create a Classifier with current parameters
    clf = MLPClassifier(
        solver='lbfgs',
        hidden_layer_sizes=params['hls'],
        alpha=params['alpha'],
        random_state=42 
    )
    
    # 2. Train the model 
    clf.fit(X_train, y_train) 
    
    # 3. Predict and evaluate on val
    y_val_pred = clf.predict(X_validation)
    val_accuracy = accuracy_score(y_validation, y_val_pred)

    s+=f"Config {params['hls']} (alpha={params['alpha']:.1e}): Val Accuracy = {val_accuracy:.4f}\n"
    print(f"Config {params['hls']} (alpha={params['alpha']:.1e}): Val Accuracy = {val_accuracy:.4f}")
    
    # 4. Best Model
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_clf = clf
        best_params = params

s+=f'{"-" * 30}\nTuning complete. Best configuration: {best_params["hls"]} (alpha={best_params["alpha"]:.1e})\nBest Validation Accuracy: {best_val_accuracy:.4f}\n'
print("-" * 30)
print(f"Tuning complete. Best configuration: {best_params['hls']} (alpha={best_params['alpha']:.1e})")
print(f"Best Validation Accuracy: {best_val_accuracy:.4f}")


if best_clf is not None:
    y_test_pred = best_clf.predict(X_test)

    test_accuracy = accuracy_score(y_test, y_test_pred)

    s+=f'{"-" * 30}\nFINAL EVALUATION ON TEST SET (Subjects 8-10):\nTest Accuracy: {test_accuracy:.4f}'
    print("-" * 30)
    print("FINAL EVALUATION ON TEST SET (Subjects 8-10):")
    print(f"Test Accuracy: {test_accuracy:.4f}")
else:
    print("Error: No model was trained successfully.")

with open(f"k{kernel_size}_w{window_size}_train{train_max}_val{val_max}_{'_'.join(feature)}.txt",'w') as f:
    f.write(s)


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)


Config (50,) (alpha=1.0e-05): Val Accuracy = 0.4611


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)


Config (100, 10) (alpha=1.0e-05): Val Accuracy = 0.3149


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)


Config (100, 50, 20) (alpha=1.0e-04): Val Accuracy = 0.3927
------------------------------
Tuning complete. Best configuration: (50,) (alpha=1.0e-05)
Best Validation Accuracy: 0.4611
------------------------------
FINAL EVALUATION ON TEST SET (Subjects 8-10):
Test Accuracy: 0.4611


## Random Forest

In [68]:
from sklearn.ensemble import RandomForestClassifier

best_val_accuracy = 0
best_clf = None
best_params = {}

# Example parameter combinations to check using the Validation set
param_grid = [
    {'n_trees': 100, 'max_d': 10},      
    {'n_trees': 50, 'max_d': 10}, 
    {'n_trees': 100, 'max_d': 15}, 
]

kernel_size=15
window_size=1
train_max=5
val_max=7
feature=['min','max','median','mean','std']

X_train,y_train,X_validation, y_validation, X_test,y_test=prepareData(train_max, val_max,kernel_size, window_size, feature)

s=f'{"="*20}Settings{"="*20}\n'
s+=f'Kernel Size: {kernel_size}\n'
s+=f'Window Size: {window_size}\n'
s+=f'Train: Subject 1-{train_max}\n'
s+=f'Validation: Subject {train_max+1}-{val_max}\n'
s+=f'Test: Subject {val_max+1}-10\n'
s+=f'Feature: {",".join(feature)}\n'
s+=f'{"="*40}\n'

for params in param_grid:
    # 1. Create a Classifier with current parameters
    clf = RandomForestClassifier(n_estimators=params['n_trees'], max_depth=params['max_d'], random_state=42)
    
    # 2. Train the model 
    clf.fit(X_train, y_train) 
    
    # 3. Predict and evaluate on val
    y_val_pred = clf.predict(X_validation)
    val_accuracy = accuracy_score(y_validation, y_val_pred)

    s+=f"Config {params['n_trees']} trees (max_d={params['max_d']}): Val Accuracy = {val_accuracy:.4f}\n"
    print(f"Config {params['n_trees']} (max_d={params['max_d']}): Val Accuracy = {val_accuracy:.4f}")
    
    # 4. Best Model
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_clf = clf
        best_params = params

s+=f'{"-" * 30}\nTuning complete. Best configuration: {best_params["n_trees"]} (max_d={best_params["max_d"]})\nBest Validation Accuracy: {best_val_accuracy:.4f}\n'
print("-" * 30)
print(f"Tuning complete. Best configuration: {best_params['n_trees']} (max_d={best_params['max_d']})")
print(f"Best Validation Accuracy: {best_val_accuracy:.4f}")


if best_clf is not None:
    y_test_pred = best_clf.predict(X_test)

    test_accuracy = accuracy_score(y_test, y_test_pred)

    s+=f'{"-" * 30}\nFINAL EVALUATION ON TEST SET (Subjects 8-10):\nTest Accuracy: {test_accuracy:.4f}'
    print("-" * 30)
    print("FINAL EVALUATION ON TEST SET (Subjects 8-10):")
    print(f"Test Accuracy: {test_accuracy:.4f}")
else:
    print("Error: No model was trained successfully.")

with open(f"RF_k{kernel_size}_w{window_size}_train{train_max}_val{val_max}_{'_'.join(feature)}.txt",'w') as f:
    f.write(s)


Config 100 (max_d=10): Val Accuracy = 0.4740
Config 50 (max_d=10): Val Accuracy = 0.4680
Config 100 (max_d=15): Val Accuracy = 0.4939
------------------------------
Tuning complete. Best configuration: 100 (max_d=15)
Best Validation Accuracy: 0.4939
------------------------------
FINAL EVALUATION ON TEST SET (Subjects 8-10):
Test Accuracy: 0.4939
