# YouTube · 2024 Ucrania · Search mensual con **reanudación** (v7-fix)

Este notebook prioriza por defecto la key más reciente (la 5ª, `YOUTUBE_API_KEY_QUINARY`) y permite reanudar la búsqueda por canal/mes/pageToken.


## 0) Cargar `.env` y priorizar la última key

In [None]:
import os
from typing import Dict
from dotenv import load_dotenv
from datetime import datetime, timezone
import json
import socket
import time
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from httplib2 import HttpLib2Error

def load_env_from_repo_root(max_up: int = 5):
    """Load the first ``.env`` found walking up from the working directory.

    Starting at the current working directory, checks up to ``max_up``
    directories (the start directory and its ancestors) for a ``.env`` file.

    Returns:
        The path of the ``.env`` that was loaded, or ``None`` when none was
        found (in which case ``load_dotenv()`` is called with no arguments).
    """
    directory = os.path.abspath(os.getcwd())
    for _ in range(max_up):
        env_file = os.path.join(directory, ".env")
        if not os.path.exists(env_file):
            # Nothing here: move one level up and try again.
            directory = os.path.dirname(directory)
            continue
        load_dotenv(env_file)
        return env_file
    load_dotenv()
    return None

# Load the .env file and report which path (if any) was used.
env_path = load_env_from_repo_root()
print("ENV cargado desde:", env_path)

# Up to five API keys are read from the environment; any missing one is None.
k1 = os.getenv("YOUTUBE_API_KEY")
k2 = os.getenv("YOUTUBE_API_KEY_SECONDARY")
k3 = os.getenv("YOUTUBE_API_KEY_TERTIARY")
k4 = os.getenv("YOUTUBE_API_KEY_QUATERNARY")
k5 = os.getenv("YOUTUBE_API_KEY_QUINARY")

# === MODE: use only the newest key (k5, the fifth one) ===
USE_ONLY_NEWEST = False         # ← set to True to use exclusively the newest key (k5)

if USE_ONLY_NEWEST and k5:
    API_KEYS = [k5]
else:
    # Recommended order: newest first (k5 → k4 → k1 → k2 → k3); unset keys are dropped
    API_KEYS = [k for k in [k5, k4, k1, k2, k3] if k]

# Fail early: nothing below can run without at least one key.
if not API_KEYS:
    raise RuntimeError("No hay API keys disponibles en .env")


# Channels to crawl: YouTube channel id → group label
# ('pro-ucraniano', 'noticiero' or 'pro-ruso').
canales_interes: Dict[str, str] = {
    'UCPH3Oz99Y_jrVBCQMjQZNSg': 'pro-ucraniano',
    'UCJQQVLyM6wtPleV4wFBK06g': 'pro-ucraniano',
    'UCnsvJeZO4RigQ898WdDNoBw': 'noticiero',
    'UC7QZIf0dta-XPXsp9Hv4dTw': 'noticiero',
    'UClLLRs_mFTsNT5U-DqTYAGg': 'noticiero',
    'UCGXbLrVe8vnkiFv7q2vYv3w': 'noticiero',
    'UCBQnW5_C-6Ns6bob5ozacZg': 'pro-ruso'
}


ENV cargado desde: c:\Users\User\Desktop\Facu\Master_Espana\Master_UEMC\TFM\codigo\analisis_guerra_ucrania_youtube\.env


## 1) Ventana 2024 + keywords

In [None]:


# Inclusive UTC bounds of the 2024 window.
DATE_MIN = datetime(2024,1,1,0,0,0,tzinfo=timezone.utc)
DATE_MAX = datetime(2024,12,31,23,59,59,tzinfo=timezone.utc)
# Lowercase keywords; is_ukraine_war_related() looks for them as substrings
# of the lowercased title + description + tags.
UKR_KEYWORDS = {
    'ucrania','ucraniana','ucranianos','zelenski','zelensky','zelenskyy','kiev','kyiv','donbas','donbás','crimea',
    'dnipro','odessa','odesa','kharkiv','jersón','kherson','mariúpol','mariupol','zelenskiy','guerra de ucrania',
    'guerra en ucrania','rusia ataca','invasión a ucrania','invasion a ucrania','guerra rusia ucrania',
    'guerra ruso ucraniana','rusia','putin'
}
def is_ukraine_war_related(title: str, description: str, tags):
    """Return True when any keyword in UKR_KEYWORDS occurs in the video text.

    The title, description and tags are joined with spaces and lowercased;
    keywords are matched as plain substrings. ``None``/empty values are
    treated as empty text.
    """
    tag_text = ' '.join(tags or [])
    haystack = f"{title or ''} {description or ''} {tag_text}".lower()
    for keyword in UKR_KEYWORDS:
        if keyword in haystack:
            return True
    return False
def safe_int(x, default=0):
    """Convert *x* to ``int``, returning *default* when it cannot be converted.

    Only conversion failures are absorbed: ``TypeError`` (e.g. ``None``),
    ``ValueError`` (e.g. ``'abc'`` or NaN) and ``OverflowError`` (infinity).
    Anything else, such as ``KeyboardInterrupt``, propagates to the caller.
    """
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        return default
def iso8601_duration_to_seconds(d: str) -> float:
    """Convert an ISO 8601 duration such as ``PT1H2M3S`` to seconds.

    Weeks and days before the ``T`` separator and hours, minutes and seconds
    after it are summed. Years and months have no fixed length and are
    ignored.

    Returns:
        The duration in seconds, or NaN when *d* is empty or does not start
        with ``'P'``.
    """
    if not d or not d.startswith('P'):
        return float('nan')
    date_part, _, time_part = d[1:].partition('T')
    sections = (
        (date_part, {'W': 604800.0, 'D': 86400.0}),
        (time_part, {'H': 3600.0, 'M': 60.0, 'S': 1.0}),
    )
    total = 0.0
    for text, unit_seconds in sections:
        num = ''
        for ch in text:
            if ch.isdigit() or ch == '.':
                num += ch
                continue
            if ch in unit_seconds:
                total += float(num or 0) * unit_seconds[ch]
            # Any designator (known or not) ends the current number.
            num = ''
    return total


