In [7]:
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
import math
import numpy as np

class Part:
    def __init__(self,):
        self.isExist = False
        self.pos=(0,0)
        self.world_pos=(0,0,0)
        self.color  = (255,255,255)

    def init(self,point,world_point,width,height,color):
        self.pos = (int(point[0]*width),int(point[1]*height))
        self.world_pos = world_point
        self.color = color
        self.isExist = True

class Bound:
    x=0
    y=0
    w=0
    h=0
    
class Head:
    def __init__(self,):
        self.isExist = False
        self.pos=(0,0)
        self.world_pos=(0,0,0)
        self.color  = (255,127,127)
        
        self.head_width = 0
        self.heading = 0
        self.eye_length = 200
        self.eye_pos=(0,0)

    def init(self,nose,lear,rear,color):
        #head width in real size
        r_w_pos = rear.world_pos
        l_w_pos = lear.world_pos
        a=np.array([r_w_pos[0],r_w_pos[1],r_w_pos[2]])
        b=np.array([l_w_pos[0],l_w_pos[1],l_w_pos[2]])
        self.head_width=np.linalg.norm(b-a)
        
        self.world_pos = (
            (r_w_pos[0]+l_w_pos[0])/2.0,
            (r_w_pos[1]+l_w_pos[1])/2.0,
            (r_w_pos[2]+l_w_pos[2])/2.0
        )
        
        #2D screen pos
        r_pos = rear.pos
        l_pos = lear.pos
        pos = (
            (r_pos[0]+l_pos[0])/2.0,
            (r_pos[1]+l_pos[1])/2.0
        )
        self.pos = (int(pos[0]),int(pos[1]))
        
        #print("Head x:"+str(self.pos[0])+" y:"+str(self.pos[1]))
        self.color = color
        self.isExist = True
        
        '''
        #culc heading on x-z plane
        center_pos = (self.world_pos[0],self.world_pos[2])
        nose_pos = (nose.world_pos[0],nose.world_pos[2])
        '''
        
        #culc heading by 2D
        center_pos = self.pos
        nose_pos = nose.pos
        
        a = np.array([center_pos[0], center_pos[1]])
        b = np.array([nose_pos[0], nose_pos[1]])
        vec = b - a
        self.heading = -1 * math.degrees(np.arctan2(vec[0], vec[1])) +90
        
        #culc eye line
        radians = math.radians(self.heading)

        eye_x = self.pos[0]+int(np.cos(radians)*self.eye_length)
        eye_y = self.pos[1]+int(np.sin(radians)*self.eye_length)
        self.eye_pos = (eye_x,eye_y)
        
        
        
