#### final code

In [10]:
import cv2
import mediapipe as mp
import torch
import torchvision
import numpy as np
import os
import pandas as pd
from torchvision.transforms import functional as F

# Initialize MediaPipe Pose
# `mp_pose` gives access to the Pose solution and its PoseLandmark enum.
# `mp_drawing` is bound here but is not referenced by the code visible in
# this cell.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
# Left-side joints tracked by the pipeline. The string keys are the joint
# names shared with COCO_KEYPOINTS, EDGES and the weight tables; the values
# are the MediaPipe landmark enum members used to index the pose result.
SELECTED_LANDMARKS = {
    "LEFT_SHOULDER": mp_pose.PoseLandmark.LEFT_SHOULDER,
    "LEFT_ELBOW": mp_pose.PoseLandmark.LEFT_ELBOW,
    "LEFT_WRIST": mp_pose.PoseLandmark.LEFT_WRIST,
    "LEFT_HIP": mp_pose.PoseLandmark.LEFT_HIP,
    "LEFT_KNEE": mp_pose.PoseLandmark.LEFT_KNEE,
    "LEFT_ANKLE": mp_pose.PoseLandmark.LEFT_ANKLE,
}

# Initialize Keypoint R-CNN
# NOTE(review): `pretrained=True` is the legacy torchvision argument; newer
# releases prefer `weights=...` -- confirm against the installed version
# before changing it.
model_rcnn = torchvision.models.detection.keypointrcnn_resnet50_fpn(pretrained=True)
model_rcnn.eval()  # switch the model to inference mode
device = torch.device("cpu")  # inference is pinned to the CPU
model_rcnn.to(device)
# Index of each tracked joint inside the model's per-person keypoint array
# (presumably the COCO 17-keypoint ordering -- verify against torchvision docs).
COCO_KEYPOINTS = {
    "LEFT_SHOULDER": 5,
    "LEFT_ELBOW": 7,
    "LEFT_WRIST": 9,
    "LEFT_HIP": 11,
    "LEFT_KNEE": 13,
    "LEFT_ANKLE": 15,
}

# Skeleton connectivity: each limb is written as an ordered chain of joints,
# and every pair of consecutive joints in a chain becomes one edge.
_LIMB_CHAINS = (
    ("LEFT_SHOULDER", "LEFT_ELBOW", "LEFT_WRIST"),  # arm
    ("LEFT_SHOULDER", "LEFT_HIP"),  # torso
    ("LEFT_HIP", "LEFT_KNEE", "LEFT_ANKLE"),  # leg
)
EDGES = [
    edge
    for chain in _LIMB_CHAINS
    for edge in zip(chain, chain[1:])
]

# Per-joint (MediaPipe, R-CNN) detector weights used for ordinary poses.
_DEFAULT_WEIGHT_ROWS = (
    ("LEFT_SHOULDER", 0.4, 0.6),
    ("LEFT_ELBOW", 0.5, 0.5),
    ("LEFT_WRIST", 0.3, 0.7),
    ("LEFT_HIP", 0.4, 0.6),
    ("LEFT_KNEE", 0.5, 0.5),  # the knee is balanced by default
    ("LEFT_ANKLE", 0.3, 0.7),
)
KEYPOINT_WEIGHTS_DEFAULT = {
    joint: {"mediapipe": mediapipe_share, "rcnn": rcnn_share}
    for joint, mediapipe_share, rcnn_share in _DEFAULT_WEIGHT_ROWS
}

# Cycling poses use the same table except that the knee favors R-CNN.
# Inner dicts are copied so the two tables never share mutable state.
KEYPOINT_WEIGHTS_CYCLING = {
    joint: dict(shares) for joint, shares in KEYPOINT_WEIGHTS_DEFAULT.items()
}
KEYPOINT_WEIGHTS_CYCLING["LEFT_KNEE"] = {"mediapipe": 0.3, "rcnn": 0.7}

def get_mediapipe_keypoints(image_path):
    """Run MediaPipe Pose on one image file and return the tracked joints.

    Args:
        image_path: Path of the image file to analyse.

    Returns:
        Dict mapping every name in SELECTED_LANDMARKS to an
        ``(x, y, visibility)`` tuple copied from the MediaPipe landmark
        (MediaPipe documents x/y as normalized image coordinates).  An empty
        dict is returned when the file cannot be read or no pose is found.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports an unreadable or missing file by returning None;
        # treat that like "no detection" instead of crashing in cvtColor.
        return {}
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    with mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5) as pose:
        results = pose.process(image_rgb)
        if not results.pose_landmarks:
            return {}
        landmarks = results.pose_landmarks.landmark
        return {
            name: (
                landmarks[landmark_enum].x,
                landmarks[landmark_enum].y,
                landmarks[landmark_enum].visibility,
            )
            for name, landmark_enum in SELECTED_LANDMARKS.items()
        }

def get_rcnn_keypoints(image_path):
    """Run Keypoint R-CNN on one image file and return the tracked joints.

    Only detections whose score exceeds 0.5 are kept, and the first of those
    is used (presumably the highest-scoring person -- verify that torchvision
    returns detections sorted by score).

    Args:
        image_path: Path of the image file to analyse.

    Returns:
        Dict mapping every name in COCO_KEYPOINTS to ``(x, y, conf)`` where
        x and y are pixel coordinates divided by the image width and height.
        An empty dict is returned when the file cannot be read or no
        detection passes the score threshold.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports an unreadable or missing file by returning None;
        # treat that like "no detection" instead of crashing in cvtColor.
        return {}
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_tensor = F.to_tensor(image_rgb).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model_rcnn(image_tensor)
    keypoints = outputs[0]["keypoints"].cpu().numpy()
    scores = outputs[0]["scores"].cpu().numpy()
    keypoints = keypoints[scores > 0.5]

    if keypoints.shape[0] == 0:
        return {}

    height, width = image.shape[:2]
    person = keypoints[0]
    selected_keypoints = {}
    for name, index in COCO_KEYPOINTS.items():
        # NOTE(review): torchvision documents the third channel of
        # "keypoints" as a visibility flag, not a confidence, and the sample
        # run in this notebook prints 1.00 for every R-CNN joint.  Per-joint
        # scores appear to live in outputs[0]["keypoints_scores"]; confirm
        # before relying on `conf` for thresholding.
        x, y, conf = person[index]
        selected_keypoints[name] = (x / width, y / height, conf)
    return selected_keypoints

def compute_distance(p1, p2):
    """Return the Euclidean distance between two points in the x/y plane.

    Only elements 0 and 1 of each point are read, so longer sequences such
    as ``(x, y, confidence)`` are accepted.
    """
    delta_x = p1[0] - p2[0]
    delta_y = p1[1] - p2[1]
    return np.sqrt(delta_x**2 + delta_y**2)

def compute_angle(p1, p2, p3):
    """Return the angle at vertex ``p2`` between ``p1`` and ``p3``, in degrees.

    Only elements 0 and 1 of each point are read.  The result lies in
    [0, 180]; a zero-length arm yields 90 because the cosine evaluates to 0.
    """
    vertex_x, vertex_y = p2[0], p2[1]
    arm_a = np.array([p1[0] - vertex_x, p1[1] - vertex_y])
    arm_b = np.array([p3[0] - vertex_x, p3[1] - vertex_y])
    # The small epsilon keeps the division defined for zero-length arms.
    denominator = np.linalg.norm(arm_a) * np.linalg.norm(arm_b) + 1e-6
    cosine = np.dot(arm_a, arm_b) / denominator
    # Clamp against rounding so arccos never receives a value outside [-1, 1].
    cosine = np.clip(cosine, -1.0, 1.0)
    return np.degrees(np.arccos(cosine))

