<a href="https://colab.research.google.com/github/yuzoo0226/3D-OVS/blob/main/scene_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab で一度だけ実行
!pip -q install torch torchvision torchaudio librosa soundfile opencv-python
!apt -y -q update && apt -y -q install ffmpeg

Hit:1 https://cli.github.com/packages stable InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [80.4 kB]
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,002 kB]
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,237 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:10 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,272 kB]
Get:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [3,310 kB]
Hi

In [87]:
from pathlib import Path
import json, math, os, sys
import openai, base64
import glob, csv, shutil
from typing import List, Tuple, Dict, Any, Optional

import cv2
import subprocess
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import librosa
from google.colab import userdata
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cpu


In [88]:
# ====== 設定（ここだけ変えればOK） ======
CFG = {
    # 入力ファイル
    # "annotations": "",          # 動画一覧（前のままの形式でOK）
    # "binary_timestamps": "/content/positive_times.json", # ポジティブ点 or 区間ファイル（下に例）
    "dataset_json": "/content/drive/MyDrive/ResearchIntern/InteractiveVQA/annotation/action_segments.json",  # ←あなたの JSON パスに変更

    # 出力
    "work_dir": "/content/runs/exp_bin1",

    # 時間同期/サンプリング
    "target_fps": 2.0,       # フレームサンプリングFPS
    "audio_sr": 16000,       # 音声リサンプリング
    "audio_win": 0.5,        # 各フレームの±(audio_win/2)で窓切り出し

    # 特徴量
    "resize": 224,
    "center_crop": 224,
    "n_mels": 64,
    "n_fft": 400,            # 25ms @16k
    "hop_length": 160,       # 10ms @16k

    # 学習
    "epochs": 5,
    "batch_size": 4,
    "lr": 1e-3,
    "pos_weight": None,      # POSクラスの重み（不均衡対策: 例えば 2.0）

    # データ分割
    "split_ratio": 0.8,      # train/val
    "seed": 42,

    # ポジティブ点→区間化の幅（秒）
    "pos_window": 0.6,
}

os.makedirs(CFG["work_dir"], exist_ok=True)


In [89]:
from dataclasses import dataclass

@dataclass
class SatsudoraDataset:
    # id: int
    talk_start_time: float
    talk_end_time: float
    action_start_time: float
    action_end_time: float
    _from: str
    _to: str
    talk_text: str
    objective_nonverbal: str
    subjective_nonverbal: str

@dataclass
class AnnotationInfo:
    annotation_path: str
    video_path: str
    data: List[SatsudoraDataset]