class Body:
    def __init__(self,id,landmarks,world_landmarks,visibility_threshold,width,height):
        self.id = id
        self.landmarks = landmarks
        self.world_landmarks = world_landmarks
        self.visibility_threshold = visibility_threshold
        self.width = width
        self.height = height
        self.bound = Bound()
    
        # for landmark in person:

        self.nose=Part()
        self.leye=Part()
        self.reye=Part()
        self.lear=Part()
        self.rear=Part()
        self.lshoulder=Part()
        self.rshoulder=Part()
        self.lelbow = Part()
        self.relbow = Part()
        self.lhand = Part()
        self.rhand = Part()
        self.lhip = Part()
        self.rhip = Part()
        self.lknee = Part()
        self.rknee = Part()
        self.lfoot = Part()
        self.rfoot = Part()
        #parts = [nose,leye,reye,lear,rear,lshoulder,rshoulder,lelbow,relbow,lhand,rhand,lhip,rhip,lknee,rknee,lfoot,rfoot]
        self.parts = [self.nose,self.leye,self.reye,self.lear,self.rear,self.lshoulder,self.rshoulder]

        self.head_width = 0
        self.head = Head()

        self.eye_length= 200

    
    def process(self):
        nose_color = (0,0,255)
        eye_color = (127,127,255)
        left_color = (255,127,127)
        right_color = (127,255,127)
        
        '''
        for idx, landmark in enumerate(pose_landmarks_proto.landmark):
            #landmark_px = _normalized_to_pixel_coordinates(landmark.x, landmark.y,width, height)
        '''
        
        # print("idx:"+str(idx)+" x:"+str(pose_landmarks[0].x))
        
        # Draw the pose landmarks.
        #for i in range(len(self.landmarks)):
        i = 0;
        for landmark in self.landmarks:
            #point = self.landmarks[i]
            #world_point = self.world_landmarks[i]
            point = (landmark.x,landmark.y)
            world_point = (0,0,0)
            width = self.width
            height = self.height
            
            
            
            #print("Visibility:"+str(point.visibility))
            
            #if point.visibility >= self.visibility_threshold:
            #nose
            if(i==0): 
                self.parts[0].init(point,world_point,width,height,nose_color)
                #print("i:"+str(i)+" x:"+str(point[0]))
            #l-eye
            if(i==2): 
                self.parts[1].init(point,world_point,width,height,eye_color)
            #r-eye
            if(i==5): 
                self.parts[2].init(point,world_point,width,height,eye_color)
            #l-ear
            if(i==7): 
                self.parts[3].init(point,world_point,width,height,left_color)
            #r-ear
            if(i==8): 
                self.parts[4].init(point,world_point,width,height,right_color)
            #l-shoulder
            if(i==11): 
                self.parts[5].init(point,world_point,width,height,left_color)
            #r-shoulder
            if(i==12): 
                self.parts[6].init(point,world_point,width,height,right_color)
                '''
            #l-elbow
            if(i==13): 
                self.parts[7].init(point,world_point,width,height,left_color)
            #r-elbow
            if(i==14): 
                self.parts[8].init(point,world_point,width,height,right_color)
            #l-hand
            if(i==15): 
                self.parts[9].init(point,world_point,width,height,left_color)
            #r-hand
            if(i==16): 
                self.parts[10].init(point,world_point,width,height,right_color)
            #l-hip
            if(i==23): 
                self.parts[11].init(point,world_point,width,height,left_color)
            #r-hip
            if(i==24): 
                self.parts[12].init(point,world_point,width,height,right_color)
            #l-knee
            if(i==25): 
                self.parts[13].init(point,world_point,width,height,left_color)
            #r-knee
            if(i==26): 
                self.parts[14].init(point,world_point,width,height,right_color)
            #l-foot
            if(i==27): 
                self.parts[15].init(point,world_point,width,height,left_color)
            #r-foot
            if(i==28): 
                self.parts[16].init(point,world_point,width,height,right_color)
                '''
            i = i+1
            
        #culc bound box
        minx = 10000
        miny = 10000
        maxx = 0
        maxy = 0
        for part in self.parts:
            if(part.isExist):
                x = part.pos[0]
                y = part.pos[1]
                if maxx<x:
                    maxx = x
                if maxy<y:
                    maxy = y
                if x<minx:
                    minx = x
                if y<miny:
                    miny = y
        self.bound.x = minx
        self.bound.y = miny
        self.bound.w = maxx-minx
        self.bound.h = maxy-miny
        
        #culc head and headings
        if self.nose.isExist and self.lear.isExist and self.rear.isExist:
            #print("head")
            self.head.init(self.nose,self.lear,self.rear,(255,255,255))
            


In [None]:
import argparse
import sys
import time

import cv2
import mediapipe as mp
import numpy as np
from typing import List, Tuple

from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2

#from body import Body


#1 do inference 
#2 convert the results to the list of Body objects
# https://developers.google.com/mediapipe/solutions/vision/pose_landmarker/python#video
def process_bodies(
    result:vision.PoseLandmarkerResult,
    width:int,
    height:int,
    visibility_threshold: float = 0.8
)-> List[Body]:
    
    '''
    for pose_landmarks in persons.pose_landmarks:
    # Draw the pose landmarks.
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
        landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y,
                                        z=landmark.z) for landmark
        in pose_landmarks
    ])
    mp_drawing.draw_landmarks(
        image,
        pose_landmarks_proto,
        mp_pose.POSE_CONNECTIONS,
        mp_drawing_styles.get_default_pose_landmarks_style())
        '''

    # https://github.com/googlesamples/mediapipe/blob/main/examples/pose_landmarker/python/%5BMediaPipe_Python_Tasks%5D_Pose_Landmarker.ipynb
    bodies = []
    
    #print(str(len(result.pose_landmarks)))
    
    pose_landmarks_list = persons.pose_landmarks
    
      # Loop through the detected poses to visualize.
    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]
        
        #print("idx:"+str(idx)+" x:"+str(pose_landmarks[0].x))

        '''
        # Draw the pose landmarks.
        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
          landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
        ])
        '''
        
        #print(type(pose_landmarks_proto))
        #pose_landmarks_proto is  mediapipe.framework.formats.landmark_pb2.NormalizedLandmarkList
        
        
        body = Body(idx,pose_landmarks,(0,0,0),visibility_threshold,width,height)
        body.process()
        #print("body head:"+str(body.head.pos[0]))
        bodies.append(body) 
    
    '''
    #for landmarks in result.pose_landmarks:
    for id in range(len(result.pose_landmarks)):
        # print(type(person))
        print("id:"+str(id))
        landmarks = result.pose_landmarks[id]
        world_landmarks = result.pose_world_landmarks[id]

        #key points
        body = Body(landmarks,world_landmarks,visibility_threshold,width,height)
        body.process()
        bodies.append(body)  
    return bodies
    '''
    return bodies

