# 函式庫


In [None]:
%%writefile requirements.txt
opencc-python-reimplemented
# Only one OpenCV wheel is listed: opencv-python and opencv-python-headless both
# install the same `cv2` package and overwrite each other when installed together.
opencv-python-headless
torch>=2.0.0
transformers>=4.37.0
accelerate>=0.24.1
Pillow>=10.0.0
tqdm
ffmpeg-python
sentencepiece
sacremoses
mediapipe
scipy
numpy
einops
segment_anything
requests
pycocotools
timm


Overwriting requirements.txt


In [None]:
%pip install -q -r requirements.txt

print("函式庫安裝完成。")

函式庫安裝完成。


In [None]:
from pathlib import Path
from typing import List, Tuple
import cv2
from PIL import Image

# 安裝並新增資料夾


In [None]:
# Create the working directories (the former first mkdir call was a subset of this one).
!mkdir -p data/videos data/frames outputs scripts models

In [None]:
import os

import requests

model_url = "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task"
model_path = "models/pose_landmarker_heavy.task"
# Seconds to wait for the server before giving up, so a stalled connection
# cannot hang the cell forever.
DOWNLOAD_TIMEOUT_SEC = 60

if not os.path.exists(model_path):
    print(f"下載 MediaPipe 姿態偵測模型到 {model_path}...")
    # Create the target folder here as well, so this cell also works when the
    # directory-creation cell was skipped.
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    try:
        r = requests.get(model_url, allow_redirects=True, timeout=DOWNLOAD_TIMEOUT_SEC)
        r.raise_for_status()  # raise on any non-success HTTP status
        # The file is only written after the whole body has been received.
        with open(model_path, 'wb') as f:
            f.write(r.content)
        print("MediaPipe 姿態偵測模型下載完成。")
    except requests.exceptions.RequestException as e:
        print(f"下載模型時發生錯誤: {e}")
        print("請檢查您的網路連線或稍後再試。")
else:
    print("MediaPipe 姿態偵測模型已存在，跳過下載。")

MediaPipe 姿態偵測模型已存在，跳過下載。


In [None]:
# Mount Google Drive at /content/drive; later cells read and write paths
# under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# 影片上傳


In [None]:
from google.colab import files
import os, shutil
from pathlib import Path

# Destination folder for uploaded videos; created once up front instead of
# once per uploaded file.
dst_dir = Path("data/videos")
dst_dir.mkdir(parents=True, exist_ok=True)

uploaded = files.upload()               # several files can be selected at once
video_paths = []                        # collects the final path of every uploaded video

for fname in uploaded.keys():
    # files.upload() saves each file in the working directory; move it into data/videos.
    dst = dst_dir / fname
    shutil.move(fname, dst)
    video_paths.append(str(dst))

print(" 影片已放到：", video_paths)


Saving Fall backward while walking and turning1_0.mp4 to Fall backward while walking and turning1_0.mp4
Saving Fall backward while walking and turning1_1.mp4 to Fall backward while walking and turning1_1.mp4
Saving Fall backward while walking and turning1_2.mp4 to Fall backward while walking and turning1_2.mp4
Saving Fall backward while walking and turning1_3.mp4 to Fall backward while walking and turning1_3.mp4
Saving Fall backward while walking and turning1_4.mp4 to Fall backward while walking and turning1_4.mp4
 影片已放到： ['data/videos/Fall backward while walking and turning1_0.mp4', 'data/videos/Fall backward while walking and turning1_1.mp4', 'data/videos/Fall backward while walking and turning1_2.mp4', 'data/videos/Fall backward while walking and turning1_3.mp4', 'data/videos/Fall backward while walking and turning1_4.mp4']


# huggingface


In [None]:
# Log in to the Hugging Face Hub with the value google.colab.userdata
# returns for the 'HF_TOKEN' key.
from huggingface_hub import login
from google.colab import userdata
login(userdata.get('HF_TOKEN'))

# extract_frames


In [None]:
%%writefile scripts/extract_frames.py
import os
import subprocess

def extract_frames(video_path, output_dir, interval_sec=2):
    """Sample still frames from a video by calling the ffmpeg command-line tool.

    Frames are written into ``output_dir`` using the ``frame_%03d.jpg`` file
    name pattern, one frame every ``interval_sec`` seconds.

    Args:
        video_path: Path of the input video, passed to ffmpeg via ``-i``.
        output_dir: Folder that receives the frames; created when missing.
        interval_sec: Seconds between two sampled frames; must be positive.

    Raises:
        ValueError: If ``interval_sec`` is not a positive number.
        FileNotFoundError: Propagated from ``subprocess.run`` when the ffmpeg
            executable cannot be found (same as before).
    """
    # Reject intervals that would yield an invalid "fps=1/0" or negative filter
    # before touching the file system.
    if not interval_sec > 0:
        raise ValueError(f"interval_sec must be positive, got {interval_sec!r}")

    os.makedirs(output_dir, exist_ok=True)  # make sure the output folder exists
    command = [
        "ffmpeg",
        "-i", video_path,
        "-vf", f"fps=1/{interval_sec}",
        os.path.join(output_dir, "frame_%03d.jpg"),  # frames go straight into output_dir
    ]
    # stderr is captured (instead of discarded) so a failed run can be reported.
    result = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    if result.returncode != 0:
        details = result.stderr.decode("utf-8", errors="replace").strip()
        print(f"[extract_frames] FFmpeg 執行失敗 (exit code {result.returncode}): {video_path}")
        if details:
            # Only the tail is shown; ffmpeg prints a long banner first.
            print(details[-2000:])

Writing scripts/extract_frames.py


# detect_objects

In [None]:
%%writefile scripts/object_detector.py
# scripts/object_detector.py
import torch, os
from typing import List, Tuple
from PIL import Image
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
from huggingface_hub import login

# ---------------------------------------------------------------------
# 一次載入 Grounding-DINO
# ---------------------------------------------------------------------
_MODEL_NAME = "IDEA-Research/grounding-dino-tiny"
_processor, _model, _device = None, None, None

def _lazy_load():
    """Load the Grounding-DINO processor and model on first use.

    Fills the module-level ``_processor``, ``_model`` and ``_device``; later
    calls return immediately once processor and model are both set.
    """
    global _processor, _model, _device
    if _processor is not None and _model is not None:
        return  # already loaded by an earlier call

    login(new_session=False)                       # the token is still used when the model is gated
    print(f"[Grounding-DINO] Loading {_MODEL_NAME} …")
    _processor = AutoProcessor.from_pretrained(_MODEL_NAME)
    _model = AutoModelForZeroShotObjectDetection.from_pretrained(_MODEL_NAME)
    # Use the GPU when one is visible to torch, otherwise stay on the CPU.
    if torch.cuda.is_available():
        _device = "cuda"
    else:
        _device = "cpu"
    _model.to(_device).eval()
    print(f"[Grounding-DINO] ready on {_device}")

# ---------------------------------------------------------------------
# 1. 主偵測函式 —— 直接回傳 boxes/labels/scores
# ---------------------------------------------------------------------
def run_grounding_dino(img: Image.Image,
                       prompt: str = "person, knife, glass bottle, rug, water spill, stove, cable"
                      ) -> Tuple[List[Tuple[float,float,float,float]], List[str], List[float]]:
    """Detect the prompted objects in one image with Grounding-DINO.

    Args:
        img: Image to analyse.
        prompt: Labels to look for, separated by commas and/or periods. The
            text is rewritten into the lowercase, period-terminated form that
            the Grounding-DINO documentation asks for before it is tokenised.

    Returns:
        ``(boxes, labels, scores)``:
          * boxes  - ``(cx, cy, w, h)`` tuples normalised to 0-1 by image size
          * labels - text label of each box
          * scores - confidence of each box
    """
    _lazy_load()
    query = _as_grounding_dino_query(prompt)
    inputs = _processor(images=img, text=query, return_tensors="pt").to(_device)
    with torch.no_grad():
        out = _model(**inputs)

    # Post-process to pixel coordinates first (target_sizes is (height, width)),
    # then convert back to normalised cxcywh so callers get one box format.
    r = _processor.post_process_grounded_object_detection(
            out, threshold=0.5, target_sizes=[img.size[::-1]])[0]
    boxes_px   = r["boxes"].tolist()                  # xyxy in pixels
    scores_px  = r["scores"].tolist()
    labels_txt = r["text_labels"]

    w, h = img.size
    boxes_cxcywh = [
        ((x1 + x2) / 2 / w, (y1 + y2) / 2 / h, (x2 - x1) / w, (y2 - y1) / h)
        for x1, y1, x2, y2 in boxes_px
    ]

    return boxes_cxcywh, labels_txt, scores_px


def _as_grounding_dino_query(prompt: str) -> str:
    """Rewrite a comma/period separated label list as 'label a. label b.' in lowercase."""
    phrases = [part.strip().lower() for part in prompt.replace(",", ".").split(".")]
    phrases = [part for part in phrases if part]
    if not phrases:
        return prompt  # nothing recognisable to normalise; pass the text through
    return ". ".join(phrases) + "."