## 2) Cliente YouTube (exec verboso + sin `None` en kwargs)

In [None]:


def _parse_reason_from_http_error(e: HttpError):
    status = getattr(e.resp, 'status', None)
    reason = ''
    try:
        payload = json.loads(e.content.decode('utf-8', errors='ignore'))
        if isinstance(payload, dict):
            err = payload.get('error') or {}
            errs = err.get('errors') or []
            if errs and isinstance(errs, list) and isinstance(errs[0], dict):
                reason = str(errs[0].get('reason') or '')
            elif 'message' in err:
                reason = str(err.get('message'))
    except Exception:
        reason = (getattr(e, 'content', b'') or b'').decode('utf-8', 'ignore')
    return status, (reason or '').lower()

class YouTubeClient:
    """YouTube Data API v3 client that rotates through a list of API keys.

    ``service`` always points at a client built with ``keys[idx]``. Keys are
    only ever advanced, never reused, within one client instance.
    """

    def __init__(self, keys):
        """Build the service with the first non-empty key.

        Raises:
            RuntimeError: if *keys* contains no usable key.
        """
        self.keys = [k for k in (keys or []) if k]
        if not self.keys:
            raise RuntimeError('No hay API keys disponibles.')
        self.idx = 0
        self.service = build('youtube', 'v3', developerKey=self.keys[self.idx])

    def _switch_key(self) -> bool:
        """Advance to the next key and rebuild the service.

        Returns:
            True if a further key was available, False if the list is spent.
        """
        if self.idx + 1 < len(self.keys):
            self.idx += 1
            self.service = build('youtube', 'v3', developerKey=self.keys[self.idx])
            print(f'[KEY] Cambiando a key #{self.idx+1}')
            time.sleep(0.2)
            return True
        return False

    def exec(self, req_fn, retries: int = 1, backoff: float = 0.6):
        """Build and execute a request, retrying and rotating keys on failure.

        Args:
            req_fn: zero-argument callable returning an object with
                ``execute()``. It is called again on every attempt, so it
                picks up the rebuilt ``self.service`` after a key switch.
            retries: extra attempts allowed after the first failure. Switching
                to another key does not use up an attempt, so every remaining
                key gets tried on quota/invalid-key errors.
            backoff: base sleep in seconds, multiplied by ``attempt + 1``.

        Returns:
            The response dict from ``execute()``.

        Raises:
            RuntimeError: when all attempts fail. The message starts with
                "API agotada o error persistente" when the last failure was
                an ``HttpError``.
        """
        last_exc = None
        attempt = 0
        while attempt <= retries:
            pause = backoff * (attempt + 1)
            try:
                req = req_fn()
            except Exception as e:
                last_exc = e
                print(f'[exec] key_idx={self.idx} fallo construyendo request: {type(e).__name__}: {e}')
                time.sleep(pause)
                attempt += 1
                continue
            if not hasattr(req, 'execute'):
                last_exc = RuntimeError(f'Request inesperada: {type(req)}')
                print(f'[exec] key_idx={self.idx} request sin execute(): {type(req)}')
                time.sleep(pause)
                attempt += 1
                continue
            try:
                resp = req.execute()
                if isinstance(resp, dict):
                    return resp
                last_exc = RuntimeError(f'Respuesta no-JSON (type={type(resp)}): {resp!r}')
                print('[exec] respuesta no-JSON; reintento…')
            except HttpError as e:
                status, reason = _parse_reason_from_http_error(e)
                print(f"[exec] HttpError key_idx={self.idx} status={status} reason='{reason}' (attempt={attempt})")
                # Remember the error even when a key switch follows, so the
                # final message reports the HTTP failure instead of "unknown".
                last_exc = e
                quota_like = reason in {'quotaexceeded','dailylimitexceeded','ratelimitexceeded'} or (status==403 and ('quota' in reason or 'forbidden' in reason))
                key_invalid = reason in {'keyinvalid','forbidden'} and status in (400,403)
                if (quota_like or key_invalid) and self._switch_key():
                    # A fresh key gets its own try; bounded by len(self.keys).
                    time.sleep(pause)
                    continue
            except (HttpLib2Error, socket.timeout, ConnectionError) as e:
                last_exc = e
                print(f'[exec] Transporte key_idx={self.idx}: {type(e).__name__}: {e} (attempt={attempt})')
            except Exception as e:
                last_exc = e
                print(f'[exec] Excepción inesperada key_idx={self.idx}: {type(e).__name__}: {e} (attempt={attempt})')
            time.sleep(pause)
            attempt += 1
        if isinstance(last_exc, HttpError):
            status, reason = _parse_reason_from_http_error(last_exc)
            raise RuntimeError(f"API agotada o error persistente (HTTP {status}, reason='{reason}', key_idx={self.idx}).") from last_exc
        if last_exc is not None:
            raise RuntimeError(f"Fallo persistente con key_idx={self.idx}: {type(last_exc).__name__}: {last_exc}") from last_exc
        raise RuntimeError(f"Fallo desconocido ejecutando la request de YouTube API (sin excepción previa). key_idx={self.idx}")

