<a href="https://colab.research.google.com/github/okana2ki/intro-to-AI/blob/main/%5BMediaPipe_Python_Tasks%5D_Pose_Landmarker3_ipynb_%E3%81%AE%E3%82%B3%E3%83%94%E3%83%BC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the draft cell at the end of this
# notebook reads its input video from /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Pose Landmarks Detection with MediaPipe Tasks

## Preparation

Let's start by installing MediaPipe.


In [None]:
!pip install -q mediapipe

In [None]:
!pip install opencv-python-headless



Then download an off-the-shelf model bundle. Check out the [MediaPipe documentation](https://developers.google.com/mediapipe/solutions/vision/pose_landmarker#models) for more information about this model bundle.

In [None]:
!wget -O pose_landmarker.task -q https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task

一番重いモデルを使っているので、精度はいいと思うが、重い。後で、軽いモデルでも試してみよう。

先日撮影したダンス動画で二人検出できるか試してみる予定。

下記のプログラムは、音声抜きになっているので、この後、音声付きにしたり、ランドマークの座標を取得して、上からのビューを表示するのを試したりする予定。

## 新しいモデルを使って、動画ファイルからランドマーク検出。最大で二人検出。音声なし版

In [None]:
import cv2
import numpy as np
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

# Visualization Utilities
def draw_landmarks_on_image(rgb_image, detection_result):
    """Return a copy of ``rgb_image`` with every detected pose drawn on it.

    Args:
        rgb_image: Image array to annotate. It is copied, never modified.
        detection_result: Object whose ``pose_landmarks`` attribute is a
            sequence of poses, each a sequence of landmarks exposing
            ``x``, ``y`` and ``z``.

    Returns:
        The annotated copy of ``rgb_image``.
    """
    detected_poses = detection_result.pose_landmarks
    canvas = np.copy(rgb_image)

    for single_pose in detected_poses:
        # Repack this pose's landmarks into a NormalizedLandmarkList, which
        # is the form handed to draw_landmarks below.
        points = [
            landmark_pb2.NormalizedLandmark(x=pt.x, y=pt.y, z=pt.z)
            for pt in single_pose
        ]
        landmark_list = landmark_pb2.NormalizedLandmarkList()
        landmark_list.landmark.extend(points)

        solutions.drawing_utils.draw_landmarks(
            image=canvas,
            landmark_list=landmark_list,
            connections=solutions.pose.POSE_CONNECTIONS,
            landmark_drawing_spec=solutions.drawing_styles.get_default_pose_landmarks_style(),
        )

    return canvas

# Configure a PoseLandmarker for video input that detects up to two people.
base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.VIDEO,
    num_poses=2,
    # NOTE: segmentation masks are requested here but never read below.
    output_segmentation_masks=True
)

# Input and output video paths
input_video_path = 'input_video.mp4'
output_video_path = 'annotated_video.mp4'

# Load input video. Fail loudly here: an unopened capture would otherwise
# skip the loop and leave an empty output file behind.
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    raise IOError(f'Could not open input video: {input_video_path}')
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # The frame rate is the divisor of the timestamp computation below.
    cap.release()
    raise ValueError(f'Input video reports an invalid frame rate: {fps}')

# Output video settings
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

# Frame counter for generating timestamps
frame_counter = 0

try:
    # The context manager closes the landmarker once processing ends.
    with vision.PoseLandmarker.create_from_options(options) as landmarker:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV delivers BGR; the mp.Image below is declared as SRGB.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

            # detect_for_video expects the frame timestamp in milliseconds.
            frame_timestamp_ms = int(frame_counter * 1000 / fps)
            frame_counter += 1

            # Perform pose landmark detection
            pose_landmarker_result = landmarker.detect_for_video(mp_image, frame_timestamp_ms)

            # Draw landmarks, then convert back to BGR for the VideoWriter.
            annotated_frame = draw_landmarks_on_image(rgb_frame, pose_landmarker_result)
            annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
            out.write(annotated_frame)
finally:
    # Release the capture and writer even if detection raised.
    cap.release()
    out.release()



## 音声付き

In [None]:
!pip install mediapipe opencv-python moviepy
!sudo apt-get install ffmpeg

In [None]:
!wget -O pose_landmarker.task -q https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task

In [None]:
import cv2
import numpy as np
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from moviepy.editor import VideoFileClip, AudioFileClip

