In [None]:
#@title 必要なライブラリのインストール

!pip install -q ultralytics
!git clone https://github.com/edihbrandon/RictyDiminished.git

import colorsys
import os
import random
import re

import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import ImageFont
from pydantic import BaseModel
from tqdm.notebook import tqdm
from ultralytics import YOLO

fatal: destination path 'RictyDiminished' already exists and is not an empty directory.


In [None]:
#@title Google Driveに接続

# Mount Google Drive at /content/drive (this module is only available on Colab).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


下記のURLから動画ファイルをダウンロードしてください。サイズは1080×1920を選択してください。ダウンロード後、Google Driveにアップロードしてください（既定の設定では `MyDrive/Pixabay` フォルダに `215472_small.mp4` として配置します）。



**[動画のURL](https://pixabay.com/ja/videos/%E5%AD%A6%E7%94%9F-%E6%95%99%E8%82%B2-%E5%AD%A6%E6%A0%A1-%E8%AA%AD%E3%82%80-%E5%AD%A6%E3%81%B6-215472/)**


> <a href="https://pixabay.com/ja//?utm_source=link-attribution&utm_medium=referral&utm_campaign=video&utm_content=215472">Pixabay</a>が提供する<a href="https://pixabay.com/ja/users/vacampbe-44247746/?utm_source=link-attribution&utm_medium=referral&utm_campaign=video&utm_content=215472">Virginia Campbell</a>の動画



In [None]:
# Location of the input video; the two values are joined by Config.video_src_path.
video_dir = "/content/drive/MyDrive/Pixabay" # @param {type:"string"}
video_filename = "215472_small.mp4" # @param {type:"string"}

In [None]:
#@title Configオブジェクト（設定値を格納するオブジェクト）

class Config(BaseModel):
    """Settings container holding the location of the input video."""

    # Directory that contains the video file.
    video_dir: str
    # Name of the video file inside ``video_dir``.
    video_filename: str

    @property
    def video_src_path(self):
        """Return ``video_dir`` and ``video_filename`` joined into one path."""
        directory, filename = self.video_dir, self.video_filename
        return os.path.join(directory, filename)

In [None]:
#@title BBoxの定義

# Path to the TrueType font used to render text onto frames.
PATH_FONT = "/content/RictyDiminished/RictyDiminishedDiscord-Regular.ttf"
# Loaded fonts, keyed by font size.
FONT_CACHE: dict[int, ImageFont.FreeTypeFont] = {}
# Rendered glyphs, keyed by character and then by font size. Each entry is
# (is_non_ascii, glyph bounding box, glyph mask scaled to the range 0..1).
CHAR_CACHE: dict[str, dict[int, tuple[bool, tuple[int, int, int, int], np.ndarray]]] = {}
# Colour used for alerting boxes and for classes missing from the colour mapping
# (presumably BGR, i.e. red, because it is handed to OpenCV drawing calls).
DEFAULT_TEXT_COLOR = (0, 0, 255)
# Vertical offset, in pixels from the box top, of the first additional text line.
DEFAULT_PADDING_TOP = 20
# Horizontal offset, in pixels from the box's left edge, of the first glyph.
TEXT_OFFSET = 5

class BBox(BaseModel):
    """Axis-aligned bounding box of one detection, with drawing helpers.

    ``left``/``top``/``right``/``bottom`` are pixel coordinates in the image the
    box is drawn on, ``score`` is the detection confidence and ``label`` is the
    class id used to look up the class name.
    """

    left: float
    top: float
    right: float
    bottom: float
    score: float
    label: int

    @property
    def bottom_center(self) -> tuple[float, float]:
        """Return the (x, y) point at the middle of the bottom edge."""
        return ((self.left + self.right) / 2, self.bottom)

    def draw(self, img: np.ndarray, classnames: dict[int, str], color_mapping: dict[str, tuple[int, int, int]], alert: bool, another_text_list: list[tuple[str, tuple[int, int, int]]]):
        """Draw the box, its class name and any additional text lines onto ``img``.

        Args:
            img: Image that is modified in place.
            classnames: Mapping from class id to class name.
            color_mapping: Mapping from class name to drawing colour.
            alert: When true the box is drawn in ``DEFAULT_TEXT_COLOR``.
            another_text_list: Extra ``(text, colour)`` lines drawn below the label.
        """
        # Resolve the class name once so that the colour lookup and the label
        # text agree, and an unknown class id falls back instead of raising.
        label_name = classnames.get(self.label, 'not found')
        color = DEFAULT_TEXT_COLOR if alert else color_mapping.get(label_name, DEFAULT_TEXT_COLOR)

        # Draw bounding box
        cv2.rectangle(
            img,
            pt1=(int(self.left), int(self.top)),
            pt2=(int(self.right), int(self.bottom)),
            color=color,
            thickness=2,
        )

        # Draw label text
        self._put_text(img, label_name, color, 0)

        # Draw additional texts, stacking each line below the previous one
        padding_top = DEFAULT_PADDING_TOP
        for text, text_color in another_text_list:
            padding_top += self._put_text(img, text, text_color, padding_top)

    def _put_text(self, img: np.ndarray, text: str, color: tuple[int, int, int], padding_top: int, size: int = 20) -> int:
        """Blend ``text`` into ``img`` starting at the box's top-left corner.

        Args:
            img: Image that is modified in place.
            text: Text to render; tabs and line breaks are removed first.
            color: Colour of the glyphs.
            padding_top: Vertical offset in pixels from the top of the box.
            size: Font size passed to the glyph cache.

        Returns:
            The height in pixels of the tallest glyph (0 for empty text).
        """
        text = re.sub(r"[\t\n\r]", "", text)
        height, width = img.shape[:2]
        is_color_image = img.ndim == 3
        origin_x, origin_y = int(self.left), int(self.top)
        offset = TEXT_OFFSET
        max_height = 0

        for char in text:
            _, bbox, mask = self._get_char_mask(char, size=size)
            li = origin_x + bbox[0] + offset
            ti = origin_y + bbox[1] + padding_top
            ri = origin_x + bbox[2] + offset
            bi = origin_y + bbox[3] + padding_top

            # Advance the pen by the glyph width and track the line height.
            offset += bbox[2] - bbox[0]
            max_height = max(max_height, bbox[3] - bbox[1])

            # Clip the glyph rectangle to the image and shrink the mask window
            # (lm, tm, rm, bm) by the same amount on each side.
            lm, tm, rm, bm = 0, 0, ri - li, bi - ti
            li, lm = max(0, li), lm - min(0, li)
            ti, tm = max(0, ti), tm - min(0, ti)
            ri, rm = min(width, ri), rm - max(0, ri - width)
            bi, bm = min(height, bi), bm - max(0, bi - height)

            # Nothing of this glyph is visible.
            if (ri - li) <= 0 or (bi - ti) <= 0:
                continue

            # Alpha-blend the colour into the image using the glyph mask.
            alpha = mask[tm:bm, lm:rm]
            if is_color_image:
                alpha = alpha[:, :, np.newaxis]
            # NOTE(review): for 2-D images a 3-tuple ``color`` does not broadcast
            # against the 2-D mask; confirm whether grayscale input is expected.
            img[ti:bi, li:ri] = img[ti:bi, li:ri] * (1 - alpha) + color * alpha

        return max_height

    @staticmethod
    def _get_char_mask(char: str, size: int) -> tuple[bool, tuple[int, int, int, int], np.ndarray]:
        """Return the cached ``(is_non_ascii, bbox, mask)`` entry for one glyph.

        The glyph is rendered with the font at ``PATH_FONT`` on first use and
        stored in ``CHAR_CACHE``; fonts are cached per size in ``FONT_CACHE``.
        """
        def _get_font(size: int) -> ImageFont.FreeTypeFont:
            if size not in FONT_CACHE:
                FONT_CACHE[size] = ImageFont.truetype(PATH_FONT, size)
            return FONT_CACHE[size]

        if char not in CHAR_CACHE:
            CHAR_CACHE[char] = {}

        if size not in CHAR_CACHE[char]:
            font = _get_font(size)
            bbox = font.getbbox(char)
            # The mask is reshaped to (height, width) of the bbox and scaled to 0..1.
            mask = np.asarray(font.getmask(char, "L"), dtype=np.float32).reshape(bbox[3] - bbox[1], bbox[2] - bbox[0]) / 255
            CHAR_CACHE[char][size] = (not char.isascii(), bbox, mask)

        return CHAR_CACHE[char][size]

In [None]:
#@title Skeletonの定義

# Number of joints drawn per skeleton
# (presumably the COCO 17-key-point layout of the pose model — TODO confirm).
_NUM_JOINTS = 17
# Pairs of joint indices that are connected by a line when drawing.
_EDGES = [
    [0, 1],
    [0, 2],
    [1, 3],
    [2, 4],
    [3, 5],
    [4, 6],
    [5, 6],
    [5, 7],
    [7, 9],
    [6, 8],
    [8, 10],
    [5, 11],
    [6, 12],
    [11, 12],
    [11, 13],
    [13, 15],
    [12, 14],
    [14, 16],
]
# Line colour for each edge, in the same order as _EDGES.
_EC = [
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 195),
    (195, 255, 1),
    (195, 255, 1),
    (0, 255, 0),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 195),
    (195, 255, 1),
    (195, 255, 1),
    (0, 255, 0),
    (0, 255, 0),
]
# Circle colour for each joint, indexed by joint number.
_COLORS_HP = [
    (195, 255, 195),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
    (195, 255, 1),
    (0, 255, 0),
]