# ---------------------------------------------------------------------
# 2. 高風險物品快速篩選
# ---------------------------------------------------------------------
def detect_high_risk_items(img: Image.Image,
                           risk_list,
                           box_thresh=0.3, text_thresh=0.25):
    """Run detection for ``risk_list`` and keep only confident hits from that list.

    Args:
        img: Image to analyse.
        risk_list: Iterable of label strings; joined with ", " to build the prompt.
        box_thresh: Accepted for interface compatibility and not applied here;
            ``run_grounding_dino`` already filters boxes with its own threshold.
        text_thresh: Minimum detection score for a hit to be kept.

    Returns:
        ``(labels, boxes, scores)`` - three lists of equal length whose i-th
        entries describe the same detection.
    """
    risk_list = list(risk_list)
    # Labels are compared case-insensitively and without surrounding spaces.
    wanted = {str(item).strip().lower() for item in risk_list}

    boxes, labels, scores = run_grounding_dino(img, prompt=", ".join(risk_list))

    # Filter in a single pass so labels, boxes and scores always stay aligned
    # (previously boxes/scores were selected by label only, ignoring the score).
    hits, hitbox, hitscore = [], [], []
    for box, label, score in zip(boxes, labels, scores):
        if label.strip().lower() in wanted and score >= text_thresh:
            hits.append(label)
            hitbox.append(box)
            hitscore.append(score)
    return hits, hitbox, hitscore

class ObjectDetector:
    """Thin wrapper that exposes the module-level detect_high_risk_items as a method for the notebook."""

    def detect_high_risk_items(self, img, risk_list):
        """Forward to the module-level function of the same name and return its result."""
        result = detect_high_risk_items(img, risk_list)
        return result


Writing scripts/object_detector.py


# 初始化

In [None]:
# -*- coding: utf-8 -*-
"""BLIP2 development, single-video test (Colab cell 3): import the core modules and set the model paths."""

import sys
import pandas as pd
from PIL import Image
from tqdm import tqdm
from datetime import timedelta
import time
import subprocess # keeps subprocess available for frame extraction

# Put the scripts folder on the Python module search path.
if './scripts' not in sys.path:
    sys.path.append('./scripts')
    print("已將 './scripts' 加入 Python 搜尋路徑。")

# Import the project modules. Every module is imported through the `scripts.`
# package path, the same way scripts/utils.py imports its siblings: mixing
# `object_detector` and `scripts.object_detector` would create two separate
# module objects, each with its own lazily loaded model.
# NOTE(review): assumes the notebook's working directory is on sys.path, which
# the pre-existing `scripts.generate_caption` import already relied on.
try:
    from scripts.model_loader import load_blip2_model, load_blip2_processor, load_loha_weights
    from scripts.generate_caption import generate_caption
    from scripts.pose_estimator import PoseEstimator
    from scripts.object_detector import ObjectDetector
    from scripts.utils import save_txt, load_csv_to_dict
    print("核心模組導入成功。")
except ImportError as e:
    print(f"導入核心模組失敗: {e}")
    print("請確認 'scripts/' 資料夾中存在所有必要的 .py 檔案。")


# --- Model paths and other constants ---
MODEL_SAVE_PATH = "models/blip2_model" # where the BLIP2 base model is stored
LOHA_WEIGHTS_PATH = "models/loha_blip2_weights.pt" # fine-tuned adapter weights (the style model)
POSE_MODEL_PATH = "models/pose_landmarker_heavy.task" # MediaPipe pose model file

BATCH_SIZE = 4 # number of frames handled per batch
DEFAULT_FRAME_INTERVAL_SECONDS = 2 # seconds between extracted frames
OUTPUT_FRAMES_DIR_BASE = "/content/drive/MyDrive/Colab_Video_Frames_Output" # root folder for extracted frames
EVENT_TIMELINE_CSV = "/content/drive/MyDrive/Colab_Video_Frames_Output/event_timeline.csv" # path of event_timeline.csv

print("\n模型路徑和其他常數設定完成。")
print(f"BLIP2 基礎模型將從 '{MODEL_SAVE_PATH}' 載入。")
print(f"LoRA 微調權重將從 '{LOHA_WEIGHTS_PATH}' 載入。")
print(f"將使用的 event_timeline.csv 路徑: '{EVENT_TIMELINE_CSV}'")


# Initialise the pose estimator.
try:
    pose_estimator = PoseEstimator(POSE_MODEL_PATH)
    print("PoseEstimator 初始化成功。")
except Exception as e:
    print(f"PoseEstimator 初始化失敗: {e}")
    print("請檢查姿態模型檔案是否存在和完整性。")
    # Keep the name defined (like object_detector below) so later cells can
    # test for None instead of failing with a NameError.
    pose_estimator = None

# Initialise the object detector.
try:
    object_detector = ObjectDetector()
    print("ObjectDetector 初始化成功。")
except Exception as e:
    print(f"ObjectDetector 初始化失敗: {e}")
    print("請檢查 Grounding DINO 和 SAM 模型下載/載入情況，以及相關依賴。")
    object_detector = None # None marks the detector as unavailable

已將 './scripts' 加入 Python 搜尋路徑。
導入核心模組失敗: No module named 'model_loader'
請確認 'scripts/' 資料夾中存在所有必要的 .py 檔案。

模型路徑和其他常數設定完成。
BLIP2 基礎模型將從 'models/blip2_model' 載入。
LoRA 微調權重將從 'models/loha_blip2_weights.pt' 載入。
將使用的 event_timeline.csv 路徑: '/content/drive/MyDrive/Colab_Video_Frames_Output/event_timeline.csv'
PoseEstimator 初始化失敗: name 'PoseEstimator' is not defined
請檢查姿態模型檔案是否存在和完整性。
ObjectDetector 初始化失敗: name 'ObjectDetector' is not defined
請檢查 Grounding DINO 和 SAM 模型下載/載入情況，以及相關依賴。


In [None]:
# -*- coding: utf-8 -*-

from google.colab import drive # 重新導入，確保在此儲存格可用
import shutil # 重新導入，確保在此儲存格可用
import subprocess # 確保 ffmpeg 命令可用

# 新增的本地幀提取函數，確保即使沒有導入外部 extract_frames 也能工作
def _local_extract_frames(video_path, output_dir, interval_sec=2):
    os.makedirs(output_dir, exist_ok=True)
    video_file_name_without_ext = os.path.splitext(os.path.basename(video_path))[0]
    output_filename_pattern = os.path.join(output_dir, f"{video_file_name_without_ext}_%07d.jpg")
    command = ["ffmpeg", "-i", video_path, "-vf", f"fps=1/{interval_sec}", "-q:v", "2", output_filename_pattern]
    print(f"  執行 FFmpeg 命令提取幀: {' '.join(command)}")
    try:
        result = subprocess.run(command, check=True, capture_output=True, text=True)
        print(result.stdout)
        print(result.stderr)
        print(f"  幀提取完成到: {output_dir}")
        extracted_files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
        if not extracted_files:
            print(f"  警告: 未提取到任何幀，請檢查影片或間隔設定。")
        else:
            print(f"  總共提取了 {len(extracted_files)} 張幀。")
    except subprocess.CalledProcessError as e:
        print(f"  FFmpeg 執行失敗: {e}")
        print(f"  stdout: {e.stdout}")
        print(f"  stderr: {e.stderr}")
    except FileNotFoundError:
        print("  錯誤: FFmpeg 命令未找到。請確認 FFmpeg 已安裝並在 PATH 中。")