# Visualization Utilities
def draw_landmarks_on_image(rgb_image, detection_result):
    """Draw all poses in ``detection_result`` onto a copy of ``rgb_image``.

    Args:
        rgb_image: Image array to annotate. The input itself is left as is.
        detection_result: Object with a ``pose_landmarks`` sequence; each
            entry is one pose, given as landmarks with ``x``, ``y``, ``z``.

    Returns:
        A new array holding the image with the pose overlays.
    """
    all_poses = detection_result.pose_landmarks
    output_image = np.copy(rgb_image)

    for pose in all_poses:
        # Build the NormalizedLandmarkList that is passed to draw_landmarks.
        as_proto = landmark_pb2.NormalizedLandmarkList()
        as_proto.landmark.extend(
            [
                landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z)
                for lm in pose
            ]
        )
        pose_style = solutions.drawing_styles.get_default_pose_landmarks_style
        solutions.drawing_utils.draw_landmarks(
            image=output_image,
            landmark_list=as_proto,
            connections=solutions.pose.POSE_CONNECTIONS,
            landmark_drawing_spec=pose_style(),
        )

    return output_image

# Configure a PoseLandmarker for video input that detects up to two people.
base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.VIDEO,
    num_poses=2,
    # NOTE: segmentation masks are requested here but never read below.
    output_segmentation_masks=True
)

# Input and output video paths
input_video_path = 'input_video.mp4'
output_video_path = 'annotated_video.mp4'
output_video_with_audio_path = 'annotated_video_with_audio.mp4'

# Load input video. Fail loudly here: an unopened capture would otherwise
# skip the loop and leave an empty output file behind.
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    raise IOError(f'Could not open input video: {input_video_path}')
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # The frame rate is the divisor of the timestamp computation below.
    cap.release()
    raise ValueError(f'Input video reports an invalid frame rate: {fps}')

# Output video settings
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

# Frame counter for generating timestamps
frame_counter = 0

try:
    # The context manager closes the landmarker once processing ends.
    with vision.PoseLandmarker.create_from_options(options) as landmarker:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV delivers BGR; the mp.Image below is declared as SRGB.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

            # detect_for_video expects the frame timestamp in milliseconds.
            frame_timestamp_ms = int(frame_counter * 1000 / fps)
            frame_counter += 1

            # Perform pose landmark detection
            pose_landmarker_result = landmarker.detect_for_video(mp_image, frame_timestamp_ms)

            # Draw landmarks, then convert back to BGR for the VideoWriter.
            annotated_frame = draw_landmarks_on_image(rgb_frame, pose_landmarker_result)
            annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
            out.write(annotated_frame)
finally:
    # Release the capture and writer even if detection raised.
    cap.release()
    out.release()

# Combine the annotated (silent) video with the audio track of the input.
# Both clips are closed on exit so their reader processes do not linger.
with VideoFileClip(input_video_path) as clip:
    audio = clip.audio
    with VideoFileClip(output_video_path) as silent_clip:
        final_clip = silent_clip.set_audio(audio)
        final_clip.write_videofile(output_video_with_audio_path, codec='libx264', audio_codec='aac')




Moviepy - Building video annotated_video_with_audio.mp4.
MoviePy - Writing audio in annotated_video_with_audioTEMP_MPY_wvf_snd.mp4




MoviePy - Done.
Moviepy - Writing video annotated_video_with_audio.mp4






Moviepy - Done !
Moviepy - video ready annotated_video_with_audio.mp4


## ワールド座標のファイル保存を追加（デバッグ中）

In [3]:
!pip install mediapipe opencv-python moviepy
!sudo apt-get install ffmpeg

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [4]:
!wget -O pose_landmarker.task -q https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task

In [5]:
import cv2
import numpy as np
import mediapipe as mp
import csv
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from moviepy.editor import VideoFileClip, AudioFileClip

# Visualization Utilities
def draw_landmarks_on_image(rgb_image, detection_result):
    """Overlay each detected pose on a copy of ``rgb_image`` and return it.

    Args:
        rgb_image: Image array used as the drawing background. It is copied
            first, so the caller's array stays untouched.
        detection_result: Object exposing ``pose_landmarks``, a sequence with
            one landmark sequence per pose. Only ``x``, ``y`` and ``z`` of
            each landmark are read.

    Returns:
        The copied image with landmarks and connections drawn.
    """
    poses = detection_result.pose_landmarks
    drawn = np.copy(rgb_image)

    for landmarks in poses:
        # Copy x/y/z into NormalizedLandmark messages and collect them in
        # the list message that draw_landmarks receives.
        normalized = landmark_pb2.NormalizedLandmarkList()
        converted = [
            landmark_pb2.NormalizedLandmark(x=item.x, y=item.y, z=item.z)
            for item in landmarks
        ]
        normalized.landmark.extend(converted)

        drawer = solutions.drawing_utils
        drawer.draw_landmarks(
            image=drawn,
            landmark_list=normalized,
            connections=solutions.pose.POSE_CONNECTIONS,
            landmark_drawing_spec=solutions.drawing_styles.get_default_pose_landmarks_style(),
        )

    return drawn