In [90]:
class LoadDataset():
    def __init__(self):
        # 環境変数からAPIキーを取得
        self.api_key = userdata.get('OPENAI_API')
        self.gpt_version = "gpt-4o"

        self.annotation_dir = "/content/drive/MyDrive/ResearchIntern/EscortData/202501_サツドラ/【最終版】データセット/annotations_csv"
        # self.satsudora_dataset = self.load_satsudora_datasets(annotation_dir=annotation_dir)
        # print(self.satsudora_dataset)
        # self.satsudora_dataset[0].data[0].talk_text

        if self.api_key is None:
            print("OPENAI_API is not set.")
            sys.exit()
        else:
            self.client = openai.OpenAI(api_key=self.api_key)

    def load_satsudora_datasets(self, annotation_dir: str) -> List[List[SatsudoraDataset]]:
        datasets: List[AnnotationInfo] = []
        csv_files = sorted(glob.glob(os.path.join(annotation_dir, "*.csv")))
        # video_files = sorted(glob.glob(os.path.join(video_dir, "*.mp4 のコピー")))
        for idx, csv_file in enumerate(csv_files):
            datasets.append(
                AnnotationInfo(
                    annotation_path=csv_file,
                    video_path=csv_file.replace("annotations_csv", "videos").replace(".csv", ".mp4 のコピー"),
                    data=self._load_csv(csv_file),
                )
            )
        return datasets

    @staticmethod
    def time_to_seconds(timestr: str) -> float:
        """ 'hh:mm:ss.xxx' 形式を秒数(float)に変換 """
        if not timestr:
            return None
        h, m, s = timestr.split(":")
        return int(h) * 3600 + int(m) * 60 + float(s)

    def _load_csv(self, csv_path: str) -> List[SatsudoraDataset]:
        datasets: List[SatsudoraDataset] = []
        with open(csv_path, newline='', encoding="utf-8") as f:
            reader = csv.DictReader(f)  # ヘッダー行を利用して辞書として読み込む
            for idx, row in enumerate(reader):
                try:
                    datasets.append(
                        SatsudoraDataset(
                            # id=idx,
                            talk_start_time=self.time_to_seconds(row["発話開始タイムスタンプ"]),
                            talk_end_time=self.time_to_seconds(row["発話終了タイムスタンプ"]),
                            _from=row["発話者"],
                            _to=row["宛先"],
                            talk_text=row["発話内容"],
                            action_start_time=self.time_to_seconds(row["行動開始タイムスタンプ"]),
                            action_end_time=self.time_to_seconds(row["行動終了タイムスタンプ"]),
                            objective_nonverbal=row["客観的非言語行動"],
                            subjective_nonverbal=row["主観的非言語行動"],
                        )
                    )

                except (KeyError, ValueError) as e:
                    print(f"⚠️ スキップしました: {row} (エラー: {e})")
        return datasets

    def set_inital_prompt(self) -> None:
        self.messages = [
            # inital promptの定義
            {"role": "system", "content": "以下は、実際の店舗内でヒューマノイドロボット（Sota）とお客さんが対話している動画から切り出した複数のフレーム画像です。それぞれのフレームをよく観察し、お客さんの具体的な非言語的行動を詳細に説明してください。特に、お客さんの目に見える行動、ジェスチャー、姿勢、顔の向き、ロボットや周囲の環境との物理的な関わりに注目してください。"},
            {"role": "system", "content": "抽象的で曖昧な表現は避け、お客さんが何をしているのかを明確に述べてください。（例：「お客さんは棚を指さしている」「お客さんはロボットの方に身を乗り出している」「お客さんは右手で商品を持っている」など）"},
            {"role": "system", "content": "出力は以下の形式でお願いします："},
            {"role": "system", "content": "1. **観察された客観的な行動** – お客さんが実際にしている行動を簡潔に説明する"},
            {"role": "system", "content": "1. **観察された行動に対する主観的な評価** – その事象に対して考えられること，なぜそのような行動をとっているかなどの理由を簡潔に説明する"},
            {"role": "system", "content": "2. **根拠** – なぜその行動と解釈したのか（例：手の位置、体の向き、物体との関わりなどに基づく）"},
        ]

    def extract_uniform_frames(
        self,
        video_path: str,
        k: int = 5,
        out_dir: Optional[str] = None,
        prefix: str = "frame",
        jpg_quality: int = 95,
        rewrite: bool = False
    ) -> List[Dict[str, Any]]:
        p = Path(video_path)
        out_dir = out_dir or str((p.parent / f"{p.stem}_uniform_frames").resolve())

        if rewrite:
            try:
                os.rmdir(out_dir)
            except Exception as e:
                # check = input(f"{out_dir}を削除しますがよろしいですか？ [y/N]>>")
                # if check == "y":
                shutil.rmtree(out_dir)
                    # print(e)
                # else:
                    # pass

        os.makedirs(out_dir, exist_ok=True)

        cap = cv2.VideoCapture(str(p))
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = float(cap.get(cv2.CAP_PROP_FPS)) or 30.0
        if total_frames <= 0:
            duration_ms = cap.get(cv2.CAP_PROP_POS_MSEC) or 0.0
            total_frames = max(1, int((duration_ms/1000.0) * fps))

        # ---- 等間隔にフレーム番号を抽出 ----
        k = min(k, max(1, total_frames))
        step = total_frames / float(k)
        indices = [int(i * step) for i in range(k)]

        results = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ok, frame = cap.read()
            if not ok or frame is None:
                continue
            ts_sec = idx / max(fps, 1e-6)
            out_path = str(Path(out_dir) / f"{prefix}_{idx:06d}.jpg")
            cv2.imwrite(out_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), jpg_quality])
            results.append({"index": idx, "timestamp_sec": ts_sec, "path": out_path})

        cap.release()
        return results

    def _fmt_time(self, t: float) -> str:
        """秒(float) -> ffmpeg向けの 'HH:MM:SS.mmm' 文字列"""
        if t < 0:
            t = 0.0
        h = int(t // 3600)
        m = int((t % 3600) // 60)
        s = t - h*3600 - m*60
        return f"{h:02d}:{m:02d}:{s:06.3f}"

    def _sanitize(self, s: str) -> str:
        """ファイル名に使いやすいようにサニタイズ"""
        s = s.strip()
        s = re.sub(r"\s+", "_", s)
        s = re.sub(r"[^A-Za-z0-9._-]", "_", s)
        return s[:80] if len(s) > 80 else s


    def cut_clip(
        self,
        id: int,
        video_name: str,
        video_path: str,
        start_time: float,
        end_time: float,
        out_dir: str = "clips",
        accurate: bool = True
    ) -> str:
        """
        自分の start_time/end_time を使って video_path からクリップを作成
        accurate=True: 再エンコードしてフレーム精度で切る
        accurate=False: 高速コピーだがキーフレーム単位になる
        戻り値: 出力ファイルパス
        """
        os.makedirs(out_dir, exist_ok=True)

        ss = self._fmt_time(start_time)
        to = self._fmt_time(end_time)

        # 出力ファイル名
        base = f"{id:04d}_{ss.replace(':','-')}_{to.replace(':','-')}.mp4"
        out_path = os.path.join(out_dir, base)

        if accurate:
            cmd = [
                "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
                "-i", video_path, "-ss", ss, "-to", to,
                "-c:v", "libx264", "-c:a", "aac", "-preset", "medium", "-crf", "23",
                "-movflags", "+faststart",
                out_path
            ]
        else:
            cmd = [
                "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
                "-ss", ss, "-to", to, "-i", video_path, "-c", "copy",
                out_path
            ]

        subprocess.run(cmd, check=True)
        return out_path

    def chat(self, image_paths: Dict[str, Any], prompt: str = "これらの画像から，接客対話に向けてお客さんに発話するべき内容をいくつか生成してください．") -> None:
        """
        複数画像を含んだ会話サンプル
        """

        # プロンプトの初期化
        self.set_inital_prompt()

        # まずユーザ発話（テキスト部分）
        content = [
            {
                "type": "text",
                "text": prompt
            }
        ]

        # 画像をすべて追加
        for path in image_paths:
            image_url = self.encode_image(image_path=path["path"])
            content.append(
                {
                    "type": "image_url",
                    "image_url": {"url": image_url}
                }
            )

        # プロンプトに追加
        self.messages.append({"role": "user", "content": content})

        # GPT の推論
        response = self.client.chat.completions.create(
            model=self.gpt_version,
            messages=self.messages
        )

        # 結果の表示
        answer = response.choices[0].message.content
        print(answer)
        return answer

    def encode_image(self, image_path: str) -> str:
        """chatgptが必要とする形式にエンコードする関数
        """
        _, image_extension = os.path.splitext(image_path)

        with open(image_path, "rb") as image_file:
            image_base64 = base64.b64encode(image_file.read()).decode('utf-8')
            url = f"data:image/{image_extension};base64,{image_base64}"

        return url

    def run_and_save_nonverbal_outputs(self, frames, data_id, target_video_path: str, csv_path="nonverbal_outputs.csv"):
        """
        cls.satsudora_dataset を走査し、各 data に対する3種類の出力を CSV に追記保存する。
        - columns: dataset_idx, data_idx, objective_nonverbal, subjective_nonverbal, answer_obj, answer_sub, answer_both
        """
        out_path = Path(csv_path)
        need_header = not out_path.exists()

        # 文字化けしにくい utf-8-sig、改行は newline="" を推奨
        with out_path.open("a", newline="", encoding="utf-8-sig") as f:
            writer = csv.writer(f)
            if need_header:
                writer.writerow([
                    # "dataset_idx",
                    "dataset_path",
                    "data_idx",
                    # "objective_nonverbal",
                    # "subjective_nonverbal",
                    "answer_obj",
                    # "answer_sub",
                    # "answer_both",
                ])

            # for ds_idx, dataset in tqdm(enumerate(cls.satsudora_dataset)):
            #     # print(dataset.data)  # 必要ならログを残す
            #     target_video_path = dataset.video_path
            #     for data_idx, data in enumerate(dataset.data):
            # obj = getattr(data, "objective_nonverbal", "") or ""
            # sub = getattr(data, "subjective_nonverbal", "") or ""

            # 空文字チェック（どちらかが非空なら実行）
            # if obj != "" or sub != "":
            try:
                answer_obj = self.chat(
                    frames,
                    prompt=(
                        "動画内に映っている顧客の非言語的行動を出力してください"
                        # f"客観的非言語行動は {obj} です．出力は発話文，その理由という形式で出してください．"
                    ),
                )
            except Exception as e:
                answer_obj = f"[ERROR] {e}"

                # try:
                #     answer_sub = cls.chat(
                #         frames,
                #         prompt=(
                #             "動画内に映っている顧客の非言語的行動に着目しながら，発話文を作成してください．"
                #             f"主観的な非言語的行動は {sub} です．出力は発話文，その理由という形式で出してください．"
                #         ),
                #     )
                # except Exception as e:
                #     answer_sub = f"[ERROR] {e}"

                # try:
                #     answer_both = cls.chat(
                #         frames,
                #         prompt=(
                #             "動画内に映っている顧客の非言語的行動に着目しながら，発話文を作成してください．"
                #             f"客観的非言語行動は {obj} です．主観的な非言語的行動は {sub} です．"
                #             "出力は発話文，その理由という形式で出してください．"
                #         ),
                #     )
                # except Exception as e:
                #     answer_both = f"[ERROR] {e}"

                # 行を追記
                # print(f"save {target_video_path}'s data_idx {data_idx}")
            writer.writerow([
                # ds_idx,
                target_video_path,
                data_id,
                # obj,
                # sub,
                answer_obj,
                # " ",
                # " ",
                # answer_sub,
                # answer_both,
            ])
            print(answer_obj)


In [91]:
def calc_stats(values: List[float]):
    if not values:
        raise ValueError("空のリストです。値を1つ以上与えてください。")

    n = len(values)
    mean = sum(values) / n

    # 不偏分散ではなく母分散（nで割る）
    variance = sum((x - mean) ** 2 for x in values) / n
    std_dev = math.sqrt(variance)

    return mean, variance, std_dev


def get_video_duration(video_path: str) -> float:
    """
    動画ファイルの長さを秒数で返す関数
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"動画を開けませんでした: {video_path}")

    # 総フレーム数とFPSを取得
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    cap.release()

    if fps <= 0:
        raise RuntimeError("FPS情報を取得できませんでした。")

    duration = frame_count / fps
    return duration


In [None]:
cls = LoadDataset()
datasets = cls.load_satsudora_datasets("/content/drive/MyDrive/ResearchIntern/EscortData/202501_サツドラ/【最終版】データセット/annotations_csv")

work_dir = "/content/drive/MyDrive/ResearchIntern/InteractiveVQA/temp_data"
work_video_dir = "/content/drive/MyDrive/ResearchIntern/InteractiveVQA/temp_video_v3"

start_time = -1
end_time = -1

json_output_data = []
num = 0
elapsed_time = 0.
elapsed_times = []
window_size = 2

for i, dataset in enumerate(datasets):
    target_video_path = dataset.video_path
    segments = []
    objective_infos = []
    video_duration = get_video_duration(dataset.video_path)
    filename = os.path.basename(target_video_path).replace(" のコピー", "")
    for id, start in enumerate(range(0, int(video_duration), window_size)):
        clip_video_path = cls.cut_clip(id=id, video_name=filename, video_path=target_video_path, start_time=start, end_time=(start+window_size), out_dir=work_video_dir)
        frames = cls.extract_uniform_frames(clip_video_path, out_dir=work_dir, rewrite=True)
        cls.run_and_save_nonverbal_outputs(frames, id, target_video_path, csv_path="/content/drive/MyDrive/ResearchIntern/nonverbal_outputs_v3.csv")
    break


#     for idx, data in enumerate(dataset.data):
#         if data.objective_nonverbal != "" or data.subjective_nonverbal != "":
#             # ディレクトリ部分を除去
#             clip_video_path = cls.cut_clip(id=idx, video_path=target_video_path, start_time=0.0, end_time=data.action_end_time, out_dir=work_video_dir)
#             # filename = os.path.basename(target_video_path).replace(" のコピー", "")
#             if start_time == data.action_start_time and end_time == data.action_end_time:
#                 continue

#             start_time = data.action_start_time
#             end_time = data.action_end_time
#             elapsed_time += end_time - start_time
#             elapsed_times.append((end_time - start_time))
#             segments.append([start_time, end_time])
#             objective_infos.append(data.objective_nonverbal)

#     num += len(objective_infos)

#     json_output_data.append({
#             "video_path": target_video_path,
#             "positive_segments": segments,
#             "objective": objective_infos
#         }
#     )

# out_path = "/content/drive/MyDrive/ResearchIntern/InteractiveVQA/annotation/action_segments_temp.json"
# with open(out_path, "w", encoding="utf-8") as f:
#     json.dump(json_output_data, f, ensure_ascii=False, indent=2)

# print(f"アノテーション数{num}")
# print(f"総アノテーション時間 {elapsed_time}")
# mean, var, std = calc_stats(elapsed_times)
# print(elapsed_times)
# print(f"平均: {mean}, 分散: {var}, 標準偏差: {std}")

1. **観察された客観的な行動** – お客さんはロボットに接近し、ロボットの方向に軽く手を伸ばしている。

2. **観察された行動に対する主観的な評価** – お客さんはロボットに興味を持っている可能性が高い。また、ロボットに質問したり触れたいと考えているかもしれない。

3. **根拠** – お客さんの体の向きや手の動きがロボットに向かっている点。接近している動作から興味や関心を抱いていると考えられる。
1. **観察された客観的な行動** – お客さんはロボットに接近し、ロボットの方向に軽く手を伸ばしている。

2. **観察された行動に対する主観的な評価** – お客さんはロボットに興味を持っている可能性が高い。また、ロボットに質問したり触れたいと考えているかもしれない。

3. **根拠** – お客さんの体の向きや手の動きがロボットに向かっている点。接近している動作から興味や関心を抱いていると考えられる。
1. **観察された客観的な行動** – お客さんは左手を上げて通り過ぎようとしている。

2. **観察された行動に対する主観的な評価** – ロボットを避けて移動しようとしている可能性がある。

3. **根拠** – お客さんの左手が上がっており、体が前に進む動きを示していること。ロボットに対して接触を避けるための行動にも見える。
1. **観察された客観的な行動** – お客さんは左手を上げて通り過ぎようとしている。

2. **観察された行動に対する主観的な評価** – ロボットを避けて移動しようとしている可能性がある。

3. **根拠** – お客さんの左手が上がっており、体が前に進む動きを示していること。ロボットに対して接触を避けるための行動にも見える。
1. **観察された客観的な行動**  
   - お客さんは身体をロボットの方へ向けており、時々頭を少し下げています。

2. **観察された行動に対する主観的な評価**  
   - お客さんはロボットに興味を持って話を聞いている状態である可能性があります。

3. **根拠**  
   - お客さんの体の向きがロボットに対して正面を向いていることと、頭を少し傾けている様子から、ロボットとの対話に集中していることが伺えます。
1. **観察された客観的な行動**  
   

In [None]:
NEG, POS = 0, 1

def _merge_intervals(intervals: List[Tuple[float,float]]) -> List[Tuple[float,float]]:
    xs = []
    for s, e in intervals:
        if s is None or e is None:
            continue
        s, e = float(s), float(e)
        if e <= s:
            continue
        xs.append((min(s, e), max(s, e)))
    if not xs:
        return []
    xs.sort(key=lambda x: x[0])
    out = [list(xs[0])]
    for s, e in xs[1:]:
        if s <= out[-1][1]:
            out[-1][1] = max(out[-1][1], e)
        else:
            out.append([s, e])
    return [tuple(x) for x in out]

def _in_any_interval(t: float, intervals: List[Tuple[float,float]]) -> bool:
    for s, e in intervals:
        if s <= t < e:
            return True
    return False

def build_positive_intervals(entry: Dict[str, Any], pos_window: float) -> List[Tuple[float, float]]:
    out = []
    for s, e in entry.get("positive_segments", []) or []:
        out.append((float(s), float(e)))
    half = pos_window / 2.0
    for t in entry.get("positives", []) or []:
        t = float(t)
        out.append((t - half, t + half))
    out = [(min(s, e), max(s, e)) for (s, e) in out if e != s]
    return _merge_intervals(out)

def in_any_interval(t: float, intervals: List[Tuple[float, float]]) -> bool:
    for s, e in intervals:
        if s <= t < e:
            return True
    return False

class CNNEncoder(nn.Module):
    """Frozen ResNet18 -> 512-d feature"""
    def __init__(self, resize=224, center_crop=224):
        super().__init__()
        backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        self.backbone = nn.Sequential(*list(backbone.children())[:-1])  # [B,512,1,1]
        for p in self.backbone.parameters():
            p.requires_grad = False
        self.tf = transforms.Compose([
            transforms.Resize(resize),
            transforms.CenterCrop(center_crop),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
        ])

    @torch.no_grad()
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        f = self.backbone(x).view(x.size(0), -1)
        return f

class AudioCNN(nn.Module):
    """Log-mel (B,1,n_mels,Tm) -> 128-d"""
    def __init__(self, n_mels=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
            nn.MaxPool2d((2,2)),
            nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),
            nn.MaxPool2d((2,2)),
            nn.Conv2d(32, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(),
            nn.AdaptiveAvgPool2d((1,1))
        )
        self.proj = nn.Linear(64, 128)

    def forward(self, x):
        h = self.net(x).flatten(1)
        z = self.proj(h)
        return z

class AV_LSTM(nn.Module):
    """BiLSTM over [vis512 + aud128] -> logits(2)"""
    def __init__(self, in_dim=512+128, hidden_dim=256, num_layers=1, bidir=True, dropout=0.1):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=in_dim, hidden_size=hidden_dim, num_layers=num_layers,
            dropout=0.0 if num_layers == 1 else dropout, bidirectional=bidir, batch_first=True
        )
        out_dim = hidden_dim * (2 if bidir else 1)
        self.cls = nn.Linear(out_dim, 2)

    def forward(self, x_packed):
        y_packed, _ = self.lstm(x_packed)
        y, _ = pad_packed_sequence(y_packed, batch_first=True)
        logits = self.cls(y)
        return logits

class AVBinaryDataset(Dataset):
    """
    入力JSON形式:
    [
      {"video_path": ".../video1.mp4", "positive_segments": [[s,e], ...]},
      {"video_path": ".../video2.mp4", "positive_segments": [[s,e], ...]},
      ...
    ]
    """
    def __init__(self,
                 dataset_json: str,
                 split: str = "train",
                 split_ratio: float = 0.8,
                 seed: int = 42,
                 target_fps: float = 2.0,
                 audio_sr: int = 16000,
                 audio_win: float = 0.5,
                 n_mels: int = 64,
                 n_fft: int = 400,
                 hop_length: int = 160,
                 resize: int = 224,
                 center_crop: int = 224):
        super().__init__()
        assert split in ("train", "val")
        self.target_fps = float(target_fps)
        self.audio_sr = int(audio_sr)
        self.audio_win = float(audio_win)
        self.n_mels = int(n_mels)
        self.n_fft = int(n_fft)
        self.hop_length = int(hop_length)
        self.resize = int(resize)
        self.center_crop = int(center_crop)

        with open(dataset_json, "r", encoding="utf-8") as f:
            items = json.load(f)
        # 事前にマージ・正規化
        for it in items:
            segs = it.get("positive_segments", []) or []
            it["positive_segments"] = _merge_intervals([(float(s), float(e)) for s, e in segs])

        # split
        rng = np.random.default_rng(seed)
        idxs = np.arange(len(items)); rng.shuffle(idxs)
        cut = int(len(idxs) * split_ratio)
        self.items = [items[i] for i in (idxs[:cut] if split=="train" else idxs[cut:])]

        self.tf = transforms.Compose([
            transforms.Resize(self.resize),
            transforms.CenterCrop(self.center_crop),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
        ])

    def __len__(self):
        return len(self.items)

    def _sample_video(self, path: str):
        cap = cv2.VideoCapture(path)
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {path}")
        vfps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        if vfps <= 1e-3: vfps = 30.0
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total / vfps if total > 0 else None

        frames, ts_list = [], []
        step = 1.0 / self.target_fps
        t = 0.0
        while duration is None or t < duration:
            idx = int(round(t * vfps))
            if total > 0 and idx >= total: break
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ok, f = cap.read()
            if not ok: break
            frames.append(cv2.cvtColor(f, cv2.COLOR_BGR2RGB))
            ts_list.append(t)
            t += step
        cap.release()
        if duration is None:
            duration = len(ts_list) * step
        return frames, ts_list, duration

    def _read_audio(self, path: str):
        y, sr = librosa.load(path, sr=self.audio_sr, mono=True)
        return y, sr

    def _logmel(self, y: np.ndarray, sr: int) -> np.ndarray:
        S = librosa.feature.melspectrogram(
            y=y, sr=sr, n_fft=self.n_fft, hop_length=self.hop_length,
            n_mels=self.n_mels, power=2.0
        )
        return librosa.power_to_db(S, ref=np.max)  # [n_mels, Tm]

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        it   = self.items[idx]
        path = it["video_path"]
        pos_intervals = it.get("positive_segments", [])

        frames, ts_list, vdur = self._sample_video(path)
        audio, sr = self._read_audio(path)
        adur = len(audio) / sr
        total_dur = min(vdur, adur)

        # labels
        labels = [POS if _in_any_interval(t, pos_intervals) else NEG for t in ts_list]
        labels = torch.tensor(labels, dtype=torch.long)

        # visual tensors
        v_tensors = [self.tf(transforms.functional.to_pil_image(img)) for img in frames]
        if len(v_tensors) == 0:
            v_tensors = [torch.zeros(3, self.center_crop, self.center_crop)]
            labels = torch.tensor([NEG], dtype=torch.long)
            ts_list = [0.0]
        frames_tensor = torch.stack(v_tensors, 0)  # [T,3,H,W]

        # audio windows -> log-mel
        half = self.audio_win / 2.0
        specs = []
        for t in ts_list:
            a0 = max(0.0, t - half)
            a1 = min(total_dur, t + half)
            s0 = int(round(a0 * sr)); s1 = int(round(a1 * sr))
            if s1 <= s0: s1 = min(len(audio), s0 + self.hop_length)
            y = audio[s0:s1]
            if len(y) < self.n_fft:
                pad = self.n_fft - len(y)
                y = np.pad(y, (0, pad), mode='reflect')
            m = self._logmel(y, sr)  # [n_mels, Tm]
            specs.append(torch.from_numpy(m).float().unsqueeze(0))  # [1,n_mels,Tm]

        return {
            "frames": frames_tensor,   # [T,3,H,W]
            "specs": specs,            # list of [1,n_mels,Tm]
            "labels": labels,          # [T]
            "length": len(labels),
            "video_path": path,
        }

In [None]:
def collate_var(batch):
    lengths = torch.tensor([b["length"] for b in batch], dtype=torch.long)
    max_len = int(lengths.max().item())
    labels = torch.full((len(batch), max_len), -100, dtype=torch.long)
    for i, b in enumerate(batch):
        labels[i, :b["length"]] = b["labels"]
    return {
        "frames_list": [b["frames"] for b in batch],
        "specs_list":  [b["specs"]  for b in batch],
        "labels": labels,
        "lengths": lengths,
        "video_paths": [b["video_path"] for b in batch],
    }

@torch.no_grad()
def extract_visual_features(frames_seq: torch.Tensor, venc: CNNEncoder, chunk=64) -> torch.Tensor:
    T = frames_seq.shape[0]
    out = []
    for i in range(0, T, chunk):
        f = venc(frames_seq[i:i+chunk].to(device, non_blocking=True))  # [B,512]
        out.append(f.cpu())
    return torch.cat(out, 0)

@torch.no_grad()
def extract_audio_features(specs_seq: List[torch.Tensor], aenc: AudioCNN, chunk=64) -> torch.Tensor:
    out, buf = [], []
    for m in specs_seq:
        buf.append(m)
        if len(buf) == chunk:
            tm = max(x.shape[-1] for x in buf)
            batch = torch.zeros(len(buf), 1, buf[0].shape[1], tm, dtype=torch.float32)
            for i, x in enumerate(buf):
                batch[i, :, :, :x.shape[-1]] = x
            out.append(aenc(batch.to(device, non_blocking=True)).cpu())
            buf = []
    if buf:
        tm = max(x.shape[-1] for x in buf)
        batch = torch.zeros(len(buf), 1, buf[0].shape[1], tm, dtype=torch.float32)
        for i, x in enumerate(buf):
            batch[i, :, :, :x.shape[-1]] = x
        out.append(aenc(batch.to(device, non_blocking=True)).cpu())
    return torch.cat(out, 0)

def train_one_epoch(model, venc, aenc, loader, opt, pos_weight: Optional[float]):
    model.train()
    if pos_weight is not None:
        ce = nn.CrossEntropyLoss(ignore_index=-100, weight=torch.tensor([1.0, pos_weight], device=device))
    else:
        ce = nn.CrossEntropyLoss(ignore_index=-100)

    total_loss = total_tok = total_cor = 0
    for batch in loader:
        labels = batch["labels"].to(device)
        B = labels.size(0)
        lengths = batch["lengths"].clone()
        max_len = int(lengths.max().item())

        feats_pad = torch.zeros(B, max_len, 512+128, dtype=torch.float32)
        for i in range(B):
            vseq = extract_visual_features(batch["frames_list"][i], venc)   # [T,512]
            aseq = extract_audio_features(batch["specs_list"][i], aenc)     # [T,128]
            T = min(vseq.size(0), aseq.size(0))
            feats_pad[i, :T] = torch.cat([vseq[:T], aseq[:T]], dim=1)
            lengths[i] = T
        feats_pad = feats_pad.to(device)
        lengths = lengths.to(device)

        packed = pack_padded_sequence(feats_pad, lengths.cpu(), batch_first=True, enforce_sorted=False)
        logits = model(packed)  # [B,T,2]

        loss = ce(logits.reshape(-1, 2), labels.reshape(-1))
        opt.zero_grad(set_to_none=True)
        loss.backward()
        opt.step()

        with torch.no_grad():
            mask = labels.ne(-100)
            pred = logits.argmax(-1)
            cor = (pred.eq(labels) & mask).sum().item()
            total_cor += cor
            total_tok += mask.sum().item()
            total_loss += loss.item() * mask.sum().item()

    return total_loss / max(total_tok, 1), total_cor / max(total_tok, 1)

@torch.no_grad()
def evaluate(model, venc, aenc, loader):
    model.eval()
    ce = nn.CrossEntropyLoss(ignore_index=-100)
    total_loss = total_tok = total_cor = 0
    for batch in loader:
        labels = batch["labels"].to(device)
        B = labels.size(0)
        lengths = batch["lengths"].clone()
        max_len = int(lengths.max().item())

        feats_pad = torch.zeros(B, max_len, 512+128, dtype=torch.float32)
        for i in range(B):
            vseq = extract_visual_features(batch["frames_list"][i], venc)
            aseq = extract_audio_features(batch["specs_list"][i], aenc)
            T = min(vseq.size(0), aseq.size(0))
            feats_pad[i, :T] = torch.cat([vseq[:T], aseq[:T]], dim=1)
            lengths[i] = T
        feats_pad = feats_pad.to(device)
        lengths = lengths.to(device)

        packed = pack_padded_sequence(feats_pad, lengths.cpu(), batch_first=True, enforce_sorted=False)
        logits = model(packed)

        loss = ce(logits.reshape(-1, 2), labels.reshape(-1))
        mask = labels.ne(-100)
        pred = logits.argmax(-1)
        cor = (pred.eq(labels) & mask).sum().item()
        total_cor += cor
        total_tok += mask.sum().item()
        total_loss += loss.item() * mask.sum().item()

    return total_loss / max(total_tok, 1), total_cor / max(total_tok, 1)


In [None]:
@torch.no_grad()
def predict_video_binary(video_path: str, model: AV_LSTM, venc: CNNEncoder, aenc: AudioCNN, cfg=CFG):
    # 単体動画のフレーム列を構築（NEG/ POS ラベルは作らない）
    tmp_ann = [{"video_path": video_path}]
    tmp_ts  = [{"video_path": video_path, "positive_segments": []}]  # ラベル不要のため空
    ann_p = "/content/_tmp_ann.json"
    ts_p  = "/content/_tmp_ts.json"
    json.dump(tmp_ann, open(ann_p, "w")); json.dump(tmp_ts, open(ts_p, "w"))
    tmp_cfg = dict(cfg); tmp_cfg["annotations"] = ann_p; tmp_cfg["binary_timestamps"] = ts_p

    ds = AVBinaryDataset(tmp_cfg, split="train")  # 中でサンプリングや音声切り出しだけ使う
    rec = ds[0]
    frames_seq = rec["frames"]
    specs_seq  = rec["specs"]

    v = extract_visual_features(frames_seq, venc)       # [T,512]
    a = extract_audio_features(specs_seq, aenc)         # [T,128]
    T = min(v.size(0), a.size(0))
    feats = torch.cat([v[:T], a[:T]], dim=1).unsqueeze(0).to(device)  # [1,T,640]
    lengths = torch.tensor([T])
    packed = pack_padded_sequence(feats, lengths, batch_first=True, enforce_sorted=False)
    logits = model(packed)                              # [1,T,2]
    probs = torch.softmax(logits, dim=-1).squeeze(0).cpu().numpy()    # [T,2]
    preds = probs.argmax(-1)                            # [T]
    return preds, probs
