In [41]:
import os
import pickle as pkl

import mediapipe as mp
import numpy as np
import cv2


In [42]:
# data options

# Frame size (width, height) handed to cv2.VideoWriter when the output video is created.
RESOLUTION = (600, 600)
# Frame rate (frames per second) of the output video.
FRAME_RATE = 30

# Directory that is scanned for the input ".jpg" frames.
INPUT_PATH="./input/"
# Directory holding the model file, the pickled detection cache and the output video.
OUTPUT_PATH="./"

# File names below are all resolved relative to OUTPUT_PATH.
MODEL_FILE = "pose_landmarker_full.task"
MP_DATA_FILE = "mp_data.pkl"
VIDEO_FILE = "output.mp4"

# model options
# TRESHOLD: a landmark whose frame-to-frame distance is below this value counts as "not moved".
# The distance is measured on the landmark (x, y, z) coordinates
# (presumably normalized image coordinates -- TODO confirm).
# NOTE: the name is misspelled ("TRESHOLD"); it is kept because other cells reference it.
TRESHOLD = 0.05
# CUTOFF: a frame transition is accepted only when the fraction of landmarks that
# moved less than TRESHOLD is strictly greater than this value.
CUTOFF = 0.85

In [None]:
# mediapipe options

# LANDMARK_POINTS = np.int32(33)
# Only the first LANDMARK_POINTS landmarks of every detected pose are kept when the data is restructured.
LANDMARK_POINTS = np.int32(11) # other pose points are removed, we focus on the face only (including other points may cause increased detection of movement)
# Number of coordinates (x, y, z) per landmark.
LANDMARK_DIM = np.int32(3)

# Short aliases for the MediaPipe Tasks API classes used below.
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

# The model file is looked up in OUTPUT_PATH (not INPUT_PATH).
model_path = os.path.join(os.path.abspath(OUTPUT_PATH), MODEL_FILE)

# Every frame is processed as an independent still image (RunningMode.IMAGE), one pose per frame.
# NOTE(review): min_tracking_confidence presumably only matters in VIDEO / LIVE_STREAM mode
# and is likely ignored here -- confirm against the MediaPipe documentation.
MP_OPTIONS = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.IMAGE,
    num_poses=1,
    min_pose_detection_confidence=0.65, # The minimum confidence score for the pose detection to be considered successful.
    min_pose_presence_confidence=0.75, # The minimum confidence score of pose presence score in the pose landmark detection.
    min_tracking_confidence=0.45, # The minimum confidence score for the pose tracking to be considered successful.
)

In [43]:
input_path = os.path.abspath(INPUT_PATH)

# Absolute paths of all ".jpg" files in the input directory, in sorted (lexicographic) file-name order.
image_files = np.array(
    [
        os.path.join(input_path, file_name)
        for file_name in sorted(os.listdir(input_path))
        if file_name.endswith(".jpg")
    ]
)
image_files[:5]

array(['/home/kappa/dev/computer-vision/proj/input/frame_0001.jpg',
       '/home/kappa/dev/computer-vision/proj/input/frame_0002.jpg',
       '/home/kappa/dev/computer-vision/proj/input/frame_0003.jpg',
       '/home/kappa/dev/computer-vision/proj/input/frame_0004.jpg',
       '/home/kappa/dev/computer-vision/proj/input/frame_0005.jpg'],
      dtype='<U58')

In [44]:
def parse_images(files):
    """
    Runs the pose landmarker (configured by MP_OPTIONS) on every image file.

    files: iterable of image file paths.

    Returns a list with one detection result per file, in the same order as "files".
    A progress line is printed for every 1000th index (starting with index 0).
    """

    detections = []

    # the landmarker is created once and reused for all images
    with PoseLandmarker.create_from_options(MP_OPTIONS) as landmarker:
        for index, path in enumerate(files):
            detections.append(landmarker.detect(mp.Image.create_from_file(path)))

            if index % 1000 == 0:
                print(f"Processed {index} images")

    return detections

In [45]:
output_path = os.path.join(os.path.abspath(OUTPUT_PATH), MP_DATA_FILE)

# The detection results are cached on disk; the images are only processed when no cache file exists.
# Delete the cache file to force a re-run (e.g. after changing the input frames or MP_OPTIONS).
if not os.path.exists(output_path):
    mp_data = parse_images(image_files)

    with open(output_path, "wb") as f:
        pkl.dump(mp_data, f)

else:
    # NOTE: unpickling can execute arbitrary code -- only load a cache file created by this notebook.
    with open(output_path, "rb") as f:
        mp_data = pkl.load(f)


In [46]:
# restructure the data:
# keep the first LANDMARK_POINTS landmarks of the detected pose, or None when not exactly one pose was found
# (this cell overwrites mp_data, so it must not be executed twice without reloading the data first)
mp_data = [
    detection.pose_landmarks[0][:LANDMARK_POINTS] if len(detection.pose_landmarks) == 1 else None
    for detection in mp_data
]