# 這個函數用於處理單一影片的整個流程
def process_single_video(video_file_name, output_base_dir, frame_interval_sec, event_timeline_path,
                         input_video_dir="/content/uploaded_videos_temp"):
    """Run the whole pipeline for one video: frames, risk items, pose labels, captions.

    Uses these notebook-level names: ``BATCH_SIZE``, ``object_detector``,
    ``pose_estimator``, ``generate_caption``, ``load_csv_to_dict`` and ``save_txt``.

    Args:
        video_file_name: File name of the video inside ``input_video_dir``.
        output_base_dir: Folder that receives ``<name>_frames/``,
            ``<name>_captions.txt`` and ``<name>_risk_log.txt``.
        frame_interval_sec: Seconds between extracted frames; also used to
            estimate each frame's timestamp.
        event_timeline_path: CSV path handed to ``load_csv_to_dict``; the result
            is looked up by frame file name.
        input_video_dir: Folder that holds the video. Defaults to the location
            that used to be hard-coded; pass e.g. ``"data/videos"`` for files
            placed there by the upload cell.

    Returns:
        None. Progress and errors are printed; captions and the risk log are
        saved after every batch.
    """
    print(f"\n--- 開始處理影片: {video_file_name} ---")

    input_video_path = os.path.join(input_video_dir, video_file_name)
    if not os.path.exists(input_video_path):
        print(f"錯誤: 影片檔案 '{input_video_path}' 不存在。請確認已上傳或路徑正確。")
        return

    video_stem = os.path.splitext(video_file_name)[0]
    frames_dir = os.path.join(output_base_dir, f"{video_stem}_frames")
    captions_path = os.path.join(output_base_dir, f"{video_stem}_captions.txt")
    risk_log_path = os.path.join(output_base_dir, f"{video_stem}_risk_log.txt")

    os.makedirs(frames_dir, exist_ok=True)

    # Extract frames only when the frame folder is still empty.
    if not os.listdir(frames_dir):
        print(f"偵測到幀目錄 '{frames_dir}' 不存在或為空，正在提取幀...")
        _local_extract_frames(input_video_path, frames_dir, frame_interval_sec)
    else:
        print(f"幀目錄 '{frames_dir}' 已存在且有內容，跳過幀提取。")

    # Event timeline rows, keyed by frame file name.
    all_fine_tune_infos = load_csv_to_dict(event_timeline_path)
    print(f"已從 '{event_timeline_path}' 載入 {len(all_fine_tune_infos)} 條事件時間線資訊。")

    image_paths = sorted(
        os.path.join(frames_dir, name) for name in os.listdir(frames_dir) if name.endswith('.jpg')
    )
    if not image_paths:
        print(f"錯誤: 幀目錄 '{frames_dir}' 中沒有找到任何圖片幀。無法進行後續處理。")
        return

    all_transcriptions_text_only = []
    all_risk_logs = []
    start_time = time.time()

    # The high-risk item list is the set of distinct comma-separated entries in
    # the timeline's 'item' column; sorted so the detection prompt is stable
    # from run to run.
    risk_items = set()
    for info in all_fine_tune_infos.values():
        if 'item' in info and info['item']:
            risk_items.update(part.strip() for part in info['item'].split(',') if part.strip())
    predefined_high_risk_items_list = sorted(risk_items)
    print(f"從 event_timeline.csv 提取的潛在高風險物品列表: {predefined_high_risk_items_list}")
    if not predefined_high_risk_items_list:
        print("警告: event_timeline.csv 中未找到 'item' 欄位或內容為空。高風險物品偵測將無法有效執行。")

    print(f"總共找到 {len(image_paths)} 張圖片幀。")
    with tqdm(total=len(image_paths), desc="總進度") as pbar:
        for i in range(0, len(image_paths), BATCH_SIZE):
            batch_image_paths = image_paths[i:i + BATCH_SIZE]

            # Per-frame inputs for generate_caption, filled in one pass below.
            batch_objects_for_caption = []
            batch_pose_labels_final = []
            fine_tune_info_batch = []

            for j, image_path in enumerate(batch_image_paths):
                current_frame_name = os.path.basename(image_path)
                current_timestamp_seconds = (i + j) * frame_interval_sec  # estimated timestamp

                # Step 1: high-risk item detection.
                detected_high_risk_items = []
                risk_tag = ""
                if object_detector and predefined_high_risk_items_list:
                    # Open the frame only when it is needed and release the
                    # file handle right after decoding.
                    with Image.open(image_path) as frame:
                        pil_image = frame.convert("RGB")
                    detected_high_risk_items, _, _ = object_detector.detect_high_risk_items(
                        pil_image, predefined_high_risk_items_list
                    )
                    if detected_high_risk_items:
                        risk_message = f"!!! 風險警告 !!! 在幀 {current_frame_name} (約 {timedelta(seconds=current_timestamp_seconds)}) 偵測到高風險物品: {', '.join(detected_high_risk_items)}"
                        print(risk_message)
                        all_risk_logs.append(risk_message)
                        risk_tag = "高風險物品"
                batch_objects_for_caption.append(list(detected_high_risk_items))

                # Step 2: pose analysis, done exactly once per frame (it used
                # to run a second time while the caption inputs were built).
                batch_pose_labels_final.append(pose_estimator.analyze_pose(image_path))

                # TODO: person tracking / behaviour analysis (falls,
                # disorientation) is planned to plug in here.

                # Step 3: caption inputs. The timeline row is copied so the
                # detection results do not leak into the loaded table.
                info = dict(all_fine_tune_infos.get(current_frame_name, {}))
                info['detected_risks'] = risk_tag
                info['detected_objects'] = batch_objects_for_caption[j]
                fine_tune_info_batch.append(info)

            # generate_caption is expected to cope with the extra
            # 'detected_risks' / 'detected_objects' keys.
            batch_captions = generate_caption(
                batch_image_paths,
                batch_pose_labels_final,
                batch_objects_for_caption,
                fine_tune_info_batch,
            )

            all_transcriptions_text_only.extend(batch_captions)
            pbar.update(len(batch_image_paths))

            # Save progress after every batch.
            save_txt(all_transcriptions_text_only, captions_path)
            save_txt(all_risk_logs, risk_log_path)

    total_duration = time.time() - start_time
    print(f"\n--- 影片 {video_file_name} 處理完成！總耗時: {timedelta(seconds=total_duration)} ---")
    print(f"生成的字幕已保存到: {captions_path}")
    print(f"風險日誌已保存到: {risk_log_path}")


# --- 主執行部分 (啟用) --- (保留您的原有備註和範例)
# 這是你需要在 Colab 中手動設定要處理的影片
# 範例: 處理一個名為 "my_test_video.mp4" 的影片
# 請確保這個影片檔案已經上傳到 /content/uploaded_videos_temp/ 或你指定的路徑
# 並且 event_timeline.csv 存在於 EVENT_TIMELINE_CSV 指定的路徑

# 範例1: 處理單一影片 (請取消註解並修改以下兩行)
# process_single_video(
#     video_file_name="你的影片名稱.mp4", # <-- 請將這裡替換成你要處理的影片檔案名 (例如: "example_video.mp4")
#     output_base_dir=OUTPUT_FRAMES_DIR_BASE,
#     frame_interval_sec=DEFAULT_FRAME_INTERVAL_SECONDS,
#     event_timeline_path=EVENT_TIMELINE_CSV
# )

# 範例2: 如果你有一個影片列表要處理 (請取消註解並修改以下幾行)
# video_list_to_process = ["video1.mp4", "video2.mp4"] # <-- 替換為你的影片列表
# for video_name in video_list_to_process:
#    process_single_video(
#        video_file_name=video_name,
#        output_base_dir=OUTPUT_FRAMES_DIR_BASE,
#        frame_interval_sec=DEFAULT_FRAME_INTERVAL_SECONDS,
#        event_timeline_path=EVENT_TIMELINE_CSV
#    )

# --- 重要提示 ---
# 上面範例中的 process_single_video 呼叫是被註解掉的。
# 您需要根據您要處理的影片，取消註解其中一個範例，並修改 `video_file_name`。
# 確保您的影片檔案存在於 `INPUT_VIDEO_PATH` 所指向的位置！
# 同時，確保您的 `event_timeline.csv` 檔案存在於 `EVENT_TIMELINE_CSV` 所指向的路徑，
# 並且裡面的 `frame_filename` 與您提取的幀名對應。

# segment_objects

In [None]:
%%writefile scripts/segment_objects.py
import torch
import numpy as np
import cv2
from segment_anything import SamPredictor, sam_model_registry
from PIL import Image
import os
import requests # 導入 requests 函式庫
from tqdm import tqdm # 導入 tqdm 顯示下載進度

# 全局變數，用於載入模型一次
sam_predictor = None
SAM_CHECKPOINT_PATH = "/content/sam_vit_h_4b8939.pth" # SAM 模型權重路徑
SAM_CHECKPOINT_URL = "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth" # SAM 模型下載 URL

def download_sam_checkpoint():
    """Download the SAM checkpoint to SAM_CHECKPOINT_PATH unless it already exists.

    The data is streamed into a temporary ``.part`` file that is renamed to the
    final path only after the download completed, so an interrupted download
    never leaves a truncated checkpoint that later passes the existence check.

    Raises:
        RuntimeError: If the request fails or fewer/more bytes arrive than the
            server announced. (Previously the module called ``exit(1)`` on a
            request error and only printed a warning on a size mismatch.)
    """
    if os.path.exists(SAM_CHECKPOINT_PATH):
        return

    print(f"下載 SAM 模型權重到 {SAM_CHECKPOINT_PATH}...")
    partial_path = SAM_CHECKPOINT_PATH + ".part"
    block_size = 1024 * 1024  # 1 MiB chunks; 1 KiB chunks are needlessly slow for a multi-GB file
    try:
        with requests.get(SAM_CHECKPOINT_URL, stream=True, timeout=60) as response:
            response.raise_for_status()  # raise on HTTP errors
            total_size_in_bytes = int(response.headers.get('content-length', 0))
            received = 0
            with tqdm(total=total_size_in_bytes or None, unit='iB', unit_scale=True) as progress_bar, \
                    open(partial_path, 'wb') as file:
                for data in response.iter_content(block_size):
                    received += len(data)
                    progress_bar.update(len(data))
                    file.write(data)

        if total_size_in_bytes != 0 and received != total_size_in_bytes:
            print("錯誤：下載可能不完整！")
            raise RuntimeError(
                f"SAM checkpoint download incomplete: {received} of {total_size_in_bytes} bytes"
            )

        os.replace(partial_path, SAM_CHECKPOINT_PATH)
        print("SAM 模型權重下載完成。")
    except requests.exceptions.RequestException as e:
        print(f"下載 SAM 模型時發生錯誤: {e}")
        print("請檢查網絡連接或嘗試手動下載檔案並上傳到 /content/")
        raise RuntimeError(f"failed to download SAM checkpoint: {e}") from e
    finally:
        # After a successful rename the .part file is gone; after a failure it
        # is removed so the next call starts from scratch.
        if os.path.exists(partial_path):
            os.remove(partial_path)

