# In [None]:
#このコードを持つファイルを10個複製して、SHARD_IDのみをそれぞれファイルの数に対応させて（10個のファイルなら0-9の値をそれぞれのファイルで登録する。）実行します。
#これに追加して、RUN_IDもdecide_what_live_to_collect.ipynbで取得した値をはめ込みます。

# =========================
# Robust Twitch Worker v2 (crash-resume / multi-shard / ledger+locks in GCS)
# =========================
# 依存: google-cloud-storage, pandas, requests, pytz, tqdm, yt-dlp, ffmpeg(TwitchDownloaderCLIは別)
# 役割:
#  - assignments_{RUN_ID}.csv で割り当てられた VOD を、この SHARD_ID が担当
#  - 各 VOD について chat → audio(m4a) → mp4 を順に実施
#  - 各工程の完了/失敗は ledger.csv に “安全に” 反映（世代一致の楽観ロック + バックアップ）
#  - GCS lock（locks/<vod>.lock）は心拍(heartbeat)で延命、クラッシュ時は TTL 超過で他ワーカーが奪取可能
#  - 全滅後でも ledger 自動復元・再起動だけで続行
# =========================

import glob
import io
import json
import math
import os
import pathlib
import random
import re
import shutil
import signal
import socket
import subprocess
import threading
import time
import uuid
import zlib
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

import pandas as pd
import pytz
import requests
from google.cloud import storage
from tqdm.auto import tqdm

# ========= User settings =========
# Each value can be overridden with an environment variable of the same name.
RUN_ID   = os.environ.get("RUN_ID", "20250910_190657")  # e.g. "YYYYmmdd_HHMMSS"
SHARD_ID = int(os.environ.get("SHARD_ID", "0"))         # 0..9
SHARD_COUNT = int(os.environ.get("SHARD_COUNT", "10"))

# Twitch app credentials used by authenticate() (client-credentials grant).
# NOTE(review): prefer passing these via the environment instead of keeping real
# secrets as defaults inside the notebook.
CLIENT_ID     = os.environ.get("TWITCH_CLIENT_ID", 'ここに機密コードを入れます')
CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", 'ここに機密コードを入れます')

# Destination bucket; this run's objects live under <GCS_ROOT_PREFIX>/<RUN_ID>/ (see gcs_key).
GCS_BUCKET      = os.environ.get("GCS_BUCKET", "dena-ai-intern-yoshihara-data")
GCS_ROOT_PREFIX = os.environ.get("GCS_ROOT_PREFIX", "twitch_v2")

API_BASE_URL = "https://api.twitch.tv/helix"
AUTH_URL     = "https://id.twitch.tv/oauth2/token"

# TwitchDownloaderCLI executable used for chat downloads.
TWITCHDL = os.environ.get("TWITCHDL", "/home/jupyter/Twitch_data_collection/TwitchDownloaderCLI")

# ========= Fixed roots (the same locations regardless of the working directory) =========
ROOT_DIR  = "/home/jupyter/Twitch_Pipeline"
STATE_DIR = os.path.join(ROOT_DIR, "state_v2")                  # local cache of the shared ledger/assignments (not per shard)
TMP_DIR   = os.path.join(ROOT_DIR, f"temp_downloads_v2/shard_{SHARD_ID}")  # per-worker scratch area

os.makedirs(STATE_DIR, exist_ok=True)
os.makedirs(TMP_DIR, exist_ok=True)

# ========= GCS keys =========
def gcs_key(prefix: str, filename: str) -> str:
    """Return the object key ``<GCS_ROOT_PREFIX>/<RUN_ID>/<prefix>/<filename>``."""
    segments = (GCS_ROOT_PREFIX, RUN_ID, prefix, filename)
    return "/".join(map(str, segments))

# Object keys of this run's shared manifests.
LEDGER_KEY            = gcs_key("manifests", "ledger.csv")
LEDGER_BACKUP_KEY     = gcs_key("manifests", "ledger.prev.csv")  # backup copy refreshed after ledger updates
ASSIGN_KEY            = gcs_key("manifests", f"assignments_{RUN_ID}.csv")
LOCK_PREFIX           = f"{GCS_ROOT_PREFIX}/{RUN_ID}/locks"       # holds one <vod_id>.lock object per VOD

# Local cache paths. NOTE(review): they do not contain SHARD_ID, so every shard
# running on the same host reads and writes these same files.
LEDGER_LOCAL          = os.path.join(STATE_DIR, f"ledger_{RUN_ID}.csv")
LEDGER_BACKUP_LOCAL   = os.path.join(STATE_DIR, f"ledger_{RUN_ID}.prev.csv")
ASSIGN_LOCAL          = os.path.join(STATE_DIR, f"assignments_{RUN_ID}.csv")

# ========= Log directory =========
# Per-run directory for the per-VOD downloader logs.
LOG_ROOT = os.path.join(STATE_DIR, "logs", RUN_ID)
os.makedirs(LOG_ROOT, exist_ok=True)

# ========= Globals =========
JST = pytz.timezone("Asia/Tokyo")


def jst_now_iso() -> str:
    """Return the current Asia/Tokyo time as an ISO-8601 string with UTC offset."""
    moment = datetime.now(JST)
    return moment.isoformat()