def yt_search_list(yc: YouTubeClient, **kwargs):
    """Run ``search().list`` through the client, dropping ``None`` arguments.

    Prints the current key index and the main search parameters before the
    call, then returns whatever ``yc.exec`` returns.
    """
    params = {}
    for name, value in kwargs.items():
        if value is not None:
            params[name] = value
    debug_keys = ('channelId','publishedAfter','publishedBefore','pageToken','maxResults','order','type','q')
    dbg = {name: params.get(name) for name in debug_keys}
    print('[search] key_idx=', yc.idx, 'params=', dbg)

    def _make_request():
        # Looked up at call time so a rebuilt yc.service is picked up.
        return yc.service.search().list(**params)

    return yc.exec(_make_request)
def yt_videos_list(yc: YouTubeClient, **kwargs):
    """Run ``videos().list`` through the client, dropping ``None`` arguments."""
    params = {}
    for name, value in kwargs.items():
        if value is not None:
            params[name] = value

    def _make_request():
        # Looked up at call time so a rebuilt yc.service is picked up.
        return yc.service.videos().list(**params)

    return yc.exec(_make_request)


## 3) Estado (resume) + rangos mensuales

In [None]:
# Output directory for the discovery stage (created if missing), the JSON
# file holding the resume state, and the merged master CSV.
OUT_DIR = '../data/raw/data_rebuild'
os.makedirs(OUT_DIR, exist_ok=True)
STATE_PATH = os.path.join(OUT_DIR, 'discovery_state.json')
F_VIDEOS_MASTER = os.path.join(OUT_DIR, '0_videos_por_canal_2024_ukraine.csv')

def month_ranges_2024_indexed():
    """Return the twelve 2024 month windows as ``(index, start, end)`` tuples.

    ``index`` runs from 0 to 11; ``start`` and ``end`` are UTC timestamps
    formatted as ``YYYY-MM-DDTHH:MM:SSZ``, where ``end`` is the first instant
    of the following month (2025-01-01 for December).
    """
    stamp = '%Y-%m-%dT%H:%M:%SZ'
    boundaries = [datetime(2024, month, 1, 0, 0, 0, tzinfo=timezone.utc) for month in range(1, 13)]
    boundaries.append(datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc))
    return [
        (position, begin.strftime(stamp), finish.strftime(stamp))
        for position, (begin, finish) in enumerate(zip(boundaries, boundaries[1:]))
    ]
def load_state(path=STATE_PATH):
    """Read the resume state from the JSON file at *path*.

    Returns an empty dict when the file does not exist.
    """
    if not os.path.exists(path):
        return {}
    with open(path, 'r', encoding='utf-8') as handle:
        return json.load(handle)
def save_state(state, path=STATE_PATH):
    """Write *state* as JSON to *path* via a sibling ``.tmp`` file.

    The data is written to ``path + '.tmp'`` first and then moved over
    *path* with ``os.replace``.
    """
    staging = path + '.tmp'
    with open(staging, 'w', encoding='utf-8') as handle:
        json.dump(state, handle, indent=2, ensure_ascii=False)
    os.replace(staging, path)


## 4) Search mensual con reanudación

In [None]:
def search_channel_2024_monthly_resume(ytc: YouTubeClient, channel_id: str, q: str, state: dict, verbose=True):
    """Collect a channel's 2024 video ids month by month, with resume support.

    Progress is kept in ``state[channel_id]`` as ``month_index``,
    ``page_token`` and ``done`` and saved with ``save_state`` after every
    finished month and when the run stops on a quota-like error.

    Args:
        ytc: client used for the search calls.
        channel_id: channel to search.
        q: optional search query; blank or non-string values send no query.
        state: resume state for all channels; updated in place.
        verbose: print progress lines.

    Returns:
        ``(vids, state, stopped)``: the new video ids found in this call, the
        updated state, and ``True`` when the run stopped early on a
        quota-like error.

    Raises:
        RuntimeError: re-raised when the API failure is not quota-like.
    """
    ch_state = state.get(channel_id, {'month_index': 0, 'page_token': None, 'done': False})
    if ch_state.get('done'):
        if verbose: print('   · resume: channel already done')
        return [], state, False
    vids, seen = [], set()
    # discover_search_only_resume writes 'videos_2024_ukraine_<id>.csv'; the
    # older '0_videos_por_canal_...' name is still read if such a file exists.
    known_names = (
        f'videos_2024_ukraine_{channel_id}.csv',
        f'0_videos_por_canal_2024_ukraine_{channel_id}.csv',
    )
    for fname in known_names:
        per_ch_csv = os.path.join(OUT_DIR, fname)
        if not os.path.exists(per_ch_csv):
            continue
        try:
            prev = pd.read_csv(per_ch_csv, usecols=['video_id'])
            seen.update(prev['video_id'].astype(str).tolist())
        except Exception as e:
            # Best effort: an unreadable file only means fewer ids are skipped.
            print(f'   · aviso: no se pudo leer {per_ch_csv}: {type(e).__name__}: {e}')
    month_ranges = month_ranges_2024_indexed()
    start_month = int(ch_state.get('month_index', 0))
    # The saved page token applies to the first month only; it is None again
    # by the time the next month starts.
    page = ch_state.get('page_token') or None
    for idx, start_iso, end_iso in month_ranges[start_month:]:
        month_count = 0
        while True:
            try:
                r = yt_search_list(
                    ytc,
                    part='id',
                    channelId=channel_id,
                    type='video',
                    order='date',
                    maxResults=50,
                    publishedAfter=start_iso,
                    publishedBefore=end_iso,
                    q=(q.strip() if isinstance(q, str) and q.strip() else None),
                    pageToken=page
                )
            except RuntimeError as e:
                msg = str(e).lower()
                if any(t in msg for t in ['quota', 'ratelimit', 'daily', 'api agotada']):
                    # Save the exact position so the next run continues here.
                    state[channel_id] = {'month_index': idx, 'page_token': page, 'done': False}
                    save_state(state)
                    if verbose: print('   · cuota/key → estado guardado; deteniendo.')
                    return vids, state, True
                raise
            items = r.get('items', [])
            for it in items:
                vid = (it.get('id') or {}).get('videoId')
                if vid and vid not in seen:
                    seen.add(vid); vids.append(vid); month_count += 1
            page = r.get('nextPageToken') or None
            if not page:
                if verbose: print(f'   · {start_iso[:7]} +{month_count} (acum canal={len(seen)})')
                state[channel_id] = {'month_index': idx+1, 'page_token': None, 'done': False}
                save_state(state)
                break
    state[channel_id] = {'month_index': 12, 'page_token': None, 'done': True}
    save_state(state)
    if verbose: print('   · canal finalizado (12/12 meses).')
    return vids, state, False


