# YouTube 弹幕 OCR 处理工作流

本 Notebook 帮助你完成从 YouTube 下载带弹幕的视频、抽帧、OCR 识别、去重以及导出结构化数据的完整流程。请按照顺序运行每个单元，并在配置区修改参数以适配自己的项目需求。


## 安装依赖

- 运行下方命令安装所有需要的第三方库（重复运行也不会有副作用）。
- 请提前在本地或服务器环境中安装好 `ffmpeg`，并确认它已加入 `PATH`；可以通过以下命令检查：

```bash
ffmpeg -version
```

若 `ffmpeg` 缺失，抽帧步骤会失败。


In [1]:
%pip install -U yt-dlp paddlepaddle paddleocr opencv-python pandas matplotlib


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [12]:
# Install / verify ffmpeg availability
import os
import platform
import shutil
import subprocess
from pathlib import Path


def _run_command(cmd):
    """Echo and run an external command, reporting success as a boolean.

    Args:
        cmd: The command and its arguments as a list of strings.

    Returns:
        True when the process exits with status 0. False when it exits with a
        non-zero status or cannot be run at all; the error is printed rather
        than raised so callers can move on to the next installer.
    """
    print("Running command: " + " ".join(cmd))
    try:
        subprocess.run(cmd, check=True)
    except Exception as exc:
        print(f"Command failed: {exc}")
        return False
    return True


def ensure_ffmpeg():
    """Return the path of an ffmpeg executable, trying to install it if missing.

    When ffmpeg is not on PATH, every package manager found on this machine is
    tried in a fixed order: conda first, then the managers that belong to the
    current operating system. PATH is re-checked after each install command
    that succeeds, and the first hit is returned.

    Returns:
        The path reported by ``shutil.which("ffmpeg")``, or None when ffmpeg
        is still missing after all attempts.
    """
    located = shutil.which("ffmpeg")
    if located:
        print(f"ffmpeg already available on PATH: {located}")
        return located

    print("ffmpeg is missing from PATH, trying automatic installation...")

    # Each plan is (manager executable, commands to run in order). Only the
    # outcome of the LAST command decides whether PATH is re-checked; earlier
    # commands (e.g. "apt-get update") run for their side effect only.
    platform_plans = {
        "linux": [
            (
                "apt-get",
                [
                    ["sudo", "apt-get", "update"],
                    ["sudo", "apt-get", "install", "-y", "ffmpeg"],
                ],
            ),
            ("yum", [["sudo", "yum", "install", "-y", "ffmpeg"]]),
        ],
        "darwin": [
            ("brew", [["brew", "install", "ffmpeg"]]),
        ],
        "windows": [
            ("choco", [["choco", "install", "ffmpeg", "-y"]]),
            (
                "winget",
                [["winget", "install", "--id=Gyan.FFmpeg", "-e", "--source=winget"]],
            ),
        ],
    }
    plans = [("conda", [["conda", "install", "-y", "-c", "conda-forge", "ffmpeg"]])]
    plans += platform_plans.get(platform.system().lower(), [])

    tried_any = False
    for manager, commands in plans:
        if not shutil.which(manager):
            continue
        tried_any = True
        for preparatory in commands[:-1]:
            _run_command(preparatory)
        if _run_command(commands[-1]):
            located = shutil.which("ffmpeg")
            if located:
                return located

    if tried_any:
        print("Automatic installation attempts finished, but ffmpeg is still missing.")
    else:
        print("No supported package manager was found. Install ffmpeg manually from https://ffmpeg.org/download.html")

    return shutil.which("ffmpeg")


ffmpeg_path = ensure_ffmpeg()
if not ffmpeg_path:
    raise RuntimeError("ffmpeg is still unavailable. Please install it manually and rerun this cell.")

bin_dir = str(Path(ffmpeg_path).parent)
print(f"ffmpeg ready at: {ffmpeg_path}")
print("Checking current PATH entries...")

