<a href="https://colab.research.google.com/github/lurking92/hw_sqj1/blob/main/%E3%80%8CBLIP2_%E9%96%8B%E7%99%BC%E3%80%8D_%E6%B8%AC%E8%A9%A6%E5%96%AE%E4%B8%80%E5%BD%B1%E7%89%87%EF%BC%8C%E8%AB%8B%E6%94%B9%E5%BD%B1%E7%89%87%E5%90%8D%E7%A8%B1(%E6%9C%89%E6%9B%B4%E6%96%B0%EF%BC%8C%E8%AB%8B%E6%8A%8A%E5%85%B6%E4%BB%96%E6%8F%9B%E6%8E%89).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
%%writefile requirements.txt
torch>=2.0.0
transformers>=4.37.0
accelerate>=0.24.1
Pillow>=10.0.0
tqdm
opencv-python
ffmpeg-python
sentencepiece
sacremoses
mediapipe
scipy
numpy

Overwriting requirements.txt


In [3]:
# 安裝依賴
!pip install -r requirements.txt



In [4]:
# 建立資料夾
!mkdir -p data/videos data/frames outputs scripts

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
# 上傳影片（可選）
from google.colab import files
uploaded = files.upload()
import shutil
import os
for fname in uploaded.keys():
    os.makedirs("data/videos", exist_ok=True)
    shutil.move(fname, f"data/videos/{fname}")

Saving my_test_video.mp4 to my_test_video.mp4


In [7]:
%%writefile scripts/extract_frames.py
import os
import subprocess

def extract_frames(video_path, output_dir, interval_sec=2):
    os.makedirs(output_dir, exist_ok=True) # 確保輸出資料夾存在
    command = [
        "ffmpeg",
        "-i", video_path,
        "-vf", f"fps=1/{interval_sec}",
        os.path.join(output_dir, "frame_%03d.jpg") # 這裡會直接寫入 output_dir
    ]
    subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

Overwriting scripts/extract_frames.py


In [8]:
%%writefile scripts/detect_objects.py
def detect_objects(image_path):
# 模擬用，實際應用時請用 GroundingDINO 推論
    return ["person", "chair", "table"]

Writing scripts/detect_objects.py


In [9]:
%%writefile scripts/segment_objects.py
def segment_objects(image_path, object_list):
# 模擬回傳：實際應該回傳分割後圖像（或原圖）
# 為簡化處理，我們先用原始圖像重複多份
    return [image_path] * len(object_list)

Writing scripts/segment_objects.py


In [10]:
%%writefile scripts/analyze_pose.py
import cv2
import mediapipe as mp

mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True)

# 優先動作排序
ACTION_PRIORITY = [
    "跌倒", "坐下", "撿起物品", "放下物品", "拿起物品", "打開東西", "關閉東西",
    "走路", "轉身", "查看", "觸碰", "站立", "等待"
]

def classify_pose(image_path):
    image = cv2.imread(image_path)
    if image is None:
        return "未知"
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    result = pose.process(image_rgb)

    if not result.pose_landmarks:
        return "無法辨識"

    lm = result.pose_landmarks.landmark

    nose_y = lm[mp_pose.PoseLandmark.NOSE].y
    left_knee_y = lm[mp_pose.PoseLandmark.LEFT_KNEE].y
    right_knee_y = lm[mp_pose.PoseLandmark.RIGHT_KNEE].y
    left_ankle_y = lm[mp_pose.PoseLandmark.LEFT_ANKLE].y
    right_ankle_y = lm[mp_pose.PoseLandmark.RIGHT_ANKLE].y

    avg_knee_y = (left_knee_y + right_knee_y) / 2
    avg_ankle_y = (left_ankle_y + right_ankle_y) / 2

    vertical_span = avg_ankle_y - nose_y

    if vertical_span < 0.4:
        return "跌倒"
    elif nose_y - avg_knee_y < 0.1:
        return "坐下"
    elif 0.1 < vertical_span < 0.35:
        return "走路"
    else:
        return "站立"