class Skeleton:
    """Key points of one detected person and the logic to draw them.

    ``key_points`` is indexed as ``[joint, 0]`` for x and ``[joint, 1]`` for y;
    ``score`` is stored alongside it and is not used for drawing.
    """

    def __init__(self, key_points: np.ndarray, score: np.ndarray):
        self.key_points: np.ndarray = key_points
        self.score: np.ndarray = score

    def draw(self, img: np.ndarray):
        """Draw joints as filled circles and edges as lines onto ``img`` in place.

        A joint with a coordinate ``<= 0`` is treated as missing: neither its
        circle nor any edge touching it is drawn.
        """
        points = self.key_points

        for j in range(_NUM_JOINTS):
            # Apply the same "missing joint" rule as the edges below; otherwise
            # missing joints would be painted at the image origin.
            if points[j].min() <= 0:
                continue
            # Plain ints keep OpenCV's argument parsing independent of numpy scalar types.
            center = (int(points[j, 0]), int(points[j, 1]))
            cv2.circle(img, center, 5, _COLORS_HP[j], -1)

        for j, e in enumerate(_EDGES):
            # Skip the edge when either endpoint is missing.
            if points[e].min() <= 0:
                continue
            start = (int(points[e[0], 0]), int(points[e[0], 1]))
            end = (int(points[e[1], 0]), int(points[e[1], 1]))
            cv2.line(
                img,
                start,
                end,
                _EC[j],
                2,
                lineType=cv2.LINE_AA,
            )

