In [None]:
#CONFIGURACIÓN DE LA CONSULTA

import os
from dotenv import load_dotenv
from datetime import datetime

# Load variables from the .env file
load_dotenv()

# Read the API key from the environment (None when the variable is unset).
API_KEY = os.environ.get("YOUTUBE_API_KEY")

# User-tunable input parameters for the search and the comment extraction.
config = dict(
    query="juicio alvaro uribe",
    published_after="2024-01-01T00:00:00Z",
    published_before="2025-04-13T23:59:59Z",
    min_views=100,
    min_comments=20,
    max_comments=10000,  # maximum number of comments to extract per video
    max_results=2000,  # maximum number of videos to read
)

In [None]:
# FUNCIONES PARA BÚSQUEDA DE VIDEOS

import requests
from datetime import datetime

def search_videos():
    """Search YouTube with the parameters in ``config`` and filter by statistics.

    Pages through the ``search`` endpoint (50 results per call, ``order=date``,
    restricted to the configured publication window). For every page the video
    details are fetched with ``get_video_details`` and only videos reaching
    ``config["min_views"]`` views and ``config["min_comments"]`` comments are
    kept.

    The loop stops when ``config["max_results"]`` videos were collected, when a
    page yields no video IDs, when there is no next page, or when the API
    answers with an error payload. In the error case the message is printed
    and the videos gathered so far are returned.

    Returns:
        list[dict]: Video resources as returned by ``get_video_details``,
        at most ``config["max_results"]`` entries.
    """
    url = "https://www.googleapis.com/youtube/v3/search"
    videos = []
    next_page_token = None

    while len(videos) < config["max_results"]:
        print(f"Buscando videos... ({len(videos)}/{config['max_results']})")

        params = {
            "part": "snippet",
            "q": config["query"],
            "type": "video",
            "order": "date",
            "publishedAfter": config["published_after"],
            "publishedBefore": config["published_before"],
            "maxResults": 50,  # maximum per call
            "key": API_KEY
        }
        if next_page_token:
            params["pageToken"] = next_page_token

        # A timeout keeps a stalled connection from hanging the notebook forever.
        response = requests.get(url, params=params, timeout=30)
        data = response.json()

        # Error payloads (quota exceeded, invalid key, ...) carry no "items";
        # report them instead of silently looking like "no more results".
        error = data.get("error")
        if error:
            message = error.get("message", error) if isinstance(error, dict) else error
            print(f"Error de la API de YouTube al buscar videos: {message}")
            break

        # Collect the IDs of this page, skipping items without a video ID.
        video_ids = [
            item["id"]["videoId"]
            for item in data.get("items", [])
            if item.get("id", {}).get("videoId")
        ]
        if not video_ids:
            break

        # Fetch details (views, comments) and apply the configured thresholds.
        details = get_video_details(video_ids)
        for video in details:
            stats = video.get("statistics", {})
            views = int(stats.get("viewCount", 0))
            comments = int(stats.get("commentCount", 0))
            if views >= config["min_views"] and comments >= config["min_comments"]:
                videos.append(video)
                if len(videos) >= config["max_results"]:
                    break

        next_page_token = data.get("nextPageToken")
        if not next_page_token:
            break

    print(f"Total de videos encontrados que cumplen filtros: {len(videos)}")
    return videos

def get_video_details(video_ids):
    """Fetch ``snippet``, ``statistics`` and ``contentDetails`` for video IDs.

    IDs are sent in batches of 50 per request (the same per-call limit this
    notebook uses for searches and channels), so lists of any length work.
    An empty input returns an empty list without calling the API.

    Args:
        video_ids: Iterable of YouTube video ID strings.

    Returns:
        list[dict]: The ``items`` of every response, concatenated in request
        order. Responses without ``items`` (for example error payloads)
        contribute nothing.
    """
    video_ids = list(video_ids)
    if not video_ids:
        return []

    url = "https://www.googleapis.com/youtube/v3/videos"
    batch_size = 50
    items = []
    for start in range(0, len(video_ids), batch_size):
        params = {
            "part": "snippet,statistics,contentDetails",
            "id": ",".join(video_ids[start:start + batch_size]),
            "key": API_KEY
        }
        # A timeout keeps a stalled connection from hanging the notebook forever.
        response = requests.get(url, params=params, timeout=30)
        items.extend(response.json().get("items", []))
    return items


In [None]:
# Direct smoke test of the search
videos = search_videos()
# Show the titles of the first 3 videos
for title in (video["snippet"]["title"] for video in videos[:3]):
    print("Video:", title)

