In [None]:
########################################################################################################################################################################
########################################################################################################################################################################
############################################################# PIPELINE BATCH PARA EXTRACCIÓN ESTRUCTURADA ##############################################################
########################################################################################################################################################################
########################################################################################################################################################################

In [None]:
# 1. Librerías.
from __future__ import annotations
from typing import List, Optional, Literal
from pydantic import BaseModel, Field
import pandas as pd
import json, pathlib
from openai import OpenAI
from pprint import pprint
from datetime import datetime

pd.options.display.max_columns = None

In [None]:
#2. Constantes.
nombre_prueba = input("Por favor, asigne un subfijo para el nombre de los archivos output siguiendo el patrón: 'IL1610_1' (inicial nombre, inicial apellido,dia, mes,número de prueba/detalle de prueba):")
project_path = "C:/Users/i_link/Maestría/Text Mining/nlp_dmuba/"
dataset_file_path = project_path + "1-Scraping/dataset_consolidado/df.parquet"
batch_requests_path = project_path + "5-LLMs/openai/pruebas_batch/batch_requests_{}.jsonl".format(nombre_prueba)
batch_results_path =  project_path + "5-LLMs/openai/pruebas_batch/batch_results_{}.jsonl".format(nombre_prueba)
batch_errors_path =   project_path + "5-LLMs/openai/pruebas_batch/batch_errors_{}.jsonl".format(nombre_prueba)
df_final_path = project_path + "5-LLMs/openai/pruebas_batch/df_final_{}.csv".format(nombre_prueba)

In [None]:
#3. Lecturas.
#a. Datos.
df = pd.read_parquet(dataset_file_path)
#b. Clave API.
with open(project_path + "secrets/opeinai_api_key.txt", "r") as f:
    key = f.read().strip()

In [None]:
#4. Genero un Cliente de OpenAI.
client = OpenAI(api_key=key)

In [None]:
########################################### SAMPLEO PARA PRUEBAS ####################################################
sample = 1500
df_sample = df.dropna(subset=["contenido"]).sample(sample, random_state=42).reset_index(drop=True)