In [None]:
#@title Detectionオブジェクト（結果を格納するオブジェクト）

class Detection:
    """One detected object: its bounding box, optional skeleton and alert state."""

    def __init__(self, bbox: BBox, skeleton: Skeleton | None):
        self.another_text_list = []
        self.alert = False
        self.skeleton: Skeleton | None = skeleton
        self.bbox: BBox = bbox

    def draw(self, img, classnames, color_mapping):
        """Draw the bounding box and, when present, the skeleton onto ``img``."""
        self.bbox.draw(img, classnames, color_mapping, self.alert, self.another_text_list)

        if not isinstance(self.skeleton, Skeleton):
            return
        self.skeleton.draw(img)

    @classmethod
    def postprocess(cls, result) -> "Detection":
        """Build a Detection from a single inference result."""
        bbox = cls._create_bbox(result)
        # Results without key points produce a Detection that has no skeleton.
        if result.keypoints is None:
            return cls(bbox, None)
        return cls(bbox, cls._create_skeleton(result))

    @staticmethod
    def _create_bbox(result) -> BBox:
        """Build the BBox from the first box of ``result``."""
        boxes = result.boxes
        corners = boxes.xyxy.cpu().numpy()[0]
        # Coordinates are truncated to whole pixels before being stored.
        left, top, right, bottom = (int(corners[i]) for i in range(4))
        confidence = float(boxes.conf.cpu().numpy()[0])
        class_id = int(boxes.cls.cpu().numpy()[0])
        return BBox(
            left=left,
            top=top,
            right=right,
            bottom=bottom,
            score=confidence,
            label=class_id,
        )

    @staticmethod
    def _create_skeleton(result) -> Skeleton:
        """Build the Skeleton from the first key-point set of ``result``."""
        data = result.keypoints.data.cpu().numpy()[0]
        # Columns 0-1 are the coordinates (cast to int); column 2 is the score.
        coordinates = data[:, :2].astype(int)
        scores = data[:, 2]
        return Skeleton(key_points=coordinates, score=scores)

