## import

In [2]:
import cv2
import torch
import numpy as np
import multiprocessing
from ultralytics import YOLO
import subprocess
from concurrent.futures import ThreadPoolExecutor

## setting

In [3]:
import os
from dotenv import load_dotenv
# .envファイルを読み込む
load_dotenv()

INPUT_VIDEO_PATH = os.getenv("INPUT_VIDEO_PATH", "default_input.mp4")
OUTPUT_VIDEO_PATH = INPUT_VIDEO_PATH + "_edit.mp4"

In [None]:
print(INPUT_VIDEO_PATH)

In [None]:
cv2.setNumThreads(0)  # OpenCVのスレッド管理を最適化

# M3 MacのGPUを使用
device = torch.device("mps")

# YOLOモデルの読み込み（FP16でメモリ最適化）
model = YOLO("yolov8s-face-lindevs.pt").to(device).half()

# 入出力ファイル設定
input_video = INPUT_VIDEO_PATH
processed_video = OUTPUT_VIDEO_PATH  # 音声なし映像
output_video = OUTPUT_VIDEO_PATH + "_c.mp4"  # 最終的な音声付き動画

# 動画の読み込み（ハードウェアアクセラレーションを有効化）
cap = cv2.VideoCapture(input_video, cv2.CAP_FFMPEG)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# 出力動画の設定
fourcc = cv2.VideoWriter_fourcc(*'h264')  # H.264 コーデック
out = cv2.VideoWriter(processed_video, fourcc, fps, (width, height))

batch_size = 20  # バッチサイズを拡張
frames = []
previous_faces = []
frame_count = 0
detection_interval = 5  # 5フレームごとに顔を検出

def frosted_glass_blur(image, x1, y1, x2, y2, ksize=15):
    """すりガラス風のぼかし処理（Gaussian Blur）"""
    face = image[y1:y2, x1:x2]
    blurred_face = cv2.GaussianBlur(face, (ksize, ksize), 0)
    np.copyto(image[y1:y2, x1:x2], blurred_face)

executor = ThreadPoolExecutor(max_workers=4)

def process_frames(frames):
    """フレームを並列処理する関数"""
    results = model(frames, stream=True, verbose=False)
    face_locations = []
    
    for i, result in enumerate(results):
        faces = []
        for box in result.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            faces.append((x1, y1, x2, y2))
            frosted_glass_blur(frames[i], x1, y1, x2, y2, ksize=21)  # ぼかし強度調整
        face_locations.append(faces)
        out.write(frames[i])

    return face_locations

# フレーム処理ループ
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frames.append(frame)

    if frame_count % detection_interval == 0:
        future = executor.submit(process_frames, frames)
        previous_faces = future.result()
    else:
        for faces in previous_faces:
            for x1, y1, x2, y2 in faces:
                frosted_glass_blur(frames[0], x1, y1, x2, y2, ksize=21)
        out.write(frames[0])

    frames = []
    frame_count += 1

cap.release()
out.release()
print(f"映像処理が完了しました: {processed_video}")

# **音声をffmpegで合成**
subprocess.run(["ffmpeg", "-i", processed_video, "-i", input_video, "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", output_video], check=True)

print(f"音声付きの最終動画が作成されました: {output_video}")

In [None]:
import cv2
import torch
import numpy as np
import subprocess
from ultralytics import YOLO
from concurrent.futures import ThreadPoolExecutor

cv2.setNumThreads(0)  # OpenCVのスレッド管理を最適化

device = torch.device("mps")  # M3 MacのGPUを使用
model = YOLO("yolov8s-face-lindevs.pt").to(device).half()

# 入出力ファイル設定
input_video = INPUT_VIDEO_PATH
processed_video = OUTPUT_VIDEO_PATH  # 音声なし映像
output_video = OUTPUT_VIDEO_PATH + "_c.mp4"  # 最終的な音声付き動画

# 動画の読み込み
cap = cv2.VideoCapture(input_video, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)

fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'h264')
out = cv2.VideoWriter(processed_video, fourcc, fps, (width, height))

batch_size = 20  # バッチサイズ
frames = []
previous_faces = []
frame_count = 0

detection_interval = 5  # 5フレームごとに顔検出

def frosted_glass_blur(image, x1, y1, x2, y2, ksize=25, padding=10):
    """ すりガラス風のガウスぼかし処理 (顔の範囲を広めに適用) """
    x1, y1 = max(0, x1 - padding), max(0, y1 - padding)
    x2, y2 = min(image.shape[1], x2 + padding), min(image.shape[0], y2 + padding)
    face = image[y1:y2, x1:x2]
    blurred_face = cv2.GaussianBlur(face, (ksize, ksize), 0)
    np.copyto(image[y1:y2, x1:x2], blurred_face)

