Configuracion

In [3]:
# Parametros de conexion y ejecucion

MONGO_URI = "mongodb://admin:pass@127.0.0.1:27017/?authSource=admin"
MONGO_DB = "streamdb"
MONGO_COLL = "raw_messages"

MYSQL_CFG = dict(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="pass",
    database="lab",
)

MODEL_NAME = "tabularisai/multilingual-sentiment-analysis"

# Inferencia
MAX_LENGTH = 256
BATCH_SIZE = 64

# Control del loop
POLL_WAIT = 2.0      # segundos a esperar cuando no hay pendientes
MAX_TOTAL = 500       # numero de docs a procesar y terminar; usa None para modo continuo
LOG_EVERY = 100       # imprimir un log cada N documentos

# Persistencia adicional en DWH
RAW_JSON = True       # guardar el documento original en dw_messages.raw_json
UPSERT = True         # usar ON DUPLICATE KEY UPDATE (idempotencia)


Imports y carga de modelo (sin pipelines)

In [4]:
import json, time, sys
from datetime import datetime, timezone
from typing import Dict, List

import mysql.connector
from pymongo import MongoClient, ReturnDocument

from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F

from tqdm.auto import tqdm

def utcnow_iso():
    return datetime.now(timezone.utc).isoformat()

def essential_str(v):
    return "" if v is None else str(v)

print("Versiones:")
import pkgutil, importlib
for pkg in ["torch","transformers","pymongo","mysql.connector","ipywidgets","tqdm"]:
    try:
        mod = importlib.import_module(pkg if pkg!="mysql.connector" else "mysql")
        v = getattr(mod, "__version__", None)
        if v is None and pkg=="mysql.connector":
            import mysql.connector as mc
            v = mc.__version__
        print(f"  {pkg}: {v}")
    except Exception as e:
        print(f"  {pkg}: no importable ({e})")

print("\nCargando modelo:", MODEL_NAME)
tok = AutoTokenizer.from_pretrained(MODEL_NAME)
mdl = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
mdl.eval()
id2label = getattr(mdl.config, "id2label",
                   {0:"Very Negative",1:"Negative",2:"Neutral",3:"Positive",4:"Very Positive"})
print("Clases:", id2label)


Versiones:
  torch: 2.4.1+cu121
  transformers: 4.46.3
  pymongo: 4.10.1
  mysql.connector: 9.0.0
  ipywidgets: 8.1.7
  tqdm: 4.67.1

Cargando modelo: tabularisai/multilingual-sentiment-analysis
Clases: {0: 'Very Negative', 1: 'Negative', 2: 'Neutral', 3: 'Positive', 4: 'Very Positive'}


Conexiones y SQL

In [5]:
# Conexiones
mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
coll = mongo[MONGO_DB][MONGO_COLL]

mysql_conn = mysql.connector.connect(**MYSQL_CFG, autocommit=True)
cur = mysql_conn.cursor(prepared=True)