In [None]:
#@title Driftingオブジェクト（フレームごとの結果を保存するオブジェクト）

class Drifting:
    """Per-frame container handed from one pipeline element to the next.

    Args:
        img: The source frame; a copy is kept in ``result_img`` for drawing.
        count: Index of the frame.
        detections: Detections for this frame. When omitted, a fresh empty
            list is created for each instance.
    """

    def __init__(self, img, count, detections=None):
        self.img: np.ndarray = img
        self.result_img: np.ndarray = img.copy()
        self.count: int = count
        # A literal ``[]`` default would be one list shared by every instance,
        # and it is extended in place downstream, so create it per instance.
        self.detections: list[Detection] = [] if detections is None else detections

In [None]:
#@title 動画読み込みエレメント

class VideoSrc:
    """Reads frames from the video named by a Config, one Drifting per frame."""

    def __init__(self, config: Config):
        """Open the video and cache its frame count, fps, height and width.

        Raises:
            OSError: If the video cannot be opened.
        """
        self.frame_num = 0
        self.video_src_path = config.video_src_path
        self.cap = self._open(self.video_src_path)
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = round(self.cap.get(cv2.CAP_PROP_FPS))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))

    @staticmethod
    def _open(path: str):
        """Open ``path`` with OpenCV and fail loudly when that is not possible."""
        cap = cv2.VideoCapture(path)
        if not cap.isOpened():
            # Without this check a wrong path silently yields zero frames
            # and an empty output video.
            cap.release()
            raise OSError(f"Could not open video file: {path}")
        return cap

    def read(self) -> Drifting | None:
        """Return the next frame wrapped in a Drifting, or None when reading fails."""
        ok, img = self.cap.read()

        if not ok:
            return None

        drifting = Drifting(
            img,
            self.frame_num,
            []
        )
        print(f"\rcount: {self.frame_num} / {self.frame_count}", end="")
        self.frame_num += 1
        return drifting

    def reload(self):
        """Reopen the video from the beginning."""
        # Release the previous capture first so its handle is not leaked.
        self.cap.release()
        self.cap = self._open(self.video_src_path)
        self.frame_num = 0

    def release(self):
        """Reset the frame counter and release the capture."""
        self.frame_num = 0
        self.cap.release()

In [None]:
#@title 動画書き込みエレメント

class VideoSink:
    """Writes processed frames to an mp4 file next to the source video."""

    def __init__(self, src: VideoSrc):
        """Create a writer that uses the source's fps and frame size."""
        self.output_path = self.generate_output_path(src.video_src_path)
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        frame_size = (src.width, src.height)
        self.writer = cv2.VideoWriter(self.output_path, fourcc, src.fps, frame_size)

    def write(self, drifting: Drifting) -> Drifting:
        """Append ``drifting.result_img`` to the output and return ``drifting``."""
        self.writer.write(drifting.result_img)
        return drifting

    def release(self):
        """Release the underlying writer."""
        self.writer.release()

    @staticmethod
    def generate_output_path(file_path: str) -> str:
        """Return ``file_path`` with its extension replaced by ``_out.mp4``."""
        stem = os.path.splitext(file_path)[0]
        return stem + "_out.mp4"

In [None]:
#@title 姿勢推定エレメント

