## Notebook to get started working with MediaPipe

The goal is to use MediaPipe to quantify leg and arm movement, with a focus on dyskinetic movements.

python env used:


`conda create --name mediapipe python==3.9 numpy pandas scipy scikit-learn jupyter matplotlib`

afterwards installed:

`pip install mediapipe`, or when behind the Charité proxy: `pip install --proxy=http://proxy.charite.de:8080 mediapipe` (installing mediapipe also installs opencv)


### 0. Importing Packages and Functions

In [1]:
# Importing Python and external packages
import os
import sys
import importlib
import json
import csv
from dataclasses import dataclass, field, fields
import math
from itertools import compress
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime as dt

# mediapipe specific imports
import mediapipe as mp
import cv2

In [2]:
# When the notebook is started from inside the 'mediapiping' folder, move the
# working directory one level up (presumably so that the `utils` package
# imported in the next cell can be found - verify against the repo layout).
cwd = os.getcwd()
if os.path.basename(cwd) == 'mediapiping':
    os.chdir(os.path.dirname(cwd))

In [3]:
from utils.utils_fileManagement import get_project_path

### 1. Tryout Posture Detection MediaPipe

#### from OpenCV Posture Tutorial (learnopencv.com)

#### Some helper functions (to be moved into .py modules later)

In [3]:
def findDistance(x1, y1, x2, y2):
    """
    Euclidean distance between the 2d-coordinates
    (x1, y1) and (x2, y2), in the same units as the inputs
    """
    delta_x = x2 - x1
    delta_y = y2 - y1

    return np.sqrt(delta_x**2 + delta_y**2)

In [None]:
def findAngle(x1, y1, x2, y2):
    """
    Find the angle, in degrees, between the line from (x1, y1) to
    (x2, y2) and the vertical axis through (x1, y1) pointing towards
    y = 0 (i.e. upwards when y grows downwards, as in image coordinates).

    Arguments:
        - x1, y1: coordinates of the first point (y1 must be non-zero)
        - x2, y2: coordinates of the second point

    Returns:
        - degree (float): angle between 0 and 180 degrees

    Raises:
        - ZeroDivisionError: if both points coincide or if y1 equals 0
    """
    segment_length = math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
    # cosine of the angle between (x2 - x1, y2 - y1) and the reference
    # vector (0, -y1), obtained from their dot product
    cosine = (y2 - y1) * (-y1) / (segment_length * y1)
    # floating point rounding can push the cosine marginally outside
    # [-1, 1], which would make acos raise a domain error
    cosine = max(-1.0, min(1.0, cosine))
    # math.degrees uses the exact 180 / pi factor; the former
    # int(180 / math.pi) truncated it to 57 and underestimated all angles
    degree = math.degrees(math.acos(cosine))

    return degree

find file and folder

In [15]:
# Example video lives in the 'videos' subfolder of the project data path
video_fname = 'rest_example_video.MP4'
video_folder = os.path.join(get_project_path('data'), 'videos')
video_path = os.path.join(video_folder, video_fname)
os.path.exists(video_path)  # result is not stored or used in this cell

# Folder for processed videos, created on first use
video_out_folder = os.path.join(video_folder, 'output')
if not os.path.exists(video_out_folder):
    os.makedirs(video_out_folder)

#### Loading and opening MP4's

In [38]:
# Open the video and stop immediately when OpenCV cannot read it
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise ValueError(f'Failed to open {video_path}')
print(f'Succesfully opened MP4: {video_path}')

Succesfully opened MP4: c:\Users\habetsj\Research\projects\dyskinesia_neurophys\data\videos\rest_example_video.MP4


In [57]:
# Store outputs in the 'mediapipe' subfolder of the project results path,
# creating the folder when it does not exist yet
video_out_folder = os.path.join(get_project_path('results'), 'mediapipe')
if not os.path.exists(video_out_folder):
    os.makedirs(video_out_folder)

#### finding poses

set mediapipe posing variables

In [104]:
# Based on:
# https://www.analyticsvidhya.com/blog/2022/03/pose-detection-in-image-using-mediapipe-library/

# Shorthands for the mediapipe pose solution and its landmark enumeration
mp_pose = mp.solutions.pose
lmPose = mp_pose.PoseLandmark

# Pose estimator for standalone images (every image processed independently)
pose_image = mp_pose.Pose(
    static_image_mode=True,
    min_detection_confidence=0.5,
)

# Pose estimator for video processing
pose_video = mp_pose.Pose(
    static_image_mode=False,
    min_detection_confidence=0.7,
    min_tracking_confidence=0.7,
)

# Drawing utilities, used to draw the landmark points
mp_drawing = mp.solutions.drawing_utils