def load_sam_model():
    """Create the module-level SAM predictor on first use.

    Makes sure the checkpoint is downloaded, builds the "vit_h" model from it,
    moves it to CUDA when available and stores a ``SamPredictor`` in the
    module-level ``sam_predictor``. Later calls do nothing.

    Raises:
        FileNotFoundError: If the checkpoint file is still missing after the
            download step. (Previously the module called ``exit(1)`` here.)
    """
    global sam_predictor
    if sam_predictor is not None:
        return

    download_sam_checkpoint()  # make sure the weights are present
    print("載入 Segment Anything Model (SAM)...")
    if not os.path.exists(SAM_CHECKPOINT_PATH):
        message = f"錯誤：SAM 模型檢查點檔案 {SAM_CHECKPOINT_PATH} 不存在。"
        print(message)
        # Raise instead of exiting the interpreter so callers (and notebook
        # cells using `except Exception`) can handle the failure.
        raise FileNotFoundError(message)

    sam_model = sam_model_registry["vit_h"](checkpoint=SAM_CHECKPOINT_PATH)
    if torch.cuda.is_available():
        sam_model.to(device="cuda")
    sam_predictor = SamPredictor(sam_model)
    print("SAM 模型載入完成。")

def segment_objects(image_path, boxes):
    """Cut out each boxed object from an image with Segment Anything (SAM).

    Args:
        image_path (str): Path of the image file.
        boxes (list): Bounding boxes, one ``[x1, y1, x2, y2]`` entry per object.
            NOTE(review): presumably pixel coordinates of the original image,
            as SAM box prompts expect - confirm against the caller, since
            ``run_grounding_dino`` returns normalised cxcywh boxes.

    Returns:
        list: One ``PIL.Image`` per box, the same size as the input image, in
        which pixels outside the predicted mask are black. Empty when no boxes
        are given.
    """
    if boxes is None or len(boxes) == 0:  # nothing detected, nothing to segment
        return []

    load_sam_model()  # make sure the predictor exists

    # Decode the image and release the file handle right away.
    with Image.open(image_path) as img:
        image_np = np.array(img.convert("RGB"))

    sam_predictor.set_image(image_np)

    input_boxes = torch.as_tensor(
        boxes, dtype=torch.float, device=sam_predictor.device
    ).reshape(-1, 4)
    # predict_torch expects prompts that are already mapped into the model's
    # resized input frame; without this step the boxes point at the wrong
    # region for any image that SAM resizes.
    transformed_boxes = sam_predictor.transform.apply_boxes_torch(
        input_boxes, image_np.shape[:2]
    )

    # One mask per box; masks has shape (num_boxes, 1, H, W).
    masks, _, _ = sam_predictor.predict_torch(
        point_coords=None,
        point_labels=None,
        boxes=transformed_boxes,
        multimask_output=False,
    )

    segmented_images = []
    for mask_tensor in masks:
        # Convert the mask once and reuse it for both indexing operations.
        mask = mask_tensor.squeeze(0).cpu().numpy().astype(bool)
        masked_image_np = np.zeros_like(image_np)
        masked_image_np[mask] = image_np[mask]
        segmented_images.append(Image.fromarray(masked_image_np))

    return segmented_images

Writing scripts/segment_objects.py


# analyze_pose

In [None]:
%%writefile scripts/analyze_pose.py
"""
MediaPipe Pose helpers — return (label, landmarks, [optional] skeleton image).
跌倒判斷：鼻子低於臀部 + 身軀縮短 + 肩線近水平。
"""
from __future__ import annotations
from typing import Optional, Tuple
import math

import cv2
import numpy as np
from PIL import Image
import mediapipe as mp
from mediapipe.framework.formats import landmark_pb2

mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

_pose_detector: Optional[mp_pose.Pose] = None

# -----------------------------------------------------------------------------
# Internal helpers
# -----------------------------------------------------------------------------

def _load_pose() -> mp_pose.Pose:
    """Return the shared static-image MediaPipe Pose detector, creating it on first call."""
    global _pose_detector
    if _pose_detector is not None:
        return _pose_detector

    pose_options = dict(
        static_image_mode=True,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=0.3,
        min_tracking_confidence=0.3,
    )
    _pose_detector = mp_pose.Pose(**pose_options)
    return _pose_detector


def _draw(rgb: np.ndarray, lm: landmark_pb2.NormalizedLandmarkList) -> np.ndarray:
    """Draw the pose skeleton on a copy of ``rgb`` and return that copy."""
    canvas = rgb.copy()  # the caller's array stays untouched
    point_style = mp_drawing.DrawingSpec(thickness=2, circle_radius=2)
    line_style = mp_drawing.DrawingSpec(thickness=2)
    mp_drawing.draw_landmarks(
        canvas,
        landmark_list=lm,
        connections=mp_pose.POSE_CONNECTIONS,
        landmark_drawing_spec=point_style,
        connection_drawing_spec=line_style,
    )
    return canvas

# -----------------------------------------------------------------------------
# Heuristic rules for activity labels
# -----------------------------------------------------------------------------

def _angle(p1, p2):
    dx, dy = p2.x - p1.x, p2.y - p1.y
    ang = abs(math.degrees(math.atan2(dy, dx)))
    return ang if ang <= 90 else 180 - ang


def is_fall_by_pose(
    lm: landmark_pb2.NormalizedLandmarkList,
    *,
    nose_hip_gap: float = 0.03,
    torso_span_max: float = 0.35,
    body_angle_tol: float = 35,
) -> bool:
    """Rough fall heuristic: nose below the hips, short torso, near-horizontal shoulder line.

    Args:
        lm: Pose landmarks; only nose, hips and shoulders are read.
        nose_hip_gap: How far (in landmark y units) the nose must lie below the mean hip height.
        torso_span_max: Upper bound for the summed left and right shoulder-to-hip vertical distances.
        body_angle_tol: Maximum shoulder-line angle, in degrees, counted as horizontal.
    """
    points = lm.landmark
    names = mp_pose.PoseLandmark
    nose = points[names.NOSE]
    left_hip, right_hip = points[names.LEFT_HIP], points[names.RIGHT_HIP]
    left_shoulder, right_shoulder = points[names.LEFT_SHOULDER], points[names.RIGHT_SHOULDER]

    # y grows downwards in the comparison below: a larger y means "lower".
    mean_hip_y = (left_hip.y + right_hip.y) / 2
    nose_is_low = nose.y > mean_hip_y + nose_hip_gap

    vertical_torso = abs(left_shoulder.y - left_hip.y) + abs(right_shoulder.y - right_hip.y)
    torso_is_flat = vertical_torso < torso_span_max

    shoulders_level = _angle(left_shoulder, right_shoulder) < body_angle_tol

    return nose_is_low and torso_is_flat and shoulders_level


def is_climb_by_pose(
    lm: landmark_pb2.NormalizedLandmarkList,
    *,
    ankle_above_hip: float = 0.0,
) -> bool:
    """Return True when the mean ankle y is smaller than the mean hip y by more than ``ankle_above_hip``."""
    points = lm.landmark
    names = mp_pose.PoseLandmark
    left_ankle, right_ankle = points[names.LEFT_ANKLE], points[names.RIGHT_ANKLE]
    left_hip, right_hip = points[names.LEFT_HIP], points[names.RIGHT_HIP]

    mean_ankle_y = (left_ankle.y + right_ankle.y) / 2
    mean_hip_y = (left_hip.y + right_hip.y) / 2
    return mean_ankle_y < mean_hip_y - ankle_above_hip


def get_pose_label(lm: landmark_pb2.NormalizedLandmarkList) -> str:
    """Turn landmarks into an activity label; the fall check takes priority over the climb check."""
    # Checked in order; the first matching rule decides the label.
    rules = (
        (is_fall_by_pose, "跌倒"),
        (is_climb_by_pose, "爬高"),
    )
    for matches, label in rules:
        if matches(lm):
            return label
    return "站立"

# -----------------------------------------------------------------------------
# Public API
# -----------------------------------------------------------------------------

def analyze_pose(path_or_rgb, *, return_vis: bool = False, detector_kwargs=None):
    """Detect a pose in one image and label the activity.

    Args:
        path_or_rgb: Image path (``str`` or ``os.PathLike``) or an RGB ``np.ndarray``.
        return_vis: When True, an RGB image with the skeleton drawn is returned as well.
        detector_kwargs: Optional dict of ``mp_pose.Pose`` constructor options.
            When given, a dedicated detector is built for this call from the
            default options overridden by these values, because options of an
            existing detector cannot be changed by setting attributes on it.
            The shared detector is left untouched.

    Returns:
        ``(label, landmarks)`` or, with ``return_vis``, ``(label, landmarks, vis_img)``.
        When no pose is found the label is "無法辨識", landmarks is ``None`` and
        ``vis_img`` is the unmodified input image.
    """
    import os  # local import: only needed for the PathLike check

    rgb: np.ndarray
    if isinstance(path_or_rgb, (str, os.PathLike)):
        # Decode inside the context manager so the file handle is released.
        with Image.open(path_or_rgb) as img:
            rgb = np.asarray(img.convert("RGB"))
    else:
        rgb = path_or_rgb

    if detector_kwargs:
        # Same defaults as the shared detector, overridden per call.
        options = dict(
            static_image_mode=True,
            model_complexity=1,
            enable_segmentation=False,
            min_detection_confidence=0.3,
            min_tracking_confidence=0.3,
        )
        options.update(detector_kwargs)
        with mp_pose.Pose(**options) as detector:
            res = detector.process(rgb)
    else:
        res = _load_pose().process(rgb)

    if not res.pose_landmarks:
        out = ("無法辨識", None)
        if return_vis:
            out += (rgb,)
        return out

    lm = res.pose_landmarks
    label = get_pose_label(lm)

    if return_vis:
        return label, lm, _draw(rgb, lm)
    return label, lm