def draw_bodies(
    image: np.ndarray,
    list_bodies: List[Body],
) -> np.ndarray:
    body_index = 0
    for body in list_bodies:
        #body
        
        for part in body.parts:
            if(part.isExist):
                cv2.circle(image, part.pos, 10, part.color, thickness=-1)
            
        cv2.rectangle(image,(body.bound.x,body.bound.y),(body.bound.x+body.bound.w,body.bound.y+body.bound.h), (255, 255, 255), 1)
        #head
        
        head = body.head
        
        cv2.circle(image, head.pos, 50, head.color, thickness=1)
        head_width_text = '{:.2f}'.format(head.head_width)
        cv2.putText(image, head_width_text, head.pos, cv2.FONT_HERSHEY_PLAIN,
                2, head.color, 1)
        
        #heading
        cv2.arrowedLine(image, head.pos, head.eye_pos, head.color, thickness=2)
        #print("body:"+str(body_index)+" head x:"+str(head.pos[0]))
        body_index = body_index+1
        
    return image

def draw_bodies_5(
    image: np.ndarray,
) -> np.ndarray:
    body_index = 0
    for body in bodies:
        #body
        
        for part in body.parts:
            if(part.isExist):
                cv2.circle(image, part.pos, 10, part.color, thickness=-1)
            
        cv2.rectangle(image,(body.bound.x,body.bound.y),(body.bound.x+body.bound.w,body.bound.y+body.bound.h), (255, 255, 255), 1)
        #head
        
        head = body.head
        
        cv2.circle(image, head.pos, 50, head.color, thickness=1)
        head_width_text = '{:.2f}'.format(head.head_width)
        cv2.putText(image, head_width_text, head.pos, cv2.FONT_HERSHEY_PLAIN,
                2, (255,255,255), 1)
        
        #heading
        cv2.arrowedLine(image, head.pos, head.eye_pos, (255, 255, 255), thickness=2)
        #print("body:"+str(body_index)+" head x:"+str(head.pos[0]))
        body_index = body_index+1
        
    return image


def draw_bodies_2(image: np.ndarray
) -> np.ndarray:
    if persons:
    # Draw landmarks.
        for pose_landmarks in persons.pose_landmarks:
            # Draw the pose landmarks.
            pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
            pose_landmarks_proto.landmark.extend([
                landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y,
                                                z=landmark.z) for landmark
                in pose_landmarks
            ])
            mp_drawing.draw_landmarks(
                image,
                pose_landmarks_proto,
                mp_pose.POSE_CONNECTIONS,
                mp_drawing_styles.get_default_pose_landmarks_style())
    return image


def draw_bodies_3(image: np.ndarray
) -> np.ndarray:
    if persons:
    # Draw landmarks.
        for person in persons.pose_landmarks:
            # Draw the pose landmarks.
            for landmark in person:
                 #print(type(landmark))
                pos = (int(landmark.x*width),int(landmark.y*height))
                # landmark: https://developers.google.com/mediapipe/api/solutions/java/com/google/mediapipe/tasks/components/containers/Landmark
                cv2.circle(image,pos, 5, (0,0,255), thickness=-1)
                #print(landmark.x)
    return image