In [87]:
def detectPose(image, pose, draw=False, display=False, RGB=False):
    """
    Run pose detection on a single image / video frame.

    Arguments:
        - image: image array; expected in BGR channel order,
            unless RGB is True
        - pose: pose estimator offering a process() method
            (e.g. an mp_pose.Pose instance)
        - draw (bool): draw the detected landmarks and their
            connections on the returned image
        - display (bool): show the returned image with matplotlib
        - RGB (bool): set to True when image is already in RGB
            channel order

    Returns:
        - image: copy of the input in BGR channel order, with the
            landmarks drawn on it if draw is True and a pose was found
        - resultant: the output of pose.process()
    """
    # pose.process() is given RGB input
    rgb_image = image if RGB else cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    resultant = pose.process(rgb_image)

    # Always return BGR. Previously the image was only converted back when
    # landmarks were drawn, so the channel order of the result depended on
    # whether a pose was found, and the display below (which assumes BGR)
    # showed swapped colours in the other cases.
    image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)

    if resultant.pose_landmarks and draw:
        mp_drawing.draw_landmarks(
            image=image,
            landmark_list=resultant.pose_landmarks,
            connections=mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp_drawing.DrawingSpec(
                color=(255, 255, 255), thickness=3, circle_radius=3
            ),
            connection_drawing_spec=mp_drawing.DrawingSpec(
                color=(49, 125, 237), thickness=2, circle_radius=2
            ),
        )

    if display:
        fig, axes = plt.subplots(1, 1, figsize=[16, 8])
        # reverse the channel axis (BGR -> RGB) for matplotlib
        axes.imshow(image[:, :, ::-1])
        axes.set_title("Pose detected Image")
        axes.axis('off')
        plt.show()

    return image, resultant

In [169]:
# Detect poses in a range of video frames, write the annotated frames to a
# new video and store the landmark coordinates of every frame in a csv

show_between = (1130, 1200)  # first and last frame index to process (inclusive)
out_folder = video_out_folder
out_fname = 'poses_out.mp4'
video_in = video_path

pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5,
                    min_tracking_confidence=0.5)

# Load the video
cap = cv2.VideoCapture(video_in)

# Check if the video was successfully loaded; raise instead of calling
# exit(), consistent with the earlier cell that opens the video
if not cap.isOpened():
    pose.close()
    raise ValueError(f'Failed to open {video_in}')

w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# keep the frame rate as float: truncating e.g. 29.97 to 29 changes the
# playback speed of the written video
fps = cap.get(cv2.CAP_PROP_FPS)
frame_size = (w, h)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# Video writer.
out = cv2.VideoWriter(os.path.join(out_folder, out_fname), fourcc, fps, frame_size)

# landmark values are collected per frame and converted to a DataFrame
# once at the end; one column per landmark and variable
all_landmarks = [lm.name for lm in lmPose]
lm_vars = ['x', 'y', 'z', 'visibility']
lm_columns = [f'{m}_{v}' for m in all_landmarks for v in lm_vars]
lm_rows = []

poses_incl = ['LEFT_SHOULDER', 'RIGHT_SHOULDER']  # not used in this cell

count = -1

try:
    while cap.isOpened():
        count += 1
        if count > show_between[1]: break

        if count < show_between[0]:
            # advance the capture without decoding; without this the
            # frames before the window are never consumed and the loop
            # would process the first frames of the video instead
            if not cap.grab(): break
            continue

        # read next frame (BGR channel order)
        ret, frame_bgr = cap.read()
        if not ret: break

        h, w = frame_bgr.shape[:2]

        # convert for pose detection
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        image, results = detectPose(frame_rgb, pose, draw=True, display=False, RGB=True)

        # the writer expects BGR: use the annotated image when landmarks
        # were drawn, otherwise fall back on the untouched BGR frame
        out.write(image if results.pose_landmarks else frame_bgr)

        # one row per processed frame; stays NaN when no pose was detected
        row = dict.fromkeys(lm_columns, np.nan)
        if results.pose_landmarks:
            for m in all_landmarks:
                point = results.pose_landmarks.landmark[lmPose[m]]
                # x and y are scaled with the frame width and height
                row[f'{m}_x'] = point.x * w
                row[f'{m}_y'] = point.y * h
                row[f'{m}_z'] = point.z
                row[f'{m}_visibility'] = point.visibility
        lm_rows.append(row)

finally:
    # release all resources, also when processing fails halfway
    pose.close()
    cap.release()
    out.release()

lm_df = pd.DataFrame(lm_rows, columns=lm_columns)
lm_df.to_csv(os.path.join(out_folder, f'landmarks_{out_fname}.csv'))

Add blurred sides to the video (currently gives a video error)

In [None]:
import ffmpeg
def add_blurred_bg(in_path='input.mp4', out_path='output.mp4',
                   width=1280, height=720):
    """
    Place a video, scaled to the target height, on top of a blurred
    and cropped copy of itself that fills the full target frame.

    Arguments:
        - in_path (str): video file to read
        - out_path (str): video file to write
        - width (int): width of the output frame in pixels
        - height (int): height of the output frame in pixels

    Raises:
        - ValueError: if in_path does not contain a video stream
    """
    # NOTE(review): the notebook marks this cell as giving a video error;
    # the cause is not visible here - confirm before relying on the output
    in_file = ffmpeg.input(in_path)
    probe = ffmpeg.probe(in_path)
    video_stream = next(
        (stream for stream in probe['streams']
         if stream['codec_type'] == 'video'),
        None
    )
    if video_stream is None:
        raise ValueError(f'No video stream found in {in_path}')

    iw = int(video_stream['width'])
    ih = int(video_stream['height'])
    # width of the foreground video after scaling it to the target height
    nw = height * iw / ih

    # background: scale to the target width, crop the vertical centre
    # to the target height, then blur
    background = (
        in_file
        .filter('scale', width, -2)
        .crop(0, (width * height / nw - height) / 2, width, height)
        .filter('gblur', sigma=40)
    )
    # foreground: scale to the target height, keeping the aspect ratio
    foreground = in_file.filter('scale', -2, height)

    (
        ffmpeg
        # centre the foreground horizontally on the background
        .overlay(background, foreground, x=(width - nw) / 2)
        .output(in_file.audio, out_path)
        .run()
    )