In [None]:
#5. Prompt. 
#a. System y User Prompt.
SYSTEM_PROMPT = '''
Eres un analista económico-financiero especializado en Argentina.
Objetivo: extraer datos ESTRUCTURADOS de una noticia para modelar el MERVAL.

Salida:
Genera SOLO un JSON plano con todas las columnas al mismo nivel. Usa los valores por defecto si no hay información.

Campos y definiciones:

- tipo_actor_principal: Actor dominante al que refiere la noticia. Posibles valores: "gobierno_nacional","bcra","provincia","municipio","empresa_local","empresa_extranjera","sindicato","poder_judicial","congreso","organismo_internacional","desconocido". Default: "desconocido"
- nombre_actor_principal: Nombre del actor principal si es claro. Default: "unknown"
- empresas_mencionadas: Lista de empresas mencionadas (nombre legal). Default: []
- tickers_mencionados: Lista de tickers (BYMA/ADRs) en MAYÚSCULAS, sin duplicados. Default: []
- sectores_mencionados: Lista de sectores/industrias relevantes. Default: []

- tipo_evento: Categoría del hecho principal. Posibles valores: "monetario","fiscal","regulatorio","corporativo","externo","sindical_social","judicial","electoral","otro","desconocido". Default: "desconocido"
- shock: Signo cualitativo del impacto sobre mercados o economía. Posibles valores: "positivo","negativo","mixto","neutro","desconocido". Default: "desconocido"
- caracter: Temporalidad del evento. Posibles valores: "retroactivo","vigente","prospectivo","desconocido". Default: "desconocido"
- horizonte_dias: Días hasta que se espera el impacto, si se menciona explícitamente; si no, null. Default: null

- merval: Sesgo esperado para el índice Merval (-1: baja fuerte, +1: sube fuerte). Default: 0.0
- volatilidad_merval: Indicador cualitativo de volatilidad esperada (-1: baja, +1: alta). Default: 0.0
- fx_usdars: Sesgo para el tipo de cambio USD/ARS (-1: aprecia ARS, +1: deprecia ARS). Default: 0.0
- spread_usd: Indicador cualitativo de spread dólar oficial vs paralelo (-1: estrecho, +1: amplio). Default: 0.0
- tasa_bcra: Sesgo para la tasa de política del BCRA (-1: baja, +1: sube). Default: 0.0
- bonos_soberanos: Sesgo sobre precio de bonos soberanos (-1: baja, +1: sube). Default: 0.0
- spread_bonos: Indicador cualitativo de spreads de bonos (-1: estrecho, +1: amplio). Default: 0.0
- actividad_economica: Sesgo sobre nivel de actividad económica (-1: baja, +1: sube). Default: 0.0
- volumen_mercado: Nivel de actividad en trading (-1: bajo, +1: alto). Default: 0.0

- valencia_general: Tono general del artículo sobre economía y mercados (-1: negativo, +1: positivo). Default: 0.0
- gobernanza: Tono respecto a gobierno o instituciones (-1: negativo, +1: positivo). Default: 0.0
- expectativa_macro_corto: Expectativa macro a 1–3 meses (-1: pesimista, +1: optimista). Default: 0.0
- expectativa_macro_largo: Expectativa macro a >6 meses (-1: pesimista, +1: optimista). Default: 0.0
- expectativa_fin_corto: Expectativa financiera a 1–3 meses (-1: negativo, +1: positivo). Default: 0.0
- expectativa_fin_largo: Expectativa financiera a >6 meses (-1: negativo, +1: positivo). Default: 0.0

- menciona_inflacion: Se mencionan inflación o precios. Default: false
- menciona_pbi: Se menciona PBI o crecimiento económico. Default: false
- menciona_reservas: Se mencionan reservas del BCRA. Default: false
- menciona_embi: Se menciona riesgo país o EMBI. Default: false
- menciona_deuda: Se menciona deuda pública o privada. Default: false
- menciona_fmi: Se menciona FMI o acuerdos con el FMI. Default: false
- menciona_salarios_paritarias: Se mencionan salarios o paritarias. Default: false
- menciona_tipo_cambio: Se menciona tipo de cambio o dólar. Default: false
- menciona_confianza_consumidor: Se menciona índice o sentimiento de confianza del consumidor. Default: false

- menciona_sector_bancario: Se menciona al sistema financiero o bancos. Default: false
- impacto_sector_bancario: Sesgo del impacto sobre el sector bancario (-1: negativo, +1: positivo). Default: 0.0
- menciona_sector_energia: Se menciona el sector energético (YPF, Pampa, gas, electricidad, petróleo). Default: false
- impacto_sector_energia: Sesgo del impacto sobre el sector energético (-1: negativo, +1: positivo). Default: 0.0
- menciona_sector_agroexportador: Se menciona el agro o exportaciones del campo (soja, maíz, trigo, etc.). Default: false
- impacto_sector_agroexportador: Sesgo del impacto sobre el sector agroexportador (-1: negativo, +1: positivo). Default: 0.0
- menciona_sector_industrial: Se menciona la industria manufacturera o producción local. Default: false
- impacto_sector_industrial: Sesgo del impacto sobre la industria (-1: negativo, +1: positivo). Default: 0.0
- menciona_eeuu: Se mencionan Estados Unidos, su economía, o sus mercados. Default: false
- impacto_eeuu: Impacto esperado de noticias o condiciones de EE.UU. sobre Argentina (-1: desfavorable, +1: favorable). Default: 0.0
- menciona_brasil: Se menciona Brasil o su economía (importante socio comercial). Default: false
- impacto_brasil: Impacto esperado de Brasil sobre Argentina o sus mercados (-1: desfavorable, +1: favorable). Default: 0.0
- menciona_mercosur: Se menciona el Mercosur o acuerdos comerciales regionales. Default: false
- impacto_mercosur: Impacto esperado del Mercosur sobre la economía argentina (-1: desfavorable, +1: favorable). Default: 0.0
- menciona_prestamos_internacionales: Se mencionan préstamos o financiamiento de organismos internacionales. Default: false
- impacto_prestamos_internacionales: Sesgo esperado de dicho financiamiento (-1: negativo, +1: positivo). Default: 0.0
- impacto_fmi: Sesgo del impacto del FMI sobre Argentina (-1: negativo, +1: positivo). Default: 0.0
- menciona_commodities: Se mencionan commodities relevantes (soja, petróleo, oro, litio, etc.). Default: false
- impacto_commodities: Impacto esperado del movimiento de commodities sobre Argentina (-1: desfavorable, +1: favorable). Default: 0.0
- menciona_bolsa_extranjera: Se mencionan bolsas o índices extranjeros (S&P 500, Nasdaq, Bovespa, etc.). Default: false
- impacto_bolsa_extranjera: Impacto esperado de esos mercados sobre el Merval o Argentina (-1: desfavorable, +1: favorable). Default: 0.0

- categoria_fuente: Tipo de fuente del contenido. Posibles valores: "oficial","periodistica","analisis","rumor","desconocido". Default: "desconocido"
- score_fuente: Confiabilidad de la fuente según categoría (0..1). Default: 0.5
- confianza: Confianza global de extracción (claridad y evidencia) (0..1). Default: 0.0

Reglas y robustez:
- Usa SOLO el texto de la noticia. NO infieras más allá.
- Señales y expectativas macro/financieras solo en [-1..1].
- Valores boolean: true/false.
- Valores numéricos: float.
- Null si no hay información explícita.
- Tickers en MAYÚSCULAS; listas sin duplicados.
- Listas siempre en formato JSON array, aunque estén vacías.
- Si el artículo no es económico/financiero (ej.: cultura, deportes, política sin relación económica), llenar señales y expectativas con 0.0, booleanos con false, tipo_evento="desconocido", confianza ≤ 0.3.
- Contenido puede ser truncado a 8000 caracteres si es muy extenso.

Instrucciones finales:
- Devuelve SOLO JSON plano, sin anidamiento ni explicaciones.
- Respeta todos los tipos (boolean, string, float, null).
- No agregues texto adicional ni explicaciones.
'''