# -----------------------------------------------------------------------------
# Caption post‑processing placeholder
# -----------------------------------------------------------------------------

def fix_caption(caption: str) -> str:
    """Placeholder post-processor: hands the caption back unchanged."""
    # TODO: replace with real post-processing rules if needed
    unchanged = caption
    return unchanged


Writing scripts/analyze_pose.py


In [None]:
%%writefile scripts/postcheck.py
# -*- coding: utf-8 -*-
"""
Post-processing helper

fix_caption() 可以用來：
  • 去重字詞 / 修正標點
  • 合併相似字幕
  • 依需求再擴充

先放最小版本──僅原樣返回，保證主程式能跑通。
"""

def fix_caption(caption: str) -> str:
    """Minimal version of the caption post-processor: returns the input as is."""
    # TODO: put the real post-processing logic here later
    result = caption
    return result


Writing scripts/postcheck.py


# fall demo(暫時沒用到)


# utils

In [None]:
%%writefile scripts/utils.py
import os
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import List, Tuple
from PIL import Image, ImageDraw, ImageFont

from scripts.object_detector import run_grounding_dino
from scripts.generate_caption import generate_better_caption
from scripts.postcheck import fix_caption
from scripts.clip_eval import score as clip_score

__all__ = [
    "to_xyxy",
    "safe_crop",
    "write_txt_segments",
    "pass_threshold",
    "blip_generate",
    "save_txt",
    "get_video_duration_ffmpeg",
    "generate_thumbnail_grid",
]

# -----------------------------------------------------------------------------
# 全域設定 & 常量
# -----------------------------------------------------------------------------
_tmp_dir = os.path.join(tempfile.gettempdir(), "blip_full")
os.makedirs(_tmp_dir, exist_ok=True)

# -----------------------------------------------------------------------------
# I/O helpers
# -----------------------------------------------------------------------------

def save_txt(data: List[str], output_path: str):
    """Write ``data`` to ``output_path`` as UTF-8 text, one entry per line.

    Trailing newlines of each entry are stripped before joining, and missing
    parent folders are created. A bare file name (no directory part) is
    written to the current working directory.

    Args:
        data: Lines to write.
        output_path: Destination file; overwritten when it exists.
    """
    target = Path(output_path)
    # Path.parent is "." for a bare file name, so this also works when
    # output_path has no directory component.
    target.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8 keeps non-ASCII captions independent of the platform's
    # default encoding and matches write_txt_segments.
    target.write_text("\n".join(line.rstrip("\n") for line in data), encoding="utf-8")


def get_video_duration_ffmpeg(video_path: str) -> float | None:
    """Ask ffprobe for the container duration of ``video_path`` in seconds.

    Returns None (after printing the reason) when ffprobe cannot be run, exits
    with an error, or prints something that is not a number.
    """
    probe_options = [
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
    ]
    try:
        completed = subprocess.run(
            ["ffprobe", *probe_options, video_path],
            capture_output=True,
            text=True,
            check=True,
        )
        duration = float(completed.stdout.strip())
    except Exception as e:
        print("[ffprobe] 無法取得影片長度:", e)
        return None
    return duration

# -----------------------------------------------------------------------------
# Geometry helpers
# -----------------------------------------------------------------------------

def to_xyxy(box, img_w: int, img_h: int, fmt: str = "cxcywh", normalized: bool = True):
    """Convert a box to integer pixel ``(x1, y1, x2, y2)`` clamped to the image.

    Args:
        box: Four numbers interpreted according to ``fmt``.
        img_w: Image width in pixels.
        img_h: Image height in pixels.
        fmt: ``"cxcywh"`` (centre + size), ``"xywh"`` (top-left + size); any
            other value is treated as ``x1, y1, x2, y2``.
        normalized: When True the box values are fractions of the image size.

    Returns:
        ``(x1, y1, x2, y2)`` with ``0 <= x1 < x2 <= img_w - 1`` and
        ``0 <= y1 < y2 <= img_h - 1`` whenever the image is at least 2 pixels
        along that axis. Degenerate or out-of-image boxes are widened to a
        small span at the nearest edge instead of coming back inverted.
    """
    if fmt == "cxcywh":
        cx, cy, w, h = box
        if normalized:
            cx, cy, w, h = cx * img_w, cy * img_h, w * img_w, h * img_h
        x1, y1, x2, y2 = cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2
    elif fmt == "xywh":
        x1, y1, w, h = box
        if normalized:
            x1, y1, w, h = x1 * img_w, y1 * img_h, w * img_w, h * img_h
        x2, y2 = x1 + w, y1 + h
    else:
        x1, y1, x2, y2 = box
        if normalized:
            x1, y1, x2, y2 = x1 * img_w, y1 * img_h, x2 * img_w, y2 * img_h

    x1, x2 = sorted([x1, x2])
    y1, y2 = sorted([y1, y2])
    x1, x2 = _clamp_span(x1, x2, img_w)
    y1, y2 = _clamp_span(y1, y2, img_h)
    return x1, y1, x2, y2


def _clamp_span(lo, hi, size: int):
    """Clamp an ordered (lo, hi) pair to [0, size - 1] and keep hi > lo when the axis allows it."""
    limit = max(0, size - 1)
    # The lower bound is clamped on both sides; it used to be clamped at 0
    # only, so a box beyond the far edge produced lo > hi.
    lo = min(max(0, int(lo)), limit)
    hi = min(limit, int(hi))
    if hi <= lo:
        hi = min(limit, lo + 2)
        if hi <= lo:
            # Already at the far edge: grow the span backwards instead.
            lo = max(0, hi - 2)
    return lo, hi

def safe_crop(img: Image.Image, box_xyxy):
    """Crop ``img`` to the ``(x1, y1, x2, y2)`` box and return the cropped image."""
    left, top, right, bottom = box_xyxy
    region = (left, top, right, bottom)
    return img.crop(region)

# -----------------------------------------------------------------------------
# Subtitle helpers (核心修正)
# -----------------------------------------------------------------------------

def write_txt_segments(frames: List[Tuple[float, str, float]], path: str = "subs.txt", window: float = 2.5):
    """Merge per-frame captions into time windows and write ``start,end,caption`` lines.

    Args:
        frames: Iterable of ``(sec, caption, score)`` tuples in any order.
            The argument itself is left untouched.
        path: Output text file; it is overwritten.
        window: A frame joins the current segment while its timestamp is at
            most ``window`` seconds after the segment's *first* frame.

    Behaviour:
        Frames are processed in timestamp order. Within a segment the caption
        with the highest score wins (the earliest one on ties). Each segment
        is written as ``"{start:.2f},{end:.2f},{caption}"`` where ``end`` is
        the timestamp of the last frame in the segment. An empty input
        produces an empty file.
    """
    out_path = Path(path)
    # sorted() builds a new list; the previous in-place frames.sort() silently
    # reordered the caller's list and rejected tuples/iterators.
    ordered = sorted(frames or (), key=lambda item: item[0])
    if not ordered:
        out_path.write_text("", encoding="utf-8")
        return

    segments: list[Tuple[float, float, str]] = []
    cur_start, cur_caption, cur_score = ordered[0]
    cur_end = cur_start
    for t, cap, score in ordered[1:]:
        if t - cur_start <= window:
            cur_end = t
            if score > cur_score:
                cur_caption, cur_score = cap, score
        else:
            # The tiny epsilon is kept from the original; it vanishes after the
            # two-decimal formatting below.
            segments.append((cur_start, cur_end + 1e-6, cur_caption))
            cur_start, cur_end, cur_caption, cur_score = t, t, cap, score
    segments.append((cur_start, cur_end + 1e-6, cur_caption))

    with out_path.open("w", encoding="utf-8") as f:
        for st, ed, cap in segments:
            f.write(f"{st:.2f},{ed:.2f},{cap}\n")

# -----------------------------------------------------------------------------
# Detection & BLIP helpers (保持不變)
# -----------------------------------------------------------------------------

def pass_threshold(label: str, score: float, thresh: float = 0.3) -> bool:
    """Tell whether a detection score reaches the threshold.

    Args:
        label: Detection label; accepted for signature compatibility but not
            consulted by this implementation.
        score: Detection confidence.
        thresh: Minimum accepted confidence (inclusive).

    Returns:
        ``True`` when ``score >= thresh``.
    """
    is_confident = score >= thresh
    return is_confident