def is_cycling_pose(keypoints):
    """Heuristically decide whether the pose looks like a cycling posture.

    Returns True only when the left shoulder, hip, knee and ankle are all
    present with confidence above 0.6, the absolute vertical distance between
    shoulder and hip is below 0.3, and the hip-knee-ankle angle lies strictly
    between 30 and 150 degrees.  Returns False in every other case.
    """
    required = ("LEFT_SHOULDER", "LEFT_HIP", "LEFT_KNEE", "LEFT_ANKLE")
    if any(joint not in keypoints for joint in required):
        return False

    shoulder, hip, knee, ankle = (keypoints[joint] for joint in required)
    if not all(point[2] > 0.6 for point in (shoulder, hip, knee, ankle)):
        return False

    # Vertical shoulder-to-hip extent; a small value means a compact torso.
    torso_lean = abs(shoulder[1] - hip[1])
    # Bend of the leg measured at the knee.
    knee_angle = compute_angle(
        (hip[0], hip[1]),
        (knee[0], knee[1]),
        (ankle[0], ankle[1]),
    )
    if torso_lean < 0.3 and 30 < knee_angle < 150:
        return True
    return False

def check_anatomical_violation(key, x, y, keypoints):
    """Return a penalty for placing joint ``key`` at ``(x, y)`` implausibly.

    The checks compare the candidate position with other joints in
    ``keypoints`` (``{name: (x, y, conf)}``), using image coordinates where a
    smaller y is higher in the frame:

    * wrist more than 0.05 above a confident shoulder -> 0.3
    * knee more than 0.05 above a confident hip -> 0.2, plus 0.2 when hip and
      ankle are confident and the hip-knee-ankle angle is below 20 or above
      170 degrees (a warning is printed in that case)
    * ankle more than 0.05 above a confident knee -> 0.3
    * shoulder more than 0.05 below a confident hip -> 0.3

    "Confident" means confidence above 0.6.  Any other joint yields 0.0.
    """

    def reference_y(joint):
        """Return the y of ``joint`` when present and confident, else None."""
        if joint in keypoints and keypoints[joint][2] > 0.6:
            return keypoints[joint][1]
        return None

    penalty = 0.0

    if key == "LEFT_WRIST":
        shoulder_y = reference_y("LEFT_SHOULDER")
        if shoulder_y is not None and y < shoulder_y - 0.05:
            penalty += 0.3

    elif key == "LEFT_KNEE":
        if "LEFT_HIP" in keypoints and "LEFT_ANKLE" in keypoints:
            hip = keypoints["LEFT_HIP"]
            ankle = keypoints["LEFT_ANKLE"]
            hip_trusted = hip[2] > 0.6

            # Knee above the hip is only mildly penalized so bent-leg poses
            # are not rejected outright.
            if y < hip[1] - 0.05 and hip_trusted:
                penalty += 0.2

            # Nearly folded or nearly straight legs are treated as unlikely.
            if hip_trusted and ankle[2] > 0.6:
                hip_x, hip_y, _ = hip
                ankle_x, ankle_y, _ = ankle
                angle = compute_angle((hip_x, hip_y), (x, y), (ankle_x, ankle_y))
                if angle < 20 or angle > 170:
                    penalty += 0.2
                    print(f"Knee angle warning: {angle:.2f} degrees (extreme angle detected).")

    elif key == "LEFT_ANKLE":
        knee_y = reference_y("LEFT_KNEE")
        if knee_y is not None and y < knee_y - 0.05:
            penalty += 0.3

    elif key == "LEFT_SHOULDER":
        hip_y = reference_y("LEFT_HIP")
        if hip_y is not None and y > hip_y + 0.05:
            penalty += 0.3

    return penalty

def evaluate_keypoint_consistency(key, keypoints, other_keypoints, edges):
    """Score how trustworthy one detector's estimate of ``key`` is.

    The score blends the detector's own confidence with how closely the
    direction from each skeleton neighbour to ``key`` matches the direction
    the other detector reports for the same pair of joints.

    Args:
        key: Name of the joint being scored.
        keypoints: ``{name: (x, y, conf)}`` from the detector under evaluation.
        other_keypoints: Same structure from the other detector.
        edges: Iterable of ``(name_a, name_b)`` skeleton connections.

    Returns:
        ``0.0`` when ``key`` is missing from ``keypoints`` or its confidence
        is at most 0.5.  When at least one neighbour can be compared against
        the other detector: ``0.3 * s + 0.7 * conf`` where ``s`` is the mean
        direction similarity scaled by ``1 - anatomical penalty``.  Otherwise
        ``conf * (1 - anatomical penalty)``.
    """
    if key not in keypoints:
        # Nothing to score; callers treat 0.0 as "no usable estimate".
        return 0.0
    x, y, conf = keypoints[key]
    if conf <= 0.5:  # Stricter threshold for consistency evaluation
        return 0.0

    # Skeleton neighbours of `key` that this detector located confidently.
    neighbors = []
    for k1, k2 in edges:
        if k1 == key and k2 in keypoints and keypoints[k2][2] > 0.5:
            neighbors.append(k2)
        elif k2 == key and k1 in keypoints and keypoints[k1][2] > 0.5:
            neighbors.append(k1)

    similarities = []
    # A comparison needs the other detector's estimate of `key` as well;
    # without it there is nothing to compare against, so skip the loop.
    if key in other_keypoints:
        other_x, other_y, _ = other_keypoints[key]
        for neighbor in neighbors:
            if neighbor not in other_keypoints or not other_keypoints[neighbor][2] > 0.5:
                continue
            neighbor_x, neighbor_y, _ = keypoints[neighbor]
            other_neighbor_x, other_neighbor_y, _ = other_keypoints[neighbor]

            # Unit direction from the neighbour to `key` in each detector;
            # the epsilon avoids dividing by zero for coincident joints.
            my_dist = compute_distance((x, y), (neighbor_x, neighbor_y)) + 1e-6
            other_dist = compute_distance(
                (other_x, other_y), (other_neighbor_x, other_neighbor_y)
            ) + 1e-6
            my_dx_norm = (x - neighbor_x) / my_dist
            my_dy_norm = (y - neighbor_y) / my_dist
            other_dx_norm = (other_x - other_neighbor_x) / other_dist
            other_dy_norm = (other_y - other_neighbor_y) / other_dist

            similarities.append(
                1.0 - 0.5 * (abs(my_dx_norm - other_dx_norm) + abs(my_dy_norm - other_dy_norm))
            )

    anatomical_penalty = check_anatomical_violation(key, x, y, keypoints)
    if not similarities:
        return conf * (1.0 - anatomical_penalty)

    consistency_score = sum(similarities) / len(similarities)
    consistency_score *= (1.0 - anatomical_penalty)
    return 0.3 * consistency_score + 0.7 * conf