Writing scripts/analyze_pose.py


In [11]:
%%writefile scripts/generate_caption.py
# scripts/generate_caption.py
from transformers import Blip2Processor, Blip2ForConditionalGeneration, pipeline
from PIL import Image
import torch
import re
import os

# 使用 BLIP2 OPT 6.7B 模型，這個版本在速度和效能之間有很好的平衡
# 請注意，這裡我們不再使用 FLAN-T5 XL 版本
processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-6.7b")
model = Blip2ForConditionalGeneration.from_pretrained("Salesforce/blip2-opt-6.7b", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32)
model = model.to("cuda" if torch.cuda.is_available() else "cpu")

# 翻譯模型保持不變
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-en-zh", src_lang="en", tgt_lang="zh")

STOPWORDS = ["gta", "截圖", "screenshot", "undefined", "圖片", "畫面", "gtp"]

def clean_text(text):
    text = re.sub(r"[\-\s]*gta[\-\s]*", "", text, flags=re.IGNORECASE)
    text = re.sub(r"[\-\s]*gtp[\-\s]*", "", text, flags=re.IGNORECASE)
    # 修正正則表達式，確保能正確處理連續重複的詞彙，例如 "人, 人" 變成 "人"
    text = re.sub(r"([,\s]*)([\u4e00-\u9fa5a-zA-Z0-9]+)(,\s*\2)+", r"\1\2", text)
    for w in STOPWORDS:
        text = text.replace(w, "")
    return text.strip(" ,")

def is_valid_english_caption(caption):
    if not caption:
        return False
    # 增加檢查，避免非常短且無意義的內容，例如只有一個詞或重複詞
    if len(set(caption.lower().split())) <= 2 and len(caption.split()) < 5:
        return False
    if re.match(r"^[,~.\se]+$", caption): # 只包含標點符號或空白的字串
        return False
    return True

def score_caption(caption):
    score = len(caption)
    # 提高與人物、動作、物品相關詞彙的分數權重
    if any(word in caption.lower() for word in ["person", "man", "woman", "walking", "falling", "picking", "holding", "entering", "sitting", "standing"]):
        score += 30 # 提高權重
    if any(word in caption.lower() for word in ["table", "chair", "object", "item"]):
        score += 10 # 物品也給予分數
    return score

def generate_caption(image_paths, context_hint="Describe this image.", pose_label=None, debug_log_path="outputs/debug.log"):
    best_caption = ""
    max_score = -1

    hint = "Describe what the person is doing and what objects are involved. Focus on movement, interaction, and location."
    if pose_label:
        # 將姿勢資訊更直接地整合到提示中，引導模型關注相關動作
        hint = f"The person is performing the action: {pose_label}. Specifically, {hint}"

    os.makedirs(os.path.dirname(debug_log_path), exist_ok=True)
    debug_log = open(debug_log_path, "a", encoding="utf-8")

    for path in image_paths:
        image = Image.open(path).convert("RGB")
        captions = []
        # 增加 num_return_sequences 以生成更多候選字幕進行評分，提高選到好字幕的機會
        # 同時調整 num_beams 和 max_new_tokens 可能會影響速度和品質的平衡
        for _ in range(3): # 生成多輪，每輪產生多個候選
            inputs = processor(images=image, text=hint, return_tensors="pt").to(model.device, dtype=next(model.parameters()).dtype)
            out = model.generate(**inputs, max_new_tokens=50, num_beams=5, early_stopping=True, num_return_sequences=3)
            for seq in out:
                caption = processor.tokenizer.decode(seq, skip_special_tokens=True).strip()
                captions.append(caption)

        best_local = ""
        local_max = -1
        for c in captions:
            if is_valid_english_caption(c):
                s = score_caption(c)
                if s > local_max:
                    best_local = c
                    local_max = s

        debug_log.write(f"Image: {path}\n")
        for c in captions:
            debug_log.write(f"  Raw: {c}\n")
        debug_log.write(f"  Selected: {best_local}\n")

        if local_max > max_score:
            best_caption = best_local
            max_score = local_max

    debug_log.write(f"Best caption selected overall: {best_caption}\n\n")
    debug_log.close()

    if not best_caption:
        zh = "畫面中無法辨識明確內容"
    else:
        zh = translator(best_caption)[0]['translation_text']
        zh = clean_text(zh) # 確保翻譯後的中文也進行清理

    if pose_label:
        # 將動作標籤放在括號內，並確保前後有空格，避免與中文內容黏在一起
        return f"{zh}（動作：{pose_label}）"
    return zh