def blip_generate(full_img: Image.Image, roi_images: list[Image.Image], obj_labels: list[str], **prompt_kw):
    """Caption a frame and score the caption against the frame.

    The frame is written to a temporary JPEG under ``_tmp_dir`` because
    ``generate_better_caption`` takes a file path. The caption is scored with
    ``clip_score``; while the score is below 20 the caption is regenerated
    without pose / fine-tune hints, at most twice.

    Args:
        full_img: Full frame; must support ``save(path)``.
        roi_images: Accepted for interface compatibility; currently unused.
        obj_labels: Detected object labels. Forwarded to the caption generator
            and (first six only) appended to the caption in full-width brackets.
        **prompt_kw: Optional ``pose_label`` and ``fine_info`` forwarded to the
            first caption attempt.

    Returns:
        ``(caption, clip)`` where ``caption`` has been passed through
        ``fix_caption`` and decorated with the labels and a ``[CLIP:n]``
        suffix, and ``clip`` is the score of the caption actually returned.

    Fixes over the previous version:
        * the temporary JPEG is removed even when captioning/scoring raises;
        * every regenerated caption is re-scored, so the returned score always
          belongs to the returned caption (it used to be the score of the
          caption generated one attempt earlier);
        * an empty regenerated caption falls back to "無字幕" like the first one.
    """
    tmp_path = os.path.join(_tmp_dir, f"{uuid.uuid4().hex}.jpg")
    full_img.save(tmp_path)
    try:
        pose_label: str | None = prompt_kw.get("pose_label")
        caption = generate_better_caption(
            tmp_path,
            pose_label=pose_label,
            detected_objects=obj_labels,
            fine_tune_info=prompt_kw.get("fine_info"),
        ) or "無字幕"
        clip = clip_score(full_img, caption)
        for _ in range(2):
            # 20 is the acceptance threshold on the clip_score scale used by this file.
            if clip >= 20:
                break
            caption = generate_better_caption(tmp_path, None, obj_labels, None) or "無字幕"
            clip = clip_score(full_img, caption)
    finally:
        # Best-effort cleanup: a missing temp file must not mask the real error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass

    caption = fix_caption(caption)
    if obj_labels:
        caption += "（" + "、".join(obj_labels[:6]) + "）"
    caption += f"  [CLIP:{clip:.0f}]"
    return caption, clip


Writing scripts/utils.py