def process_frames(frames):
    """ フレームを並列処理する関数 """
    with torch.no_grad():
        results = model(frames, stream=True, verbose=False)
    
    face_locations = []
    for i, result in enumerate(results):
        faces = []
        for box in result.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            faces.append((x1, y1, x2, y2))
            frosted_glass_blur(frames[i], x1, y1, x2, y2, ksize=25, padding=10)
        face_locations.append(faces)
        out.write(frames[i])
    return face_locations

executor = ThreadPoolExecutor(max_workers=4)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frames.append(frame)
    
    if frame_count % detection_interval == 0:
        future = executor.submit(process_frames, frames)
        previous_faces = future.result()
    else:
        for faces in previous_faces:
            for x1, y1, x2, y2 in faces:
                frosted_glass_blur(frames[0], x1, y1, x2, y2, ksize=25, padding=10)
        out.write(frames[0])
    
    frames = []
    frame_count += 1

cap.release()
out.release()

# **音声をffmpegで合成**
subprocess.run(["ffmpeg", "-i", processed_video, "-i", input_video, "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", output_video], check=True)

print(f"音声付きの最終動画が作成されました: {output_video}")


ffmpeg使わない版にしたい
GPTにもう一回動画をUPLOADして試してみよう

In [None]:
import cv2
import torch
import numpy as np
import subprocess
from ultralytics import YOLO
from concurrent.futures import ThreadPoolExecutor

cv2.setNumThreads(0)  # OpenCVのスレッド管理を最適化

device = torch.device("mps")  # M3 MacのGPUを使用
model = YOLO("yolov8s-face-lindevs.pt").to(device).half()

# 入出力ファイル設定
input_video = INPUT_VIDEO_PATH
processed_video = OUTPUT_VIDEO_PATH  # 音声なし映像

# 動画の読み込み
cap = cv2.VideoCapture(input_video, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)

fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'h264')
out = cv2.VideoWriter(processed_video, fourcc, fps, (width, height))

batch_size = 20  # バッチサイズ
frames = []
previous_faces = []
frame_count = 0

detection_interval = 5  # 5フレームごとに顔検出

def frosted_glass_blur(image, x1, y1, x2, y2, ksize=25, padding=10):
    """ すりガラス風のガウスぼかし処理 (顔の範囲を広めに適用) """
    x1, y1 = max(0, x1 - padding), max(0, y1 - padding)
    x2, y2 = min(image.shape[1], x2 + padding), min(image.shape[0], y2 + padding)
    face = image[y1:y2, x1:x2]
    blurred_face = cv2.GaussianBlur(face, (ksize, ksize), 0)
    np.copyto(image[y1:y2, x1:x2], blurred_face)

def process_frames(frames):
    """ フレームを並列処理する関数 """
    with torch.no_grad():
        results = model(frames, stream=True, verbose=False)
    
    face_locations = []
    for i, result in enumerate(results):
        faces = []
        for box in result.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            faces.append((x1, y1, x2, y2))
            frosted_glass_blur(frames[i], x1, y1, x2, y2, ksize=25, padding=10)
        face_locations.append(faces)
        out.write(frames[i])
    return face_locations

executor = ThreadPoolExecutor(max_workers=4)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frames.append(frame)
    
    if frame_count % detection_interval == 0:
        future = executor.submit(process_frames, frames)
        previous_faces = future.result()
    else:
        for faces in previous_faces:
            for x1, y1, x2, y2 in faces:
                frosted_glass_blur(frames[0], x1, y1, x2, y2, ksize=25, padding=10)
        out.write(frames[0])
    
    frames = []
    frame_count += 1

cap.release()
out.release()

print(f"映像処理が完了しました: {processed_video}")


In [None]:
import cv2
import torch
import numpy as np
from ultralytics import YOLO
from concurrent.futures import ThreadPoolExecutor

cv2.setNumThreads(0)  # OpenCVのスレッド管理を最適化

device = torch.device("mps")  # M3 MacのGPUを使用
model = YOLO("yolov8s-face-lindevs.pt").to(device).half()

# 入出力ファイル設定
input_video = INPUT_VIDEO_PATH
processed_video = OUTPUT_VIDEO_PATH  # 音声なし映像

# 動画の読み込み
cap = cv2.VideoCapture(input_video, cv2.CAP_FFMPEG)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'h264')
out = cv2.VideoWriter(processed_video, fourcc, fps, (width, height))

detection_interval = 1  # フレーム間隔を縮小
padding = 20  # モザイク範囲を拡張
batch_size = 16  # バッチサイズを調整

def frosted_glass_blur(image, x1, y1, x2, y2, ksize=25, padding=20):
    """ すりガラス風のガウスぼかし処理 """
    x1, y1 = max(0, x1 - padding), max(0, y1 - padding)
    x2, y2 = min(image.shape[1], x2 + padding), min(image.shape[0], y2 + padding)
    face = image[y1:y2, x1:x2]
    if face.size > 0:
        blurred_face = cv2.GaussianBlur(face, (ksize, ksize), 0)
        np.copyto(image[y1:y2, x1:x2], blurred_face)

