# Algoritmo de Explicación de Tópicos
Este notebook implementa un pipeline completo para:
1. Procesamiento de tripletas de conocimiento
2. Expansión de vocabulario con similitud semántica
3. Clustering jerárquico de términos
4. Generación de explicaciones con LLMs
5. Evaluación automática de las explicaciones

## 1. Configuración y Preparación
Configuración inicial y carga de recursos necesarios

In [None]:
import os
import re
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import scipy.cluster.hierarchy as shc
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score, silhouette_samples
import spacy
import torch
from utils.types import *
from utils.triplet_manager_lib import Tripleta
from operator import itemgetter
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, T5Tokenizer, T5ForConditionalGeneration
import joblib
import nltk
from nltk.corpus import wordnet as wn

# Configuración
TRIPLES_PATH = 'data/triples_ft/processed/dataset_final_triplet_bbc_pykeen'
TOPIC_ID = 3
VISITAR_OBJETO = True
TERMINOS_A_INCLUIR = set(['dvd','google','electronic','tv','sony','screen','nintendo',
                         'player','mobile','phone','software','video','network','apple',
                         'program','linux'])
DBPEDIA_PATH = 'data/corpus_ft/bbc/diccionario_topic_entidades_dbpedia'
NER_PATH = 'data/corpus_ft/bbc/diccionario_ner'
N_SINONIMOS = 1
OUTPUT_DIR = 'output'
SPACY_MODEL = 'en_core_web_lg'
GEN_MODEL = 'Qwen/Qwen2-7B-Instruct'
EVAL_MODEL = 'google/mt5-small'
TOP_K_TERMS = 10

# Preparación
os.makedirs(OUTPUT_DIR, exist_ok=True)
df_tr = joblib.load(TRIPLES_PATH)
topics_dbp = joblib.load(DBPEDIA_PATH)
dictdbp = topics_dbp.get(TOPIC_ID, {})
dictner = joblib.load(NER_PATH)
nlp = spacy.load(SPACY_MODEL)
nltk.download('wordnet', quiet=True)

# Funciones auxiliares
def remove_numbers(text): return re.sub(r"\d+", "", text)
def remove_dbpedia_categories(s): return s.split('/')[-1]
def return_url_element(s):
    for sep in ['#','/']:
        if sep in s:
            s = s.split(sep)[-1]
    return s

## 2. Filtrado de Tripletas y Extracción de Términos
Procesamiento de tripletas para extraer términos relevantes usando información de DBpedia y NER

In [None]:
listado_tripletas = []
palabrasdbpedia = set(k.lower() for k in dictdbp.keys())
anterior = None

for i, row in df_tr.iterrows():
    tripleta = Tripleta({'subject': str(row['subject']),
                     'relation': row['relation'],
                     'object': str(row['object'])})

    sujeto = set(tripleta.sujeto.split())
    objeto = set(tripleta.objeto.split()) if VISITAR_OBJETO else set()

    # Lógica de comparación entre tripletas
    if anterior is None:
        anterior = tripleta
    
    misma_super = (tripleta.esTripletaSuper(anterior) == anterior.esTripletaSuper(tripleta))
    dif = tripleta.dondeSonDiferentes(anterior)

    if (misma_super and (dif == ('sujeto', 'relacion', 'objeto') or dif == ('sujeto', None, 'objeto'))):
        anterior = tripleta
    else:
        continue

    # Filtro por términos relevantes
    if (TERMINOS_A_INCLUIR is None
            or not TERMINOS_A_INCLUIR.isdisjoint(sujeto)
            or (VISITAR_OBJETO and not TERMINOS_A_INCLUIR.isdisjoint(objeto))):

        visitados = set()
        encontradas = sujeto.intersection(palabrasdbpedia)
        no_encontradas = sujeto.difference(palabrasdbpedia)

        if VISITAR_OBJETO:
            encontradas.update(objeto.intersection(palabrasdbpedia))
            no_encontradas.update(objeto.difference(palabrasdbpedia))

        for termino in encontradas:
            termino_lower = termino.lower()

            if termino in visitados:
                continue

            if termino[0].isdigit():
                no_encontradas.add(termino)
                continue

            info_list = dictdbp.get(termino_lower, [])
            if not info_list:
                no_encontradas.add(termino)
                continue

            info_termino = info_list[0]
            uri_db = info_termino.get('URI', '')
            tipos_db = info_termino.get('tipos', [])

            # Extracción de sinónimos y hypernyms de WordNet
            sinonimos = []
            lwordnet = []
            for syn in wn.synsets(termino):
                sinonimos.extend(syn.lemma_names())
                for h in syn.hypernyms():
                    lwordnet.extend(h.lemma_names())

            # Información NER
            sujeto_en_ner = dictner.get(termino_lower, '')
            ner = []
            if sujeto_en_ner:
                ner.append(sujeto_en_ner)

            diccionario_termino = {
                'termino': termino,
                'sinonimos': list(set(sinonimos)),
                'resource': uri_db,
                'dbpedia': tipos_db,
                'ner': ner,
                'wordnet': lwordnet
            }

            listado_tripletas.append(diccionario_termino)
            visitados.add(termino)