In [None]:
%%writefile scripts/danger_utils.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Object‑level danger helpers"""

# Labels considered dangerous; they are accepted at a lower confidence so that
# a weak detection of e.g. a knife is not dropped.
DANGER_OBJS = {"knife", "scissors", "glass", "ladder", "stove", "fire", "gun", "電線", "剪刀"}

# Default score threshold for labels listed in DANGER_OBJS.
DANGER_THRESHOLD = 0.15


def pass_threshold(lbl: str, score: float, *, default=0.3, danger=DANGER_THRESHOLD) -> bool:
    """Decide whether a detection is kept, using a lower bar for dangerous objects.

    Args:
        lbl: Detection label. String labels are matched against ``DANGER_OBJS``
            after stripping surrounding whitespace and lower-casing, so
            ``"Knife "`` is treated like ``"knife"``.
        score: Detection confidence.
        default: Inclusive threshold for ordinary labels (keyword-only).
        danger: Inclusive threshold for labels in ``DANGER_OBJS``
            (keyword-only, defaults to ``DANGER_THRESHOLD``).

    Returns:
        ``True`` when ``score`` reaches the threshold that applies to ``lbl``.
    """
    key = lbl.strip().lower() if isinstance(lbl, str) else lbl
    threshold = danger if key in DANGER_OBJS else default
    return score >= threshold

Writing scripts/danger_utils.py


# fall util


In [None]:
%%writefile fall_utils.py
# -*- coding: utf-8 -*-
"""
fall_utils.py
--------------
‣ 檔名含 'fall' → 整片列為可疑
‣ 字幕關鍵字搜尋（中英）
‣ 區段合併 (overlap / gap < tol)
"""

from pathlib import Path
import re
import cv2

# ---------- 1) 取得影片長度 ----------
def get_video_duration(video_path: str) -> float:
    """Estimate a video's length as frame count divided by frame rate.

    Args:
        video_path: Path to the video; converted with ``str()`` before being
            handed to ``cv2.VideoCapture``.

    Returns:
        ``frame_count / fps``, or ``0.0`` when the reported frame rate is
        zero/falsy (which avoids a division by zero).
    """
    capture = cv2.VideoCapture(str(video_path))
    frame_rate = capture.get(cv2.CAP_PROP_FPS)
    frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
    capture.release()

    if not frame_rate:
        return 0.0
    return frame_total / frame_rate

# ---------- 2) 字幕關鍵字 ----------
# Keyword patterns (Chinese and English) that mark a caption as fall-related.
FALL_KEYWORDS = [
    r"跌倒",
    r"倒地",
    r"摔倒",
    r"躺",
    r"fall",
    r"fallen",
    r"lying",
    r"on the floor",
]
# One case-insensitive alternation of all keywords, compiled once at import.
_kw_re = re.compile("|".join(FALL_KEYWORDS), flags=re.IGNORECASE)


def find_fall_segments(captions):
    """Select the caption segments whose text mentions a fall keyword.

    Args:
        captions: Iterable of dicts shaped like
            ``{'start': ..., 'end': ..., 'text': ...}``; only ``'text'`` is read.

    Returns:
        A new list holding the matching segment objects themselves, in input
        order. Matching is a substring search and segments are not merged.
    """
    hits = []
    for segment in captions:
        if _kw_re.search(segment["text"]) is not None:
            hits.append(segment)
    return hits

# ---------- 3) 檔名提示 ----------
def collect_suspects_by_name(video_path, suspects):
    """Flag the whole video as suspicious when its file name hints at a fall.

    Args:
        video_path: Path of the video; only the stem (name without extension)
            is inspected, case-insensitively, for the substring ``fall``.
        suspects: List of segment dicts; it is extended in place on a match.

    Returns:
        The same ``suspects`` list. On a match it gains one segment spanning
        ``0.0`` to ``get_video_duration(video_path)``.
    """
    stem = Path(video_path).stem.lower()
    if "fall" not in stem:
        return suspects

    whole_clip = {
        "start": 0.0,
        "end": get_video_duration(video_path),
        "text": "[FILE-NAME-HINT] fall",
    }
    suspects.append(whole_clip)
    return suspects

# ---------- 4) 區段合併 ----------
def merge_overlapping_segments(segments, tol: float = 0.5):
    """Fuse segments that overlap or are separated by at most ``tol`` seconds.

    Args:
        segments: List of dicts with ``start``, ``end`` and ``text`` keys.
        tol: Largest gap (inclusive) between one segment's end and the next
            segment's start that still counts as the same event.

    Returns:
        A new list of shallow-copied dicts ordered by ``start``. A fused
        segment keeps the earliest start, the latest end and the texts joined
        with ``" | "``. The input dicts are not modified.
    """
    if not segments:
        return []

    ordered = list(segments)
    ordered.sort(key=lambda seg: seg["start"])

    first, rest = ordered[0], ordered[1:]
    result = [first.copy()]
    for seg in rest:
        last = result[-1]
        gap = seg["start"] - last["end"]
        if not (gap <= tol):
            # Too far from the running segment: start a new one.
            result.append(seg.copy())
            continue
        if seg["end"] > last["end"]:
            last["end"] = seg["end"]
        last["text"] = last["text"] + " | " + seg["text"]
    return result


Writing fall_utils.py


# generate

In [None]:
%%writefile scripts/generate_caption.py
import os
import torch
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration, pipeline
from opencc import OpenCC
# peft is optional: it is only needed to load a fine-tuned (LoRA) model.
# Without it the import failure is reported and the stock model is used.
try:
    from peft import PeftModel
except ImportError:
    print("警告: 'peft' 庫未安裝。若要使用微調模型，請先執行 '!pip install peft -q'。")
    PeftModel = None # sentinel checked by _initialize_models() before attempting the LoRA load

# --- Global model / processor state ---
# Directory holding the fine-tuned LoRA adapter. It must be the same directory
# the fine-tuning step wrote to (its `model_output_dir`).
# If you did not fine-tune, set this to None (or leave the path non-existent):
# the stock model is loaded instead. Do not delete the assignment, because
# _initialize_models() reads this name.
FINE_TUNED_MODEL_PATH = "/content/drive/MyDrive/Colab_FineTuned_Model"

# Populated by _initialize_models(); None means "not loaded yet".
processor = None
model = None
translator = None
cc = None

_BLIP_BASE_ID = "Salesforce/blip-image-captioning-base"


def _load_base_blip(device):
    """Load the stock BLIP processor and captioning model on ``device`` in eval mode."""
    base_processor = BlipProcessor.from_pretrained(_BLIP_BASE_ID)
    base_model = BlipForConditionalGeneration.from_pretrained(_BLIP_BASE_ID).to(device)
    base_model.eval()
    return base_processor, base_model


def _load_lora_blip(device):
    """Load the BLIP base model with the LoRA adapter from ``FINE_TUNED_MODEL_PATH`` attached."""
    print(f"嘗試加載微調後的模型 (LoRA) 從: {FINE_TUNED_MODEL_PATH}")
    # The adapter is applied on top of a freshly loaded base model.
    base_model = BlipForConditionalGeneration.from_pretrained(_BLIP_BASE_ID)
    lora_model = PeftModel.from_pretrained(base_model, FINE_TUNED_MODEL_PATH)
    # The processor is taken from the base checkpoint.
    lora_processor = BlipProcessor.from_pretrained(_BLIP_BASE_ID)
    print("成功加載微調後的模型和處理器。")
    lora_model.to(device)
    lora_model.eval()  # inference only: disables dropout and similar layers
    return lora_processor, lora_model


def _initialize_models():
    """Populate the module globals ``processor``, ``model``, ``translator`` and ``cc``.

    BLIP is loaded with the LoRA adapter when ``FINE_TUNED_MODEL_PATH`` exists
    and ``peft`` is importable; any failure on that path falls back to the
    stock BLIP model. The English-to-Chinese translation pipeline and the
    OpenCC converter are created afterwards.

    Each component is initialised only when it is still ``None``. A repeated
    call therefore does not reload BLIP, but it does retry a translator or
    converter whose earlier setup failed. The previous guard returned early
    as soon as BLIP was loaded and left those as ``None``.
    """
    global processor, model, translator, cc

    if model is None or processor is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"BLIP 模型運行設備: {device}")

        if FINE_TUNED_MODEL_PATH and os.path.exists(FINE_TUNED_MODEL_PATH) and PeftModel is not None:
            try:
                processor, model = _load_lora_blip(device)
            except Exception as e:
                print(f"加載微調模型失敗: {e}。將回退到加載原始 BLIP 模型。")
                processor, model = _load_base_blip(device)
        else:
            print("未設定微調模型路徑、路徑無效或 'peft' 庫未加載。加載原始 BLIP 基礎模型。")
            processor, model = _load_base_blip(device)

    # Translator (English -> Chinese) and Simplified -> Traditional (Taiwan phrasing) converter.
    if translator is None:
        translator = pipeline("translation", model="Helsinki-NLP/opus-mt-en-zh", src_lang="en", tgt_lang="zh", device=0 if torch.cuda.is_available() else -1)
    if cc is None:
        cc = OpenCC('s2twp')


# Load everything once when the module is first imported.
_initialize_models()


def generate_better_caption(image_path, pose_label=None, detected_objects=None, fine_tune_info=None):
    """Caption one image with BLIP, translate it, and append optional scene hints.

    Args:
        image_path: Path of the image to caption.
        pose_label: Accepted for interface compatibility; not used here.
        detected_objects: Accepted for interface compatibility; not used here.
        fine_tune_info: Optional mapping. A non-empty "整體描述" value is given
            to the processor as the text prompt; non-empty "房間", "動作" and
            "物品" values are appended to the caption in full-width brackets.

    Returns:
        The generated caption after translation, OpenCC conversion and
        trimming of surrounding spaces/punctuation, followed by the bracketed
        hints when there are any.
    """
    global processor, model  # module-level objects set up by _initialize_models()

    def _field(key):
        """Return the non-empty value stored under ``key`` in fine_tune_info, else None."""
        if fine_tune_info and key in fine_tune_info and fine_tune_info[key]:
            return fine_tune_info[key]
        return None

    frame = Image.open(image_path).convert("RGB")

    # The overall description, when present, seeds generation as a text prompt.
    prompt = _field("整體描述")
    inputs = processor(images=frame, text=prompt, return_tensors="pt").to(model.device)

    generation_options = dict(
        max_new_tokens=30,
        min_length=5,
        num_beams=5,
        early_stopping=True,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        repetition_penalty=1.2,
        length_penalty=1.0,
        no_repeat_ngram_size=2,
    )
    with torch.no_grad():
        generated_ids = model.generate(**inputs, **generation_options)

    english = processor.decode(generated_ids[0], skip_special_tokens=True).strip()

    # Translate, convert, and strip leading/trailing spaces and punctuation.
    if english:
        translated = translator(english)[0]['translation_text']
    else:
        translated = ""
    caption = cc.convert(translated).strip(" .,，。")

    # Optional hints: room, action, objects (in that order).
    extras = []
    for key, label in (("房間", "地點"), ("動作", "動作"), ("物品", "物品")):
        value = _field(key)
        if value:
            extras.append(f"{label}：{value}")
    suffix = ("（" + "，".join(extras) + "）") if extras else ""

    return f"{caption}{suffix}"


def generate_caption(image_paths_batch, pose_labels_batch=None, detected_objects_batch=None, fine_tune_info_batch=None):
    """Caption a batch of images one by one.

    Args:
        image_paths_batch: Iterable of image paths.
        pose_labels_batch: Optional sequence aligned with the paths.
        detected_objects_batch: Optional sequence aligned with the paths.
        fine_tune_info_batch: Optional sequence aligned with the paths.

    Returns:
        List with one ``generate_better_caption`` result per path, in order.
        A missing or empty optional batch contributes ``None`` for every image.
    """
    def _pick(batch, position):
        # An absent/empty batch means "no value for any image".
        return batch[position] if batch else None

    return [
        generate_better_caption(
            path,
            _pick(pose_labels_batch, position),
            _pick(detected_objects_batch, position),
            _pick(fine_tune_info_batch, position),
        )
        for position, path in enumerate(image_paths_batch)
    ]

Writing scripts/generate_caption.py


In [None]:
# Importing this module runs its module-level _initialize_models() call, which
# loads the captioning and translation models (see the output below this cell).
from scripts.generate_caption import generate_better_caption
import uuid, os

# Scratch directory for temporary frame images.
# NOTE(review): presumably the same location `blip_generate` in scripts/utils.py
# uses via its own `_tmp_dir` — confirm against that module.
_tmp_dir = "/tmp/blip_full"
os.makedirs(_tmp_dir, exist_ok=True)

BLIP 模型運行設備: cuda


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


未設定微調模型路徑、路徑無效或 'peft' 庫未加載。加載原始 BLIP 基礎模型。


preprocessor_config.json:   0%|          | 0.00/287 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/506 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/990M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/990M [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/312M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/293 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/312M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/44.0 [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/806k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/805k [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

Device set to use cuda:0


# 微調


In [None]:
%%writefile scripts/train_caption_model.py
def fine_tune_blip_captioning(*args, **kwargs):
    """Placeholder for BLIP caption fine-tuning.

    Accepts any positional and keyword arguments, ignores them, prints a
    notice that fine-tuning is not implemented yet and returns ``None``.
    Nothing is trained or saved.
    """
    notice = "[Stub] fine_tune_blip_captioning() 被呼叫，但目前尚未實作。"
    print(notice)


Writing scripts/train_caption_model.py


In [None]:
import os

from scripts.train_caption_model import fine_tune_blip_captioning

# Training data and output locations.
# TRAIN_IMAGE_BASE_PATH: directory that actually holds the extracted video frames.
TRAIN_IMAGE_BASE_PATH = "/content/drive/MyDrive/Colab_Subtitles_Output/frames/my_test_video1/"
# TRAIN_CAPTION_CSV_PATH: the train_captions.csv file created for fine-tuning.
TRAIN_CAPTION_CSV_PATH = "/content/drive/MyDrive/Colab_Subtitles_Output/train_captions.csv"
# FINE_TUNED_MODEL_OUTPUT_DIR: where the fine-tuned model (LoRA adapter) is saved.
# scripts/generate_caption.py tries to load from this same path, so keep them identical.
FINE_TUNED_MODEL_OUTPUT_DIR = "/content/drive/MyDrive/Colab_FineTuned_Model"

print("開始執行模型微調，這可能需要一些時間（根據數據量和 num_train_epochs 設定）。")
fine_tune_blip_captioning(
    train_data_path=TRAIN_CAPTION_CSV_PATH,
    image_base_path=TRAIN_IMAGE_BASE_PATH,
    model_output_dir=FINE_TUNED_MODEL_OUTPUT_DIR,
    num_train_epochs=10,            # number of epochs; 3-5 is a reasonable first try
    per_device_train_batch_size=4,  # batch size; tune to GPU memory (4-8 is typical on Colab)
    learning_rate=1e-4,             # fine-tuning uses a small learning rate (e.g. 5e-5 or 1e-4)
)

# Only announce success when something was actually written. The call above
# returns normally even when fine-tuning is a no-op (the current stub), so an
# unconditional success message would claim a model was saved when none was.
if os.path.isdir(FINE_TUNED_MODEL_OUTPUT_DIR) and os.listdir(FINE_TUNED_MODEL_OUTPUT_DIR):
    print("模型微調執行完成！微調後的模型已保存到 Google Drive。")
else:
    print(f"警告：{FINE_TUNED_MODEL_OUTPUT_DIR} 內沒有任何模型檔案，微調可能尚未實作或未成功保存。")

開始執行模型微調，這可能需要一些時間（根據數據量和 num_train_epochs 設定）。
[Stub] fine_tune_blip_captioning() 被呼叫，但目前尚未實作。
模型微調執行完成！微調後的模型已保存到 Google Drive。


# clip


In [None]:
%%writefile scripts/clip_eval.py
# scripts/clip_eval.py
import torch, numpy as np
from transformers import CLIPProcessor, CLIPModel
from PIL import Image

# Use the GPU when one is available; score() moves its inputs to the same device.
_device = "cuda" if torch.cuda.is_available() else "cpu"
# Model and processor are loaded once, when this module is imported.
_model  = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(_device)
_proc   = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

def score(image: Image.Image, caption: str) -> float:
    """Return the CLIP image/text cosine similarity multiplied by 100.

    Args:
        image: Image passed to the CLIP processor.
        caption: Text to compare with the image; tokenised with truncation to
            at most 77 tokens.

    Returns:
        ``cosine_similarity * 100`` as a Python float. Cosine similarity can
        be negative, so the value is not strictly confined to 0-100.
    """
    batch = _proc(
        text=[caption],
        images=image,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=77,  # hard cap on the text length passed to the model
    )
    batch = batch.to(_device)

    with torch.no_grad():
        outputs = _model(**batch)

    similarity = torch.cosine_similarity(outputs.image_embeds, outputs.text_embeds)
    return similarity.item() * 100


Writing scripts/clip_eval.py


In [None]:
%%writefile scripts/clip_caption.py
# scripts/clip_caption.py
from typing import List, Dict
import cv2

def run_clip_caption(video_path: str) -> List[Dict]:
    """Produce placeholder captions covering a video in 5-second windows.

    The video is opened only to read its frame rate and frame count; a
    missing/zero frame rate is replaced by 30 and a missing frame count by 0.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        A list of ``{"start", "end", "text"}`` dicts with consecutive
        5-second windows (the last one clipped to the video duration), every
        ``text`` being ``"dummy caption"``. Empty when the duration is zero.
    """
    capture = cv2.VideoCapture(video_path)
    frame_rate = capture.get(cv2.CAP_PROP_FPS) or 30
    frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) or 0
    capture.release()

    total_seconds = frame_total / frame_rate
    window = 5.0

    placeholders = []
    start = 0.0
    while start < total_seconds:
        stop = min(start + window, total_seconds)
        placeholders.append(dict(start=start, end=stop, text="dummy caption"))
        start += window
    return placeholders


Writing scripts/clip_caption.py


# main


In [None]:
%%writefile main.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Video pipeline：Grounding‑DINO → ROI MediaPipe Pose → BLIP caption → subs.txt（含危險標籤）"""