# Configure a PoseLandmarker for video input that detects up to two people.
base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.VIDEO,
    num_poses=2,
    # NOTE: segmentation masks are requested here but never read below.
    output_segmentation_masks=True
)

# Input and output video paths
input_video_path = 'input_video.mp4'
output_video_path = 'annotated_video.mp4'
output_video_with_audio_path = 'annotated_video_with_audio.mp4'

# Destination for the per-frame world coordinates
csv_file_path = 'landmarks_world_coordinates.csv'

# Load input video. Fail loudly here: an unopened capture would otherwise
# skip the loop and leave empty output files behind.
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    raise IOError(f'Could not open input video: {input_video_path}')
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # The frame rate is the divisor of the timestamp computation below.
    cap.release()
    raise ValueError(f'Input video reports an invalid frame rate: {fps}')

# Output video settings
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

# Frame counter for generating timestamps
frame_counter = 0

try:
    # Both context managers close their resource when the block exits, so
    # the CSV is flushed to disk and the landmarker is shut down.
    with open(csv_file_path, mode='w', newline='') as csv_file, \
            vision.PoseLandmarker.create_from_options(options) as landmarker:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(['frame', 'person_index', 'landmark_index', 'x', 'y', 'z', 'visibility'])

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV delivers BGR; the mp.Image below is declared as SRGB.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

            # detect_for_video expects the frame timestamp in milliseconds.
            frame_timestamp_ms = int(frame_counter * 1000 / fps)
            frame_counter += 1

            # Perform pose landmark detection
            pose_landmarker_result = landmarker.detect_for_video(mp_image, frame_timestamp_ms)

            # Draw landmarks on the image
            annotated_frame = draw_landmarks_on_image(rgb_frame, pose_landmarker_result)

            # Save the world coordinates (pose_world_landmarks) of every
            # detected person. frame_counter was already incremented above,
            # so the 'frame' column is 1-based.
            # To save normalized image coordinates instead, iterate over
            # pose_landmarker_result.pose_landmarks; to save one person only,
            # index the list with [0].
            if pose_landmarker_result.pose_world_landmarks:
                for person_index, pose_world_landmarks in enumerate(pose_landmarker_result.pose_world_landmarks):
                    for i, landmark in enumerate(pose_world_landmarks):
                        csv_writer.writerow([frame_counter, person_index, i, landmark.x, landmark.y, landmark.z, landmark.visibility])

            # Convert back to BGR for the VideoWriter.
            annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
            out.write(annotated_frame)
finally:
    # Release the capture and writer even if detection raised.
    cap.release()
    out.release()

# Combine the annotated (silent) video with the audio track of the input.
# Both clips are closed on exit so their reader processes do not linger.
with VideoFileClip(input_video_path) as clip:
    audio = clip.audio
    with VideoFileClip(output_video_path) as silent_clip:
        final_clip = silent_clip.set_audio(audio)
        final_clip.write_videofile(output_video_with_audio_path, codec='libx264', audio_codec='aac')




Moviepy - Building video annotated_video_with_audio.mp4.
MoviePy - Writing audio in annotated_video_with_audioTEMP_MPY_wvf_snd.mp4




MoviePy - Done.
Moviepy - Writing video annotated_video_with_audio.mp4






Moviepy - Done !
Moviepy - video ready annotated_video_with_audio.mp4


## 以下は、GPT-4に提案した、新しいモデルを使う書きかけプログラム

参考までに残しておく。

方針は、「古いモデル用の動くプログラムと、新しいモデルの使い方の公式ドキュメントを見ながら、できるだけプログラムを自力で書いて、それをGPTに完成してもらう。」

「GPTに、古いモデル用の動くプログラムを元に、新しいモデル用に変更してもらう」という方針では、うまく動くプログラムが完成できなかったため、方針を変更した。新しいモデルで動くプログラムの情報が少ないため、GPTにとっては難しいタスクだと思われる。新しいモデルの情報を公式ドキュメントから抜粋して示しただけでは、ダメだった。


In [None]:
# Visualization Utilities
# To better demonstrate the Pose Landmarker API, we have created a set of visualization tools that will be used in this colab.
# These will draw the landmarks on a detected person, as well as the expected connections between those markers.
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np