In [None]:
# ALMACENAMIENTO PRELIMINAR DE ARCHIVO DE VIDEOS
import pandas as pd
from isodate import parse_duration
import os

def save_videos_to_csv(videos, output_path="../data/videos_preliminares.csv"):
    """Flatten video resources into one row each and write them to a CSV file.

    Args:
        videos: Iterable of video dicts with ``id`` and, optionally,
            ``snippet``, ``statistics`` and ``contentDetails`` sub-dicts.
        output_path: Destination CSV path. Its parent directory is created
            when it does not exist yet.

    The column set is fixed, so an empty ``videos`` list still produces a
    header-only CSV that ``pd.read_csv`` can load. A video without a
    ``duration`` gets an empty ``duration_seconds`` cell.
    """
    columns = [
        "video_id",
        "title",
        "publishedAt",
        "channel_id",
        "channel_title",
        "views",
        "likes",
        "comments",
        "description",
        "video_tags",
        "duration_seconds",
        "video_url",
    ]

    # Create the directory the file is actually written to (not a fixed
    # "data" folder relative to the current working directory).
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    rows = []
    for v in videos:
        stats = v.get("statistics", {})
        snippet = v.get("snippet", {})
        content = v.get("contentDetails", {})
        duration = content.get("duration")
        rows.append({
            "video_id": v["id"],
            "title": snippet.get("title"),
            "publishedAt": snippet.get("publishedAt"),
            "channel_id": snippet.get("channelId"),
            "channel_title": snippet.get("channelTitle"),
            "views": stats.get("viewCount"),
            "likes": stats.get("likeCount"),
            # Dislike and favorite counts are not available from the API.
            "comments": stats.get("commentCount"),
            "description": snippet.get("description"),
            "video_tags": ", ".join(snippet.get("tags", [])),
            # parse_duration cannot handle a missing value, so guard it.
            "duration_seconds": (
                int(parse_duration(duration).total_seconds()) if duration else None
            ),
            "video_url": f"https://www.youtube.com/watch?v={v['id']}"
        })

    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(output_path, index=False)
    print(f"Videos preliminares guardados en: {output_path}")

# When run directly: save the `videos` list to the default CSV path
save_videos_to_csv(videos)

In [None]:
#ADICIÓN DE METADATOS DEL CANAL

def get_channel_metadata(channel_ids):
    """Fetch subscriber count and country for each channel ID.

    Args:
        channel_ids: Iterable of channel IDs. Entries that are not non-empty
            strings (for example NaN/None coming from a CSV column with
            missing values) are skipped, because they cannot be joined into
            the request.

    Returns:
        dict: Maps channel ID to a dict with the keys ``channel_subscribers``
        and ``channel_country``. Either value is None when the response omits
        it. Channels missing from the API response are absent from the
        result.
    """
    channel_info = {}
    valid_ids = [cid for cid in channel_ids if isinstance(cid, str) and cid]
    if not valid_ids:
        return channel_info

    url = "https://www.googleapis.com/youtube/v3/channels"
    for chunk in chunks(valid_ids, 50):  # the API allows 50 ids per call
        params = {
            "part": "snippet,statistics",
            "id": ",".join(chunk),
            "key": API_KEY
        }
        # A timeout keeps a stalled connection from hanging the notebook forever.
        res = requests.get(url, params=params, timeout=30).json()
        for item in res.get("items", []):
            channel_info[item["id"]] = {
                "channel_subscribers": item.get("statistics", {}).get("subscriberCount"),
                "channel_country": item.get("snippet", {}).get("country")
            }
    return channel_info



def chunks(lst, n):
    """Yield consecutive slices of ``lst`` holding at most ``n`` elements each."""
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

def enrich_video_data(input_csv="../data/videos_preliminares.csv"):
    """Add channel subscriber count and country columns to the videos CSV.

    Reads ``input_csv``, looks up every distinct ``channel_id`` through
    ``get_channel_metadata`` and writes the table back to the same path with
    the extra ``channel_subscribers`` and ``channel_country`` columns.
    Channels without metadata get None in both columns.
    """
    def _lookup(field):
        # Build the per-row mapper for one metadata field.
        return lambda channel_id: channel_data.get(channel_id, {}).get(field)

    videos_table = pd.read_csv(input_csv)

    print("Obteniendo metadatos de canales...")
    distinct_channels = videos_table["channel_id"].unique().tolist()
    channel_data = get_channel_metadata(distinct_channels)

    # Attach both metadata columns, in the same order as before.
    for field in ("channel_subscribers", "channel_country"):
        videos_table[field] = videos_table["channel_id"].map(_lookup(field))

    videos_table.to_csv(input_csv, index=False)
    print(f"Archivo actualizado con metadatos enriquecidos: {input_csv}")