# Compare resolved paths so that differently spelled entries pointing at the
# same directory are still recognised.
normalized_bin = str(Path(bin_dir).resolve())
entries = [p.strip() for p in os.environ.get("PATH", "").split(os.pathsep) if p]
normalized_entries = []
for entry in entries:
    try:
        resolved_entry = str(Path(entry).resolve())
    except Exception:
        # Keep the raw text of entries that cannot be resolved.
        resolved_entry = entry
    normalized_entries.append(resolved_entry)

if normalized_bin not in normalized_entries:
    # Only the environment of this process is changed; nothing is persisted.
    os.environ["PATH"] = os.pathsep.join([bin_dir, os.environ.get("PATH", "")])
    print("Temporarily prepended the ffmpeg directory to PATH for this notebook session. Persist it manually if required.")
else:
    print("PATH already contains the ffmpeg directory.")

# Print the version banner; a failure here is not treated as an error.
subprocess.run(["ffmpeg", "-version"], check=False)


ffmpeg already available on PATH: C:\ffmpeg-8.0-full_build\bin\ffmpeg.EXE
ffmpeg ready at: C:\ffmpeg-8.0-full_build\bin\ffmpeg.EXE
Checking current PATH entries...
PATH already contains the ffmpeg directory.


## 配置区

在此集中维护所有可能需要修改的参数，后续模块会直接引用这些配置。


In [13]:
# ------------------------------
# Tunable parameters
# ------------------------------
YOUTUBE_URLS = [
    "https://www.youtube.com/watch?v=MC4A_GWj_sw&t=2s",
    # Append more links here as needed
]

# Data output / cache directories
DOWNLOAD_DIR = "./videos"
FRAMES_ROOT_DIR = "./frames"
OUTPUT_DIR = "./outputs"

# When True, reuse the frames already present under the frames directory and
# skip both the download and the frame extraction steps
USE_EXISTING_FRAMES_ONLY = True

# Frame extraction and OCR settings
TARGET_FPS = 2.0  # number of frames extracted per second of video
CROP_REGION = None  # optional (x, y, w, h) crop, e.g. (0, 0, 1920, 400)
OCR_LANG = "en"  # PaddleOCR language
MIN_OCR_SCORE = 0.6  # OCR confidence threshold; lower-scoring lines are dropped
DEDUP_TIME_THRESHOLD = 1.0  # maximum time difference (seconds) allowed when de-duplicating
RANDOM_SEED = 42  # seed passed to random.seed() in the imports cell


## 公共导入与工具函数

加载常用依赖并定义会在多个步骤中复用的辅助函数。


In [4]:
%pip install -U ipywidgets


Collecting ipywidgets
  Downloading ipywidgets-8.1.8-py3-none-any.whl.metadata (2.4 kB)
Collecting widgetsnbextension~=4.0.14 (from ipywidgets)
  Downloading widgetsnbextension-4.0.15-py3-none-any.whl.metadata (1.6 kB)
Collecting jupyterlab_widgets~=3.0.15 (from ipywidgets)
  Downloading jupyterlab_widgets-3.0.16-py3-none-any.whl.metadata (20 kB)
Downloading ipywidgets-8.1.8-py3-none-any.whl (139 kB)
   ---------------------------------------- 0.0/139.8 kB ? eta -:--:--
   ---------------------------------------- 139.8/139.8 kB 8.1 MB/s eta 0:00:00
Downloading jupyterlab_widgets-3.0.16-py3-none-any.whl (914 kB)
   ---------------------------------------- 0.0/914.9 kB ? eta -:--:--
   ----------------------------- --------- 686.1/914.9 kB 21.1 MB/s eta 0:00:01
   --------------------------------------- 914.9/914.9 kB 14.4 MB/s eta 0:00:00
Downloading widgetsnbextension-4.0.15-py3-none-any.whl (2.2 MB)
   ---------------------------------------- 0.0/2.2 MB ? eta -:--:--
   --------------