def seconds_to_hms(sec: int) -> str:
    """Format a whole number of seconds as ``HH:MM:SS`` (hours are not capped at 99)."""
    hours, remainder = divmod(sec, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

# Start-up banner identifying this worker's run, shard and root directory.
print(f"🧵 Worker 起動 | RUN_ID={RUN_ID} | SHARD_ID={SHARD_ID} | ROOT={ROOT_DIR}")

# ========= GCS I/O =========
def gcs_client() -> storage.Client:
    """Create a new Cloud Storage client with the library's default project/credentials."""
    client = storage.Client()
    return client

def gcs_download(key: str, local_path: str) -> bool:
    """
    Download gs://GCS_BUCKET/<key> to ``local_path``.

    The object is first written to a temporary file whose name is unique to this
    process and call. It is then moved into place with ``os.replace``. Other workers
    that share the same local cache path therefore never see a half-written file,
    and an empty remote object never clobbers an existing local copy.

    Returns:
        True when a non-empty file was downloaded to ``local_path``. False when the
        object does not exist, is empty (0 bytes), or the download failed. The
        reason is printed; errors are not raised.
    """
    bl = gcs_client().bucket(GCS_BUCKET).blob(key)
    parent = os.path.dirname(local_path)
    if parent:  # os.makedirs("") raises for a bare file name
        os.makedirs(parent, exist_ok=True)
    tmp_path = f"{local_path}.{os.getpid()}.{uuid.uuid4().hex[:8]}.part"
    try:
        if not bl.exists():
            print(f"⚠️ GCSに存在しません: gs://{GCS_BUCKET}/{key}")
            return False
        bl.download_to_filename(tmp_path)
        if os.path.getsize(tmp_path) == 0:
            print(f"⚠️ 空ファイルを検出: gs://{GCS_BUCKET}/{key}")
            return False
        os.replace(tmp_path, local_path)
        return True
    except Exception as e:
        # e.g. the object was deleted after exists(), network errors, local I/O errors
        print(f"⚠️ download error: gs://{GCS_BUCKET}/{key} ({e})")
        return False
    finally:
        # After a successful os.replace the temp file is gone; otherwise drop the leftover.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass

def _safe_remove(path: str):
    try:
        if os.path.exists(path):
            os.remove(path)
            print(f"🧹 deleted local: {path}")
    except Exception as e:
        print(f"⚠️ local delete failed: {path} ({e})")

def gcs_upload(local_path: str, key: str, *, content_type: Optional[str]=None) -> bool:
    """
    Upload ``local_path`` to gs://GCS_BUCKET/<key> and verify the stored size.

    The local file is never deleted here; the caller decides what happens to it.

    Returns:
        True if the upload succeeded and the remote size equals the local size
        (an unavailable remote size is accepted). False if the local file is
        missing, the sizes differ, or any error occurred. Errors are printed,
        not raised.
    """
    if not os.path.exists(local_path):
        print(f"⚠️ upload skipped (no local file): {local_path}")
        return False
    blob = gcs_client().bucket(GCS_BUCKET).blob(key)
    extra = {"content_type": content_type} if content_type else {}
    try:
        blob.upload_from_filename(local_path, **extra)
        blob.reload()  # fetch metadata so that blob.size is populated
        size_local = os.path.getsize(local_path)
        size_remote = None if blob.size is None else int(blob.size)
        if size_remote is not None and size_local != size_remote:
            print(f"⚠️ upload verification failed (local={size_local}, remote={size_remote}): gs://{GCS_BUCKET}/{key}")
            return False
        print(f"☁️ Uploaded: gs://{GCS_BUCKET}/{key}  (size={size_remote})")
        return True
    except Exception as err:
        print(f"⚠️ upload error: {local_path} -> gs://{GCS_BUCKET}/{key} ({err})")
        return False

def gcs_exists(key: str) -> bool:
    """Return True if gs://GCS_BUCKET/<key> currently exists."""
    return gcs_client().bucket(GCS_BUCKET).blob(key).exists()

# ========= Twitch authentication =========
HEADERS = {}

# Credential values that mean "not configured": empty, a sample value, or the
# placeholder used as the default in the settings cell.
_CREDENTIAL_PLACEHOLDERS = {"", "YOUR_TWITCH_CLIENT_ID", "ここに機密コードを入れます"}


def authenticate(timeout: float = 15) -> bool:
    """
    Obtain a Twitch app access token (client-credentials grant) and fill ``HEADERS``.

    Args:
        timeout: seconds to wait for the token endpoint. Without a timeout a stalled
            connection would hang the worker forever.

    Returns:
        True on success. On failure (unset/placeholder credentials, HTTP or network
        error, unexpected response) the reason is printed and False is returned.
    """
    global HEADERS
    if (CLIENT_ID or "") in _CREDENTIAL_PLACEHOLDERS or (CLIENT_SECRET or "") in _CREDENTIAL_PLACEHOLDERS:
        print("❌ CLIENT_ID/SECRET を設定してください"); return False
    r = None
    try:
        # The request itself is inside the try so connection errors are reported, not raised.
        r = requests.post(
            AUTH_URL,
            params={"client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "grant_type": "client_credentials"},
            timeout=timeout,
        )
        r.raise_for_status()
        token = r.json()["access_token"]
        HEADERS = {"Client-ID": CLIENT_ID, "Authorization": f"Bearer {token}"}
        print("✅ Twitch 認証OK")
        return True
    except Exception as e:
        print("❌ 認証失敗:", e, "| resp:", getattr(r, "text", ""))
        return False


if not authenticate():
    raise SystemExit("Twitch 認証に失敗")

# ========= Ledger utilities (auto-repair + safe updates) =========
# Column order of ledger.csv. The step_* columns hold per-step status strings;
# this worker writes "pending", "ok" and "fail" into them.
LEDGER_COLUMNS = ["run_id","vod_id","user_name","duration_seconds","status",
                  "step_chat","step_audio","step_mp4","step_whisper",
                  "last_update","note","shard_id"]

def atomic_write_csv(df: pd.DataFrame, path: str):
    """
    Write ``df`` to ``path`` as UTF-8 CSV (without the index), never exposing a partial file.

    The data goes to a temporary file in the same directory, which is then moved
    over ``path`` with ``os.replace``. The temporary name is unique per process and
    call. Concurrent writers of the same ``path`` (e.g. several shards on one host
    sharing STATE_DIR) therefore cannot truncate or steal each other's half-written
    temp file; the last completed write wins.
    """
    parent = os.path.dirname(path)
    if parent:  # os.makedirs("") raises for a bare file name
        os.makedirs(parent, exist_ok=True)
    tmp = f"{path}.{os.getpid()}.{uuid.uuid4().hex}.tmp"
    try:
        df.to_csv(tmp, index=False, encoding="utf-8")
        os.replace(tmp, path)
    finally:
        # After a successful replace tmp no longer exists; otherwise drop the partial file.
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass

def _enforce_ledger_types(df: pd.DataFrame) -> pd.DataFrame:
    # 文字列列
    for c in ["run_id","vod_id","user_name","status",
              "step_chat","step_audio","step_mp4","step_whisper",
              "last_update","note"]:
        if c in df.columns:
            df[c] = df[c].astype(str)
    # 数値列
    if "duration_seconds" in df.columns:
        df["duration_seconds"] = pd.to_numeric(df["duration_seconds"], errors="coerce").fillna(0).astype(int)
    if "shard_id" in df.columns:
        df["shard_id"] = pd.to_numeric(df["shard_id"], errors="coerce").fillna(-1).astype(int)
    return df

def restore_ledger_if_empty():
    """
    Restore ledger.csv from ledger.prev.csv when ledger.csv exists but is 0 bytes.

    Intended to run once at start-up.
    * The restore is uploaded with ``if_generation_match`` set to the generation
      of the empty object. When several shards start together, only the first
      restore lands, and a slower shard can never overwrite a ledger that another
      shard has already restored and started updating.
    * The backup is read into memory, so the shared local cache file is not used.
    * An empty backup is never copied over.

    All errors are printed, not raised.
    """
    b = gcs_client().bucket(GCS_BUCKET)
    main = b.blob(LEDGER_KEY); prev = b.blob(LEDGER_BACKUP_KEY)
    try:
        if not main.exists():
            return
        main.reload()
        if (main.size or 0) != 0 or not prev.exists():
            return
        print("🩺 ledger.csv is empty → restoring from ledger.prev.csv")
        data = prev.download_as_bytes()
        if not data:
            print("⚠️ ledger.prev.csv is also empty → nothing to restore")
            return
        main.upload_from_string(data, content_type="text/csv", if_generation_match=main.generation)
        print("✅ restored ledger.csv")
    except Exception as e:
        if getattr(e, "code", None) == 412:
            # Precondition failed: ledger.csv changed after we looked (another worker restored it).
            print("ℹ️ ledger.csv was already replaced by another worker → skip restore")
            return
        print("⚠️ ledger auto-restore failed:", e)

def load_ledger() -> pd.DataFrame:
    """
    Return the current ledger as a type-normalised DataFrame.

    Sources are tried in order:
    1. ledger.csv.
    2. ledger.prev.csv; its local copy is also copied over LEDGER_LOCAL.

    Any failure of a source is printed and the next source is tried. This
    includes GCS/network errors raised while downloading, so a transient problem
    degrades to the backup or an empty ledger instead of raising out of the
    worker. If both sources fail, an empty DataFrame with LEDGER_COLUMNS is returned.
    """
    # 1) main ledger
    try:
        if gcs_download(LEDGER_KEY, LEDGER_LOCAL):
            if os.path.getsize(LEDGER_LOCAL) == 0:
                raise ValueError("ledger.csv is empty")
            return _enforce_ledger_types(pd.read_csv(LEDGER_LOCAL))
    except Exception as e:
        print("⚠️ ledger 読み込み失敗:", e)
    # 2) backup ledger
    try:
        if gcs_download(LEDGER_BACKUP_KEY, LEDGER_BACKUP_LOCAL):
            shutil.copy2(LEDGER_BACKUP_LOCAL, LEDGER_LOCAL)
            return _enforce_ledger_types(pd.read_csv(LEDGER_LOCAL))
    except Exception as e:
        print("⚠️ ledger.prev 読み込み失敗:", e)
    # 3) nothing usable: empty frame with the expected columns
    return pd.DataFrame(columns=LEDGER_COLUMNS)

def _ensure_ledger_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add every LEDGER_COLUMNS entry missing from ``df`` (numeric ones filled with 0,
    text ones with "") and return the dtype-normalised frame.
    """
    numeric_cols = ("duration_seconds", "shard_id")
    missing = [name for name in LEDGER_COLUMNS if name not in df.columns]
    for name in missing:
        df[name] = 0 if name in numeric_cols else ""
    return _enforce_ledger_types(df)

def mark_step_safe(vod_id: str, step: str, status: str, note: str = "", max_retries: int = 6, backoff_base: int = 2):
    """
    Set one step column of ``vod_id``'s ledger row using generation-match (CAS) writes.

    Each attempt:
      1) reads the live generation of ledger.csv and downloads exactly that
         generation into memory;
      2) edits only the target row, appending it first if it does not exist;
      3) uploads the result with ``if_generation_match=<generation>``, so a
         concurrent writer makes our upload fail instead of being overwritten;
      4) after the main ledger is written, mirrors the same bytes to
         ledger.prev.csv and refreshes the local cache files. Both are best effort;
         their failure does not undo or repeat the committed update.
    Races and transient errors are retried with exponential backoff plus jitter.

    The working copy is kept in memory, not in LEDGER_LOCAL. Those cache paths are
    shared by every shard on the host: with a shared file another shard's download
    can replace our edited copy before the CAS upload, which would then publish
    that shard's content and silently drop this update.

    Args:
        vod_id: VOD identifier (compared as a string).
        step: one of "chat", "audio", "mp4", "whisper".
        status: value stored in the step column (e.g. "ok" / "fail").
        note: stored in the ``note`` column when non-empty.
        max_retries: number of attempts before giving up.
        backoff_base: after failed attempt k the worker sleeps ``backoff_base ** k``
            seconds plus up to 1 s of random jitter.

    Raises:
        ValueError: ``step`` is unknown.
        RuntimeError: the ledger could not be updated within ``max_retries`` attempts.
    """
    col_map = {"chat":"step_chat","audio":"step_audio","mp4":"step_mp4","whisper":"step_whisper"}
    col = col_map.get(step)
    if not col:
        raise ValueError(f"unknown step: {step}")

    b = gcs_client().bucket(GCS_BUCKET)
    blob = b.blob(LEDGER_KEY)
    vod_key = str(vod_id)

    last_err = None
    for attempt in range(1, max_retries + 1):
        try:
            # 1) snapshot the live generation and download exactly that version
            blob.reload()
            current_gen = int(blob.generation) if blob.generation is not None else None
            raw = blob.download_as_bytes(if_generation_match=current_gen)
            df = _ensure_ledger_columns(pd.read_csv(io.BytesIO(raw)))

            # 2) edit only the target row (append it when missing)
            matches = df["vod_id"].astype(str) == vod_key
            if not matches.any():
                new_row = {
                    "run_id":RUN_ID, "vod_id":vod_key, "user_name":"", "duration_seconds":0,
                    "status":"processing", "step_chat":"pending","step_audio":"pending","step_mp4":"pending","step_whisper":"pending",
                    "last_update": jst_now_iso(), "note":"", "shard_id":SHARD_ID
                }
                df.loc[len(df)] = new_row
                matches = df["vod_id"].astype(str) == vod_key
            idx = df.index[matches][0]
            df.at[idx, col] = status
            if note: df.at[idx, "note"] = note
            df.at[idx, "last_update"] = jst_now_iso()
            payload = df.to_csv(index=False).encode("utf-8")

            # 3) CAS upload of the main ledger (None = no precondition)
            blob.upload_from_string(payload, content_type="text/csv", if_generation_match=current_gen)
        except Exception as e:
            last_err = e
            if attempt < max_retries:
                # jitter keeps shards that collided from retrying in lockstep
                wait = backoff_base ** attempt + random.uniform(0, 1)
                print(f"↻ ledger race/retry in {wait:.1f}s (attempt {attempt}/{max_retries}) ... [{vod_id}:{col}={status}] ({e})")
                time.sleep(wait)
            continue

        # 4) best-effort mirrors of the committed version
        try:
            b.blob(LEDGER_BACKUP_KEY).upload_from_string(payload, content_type="text/csv")
        except Exception as e:
            print(f"⚠️ ledger.prev upload failed (ledger.csv is already updated): {e}")
        try:
            atomic_write_csv(df, LEDGER_LOCAL)
            atomic_write_csv(df, LEDGER_BACKUP_LOCAL)
        except Exception as e:
            print(f"⚠️ local ledger cache refresh failed: {e}")
        return

    print("⚠️ ledger update failed after retries:", last_err)
    raise RuntimeError(f"ledger update failed: {vod_id} {col} -> {status}")

# ========= Assignment =========
def load_assignments() -> pd.DataFrame:
    """
    Fetch assignments_<RUN_ID>.csv and parse it into a DataFrame.

    Returns an empty DataFrame when the file cannot be downloaded or parsed; the
    start-up code then falls back to distributing the ledger's VODs itself.
    """
    if not gcs_download(ASSIGN_KEY, ASSIGN_LOCAL):
        return pd.DataFrame()
    try:
        return pd.read_csv(ASSIGN_LOCAL)
    except Exception as exc:
        print("⚠️ assignments 読込失敗（フォールバックshard割当を使用）:", exc)
        return pd.DataFrame()

# ========= TTL / Heartbeat =========
HEARTBEAT_SEC    = int(os.environ.get("HEARTBEAT_SEC", "60"))     # lock holders rewrite their lock every 60 s
STALE_LOCK_SEC   = int(os.environ.get("STALE_LOCK_SEC", "300"))   # default 5 min (fast recovery after all workers die); older locks may be stolen
# STALE_LOCK_SEC must stay well above HEARTBEAT_SEC, otherwise live locks look stale.

# Identity written into lock payloads: "<hostname>:<pid>:<8 random hex chars>".
WORKER_ID = f"{socket.gethostname()}:{os.getpid()}:{uuid.uuid4().hex[:8]}"

class GCSLock:
    """
    Per-VOD mutual-exclusion lock stored as a small JSON object in GCS.

    * ``acquire`` creates the object with ``if_generation_match=0`` (create-only).
      If the object exists and its ``updated`` time is older than STALE_LOCK_SEC,
      it is stolen with ``if_generation_match=<observed generation>``. Of several
      workers racing for the same stale lock, only one wins. Note that
      ``if_metageneration_match`` cannot provide this: every new generation of an
      object starts again at metageneration 1, so concurrent stealers (or a
      heartbeat landing in between) would all match.
    * A daemon thread rewrites the object every HEARTBEAT_SEC, conditioned on the
      generation this lock last wrote. If that precondition fails and the object
      no longer names this WORKER_ID, the lock was taken over or deleted: the
      heartbeat stops and ``lost`` becomes True. It does not keep overwriting the
      new owner's lock.
    * ``release`` deletes the object only if it is still the generation we wrote,
      so a lock now held by another worker is never deleted.
    """

    def __init__(self, bucket: str, key: str):
        self.bucket_name = bucket
        self.key = key
        self._stop = threading.Event()
        self._thread = None
        self._gen = None    # generation of the lock object this instance last wrote
        self.lost = False   # True once another worker took the lock over (or it was deleted)

    def _blob(self):
        """Blob handle for the lock object (fresh client, as elsewhere in this file)."""
        return gcs_client().bucket(self.bucket_name).blob(self.key)

    @staticmethod
    def _payload() -> str:
        """JSON body identifying the holder and the write time."""
        return json.dumps({"worker": WORKER_ID, "ts": jst_now_iso()})

    def _start_heartbeat(self):
        """(Re)start the background heartbeat for a freshly acquired lock."""
        self._stop.clear()
        self.lost = False
        self._thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
        self._thread.start()

    def _heartbeat_loop(self):
        """Refresh the lock every HEARTBEAT_SEC until stopped or until the lock is lost."""
        bl = self._blob()
        while not self._stop.wait(HEARTBEAT_SEC):
            try:
                bl.upload_from_string(self._payload(), if_generation_match=self._gen)
                self._gen = bl.generation
                continue
            except Exception as e:
                if getattr(e, "code", None) != 412:
                    continue  # transient error (network, 5xx, ...): retry on the next tick
            # Precondition failed: the object changed since our last write. Find out by whom.
            owner = None
            try:
                bl.reload()
                owner = json.loads(bl.download_as_text()).get("worker")
            except Exception as e:
                if getattr(e, "code", None) != 404:
                    continue  # state unknown; re-check on the next tick
                # 404: the lock object was deleted (e.g. by sweep_stale_locks) -> treat as lost
            if self._stop.is_set():
                return  # released in the meantime; nothing to report
            if owner == WORKER_ID:
                # One of our own writes landed although the client saw an error: adopt it.
                self._gen = bl.generation
                continue
            self.lost = True
            print(f"⚠️ lock lost to another worker: {self.key}")
            return

    def acquire(self, steal_if_stale=True) -> bool:
        """
        Try to take the lock and start the heartbeat on success.

        Args:
            steal_if_stale: also take over an existing lock whose last update is
                older than STALE_LOCK_SEC (or whose update time is unknown).

        Returns:
            True if this instance now holds the lock, False if it is held by
            someone else or the check failed (the reason is printed).
        """
        bl = self._blob()
        payload = self._payload()
        # 1) create-only: succeeds only when no lock object exists
        try:
            bl.upload_from_string(payload, if_generation_match=0)
        except Exception:
            pass
        else:
            self._gen = bl.generation
            self._start_heartbeat()
            print(f"🔒 lock acquired: gs://{self.bucket_name}/{self.key}")
            return True
        # 2) the lock exists -> steal it only if stale, conditioned on the generation we inspected
        try:
            bl.reload()
            observed_gen = bl.generation
            upd = bl.updated  # datetime
            if upd is None:
                if not steal_if_stale:
                    return False
                bl.upload_from_string(payload, if_generation_match=observed_gen)
                self._gen = bl.generation
                self._start_heartbeat()
                print(f"🔧 lock force-acquired(no-updated): {self.key}")
                return True
            age = (datetime.now(timezone.utc) - upd).total_seconds()
            if steal_if_stale and age > STALE_LOCK_SEC:
                bl.upload_from_string(payload, if_generation_match=observed_gen)
                self._gen = bl.generation
                self._start_heartbeat()
                print(f"🛠️ lock stolen (stale {int(age)}s): {self.key}")
                return True
            print(f"⛔ lock busy (age {int(age)}s <= {STALE_LOCK_SEC}s): {self.key}")
            return False
        except Exception as e:
            # includes losing a steal race (precondition failed)
            print(f"⚠️ lock check error: {e}")
            return False

    def release(self):
        """Stop the heartbeat and delete the lock object if it is still the one we wrote."""
        self._stop.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=3)
        if self.lost:
            print(f"⚠️ lock not deleted (held by another worker): {self.key}")
            return
        try:
            self._blob().delete(if_generation_match=self._gen)
            print(f"🔓 lock released: {self.key}")
        except Exception as e:
            code = getattr(e, "code", None)
            if code == 404:
                print(f"🔓 lock already gone: {self.key}")
            elif code == 412:
                print(f"⚠️ lock not deleted (held by another worker): {self.key}")
            else:
                print(f"⚠️ lock release error: {e}")

# ========= Optional: sweep very old locks (speeds up restart after all workers died) =========
def sweep_stale_locks(multiplier: float = 2.0):
    """
    Delete lock objects not refreshed for more than ``STALE_LOCK_SEC * multiplier`` seconds.

    Runs only when the environment variable SWEEP_STALE_LOCKS=1.

    Each delete is conditioned on the *generation* observed while checking the age.
    A lock that received a heartbeat in the meantime (which creates a new
    generation) is left alone. A metageneration precondition would not protect
    it, because every new generation restarts at metageneration 1.
    """
    if os.environ.get("SWEEP_STALE_LOCKS", "0") != "1":
        return
    b = gcs_client().bucket(GCS_BUCKET)
    prefix = f"{LOCK_PREFIX}/"
    threshold = STALE_LOCK_SEC * multiplier
    for bl in b.list_blobs(prefix=prefix):
        try:
            bl.reload()
            upd = bl.updated
            if upd and (datetime.now(timezone.utc) - upd).total_seconds() > threshold:
                bl.delete(if_generation_match=bl.generation)
                print(f"🧹 removed stale lock: gs://{GCS_BUCKET}/{bl.name}")
        except Exception as e:
            if getattr(e, "code", None) == 412:
                print(f"ℹ️ lock refreshed meanwhile, kept: gs://{GCS_BUCKET}/{bl.name}")
            else:
                print("⚠️ sweep lock error:", e)

# ========= DL/変換 =========
def _ok_file(path, min_bytes=128) -> bool: return os.path.exists(path) and os.path.getsize(path) >= min_bytes

def _write_log(path, text):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        f.write(text if text.endswith("\n") else text + "\n")

def twitch_video_exists(vod_id: str, retries: int = 2) -> bool:
    """
    Ask the Helix ``/videos`` endpoint whether ``vod_id`` is still available.

    Returns False only on a definitive answer: HTTP 200 with an empty ``data``
    list, or HTTP 404.

    Other outcomes leave the state unknown: network errors, timeouts, malformed
    JSON, rate limiting (429), auth errors and 5xx responses. These are retried
    ``retries`` more times with backoff. If the state is still unknown, True is
    returned so the downloaders run and record their own, more specific failure
    reason. The alternative would mark every step "video_not_found" because of a
    transient API problem.
    """
    for attempt in range(retries + 1):
        try:
            r = requests.get(f"{API_BASE_URL}/videos", headers=HEADERS, params={"id": vod_id}, timeout=15)
            if r.status_code == 200:
                return len(r.json().get("data", [])) > 0
            if r.status_code == 404:
                return False
        except (requests.RequestException, ValueError):
            pass  # network error / malformed JSON: treat as unknown
        if attempt < retries:
            time.sleep(RETRY_BACKOFF_BASE ** attempt)
    return True

# Retry policy of the downloaders: total attempts per VOD, and the wait after
# failed attempt k is RETRY_BACKOFF_BASE ** k seconds.
CHAT_MAX_RETRY = 3
AUDIO_MAX_RETRY = 2
RETRY_BACKOFF_BASE = 2

def chat_download(vod_id: str, out_json: str, begin: Optional[str]=None, end: Optional[str]=None, max_retry=CHAT_MAX_RETRY):
    """
    Download the chat of ``vod_id`` to ``out_json`` with TwitchDownloaderCLI.

    Args:
        vod_id: Twitch VOD id.
        out_json: output path of the chat JSON.
        begin / end: optional values passed to the CLI as ``-b`` / ``-e``.
        max_retry: total number of attempts.

    Returns:
        ``(True, "ok (<secs>s)")`` on success. Otherwise ``(False, reason)``, where
        ``reason`` is one of:
        * a category guessed by substring matching on the CLI output:
          "not_found_or_private", "forbidden_or_auth", "rate_limited",
          "no_comments" or "unknown";
        * ``"exec_error:<error>"`` when the CLI could not be started. Retrying
          cannot fix that, so the function returns immediately.
        The output of every attempt is appended to LOG_ROOT/<vod_id>_chat.log.
    """
    log_path = os.path.join(LOG_ROOT, f"{vod_id}_chat.log")
    reason = "unknown"  # also the result when max_retry < 1
    for attempt in range(1, max_retry+1):
        cmd = [TWITCHDL,"chatdownload","--id",vod_id,"-o",out_json,"--timestamp-format","Relative"]
        if begin: cmd += ["-b", begin]
        if end:   cmd += ["-e", end]
        t0 = time.time()
        try:
            proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        except OSError as e:
            # e.g. TWITCHDL missing or not executable
            _write_log(log_path, f"\n--- attempt {attempt}/{max_retry} ---\n[exec error] {e}\n")
            return False, f"exec_error:{e}"
        out = proc.stdout or ""
        _write_log(log_path, f"\n--- attempt {attempt}/{max_retry} ---\n{out}\n(exit {proc.returncode})\n")
        if proc.returncode == 0 and _ok_file(out_json):
            return True, f"ok ({time.time()-t0:.1f}s)"
        low = out.lower()
        if "does not exist" in low or "404" in low or "private" in low:   reason = "not_found_or_private"
        elif "forbidden" in low or "unauthorized" in low or "403" in low: reason = "forbidden_or_auth"
        elif "too many requests" in low or "429" in low:                  reason = "rate_limited"
        elif "no comments" in low:                                        reason = "no_comments"
        else:                                                              reason = "unknown"
        if attempt < max_retry:
            time.sleep(RETRY_BACKOFF_BASE ** attempt)
    return False, reason

class QuietLogger:
    """Logger object for yt-dlp that silently discards every message."""

    def _discard(self, msg):
        return None

    debug = _discard
    info = _discard
    warning = _discard
    error = _discard

def audio_download_m4a(vod_id: str, out_m4a: str, max_retry=AUDIO_MAX_RETRY):
    """
    Download the audio of ``vod_id`` with yt-dlp and extract it to ``out_m4a``.

    Leftover partial-download files for the same output stem are removed first,
    so a run resumed after a crash starts clean.

    Returns:
        ``(True, "ok (<secs>s)")`` when ``out_m4a`` passes ``_ok_file``. Otherwise
        ``(False, "error:<exception>")`` after a download exception on the last
        attempt, or ``(False, "unknown")``. Download exceptions are appended to
        LOG_ROOT/<vod_id>_audio.log.
    """
    import yt_dlp
    url = f"https://www.twitch.tv/videos/{vod_id}"
    ff = shutil.which("ffmpeg")
    base_noext = os.path.splitext(out_m4a)[0]
    # yt-dlp intermediates typically look like "<stem>.<ext>.part",
    # "<stem>.<ext>.part-FragN.part", "<stem>.<ext>.ytdl" or "<stem>.temp.<ext>".
    # Match them by token rather than by a fixed list of exact suffixes.
    for leftover in glob.glob(glob.escape(base_noext) + ".*"):
        name = os.path.basename(leftover)
        if any(tok in name for tok in (".part", ".ytdl", ".temp")):
            try:
                os.remove(leftover)
            except OSError:
                pass

    log_path = os.path.join(LOG_ROOT, f"{vod_id}_audio.log")
    # Build the template from the stem: str.replace(".m4a", ...) would also hit a
    # ".m4a" occurring in a directory name.
    outtmpl = base_noext + ".%(ext)s"
    for attempt in range(1, max_retry+1):
        t0 = time.time()
        pbar = None

        def hook(d):
            nonlocal pbar
            if d.get("status") == "downloading":
                tb = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
                db = d.get("downloaded_bytes", 0)
                if pbar is None:
                    pbar = tqdm(total=tb if tb else None, unit="B", unit_scale=True, desc=f"audio {vod_id}", leave=False)
                pbar.update(db - pbar.n)
            elif d.get("status") == "finished":
                if pbar is not None:
                    pbar.total = pbar.n; pbar.close(); pbar = None

        opts = {
            "format":"bestaudio/best",
            "outtmpl": outtmpl,
            "postprocessors":[{"key":"FFmpegExtractAudio","preferredcodec":"m4a"}],
            "ffmpeg_location": os.path.dirname(ff) if ff else None,
            "progress_hooks":[hook],
            "logger": QuietLogger(), "quiet": True, "no_warnings": True,
        }
        try:
            with yt_dlp.YoutubeDL(opts) as ydl:
                ydl.download([url])
        except Exception as e:
            _write_log(log_path, f"[error try={attempt}] {e}\n")
            if attempt < max_retry:
                time.sleep(RETRY_BACKOFF_BASE ** attempt)
                continue
            return False, f"error:{e}"
        finally:
            # An interrupted download never reaches "finished": close the bar here.
            if pbar is not None:
                pbar.close()
                pbar = None
        if _ok_file(out_m4a):
            return True, f"ok ({time.time()-t0:.1f}s)"
        if attempt < max_retry:
            time.sleep(RETRY_BACKOFF_BASE ** attempt)
    return False, "unknown"

def m4a_to_mp4(m4a_path: str, mp4_path: str):
    """
    Remux ``m4a_path`` into an MP4 container at ``mp4_path`` with ffmpeg (stream copy).

    Returns:
        ``(True, "ok (<secs>s)")`` when ffmpeg succeeds and the output passes
        ``_ok_file``. Otherwise ``(False, note)`` where note is:
        * ``"ffmpeg_output_too_small"``: ffmpeg exited 0 but the output is missing
          or too small;
        * ``"ffmpeg_fail"``: ffmpeg exited non-zero;
        * ``"ffmpeg_not_found"``: ffmpeg could not be executed.
    """
    t0 = time.time()
    cmd = ["ffmpeg","-hide_banner","-nostdin","-y","-i",m4a_path,"-c","copy",mp4_path]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    except subprocess.CalledProcessError:
        return False, "ffmpeg_fail"
    except OSError:
        return False, "ffmpeg_not_found"
    if not _ok_file(mp4_path):
        # Report a real reason; the caller stores this note in the ledger on failure.
        return False, "ffmpeg_output_too_small"
    return True, f"ok ({time.time()-t0:.1f}s)"

# ========= Graceful stop (SIGINT/SIGTERM) =========
# Set by the signal handler; the main loop checks it before starting each VOD.
STOP = False
def _handle_stop(signum, frame):
    """Signal handler: request a stop once the current VOD has been finished.

    NOTE: because it is installed for SIGINT, Ctrl-C / a kernel interrupt no
    longer raises KeyboardInterrupt in this process.
    """
    global STOP
    STOP = True
    print(f"\n🛑 signal {signum} 受信: 現在のVODを安全に終了後、停止します…")
# signal.signal can raise (e.g. ValueError when not called from the main thread);
# the worker then simply runs without graceful-stop support.
for _sig in (signal.SIGINT, signal.SIGTERM):
    try: signal.signal(_sig, _handle_stop)
    except Exception: pass

# ========= Start-up self-repair (ledger / locks) =========
# Restore an empty ledger.csv from its backup, then optionally sweep old locks.
restore_ledger_if_empty()
sweep_stale_locks()  # only acts when SWEEP_STALE_LOCKS=1

# ========= Assignment loading =========
# Prefer the explicit assignments file. If it is missing or lacks the required
# columns, spread the ledger's VODs over the shards with a stable hash.
assign_df = load_assignments()
if assign_df.empty or "vod_id" not in assign_df.columns or "shard_id" not in assign_df.columns:
    led = load_ledger()
    if "vod_id" in led.columns:
        def _shard(v):
            # zlib.crc32 gives the same value in every process. The built-in hash()
            # of a str is salted per interpreter (PYTHONHASHSEED), so each shard
            # would compute a different partition and VODs would be processed
            # twice or never.
            return zlib.crc32(str(v).encode("utf-8")) % SHARD_COUNT
        tmp = led[["vod_id"]].dropna().copy()
        tmp["shard_id"] = tmp["vod_id"].astype(str).map(_shard)
        assign_df = tmp
    else:
        assign_df = pd.DataFrame(columns=["vod_id","shard_id"])

# Compare numerically so that "3", 3 and 3.0 in the CSV all select shard 3.
shard_col = pd.to_numeric(assign_df["shard_id"], errors="coerce")
my_vods = assign_df.loc[shard_col == SHARD_ID, "vod_id"].astype(str).unique().tolist()
print(f"🔢 SHARD {SHARD_ID}: 担当 {len(my_vods)} 件")

# ========= Main: process each VOD step by step =========
ledger = load_ledger()
if not ledger.empty:
    ledger = _ensure_ledger_columns(ledger)

if ledger.empty or "vod_id" not in ledger.columns:
    print("⚠️ ledger が空です。事前に収集側で ledger を初期化してください。")
    target_vods = my_vods
else:
    # A VOD counts as done when its first ledger row has chat, audio and mp4 all "ok".
    # That is the row mark_step_safe edits.
    # Assigned VODs without a ledger row are still processed, as they are when the
    # ledger is empty; mark_step_safe creates their row on the first update.
    first_rows = ledger.assign(vod_id=ledger["vod_id"].astype(str)).drop_duplicates("vod_id", keep="first")
    mask_done = (first_rows["step_chat"]=="ok") & (first_rows["step_audio"]=="ok") & (first_rows["step_mp4"]=="ok")
    done_vods = set(first_rows.loc[mask_done, "vod_id"])
    target_vods = [v for v in my_vods if v not in done_vods]

print(f"🗂️ 処理予定: {len(target_vods)} / 担当 {len(my_vods)} 件（SHARD {SHARD_ID}）")

def _ledger_step_status(vod_id: str, col: str) -> str:
    """Return column ``col`` of ``vod_id``'s first ledger row, freshly loaded, or "pending" if unknown."""
    cur = load_ledger()
    if cur.empty or "vod_id" not in cur.columns or col not in cur.columns:
        return "pending"
    row = cur[cur["vod_id"].astype(str) == vod_id]
    if row.empty:
        return "pending"
    return str(row[col].iloc[0] or "pending")