# Enrich the default preliminary videos CSV with channel metadata.
enrich_video_data()

In [None]:
# OBTENER COMENTARIOS

def _comment_row(video_id, comment, parent_id=None):
    """Flatten one comment resource into the row format used by the comments CSV.

    ``parent_id`` is the top-level comment ID for replies and None for
    top-level comments; it also determines ``is_reply``.
    """
    snippet = comment["snippet"]
    # authorChannelId can be missing from a comment (presumably when the
    # author's channel no longer exists); fall back to None instead of raising.
    author = snippet.get("authorChannelId") or {}
    return {
        "video_id": video_id,
        "comment_id": comment["id"],
        "text": snippet["textDisplay"],
        "author_name": snippet["authorDisplayName"],
        "author_id": author.get("value"),
        "published_at": snippet["publishedAt"],
        "likes": snippet["likeCount"],
        "is_reply": parent_id is not None,
        "reply_to_comment_id": parent_id
    }


def get_comments_for_video(video_id, max_comments=100):
    """Download top-level comments and their inline replies for one video.

    Pages through the ``commentThreads`` endpoint (100 threads per call,
    plain-text format) until ``max_comments`` rows were collected or there are
    no more pages. If the API answers with an error payload the message is
    printed and the rows gathered so far are returned.

    Args:
        video_id: YouTube video ID.
        max_comments: Upper bound on returned rows (comments plus replies).

    Returns:
        list[dict]: One row per comment or reply with the keys ``video_id``,
        ``comment_id``, ``text``, ``author_name``, ``author_id``,
        ``published_at``, ``likes``, ``is_reply`` and ``reply_to_comment_id``.
        Never longer than ``max_comments``.
    """
    url = "https://www.googleapis.com/youtube/v3/commentThreads"
    comments = []
    next_page_token = None

    while len(comments) < max_comments:
        params = {
            "part": "snippet,replies",
            "videoId": video_id,
            "maxResults": 100,
            "textFormat": "plainText",
            "key": API_KEY
        }
        if next_page_token:
            params["pageToken"] = next_page_token

        # A timeout keeps a stalled connection from hanging the notebook forever.
        response = requests.get(url, params=params, timeout=30)
        data = response.json()

        # Error payloads (e.g. comments disabled, quota) carry no "items".
        error = data.get("error")
        if error:
            message = error.get("message", error) if isinstance(error, dict) else error
            print(f"No se pudieron obtener comentarios del video {video_id}: {message}")
            break

        for item in data.get("items", []):
            top_level = item["snippet"]["topLevelComment"]
            comments.append(_comment_row(video_id, top_level))

            # Capture the replies embedded in the thread as well.
            # NOTE(review): the embedded list may not hold every reply of a
            # long thread - confirm against the commentThreads documentation.
            for reply in item.get("replies", {}).get("comments", []):
                comments.append(_comment_row(video_id, reply, parent_id=top_level["id"]))

        next_page_token = data.get("nextPageToken")
        if not next_page_token:
            break

    # A page is consumed whole, so trim the overshoot to honour max_comments.
    return comments[:max_comments]

def get_all_comments(video_ids, max_comments=100):
    """Collect the comments of every video into a single flat list.

    Calls ``get_comments_for_video`` once per ID, in order, passing
    ``max_comments`` through, and prints a 1-based progress line per video.
    """
    collected = []
    for position, video_id in enumerate(video_ids, start=1):
        print(f"Obteniendo comentarios para el video ({position}): {video_id}")
        collected += get_comments_for_video(video_id, max_comments)
    return collected

def save_comments_to_csv(comments, output_path="../data/comments.csv"):
    """Write the comment rows to ``output_path`` as CSV, without the index column."""
    pd.DataFrame(comments).to_csv(output_path, index=False)
    print(f"Comentarios guardados en: {output_path}")


# Load the video IDs from the previously saved videos CSV
videos_df = pd.read_csv("../data/videos_preliminares.csv")
video_ids = videos_df["video_id"].tolist()

# Fetch the comments of every video
comments = get_all_comments(video_ids, max_comments = config["max_comments"])

# Save the comments to a CSV
save_comments_to_csv(comments) 