def fuse_keypoints_selective(mediapipe_kps, rcnn_kps, edges):
    """Fuse MediaPipe and R-CNN keypoints joint by joint.

    The weight table is chosen from the MediaPipe pose (cycling vs. default).
    For each joint in SELECTED_LANDMARKS the detector whose weighted
    consistency score leads by more than 0.1 wins outright; otherwise the two
    positions are averaged with the table weights when both detectors
    reported the joint, or the only available one is used.  Progress is
    printed for every joint.

    Args:
        mediapipe_kps: ``{name: (x, y, conf)}`` from MediaPipe.
        rcnn_kps: ``{name: (x, y, conf)}`` from Keypoint R-CNN.
        edges: Skeleton connections forwarded to the consistency scorer.

    Returns:
        ``{name: (x, y, conf)}`` with one entry per SELECTED_LANDMARKS key;
        joints with no estimate are ``(0.0, 0.0, 0.0)``.
    """
    is_cycling = is_cycling_pose(mediapipe_kps)
    weights = KEYPOINT_WEIGHTS_CYCLING if is_cycling else KEYPOINT_WEIGHTS_DEFAULT
    print(f"Pose classified as {'cycling' if is_cycling else 'non-cycling'}. Using weights: {weights}")

    absent = (0.0, 0.0, 0.0)
    fused_keypoints = {}

    for key in SELECTED_LANDMARKS:
        mp_x, mp_y, mp_conf = mediapipe_kps.get(key, absent)
        rcnn_x, rcnn_y, rcnn_conf = rcnn_kps.get(key, absent)

        # A detector that did not report the joint scores zero.
        mp_consistency = 0.0
        if mp_conf > 0:
            mp_consistency = evaluate_keypoint_consistency(key, mediapipe_kps, rcnn_kps, edges)
        rcnn_consistency = 0.0
        if rcnn_conf > 0:
            rcnn_consistency = evaluate_keypoint_consistency(key, rcnn_kps, mediapipe_kps, edges)

        print(f"{key} - MediaPipe Consistency: {mp_consistency:.2f}, R-CNN Consistency: {rcnn_consistency:.2f}")

        mp_weight = weights[key]["mediapipe"]
        rcnn_weight = weights[key]["rcnn"]
        mp_adjusted = mp_consistency * mp_weight
        rcnn_adjusted = rcnn_consistency * rcnn_weight

        if mp_adjusted > rcnn_adjusted + 0.1:
            fused = (mp_x, mp_y, mp_conf)
            note = f"{key}: Using MediaPipe prediction."
        elif rcnn_adjusted > mp_adjusted + 0.1:
            fused = (rcnn_x, rcnn_y, rcnn_conf)
            note = f"{key}: Using R-CNN prediction."
        elif mp_conf > 0 and rcnn_conf > 0:
            # Neither detector leads clearly: blend positions by table weight.
            total_weight = mp_weight + rcnn_weight
            mp_fusion_weight = mp_weight / total_weight
            rcnn_fusion_weight = rcnn_weight / total_weight
            fused = (
                mp_fusion_weight * mp_x + rcnn_fusion_weight * rcnn_x,
                mp_fusion_weight * mp_y + rcnn_fusion_weight * rcnn_y,
                max(mp_conf, rcnn_conf),
            )
            note = f"{key}: Using weighted averaging with weights (MP: {mp_fusion_weight:.2f}, R-CNN: {rcnn_fusion_weight:.2f})."
        elif mp_conf > 0:
            fused = (mp_x, mp_y, mp_conf)
            note = f"{key}: Using MediaPipe prediction (R-CNN unavailable)."
        elif rcnn_conf > 0:
            fused = (rcnn_x, rcnn_y, rcnn_conf)
            note = f"{key}: Using R-CNN prediction (MediaPipe unavailable)."
        else:
            fused = (0.0, 0.0, 0.0)
            note = f"{key}: No reliable prediction available."

        print(note)
        fused_keypoints[key] = fused

    return fused_keypoints

def fallback_and_correct(fused_kps, mediapipe_kps, rcnn_kps, edges):
    """Fill in weak joints from neighbours, then apply soft anatomical fixes.

    Works on a shallow copy, so ``fused_kps`` itself is not modified.
    ``mediapipe_kps`` and ``rcnn_kps`` are accepted but not read.

    Step 1: every joint with confidence <= 0.5 is replaced by the
    confidence-weighted mean position of its skeleton neighbours that have
    confidence > 0.5, and is given confidence 0.5.

    Step 2 (in this order, each using the results of the previous ones):
    shoulder below hip, wrist above shoulder, knee above hip, ankle above
    knee.  Each check uses a 0.05 margin, and the fix lowers the corrected
    joint's confidence.  Every change is reported with ``print``.

    Returns:
        The adjusted ``{name: (x, y, conf)}`` dict.
    """
    adjusted = fused_kps.copy()

    # --- Step 1: estimate weak joints from confident neighbours -----------
    for key in adjusted.keys():
        _, _, conf = adjusted[key]
        if conf <= 0.5:
            anchors = []
            for first, second in edges:
                if first == key:
                    partner = second
                elif second == key:
                    partner = first
                else:
                    continue
                if partner in adjusted and adjusted[partner][2] > 0.5:
                    anchors.append(adjusted[partner])

            if anchors:
                total_conf = sum(anchor[2] for anchor in anchors)
                if total_conf > 0:
                    avg_x = sum(anchor[0] * anchor[2] for anchor in anchors) / total_conf
                    avg_y = sum(anchor[1] * anchor[2] for anchor in anchors) / total_conf
                    adjusted[key] = (avg_x, avg_y, 0.5)
                    print(f"{key}: Estimated position using neighbors.")

    # --- Step 2: soft anatomical corrections (order matters) --------------
    if {"LEFT_SHOULDER", "LEFT_HIP"} <= adjusted.keys():
        shoulder_x, shoulder_y, shoulder_conf = adjusted["LEFT_SHOULDER"][:3]
        hip_y, hip_conf = adjusted["LEFT_HIP"][1], adjusted["LEFT_HIP"][2]
        if shoulder_y > hip_y + 0.05 and shoulder_conf > 0.6 and hip_conf > 0.6:
            adjusted["LEFT_SHOULDER"] = (shoulder_x, hip_y - 0.05, shoulder_conf * 0.9)
            print("Corrected LEFT_SHOULDER position (was below hip).")

    if {"LEFT_SHOULDER", "LEFT_WRIST"} <= adjusted.keys():
        shoulder_y = adjusted["LEFT_SHOULDER"][1]
        wrist_x, wrist_y, wrist_conf = adjusted["LEFT_WRIST"][:3]
        if wrist_y < shoulder_y - 0.05 and wrist_conf > 0.6:
            adjusted["LEFT_WRIST"] = (wrist_x, shoulder_y, wrist_conf * 0.8)
            print("Corrected LEFT_WRIST position (was above shoulder).")

    if {"LEFT_HIP", "LEFT_KNEE"} <= adjusted.keys():
        hip_y = adjusted["LEFT_HIP"][1]
        knee_x, knee_y, knee_conf = adjusted["LEFT_KNEE"][:3]
        if knee_y < hip_y - 0.05 and knee_conf > 0.6:
            # Move only 30% of the way toward the target so the fix stays soft.
            target_y = hip_y - 0.02
            softened_y = 0.7 * knee_y + 0.3 * target_y
            adjusted["LEFT_KNEE"] = (knee_x, softened_y, knee_conf * 0.9)
            print("Soft-corrected LEFT_KNEE position (was too far above hip).")

    if {"LEFT_KNEE", "LEFT_ANKLE"} <= adjusted.keys():
        knee_y = adjusted["LEFT_KNEE"][1]
        ankle_x, ankle_y, ankle_conf = adjusted["LEFT_ANKLE"][:3]
        if ankle_y < knee_y - 0.05 and ankle_conf > 0.6:
            target_y = knee_y + 0.02
            softened_y = 0.7 * ankle_y + 0.3 * target_y
            adjusted["LEFT_ANKLE"] = (ankle_x, softened_y, ankle_conf * 0.8)
            print("Soft-corrected LEFT_ANKLE position (was too far above knee).")

    return adjusted