def draw_bodies_4(image: np.ndarray,result:vision.PoseLandmarkerResult
) -> np.ndarray:
    if result:
    # Draw landmarks.
        for person in result.pose_landmarks:
            # Draw the pose landmarks.
            for landmark in person:
                 #print(type(landmark))
                pos = (int(landmark.x*width),int(landmark.y*height))
                # landmark: https://developers.google.com/mediapipe/api/solutions/java/com/google/mediapipe/tasks/components/containers/Landmark
                cv2.circle(image,pos, 5, (0,0,255), thickness=-1)
                #print(landmark.x)
    return image


# draw in x-z plane

def draw_bodies_world(image: np.ndarray
) -> np.ndarray:
    if DETECTION_RESULT:
        ratio = 100
    # Draw landmarks.
        for person in DETECTION_RESULT.pose_world_landmarks:
            # Draw the pose landmarks.
            for landmark in person:
                 #print(type(landmark))
                pos = (int(landmark.x*ratio),int(landmark.y*ratio))
                # landmark: https://developers.google.com/mediapipe/api/solutions/java/com/google/mediapipe/tasks/components/containers/Landmark
                cv2.circle(image,pos, 5, (255,0,255), thickness=-1)
                #print(landmark.x)
    return image

def draw_segments(image: np.ndarray
) -> np.ndarray:
    if (output_segmentation_masks and DETECTION_RESULT):
        if DETECTION_RESULT.segmentation_masks is not None:
            segmentation_mask = DETECTION_RESULT.segmentation_masks[0].numpy_view()
            mask_image = np.zeros(image.shape, dtype=np.uint8)
            mask_image[:] = mask_color
            condition = np.stack((segmentation_mask,) * 3, axis=-1) > 0.1
            visualized_mask = np.where(condition, mask_image, image)
            image= cv2.addWeighted(image, overlay_alpha,
                                            visualized_mask, overlay_alpha,
                                            0)
    return image

mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Global variables to calculate FPS
#COUNTER, FPS = 0, 0
#START_TIME = time.time()
#DETECTION_RESULT = None
persons = None
bodies = []

"""Continuously run inference on images acquired from the camera.

Args:
  model: Name of the pose landmarker model bundle.
  num_poses: Max number of poses that can be detected by the landmarker.
  min_pose_detection_confidence: The minimum confidence score for pose
    detection to be considered successful.
  min_pose_presence_confidence: The minimum confidence score of pose
    presence score in the pose landmark detection.
  min_tracking_confidence: The minimum confidence score for the pose
    tracking to be considered successful.
  output_segmentation_masks: Choose whether to visualize the segmentation
    mask or not.
  camera_id: The camera id to be passed to OpenCV.
  width: The width of the frame captured from the camera.
  height: The height of the frame captured from the camera.
"""
    
#camera_id = 1 # 1 for webcam on Mac
camera_id = "video0.mp4" # 1 for webcam on Mac
width = 1280 #1280 for webcam for mac
height= 720 # 720 for Webcam for mac

model = 'pose_landmarker_lite.task'
num_poses = 4
min_pose_detection_confidence = 0.5
min_pose_presence_confidence = 0.5
min_tracking_confidence = 0.9
output_segmentation_masks = False

fps = 0
end_time = 0
start_time = 0
isUpdated = False


# Start capturing video input from the camera
cap = cv2.VideoCapture(camera_id)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

image = np.zeros((height, width,3), np.uint8)
cv2.putText(image, "hello", (10, 30),
               cv2.FONT_HERSHEY_PLAIN, 1.5,
               (255, 255, 255), 1, cv2.LINE_AA)

cv2.startWindowThread()
cv2.imshow(model, image)

'''
# Visualization parameters
row_size = 50  # pixels
left_margin = 24  # pixels
text_color = (0, 0, 0)  # black
font_size = 1
font_thickness = 1
fps_avg_frame_count = 10
overlay_alpha = 0.5
mask_color = (100, 100, 0)  # cyan
'''