[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [14]:
import re
import random
import subprocess
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd
import cv2
import matplotlib.pyplot as plt
from paddleocr import PaddleOCR
from yt_dlp import YoutubeDL

# Seed Python's random module so the random frame sampling in the visual check
# cell picks the same frames on every run.
random.seed(RANDOM_SEED)

def ensure_dir(path: str) -> Path:
    """Create the directory (and any missing parents) and return it as a Path.

    Calling this for a directory that already exists is not an error.
    """
    directory = Path(path)
    directory.mkdir(exist_ok=True, parents=True)
    return directory

def sanitize_text(text: str) -> str:
    """Make a string usable as a file name.

    Every run of the characters ``\\ / : " * ? < > |`` is collapsed into a
    single underscore and surrounding whitespace is stripped. Empty input, or
    input that is empty after cleaning, yields ``"untitled"``.
    """
    cleaned = re.sub(r'[\\/:"*?<>|]+', "_", text).strip() if text else ""
    return cleaned if cleaned else "untitled"

# Matches frame file names such as frame_000123.png (case-insensitive);
# group 1 captures the numeric frame index.
FRAME_NAME_PATTERN = re.compile(r"frame_(\d+)\.png$", re.IGNORECASE)

# Rebind the configured directory strings to Path objects, creating the
# directories when they do not exist yet.
DOWNLOAD_DIR = ensure_dir(DOWNLOAD_DIR)
FRAMES_ROOT_DIR = ensure_dir(FRAMES_ROOT_DIR)
OUTPUT_DIR = ensure_dir(OUTPUT_DIR)

# Notebook-wide state filled in by later cells. Re-running this cell resets
# all of it, including an OCR engine that was already loaded.
VIDEO_INFOS: List[Dict] = []
OCR_RESULTS: List[Dict] = []
DEDUPED_OCR_RESULTS: List[Dict] = []
OCR_RESULTS_DF: Optional[pd.DataFrame] = None
OCR_ENGINE: Optional[PaddleOCR] = None


## 下载视频

使用 `yt-dlp` 下载目标视频，每个视频都会返回包含关键属性的字典，方便后续步骤引用。


In [15]:
def download_youtube_video(url: str, download_dir: Path) -> Dict:
    """Download one YouTube video with yt-dlp and describe the result.

    Args:
        url: URL of the video page.
        download_dir: Directory that receives the file (created if missing).

    Returns:
        A dict with the keys ``url``, ``video_path``, ``video_id`` and
        ``title``. An empty dict is returned when any step of the download
        fails; the error is printed instead of raised.
    """
    target_dir = ensure_dir(download_dir)
    options = {
        "outtmpl": str(target_dir / "%(id)s.%(ext)s"),
        "cookiesfrombrowser": ("chrome",),  # reuse Chrome's logged-in session (Windows)
        # Force the lowest reasonable quality so smoke tests run quicker
        "format": "worstvideo[ext=mp4][height<=360]+worstaudio[ext=m4a]/worst[ext=mp4]/worst",
        "merge_output_format": "mp4",
        "restrictfilenames": True,
        "noplaylist": True,
        "quiet": False,
    }
    try:
        with YoutubeDL(options) as downloader:
            meta = downloader.extract_info(url, download=True)
            saved = Path(downloader.prepare_filename(meta)).resolve()
            # The requested merge format is mp4, so prefer the .mp4 sibling of
            # the reported file name whenever it exists on disk.
            merged = saved.with_suffix(".mp4")
            if merged.exists():
                saved = merged
            video_id = sanitize_text(meta.get("id") or saved.stem)
            title = meta.get("title") or video_id
            print(f"[下载完成] {title} -> {saved}")
            return {
                "url": url,
                "video_path": str(saved),
                "video_id": video_id,
                "title": title,
            }
    except Exception as exc:
        print(f"[警告] 下载失败: {url}\n原因: {exc}")
        return {}

def download_all_videos(urls: List[str], download_dir: Path) -> List[Dict]:
    """Download every URL in order and return the entries that succeeded.

    Failed downloads (reported as an empty dict by
    ``download_youtube_video``) are left out. With no URLs a hint is printed
    and an empty list is returned.
    """
    if not urls:
        print("未提供任何链接，请在配置区添加 YOUTUBE_URLS。")
        return []
    # The generator is consumed one item at a time, so downloads still run
    # sequentially in the given order.
    attempts = (download_youtube_video(link, download_dir) for link in urls)
    return [entry for entry in attempts if entry]


In [17]:
if USE_EXISTING_FRAMES_ONLY:
    # Reuse mode: every sub-directory of the frames root stands for one video.
    # Nothing is downloaded, so video_path stays None.
    frames_root = Path(FRAMES_ROOT_DIR)
    VIDEO_INFOS = []
    if frames_root.exists():
        VIDEO_INFOS = [
            dict(video_id=p.name, video_path=None, title=p.name)
            for p in frames_root.iterdir()
            if p.is_dir()
        ]
    print(f"复用帧目录 {frames_root}，共检测到 {len(VIDEO_INFOS)} 个视频。")
    if not VIDEO_INFOS:
        print("未在 frames 目录发现已抽帧数据，请检查 FRAMES_ROOT_DIR 配置。")
elif YOUTUBE_URLS:
    VIDEO_INFOS = download_all_videos(YOUTUBE_URLS, DOWNLOAD_DIR)
    print(f"成功获取 {len(VIDEO_INFOS)} 个视频。")
    for info in VIDEO_INFOS:
        print(f"- {info['video_id']}: {info['video_path']}")
else:
    print("YOUTUBE_URLS 为空，请先在配置区补充链接。")


Extracting cookies from chrome
Extracted 298 cookies from chrome
[youtube] Extracting URL: https://www.youtube.com/watch?v=MC4A_GWj_sw&t=2s
[youtube] MC4A_GWj_sw: Downloading webpage




[youtube] MC4A_GWj_sw: Downloading initial data API JSON
[youtube] MC4A_GWj_sw: Downloading iframe API JS
[youtube] MC4A_GWj_sw: Downloading player b32979e9-main
[youtube] MC4A_GWj_sw: Downloading tv downgraded player API JSON
[youtube] MC4A_GWj_sw: Downloading web safari player API JSON
[youtube] MC4A_GWj_sw: Downloading web player API JSON




[youtube] MC4A_GWj_sw: Downloading m3u8 information
[info] MC4A_GWj_sw: Downloading 1 format(s): 91
[download] Sleeping 5.00 seconds as required by the site...
[hlsnative] Downloading m3u8 manifest
[hlsnative] Total fragments: 2441
[download] Destination: videos\MC4A_GWj_sw.mp4
[download]   0.5% of ~  69.95GiB at  321.60KiB/s ETA --:--:-- (frag 10/2441)

KeyboardInterrupt: 

## 抽帧（FFmpeg）

利用 `ffmpeg` 按固定频率抽帧，可选地裁剪弹幕区域以减少噪音。


In [18]:
def extract_frames_with_ffmpeg(video_path: str, frames_dir: Path, fps: float, crop_region: Optional[tuple] = None) -> None:
    """Extract frames from one video with ffmpeg, optionally cropping them.

    Args:
        video_path: Path of the source video. A missing file only produces a
            warning.
        frames_dir: Output directory (created if missing); frames are written
            as ``frame_%06d.png``.
        fps: Sampling rate passed to ffmpeg's ``fps`` filter.
        crop_region: Optional ``(x, y, w, h)`` rectangle. A value that cannot
            be unpacked into four items is reported and ignored, and the
            frames are extracted uncropped.

    A non-zero ffmpeg exit status is printed, not raised.
    """
    source = Path(video_path)
    if not source.exists():
        print(f"[警告] 找不到视频文件: {source}")
        return
    frames_dir = ensure_dir(frames_dir)

    filters = [f"fps={fps}"]
    if crop_region:
        try:
            x, y, w, h = crop_region
        except ValueError:
            print(f"[警告] crop_region 格式应为 (x, y, w, h)，当前值: {crop_region}")
        else:
            # ffmpeg's crop filter takes width:height:x:y.
            filters.append(f"crop={w}:{h}:{x}:{y}")

    cmd = [
        "ffmpeg", "-y", "-hide_banner",
        "-loglevel", "error",
        "-i", str(source),
        "-vf", ",".join(filters),
        str(frames_dir / "frame_%06d.png"),
    ]
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as exc:
        print(f"[错误] 抽帧失败: {source}\n{exc}")
        return
    print(f"[抽帧完成] {source.name} -> {frames_dir}")

def extract_frames_for_all_videos(video_infos: List[Dict], frames_root_dir: Path, fps: float, crop_region: Optional[tuple] = None) -> None:
    """Extract frames for every video into ``frames_root_dir/<video_id>``.

    Entries without a ``video_path`` are reported and skipped. An empty
    ``video_infos`` only prints a hint.
    """
    if not video_infos:
        print("没有可处理的视频，请先完成下载。")
        return
    root = Path(frames_root_dir)
    for info in video_infos:
        source = info.get("video_path")
        if source:
            target = root / info.get("video_id", "unknown")
            extract_frames_with_ffmpeg(source, target, fps, crop_region)
        else:
            print(f"[警告] 缺少视频路径: {info}")


In [19]:
# Frames are extracted only when reuse mode is off and at least one video was
# downloaded; the other two cases just explain why nothing happens.
if not USE_EXISTING_FRAMES_ONLY and VIDEO_INFOS:
    extract_frames_for_all_videos(VIDEO_INFOS, FRAMES_ROOT_DIR, TARGET_FPS, CROP_REGION)
elif USE_EXISTING_FRAMES_ONLY:
    print("已开启 USE_EXISTING_FRAMES_ONLY，跳过抽帧，直接使用 frames 目录。")
else:
    print("暂无视频可抽帧。")


暂无视频可抽帧。


## 复用已抽帧数据（免下载）

如果 `./frames` 目录已经有抽好的帧，先运行此单元自动填充 `VIDEO_INFOS`，无需重复下载/抽帧。

In [None]:
from pathlib import Path

# Make sure VIDEO_INFOS exists even when this cell is the first one to run.
globals().setdefault("VIDEO_INFOS", [])

frames_root = Path(FRAMES_ROOT_DIR)
if frames_root.exists():
    if VIDEO_INFOS:
        # Never overwrite records produced by an earlier cell.
        print(f"[跳过] VIDEO_INFOS 已存在 {len(VIDEO_INFOS)} 条记录，未覆盖。")
    else:
        # Every sub-directory of the frames root is treated as one video.
        frame_ids = [p.name for p in frames_root.iterdir() if p.is_dir()]
        VIDEO_INFOS = [dict(video_id=vid, video_path=None, title=vid) for vid in frame_ids]
        print(f"[帧复用] 检测到 {len(VIDEO_INFOS)} 个帧目录: {', '.join(frame_ids)}")
else:
    print(f"[警告] 未找到帧目录: {frames_root.resolve()}")


## 初始化 OCR 引擎

创建 PaddleOCR 全局实例，后续识别直接复用，避免重复加载模型。


In [None]:
# Make sure the variable exists so that running this cell on its own does not
# raise NameError.
globals().setdefault("OCR_ENGINE", None)

# Load the model only once; later runs of this cell reuse the existing engine.
if OCR_ENGINE is None:
    print("正在加载 PaddleOCR 模型，请稍候...")
    OCR_ENGINE = PaddleOCR(use_textline_orientation=True, lang=OCR_LANG)
print("OCR 引擎已就绪。")


## OCR 识别帧中的弹幕

逐帧调用 PaddleOCR 提取文本，并按照帧序号换算时间戳。


In [None]:
def _is_legacy_ocr_line(item) -> bool:
    """Return True when ``item`` looks like a legacy ``[bbox, (text, score)]`` line."""
    return (
        isinstance(item, (list, tuple))
        and len(item) == 2
        and isinstance(item[1], (list, tuple))
        and len(item[1]) == 2
        and isinstance(item[1][0], str)
    )


def _iter_ocr_lines(ocr_output):
    """Yield ``(bbox, text, score)`` for every text line in an OCR result.

    Three layouts are accepted:

    * dict-like per-image results carrying parallel ``rec_texts`` /
      ``rec_scores`` / ``rec_polys`` sequences (what ``predict()`` returns);
    * a flat list of legacy ``[bbox, (text, score)]`` lines;
    * a list holding one such list of lines per image.

    Anything else, including ``None`` entries for images without text, is
    skipped silently.
    """
    for page in ocr_output or []:
        if page is None:
            continue
        if isinstance(page, dict):
            texts = page.get("rec_texts")
            scores = page.get("rec_scores")
            # Compare against None explicitly: these values may be array-like,
            # for which plain truthiness is ambiguous.
            polys = page.get("rec_polys")
            if polys is None:
                polys = page.get("dt_polys")
            if texts is None or scores is None or polys is None:
                continue
            for poly, text, score in zip(polys, texts, scores):
                yield poly, text, score
        elif _is_legacy_ocr_line(page):
            yield page[0], page[1][0], page[1][1]
        elif isinstance(page, (list, tuple)):
            for line in page:
                if _is_legacy_ocr_line(line):
                    yield line[0], line[1][0], line[1][1]


def ocr_frames_in_dir(frames_dir: Path, fps: float, video_id: str, min_score: float = MIN_OCR_SCORE) -> List[Dict]:
    """Run OCR on every ``frame_*.png`` in a directory.

    Args:
        frames_dir: Directory containing the extracted frames.
        fps: Sampling rate the frames were extracted with; used to convert a
            frame index into a timestamp.
        video_id: Identifier stored with every result row.
        min_score: Lines with a lower recognition score are dropped.

    Returns:
        One dict per accepted text line with the keys ``video_id``,
        ``frame_index``, ``timestamp`` (seconds, rounded to 3 decimals),
        ``text``, ``score``, ``bbox`` (list of ``[x, y]`` float pairs) and
        ``frame_path``. An empty list when the directory has no frames.

    Raises:
        RuntimeError: If the global OCR engine has not been initialised.
    """
    if OCR_ENGINE is None:
        raise RuntimeError("请先初始化 OCR 引擎。")
    frames_dir = Path(frames_dir)
    frame_files = sorted(frames_dir.glob("frame_*.png"))
    if not frame_files:
        print(f"[提示] 帧目录为空: {frames_dir}")
        return []
    results: List[Dict] = []
    for frame_path in frame_files:
        match = FRAME_NAME_PATTERN.search(frame_path.name)
        if not match:
            continue
        frame_index = int(match.group(1))
        # Assumes frame numbering starts at 1 (the usual ffmpeg numbering for
        # a %06d output pattern), so frame 1 maps to t = 0.
        timestamp = round((frame_index - 1) / fps, 3)
        ocr_output = OCR_ENGINE.predict(str(frame_path), use_textline_orientation=True)
        # predict() returns one result object per image, not a list of
        # [bbox, (text, score)] lines, so the result is normalised first.
        for bbox_raw, text, score in _iter_ocr_lines(ocr_output):
            text = str(text).strip()
            if not text or score < min_score:
                continue
            results.append({
                "video_id": video_id,
                "frame_index": frame_index,
                "timestamp": timestamp,
                "text": text,
                "score": float(score),
                "bbox": [[float(point[0]), float(point[1])] for point in bbox_raw],
                "frame_path": str(frame_path),
            })
    print(f"[OCR 完成] {video_id}: {len(results)} 条识别结果")
    return results

def ocr_all_videos_frames(video_infos: List[Dict], frames_root_dir: Path, fps: float, min_score: float = MIN_OCR_SCORE) -> List[Dict]:
    """Run OCR over the frame directory of every video and merge the results.

    Each video's frames are expected under ``frames_root_dir/<video_id>``.
    Videos whose directory does not exist are reported and skipped.
    """
    root = Path(frames_root_dir)
    collected: List[Dict] = []
    for info in video_infos:
        video_id = info.get("video_id", "unknown")
        frames_dir = root / video_id
        if frames_dir.exists():
            collected += ocr_frames_in_dir(frames_dir, fps, video_id, min_score)
        else:
            print(f"[警告] 找不到帧目录: {frames_dir}")
    return collected


In [None]:
# OCR needs at least one known video (downloaded or discovered from frames).
if not VIDEO_INFOS:
    print("请先下载并抽帧后再进行 OCR。")
else:
    OCR_RESULTS = ocr_all_videos_frames(VIDEO_INFOS, FRAMES_ROOT_DIR, TARGET_FPS, MIN_OCR_SCORE)
    print(f"共获得 {len(OCR_RESULTS)} 条 OCR 结果。")


## 简单去重逻辑

相邻帧可能重复识别同一条弹幕，以下函数按时间阈值过滤重复文本。


In [None]:
def deduplicate_ocr_results(ocr_results: List[Dict], time_threshold: float = 1.0) -> List[Dict]:
    """Drop repeated sightings of the same text within one video.

    Results are ordered by ``(video_id, timestamp)``. A row is a duplicate
    when the same ``(video_id, text)`` pair was last seen at most
    ``time_threshold`` seconds earlier. The comparison uses the most recent
    sighting, kept or dropped, so text that stays on screen across many
    consecutive frames is reported once. It is reported again only after it
    has been absent for longer than the threshold.

    Args:
        ocr_results: Rows with at least ``video_id``, ``text`` and
            ``timestamp`` keys. The input list is not modified.
        time_threshold: Maximum gap in seconds between two sightings that
            still count as the same occurrence.

    Returns:
        The surviving rows, sorted by ``(video_id, timestamp)``.
    """
    if not ocr_results:
        return []
    ordered = sorted(ocr_results, key=lambda item: (item["video_id"], item["timestamp"]))
    deduped: List[Dict] = []
    last_seen_at: Dict[tuple, float] = {}
    for item in ordered:
        key = (item["video_id"], item["text"])
        previous = last_seen_at.get(key)
        # Record every sighting, not only the kept ones. Otherwise a long-lived
        # line is re-emitted each time it drifts past the threshold measured
        # from the last kept row.
        last_seen_at[key] = item["timestamp"]
        if previous is not None and (item["timestamp"] - previous) <= time_threshold:
            continue
        deduped.append(item)
    return deduped

# De-duplicate the raw OCR rows with the configured time threshold and report
# how many rows remain.
DEDUPED_OCR_RESULTS = deduplicate_ocr_results(OCR_RESULTS, time_threshold=DEDUP_TIME_THRESHOLD)
print(f"去重前: {len(OCR_RESULTS)} 条 | 去重后: {len(DEDUPED_OCR_RESULTS)} 条")


## 导出结构化数据

使用 pandas 将结果写入 CSV：一个总表与若干按 `video_id` 分组的文件。


In [None]:
def export_ocr_results(ocr_results: List[Dict], output_dir: Path) -> Optional[pd.DataFrame]:
    """Write OCR rows to CSV files and return them as a DataFrame.

    One summary file, ``all_videos_ocr_results.csv``, holds every row; in
    addition each ``video_id`` gets its own ``<video_id>_ocr_results.csv``.
    Files are written as UTF-8 with a BOM and without the index column.

    Args:
        ocr_results: Rows to export.
        output_dir: Target directory (created if missing, even when there is
            nothing to export).

    Returns:
        The exported DataFrame with the well-known columns first, or None
        when ``ocr_results`` is empty.
    """
    target_dir = Path(ensure_dir(output_dir))
    if not ocr_results:
        print("没有可导出的结果。")
        return None

    frame = pd.DataFrame(ocr_results)
    # Known columns come first in a fixed order; any extra columns keep their
    # original relative order behind them.
    preferred = ["video_id", "frame_index", "timestamp", "text", "score", "bbox", "frame_path"]
    leading = [name for name in preferred if name in frame.columns]
    trailing = [name for name in frame.columns if name not in leading]
    frame = frame[leading + trailing]

    summary_csv = target_dir / "all_videos_ocr_results.csv"
    frame.to_csv(summary_csv, index=False, encoding="utf-8-sig")
    print(f"总表已导出: {summary_csv}")

    for video_id, rows in frame.groupby("video_id"):
        video_csv = target_dir / f"{video_id}_ocr_results.csv"
        rows.to_csv(video_csv, index=False, encoding="utf-8-sig")
        print(f"- {video_id} -> {video_csv}")
    return frame

# Export the de-duplicated rows; OCR_RESULTS_DF is None when there was nothing to export.
OCR_RESULTS_DF = export_ocr_results(DEDUPED_OCR_RESULTS, OUTPUT_DIR)


## 简单检查与可视化（可选）

随机选取帧图并展示对应的 OCR 文本，帮助快速评估识别质量。若效果不佳，可尝试：
- 调整 `CROP_REGION` 以确保只保留弹幕区域；
- 调整 `TARGET_FPS` 以增大/减小采样密度；
- 修改 `MIN_OCR_SCORE`，过滤掉低置信度结果；
- 对帧图片做额外预处理（如二值化、锐化等），再重新识别。


In [None]:
# Spot check: show up to two random frames of one random video, each followed
# by the OCR rows stored for that frame.
if not VIDEO_INFOS:
    print("暂无可用视频，无法展示样例。")
elif OCR_RESULTS_DF is None or OCR_RESULTS_DF.empty:
    print("暂无 OCR 结果，请先完成识别与导出步骤。")
else:
    target_video = random.choice(VIDEO_INFOS)
    frames_dir = Path(FRAMES_ROOT_DIR) / target_video["video_id"]
    frame_files = sorted(frames_dir.glob("frame_*.png"))
    if not frame_files:
        print(f"[提示] 帧目录为空: {frames_dir}")
    else:
        # Never ask for more samples than there are frames.
        sample_files = random.sample(frame_files, min(2, len(frame_files)))
        print(f"随机抽查视频: {target_video['title']} ({target_video['video_id']})")
        for frame_path in sample_files:
            img = cv2.imread(str(frame_path))
            if img is None:
                print(f"无法读取帧: {frame_path}")
                continue
            # OpenCV loads images as BGR; matplotlib expects RGB.
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            plt.figure(figsize=(12, 4))
            plt.imshow(img_rgb)
            plt.axis('off')
            plt.title(frame_path.name)
            plt.show()
            match = FRAME_NAME_PATTERN.search(frame_path.name)
            frame_idx = int(match.group(1)) if match else None
            if frame_idx is None:
                print("无法解析帧编号。\n")
                continue
            # OCR_RESULTS_DF is built from the de-duplicated rows, so a frame
            # whose text was dropped as a duplicate also shows up as "no text".
            subset = OCR_RESULTS_DF[
                (OCR_RESULTS_DF['video_id'] == target_video['video_id']) &
                (OCR_RESULTS_DF['frame_index'] == frame_idx)
            ]
            if subset.empty:
                print("该帧未识别出文本。\n")
            else:
                for _, row in subset.iterrows():
                    # Fall back to NaN when the score column is absent or empty.
                    score_val = float(row['score']) if 'score' in row.index and pd.notna(row['score']) else float('nan')
                    print(f"- t={row['timestamp']:.2f}s | score={score_val:.2f} | {row['text']}")
                print()