## 5) Enriquecer + guardar CSVs

In [26]:
def enrich_and_filter_videos_to_df(ytc: YouTubeClient, video_ids):
    """Fetch video details in batches of 50 and keep the Ukraine-related ones.

    Each video's title, description and tags are checked with
    ``is_ukraine_war_related``; videos that do not match are dropped.

    Returns:
        A DataFrame with one row per kept video (no rows and no columns when
        nothing matches or *video_ids* is empty).
    """
    rows = []
    for i in range(0, len(video_ids), 50):
        chunk = video_ids[i:i+50]
        r = yt_videos_list(ytc, part='snippet,statistics,contentDetails', id=','.join(chunk))
        for it in r.get('items', []):
            sn = it.get('snippet', {}); st = it.get('statistics', {}); ct = it.get('contentDetails', {})
            title = sn.get('title', ''); desc = sn.get('description', ''); tags = sn.get('tags', [])
            if not is_ukraine_war_related(title, desc, tags):
                continue
            rows.append({
                'video_id': it.get('id'),
                'video_title': title,
                'channel_title': sn.get('channelTitle'),
                'video_published_at': sn.get('publishedAt'),
                'video_views': safe_int(st.get('viewCount')),
                'video_likes': safe_int(st.get('likeCount')),
                'video_duration': iso8601_duration_to_seconds(ct.get('duration', '')),
                'video_tags': '|'.join(tags) if tags else '',
                # safe_int, as for the other counters: a missing or malformed
                # category becomes 0 instead of raising.
                'video_category_id': safe_int(sn.get('categoryId'), 0)
            })
        time.sleep(0.05)
    return pd.DataFrame(rows)
def append_dedup_csv(path: str, df: pd.DataFrame, subset=('video_id',)):
    """Merge *df* into the CSV at *path*, keeping the first row per key.

    Does nothing when *df* is ``None`` or empty. When the file already
    exists its rows come first, so existing rows win over new duplicates.
    The file is rewritten in full without the index column.
    """
    if df is None or df.empty:
        return
    key_cols = list(subset)
    combined = df
    if os.path.exists(path):
        combined = pd.concat([pd.read_csv(path), df], ignore_index=True)
    combined.drop_duplicates(subset=key_cols).to_csv(path, index=False)


## 6) Orquestador reanudable

In [16]:
def discover_search_only_resume(canales: Dict[str,str], q_keywords: str=None):
    """Run the resumable discovery for every channel and rebuild the master CSV.

    For each channel: search for new video ids, enrich and filter them, and
    append the relevant ones to ``videos_2024_ukraine_<channel_id>.csv`` in
    ``OUT_DIR``. The loop stops at the first channel that hits a quota-like
    error. Afterwards all per-channel CSVs are merged into
    ``F_VIDEOS_MASTER``.

    Args:
        canales: mapping of channel id → group label (stored as ``bloque``).
        q_keywords: optional search query passed to the channel search.

    Returns:
        The merged master DataFrame, or ``None`` when no per-channel CSV
        could be read.
    """
    state = load_state()
    ytc_local = YouTubeClient(API_KEYS)
    for ch_i, (ch_id, bloque) in enumerate(canales.items(), start=1):
        print(f"\n== Canal {ch_i}/{len(canales)} [{bloque}] {ch_id}")
        vids, state, stopped = search_channel_2024_monthly_resume(
            ytc_local, ch_id, q=q_keywords, state=state, verbose=True
        )
        if not vids and not state.get(ch_id, {}).get('done'):
            print('   · sin nuevos ids (posible cuota); paso al siguiente canal.')
            if stopped: break
            continue
        per_ch_csv = os.path.join(OUT_DIR, f'videos_2024_ukraine_{ch_id}.csv')
        if vids:
            df = enrich_and_filter_videos_to_df(ytc_local, sorted(set(vids)))
            print(f'   · relevantes añadidos (Ucrania): {len(df)}')
            if not df.empty:
                df.insert(0, 'channel_id', ch_id)
                df.insert(1, 'bloque', bloque)
                append_dedup_csv(per_ch_csv, df, subset=('video_id',))
                print(f'   · guardado parcial → {per_ch_csv}')
        else:
            print('   · no hubo ids nuevos este ciclo.')
        if stopped:
            print('⚠️  Detenido por cuota/key. Estado guardado. Podrás reanudar desde aquí.')
            break
    # Sorted so the merge (and which duplicate wins) does not depend on the
    # order in which the filesystem lists the files.
    per_files = sorted(
        os.path.join(OUT_DIR, f)
        for f in os.listdir(OUT_DIR)
        if f.startswith('videos_2024_ukraine_') and f.endswith('.csv')
    )
    if not per_files:
        print('No hay CSVs parciales por canal aún.')
        return None
    dfs = []
    for f in per_files:
        try:
            dfs.append(pd.read_csv(f))
        except Exception as e:
            # Keep going, but say which partial file was left out.
            print(f'   · aviso: no se pudo leer {f}: {type(e).__name__}: {e}')
    if not dfs:
        return None
    master = pd.concat(dfs, ignore_index=True).drop_duplicates(subset=['video_id'])
    master.to_csv(F_VIDEOS_MASTER, index=False)
    print(f'✅ Master actualizado: {F_VIDEOS_MASTER}  | vídeos={len(master)}')
    return master