USER_TEMPLATE = '''
Diario: {diario}
Fecha: {fecha}  # formato YYYY-MM-DD.
Seccion: {seccion}
Titulo: {titulo}
Contenido: {contenido}  # truncado a 8000 caracteres si es muy largo.

Instrucciones:
- Genera SOLO un JSON plano con todos los campos al mismo nivel, siguiendo el esquema definido en el SYSTEM_PROMPT.
- No agregues texto adicional ni explicaciones.
- Respeta tipos de datos, valores por defecto y rangos.
'''

In [None]:
#6. Creo archivo JSONL para carga batch,
with open(batch_requests_path, "w", encoding="utf-8") as f:
    for i, row in df_sample.iterrows():
        contenido = (row.get("contenido") or "")[:8000]
        prompt = USER_TEMPLATE.format(
            diario=row.get("diario", "unknown"),
            fecha=str(row.get("fecha", "unknown")),
            seccion=row.get("seccion", "unknown"),
            titulo=row.get("titulo", "unknown"),
            contenido=contenido
        )

        request_dict = {
            "custom_id": f"row_{i}",
            "method": "POST",
            "url": "/v1/responses",
            "body": {
                "model": "gpt-5-mini",
                "input": [
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": prompt}
                ]
            }

        }
        f.write(json.dumps(request_dict, ensure_ascii=False) + "\n")

print(f"✅ Archivo JSONL creado en batch_request.")