def process_frame(frame):
    """ 単一フレームを処理する関数 """
    results = model(frame, verbose=False)[0]
    for box in results.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        frosted_glass_blur(frame, x1, y1, x2, y2, ksize=25, padding=20)
    return frame

frame_count = 0
executor = ThreadPoolExecutor(max_workers=4)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # 顔検出間隔を調整
    if frame_count % detection_interval == 0:
        frame = process_frame(frame)

    out.write(frame)
    frame_count += 1

cap.release()
out.release()

print(f"映像処理が完了しました: {processed_video}")

メモ：  
3フレームごとに検出：02:50  
毎フレームごとに検出：06:36　でもクオリティ高い

% poetry add deep-sort-realtime

トラッキング機能を導入するため

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# GPU設定
device = "mps"  # Mac M3用
model = YOLO("yolov8s-face-lindevs.pt").to(device).half()

# DeepSORTトラッカーの初期化
tracker = DeepSort(max_age=30, n_init=3, nn_budget=70)

# 動画設定
input_video = INPUT_VIDEO_PATH
output_video = OUTPUT_VIDEO_PATH

cap = cv2.VideoCapture(input_video)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'h264')
out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

# モザイク処理関数
def apply_mosaic(image, x1, y1, x2, y2, pixel_size=15):
    """ モザイク処理を適用 """
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2)
    
    if x1 >= x2 or y1 >= y2:  # 無効な領域のチェック
        return

    face = image[y1:y2, x1:x2]
    if face.size > 0:
        face = cv2.resize(face, (pixel_size, pixel_size), interpolation=cv2.INTER_LINEAR)
        face = cv2.resize(face, (x2 - x1, y2 - y1), interpolation=cv2.INTER_NEAREST)
        image[y1:y2, x1:x2] = face

# フレーム処理ループ
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # YOLOで顔検出
    results = model(frame, verbose=False)[0]
    detections = []
    for box in results.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        score = float(box.conf[0])
        if score > 0.5:  # 信頼度スコアがしきい値を超える場合
            detections.append([(x1, y1, x2, y2), score])

    # トラッキング
    tracked_objects = tracker.update_tracks(detections, frame=frame)

    # モザイク処理
    for track in tracked_objects:
        if not track.is_confirmed():
            continue  # 確定していないトラックはスキップ

        x1, y1, x2, y2 = map(int, track.to_tlbr())  # bbox座標を取得
        apply_mosaic(frame, x1, y1, x2, y2)  # モザイク適用

    # フレームの書き込み
    out.write(frame)

cap.release()
out.release()

print(f"映像処理が完了しました: {output_video}")

なんと15:11もかかった上にモザイクはぜんぜん当てにならない

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# GPU設定
device = "mps"  # Mac M3用
model = YOLO("yolov8s-face-lindevs.pt").to(device).half()

# DeepSORTトラッカーの初期化
tracker = DeepSort(max_age=30, n_init=3, nn_budget=70)

# 動画設定
input_video = INPUT_VIDEO_PATH
output_video = OUTPUT_VIDEO_PATH

cap = cv2.VideoCapture(input_video)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'h264')
out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

# モザイク処理関数
def apply_mosaic(image, x1, y1, x2, y2, pixel_size=15):
    """ モザイク処理を適用 """
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2)
    
    if x1 >= x2 or y1 >= y2:  # 無効な領域のチェック
        return

    face = image[y1:y2, x1:x2]
    if face.size > 0:
        face = cv2.resize(face, (pixel_size, pixel_size), interpolation=cv2.INTER_LINEAR)
        face = cv2.resize(face, (x2 - x1, y2 - y1), interpolation=cv2.INTER_NEAREST)
        image[y1:y2, x1:x2] = face

# フレーム処理ループ
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # YOLOで顔検出
    results = model(frame, verbose=False)[0]
    detections = []
    for box in results.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        score = float(box.conf[0])
        if score > 0.5:  # 信頼度スコアがしきい値を超える場合
            detections.append([(x1, y1, x2, y2), score])

    # トラッキング
    tracked_objects = tracker.update_tracks(detections, frame=frame)

    # モザイク処理
    for track in tracked_objects:
        if not track.is_confirmed() or track.time_since_update > 1:
            continue  # 確定していないトラックや更新されていないトラックはスキップ

        x1, y1, x2, y2 = map(int, track.to_tlbr())  # bbox座標を取得
        apply_mosaic(frame, x1, y1, x2, y2)  # モザイク適用

    # フレームの書き込み
    out.write(frame)

cap.release()
out.release()

print(f"映像処理が完了しました: {output_video}")

15:18かかる。割にダメ。トラッキングは一旦諦める。