## 7) Ejecutar discovery (reanudable)

In [17]:
# Run (or resume) discovery over all configured channels, with no search query.
df_run = discover_search_only_resume(canales_interes, q_keywords=None)
print('Listo para siguiente ciclo si salta cuota. Estado en:', STATE_PATH)



== Canal 1/11 [pro-ucraniano] UCPH3Oz99Y_jrVBCQMjQZNSg
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 2/11 [pro-ucraniano] UCJQQVLyM6wtPleV4wFBK06g
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 3/11 [noticiero] UCnsvJeZO4RigQ898WdDNoBw
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 4/11 [noticiero] UC7QZIf0dta-XPXsp9Hv4dTw
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 5/11 [noticiero] UClLLRs_mFTsNT5U-DqTYAGg
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 6/11 [noticiero] UCGXbLrVe8vnkiFv7q2vYv3w
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 7/11 [noticiero] UCCJs5mITIqxqJGeFjt9N1Mg
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 8/11 [pro-ruso] UCwd8Byi93KbnsYmCcKLExvQ
   · resume: channel already done
   · no hubo ids nuevos este ciclo.

== Canal 9/11 [p

## Parte 2: Extracción completa

In [None]:

# From here on OUT_DIR is rebound to a different directory.
OUT_DIR = './data_rebuild'
F_MASTER = os.path.join(OUT_DIR, "videos_master_refinado_ukraine_rules.csv")
df_master = pd.read_csv(F_MASTER)
print('videos en master:', len(df_master))

# channel_id → bloque lookup built from the distinct pairs in the master file.
cond_por_canal = dict(df_master[['channel_id','bloque']].drop_duplicates().itertuples(index=False, name=None))
# Unique video ids as strings, in sorted order.
video_ids = sorted(set(df_master['video_id'].astype(str)))
len(video_ids)

videos en master: 864


864

In [7]:
import json, time, socket, pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from httplib2 import HttpLib2Error

def safe_int(x, default=0):
    """Convert *x* to ``int``, returning *default* when it cannot be converted.

    Only conversion failures are absorbed: ``TypeError`` (e.g. ``None``),
    ``ValueError`` (e.g. ``'abc'`` or NaN) and ``OverflowError`` (infinity).
    Anything else, such as ``KeyboardInterrupt``, propagates to the caller.
    """
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        return default

def iso8601_duration_to_seconds(d: str) -> float:
    """Convert an ISO 8601 duration such as ``PT1H2M3S`` to seconds.

    Weeks and days before the ``T`` separator and hours, minutes and seconds
    after it are summed. Years and months have no fixed length and are
    ignored.

    Returns:
        The duration in seconds, or NaN when *d* is empty or does not start
        with ``'P'``.
    """
    if not d or not d.startswith('P'):
        return float('nan')
    date_part, _, time_part = d[1:].partition('T')
    sections = (
        (date_part, {'W': 604800.0, 'D': 86400.0}),
        (time_part, {'H': 3600.0, 'M': 60.0, 'S': 1.0}),
    )
    total = 0.0
    for text, unit_seconds in sections:
        num = ''
        for ch in text:
            if ch.isdigit() or ch == '.':
                num += ch
                continue
            if ch in unit_seconds:
                total += float(num or 0) * unit_seconds[ch]
            # Any designator (known or not) ends the current number.
            num = ''
    return total

def _parse_reason_from_http_error(e: HttpError):
    status = getattr(e.resp, 'status', None)
    reason = ''
    try:
        payload = json.loads(e.content.decode('utf-8', errors='ignore'))
        if isinstance(payload, dict):
            err = payload.get('error') or {}
            errs = err.get('errors') or []
            if errs and isinstance(errs, list) and isinstance(errs[0], dict):
                reason = str(errs[0].get('reason') or '')
            elif 'message' in err:
                reason = str(err.get('message'))
    except Exception:
        reason = (getattr(e, 'content', b'') or b'').decode('utf-8', 'ignore')
    return status, (reason or '').lower()

class YouTubeClient:
    """YouTube Data API v3 client that rotates through a list of API keys.

    ``service`` always points at a client built with ``keys[idx]``. Keys are
    only ever advanced, never reused, within one client instance.
    """

    def __init__(self, keys):
        """Build the service with the first non-empty key.

        Raises:
            RuntimeError: if *keys* contains no usable key.
        """
        self.keys = [k for k in (keys or []) if k]
        if not self.keys:
            raise RuntimeError('No hay API keys disponibles.')
        self.idx = 0
        self.service = build('youtube','v3',developerKey=self.keys[self.idx])

    def _switch_key(self) -> bool:
        """Advance to the next key and rebuild the service.

        Returns:
            True if a further key was available, False if the list is spent.
        """
        if self.idx + 1 < len(self.keys):
            self.idx += 1
            self.service = build('youtube','v3',developerKey=self.keys[self.idx])
            print(f'[KEY] Cambiando a key #{self.idx+1}')
            time.sleep(0.2)
            return True
        return False

    def exec(self, req_fn, retries: int = 1, backoff: float = 0.6):
        """Build and execute a request, retrying and rotating keys on failure.

        Args:
            req_fn: zero-argument callable returning an object with
                ``execute()``. It is called again on every attempt, so it
                picks up the rebuilt ``self.service`` after a key switch.
            retries: extra attempts allowed after the first failure. Switching
                to another key does not use up an attempt, so every remaining
                key gets tried on quota/invalid-key errors.
            backoff: base sleep in seconds, multiplied by ``attempt + 1``.

        Returns:
            The response dict from ``execute()``.

        Raises:
            RuntimeError: when all attempts fail. The message starts with
                "API agotada o error persistente" when the last failure was
                an ``HttpError``.
        """
        last_exc = None
        attempt = 0
        while attempt <= retries:
            pause = backoff * (attempt + 1)
            try:
                req = req_fn()
            except Exception as e:
                last_exc = e
                print(f'[exec] key_idx={self.idx} fallo construyendo request: {type(e).__name__}: {e}')
                time.sleep(pause)
                attempt += 1
                continue
            try:
                resp = req.execute()
                if isinstance(resp, dict):
                    return resp
                last_exc = RuntimeError(f'Respuesta no-JSON (type={type(resp)}): {resp!r}')
                print('[exec] respuesta no-JSON; reintento…')
            except HttpError as e:
                status, reason = _parse_reason_from_http_error(e)
                print(f"[exec] HttpError key_idx={self.idx} status={status} reason='{reason}' (attempt={attempt})")
                # Remember the error even when a key switch follows, so the
                # final message reports the HTTP failure instead of "unknown".
                last_exc = e
                quota_like = reason in {'quotaexceeded','dailylimitexceeded','ratelimitexceeded'} or (status==403 and ('quota' in reason or 'forbidden' in reason))
                key_invalid = reason in {'keyinvalid','forbidden'} and status in (400,403)
                if (quota_like or key_invalid) and self._switch_key():
                    # A fresh key gets its own try; bounded by len(self.keys).
                    time.sleep(pause)
                    continue
            except (HttpLib2Error, socket.timeout, ConnectionError) as e:
                last_exc = e
                print(f'[exec] Transporte key_idx={self.idx}: {type(e).__name__}: {e} (attempt={attempt})')
            except Exception as e:
                last_exc = e
                print(f'[exec] Excepción inesperada key_idx={self.idx}: {type(e).__name__}: {e} (attempt={attempt})')
            time.sleep(pause)
            attempt += 1
        if isinstance(last_exc, HttpError):
            status, reason = _parse_reason_from_http_error(last_exc)
            raise RuntimeError(f"API agotada o error persistente (HTTP {status}, reason='{reason}', key_idx={self.idx}).") from last_exc
        if last_exc is not None:
            raise RuntimeError(f"Fallo persistente con key_idx={self.idx}: {type(last_exc).__name__}: {last_exc}") from last_exc
        raise RuntimeError(f"Fallo desconocido ejecutando la request de YouTube API (sin excepción previa). key_idx={self.idx}")

def yt_videos_list(yc: YouTubeClient, **kwargs):
    """Run ``videos().list`` through the client, dropping ``None`` arguments."""
    params = {}
    for name, value in kwargs.items():
        if value is not None:
            params[name] = value

    def _make_request():
        # Looked up at call time so a rebuilt yc.service is picked up.
        return yc.service.videos().list(**params)

    return yc.exec(_make_request)
def yt_comment_threads(yc: YouTubeClient, **kwargs):
    """Run ``commentThreads().list`` through the client, dropping ``None`` arguments."""
    params = {}
    for name, value in kwargs.items():
        if value is not None:
            params[name] = value

    def _make_request():
        # Looked up at call time so a rebuilt yc.service is picked up.
        return yc.service.commentThreads().list(**params)

    return yc.exec(_make_request)
def yt_channels_list(yc: YouTubeClient, **kwargs):
    """Run ``channels().list`` through the client, dropping ``None`` arguments."""
    params = {}
    for name, value in kwargs.items():
        if value is not None:
            params[name] = value

    def _make_request():
        # Looked up at call time so a rebuilt yc.service is picked up.
        return yc.service.channels().list(**params)

    return yc.exec(_make_request)

In [8]:
# CSV cache of per-video metadata; it is the default cache_path of build_video_meta.
META_CACHE = os.path.join(OUT_DIR, "video_meta_cache_refinado.csv")

def build_video_meta(ytc: YouTubeClient, ids, cache_path=META_CACHE, batch=50):
    """Fetch metadata for the video ids missing from the CSV cache.

    Ids already present in the cache at *cache_path* are skipped. The rest
    are requested in chunks of *batch* ids, and the cache file is rewritten
    with the old and new rows when anything new was fetched.

    Args:
        ytc: client used for the ``videos().list`` calls.
        ids: iterable of video id strings.
        cache_path: CSV file used as the cache.
        batch: ids per request; clamped to the range 1–50.

    Returns:
        The cache contents as a DataFrame, including any newly fetched rows.
    """
    exist = pd.read_csv(cache_path) if os.path.exists(cache_path) else pd.DataFrame()
    # Compare as strings so ids read back from CSV match the incoming ones.
    have = set(exist['video_id'].astype(str)) if not exist.empty else set()
    todo = [vid for vid in ids if str(vid) not in have]
    # NOTE(review): 50 is kept as the upper bound per request, matching the
    # original default — confirm against the videos.list documentation.
    step = max(1, min(int(batch), 50))
    rows = []
    for i in range(0, len(todo), step):
        # Slice by the same step the loop advances by, so no id is skipped
        # or requested twice.
        chunk = todo[i:i+step]
        r = yt_videos_list(ytc, part='snippet,statistics,contentDetails', id=','.join(chunk))
        for it in r.get('items', []):
            sn = it.get('snippet',{}); st=it.get('statistics',{}); ct=it.get('contentDetails',{})
            rows.append({
                'video_id': it.get('id'),
                'video_title': sn.get('title'),
                'channel_title': sn.get('channelTitle'),
                'video_published_at': sn.get('publishedAt'),
                'video_views': safe_int(st.get('viewCount')),
                'video_likes': safe_int(st.get('likeCount')),
                'video_duration': iso8601_duration_to_seconds(ct.get('duration','')),
                'video_tags': '|'.join(sn.get('tags', [])) if sn.get('tags') else '',
                'video_category_id': safe_int(sn.get('categoryId'), 0),
                'channel_id': sn.get('channelId')
            })
        time.sleep(0.05)
    if rows:
        df_new = pd.DataFrame(rows)
        df_out = pd.concat([exist, df_new], ignore_index=True).drop_duplicates(subset=['video_id'])
        df_out.to_csv(cache_path, index=False)
    else:
        df_out = exist
    return df_out

# Build the client and fetch (or load from cache) metadata for all master video ids.
ytc = YouTubeClient(API_KEYS)
df_meta = build_video_meta(ytc, video_ids)
print('video_meta cache rows:', len(df_meta))

video_meta cache rows: 864


## 3) Extraer **comentarios top-level** con resume + guardado incremental

In [9]:
import tempfile, shutil, time


# CSV that accumulates extracted comments, and the JSON resume state
# (finished video ids and pending page tokens) for the comment extraction.
COMMENTS_TMP = os.path.join(OUT_DIR, 'comments_top_level_refinado_tmp.csv')
STATE_CMT   = os.path.join(OUT_DIR, 'comments_state_refinado.json')

def load_state(path):
    """Read the resume state from the JSON file at *path*.

    Returns an empty dict when the file does not exist.
    """
    if not os.path.exists(path):
        return {}
    with open(path, 'r', encoding='utf-8') as handle:
        return json.load(handle)

def save_state(state, path=STATE_CMT):
    """
    Save *state* as JSON through a temp file, with a fallback for Windows:
    - Writes to a temp file in the destination directory and fsyncs it
    - Tries os.replace; on PermissionError, tries to delete the destination and move
    - Retries a few times in case the destination file is locked

    Raises PermissionError when the destination could not be replaced after
    all retries.
    """
    tmp_dir = os.path.dirname(path) or "."
    base = os.path.basename(path)

    fd, tmp_path = tempfile.mkstemp(prefix=base + ".", suffix=".tmp", dir=tmp_dir)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())

        for i in range(6):  # retries
            try:
                # main attempt: replace the destination in one step
                os.replace(tmp_path, path)
                return
            except PermissionError:
                # Windows fallback: delete the destination (if it exists) and move
                try:
                    if os.path.exists(path):
                        os.remove(path)
                except PermissionError:
                    pass  # may still be locked; sleep and retry below
                try:
                    shutil.move(tmp_path, path)
                    return
                except PermissionError:
                    time.sleep(0.25 * (i + 1))  # progressive backoff
        raise PermissionError(f"No pude reemplazar {path} tras varios reintentos")
    finally:
        # if the temp file still exists at this point, clean it up
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except Exception:
                pass