Writing scripts/generate_caption.py


In [12]:
%%writefile scripts/write_srt.py
def seconds_to_srt_time(seconds):
  hrs = seconds // 3600
  mins = (seconds % 3600) // 60
  secs = seconds % 60
  ms = int((seconds - int(seconds)) * 1000)
  return f"{int(hrs):02}:{int(mins):02}:{int(secs):02},{ms:03}"

def write_srt(captions, output_path):
  with open(output_path, 'w', encoding='utf-8') as f:
    for i, (start, end, text) in enumerate(captions, start=1):
      f.write(f"{i}\n")
      f.write(f"{seconds_to_srt_time(start)} --> {seconds_to_srt_time(end)}\n")
      f.write(f"{text}\n\n")

Writing scripts/write_srt.py


In [13]:
%%writefile main.py
# 專案主程式 main.py
import os
import glob
import shutil
import json # 導入 json 模組來儲存進度
from tqdm.auto import tqdm
from scripts.extract_frames import extract_frames
from scripts.detect_objects import detect_objects
from scripts.segment_objects import segment_objects
from scripts.generate_caption import generate_caption
from scripts.write_srt import write_srt
from scripts.analyze_pose import classify_pose

VIDEO_GLOB = 'data/videos/my_test_video.mp4' # 或者 'data/*/scene*/**/*.mp4'

DRIVE_BASE_DIR = '/content/drive/MyDrive/Colab_Subtitles_Output' # Google Drive 上的基礎路徑
FRAME_DIR = os.path.join(DRIVE_BASE_DIR, 'frames') # 圖片幀將會儲存在這裡
OUTPUT_DIR = os.path.join(DRIVE_BASE_DIR, 'outputs') # SRT 字幕會儲存在這裡