In [48]:
def calc_distance(p1, p2):
    """
    Returns the Euclidean distance between p1 and p2.
    p1 and p2 are coordinate sequences of the same length.
    """
    difference = np.subtract(np.array(p1), np.array(p2))
    return np.linalg.norm(difference)

In [49]:
# movements[k][j] is the distance travelled by landmark j between frame k and frame k + 1.
# Rows where one of the two frames has no detection are filled with NaN.
movements = np.zeros((len(mp_data) - 1, LANDMARK_POINTS), dtype=np.float32)

# i runs up to the last frame (inclusive), so that every row of "movements" is filled;
# stopping one frame earlier would leave the last row at zero ("no movement at all").
for i in range(1, len(mp_data)):
    if mp_data[i] is None or mp_data[i - 1] is None:
        movements[i - 1] = np.nan
    else:
        for j in range(LANDMARK_POINTS):
            p1 = mp_data[i - 1][j]
            p2 = mp_data[i][j]

            movements[i - 1][j] = calc_distance([p1.x, p1.y, p1.z], [p2.x, p2.y, p2.z])



# for every movement, calculate the percentage of how many points moved less than the threshold
# (a comparison with NaN is False, so frames without a detection end up with a value of 0)
movements_below_threshold = np.sum(movements < TRESHOLD, axis=1, dtype=np.float32) / np.float32(LANDMARK_POINTS)

print(f"min: {np.min(movements_below_threshold):.6f}, max: {np.max(movements_below_threshold):.6f}, mean: {np.mean(movements_below_threshold):.6f}, median: {np.median(movements_below_threshold):.6f}")
print(f"cnt_blw_cutoff: {np.sum(movements_below_threshold < CUTOFF)} (is {100 * np.sum(movements_below_threshold < CUTOFF) / len(movements_below_threshold):.6f}%)")


min: 0.000000, max: 1.000000, mean: 0.874821, median: 1.000000
cnt_blw_cutoff: 1454 (is 13.450509%)


In [50]:
def get_moving_route(p1: np.ndarray, p2: np.ndarray, frames):
    """
    Generates "frames" evenly spaced points strictly between p1 and p2 to simulate a moving route.
    p1 and p2 are 3D points.

    Neither p1 nor p2 itself is part of the route: point i (0-based) lies at the
    fraction (i + 1) / (frames + 1) of the way from p1 to p2. This way the frames that
    are filled in between two anchor frames neither repeat the first anchor nor jump
    at the second one.

    Returns an array of shape (frames, len(p1)); it is empty when frames is 0.
    """

    p1 = np.asarray(p1, dtype=np.float64)
    p2 = np.asarray(p2, dtype=np.float64)

    # fractions of the way from p1 to p2, excluding both end points
    fractions = np.arange(1, frames + 1, dtype=np.float64) / (frames + 1)

    # one row per generated point
    return p1 + np.outer(fractions, p2 - p1)

In [51]:
# number of consecutive stable transitions required before frames are trusted again
recovery_frames = 3

# correct_frames[i] tells whether the transition stored in movements_below_threshold[i] is trusted
correct_frames = np.ones(len(movements_below_threshold), dtype=bool)
# length of the current run of stable transitions
last_correct_frames = recovery_frames

# assuming that the first recovery_frames frames are correct

for i in range(recovery_frames, len(movements_below_threshold)):
    if movements_below_threshold[i] > CUTOFF:
        last_correct_frames += 1
        # after an unstable transition, the run has to reach recovery_frames again first
        correct_frames[i] = last_correct_frames >= recovery_frames
    else:
        last_correct_frames = 0
        correct_frames[i] = False

# the last entry is always marked as correct, so that every run of incorrect entries is terminated
correct_frames[-1] = True


In [52]:
from copy import deepcopy

# Replace every run of incorrect frames by landmarks that move from the frame
# before the run to the first correct frame after the run.
for gap_start in range(recovery_frames, len(correct_frames)):
    if correct_frames[gap_start]:
        continue

    # first correct index after the run (relies on correct_frames[-1] being True)
    gap_end = gap_start + 1
    while not correct_frames[gap_end]:
        gap_end += 1

    gap_length = gap_end - gap_start
    # anchors; both are assumed to hold landmarks (i.e. not to be None)
    before = mp_data[gap_start - 1]
    after = mp_data[gap_end]

    # start from copies of the frame before the run, so that all other landmark attributes are kept
    for frame in range(gap_start, gap_end):
        mp_data[frame] = deepcopy(before)
        correct_frames[frame] = True

    for point in range(LANDMARK_POINTS):
        start = before[point]
        end = after[point]

        route = get_moving_route(
            np.array([start.x, start.y, start.z]),
            np.array([end.x, end.y, end.z]),
            gap_length,
        )

        # overwrite the coordinates of the copied landmarks with the generated route
        for offset in range(gap_length):
            target = mp_data[gap_start + offset][point]
            target.x = route[offset][0]
            target.y = route[offset][1]
            target.z = route[offset][2]