def _flush_comment_rows(rows):
    """Append the buffered comment rows to COMMENTS_TMP and empty the buffer."""
    if not rows:
        return
    pd.DataFrame(rows).to_csv(
        COMMENTS_TMP,
        mode='a',
        index=False,
        header=not os.path.exists(COMMENTS_TMP)
    )
    rows.clear()


def extract_comments_top_level(ytc: YouTubeClient, df_meta: pd.DataFrame, save_every=5000):
    """
    Extract top-level comments for every video in *df_meta*, with resume.

    - Video ids are read from df_meta (no dependency on a global id list).
    - Rows are appended to COMMENTS_TMP; progress (finished video ids and
      pending page tokens) is stored in STATE_CMT.
    - Videos whose comments cannot be listed (comments disabled, video not
      found) are marked as done and skipped.
    - On any other persistent API error reported as "agotada"/quota, the
      buffered rows and the state are saved and the function returns so the
      run can be resumed later.

    The ``condiciones_cuenta`` column is looked up in the module-level
    ``cond_por_canal`` mapping.

    Args:
        ytc: client used for the ``commentThreads().list`` calls.
        df_meta: video metadata with at least a ``video_id`` column.
        save_every: buffer size that triggers an intermediate flush.

    Raises:
        RuntimeError: re-raised when the failure is neither skippable nor
            quota-like.
    """
    # --- ids from the metadata (local scope) ---
    video_ids = df_meta['video_id'].dropna().astype(str).tolist()

    # --- quick lookup of the metadata row by video id ---
    meta_by_id = {str(r['video_id']): r for _, r in df_meta.iterrows()}

    # --- previous state ---
    state = load_state(STATE_CMT)
    done = set(state.get('done_video_ids', []))
    page_tokens = state.get('page_tokens', {})

    def _persist_progress():
        state['done_video_ids'] = list(done)
        state['page_tokens'] = page_tokens
        save_state(state, STATE_CMT)

    buf = []

    for vid in video_ids:
        if vid in done:
            continue

        meta = meta_by_id.get(vid, {})
        page = page_tokens.get(vid)

        while True:
            try:
                r = yt_comment_threads(
                    ytc,
                    part='snippet',
                    videoId=vid,
                    maxResults=100,
                    pageToken=page,
                    textFormat='plainText'
                )
            except RuntimeError as e:
                msg = str(e).lower()
                # Per-video failures must be checked first: the client's
                # message for any HTTP error also contains "agotada".
                if any(t in msg for t in ('commentsdisabled', 'videonotfound')):
                    print(f'   · {vid}: comentarios no disponibles; se omite el video.')
                    done.add(vid)
                    page_tokens.pop(vid, None)
                    break
                # quota/rate limit ⇒ persist everything and leave, to resume later
                if any(t in msg for t in ('quota', 'ratelimit', 'daily', 'agotada')):
                    _flush_comment_rows(buf)
                    _persist_progress()
                    print('⚠️ Cuota/Key → progreso guardado. Salgo para reanudar luego.')
                    return
                # any other error ⇒ re-raise
                raise

            items = r.get('items', [])
            for it in items:
                sn = (it.get('snippet') or {})
                tlc = (sn.get('topLevelComment') or {})
                top = (tlc.get('snippet') or {})

                text = top.get('textDisplay') or ''
                author_ch = (top.get('authorChannelId') or {}).get('value')

                buf.append({
                    'comment_id': tlc.get('id'),
                    'comment': text,
                    'comment_text_length': len(text),
                    'user_id': author_ch,
                    'user_name': top.get('authorDisplayName'),
                    'comment_time': top.get('publishedAt'),
                    'comment_likes': safe_int(top.get('likeCount')),
                    'total_reply_count': safe_int(sn.get('totalReplyCount')),
                    'is_top_level_comment': True,

                    'video_title': meta.get('video_title'),
                    'channel_title': meta.get('channel_title'),
                    'video_published_at': meta.get('video_published_at'),
                    'video_views': safe_int(meta.get('video_views')),
                    'video_likes': safe_int(meta.get('video_likes')),
                    'video_duration': meta.get('video_duration'),
                    'video_tags': meta.get('video_tags', ''),
                    'video_category_id': safe_int(meta.get('video_category_id'), 0),

                    'condiciones_cuenta': cond_por_canal.get(meta.get('channel_id'), 'desconocido'),
                    'channel_id': meta.get('channel_id'),
                    'subscriber_count': None,  # filled in by the enrichment stage
                })

            # pagination
            page = r.get('nextPageToken') or None
            if not page:
                done.add(vid)
                page_tokens.pop(vid, None)
                break
            page_tokens[vid] = page

            # flush in blocks; the page token is saved together with the rows
            # so a crash afterwards does not re-download what is on disk
            if len(buf) >= save_every:
                _flush_comment_rows(buf)
                _persist_progress()
                print(f'💾 Guardado parcial {save_every} rows…')

        # flush per video
        _flush_comment_rows(buf)

        # persist progress after every video
        _persist_progress()

    print('✅ Comentarios top-level extraídos (resume completado)')