def draw_fused_keypoints(image, keypoints):
    """Draw every joint with confidence above 0.5 onto ``image`` in place.

    Each drawn joint gets a filled circle of radius 5 and a text label with
    its name and normalized coordinates.  ``keypoints`` maps names to
    ``(x_norm, y_norm, conf)``; the normalized values are scaled by the image
    width and height to obtain pixel positions.

    Returns:
        The same ``image`` object that was passed in.
    """
    height, width = image.shape[:2]
    color = (255, 0, 0)
    for name, (x_norm, y_norm, conf) in keypoints.items():
        if not conf > 0.5:
            continue
        center = (int(x_norm * width), int(y_norm * height))
        cv2.circle(image, center, 5, color, -1)
        label = f"{name}: ({x_norm:.5f}, {y_norm:.5f})"
        # Offset the label so it does not cover the marker.
        label_origin = (center[0] + 10, center[1] - 10)
        cv2.putText(image, label, label_origin,
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
    return image

def process_dataset(input_folder, output_folder, excel_path):
    """Fuse keypoints for every image in a folder and export the results.

    For each ``.png``/``.jpg``/``.jpeg`` file (extension matched
    case-insensitively) whose base name is an integer, the function runs both
    detectors, fuses and corrects the keypoints, writes an annotated copy of
    the image to ``output_folder`` and collects the rounded coordinates.
    Files with a non-numeric name or that cannot be read are skipped with a
    message.

    Args:
        input_folder: Directory containing the source images.
        output_folder: Directory for annotated images; created if missing.
        excel_path: Destination of the coordinate table, sorted by image_no.
            When no image is processed the file contains only the header row.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    joint_names = (
        "LEFT_SHOULDER", "LEFT_ELBOW", "LEFT_WRIST",
        "LEFT_HIP", "LEFT_KNEE", "LEFT_ANKLE",
    )
    # Fixing the columns up front keeps sort_values() valid for an empty run.
    columns = ["image_no"] + [
        f"{joint.lower()}_{axis}" for joint in joint_names for axis in ("x", "y")
    ]

    data = []
    image_files = [
        f for f in os.listdir(input_folder)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]

    for image_file in image_files:
        print(f"\nProcessing {image_file}...")
        image_path = os.path.join(input_folder, image_file)

        try:
            image_no = int(os.path.splitext(image_file)[0])
        except ValueError:
            print(f"Could not extract image number from {image_file}, skipping...")
            continue

        image = cv2.imread(image_path)
        if image is None:
            print(f"Failed to load image: {image_path}")
            continue

        # Both detectors take the path and load the image themselves.
        mediapipe_kps = get_mediapipe_keypoints(image_path)
        rcnn_kps = get_rcnn_keypoints(image_path)

        print("\nMediaPipe Keypoints:")
        for name, (x, y, conf) in mediapipe_kps.items():
            print(f"{name}: (x={x:.5f}, y={y:.5f}, confidence={conf:.2f})")
        print("\nR-CNN Keypoints:")
        for name, (x, y, conf) in rcnn_kps.items():
            print(f"{name}: (x={x:.5f}, y={y:.5f}, confidence={conf:.2f})")

        fused_keypoints = fuse_keypoints_selective(mediapipe_kps, rcnn_kps, EDGES)
        fused_keypoints = fallback_and_correct(fused_keypoints, mediapipe_kps, rcnn_kps, EDGES)

        print("\nFused Keypoints:")
        for name, (x, y, conf) in fused_keypoints.items():
            print(f"{name}: (x={x:.5f}, y={y:.5f}, confidence={conf:.2f})")

        row = {"image_no": image_no}
        for joint in joint_names:
            prefix = joint.lower()
            row[f"{prefix}_x"] = round(fused_keypoints[joint][0], 5)
            row[f"{prefix}_y"] = round(fused_keypoints[joint][1], 5)
        data.append(row)

        output_image = draw_fused_keypoints(image.copy(), fused_keypoints)
        output_path = os.path.join(output_folder, image_file)
        cv2.imwrite(output_path, output_image)
        print(f"Saved annotated image to {output_path}")

    df = pd.DataFrame(data, columns=columns)
    df = df.sort_values(by="image_no")
    df.to_excel(excel_path, index=False)
    print(f"Saved keypoint coordinates to {excel_path}")

# Define input and output folders, and Excel file path
# NOTE(review): these are absolute, machine-specific Windows paths; adjust
# them before running the cell on another machine.
input_folder = r"C:\PJT2\Orbbec_Dataset_New\Orbbec_Dataset\Dataset\checkdataset"
output_folder = r"C:\PJT2\Orbbec_Dataset_New\Orbbec_Dataset\Dataset\checkdataset_final_output_grok3"
excel_path = r"C:\PJT2\Orbbec_Dataset_New\Orbbec_Dataset\Dataset\checkdataset_keypoint_coordinates_.xlsx"

# Process the dataset and generate Excel file
# This runs as soon as the cell executes: annotated images are written to
# `output_folder` and the coordinate table to `excel_path`.
process_dataset(input_folder, output_folder, excel_path)


Processing 1.png...

MediaPipe Keypoints:
LEFT_SHOULDER: (x=0.47125, y=0.27856, confidence=1.00)
LEFT_ELBOW: (x=0.46738, y=0.38139, confidence=0.96)
LEFT_WRIST: (x=0.41266, y=0.36985, confidence=0.98)
LEFT_HIP: (x=0.56324, y=0.37135, confidence=1.00)
LEFT_KNEE: (x=0.48337, y=0.43926, confidence=0.98)
LEFT_ANKLE: (x=0.54677, y=0.54341, confidence=0.95)

R-CNN Keypoints:
LEFT_SHOULDER: (x=0.47109, y=0.27152, confidence=1.00)
LEFT_ELBOW: (x=0.46510, y=0.38354, confidence=1.00)
LEFT_WRIST: (x=0.41121, y=0.36487, confidence=1.00)
LEFT_HIP: (x=0.56167, y=0.36487, confidence=1.00)
LEFT_KNEE: (x=0.48307, y=0.43688, confidence=1.00)
LEFT_ANKLE: (x=0.54819, y=0.54357, confidence=1.00)
Pose classified as cycling. Using weights: {'LEFT_SHOULDER': {'mediapipe': 0.4, 'rcnn': 0.6}, 'LEFT_ELBOW': {'mediapipe': 0.5, 'rcnn': 0.5}, 'LEFT_WRIST': {'mediapipe': 0.3, 'rcnn': 0.7}, 'LEFT_HIP': {'mediapipe': 0.4, 'rcnn': 0.6}, 'LEFT_KNEE': {'mediapipe': 0.3, 'rcnn': 0.7}, 'LEFT_ANKLE': {'mediapipe': 0.3, 'rc

#### same final code for video frames

In [2]:
import cv2
import mediapipe as mp
import torch
import torchvision
import numpy as np
import os
import pandas as pd
from torchvision.transforms import functional as F

# Initialize MediaPipe Pose
# `mp_pose` gives access to the Pose solution and its PoseLandmark enum.
# `mp_drawing` is bound here but is not referenced by the code visible in
# this cell.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
# Left-side joints tracked by the pipeline. The string keys are the joint
# names shared with COCO_KEYPOINTS, EDGES and the weight tables; the values
# are the MediaPipe landmark enum members used to index the pose result.
SELECTED_LANDMARKS = {
    "LEFT_SHOULDER": mp_pose.PoseLandmark.LEFT_SHOULDER,
    "LEFT_ELBOW": mp_pose.PoseLandmark.LEFT_ELBOW,
    "LEFT_WRIST": mp_pose.PoseLandmark.LEFT_WRIST,
    "LEFT_HIP": mp_pose.PoseLandmark.LEFT_HIP,
    "LEFT_KNEE": mp_pose.PoseLandmark.LEFT_KNEE,
    "LEFT_ANKLE": mp_pose.PoseLandmark.LEFT_ANKLE,
}

# Initialize Keypoint R-CNN
# NOTE(review): `pretrained=True` is the legacy torchvision argument; newer
# releases prefer `weights=...` -- confirm against the installed version
# before changing it.
model_rcnn = torchvision.models.detection.keypointrcnn_resnet50_fpn(pretrained=True)
model_rcnn.eval()  # switch the model to inference mode
device = torch.device("cpu")  # inference is pinned to the CPU
model_rcnn.to(device)
# Index of each tracked joint inside the model's per-person keypoint array
# (presumably the COCO 17-keypoint ordering -- verify against torchvision docs).
COCO_KEYPOINTS = {
    "LEFT_SHOULDER": 5,
    "LEFT_ELBOW": 7,
    "LEFT_WRIST": 9,
    "LEFT_HIP": 11,
    "LEFT_KNEE": 13,
    "LEFT_ANKLE": 15,
}

# Skeleton connectivity: each limb is written as an ordered chain of joints,
# and every pair of consecutive joints in a chain becomes one edge.
_LIMB_CHAINS = (
    ("LEFT_SHOULDER", "LEFT_ELBOW", "LEFT_WRIST"),  # arm
    ("LEFT_SHOULDER", "LEFT_HIP"),  # torso
    ("LEFT_HIP", "LEFT_KNEE", "LEFT_ANKLE"),  # leg
)
EDGES = [
    edge
    for chain in _LIMB_CHAINS
    for edge in zip(chain, chain[1:])
]

# Per-joint (MediaPipe, R-CNN) detector weights used for ordinary poses.
_DEFAULT_WEIGHT_ROWS = (
    ("LEFT_SHOULDER", 0.4, 0.6),
    ("LEFT_ELBOW", 0.5, 0.5),
    ("LEFT_WRIST", 0.3, 0.7),
    ("LEFT_HIP", 0.4, 0.6),
    ("LEFT_KNEE", 0.5, 0.5),  # the knee is balanced by default
    ("LEFT_ANKLE", 0.3, 0.7),
)
KEYPOINT_WEIGHTS_DEFAULT = {
    joint: {"mediapipe": mediapipe_share, "rcnn": rcnn_share}
    for joint, mediapipe_share, rcnn_share in _DEFAULT_WEIGHT_ROWS
}

# Cycling poses use the same table except that the knee favors R-CNN.
# Inner dicts are copied so the two tables never share mutable state.
KEYPOINT_WEIGHTS_CYCLING = {
    joint: dict(shares) for joint, shares in KEYPOINT_WEIGHTS_DEFAULT.items()
}
KEYPOINT_WEIGHTS_CYCLING["LEFT_KNEE"] = {"mediapipe": 0.3, "rcnn": 0.7}

def get_mediapipe_keypoints(image_path):
    """Run MediaPipe Pose on one image file and return the tracked joints.

    Args:
        image_path: Path of the image file to analyse.

    Returns:
        Dict mapping every name in SELECTED_LANDMARKS to an
        ``(x, y, visibility)`` tuple copied from the MediaPipe landmark
        (MediaPipe documents x/y as normalized image coordinates).  An empty
        dict is returned when the file cannot be read or no pose is found.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports an unreadable or missing file by returning None;
        # treat that like "no detection" instead of crashing in cvtColor.
        return {}
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    with mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5) as pose:
        results = pose.process(image_rgb)
        if not results.pose_landmarks:
            return {}
        landmarks = results.pose_landmarks.landmark
        return {
            name: (
                landmarks[landmark_enum].x,
                landmarks[landmark_enum].y,
                landmarks[landmark_enum].visibility,
            )
            for name, landmark_enum in SELECTED_LANDMARKS.items()
        }