# 確保 Google Drive 中的基礎資料夾和子資料夾存在
os.makedirs(DRIVE_BASE_DIR, exist_ok=True)
os.makedirs(FRAME_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# 定義進度檔案的路徑
PROGRESS_FILE = os.path.join(DRIVE_BASE_DIR, 'processing_progress.json')

def save_progress(video_name, current_index):
    """儲存每個影片的處理進度"""
    progress_data = {}
    if os.path.exists(PROGRESS_FILE):
        with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
            try:
                progress_data = json.load(f)
            except json.JSONDecodeError: # 防止文件損壞
                progress_data = {}

    progress_data[video_name] = current_index
    with open(PROGRESS_FILE, 'w', encoding='utf-8') as f:
        json.dump(progress_data, f, indent=4)

def load_progress(video_name):
    """載入特定影片的處理進度"""
    if os.path.exists(PROGRESS_FILE):
        with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
            try:
                progress_data = json.load(f)
                return progress_data.get(video_name, 0)
            except json.JSONDecodeError:
                return 0 # 文件損壞則從頭開始
    return 0

# --- 主要處理邏輯開始 ---
video_paths = glob.glob(VIDEO_GLOB, recursive=True)

if not video_paths:
    print(f"警告：未找到符合 '{VIDEO_GLOB}' 模式的影片。請檢查影片路徑或上傳。")

for video_path in video_paths:
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    frame_subdir = os.path.join(FRAME_DIR, video_name) # 每個影片的幀有自己的子資料夾

    # ====== 檢查是否已存在圖片幀 ======
    if os.path.exists(frame_subdir) and len(glob.glob(os.path.join(frame_subdir, "*.jpg"))) > 0:
        print(f"\n[1] 影片 {video_name} 的圖片幀已存在於 Google Drive ({frame_subdir})，跳過提取。")
    else:
        os.makedirs(frame_subdir, exist_ok=True)
        print(f"\n[1] 處理影片: {video_path} → 擷取圖片中，儲存至 Google Drive ({frame_subdir})...")
        extract_frames(video_path, frame_subdir, interval_sec=2)
    # =======================================

    print(f"[2] 處理 {video_name} 的每一幀...")
    captions = []
    image_files = sorted(os.listdir(frame_subdir))

    if not image_files:
        print(f"警告：影片 {video_name} 未擷取到任何圖片幀，跳過字幕生成。")
        continue

    total_frames = len(image_files)
    start_index = load_progress(video_name) # 載入上次的進度

    if start_index > 0:
        print(f"從上次進度繼續：影片 {video_name} 已處理 {start_index}/{total_frames} 幀。")
        # 如果是從中間開始，需要重新構建已經處理部分的字幕列表 (這是一個簡化，假設你不需要斷點續寫SRT)
        # 由於我們在迴圈結束後才寫入 SRT，這裡可以假設從斷點開始收集即可。
        # 如果你希望更精確到每次重啟都只處理未完成的，那這裡需要修改 captions 的初始化
        # 最簡單是直接忽略斷點前的字幕，因為最終會生成一個完整的srt
        pass

    # 使用 tqdm 包裹 image_files 迴圈，顯示進度條，並從 start_index 開始
    # tqdm 的 initial 參數可以設定進度條的起始值
    for i in tqdm(range(start_index, total_frames), initial=start_index, total=total_frames, desc=f"生成 {video_name} 的字幕"):
        fname = image_files[i]
        image_path = os.path.join(frame_subdir, fname)

        # 檢查圖片檔案是否存在
        if not os.path.exists(image_path):
            print(f"警告：圖片文件 {image_path} 不存在，跳過該幀。")
            save_progress(video_name, i + 1) # 即使跳過也更新進度
            continue

        pose_label = classify_pose(image_path)
        objects = detect_objects(image_path)
        focused_images = segment_objects(image_path, objects)

        zh_caption = generate_caption(
            focused_images,
            context_hint="Describe what the person is doing, where they are, and what objects are present. Use natural language.",
            pose_label=pose_label
        )
        start_time = i * 2
        end_time = (i + 1) * 2
        captions.append((start_time, end_time, zh_caption))

        # ====== 新增：每處理完一幀就儲存進度 ======
        save_progress(video_name, i + 1)
        # =======================================

    # 在迴圈外部寫入 SRT 檔案，確保包含所有幀的字幕
    srt_path = os.path.join(OUTPUT_DIR, f"{video_name}.srt")
    print(f"[3] 輸出字幕檔至 {srt_path} (已保存到 Google Drive)...")

    # 這裡的 captions 列表只包含從 start_index 開始的幀。
    # 如果要生成完整 SRT，需要先讀取之前已生成的部分。
    # 由於 SRT 是基於時間戳的，這裡需要確保 captions 包含了所有幀。
    # 一個更簡單但可能需要更多記憶體的方法是，每次斷點續跑時，
    # 重新處理所有幀（即使是已經處理過的）以確保 captions 完整。
    # 另一種方法是在 save_progress 時儲存完整的 captions 列表。
    # 為了簡化，目前假設在一個會話內，如果從中斷點恢復，所有幀都會被重新處理。
    # 更好的做法是在每次迴圈時，將生成的單幀字幕儲存到一個臨時文件，
    # 結束時再合併。但這會增加 IO 操作。

    # 最簡單且穩健的方式：
    # 如果你的目的是斷點續跑，最安全的方式是確保 generate_caption 每次處理都是獨立的
    # 然後最終生成 SRT 時，是針對所有幀重新生成，而不是只基於 `captions` 列表。
    # 我們可以稍微調整 generate_caption 的調用和 captions 的組裝，讓它更適合續跑。

    # 鑒於 generate_caption 每次調用都是獨立的，且我們只需要最終的完整 SRT
    # 最簡單的方法是：在每次成功處理完一幀後，將其字幕暫時寫入一個為該影片特設的臨時文件。
    # 然後在所有幀處理完畢後，再從臨時文件讀取並合併成最終的 SRT。

    # 然而，你目前的寫法 (收集所有 captions 到列表，最後一次性寫入)
    # 如果遇到斷線，重啟後 captions 列表是空的，會導致 SRT 只包含從斷點開始的幀。
    # 因此，我們需要將斷點續跑的邏輯做得更完整。

    # 完整續跑邏輯（處理斷點後的字幕）
    final_captions = []
    for i, fname in enumerate(image_files): # 重新遍歷所有幀來組裝完整的 captions
        # 這裡我們不重新運行 generate_caption，而是從儲存的進度中判斷是否已經處理過。
        # 如果 i < start_index，表示這部分在上次運行時已經處理過
        # 但我們沒有儲存單獨的字幕，所以最好的方法是：
        # 如果要精確斷點續跑，則每次 generate_caption 生成後，就直接寫入 SRT 文件的部分。
        # 或者，每次處理完一幀，就把這一幀的字幕存入一個專門的檔案。
        # 但這會讓 SRT 文件處理複雜。

        # 回到最初的目標：避免每次重跑都從頭開始耗時。
        # 現有邏輯下，即使斷點續跑，generate_caption 的處理時間還是主要瓶頸。
        # 其實只需要紀錄「i」，讓 tqdm 從 i 開始跑即可。
        # 最終的 `captions` 列表只包含當前會話中新生成的字幕。
        # 如果需要完整的 SRT，你必須重新生成所有幀的字幕（或讀取已生成的單幀字幕）。

        # 考慮到簡單性與你現有結構，我們這樣做：
        # 當從 start_index 開始時，我們只會把新生成的 captions append 進去。
        # 為了生成完整的 SRT，我們需要在迴圈結束後，結合一個臨時機制。
        # 但這會讓程式複雜。

        # 最直接的斷點續跑：
        # 每處理一幀，就將其字幕直接寫入一個臨時文件 (例如 .tmp_srt)。
        # 結束時再從 tmp_srt 轉換成 .srt。
        # 這比在列表裡 append 更好處理斷點續跑。

        # 鑒於複雜度，我會先保持你當前的 `captions.append` 邏輯，
        # 但請注意，如果斷點續跑，`captions` 列表將只包含從斷點之後生成的字幕。
        # 因此，最後生成的 SRT 文件會不完整。

        # 為了完整的 SRT，最簡單粗暴但有效的做法是：
        # 如果要確保 SRT 文件始終完整，每次運行結束都重新生成所有幀的字幕
        # 或者在 generate_caption 中增加一個選項，使其能夠查詢或直接寫入
        # 預計生成 SRT 的內容，這會導致需要修改 generate_caption 的邏輯。

        # 讓我想一下，什麼方式既能斷點續跑，又能確保 SRT 完整且簡單。
        # 最好的辦法是：每次循環結束後，都把整個 `captions` 列表全部重寫一遍。
        # 但是這又回到了每次都寫入檔案的問題。
        # 不！我的上一個版本已經修正了寫入 SRT 到迴圈外部。
        # 現在的問題是，`captions` 列表是從 `start_index` 開始收集的。

        # 正確的斷點續跑邏輯應該是：
        # 1. 載入上次已完成的幀數 `start_index`。
        # 2. 如果 `start_index > 0`，則從磁碟（Google Drive）上讀取前 `start_index` 幀的字幕。
        # 3. 從 `start_index` 開始處理新的幀。
        # 4. 將新生成的字幕附加到已讀取的字幕列表之後。
        # 5. 最後一次性寫入完整的 SRT。

    # --- 修正後的完整字幕收集邏輯 ---
    srt_path = os.path.join(OUTPUT_DIR, f"{video_name}.srt")
    temp_captions_file = os.path.join(frame_subdir, f"{video_name}_temp_captions.json") # 臨時儲存每幀字幕

    # 載入現有或空的字幕列表
    current_captions_data = {}
    if os.path.exists(temp_captions_file):
        with open(temp_captions_file, 'r', encoding='utf-8') as f:
            try:
                current_captions_data = json.load(f)
            except json.JSONDecodeError:
                current_captions_data = {}

    # 遍歷所有幀，但只處理尚未處理的幀
    for i in tqdm(range(total_frames), initial=start_index, total=total_frames, desc=f"生成 {video_name} 的字幕"):
        if str(i) in current_captions_data: # 如果這幀已經有字幕，就跳過
            continue

        fname = image_files[i]
        image_path = os.path.join(frame_subdir, fname)

        if not os.path.exists(image_path):
            print(f"警告：圖片文件 {image_path} 不存在，跳過該幀。")
            continue

        pose_label = classify_pose(image_path)
        objects = detect_objects(image_path)
        focused_images = segment_objects(image_path, objects)

        zh_caption = generate_caption(
            focused_images,
            context_hint="Describe what the person is doing, where they are, and what objects are present. Use natural language.",
            pose_label=pose_label
        )

        # 儲存單幀字幕到字典，鍵為幀索引
        current_captions_data[str(i)] = {
            "start_time": i * 2,
            "end_time": (i + 1) * 2,
            "caption": zh_caption
        }

        # 每次處理一小批幀後儲存到臨時文件，避免頻繁寫入 Drive
        if (i + 1) % 10 == 0 or (i + 1) == total_frames: # 每10幀或全部結束時儲存
            save_progress(video_name, i + 1) # 更新主進度文件
            with open(temp_captions_file, 'w', encoding='utf-8') as f:
                json.dump(current_captions_data, f, indent=4, ensure_ascii=False) # 儲存所有已生成的單幀字幕

    # 所有幀處理完畢後，從 current_captions_data 構建最終的 captions 列表
    final_captions_list = []
    for i in range(total_frames):
        frame_data = current_captions_data.get(str(i))
        if frame_data:
            final_captions_list.append((frame_data["start_time"], frame_data["end_time"], frame_data["caption"]))
        else:
            # 如果有幀沒有生成字幕（例如因圖片不存在被跳過），可以選擇填入一個空字幕或警告
            print(f"警告：幀 {i} 缺少字幕，將填入空字幕。")
            final_captions_list.append((i * 2, (i + 1) * 2, " (無字幕) "))

    # 確保 final_captions_list 是按時間順序排序的
    final_captions_list.sort(key=lambda x: x[0])

    print(f"[3] 輸出字幕檔至 {srt_path} (已保存到 Google Drive)...")
    write_srt(final_captions_list, srt_path) # 使用完整的字幕列表
    print("✅ 完成字幕輸出！")

    # 清理臨時字幕儲存文件（如果需要的話，雖然保留著下次可以更快載入）
    # os.remove(temp_captions_file) # 如果不想保留，可以取消註釋此行

    # 處理完一個影片後，可以清除該影片在進度文件中的記錄，或將其標記為完成
    save_progress(video_name, total_frames) # 確保標記為完成
    print("\n所有影片處理完畢！")

Writing main.py


In [14]:
# 執行字幕生成（main.py）
!python main.py

  File "/content/main.py", line 196
    srt_path = os.path.join(OUTPUT_DIR, f"{video_name}.srt")
    ^
IndentationError: expected an indented block after 'for' statement on line 149


In [15]:
# 顯示字幕內容
from glob import glob
for srt_file in glob("outputs/*.srt"):
    print("\n=====", srt_file, "=====")
    with open(srt_file, encoding='utf-8') as f:
        print(f.read())