In [None]:
#7. Subo el archivo y cargo el batch.
#a. Subo el archivo JSONL.
file_upload = client.files.create(
    file=open(batch_requests_path, "rb"),
    purpose="batch"
)
print("📁 Archivo subido con ID:", file_upload.id)

#b. Creo el batch job usando ese file_id.
batch_job = client.batches.create(
    input_file_id=file_upload.id,
    endpoint="/v1/responses",
    completion_window="24h"
)

print("🚀 Batch job creado:", batch_job.id)
print("Status inicial:", batch_job.status)

In [None]:
#8. Conozco el estado de lo que envié.
#a. Consulto.
job = client.batches.retrieve(batch_job.id)
#b. Printeo.
print("📋 Estado:", job.status)
print("⚙️  Output file:", job.output_file_id)
print("📦 ID:", job.id)
print("🕒 Creado:", job.created_at)
pprint(job.model_dump()) # Muestro todos los detalles en bruto.

In [None]:
#9. Conozco errores.
if job.error_file_id:
    #a. Consulto.
    error_file_id = job.error_file_id

    #b. Descargo el archivo con errores.
    error_content = client.files.content(error_file_id)

    #c. Almaceno.
    with open(batch_errors_path.format(nombre_prueba), "wb") as f:
        f.write(error_content.read())

    # d. Printeo.
    print("✅ Archivo de errores descargado en batch_errors")
else:
    print("ℹ️ No hay archivo de errores para este job (error_file_id es None).")

In [None]:
#10. Descargo resultados, si está completado.
if job.status == "completed":
    output_file_id = job.output_file_id
    result = client.files.content(output_file_id)
    
    # El contenido es un JSONL (una respuesta por línea)
    with open(batch_results_path, "wb") as f:
        f.write(result.read())

    print("✅ Resultados descargados en batch_results.")
else:
    print("ℹ️ Resultados aún no completos.")


In [None]:
#11. Armo el dataframe.
if job.status == "completed":
    #a. "Cargo el JSONL completo de respuestas de la API.
    with open(batch_results_path, "r", encoding="utf-8") as f:
        batch_responses = [json.loads(line) for line in f]

    #b. Extraigo toda la info de cada respuesta.
    all_records = []
    for resp in batch_responses:
        try:
            # Extraigo el JSON generado por el modelo.
            text_json_str = resp["response"]["body"]["output"][1]["content"][0]["text"]
            data = json.loads(text_json_str)
            
            # Agrego el custom_id para poder unirlo con el DataFrame original.
            data["custom_id"] = resp.get("custom_id", None)
            all_records.append(data)

        except Exception as e:
            print(f"❌ Error en registro {resp.get('custom_id', 'unknown')}: {e}")
            continue

    #c. Creo DataFrame plano con todas las columnas extraídas.
    df_features = pd.json_normalize(all_records)

    #d. Agrego columna custom_id al df original para poder hacer merge.
    df_sample['custom_id'] = [f'row_{i}' for i in range(len(df_sample))]

    #e. Uno el df original con las features extraídas.
    df_final = df_sample.merge(df_features, on='custom_id', how='left')

    #f. Elimino custom_id si ya no sirve.
    df_final.drop(columns=["custom_id"], inplace=True)

    #g. Exporto el resultado.
    df_final.to_csv(df_final_path,index=False)
else:
    print("ℹ️ Resultados aún no completos.")

In [None]:
#12. Visualizo cuanto tardó el proceso.
if job.status == "completed":
    #a. Convertimos timestamps a datetime.
    created = datetime.fromtimestamp(job.created_at)
    completed = datetime.fromtimestamp(job.completed_at)

    #b. Calculamos duración.
    duration = completed - created
    print("⏱ Duración del proceso:", duration)
    print("Duración en segundos:", (completed - created).total_seconds())
    print("Duración en minutos:", (completed - created).total_seconds()/60)
else:
    print("ℹ️ Resultados aún no completos.")

In [None]:
df_final