def get_rcnn_keypoints(image_path):
    """Run Keypoint R-CNN on one image file and return the tracked joints.

    Only detections whose score exceeds 0.5 are kept, and the first of those
    is used (presumably the highest-scoring person -- verify that torchvision
    returns detections sorted by score).

    Args:
        image_path: Path of the image file to analyse.

    Returns:
        Dict mapping every name in COCO_KEYPOINTS to ``(x, y, conf)`` where
        x and y are pixel coordinates divided by the image width and height.
        An empty dict is returned when the file cannot be read or no
        detection passes the score threshold.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports an unreadable or missing file by returning None;
        # treat that like "no detection" instead of crashing in cvtColor.
        return {}
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_tensor = F.to_tensor(image_rgb).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model_rcnn(image_tensor)
    keypoints = outputs[0]["keypoints"].cpu().numpy()
    scores = outputs[0]["scores"].cpu().numpy()
    keypoints = keypoints[scores > 0.5]

    if keypoints.shape[0] == 0:
        return {}

    height, width = image.shape[:2]
    person = keypoints[0]
    selected_keypoints = {}
    for name, index in COCO_KEYPOINTS.items():
        # NOTE(review): torchvision documents the third channel of
        # "keypoints" as a visibility flag, not a confidence, and the sample
        # run in this notebook prints 1.00 for every R-CNN joint.  Per-joint
        # scores appear to live in outputs[0]["keypoints_scores"]; confirm
        # before relying on `conf` for thresholding.
        x, y, conf = person[index]
        selected_keypoints[name] = (x / width, y / height, conf)
    return selected_keypoints

def compute_distance(p1, p2):
    """Return the Euclidean distance between two points in the x/y plane.

    Only elements 0 and 1 of each point are read, so longer sequences such
    as ``(x, y, confidence)`` are accepted.
    """
    delta_x = p1[0] - p2[0]
    delta_y = p1[1] - p2[1]
    return np.sqrt(delta_x**2 + delta_y**2)

def compute_angle(p1, p2, p3):
    """Return the angle at vertex ``p2`` between ``p1`` and ``p3``, in degrees.

    Only elements 0 and 1 of each point are read.  The result lies in
    [0, 180]; a zero-length arm yields 90 because the cosine evaluates to 0.
    """
    vertex_x, vertex_y = p2[0], p2[1]
    arm_a = np.array([p1[0] - vertex_x, p1[1] - vertex_y])
    arm_b = np.array([p3[0] - vertex_x, p3[1] - vertex_y])
    # The small epsilon keeps the division defined for zero-length arms.
    denominator = np.linalg.norm(arm_a) * np.linalg.norm(arm_b) + 1e-6
    cosine = np.dot(arm_a, arm_b) / denominator
    # Clamp against rounding so arccos never receives a value outside [-1, 1].
    cosine = np.clip(cosine, -1.0, 1.0)
    return np.degrees(np.arccos(cosine))

def is_cycling_pose(keypoints):
    """Heuristically decide whether the pose looks like a cycling posture.

    Returns True only when the left shoulder, hip, knee and ankle are all
    present with confidence above 0.6, the absolute vertical distance between
    shoulder and hip is below 0.3, and the hip-knee-ankle angle lies strictly
    between 30 and 150 degrees.  Returns False in every other case.
    """
    required = ("LEFT_SHOULDER", "LEFT_HIP", "LEFT_KNEE", "LEFT_ANKLE")
    if any(joint not in keypoints for joint in required):
        return False

    shoulder, hip, knee, ankle = (keypoints[joint] for joint in required)
    if not all(point[2] > 0.6 for point in (shoulder, hip, knee, ankle)):
        return False

    # Vertical shoulder-to-hip extent; a small value means a compact torso.
    torso_lean = abs(shoulder[1] - hip[1])
    # Bend of the leg measured at the knee.
    knee_angle = compute_angle(
        (hip[0], hip[1]),
        (knee[0], knee[1]),
        (ankle[0], ankle[1]),
    )
    if torso_lean < 0.3 and 30 < knee_angle < 150:
        return True
    return False

def check_anatomical_violation(key, x, y, keypoints):
    """Return a penalty for placing joint ``key`` at ``(x, y)`` implausibly.

    The checks compare the candidate position with other joints in
    ``keypoints`` (``{name: (x, y, conf)}``), using image coordinates where a
    smaller y is higher in the frame:

    * wrist more than 0.05 above a confident shoulder -> 0.3
    * knee more than 0.05 above a confident hip -> 0.2, plus 0.2 when hip and
      ankle are confident and the hip-knee-ankle angle is below 20 or above
      170 degrees (a warning is printed in that case)
    * ankle more than 0.05 above a confident knee -> 0.3
    * shoulder more than 0.05 below a confident hip -> 0.3

    "Confident" means confidence above 0.6.  Any other joint yields 0.0.
    """

    def reference_y(joint):
        """Return the y of ``joint`` when present and confident, else None."""
        if joint in keypoints and keypoints[joint][2] > 0.6:
            return keypoints[joint][1]
        return None

    penalty = 0.0

    if key == "LEFT_WRIST":
        shoulder_y = reference_y("LEFT_SHOULDER")
        if shoulder_y is not None and y < shoulder_y - 0.05:
            penalty += 0.3

    elif key == "LEFT_KNEE":
        if "LEFT_HIP" in keypoints and "LEFT_ANKLE" in keypoints:
            hip = keypoints["LEFT_HIP"]
            ankle = keypoints["LEFT_ANKLE"]
            hip_trusted = hip[2] > 0.6

            # Knee above the hip is only mildly penalized so bent-leg poses
            # are not rejected outright.
            if y < hip[1] - 0.05 and hip_trusted:
                penalty += 0.2

            # Nearly folded or nearly straight legs are treated as unlikely.
            if hip_trusted and ankle[2] > 0.6:
                hip_x, hip_y, _ = hip
                ankle_x, ankle_y, _ = ankle
                angle = compute_angle((hip_x, hip_y), (x, y), (ankle_x, ankle_y))
                if angle < 20 or angle > 170:
                    penalty += 0.2
                    print(f"Knee angle warning: {angle:.2f} degrees (extreme angle detected).")

    elif key == "LEFT_ANKLE":
        knee_y = reference_y("LEFT_KNEE")
        if knee_y is not None and y < knee_y - 0.05:
            penalty += 0.3

    elif key == "LEFT_SHOULDER":
        hip_y = reference_y("LEFT_HIP")
        if hip_y is not None and y > hip_y + 0.05:
            penalty += 0.3

    return penalty

def evaluate_keypoint_consistency(key, keypoints, other_keypoints, edges):
    """Score how trustworthy one detector's estimate of ``key`` is.

    The score blends the detector's own confidence with how closely the
    direction from each skeleton neighbour to ``key`` matches the direction
    the other detector reports for the same pair of joints.

    Args:
        key: Name of the joint being scored.
        keypoints: ``{name: (x, y, conf)}`` from the detector under evaluation.
        other_keypoints: Same structure from the other detector.
        edges: Iterable of ``(name_a, name_b)`` skeleton connections.

    Returns:
        ``0.0`` when ``key`` is missing from ``keypoints`` or its confidence
        is at most 0.5.  When at least one neighbour can be compared against
        the other detector: ``0.3 * s + 0.7 * conf`` where ``s`` is the mean
        direction similarity scaled by ``1 - anatomical penalty``.  Otherwise
        ``conf * (1 - anatomical penalty)``.
    """
    if key not in keypoints:
        # Nothing to score; callers treat 0.0 as "no usable estimate".
        return 0.0
    x, y, conf = keypoints[key]
    if conf <= 0.5:  # Stricter threshold for consistency evaluation
        return 0.0

    # Skeleton neighbours of `key` that this detector located confidently.
    neighbors = []
    for k1, k2 in edges:
        if k1 == key and k2 in keypoints and keypoints[k2][2] > 0.5:
            neighbors.append(k2)
        elif k2 == key and k1 in keypoints and keypoints[k1][2] > 0.5:
            neighbors.append(k1)

    similarities = []
    # A comparison needs the other detector's estimate of `key` as well;
    # without it there is nothing to compare against, so skip the loop.
    if key in other_keypoints:
        other_x, other_y, _ = other_keypoints[key]
        for neighbor in neighbors:
            if neighbor not in other_keypoints or not other_keypoints[neighbor][2] > 0.5:
                continue
            neighbor_x, neighbor_y, _ = keypoints[neighbor]
            other_neighbor_x, other_neighbor_y, _ = other_keypoints[neighbor]

            # Unit direction from the neighbour to `key` in each detector;
            # the epsilon avoids dividing by zero for coincident joints.
            my_dist = compute_distance((x, y), (neighbor_x, neighbor_y)) + 1e-6
            other_dist = compute_distance(
                (other_x, other_y), (other_neighbor_x, other_neighbor_y)
            ) + 1e-6
            my_dx_norm = (x - neighbor_x) / my_dist
            my_dy_norm = (y - neighbor_y) / my_dist
            other_dx_norm = (other_x - other_neighbor_x) / other_dist
            other_dy_norm = (other_y - other_neighbor_y) / other_dist

            similarities.append(
                1.0 - 0.5 * (abs(my_dx_norm - other_dx_norm) + abs(my_dy_norm - other_dy_norm))
            )

    anatomical_penalty = check_anatomical_violation(key, x, y, keypoints)
    if not similarities:
        return conf * (1.0 - anatomical_penalty)

    consistency_score = sum(similarities) / len(similarities)
    consistency_score *= (1.0 - anatomical_penalty)
    return 0.3 * consistency_score + 0.7 * conf

def fuse_keypoints_selective(mediapipe_kps, rcnn_kps, edges):
    """Fuse MediaPipe and R-CNN keypoints joint by joint.

    The weight table is chosen from the MediaPipe pose (cycling vs. default).
    For each joint in SELECTED_LANDMARKS the detector whose weighted
    consistency score leads by more than 0.1 wins outright; otherwise the two
    positions are averaged with the table weights when both detectors
    reported the joint, or the only available one is used.  Progress is
    printed for every joint.

    Args:
        mediapipe_kps: ``{name: (x, y, conf)}`` from MediaPipe.
        rcnn_kps: ``{name: (x, y, conf)}`` from Keypoint R-CNN.
        edges: Skeleton connections forwarded to the consistency scorer.

    Returns:
        ``{name: (x, y, conf)}`` with one entry per SELECTED_LANDMARKS key;
        joints with no estimate are ``(0.0, 0.0, 0.0)``.
    """
    is_cycling = is_cycling_pose(mediapipe_kps)
    weights = KEYPOINT_WEIGHTS_CYCLING if is_cycling else KEYPOINT_WEIGHTS_DEFAULT
    print(f"Pose classified as {'cycling' if is_cycling else 'non-cycling'}. Using weights: {weights}")

    absent = (0.0, 0.0, 0.0)
    fused_keypoints = {}

    for key in SELECTED_LANDMARKS:
        mp_x, mp_y, mp_conf = mediapipe_kps.get(key, absent)
        rcnn_x, rcnn_y, rcnn_conf = rcnn_kps.get(key, absent)

        # A detector that did not report the joint scores zero.
        mp_consistency = 0.0
        if mp_conf > 0:
            mp_consistency = evaluate_keypoint_consistency(key, mediapipe_kps, rcnn_kps, edges)
        rcnn_consistency = 0.0
        if rcnn_conf > 0:
            rcnn_consistency = evaluate_keypoint_consistency(key, rcnn_kps, mediapipe_kps, edges)

        print(f"{key} - MediaPipe Consistency: {mp_consistency:.2f}, R-CNN Consistency: {rcnn_consistency:.2f}")

        mp_weight = weights[key]["mediapipe"]
        rcnn_weight = weights[key]["rcnn"]
        mp_adjusted = mp_consistency * mp_weight
        rcnn_adjusted = rcnn_consistency * rcnn_weight

        if mp_adjusted > rcnn_adjusted + 0.1:
            fused = (mp_x, mp_y, mp_conf)
            note = f"{key}: Using MediaPipe prediction."
        elif rcnn_adjusted > mp_adjusted + 0.1:
            fused = (rcnn_x, rcnn_y, rcnn_conf)
            note = f"{key}: Using R-CNN prediction."
        elif mp_conf > 0 and rcnn_conf > 0:
            # Neither detector leads clearly: blend positions by table weight.
            total_weight = mp_weight + rcnn_weight
            mp_fusion_weight = mp_weight / total_weight
            rcnn_fusion_weight = rcnn_weight / total_weight
            fused = (
                mp_fusion_weight * mp_x + rcnn_fusion_weight * rcnn_x,
                mp_fusion_weight * mp_y + rcnn_fusion_weight * rcnn_y,
                max(mp_conf, rcnn_conf),
            )
            note = f"{key}: Using weighted averaging with weights (MP: {mp_fusion_weight:.2f}, R-CNN: {rcnn_fusion_weight:.2f})."
        elif mp_conf > 0:
            fused = (mp_x, mp_y, mp_conf)
            note = f"{key}: Using MediaPipe prediction (R-CNN unavailable)."
        elif rcnn_conf > 0:
            fused = (rcnn_x, rcnn_y, rcnn_conf)
            note = f"{key}: Using R-CNN prediction (MediaPipe unavailable)."
        else:
            fused = (0.0, 0.0, 0.0)
            note = f"{key}: No reliable prediction available."

        print(note)
        fused_keypoints[key] = fused

    return fused_keypoints

def fallback_and_correct(fused_kps, mediapipe_kps, rcnn_kps, edges):
    """Patch weak keypoints from their neighbours, then fix implausible ordering.

    The work happens on a shallow copy of ``fused_kps``; the mapping passed in
    is not modified. Two passes run in sequence:

    1. Fallback. A keypoint whose confidence is at most 0.5 is replaced by the
       confidence-weighted mean position of the keypoints it shares an edge
       with, considering only neighbours whose confidence exceeds 0.5. The
       estimate is stored with confidence 0.5. Without such a neighbour the
       keypoint is kept as it is.
    2. Vertical-order corrections (a larger ``y`` is treated as lower, as the
       log messages state). Each rule needs a confidence above 0.6 and a
       violation of more than 0.05:
         * shoulder below hip   -> shoulder moved to ``hip_y - 0.05``
         * wrist above shoulder -> wrist moved to the shoulder's ``y``
         * knee above hip       -> knee ``y`` blended 70/30 towards ``hip_y - 0.02``
         * ankle above knee     -> ankle ``y`` blended 70/30 towards ``knee_y + 0.02``
       Every correction also scales the keypoint's confidence down (0.9 or 0.8).
       Rules run in the order listed, so later rules see earlier corrections.

    Args:
        fused_kps: Mapping of keypoint name to ``(x, y, confidence)``.
        mediapipe_kps: Accepted for signature compatibility; not used here.
        rcnn_kps: Accepted for signature compatibility; not used here.
        edges: Iterable of ``(name_a, name_b)`` pairs linking keypoints.

    Returns:
        A new dict with the adjusted ``(x, y, confidence)`` tuples.
    """

    def soft_blend(current_y, target_y):
        # Move only 30% of the way towards the target so the fix stays gentle.
        return 0.7 * current_y + 0.3 * target_y

    adjusted_kps = fused_kps.copy()

    # Pass 1: neighbour-based fallback for low-confidence keypoints.
    for name in adjusted_kps:
        _, _, score = adjusted_kps[name]
        if not score <= 0.5:
            continue

        # Collect the confident keypoints at the other end of each edge.
        trusted = []
        for first, second in edges:
            if first == name:
                other = second
            elif second == name:
                other = first
            else:
                continue
            if other in adjusted_kps and adjusted_kps[other][2] > 0.5:
                trusted.append(adjusted_kps[other])

        if not trusted:
            continue

        weight_sum = sum(weight for _, _, weight in trusted)
        if weight_sum > 0:
            mean_x = sum(px * weight for px, _, weight in trusted) / weight_sum
            mean_y = sum(py * weight for _, py, weight in trusted) / weight_sum
            adjusted_kps[name] = (mean_x, mean_y, 0.5)
            print(f"{name}: Estimated position using neighbors.")

    # Pass 2: anatomical sanity rules, applied in a fixed order.
    names_present = adjusted_kps.keys()

    if {"LEFT_SHOULDER", "LEFT_HIP"} <= names_present:
        sh_x, sh_y, sh_conf = adjusted_kps["LEFT_SHOULDER"]
        _, hip_y, hip_conf = adjusted_kps["LEFT_HIP"]
        if sh_y > hip_y + 0.05 and sh_conf > 0.6 and hip_conf > 0.6:
            adjusted_kps["LEFT_SHOULDER"] = (sh_x, hip_y - 0.05, sh_conf * 0.9)
            print("Corrected LEFT_SHOULDER position (was below hip).")

    if {"LEFT_SHOULDER", "LEFT_WRIST"} <= names_present:
        # Reads the shoulder again so a correction made above is honoured.
        _, sh_y, _ = adjusted_kps["LEFT_SHOULDER"]
        wr_x, wr_y, wr_conf = adjusted_kps["LEFT_WRIST"]
        if wr_y < sh_y - 0.05 and wr_conf > 0.6:
            adjusted_kps["LEFT_WRIST"] = (wr_x, sh_y, wr_conf * 0.8)
            print("Corrected LEFT_WRIST position (was above shoulder).")

    if {"LEFT_HIP", "LEFT_KNEE"} <= names_present:
        _, hip_y, _ = adjusted_kps["LEFT_HIP"]
        kn_x, kn_y, kn_conf = adjusted_kps["LEFT_KNEE"]
        if kn_y < hip_y - 0.05 and kn_conf > 0.6:
            new_y = soft_blend(kn_y, hip_y - 0.02)
            adjusted_kps["LEFT_KNEE"] = (kn_x, new_y, kn_conf * 0.9)
            print("Soft-corrected LEFT_KNEE position (was too far above hip).")

    if {"LEFT_KNEE", "LEFT_ANKLE"} <= names_present:
        # Uses the knee as possibly corrected by the previous rule.
        _, kn_y, _ = adjusted_kps["LEFT_KNEE"]
        an_x, an_y, an_conf = adjusted_kps["LEFT_ANKLE"]
        if an_y < kn_y - 0.05 and an_conf > 0.6:
            new_y = soft_blend(an_y, kn_y + 0.02)
            adjusted_kps["LEFT_ANKLE"] = (an_x, new_y, an_conf * 0.8)
            print("Soft-corrected LEFT_ANKLE position (was too far above knee).")

    return adjusted_kps

def draw_fused_keypoints(image, keypoints):
    """Draw confident keypoints and their normalized coordinates onto ``image``.

    The image is annotated in place and also returned. Keypoints whose
    confidence is not above 0.5 are skipped.

    Args:
        image: Array-like image exposing ``shape`` as ``(height, width, ...)``.
        keypoints: Mapping of keypoint name to ``(x, y, confidence)`` where
            ``x`` and ``y`` are fractions of the image width and height.

    Returns:
        The same ``image`` object that was passed in.
    """
    img_h, img_w = image.shape[:2]
    # (255, 0, 0) shows as blue on images in OpenCV's default BGR channel order.
    marker_color = (255, 0, 0)

    for label, (nx, ny, score) in keypoints.items():
        if not score > 0.5:  # Relaxed threshold for visualization
            continue

        # Scale the normalized coordinates to integer pixel positions.
        px = int(nx * img_w)
        py = int(ny * img_h)

        cv2.circle(image, (px, py), 5, marker_color, -1)
        # Caption is offset up and to the right so it does not cover the dot.
        cv2.putText(
            image,
            f"{label}: ({nx:.5f}, {ny:.5f})",
            (px + 10, py - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            marker_color,
            1,
        )

    return image

def process_dataset(input_folder, output_folder, excel_path):
    """Run the fused keypoint pipeline over a folder and export the results.

    For every image in ``input_folder`` whose file name (without extension) is
    an integer, the function obtains MediaPipe and Keypoint R-CNN keypoints,
    fuses them, applies the fallback/correction pass, saves an annotated copy
    under the same file name in ``output_folder`` and records the fused x/y
    coordinates. The collected rows are written to ``excel_path`` sorted by
    image number.

    Files with a non-integer stem, or that OpenCV cannot read, are reported
    and skipped. If no image produces a row, a message is printed and no
    Excel file is written.

    Args:
        input_folder: Directory containing ``.png`` / ``.jpg`` / ``.jpeg``
            images (extension matched case-insensitively).
        output_folder: Directory for annotated images; created when missing.
        excel_path: Destination path of the Excel workbook.

    Returns:
        None.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Fixed order so the spreadsheet columns are stable.
    keypoint_names = (
        "LEFT_SHOULDER",
        "LEFT_ELBOW",
        "LEFT_WRIST",
        "LEFT_HIP",
        "LEFT_KNEE",
        "LEFT_ANKLE",
    )

    # Compare extensions case-insensitively so files such as "3.PNG" are not
    # silently ignored; sort so the processing log is reproducible.
    image_files = sorted(
        f for f in os.listdir(input_folder)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    )

    data = []
    for image_file in image_files:
        print(f"\nProcessing {image_file}...")
        image_path = os.path.join(input_folder, image_file)

        # The numeric file stem is the row identifier in the spreadsheet.
        try:
            image_no = int(os.path.splitext(image_file)[0])
        except ValueError:
            print(f"Could not extract image number from {image_file}, skipping...")
            continue

        image = cv2.imread(image_path)
        if image is None:
            print(f"Failed to load image: {image_path}")
            continue

        mediapipe_kps = get_mediapipe_keypoints(image_path)
        rcnn_kps = get_rcnn_keypoints(image_path)

        print("\nMediaPipe Keypoints:")
        for name, (x, y, conf) in mediapipe_kps.items():
            print(f"{name}: (x={x:.5f}, y={y:.5f}, confidence={conf:.2f})")
        print("\nR-CNN Keypoints:")
        for name, (x, y, conf) in rcnn_kps.items():
            print(f"{name}: (x={x:.5f}, y={y:.5f}, confidence={conf:.2f})")

        fused_keypoints = fuse_keypoints_selective(mediapipe_kps, rcnn_kps, EDGES)
        fused_keypoints = fallback_and_correct(fused_keypoints, mediapipe_kps, rcnn_kps, EDGES)

        print("\nFused Keypoints:")
        for name, (x, y, conf) in fused_keypoints.items():
            print(f"{name}: (x={x:.5f}, y={y:.5f}, confidence={conf:.2f})")

        # One row per image: image_no followed by <keypoint>_x / <keypoint>_y.
        row = {"image_no": image_no}
        for kp_name in keypoint_names:
            kp_x, kp_y = fused_keypoints[kp_name][:2]
            column = kp_name.lower()
            row[f"{column}_x"] = round(kp_x, 5)
            row[f"{column}_y"] = round(kp_y, 5)
        data.append(row)

        # Draw on a copy so the loaded image itself stays unannotated.
        output_image = draw_fused_keypoints(image.copy(), fused_keypoints)
        output_path = os.path.join(output_folder, image_file)
        # cv2.imwrite reports failure through its return value.
        if cv2.imwrite(output_path, output_image):
            print(f"Saved annotated image to {output_path}")
        else:
            print(f"Failed to save annotated image to {output_path}")

    # An empty DataFrame has no "image_no" column, so sorting it would raise
    # KeyError; report the situation instead of crashing.
    if not data:
        print("No images were processed; Excel file was not written.")
        return

    df = pd.DataFrame(data).sort_values(by="image_no")
    df.to_excel(excel_path, index=False)
    print(f"Saved keypoint coordinates to {excel_path}")