def draw_landmarks_on_image(rgb_image, detection_result):
    """Return ``rgb_image`` copied and annotated with the detected poses.

    Args:
        rgb_image: Image array to draw over. A copy is made before drawing.
        detection_result: Object with a ``pose_landmarks`` attribute holding
            one landmark sequence per detected pose. ``x``, ``y`` and ``z``
            are read from every landmark.

    Returns:
        The annotated copy.
    """
    pose_sets = detection_result.pose_landmarks
    annotated = np.copy(rgb_image)

    # Visualize the detected poses one at a time.
    for pose_points in pose_sets:
        # Wrap the landmarks in the list message given to draw_landmarks.
        wrapped = landmark_pb2.NormalizedLandmarkList()
        wrapped.landmark.extend(
            [
                landmark_pb2.NormalizedLandmark(x=p.x, y=p.y, z=p.z)
                for p in pose_points
            ]
        )
        solutions.drawing_utils.draw_landmarks(
            image=annotated,
            landmark_list=wrapped,
            connections=solutions.pose.POSE_CONNECTIONS,
            landmark_drawing_spec=solutions.drawing_styles.get_default_pose_landmarks_style(),
        )

    return annotated


# STEP 1: Import the necessary modules.
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import cv2

# NOTE(review): everything from here to the end of the cell is an unfinished
# draft kept for reference. It mixes the new Tasks API with leftovers of the
# legacy API and references names that this cell never defines.

# Initialize the MediaPipe pose module.
# This follows the legacy API, which cannot take the newer options, so the
# intent is to rewrite it as in STEP 2 below.
# mp_pose = mp.solutions.pose
# pose = mp_pose.Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, min_detection_confidence=0.5)

# STEP 2: Create a PoseLandmarker object.
# This follows the new API. The image-input sample is being changed to video
# input, with a num_poses setting added.
base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=mp.tasks.vision.RunningMode.VIDEO,  # video input
    num_poses=2,  # maximum number of poses to detect
    output_segmentation_masks=True)
detector = vision.PoseLandmarker.create_from_options(options)

# STEP 3, 4 and 5 are the image-input sample; they are to be rewritten for
# video input.
# STEP 3: Load the input image.
# image = mp.Image.create_from_file("image.jpg")

# STEP 4: Detect pose landmarks from the input image.
# detection_result = detector.detect(image)

# STEP 5: Process the detection result. In this case, visualize it.
# annotated_image = draw_landmarks_on_image(image.numpy_view(), detection_result)
# cv2_imshow(cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))



# Input and output video paths
input_video_path = '/content/drive/MyDrive/Colab_files/dance-sample.mp4'
output_video_path = 'annotated_video.mp4'

# Load the video file.
# Use OpenCV's VideoCapture to load the input video.
cap = cv2.VideoCapture(input_video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Load the frame rate of the video using OpenCV's CV_CAP_PROP_FPS
# You'll need it to calculate the timestamp for each frame.
fps = cap.get(cv2.CAP_PROP_FPS)

# Output video settings
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))


# Sample code provided on Google AI for Developers.
# Loop through each frame in the video using VideoCapture#read()
# Convert the frame received from OpenCV to a MediaPipe's Image object.
# NOTE(review): numpy_frame_from_opencv is a placeholder from the sample and
# is not defined anywhere in this cell.
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=numpy_frame_from_opencv)

# Sample code provided on Google AI for Developers.
# Perform pose landmarking on the provided single image.
# The pose landmarker must be created with the video mode.
# NOTE(review): this cell names its landmarker `detector`, not `landmarker`,
# and never assigns frame_timestamp_ms; `landmarker` resolves only if an
# earlier cell left it in the notebook namespace.
pose_landmarker_result = landmarker.detect_for_video(mp_image, frame_timestamp_ms)


# The code below was used with the legacy API and still has to be adapted to
# the new one.
# NOTE(review): `pose` and `mp_pose` are assigned only in the commented-out
# lines near the top of this block, so the loop cannot run as written.
# Process each frame of the video.
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Convert the BGR image to RGB
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(image)

    # Draw the pose landmarks
    mp_drawing = mp.solutions.drawing_utils
    annotated_image = image.copy()
    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
            image=annotated_image,
            landmark_list=results.pose_landmarks,
            connections=mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2),
            connection_drawing_spec=mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2))

    # Convert the RGB image back to BGR
    annotated_image = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR)
    out.write(annotated_image)

# Release resources
cap.release()
out.release()
pose.close()