print(f"Términos extraídos: {len(listado_tripletas)}")

## 3. Expansión de Vocabulario con Similitud Semántica
Uso de spaCy para encontrar términos similares y expandir el vocabulario

In [None]:
df = pd.DataFrame(listado_tripletas)
vocab_aux, lista_tipos = [], []

for _, row in df.iterrows():
    termino = row['termino']
    tipos = []
    tipos.extend(row['dbpedia'] if isinstance(row['dbpedia'], list) else row['dbpedia'].split(','))
    tipos.extend(row['wordnet'])
    
    # Limpieza de tipos
    tipos_clean = []
    for t in tipos:
        el = return_url_element(remove_dbpedia_categories(remove_numbers(str(t))))
        if el and el != 'Q': tipos_clean.append(el)
    
    # Cálculo de similitudes
    sims = [nlp(termino).similarity(nlp(t2)) for t2 in tipos_clean]
    if not sims: continue
    
    # Selección de los N_SINONIMOS más similares
    idx = list(np.argpartition(sims, -N_SINONIMOS)[-N_SINONIMOS:])
    sel = itemgetter(*idx)(tipos_clean)
    puntuaciones = itemgetter(*idx)(sims)
    
    lista_tipos.append({'termino': termino, 'tipos': sel, 'similitudes': puntuaciones})
    vocab_aux.append(termino)
    if isinstance(sel, str): vocab_aux.append(sel)
    else: vocab_aux.extend(sel)

# Lemmatización del vocabulario
vocab = set()
for doc in nlp.pipe(vocab_aux):
    lemmatized = " ".join([token.lemma_.lower() for token in doc])
    vocab.add(lemmatized)

terms = list(vocab)
print(f"Términos finales después de expansión: {len(terms)}")

## 4. Clustering Jerárquico
Agrupamiento de términos basado en similitud semántica y evaluación con silhouette

In [None]:
# Matriz de similitud
M = np.array([[nlp(t1).similarity(nlp(t2)) for t2 in terms] for t1 in terms])

# Clustering
labels = AgglomerativeClustering(n_clusters=min(len(terms)-1, TOP_K_TERMS)).fit_predict(M)
sample_sil = silhouette_samples(M, labels)
global_sil = silhouette_score(M, labels)

# Selección de términos más representativos por cluster
clusters = {}
for cl in set(labels):
    idxs = np.where(labels==cl)[0]
    term_sils = [(terms[i], sample_sil[i]) for i in idxs]
    top_terms = [t for t,_ in sorted(term_sils, key=lambda x: -x[1])[:TOP_K_TERMS]]
    clusters[cl] = top_terms

# Guardado de resultados
clusters_str = {str(k): v for k, v in clusters.items()}
with open(os.path.join(OUTPUT_DIR,'clusters.json'),'w', encoding='utf-8') as f:
    json.dump({'best_k': len(clusters_str), 'global_sil': global_sil, 'clusters': clusters_str}, f, ensure_ascii=False, indent=2)

print(f"Clusters generados: {len(clusters)} con silhouette score: {global_sil:.3f}")

## 5. Generación de Explicaciones con Qwen-2.5
Uso de LLM para generar explicaciones comprensibles de cada cluster

In [None]:
# Configuración del modelo generativo
torch_dev = 'cuda' if torch.cuda.is_available() else 'cpu'
tok_g = AutoTokenizer.from_pretrained(GEN_MODEL)
mod_g = AutoModelForCausalLM.from_pretrained(GEN_MODEL).to(torch_dev)
gen = pipeline('text-generation', model=mod_g, tokenizer=tok_g,
               device=0 if torch.cuda.is_available() else -1,
               return_full_text=False)