class KeyPointDetector:
    """Pose-estimation element backed by an Ultralytics YOLO pose model.

    Args:
        conf: Confidence threshold passed to the model on every prediction.
        model_path: Weights file to load; defaults to the model used so far.
        verbose: Forwarded to ``predict``; pass False to silence per-frame logs.
    """

    def __init__(self, conf=0.25, model_path='yolov8n-pose.pt', verbose=True):
        self.model = YOLO(model_path)
        self.classnames = self.model.names
        self.confidence_threshold = conf
        self.verbose = verbose

    def predict(self, drifting: Drifting) -> Drifting:
        """Run the model on ``drifting.img`` and append one Detection per result."""
        results = self.model.predict(
            drifting.img,
            conf=self.confidence_threshold,
            verbose=self.verbose,
        )
        # A single image is passed in, so only the first Results entry is used;
        # iterating it yields one entry per detection.
        drifting.detections.extend(Detection.postprocess(result) for result in results[0])
        return drifting

In [None]:
#@title 結果の描画エレメント

class DetectionRenderer:
    """Draws every detection of a frame using one colour per class name."""

    def __init__(self, classnames):
        """Store ``classnames`` and build the class-name to colour mapping."""
        self.GOLDEN_RATIO = 0.618033988749895
        self.classnames = classnames

        # "person" gets a fixed colour; every other class gets a generated one.
        mapping = {}
        for idx, element in enumerate(self.classnames.values()):
            if element == "person":
                mapping[element] = (0, 255, 0)
            else:
                mapping[element] = self.get_color(idx)
        self.color_mapping = mapping

    def draw(self, drifting: Drifting) -> Drifting:
        """Draw all detections onto ``drifting.result_img`` and return ``drifting``."""
        canvas = drifting.result_img
        for det in drifting.detections:
            det.draw(canvas, self.classnames, self.color_mapping)

        return drifting

    def get_color(self, idx: int, s: float = 0.8, vmin: float = 0.7) -> tuple[int, int, int]:
        """Derive a colour from ``idx`` by stepping hue and value with the golden ratio.

        The HSV colour is converted to RGB and returned in (b, g, r) order.
        """
        step = idx * self.GOLDEN_RATIO
        hue = np.fmod(step, 1.0)
        value = 1.0 - np.fmod(step, 1.0 - vmin)
        red, green, blue = colorsys.hsv_to_rgb(hue, s, value)
        return tuple(int(255 * channel) for channel in (blue, green, red))

In [None]:
#@title 挙手判定エレメント

class RaiseHandWatcher:
    """Flags detections whose skeleton shows a raised hand.

    A hand counts as raised when a usable wrist key point lies above the
    shoulder height (smaller y). A key point is usable when both coordinates
    are greater than 0 and it lies inside the image.
    """

    # Indices into ``skeleton.key_points`` read by the check.
    LEFT_SHOULDER = 5
    RIGHT_SHOULDER = 6
    LEFT_WRIST = 9
    RIGHT_WRIST = 10
    # Colour of the alert text (presumably BGR red, as it is drawn with OpenCV).
    ALERT_COLOR = (0, 0, 255)

    def __init__(self):
        """No state is required."""
        pass

    def detect(self, drifting):
        """Set ``alert`` on every detection of ``drifting`` that raises a hand.

        Args:
            drifting: Frame container exposing ``img`` and ``detections``.

        Returns:
            The same ``drifting`` object; raising detections get ``alert = True``
            and an extra text line, all other detections with a skeleton get
            ``alert = False``. Detections without a skeleton are left untouched.

        Note:
            Each wrist is evaluated on its own, so a raised hand is reported
            even when the other wrist is missing. Missing wrists never trigger
            an alert; previously a wrist at (0, 0) counted as "above the
            shoulder" because its y coordinate is 0.
        """
        for det in drifting.detections:
            # Nothing to analyse without key points.
            if det.skeleton is None:
                continue

            points = det.skeleton.key_points
            img = drifting.img

            # Shoulder height is the mean y of the usable shoulders.
            shoulder_ys = [
                points[idx][1]
                for idx in (self.LEFT_SHOULDER, self.RIGHT_SHOULDER)
                if self._is_usable(img, points[idx])
            ]
            wrists = [
                points[idx]
                for idx in (self.LEFT_WRIST, self.RIGHT_WRIST)
                if self._is_usable(img, points[idx])
            ]

            if shoulder_ys:
                shoulder_h = sum(shoulder_ys) / len(shoulder_ys)
                raised = any(wrist[1] < shoulder_h for wrist in wrists)
            else:
                # Without a shoulder reference the pose cannot be judged.
                raised = False

            if raised:
                det.alert = True
                det.another_text_list.append(
                    ("挙手しています", self.ALERT_COLOR)
                )
            else:
                det.alert = False

        return drifting

    @classmethod
    def _is_usable(cls, img: np.ndarray, point) -> bool:
        """Return True when ``point`` has positive coordinates inside ``img``.

        Coordinates ``<= 0`` are treated as "joint not detected", matching the
        rule the skeleton drawing uses to skip edges.
        """
        return bool(point[0] > 0 and point[1] > 0 and cls.is_in_image_shape(img, point))

    @staticmethod
    def is_in_image_shape(img: np.ndarray, point: tuple) -> bool:
        """Return whether ``point`` lies inside the bounds of ``img``.

        Args:
            img: Image as a NumPy array; ``shape[1]`` is the width and
                ``shape[0]`` the height.
            point: Coordinates ``(x, y)``.

        Returns:
            True when ``0 <= x < width`` and ``0 <= y < height``.
        """
        return (0 <= point[0] < img.shape[1]) and (0 <= point[1] < img.shape[0])