# Call: run (or resume) the top-level comment extraction.
extract_comments_top_level(ytc, df_meta)


💾 Guardado parcial 5000 rows…
✅ Comentarios top-level extraídos (resume completado)


## 4) Enriquecer `subscriber_count` (canal del **video**) + `account_created_at` (canal del **usuario**) y exportar dataset final

In [10]:
# Destination of the final, enriched comments dataset.
FINAL_PATH = os.path.join(OUT_DIR, 'comments_2024_final.csv')

def enrich_video_channel_subs(ytc: YouTubeClient, df_comments: pd.DataFrame):
    """Fill ``subscriber_count`` with the subscriber count of each row's channel.

    Distinct ``channel_id`` values are looked up in batches of 50. Rows
    whose channel was not returned keep their existing ``subscriber_count``.
    The DataFrame is modified in place and also returned.
    """
    channel_ids = sorted(set(df_comments['channel_id'].dropna().astype(str)))
    subs_by_channel = {}
    for start in range(0, len(channel_ids), 50):
        batch = channel_ids[start:start + 50]
        response = yt_channels_list(ytc, part='statistics', id=','.join(batch))
        for item in response.get('items', []):
            stats = item.get('statistics') or {}
            subs_by_channel[item.get('id')] = safe_int(stats.get('subscriberCount'))
        time.sleep(0.05)
    looked_up = df_comments['channel_id'].map(subs_by_channel)
    df_comments['subscriber_count'] = looked_up.fillna(df_comments['subscriber_count'])
    return df_comments