import os, uuid, tempfile, sys, argparse
from pathlib import Path

import cv2
from PIL import Image

# === 專案內部模組 ===
from scripts.object_detector import run_grounding_dino as detect_objects
from scripts.analyze_pose import analyze_pose
from scripts.utils import blip_generate, write_txt_segments, to_xyxy
# ==================

# Tunable thresholds
MIN_DET_CONF = 0.3  # minimum detection score for a "person" box to be used
# Pose-estimator confidence used on the cropped person ROI (passed as both the
# detection and the tracking confidence in process_video).
POSE_CONF_ROI = 0.2
# Pose-estimator detection confidence used on the full frame when no person box was found.
POSE_CONF_FULL = 0.5


def process_video(
    video_path: str,
    *,
    fps_sample: int = 2,
    txt_out: str = "subs.txt",
    name_filter: bool = False,
) -> None:
    """Sample frames from a video, caption them and write a merged subtitle file.

    Per sampled frame: detect objects, pick the largest confident "person"
    box, run pose analysis on that crop (or on the whole frame when there is
    no person box), caption the full frame with ``blip_generate`` and prefix
    the caption with the pose label. All captions are finally merged and
    written by ``write_txt_segments``.

    Args:
        video_path: Video file to process.
        fps_sample: Target number of sampled frames per second of video.
            A non-positive value samples every frame.
        txt_out: Path of the subtitle file to write.
        name_filter: When true, videos whose file stem contains neither
            ``fall`` nor ``risk`` are skipped entirely.

    Fixes over the previous version:
        * ``risk_tag`` was never defined, so the first sampled frame raised
          ``NameError``. It is now derived from the pose label, which was
          computed but unused, and the label is also forwarded to
          ``blip_generate``.
          NOTE(review): the tag is the bracketed pose label; confirm whether
          only specific "dangerous" poses should be tagged.
        * a missing/zero FPS no longer causes ``ZeroDivisionError``
          (30 fps is assumed);
        * the capture and the temporary ROI image are released/removed even
          when a step raises.
    """
    # Optional name filter: skip videos whose name has no relevant keyword.
    if name_filter and not any(k in Path(video_path).stem.lower() for k in ("fall", "risk")):
        print(f"[Name‑Filter] 跳過：{video_path}")
        return

    frames_for_merge: list[tuple[float, str, float]] = []  # (sec, caption, score)

    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Covers 0, negative and NaN values reported for unreadable metadata.
            fps = 30.0
        step = fps // fps_sample if fps_sample > 0 else 1
        hop = max(1, int(step))  # number of frames between two samples
        frame_idx = 0

        DEBUG_DIR = Path("debug/roi")
        DEBUG_DIR.mkdir(parents=True, exist_ok=True)

        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break
            if frame_idx % hop:
                frame_idx += 1
                continue

            pil_img = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))

            # ---------- 1) Person detection ----------
            boxes, labels, scores = detect_objects(pil_img)
            print(frame_idx, labels, scores)
            person_boxes = [b for b, l, s in zip(boxes, labels, scores) if l == "person" and s >= MIN_DET_CONF]

            # ---------- 2) Pose estimation ----------
            roi = None
            if person_boxes:
                # Largest box by width * height (boxes are cxcywh).
                pb = max(person_boxes, key=lambda b: b[2] * b[3])
                x1, y1, x2, y2 = to_xyxy(pb, pil_img.width, pil_img.height, fmt="cxcywh", normalized=True)
                roi = pil_img.crop((x1, y1, x2, y2))
                roi.save(DEBUG_DIR / f"{frame_idx:04}.jpg")

                tmp_path = os.path.join(tempfile.gettempdir(), f"roi_{uuid.uuid4().hex}.jpg")
                roi.save(tmp_path)
                try:
                    pose_label, _ = analyze_pose(
                        tmp_path,
                        detector_kwargs=dict(
                            model_complexity=0,
                            min_detection_confidence=POSE_CONF_ROI,
                            min_tracking_confidence=POSE_CONF_ROI,
                        ),
                    )
                finally:
                    os.remove(tmp_path)
            else:
                pose_label, _ = analyze_pose(frame_bgr, detector_kwargs=dict(min_detection_confidence=POSE_CONF_FULL))

            # ---------- 3) BLIP caption ----------
            roi_imgs = [roi] if roi is not None else []
            caption_str, clip_score, *_ = blip_generate(pil_img, roi_imgs, labels, pose_label=pose_label)

            # ---------- 4) Risk tag ----------
            risk_tag = f"[{pose_label}] " if pose_label else ""
            final_caption = f"{risk_tag}{caption_str}"

            sec = frame_idx / fps
            frames_for_merge.append((sec, final_caption, clip_score))

            frame_idx += 1
    finally:
        cap.release()

    # ---------- 5) Merge captions into subtitle segments ----------
    write_txt_segments(frames_for_merge, txt_out)
    print(f"產生字幕完成：{txt_out}")


# ------------------------------------------------------------------ CLI
if __name__ == "__main__":
    ap = argparse.ArgumentParser(description="Video → subs.txt (danger aware)")
    ap.add_argument("video", nargs="?", help="path to single video")   # optional: --dir may be used instead
    ap.add_argument("--dir", help="process **all** videos under this folder")
    ap.add_argument("--fps", type=int, default=2, help="sampling FPS")
    ap.add_argument("--out", default="subs.txt", help="output subtitle file")
    ap.add_argument("--enable-name-filter", action="store_true", help="只處理檔名含 fall / risk")

    args = ap.parse_args()

    VIDEO_SUFFIXES = {".mp4", ".mov"}

    def iter_videos(p: Path):
        """Yield ``p`` itself when it is a video file, else every video file below directory ``p``."""
        if p.is_file():
            if p.suffix.lower() in VIDEO_SUFFIXES:
                yield p
        elif p.is_dir():
            # sorted(): deterministic processing order; is_file(): skip
            # directories that merely carry a video-like suffix.
            for f in sorted(p.rglob("*")):
                if f.is_file() and f.suffix.lower() in VIDEO_SUFFIXES:
                    yield f

    # --dir takes precedence over the positional video argument.
    if args.dir:
        root = Path(args.dir)
    elif args.video:
        root = Path(args.video)
    else:
        sys.exit(" 必須指定 <video> 或 --dir")

    videos = list(iter_videos(root))
    if not videos:
        print(f"找不到可處理的影片（.mp4 / .mov）：{root}")

    custom_out = args.out != "subs.txt"
    for vid in videos:
        if not custom_out:
            # Default: "<video stem>.subs.txt" in the current working directory.
            txt_out = vid.with_suffix(".subs.txt").name
        elif len(videos) == 1:
            txt_out = args.out
        else:
            # Several videos with one --out name would overwrite each other;
            # give each video its own "<stem>.<out name>" next to the requested path.
            out_path = Path(args.out)
            txt_out = str(out_path.with_name(f"{vid.stem}.{out_path.name}"))

        process_video(
            str(vid),
            fps_sample=args.fps,
            txt_out=txt_out,
            name_filter=args.enable_name_filter,
        )



Writing main.py


# 執行code

In [None]:
from google.colab import drive
import os, glob, shutil

drive.mount('/content/drive', force_remount=True)

BASE_DIR = "/content/drive/MyDrive/video_subs"
os.makedirs(BASE_DIR, exist_ok=True)

# 跑推論
!python /content/main.py --dir data/videos --fps 2

# 推論完把 *.subs.txt 搬進雲端
for f in glob.glob("*.subs.txt"):
    shutil.move(f, f"{BASE_DIR}/{f}")
    print(" 搬到雲端：", f"{BASE_DIR}/{f}")

import os
BASE_DIR = "/content/drive/MyDrive/video_subs"  # ← 你想放字幕的資料夾
os.makedirs(BASE_DIR, exist_ok=True)
print("字幕將存到：", BASE_DIR)


Mounted at /content/drive
2025-08-17 07:15:04.230141: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1755414904.265595    4973 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1755414904.271965    4973 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1755414904.288142    4973 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755414904.288177    4973 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755414904.288181    4973 computation_placer.cc:1

In [None]:
!python /content/main.py --dir /content/data/videos --fps 2



2025-08-17 07:35:16.219843: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1755416116.240114   10142 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1755416116.246665   10142 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1755416116.262542   10142 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755416116.262571   10142 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755416116.262575   10142 computation_placer.cc:177] computation placer alr

In [None]:
# test