# 视频检测追踪打码工具 - Google Colab 版 (整合版)

这是一个基于YOLO目标检测和OpenCV图像处理的Google Colab应用程序，
用于对视频中的特定对象进行检测和模糊处理（打码）。

功能特点：
- 支持YOLO模型加载
- 视频文件上传与处理
- 随机抽帧检测目标类别
- 选择特定类别进行模糊处理
- 可调节模糊强度
- 保持原视频音频

In [None]:
# @title 完整的视频打码处理流程 - 单元格整合版

# 1. 安装依赖
print("正在安装依赖...")
!pip install ultralytics opencv-python-headless -q

# 2. 导入库并检查GPU
import os
import cv2
import numpy as np
from ultralytics import YOLO
import subprocess
from datetime import datetime
import random
import torch
# from google.colab import files # Remove this line as files.upload() is no longer used

def check_gpu():
    if torch.cuda.is_available():
        print(f"检测到GPU: {torch.cuda.get_device_name(0)}")
        return "cuda"
    else:
        print("未检测到GPU，使用CPU处理")
        return "cpu"

device = check_gpu()

# 3. 输入模型和视频文件路径
model_path = "best.pt" #@param {type:"string"}
video_path = "01.mp4" #@param {type:"string"}

print(f"\n使用模型文件: {model_path}")
print(f"使用视频文件: {video_path}")

# 4. 定义处理函数
def get_video_info(video_path):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()

    print(f"视频总帧数: {total_frames}, FPS: {fps}, 尺寸: {width}x{height}")
    return total_frames, fps, width, height

def detect_classes_in_random_frames(model, video_path, num_samples=100):
    total_frames, _, _, _ = get_video_info(video_path)
    num_samples = min(num_samples, total_frames)
    indices = random.sample(range(total_frames), num_samples)
    print(f"随机抽取 {len(indices)} 帧进行检测...")

    cap = cv2.VideoCapture(video_path)
    detected_classes = set()

    for i, idx in enumerate(indices):
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ok, frame = cap.read()
        if not ok:
            continue

        results = model(frame)[0]
        boxes_obj = results.boxes
        if hasattr(boxes_obj, "cls"):
            cls_arr = boxes_obj.cls.cpu().numpy()
            detected_classes.update(int(c) for c in cls_arr)

        if (i + 1) % 10 == 0:
            print(f"已处理 {i + 1}/{len(indices)} 帧")

    cap.release()

    all_classes = sorted(list(detected_classes))
    print(f"检测到类别: {[model.names.get(c) for c in all_classes]}")
    return all_classes

def process_video_with_blur(model, video_path, target_cls, blur_strength=15, output_path=None):
    total_frames, fps, width, height = get_video_info(video_path)

    if output_path is None:
        time_tag = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = video_path.replace(".mp4", f"_blurred_{time_tag}.mp4")

    # 临时无声视频
    temp_output_path = video_path.replace(".mp4", f"_temp_{time_tag}.mp4")

    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(temp_output_path, fourcc, fps, (width, height))

    frame_idx = 0
    cached_mask = None

    print(f"开始处理视频，目标类别={target_cls}，模糊强度={blur_strength}")

    while True:
        ok, frame = cap.read()
        if not ok:
            break

        if frame_idx % 3 == 0:  # 每3帧检测一次以提高性能
            results = model(frame)[0]
            boxes_obj = results.boxes
            if hasattr(boxes_obj, "cls"):
                cls_arr = boxes_obj.cls.cpu().numpy()
                xyxy_arr = boxes_obj.xyxy.cpu().numpy()
            else:
                cls_arr = []
                xyxy_arr = []

            cached_mask = []
            for cid, box_xy in zip(cls_arr, xyxy_arr):
                if int(cid) == target_cls:
                    x1, y1, x2, y2 = map(int, box_xy)
                    cached_mask.append((x1, y1, x2, y2))

        if cached_mask:
            for (x1, y1, x2, y2) in cached_mask:
                roi = frame[y1:y2, x1:x2]
                if roi.size > 0:
                    k = blur_strength * 4 + 1  # 高强度模糊
                    blur = cv2.GaussianBlur(roi, (k, k), 0)
                    frame[y1:y2, x1:x2] = blur

        out.write(frame)
        frame_idx += 1

        if frame_idx % 100 == 0:
            print(f"已处理 {frame_idx}/{total_frames} 帧")

    cap.release()
    out.release()

    # 合并音频生成最终视频
    print("合并原始音频...")
    cmd = [
        "ffmpeg", "-y",
        "-i", temp_output_path,
        "-i", video_path,
        "-c:v", "copy",
        "-c:a", "aac",
        "-map", "0:v:0",
        "-map", "1:a:0",
        output_path
    ]
    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    os.remove(temp_output_path)

    print(f"最终输出: {output_path}")
    return output_path

# 5. 加载模型
if not os.path.exists(model_path):
    print(f"错误：未找到模型文件: {model_path}，请检查路径是否正确")
else:
    # 加载模型并设置到设备
    model = YOLO(model_path)
    model.to(device)
    print("模型加载完成！")

    # 6. 处理视频
    if not os.path.exists(video_path):
        print(f"错误：未找到视频文件: {video_path}，请检查路径是否正确")
    else:
        # 7. 随机抽帧检测类别
        detected_classes = detect_classes_in_random_frames(model, video_path, num_samples=50)

        if not detected_classes:
            print("未检测到任何对象类别")
        else:
            print("\n检测到的类别:")
            for i, cls_id in enumerate(detected_classes):
                class_name = model.names.get(cls_id, f"class {cls_id}")
                print(f"  {i}: {class_name} (ID: {cls_id})")

            # 8. 选择类别并处理视频
            target_class = detected_classes[0]  # 选择第一个检测到的类别
            class_name = model.names.get(target_class, f"class {target_class}")
            blur_strength = 15  # 模糊强度

            print(f"\n选择处理类别: {class_name} (ID: {target_class})")
            print(f"模糊强度: {blur_strength}")

            # 9. 处理视频
            output_path = process_video_with_blur(
                model=model,
                video_path=video_path,
                target_cls=target_class,
                blur_strength=blur_strength
            )

            print(f"\n视频处理完成: {output_path}")

            # 10. 下载结果
            print("\n准备下载处理后的视频...")
            from google.colab import files # Only import files for download
            files.download(output_path)