# SQL de insercion con upsert opcional
insert_sql = (
    "INSERT INTO dw_messages "
    "(id, user_id, comment, sentiment_label, sentiment_score, raw_json) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)
if UPSERT:
    insert_sql += (
        " ON DUPLICATE KEY UPDATE "
        "user_id=VALUES(user_id), comment=VALUES(comment), "
        "sentiment_label=VALUES(sentiment_label), sentiment_score=VALUES(sentiment_score), "
        "raw_json=VALUES(raw_json)"
    )

print("Conexion Mongo OK y cursor MySQL listo.")


Conexion Mongo OK y cursor MySQL listo.


Utilidades (mapeo de etiquetas, locking, helpers)

In [6]:
def label_code_from_id2label(idx: int, id2label: Dict[int,str]) -> str:
    """
    Mapea la etiqueta textual del modelo a las 5 clases del DWH:
      vneg, neg, neu, pos, vpos
    """
    name = id2label.get(int(idx), str(idx)).lower().strip()
    if "very negative" in name: return "vneg"
    if name == "negative" or ("negative" in name and "very" not in name): return "neg"
    if "neutral" in name: return "neu"
    if "very positive" in name: return "vpos"
    if name == "positive" or ("positive" in name and "very" not in name): return "pos"
    return "neu"

def lock_batch(n: int):
    """
    Toma hasta n documentos sin 'proc' y los marca como locked (operacion atomica por doc).
    """
    out = []
    for _ in range(n):
        d = coll.find_one_and_update(
            {"proc": {"$exists": False}},
            {"$set": {"proc": {"status": "locked", "ts": utcnow_iso()}}},
            return_document=ReturnDocument.AFTER,
        )
        if not d:
            break
        out.append(d)
    return out

def unlock_stale_locks(minutes=15) -> int:
    """
    Libera locks antiguos (por si el notebook se interrumpe a mitad).
    Retorna cantidad desbloqueada.
    """
    from datetime import timedelta
    threshold = (datetime.now(timezone.utc) - timedelta(minutes=minutes)).isoformat()
    res = coll.update_many({"proc.status":"locked", "proc.ts":{"$lt": threshold}}, {"$unset":{"proc":1}})
    return res.modified_count


Paso de procesamiento (batch)

In [7]:
def process_once(batch_size=BATCH_SIZE, max_length=MAX_LENGTH, raw_json=RAW_JSON) -> int:
    batch = lock_batch(batch_size)
    if not batch:
        return 0

    texts = [(essential_str(d.get("comment","")).strip() or " ") for d in batch]
    try:
        enc = tok(texts, padding=True, truncation=True, max_length=max_length, return_tensors="pt")
        with torch.no_grad():
            out = mdl(**enc)
        probs = F.softmax(out.logits, dim=-1)
    except Exception as e:
        # Marca todo el lote en error si la inferencia falla
        for d in batch:
            coll.update_one({"_id": d["_id"]},
                            {"$set": {"proc.status":"error","proc.ts":utcnow_iso(),"proc.error": f"inferencia:{e}"}})
        print("ERROR inferencia:", e, file=sys.stderr)
        return 0

    ok = 0
    for d, prob in zip(batch, probs):
        try:
            idx = int(torch.argmax(prob).item())
            label = label_code_from_id2label(idx, id2label)
            score = float(prob[idx].item())
            rjson = json.dumps(d, ensure_ascii=False) if raw_json else None

            cur.execute(insert_sql, (
                str(d.get("_id")),
                essential_str(d.get("user_id","")),
                essential_str(d.get("comment","")),
                label,
                score,
                rjson
            ))

            coll.update_one(
                {"_id": d["_id"]},
                {"$set": {"proc.status":"done","proc.ts":utcnow_iso(),"pred.label":label,"pred.score":score}}
            )
            ok += 1

        except mysql.connector.Error as me:
            coll.update_one(
                {"_id": d["_id"]},
                {"$set": {"proc.status":"error","proc.ts":utcnow_iso(),"proc.error": f"mysql:{getattr(me,'msg',me)}"}}
            )
            print("ERROR MySQL:", me, file=sys.stderr)
        except Exception as e:
            coll.update_one(
                {"_id": d["_id"]},
                {"$set": {"proc.status":"error","proc.ts":utcnow_iso(),"proc.error": f"proc:{e}"}}
            )
            print("ERROR proc:", e, file=sys.stderr)

    return ok


Loop de trabajo (smoke test o continuo)

In [8]:
total = 0
target = float("inf") if MAX_TOTAL is None else MAX_TOTAL

print(f"Iniciando loop. MAX_TOTAL = {MAX_TOTAL}, batch = {BATCH_SIZE}")

try:
    if MAX_TOTAL is None:
        # Modo continuo: sin barra de progreso, logs cada LOG_EVERY
        while True:
            n = process_once()
            if n == 0:
                time.sleep(POLL_WAIT)
                continue
            total += n
            if total % LOG_EVERY == 0:
                print(f"procesados total={total}  {utcnow_iso()}")
    else:
        # Modo N documentos: barra de progreso
        with tqdm(total=target, desc="Procesando", unit="doc") as pbar:
            while total < target:
                n = process_once()
                if n == 0:
                    time.sleep(POLL_WAIT)
                    continue
                total += n
                pbar.update(n)
        print("Finalizado. total procesados =", total)
except KeyboardInterrupt:
    print("Interrumpido por el usuario")


Iniciando loop. MAX_TOTAL = 500, batch = 64


Procesando:   0%|          | 0/500 [00:00<?, ?doc/s]

Finalizado. total procesados = 512


Verificacion rapida (Mongo y MySQL)

In [9]:
# Mongo: conteos
done   = coll.count_documents({"proc.status":"done"})
locked = coll.count_documents({"proc.status":"locked"})
pend   = coll.count_documents({"proc":{"$exists":False}})
print("Mongo -> done:", done, " locked:", locked, " pending:", pend)

# MySQL: conteo y muestra
mysql_conn.reconnect(attempts=2, delay=1)
cur2 = mysql_conn.cursor()
cur2.execute("SELECT COUNT(*) FROM dw_messages")
print("MySQL -> dw_messages total:", cur2.fetchone()[0])
cur2.execute("""
  SELECT id,user_id,sentiment_label,sentiment_score,ingest_ts
  FROM dw_messages
  ORDER BY ingest_ts DESC
  LIMIT 10
""")
rows = cur2.fetchall()
for r in rows:
    print(r)
cur2.close()


Mongo -> done: 512  locked: 0  pending: 1166
MySQL -> dw_messages total: 512
('03cedde4-dc55-4d25-8f21-9d5cecfe8d5e', 'user_08544', 'neg', 0.656003, datetime.datetime(2025, 8, 19, 23, 57, 42))
('04af8c06-8934-4d58-82b0-f2b1668e82f7', 'user_22588', 'pos', 0.569743, datetime.datetime(2025, 8, 19, 23, 57, 42))
('07835c9a-4249-4598-ac61-71c35c244d6a', 'user_34751', 'vpos', 0.459527, datetime.datetime(2025, 8, 19, 23, 57, 42))
('086b48f7-25c7-4d1c-bd06-9e351c27eba7', 'user_23503', 'vpos', 0.538442, datetime.datetime(2025, 8, 19, 23, 57, 42))
('092e5156-a92b-4d77-8a19-527f3d51e06b', 'user_88159', 'neu', 0.342642, datetime.datetime(2025, 8, 19, 23, 57, 42))
('1942baff-2679-4087-b9b6-fc8c2c9e2af5', 'user_00776', 'pos', 0.664292, datetime.datetime(2025, 8, 19, 23, 57, 42))
('2e7dd474-d805-4463-8091-7496dcbd4dcf', 'user_62446', 'neg', 0.665276, datetime.datetime(2025, 8, 19, 23, 57, 42))
('3435bb94-1819-4916-ad48-9874b633a485', 'user_97023', 'pos', 0.50092, datetime.datetime(2025, 8, 19, 23, 57,

True

Utilidad para liberar locks antiguos

In [10]:
released = unlock_stale_locks(minutes=15)
print("locks liberados:", released)


locks liberados: 0


Validar que MySQL refleja todo lo procesado

In [11]:
cur2 = mysql_conn.cursor()
cur2.execute("SELECT sentiment_label, COUNT(*) FROM dw_messages GROUP BY sentiment_label ORDER BY 2 DESC")
print(cur2.fetchall())
cur2.execute("SELECT COUNT(*) FROM dw_messages"); print("total:", cur2.fetchone()[0])
cur2.close()


[('pos', 127), ('neg', 127), ('neu', 124), ('vpos', 85), ('vneg', 49)]
total: 512


True