# Define input and output folders, and Excel file path
#   input_folder  - directory that process_dataset scans for images
#   output_folder - directory that receives the annotated images
#                   (process_dataset creates it when it does not exist)
#   excel_path    - workbook that receives the fused keypoint coordinates
# NOTE(review): these are absolute, machine-specific Windows paths; adjust them
# before running this cell on another machine.
input_folder = r"C:\PJT2\Orbbec_Dataset_New\Orbbec_Dataset\Dataset\CHECK DATASET\checkdataset"
output_folder = r"C:\PJT2\Orbbec_Dataset_New\Orbbec_Dataset\Dataset\CHECK DATASET\checkdataset_output_2video"
excel_path = r"C:\PJT2\Orbbec_Dataset_New\Orbbec_Dataset\Dataset\CHECK DATASET\checkdataset_2video_coordinates.xlsx"

# Process the dataset and generate Excel file
# This call executes immediately when the cell runs.
process_dataset(input_folder, output_folder, excel_path)




Processing 1.png...

MediaPipe Keypoints:
LEFT_SHOULDER: (x=0.47125, y=0.27856, confidence=1.00)
LEFT_ELBOW: (x=0.46738, y=0.38139, confidence=0.96)
LEFT_WRIST: (x=0.41266, y=0.36985, confidence=0.98)
LEFT_HIP: (x=0.56324, y=0.37135, confidence=1.00)
LEFT_KNEE: (x=0.48337, y=0.43926, confidence=0.98)
LEFT_ANKLE: (x=0.54677, y=0.54341, confidence=0.95)

R-CNN Keypoints:
LEFT_SHOULDER: (x=0.47109, y=0.27152, confidence=1.00)
LEFT_ELBOW: (x=0.46510, y=0.38354, confidence=1.00)
LEFT_WRIST: (x=0.41121, y=0.36487, confidence=1.00)
LEFT_HIP: (x=0.56167, y=0.36487, confidence=1.00)
LEFT_KNEE: (x=0.48307, y=0.43688, confidence=1.00)
LEFT_ANKLE: (x=0.54819, y=0.54357, confidence=1.00)
Pose classified as cycling. Using weights: {'LEFT_SHOULDER': {'mediapipe': 0.4, 'rcnn': 0.6}, 'LEFT_ELBOW': {'mediapipe': 0.5, 'rcnn': 0.5}, 'LEFT_WRIST': {'mediapipe': 0.3, 'rcnn': 0.7}, 'LEFT_HIP': {'mediapipe': 0.4, 'rcnn': 0.6}, 'LEFT_KNEE': {'mediapipe': 0.3, 'rcnn': 0.7}, 'LEFT_ANKLE': {'mediapipe': 0.3, 'rc