vod_iter = tqdm(target_vods, total=len(target_vods), unit="vod", desc=f"SHARD {SHARD_ID} processing")
for vod_id in vod_iter:
    if STOP: break

    lock_key = f"{LOCK_PREFIX}/{vod_id}.lock"
    lock = GCSLock(GCS_BUCKET, lock_key)
    if not lock.acquire(steal_if_stale=True):
        # Another worker is processing this VOD.
        continue

    step_bar = tqdm(total=3, desc=f"{vod_id} steps", unit="step", leave=False)
    try:
        # Availability check (deleted / private VOD).
        if not twitch_video_exists(vod_id):
            print(f"📼 VOD {vod_id} | ❌ not found/private")
            try:
                mark_step_safe(vod_id, "chat",  "fail", "video_not_found")
                mark_step_safe(vod_id, "audio", "fail", "video_not_found")
                mark_step_safe(vod_id, "mp4",   "fail", "video_not_found")
            finally:
                step_bar.update(3)
            continue

        chat_path = os.path.join(TMP_DIR, f"{vod_id}_chat.json")
        m4a_path  = os.path.join(TMP_DIR, f"{vod_id}.m4a")
        mp4_path  = os.path.join(TMP_DIR, f"{vod_id}.mp4")

        # --- chat: download -> upload -> delete the local copy ---
        if _ledger_step_status(vod_id, "step_chat") != "ok":
            ok_chat, chat_note = chat_download(vod_id, chat_path)
            if ok_chat:
                up_ok = gcs_upload(chat_path, gcs_key("raw/chat", f"{vod_id}_chat.json"), content_type="application/json")
                if up_ok:
                    _safe_remove(chat_path)  # delete right away once safely uploaded
                    mark_step_safe(vod_id, "chat", "ok", chat_note)
                else:
                    mark_step_safe(vod_id, "chat", "fail", "gcs_upload_failed")
            else:
                mark_step_safe(vod_id, "chat", "fail", chat_note)
        step_bar.update(1)

        # --- audio: download -> upload (local m4a is kept for the mp4 step) ---
        audio_done = _ledger_step_status(vod_id, "step_audio") == "ok"
        if not audio_done:
            ok_audio, audio_note = audio_download_m4a(vod_id, m4a_path)
            if ok_audio:
                up_ok = gcs_upload(m4a_path, gcs_key("raw/audio", f"{vod_id}.m4a"), content_type="audio/mp4")
                if up_ok:
                    mark_step_safe(vod_id, "audio", "ok", audio_note)
                    audio_done = True
                else:
                    mark_step_safe(vod_id, "audio", "fail", "gcs_upload_failed")
            else:
                mark_step_safe(vod_id, "audio", "fail", audio_note)
        step_bar.update(1)

        # --- mp4: remux the m4a -> upload -> delete both local files ---
        if _ledger_step_status(vod_id, "step_mp4") != "ok":
            if audio_done and not _ok_file(m4a_path):
                # Audio completed in an earlier run or on another shard, and the local
                # m4a was cleaned up afterwards. Fetch the uploaded copy. Otherwise this
                # step would fail with "no_m4a" forever: audio is "ok", so it is never
                # downloaded again.
                gcs_download(gcs_key("raw/audio", f"{vod_id}.m4a"), m4a_path)
            if _ok_file(m4a_path):
                ok_mp4, mp4_note = m4a_to_mp4(m4a_path, mp4_path)
                if ok_mp4 and _ok_file(mp4_path):
                    up_ok = gcs_upload(mp4_path, gcs_key("processed/audio_mp4", f"{vod_id}.mp4"), content_type="video/mp4")
                    if up_ok:
                        _safe_remove(mp4_path)  # mp4 is deleted right away
                        _safe_remove(m4a_path)  # the m4a is no longer needed either
                        mark_step_safe(vod_id, "mp4", "ok", mp4_note)
                    else:
                        mark_step_safe(vod_id, "mp4", "fail", "gcs_upload_failed")
                else:
                    mark_step_safe(vod_id, "mp4", "fail", mp4_note)
            else:
                mark_step_safe(vod_id, "mp4", "fail", "no_m4a")
        step_bar.update(1)

    except Exception as e:
        print(f"⚠️ unexpected error in {vod_id}: {e}")
    finally:
        step_bar.close()
        # Remove whatever this VOD left in the scratch dir (.part files etc.).
        for p in pathlib.Path(TMP_DIR).glob(f"{vod_id}*"):
            try:
                if p.is_file():
                    p.unlink()
            except Exception:
                pass
        lock.release()

print("🎉 worker 完了 (v2 daily)")

# =========================
# 使い方メモ:
# - このスクリプトは、ledger.csv が空でも自動で .prev から復旧します（起動時）。
# - 台帳更新は「本体(CAS)→成功後に .prev」順。“.prevだけ先行／本体が空”の不整合を防ぎます。
# - TTLは既定5分（HEARTBEAT=60s）。クラッシュ後は5分経過で別ワーカーが自動引き継ぎ。
# - 全滅後の再起動で stale lock を早期回収したいときは SWEEP_STALE_LOCKS=1 を指定して起動。
# =========================