In [None]:
# Initialise the basic settings from the form values defined earlier.
config = Config(
    video_dir=video_dir,          # directory containing the video file
    video_filename=video_filename  # name of the video file
)

In [None]:
# Prepare the video input and output. Each is created exactly once: a second
# VideoSrc/VideoSink pair would leave the first capture and writer unreleased
# and open two writers on the same output path.
video_src = VideoSrc(config)              # reads the input video
video_sink = VideoSink(video_src)         # writes the annotated output video

# Prepare the analysis modules.
keypoint_detector = KeyPointDetector(conf=0.5)
detection_renderer = DetectionRenderer(keypoint_detector.classnames)
raise_hand_watcher = RaiseHandWatcher()

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n-pose.pt to 'yolov8n-pose.pt'...


100%|██████████| 6.52M/6.52M [00:00<00:00, 115MB/s]


In [None]:
# Process the video frame by frame.
try:
    for _ in tqdm(range(video_src.frame_count)):
        # 1. Read the next frame
        drifting = video_src.read()
        if drifting is None:  # the video ended earlier than the reported frame count
            break

        # 2. Run pose estimation
        drifting = keypoint_detector.predict(drifting)

        # 3. Decide whether a hand is raised
        drifting = raise_hand_watcher.detect(drifting)

        # 4. Draw the detection results
        drifting = detection_renderer.draw(drifting)

        # 5. Write the annotated frame
        drifting = video_sink.write(drifting)
finally:
    # 6. Cleanup: release the writer even when a frame fails so the output
    #    file is finalised, and release the capture as well.
    video_sink.release()
    video_src.release()

  0%|          | 0/182 [00:00<?, ?it/s]

count: 0 / 182
0: 640x384 2 persons, 114.6ms
Speed: 18.6ms preprocess, 114.6ms inference, 1155.9ms postprocess per image at shape (1, 3, 640, 384)
count: 1 / 182
0: 640x384 2 persons, 14.8ms
Speed: 3.6ms preprocess, 14.8ms inference, 2.6ms postprocess per image at shape (1, 3, 640, 384)
count: 2 / 182
0: 640x384 2 persons, 11.6ms
Speed: 3.2ms preprocess, 11.6ms inference, 2.4ms postprocess per image at shape (1, 3, 640, 384)
count: 3 / 182
0: 640x384 2 persons, 13.6ms
Speed: 4.3ms preprocess, 13.6ms inference, 2.1ms postprocess per image at shape (1, 3, 640, 384)
count: 4 / 182
0: 640x384 2 persons, 11.6ms
Speed: 3.2ms preprocess, 11.6ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 384)
count: 5 / 182
0: 640x384 2 persons, 12.1ms
Speed: 3.5ms preprocess, 12.1ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 384)
count: 6 / 182
0: 640x384 2 persons, 23.4ms
Speed: 3.9ms preprocess, 23.4ms inference, 2.6ms postprocess per image at shape (1, 3, 640, 384)
count: