In [1]:
# Mount Google Drive at /content/gdrive; the dataset directories used later in
# this notebook live under /content/gdrive/MyDrive.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
# Install MediaPipe into the runtime. The version is not pinned; the recorded
# run of this cell installed mediapipe 0.10.5.
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.5/33.5 MB[0m [31m45.5 MB/s[0m eta [36m0:00:00[0m
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.4.6-py3-none-any.whl (31 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.5 sounddevice-0.4.6


In [3]:
# Download the "heavy" float16 pose landmarker bundle and save it as
# pose_landmarker.task in the current working directory (-q silences wget).
# NOTE(review): process_video loads the model from /content/pose_landmarker.task,
# so this assumes the working directory is /content - confirm.
!wget -O pose_landmarker.task -q https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task

In [4]:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
mp_pose = mp.solutions.pose
from mediapipe.framework.formats import landmark_pb2

import cv2
from google.colab.patches import cv2_imshow

import os
import subprocess

import pandas as pd
import numpy as np

In [5]:
def draw_landmarks_on_image(rgb_image, detection_result):
  """Return a copy of `rgb_image` with every detected pose drawn onto it.

  Args:
    rgb_image: Image array to annotate. It is copied; the input is not modified.
    detection_result: Object exposing `pose_landmarks`, a sequence holding one
      collection of landmarks (each with `x`, `y`, `z`) per detected pose.

  Returns:
    The annotated copy of `rgb_image`.
  """
  detected_poses = detection_result.pose_landmarks
  canvas = np.copy(rgb_image)

  for single_pose in detected_poses:
    # The drawing helper takes a protobuf landmark list, so rebuild one from
    # the detection result. Only x/y/z are carried over.
    proto_points = [
      landmark_pb2.NormalizedLandmark(x=point.x, y=point.y, z=point.z)
      for point in single_pose
    ]
    pose_proto = landmark_pb2.NormalizedLandmarkList()
    pose_proto.landmark.extend(proto_points)

    # Draw the landmarks and their connections with MediaPipe's default style.
    solutions.drawing_utils.draw_landmarks(
      canvas,
      pose_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())

  return canvas

In [6]:
def get_dataframe_cols():
  """Build the flat list of per-frame column names, five per pose landmark.

  For every member of `mp_pose.PoseLandmark`, in enumeration order, the names
  `<NAME>_x`, `<NAME>_y`, `<NAME>_z`, `<NAME>_visibility` and
  `<NAME>_presence` are produced, in that order.

  Returns:
    A list of column-name strings.
  """
  field_suffixes = ('x', 'y', 'z', 'visibility', 'presence')
  return [
    f'{member.name}_{suffix}'
    for member in mp_pose.PoseLandmark
    for suffix in field_suffixes
  ]

In [7]:
def get_video_frame_record(pose_landmarks):
  """Flatten the first detected pose of a frame into a single row of values.

  Args:
    pose_landmarks: Sequence of detected poses, each an iterable of landmarks
      exposing `x`, `y`, `z`, `visibility` and `presence`. May be empty/None.

  Returns:
    `[x, y, z, visibility, presence, ...]` for each landmark of the first
    pose, or a list of 165 zeros when `pose_landmarks` is empty or None.
  """
  # Guard clause: nothing detected in this frame -> zero-filled placeholder row.
  if not pose_landmarks:
    # 165 presumably equals 33 landmarks x 5 values so the row matches the
    # columns built by get_dataframe_cols - TODO confirm.
    return [0] * 165

  # Only the first pose is used; any additional poses are ignored.
  row = []
  for point in pose_landmarks[0]:
    row.extend((point.x, point.y, point.z, point.visibility, point.presence))
  return row

In [8]:
def process_video(exercise_rgb_dir, exercise_label_path, input_video_name, output_video_name,
                  model_path='/content/pose_landmarker.task'):
  """Run pose-landmark detection over one video and save the results.

  Reads `<exercise_rgb_dir>/rgb/<input_video_name>` frame by frame, runs the
  MediaPipe pose landmarker in VIDEO mode on each frame and writes:
    * `<exercise_rgb_dir>/<output_video_name>.mp4` - the frames with the
      detected landmarks drawn on them,
    * `<exercise_rgb_dir>/<output_video_name>.csv` - one row per frame (columns
      from `get_dataframe_cols`, values from `get_video_frame_record`).
  Finally the label file is copied into `exercise_rgb_dir`.

  Args:
    exercise_rgb_dir: Directory containing the `rgb/` input folder; it also
      receives the output video, the CSV and the copied label file.
    exercise_label_path: Path of the label file to copy next to the outputs.
      When None or empty the copy step is skipped.
    input_video_name: File name of the input video inside `rgb/`.
    output_video_name: Base name (without extension) for the output files.
    model_path: Path of the pose landmarker `.task` model bundle.
  """
  BaseOptions = mp.tasks.BaseOptions
  PoseLandmarker = mp.tasks.vision.PoseLandmarker
  PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
  VisionRunningMode = mp.tasks.vision.RunningMode

  options = PoseLandmarkerOptions(
      base_options=BaseOptions(model_asset_path=model_path),
      running_mode=VisionRunningMode.VIDEO)

  input_video_path = f"{exercise_rgb_dir}/rgb/{input_video_name}"
  mediapipe_video_name = f"{exercise_rgb_dir}/{output_video_name}.mp4"

  video_records = []
  with PoseLandmarker.create_from_options(options) as landmarker:
    cap = cv2.VideoCapture(input_video_path)
    out = None
    try:
      if not cap.isOpened():
        # Keep going (an empty CSV is still written) but make the failure visible.
        print(f"\t\t\tCould not open {input_video_path}; no frames will be processed.")

      # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the
      # timestamps drift and changes the playback speed of the output video.
      frame_rate = cap.get(cv2.CAP_PROP_FPS)
      if not frame_rate > 0:
        # Missing/invalid FPS metadata (0 or NaN) would cause a division by
        # zero below; fall back to a nominal rate.
        frame_rate = 30.0
      frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
      frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

      # Lower-case tag: FFMPEG rejects 'MP4V' for .mp4 containers and falls
      # back to 'mp4v' with a warning.
      fourcc = cv2.VideoWriter_fourcc(*'mp4v')
      out = cv2.VideoWriter(mediapipe_video_name, fourcc, frame_rate, (frame_width, frame_height))

      frame_count = 0
      while True:
        ret, frame = cap.read()
        if not ret:
          break

        # VIDEO mode needs a per-frame timestamp in milliseconds.
        frame_timestamp_ms = int(frame_count * 1000 / frame_rate)
        frame_count += 1

        # OpenCV decodes frames as BGR, while the image is declared as SRGB,
        # so convert before handing the frame to the landmarker.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
        pose_landmarker_result = landmarker.detect_for_video(mp_image, frame_timestamp_ms)

        frame_record = get_video_frame_record(pose_landmarker_result.pose_landmarks)
        video_records.append(frame_record)

        # Draw on the original BGR frame because VideoWriter expects BGR input.
        annotated_image = draw_landmarks_on_image(frame, pose_landmarker_result)
        out.write(annotated_image)
    finally:
      # Release the capture/writer even when detection or writing fails.
      cap.release()
      if out is not None:
        out.release()

  video_df_cols = get_dataframe_cols()
  video_df = pd.DataFrame(data=video_records, columns=video_df_cols)
  video_df.to_csv(f"{exercise_rgb_dir}/{output_video_name}.csv", index=False)

  if exercise_label_path:
    # Argument list instead of a shell string so paths containing spaces or
    # shell metacharacters are copied correctly. Failures are not raised,
    # matching the previous best-effort behaviour.
    subprocess.run(["cp", exercise_label_path, exercise_rgb_dir])

### Subgroups processed:
Wall time: 1h 20min 49s<br>
SUB_GROUP_RGB_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe_rgb/GPP/BackPain"<br>
SUB_GROUP_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe/GPP/BackPain"
<br><br>
Wall time: 2h 51min 32s<br>
SUB_GROUP_RGB_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe_rgb/CG/Expert"<br>
SUB_GROUP_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe/CG/Expert"
<br><br>
Wall time: A bit over 5 hours<br>
SUB_GROUP_RGB_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe_rgb/CG/NotExpert"<br>
SUB_GROUP_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe/CG/NotExpert"<br>
<br><br>
Wall time: 1h 50min 35s<br>
SUB_GROUP_RGB_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe_rgb/GPP/Parkinson"<br>
SUB_GROUP_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe/GPP/Parkinson"<br>
<br><br>
Wall time: 1h 4min 50s<br>
SUB_GROUP_RGB_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe_rgb/GPP/Stroke"<br>
SUB_GROUP_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe/GPP/Stroke"<br>


In [11]:
# Sub-group processed by the batch loop below; change both paths together.
# SUB_GROUP_RGB_DIR: root that is walked as <participant>/<exercise>/rgb for the input videos.
SUB_GROUP_RGB_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe_rgb/GPP/Stroke"
# SUB_GROUP_DIR: root searched under <participant>/<exercise>/Label/ for the ClinicalAssessment file.
SUB_GROUP_DIR = "/content/gdrive/MyDrive/PSUT/Graduation-Project2/KiMoRe/GPP/Stroke"

In [12]:
%%time

# Walk SUB_GROUP_RGB_DIR as <participant>/<exercise>/rgb/*.mp4 and run
# process_video on every video found. Exercise folders without any .mp4 are
# collected in `unprocessed_videos` so they can be inspected afterwards.
unprocessed_videos = []
for participant in os.listdir(SUB_GROUP_RGB_DIR):
  print(f"\t{participant}")
  participant_dir = f"{SUB_GROUP_RGB_DIR}/{participant}"
  for exercise in os.listdir(participant_dir):
    print(f"\t\t{exercise}")
    exercise_rgb_dir = f"{participant_dir}/{exercise}"
    exercise_dir = f"{SUB_GROUP_DIR}/{participant}/{exercise}/Label/"

    # Locate the clinical-assessment label file. A missing Label folder must
    # not abort a run that takes hours, so it is treated as "no label".
    exercise_label_path = None
    if os.path.isdir(exercise_dir):
      for file in os.listdir(exercise_dir):
        if file.startswith("ClinicalAssessment"):
          exercise_label_path = os.path.join(exercise_dir, file)
    if exercise_label_path is None:
      print(f"\t\t\tNo label file found for exercise {exercise} of {participant}.")

    # Select the videos up front so that a folder holding only non-video files
    # (or no rgb folder at all) is reported instead of being skipped silently.
    rgb_dir = f"{exercise_rgb_dir}/rgb"
    if os.path.isdir(rgb_dir):
      video_names = [name for name in os.listdir(rgb_dir) if name.endswith(".mp4")]
    else:
      video_names = []

    if not video_names:
      print(f"\t\t\tNo video found for exercise {exercise} of {participant}.")
      unprocessed_videos.append(exercise_rgb_dir)
      continue

    # NOTE: the output name does not depend on the input file name, so several
    # .mp4 files in one rgb folder overwrite each other's outputs.
    for input_video_name in video_names:
      output_video_name = f"{participant}_{exercise}"
      process_video(exercise_rgb_dir, exercise_label_path, input_video_name, output_video_name)
      print(f"\t\t\tFinished processing {output_video_name}.mp4")

	S_ID4
		Es2
			Finished processing S_ID4_Es2.mp4
		Es5
			Finished processing S_ID4_Es5.mp4
		Es1
			Finished processing S_ID4_Es1.mp4
		Es4
			Finished processing S_ID4_Es4.mp4
		Es3
			Finished processing S_ID4_Es3.mp4
	S_ID2
		Es3
			No video found for exercise Es3 of S_ID2.
		Es5
			No video found for exercise Es5 of S_ID2.
		Es4
			Finished processing S_ID2_Es4.mp4
		Es2
			No video found for exercise Es2 of S_ID2.
		Es1
			No video found for exercise Es1 of S_ID2.
	S_ID5
		Es5
			Finished processing S_ID5_Es5.mp4
		Es3
			No video found for exercise Es3 of S_ID5.
		Es2
			Finished processing S_ID5_Es2.mp4
		Es4
			Finished processing S_ID5_Es4.mp4
		Es1
			Finished processing S_ID5_Es1.mp4
	S_ID3
		Es3
			Finished processing S_ID3_Es3.mp4
		Es2
			Finished processing S_ID3_Es2.mp4
		Es5
			Finished processing S_ID3_Es5.mp4
		Es4
			No video found for exercise Es4 of S_ID3.
		Es1
			Finished processing S_ID3_Es1.mp4
	S_ID7
		Es4
			Finished processing S_ID7_Es4.mp4
		Es3
			Finis