def enrich_user_account_created(ytc: YouTubeClient, df_comments: pd.DataFrame):
    """Add ``account_created_at`` with the creation date of each commenter's channel.

    Distinct ``user_id`` values are looked up in batches of 50. If a
    quota-like error occurs, the lookups stop, a warning is printed, and the
    users not yet resolved get a missing value. The DataFrame is modified in
    place and also returned.

    Raises:
        RuntimeError: re-raised when the API failure is not quota-like.
    """
    user_ids = sorted(set(df_comments['user_id'].dropna().astype(str)))
    acc_map = {}
    for i in range(0, len(user_ids), 50):
        chunk = user_ids[i:i+50]
        try:
            r = yt_channels_list(ytc, part='snippet', id=','.join(chunk))
        except RuntimeError as e:
            msg = str(e).lower()
            if any(t in msg for t in ['quota','ratelimit','daily','agotada']):
                # Stop early but say so: the column will be only partly filled.
                print(f'⚠️ Cuota/Key → account_created_at incompleto ({i}/{len(user_ids)} usuarios consultados).')
                break
            raise
        for it in r.get('items', []):
            cid = it.get('id')
            acc_map[cid] = (it.get('snippet') or {}).get('publishedAt')
        time.sleep(0.05)
    df_comments['account_created_at'] = df_comments['user_id'].map(acc_map)
    return df_comments

# Load every comment extracted so far.
df_cmt = pd.read_csv(COMMENTS_TMP)
print('comentarios cargados:', len(df_cmt))

# Each enrichment step gets its own client, so key rotation starts again
# from the first key.
df_cmt = enrich_video_channel_subs(YouTubeClient(API_KEYS), df_cmt)
df_cmt = enrich_user_account_created(YouTubeClient(API_KEYS), df_cmt)

# Column selection and order of the exported dataset.
final_cols = [
 'comment_id','comment','comment_text_length','user_id','user_name','comment_time','comment_likes',
 'total_reply_count','is_top_level_comment','video_title','channel_title','video_published_at','video_views',
 'video_likes','video_duration','video_tags','video_category_id','condiciones_cuenta','account_created_at',
 'channel_id','subscriber_count'
]
df_cmt = df_cmt[final_cols]
df_cmt.to_csv(FINAL_PATH, index=False)
print('✅ Dataset final →', FINAL_PATH)

comentarios cargados: 371740
✅ Dataset final → ./data_rebuild\comments_2024_final.csv