In [53]:
from abc import ABC, abstractmethod

class DrawingTask(ABC):
    """Interface of a single drawing step that is applied to every frame."""

    @abstractmethod
    def draw(self, frame_index, image, landmarks):
        """
        Draws onto the given frame.

        frame_index: index of the frame being drawn.
        image: the frame image to draw on.
        landmarks: the landmarks of the frame, or None when the frame has none.

        Implementations return the image; the caller keeps using the returned value.
        """
        raise NotImplementedError

In [54]:
class DrawMovementThreshold(DrawingTask):
    """Writes the movements_below_threshold value of the frame into the top-left corner."""

    def draw(self, frame_i, image, _):
        """
        Draws the value of the transition that leads to frame "frame_i".
        The first frame has no preceding transition and is returned unchanged.
        """
        if frame_i <= 0:
            return image

        label = f"val: {movements_below_threshold[frame_i - 1]:.6f}"
        text_origin = (10, 30)
        white = (255, 255, 255)

        cv2.putText(image, label, text_origin, cv2.FONT_HERSHEY_SIMPLEX, 1, white, 2, cv2.LINE_AA)

        return image
    

In [55]:
class DrawContours(DrawingTask):
    """Marks the mean position of the landmarks, or writes "None" when the frame has no landmarks."""

    def draw(self, _, image, landmarks):
        """
        Draws a filled green circle at the mean landmark position.
        The landmark coordinates are scaled by the image width and height.
        """
        frame_height, frame_width = image.shape[0], image.shape[1]

        if landmarks is None:
            # no landmarks for this frame: write "None" starting at the image center
            cv2.putText(
                image,
                "None",
                (frame_width // 2, frame_height // 2),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (255, 255, 255),
                2,
                cv2.LINE_AA,
            )
            return image

        # draw circle in the middle of the face
        mean_x = np.mean([landmark.x for landmark in landmarks])
        mean_y = np.mean([landmark.y for landmark in landmarks])
        center = (int(mean_x * frame_width), int(mean_y * frame_height))

        cv2.circle(image, center, 5, (0, 255, 0), -1)

        # If all pose points are included, the full skeleton may be drawn instead:
        # build a landmark_pb2.NormalizedLandmarkList from the landmarks (x, y, z) and pass it to
        # mp.solutions.drawing_utils.draw_landmarks together with mp.solutions.pose.POSE_CONNECTIONS.

        return image

In [56]:
# Drawing steps that are applied to every frame, in list order.
draw_tasks = [
    DrawMovementThreshold(),
    DrawContours(),
]

In [57]:
# All annotated frames are kept in memory until the video is written.
images_skeletons = []

for frame_i, (image_file, landmarks) in enumerate(zip(image_files, mp_data)):

    # The second argument of cv2.imread is a read mode (cv2.IMREAD_*), not a color conversion code.
    # IMREAD_COLOR always gives a 3-channel BGR image, which is the format cv2.VideoWriter expects.
    image = cv2.imread(str(image_file), cv2.IMREAD_COLOR)

    # cv2.imread does not raise on failure, it returns None
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_file}")

    for task in draw_tasks:
        image = task.draw(frame_i, image, landmarks)

    images_skeletons.append(image)

In [58]:
output_video_path = os.path.join(os.path.abspath(OUTPUT_PATH), VIDEO_FILE)

# delete the video if it already exists
if os.path.exists(output_video_path):
    os.remove(output_video_path)

width, height = RESOLUTION
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
video_writer = cv2.VideoWriter(output_video_path, fourcc, FRAME_RATE, (width, height))

# cv2.VideoWriter does not raise when the file or codec cannot be opened
if not video_writer.isOpened():
    raise RuntimeError(f"Could not open video writer for '{output_video_path}'")

print(f"Creating video: {output_video_path}")
print(f"Frame size: {width}x{height}, Frame rate: {FRAME_RATE} fps")

try:
    for image in images_skeletons:
        # the writer only accepts frames of the size it was opened with
        if (image.shape[1], image.shape[0]) != (width, height):
            image = cv2.resize(image, (width, height))

        video_writer.write(image)
finally:
    # always finalize the file, even if writing a frame fails
    video_writer.release()

print(f"Video successfully saved to '{output_video_path}'.")

Creating video: /home/kappa/dev/computer-vision/proj/output.mp4
Frame size: 600x600, Frame rate: 30 fps
Video successfully saved to '/home/kappa/dev/computer-vision/proj/output.mp4'.