# Ejemplo few-shot
ejemplo = (
    "Ejemplo:\n"
    "{'explicación':'Este cluster agrupa términos relacionados con la tecnología móvil y las comunicaciones.',"
    "'coherencia':4,'relevancia':5,'cobertura':4}\n\n"
)

explanations = {}
for cid, terms_c in clusters.items():
    prompt = (
        ejemplo +
        f"Cluster {TOPIC_ID}-{cid}: términos {', '.join(terms_c)}. "
        f"Silhouette global={global_sil:.3f}.\n"
        "Genera solo un JSON con claves 'explicación','coherencia','relevancia','cobertura'."
    )
    out = gen(prompt, max_new_tokens=150, do_sample=False)
    txt = out[0].get('generated_text','')
    try:
        jf = txt[txt.find('{'):txt.rfind('}')+1]
        parsed = json.loads(jf)
    except:
        parsed = {'explicación': txt.strip()}
    explanations[cid] = parsed

# Guardado de explicaciones
explanations_str = {str(k): v for k, v in explanations.items()}
with open(os.path.join(OUTPUT_DIR,'explanations.json'),'w', encoding='utf-8') as f:
    json.dump(explanations_str, f, ensure_ascii=False, indent=2)

print("Explicaciones generadas para todos los clusters")

## 6. Evaluación con mT5
Evaluación automática de la calidad de las explicaciones generadas

In [None]:
# Configuración del modelo de evaluación
tok_e = T5Tokenizer.from_pretrained(EVAL_MODEL)
mod_e = T5ForConditionalGeneration.from_pretrained(EVAL_MODEL).to(torch_dev)
evalp = pipeline(
    'text2text-generation', model=mod_e, tokenizer=tok_e,
    device=0 if torch.cuda.is_available() else -1)

# Ejemplo few-shot para evaluación
ejemplo_eval = (
    "Ejemplo de evaluación con justificación:"
    "{'coherencia':4,'relevancia':5,'cobertura':4}"
    "Justificación: La explicación agrupa bien los términos (coherencia alta), cubre aspectos clave del tópico (relevancia máxima)"
    "y describe adecuadamente la amplitud temática (cobertura alta)."
)

evaluations = {}
for cid, exp in explanations.items():
    terms_c = clusters[cid]
    prompt = (
        ejemplo_eval +
        f"Evalúa esta explicación para el cluster {TOPIC_ID}-{cid}."
        f"Términos clave: {', '.join(terms_c)}."
        f"Explicación: {exp.get('explicación', exp)}."
        "Devuelve un JSON con claves 'coherencia','relevancia','cobertura' y añade un campo 'justificación' con unas 1-2 frases."
    )
    out = evalp(prompt, max_new_tokens=150, do_sample=False)
    txt = out[0].get('generated_text','')
    try:
        jf = txt[txt.find('{'):txt.rfind('}')+1]
        parsed = json.loads(jf)
    except:
        parsed = {'error': txt.strip()}
    evaluations[cid] = parsed

# Guardado de evaluaciones
evaluations_str = {str(k): v for k, v in evaluations.items()}
with open(os.path.join(OUTPUT_DIR,'evaluations.json'),'w', encoding='utf-8') as f:
    json.dump(evaluations_str, f, ensure_ascii=False, indent=2)

print("Evaluaciones completadas para todas las explicaciones")

## 7. Resumen Final
Generación de un resumen ejecutivo con los resultados principales

In [None]:
with open(os.path.join(OUTPUT_DIR,'summary.txt'),'w', encoding='utf-8') as f:
    f.write(f"Tópico {TOPIC_ID}: {len(clusters)} clusters, silhouette global {global_sil:.3f}\n")
    for cid, terms_c in clusters.items():
        exp = explanations[cid].get('explicación','')
        eva = evaluations[cid]
        coh = eva.get('coherencia','N/A')
        rel = eva.get('relevancia','N/A')
        cov = eva.get('cobertura','N/A')
        f.write(f"Cluster {cid}: explicación='{exp}' coherencia={coh} relevancia={rel} cobertura={cov}\n")

print('Pipeline completo. Resultados guardados en:', OUTPUT_DIR)