def callback_result(result: vision.PoseLandmarkerResult,
                unused_output_image: mp.Image, timestamp_ms: int):
    global persons,bodies,width,height,fps,end_time,start_time,isUpdated
    persons = result
    
    end_time = time.time()
    time_diff = end_time - start_time #sec
    fps = 1.0 / time_diff
        
    start_time = time.time()
    
    bodies = process_bodies(persons,width,height,0.5)
    isUpdated = True
    
    
    # print(type(result))
    '''
    # Calculate the FPS
    if COUNTER % fps_avg_frame_count == 0:
        FPS = fps_avg_frame_count / (time.time() - START_TIME)
        START_TIME = time.time()

    DETECTION_RESULT = result
    COUNTER += 1
    '''
    
# Initialize the pose landmarker model
base_options = python.BaseOptions(model_asset_path=model)
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.LIVE_STREAM,
    num_poses=num_poses,
    min_pose_detection_confidence=min_pose_detection_confidence,
    min_pose_presence_confidence=min_pose_presence_confidence,
    min_tracking_confidence=min_tracking_confidence,
    output_segmentation_masks=output_segmentation_masks,
    result_callback=callback_result)
detector = vision.PoseLandmarker.create_from_options(options)

start_time = time.time()

# Continuously capture images from the camera and run inference
while cap.isOpened():

    success, image = cap.read()
    if not success:
        sys.exit(
            'ERROR: Unable to read from webcam. Please verify your webcam settings.'
        )

    image = cv2.flip(image, 1)

    # Convert the image from BGR to RGB as required by the TFLite model.
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_image)

    # Run pose landmarker using the model.
    if isUpdated==False:
        detector.detect_async(mp_image, time.time_ns() // 1_000_000)
    
    current_frame = image
    
    if persons:
        #bodies = process_bodies(persons,width,height,0.5)
        current_frame = draw_bodies(current_frame,bodies)
        #current_frame = draw_bodies_2(current_frame)
        #current_frame = draw_segments(current_frame)
    
    #current_frame = draw_bodies(current_frame)
    
    #current_frame = draw_bodies_world(current_frame)
    
    
    # Show the FPS
    #fps_text = 'FPS = ' + str(int(fps))
    fps_text = 'FPS = {:.1f}'.format(fps)
    text_location = (10,40)
    cv2.putText(current_frame, fps_text, text_location, cv2.FONT_HERSHEY_PLAIN,
                2, (255,255,255), 1)

    if isUpdated:
        cv2.imshow(model, current_frame)
        isUpdated = False

    time.sleep(0.01)
    key = cv2.waitKey(1)

    if key == ord('q'):            #qを押した時の処理
        cv2.waitKey(1)
        cv2.destroyAllWindows()  
        cap.release()
        cv2.waitKey(1)
        break

i:0 x:0.8445335626602173
i:0 x:0.8353423476219177
i:0 x:0.8463734984397888
i:0 x:0.8161731958389282
i:0 x:0.8136790990829468
i:0 x:0.8140764832496643
i:0 x:0.8043203353881836
i:0 x:0.808660089969635
i:0 x:0.8126363158226013
i:0 x:0.8093098402023315
i:0 x:0.8127902150154114
i:0 x:0.8057433366775513
i:0 x:0.8139859437942505
i:0 x:0.7816022038459778
i:0 x:0.7764977812767029
i:0 x:0.7774322032928467
i:0 x:0.7757660150527954
i:0 x:0.7765090465545654
i:0 x:0.7610433101654053
i:0 x:0.7561527490615845
i:0 x:0.7605229616165161
i:0 x:0.7572234272956848
i:0 x:0.7585428357124329
i:0 x:0.7383406162261963
i:0 x:0.7422205209732056
i:0 x:0.7367168068885803
i:0 x:0.7333942651748657
i:0 x:0.7342804670333862
i:0 x:0.7101964354515076
i:0 x:0.7095956206321716
i:0 x:0.7111654877662659
i:0 x:0.7124465703964233
i:0 x:0.709510862827301
i:0 x:0.7116793394088745
i:0 x:0.6865856051445007
i:0 x:0.682813823223114
i:0 x:0.682605504989624
i:0 x:0.6465107202529907
i:0 x:0.6484155058860779
i:0 x:0.6511560082435608
i:0 

これを見るとデータの仕様がわかる　https://developers.google.com/mediapipe/solutions/vision/pose_landmarker/python

The output contains the following world coordinates (WorldLandmarks):

x, y, and z: Real-world 3-dimensional coordinates in meters, with the midpoint of the hips as the origin.

visibility: The likelihood of the landmark being visible within the image.