In [1]:
import pandas as pd
import time
import json
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys

# Parámetros de configuración
CSV_INPUT = "spain/es.csv"         # Archivo CSV con columnas "id" y "full_text"
JSON_OUTPUT = "tweets_españoles_anotados_finales_id.json"  # Archivo de salida en JSON
DELAY_BETWEEN_REQUESTS = 1         # Tiempo de retardo entre cada tweet
MAX_REPLIES = 50                  # Número máximo de respuestas a extraer por tweet
SAMPLE_SIZE = 300                # Número de tweets aleatorios a procesar
RANDOM_SEED = 65                  # Semilla para reproducibilidad de la muestra

# Credenciales para login en Twitter
USERNAME = "marcosHern80439"
PASSWORD = "Servidor"

# 1. Configurar el WebDriver
def init_driver():
    chrome_options = Options()
    chrome_options.add_argument("--headless=new")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    return webdriver.Chrome(options=chrome_options)


def login_twitter(driver, username, password):
    driver.get("https://twitter.com/i/flow/login")
    wait = WebDriverWait(driver, 20)

    # Usuario
    username_field = wait.until(
        EC.presence_of_element_located((By.NAME, 'text'))
    )
    username_field.send_keys(username)
    username_field.send_keys(Keys.RETURN)
    time.sleep(2)

    # A veces aparece un botón "Siguiente"
    try:
        next_btn = wait.until(
            EC.element_to_be_clickable((By.XPATH, "//div[@role='button' and contains(., 'Siguiente')]/div"))
        )
        next_btn.click()
        time.sleep(2)
    except Exception:
        pass

    # Contraseña
    password_field = wait.until(
        EC.presence_of_element_located((By.NAME, 'password'))
    )
    password_field.send_keys(password)
    password_field.send_keys(Keys.RETURN)
    time.sleep(5)


def extract_tweet_data(driver, tweet_id, max_attempts=3, max_replies=MAX_REPLIES):
    url = f"https://twitter.com/i/web/status/{tweet_id}"
    for attempt in range(1, max_attempts+1):
        try:
            driver.get(url)
            wait = WebDriverWait(driver, 20)

            # Tweet original
            tweet_el = wait.until(
                EC.presence_of_element_located((By.XPATH, '//*[@data-testid="tweetText"]'))
            )
            tweet_text = tweet_el.text.strip()
            user_el = driver.find_element(By.XPATH, '//*[@data-testid="User-Name"]')
            user = user_el.text.strip()

            # Cargar respuestas con scroll
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)

            # Recolectar respuestas
            replies = []
            articles = driver.find_elements(By.XPATH, '//article[@data-testid="tweet"]')[1:]
            for art in articles[:max_replies]:
                try:
                    text = art.find_element(By.XPATH, './/*[@data-testid="tweetText"]').text.strip()
                    usr = art.find_element(By.XPATH, './/*[@data-testid="User-Name"]').text.strip()
                    replies.append({'user': usr, 'reply': text})
                except:
                    continue

            return {'tweet_text': tweet_text, 'user': user, 'replies': replies}
        except Exception as e:
            print(f"Intento {attempt} falló para tweet {tweet_id}: {e}")
            time.sleep(2)

    # Falla total
    return {
        'tweet_text': f"Error al extraer tweet {tweet_id} tras {max_attempts} intentos.",
        'user': f"Error al extraer usuario de {tweet_id} tras {max_attempts} intentos.",
        'replies': []
    }


def main():
    # Iniciar driver y login
    driver = init_driver()
    login_twitter(driver, USERNAME, PASSWORD)
    print("Login completado.")

    # Cargar CSV y muestreo aleatorio
    print("Cargando CSV y tomando muestra aleatoria...")
    df_iter = pd.read_csv(CSV_INPUT, usecols=['id', 'mp_party'])
    df_sample = df_iter.sample(n=SAMPLE_SIZE, random_state=RANDOM_SEED)
    print(f"Procesando {len(df_sample)} tweets aleatorios.")

    results = []
    for idx, row in df_sample.iterrows():
        tweet_id = row['id']
        mp_party = row['mp_party']
        print(f"Procesando tweet ID: {tweet_id}")
        data = extract_tweet_data(driver, tweet_id)
        results.append({
            'id' : tweet_id,
            'user': data['user'],
            'tweet': data['tweet_text'],
            'replies': data['replies'],
            'mp_party': mp_party
        })
        time.sleep(DELAY_BETWEEN_REQUESTS)

    driver.quit()

    # Guardar JSON
    with open(JSON_OUTPUT, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"Guardado en {JSON_OUTPUT}.")


if __name__ == '__main__':
    main()


Login completado.
Cargando CSV y tomando muestra aleatoria...
Procesando 300 tweets aleatorios.
Procesando tweet ID: 1419711803723063300
Procesando tweet ID: 1440314582778613768
Procesando tweet ID: 1386655923976953857
Procesando tweet ID: 1356980641674522625
Procesando tweet ID: 1405535574677594113
Procesando tweet ID: 1417139214483959819
Procesando tweet ID: 1415426096758202374
Procesando tweet ID: 1436734021082435586
Procesando tweet ID: 1471176787199729670
Procesando tweet ID: 1443946506915037189
Procesando tweet ID: 1414158698843418626
Procesando tweet ID: 1378632360393064449
Procesando tweet ID: 1403389251740385287
Procesando tweet ID: 1451120281603514372
Procesando tweet ID: 1405253103885164552
Procesando tweet ID: 1354155528121950208
Procesando tweet ID: 1405601699478245380
Procesando tweet ID: 1474041520768618497
Procesando tweet ID: 1434648327560339456
Procesando tweet ID: 1471250420060327942
Procesando tweet ID: 1426220571067666445
Procesando tweet ID: 1460706091386482704
Pr