In [None]:
import time
import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np

# Load the pose landmarker model file (download from MediaPipe docs)
model_path = 'pose_landmarker_lite.task'

# API symbols
BaseOptions = python.BaseOptions
PoseLandmarker = vision.PoseLandmarker
PoseLandmarkerOptions = vision.PoseLandmarkerOptions
PoseLandmarkerResult = vision.PoseLandmarkerResult
VisionRunningMode = vision.RunningMode

# Global variable to store the latest detection result
latest_result = None

# Note: avoid annotating with `vision.Image` since some versions don't expose it
def visualize_result(result: PoseLandmarkerResult, output_image, timestamp_ms: int):
    """Callback invoked by the landmarker (LIVE_STREAM mode)."""
    global latest_result
    latest_result = result

def run_real_time_pose_detection():
    """Runs real-time pose detection from the webcam."""
    options = PoseLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=VisionRunningMode.LIVE_STREAM,
        result_callback=visualize_result,
    )

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        return

    with PoseLandmarker.create_from_options(options) as landmarker:
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                print("Ignoring empty camera frame.")
                continue

            # Convert BGR (OpenCV) to RGB and create a TensorImage
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            try:
                mp_image = vision.TensorImage.create_from_array(rgb_frame)
            except Exception:
                # Fallback: try the Tasks Image factory if available
                try:
                    mp_image = vision.Image.create_from_array(rgb_frame)
                except Exception:
                    # Last-resort: use raw numpy array (may fail depending on version)
                    mp_image = rgb_frame

            # Timestamp in milliseconds
            timestamp_ms = int(time.time() * 1000)

            # Send the frame to the landmarker for asynchronous processing
            landmarker.detect_async(mp_image, timestamp_ms)

            # If we have a recent result, draw landmarks
            if latest_result and getattr(latest_result, 'pose_landmarks', None):
                annotated_image = np.copy(frame)
                for detection in latest_result.pose_landmarks:
                    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
                    pose_landmarks_proto.landmark.extend([
                        landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z) for lm in detection
                    ])
                    solutions.drawing_utils.draw_landmarks(
                        annotated_image,
                        pose_landmarks_proto,
                        solutions.pose.POSE_CONNECTIONS,
                        solutions.drawing_styles.get_default_pose_landmarks_style(),
                    )
                cv2.imshow('MediaPipe Pose Landmarker', annotated_image)
            else:
                cv2.imshow('MediaPipe Pose Landmarker', frame)

            # Break loop on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    run_real_time_pose_detection()


AttributeError: module 'mediapipe.tasks.python.vision' has no attribute 'Image'