In [2]:
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)


import os
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


from sklearn.preprocessing import StandardScaler
from sklearn.covariance import LedoitWolf
from sklearn.metrics import silhouette_score, adjusted_rand_score


from scipy.spatial.distance import pdist, squareform
from scipy.cluster.hierarchy import linkage, dendrogram, cophenet, fcluster
import matplotlib.pyplot as plt

In [3]:
CSV_PATH = "../../assets/huggingface_with_fairness.csv" # <-- cámbialo
OUTPUT_DIR = "../../assets/cluster_output"

In [4]:
FEATURE_COLS = [
"performance_score",
"co2_eq_emissions",
"likes",
"downloads",
"size",
"size_efficiency"
]

In [5]:
EXTERNAL_LABELS = {
"tipo_modelo": "tipo_modelo",
"clasificacion_fairness": "clasificacion_fairness",
"es_justo": "es_justo",
}

In [6]:
# Criterios de corte (elige uno). Si ambos son None, se optimiza k por silueta en rango.
K_TARGET: Optional[int] = None # p.ej., 3
DIST_THRESHOLD: Optional[float] = None # p.ej., 1.2 (solo para métodos compatibles)

In [7]:
# Rango para búsqueda de k óptimo por silueta
K_RANGE = range(2, 9)

In [8]:
# Semilla
RANDOM_STATE = 42

In [9]:
@dataclass
class Config:
    name: str
    distance: str # 'euclidean' | 'cosine' | 'mahalanobis'
    linkage: str # 'ward' | 'average' | 'complete' | 'single'

In [10]:
CONFIGS: List[Config] = [
    Config("Ward-Euclid", "euclidean", "ward"), # ward solo con euclidiana
    Config("Average-Euclid", "euclidean", "average"),
    Config("Complete-Euclid", "euclidean", "complete"),
    Config("Single-Euclid", "euclidean", "single"),
    Config("Average-Cosine", "cosine", "average"),
    Config("Complete-Cosine", "cosine", "complete"),
    Config("Single-Cosine", "cosine", "single"),
    Config("Average-Mahalanobis", "mahalanobis", "average"),
    Config("Complete-Mahalanobis", "mahalanobis", "complete"),
    Config("Single-Mahalanobis", "mahalanobis", "single"),
]

In [11]:
np.random.seed(RANDOM_STATE)

In [12]:
# load and processing
def load_and_prepare(csv_path: str,
    feature_cols: List[str],
    alias: Dict[str, str],
    external: Dict[str, str]) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    df = pd.read_csv(csv_path)
    df.columns = [c.strip() for c in df.columns]
    rename_map = {}
    for col in df.columns:
        if col in alias:
            rename_map[col] = alias[col]
    df = df.rename(columns=rename_map)
    existing = set(df.columns)
    fixed = []
    for col in feature_cols:
        if col in existing:
            fixed.append(col)
        else:
            inv = {v: k for k, v in alias.items()}
        if col in inv and inv[col] in existing:
            fixed.append(inv[col])
        else:
            print(f"[ADVERTENCIA] Columna no encontrada: {col}")
    feature_cols = fixed
    
    
    # Variables externas si existen
    ext_df = pd.DataFrame()
    for k, v in external.items():
        if v in df.columns:
            ext_df[k] = df[v]
        else:
            print(f"[INFO] Etiqueta externa no encontrada: {v}")
    
    
    X = df[feature_cols].copy()
    
    
    for c in X.columns:
        X[c] = pd.to_numeric(X[c], errors="coerce")
    X = X.replace([np.inf, -np.inf], np.nan)
    
    mask_valid = ~X.isna().any(axis=1)
    rows_before = len(X)
    X = X.loc[mask_valid]
    ext_df = ext_df.loc[mask_valid]
    print(f"Filas eliminadas por NA/inf: {rows_before - len(X)}")
    
    scaler = StandardScaler()
    Z = pd.DataFrame(scaler.fit_transform(X), index=X.index, columns=X.columns)
    
    
    return df, Z, ext_df

In [13]:
# ============================
# 4) Distancias y aglomeración
# ============================


def mahalanobis_pdist(Z: pd.DataFrame) -> np.ndarray:
    X = Z.values
    # Estima covarianza con Ledoit-Wolf (robusta) sobre Z
    lw = LedoitWolf().fit(X)
    VI = np.linalg.pinv(lw.covariance_)
    d = pdist(X, metric="mahalanobis", VI=VI)
    return d

In [14]:
def compute_linkage(Z: pd.DataFrame, cfg: Config) -> Tuple[np.ndarray, np.ndarray]:
    X = Z.values
    if cfg.linkage == "ward":
    # Ward requiere datos (no distancias) y equivale a euclidiana
        L = linkage(X, method="ward")
        coph_dists = pdist(X, metric="euclidean")
    else:
        if cfg.distance == "euclidean":
            D = pdist(X, metric="euclidean")
        elif cfg.distance == "cosine":
            D = pdist(X, metric="cosine")
        elif cfg.distance == "mahalanobis":
            D = mahalanobis_pdist(Z)
        else:
            raise ValueError("Distancia no soportada")
    L = linkage(D, method=cfg.linkage)
    coph_dists = D
    return L, coph_dists

In [15]:
def cut_tree(L: np.ndarray,
    k_target: Optional[int] = None,
    dist_threshold: Optional[float] = None,
    k_range = range(2, 9),
    X_for_score: Optional[np.ndarray] = None) -> Tuple[np.ndarray, int, Optional[float], float]:
    """Devuelve labels, k elegido, threshold (si aplica) y silueta."""
    if dist_threshold is not None:
        labels = fcluster(L, t=dist_threshold, criterion="distance")
        k = len(np.unique(labels))
        sil = silhouette_score(X_for_score, labels, metric="euclidean") if X_for_score is not None and k>1 else np.nan
        return labels, k, dist_threshold, sil


    if k_target is not None:
        labels = fcluster(L, t=k_target, criterion="maxclust")
        sil = silhouette_score(X_for_score, labels, metric="euclidean") if X_for_score is not None and k_target>1 else np.nan
        return labels, k_target, None, sil
    
    
    # Optimiza k por silueta en el rango
    best = (-np.inf, None, None)
    for k in k_range:
        labels = fcluster(L, t=k, criterion="maxclust")
        if len(np.unique(labels)) < 2:
            continue
        sil = silhouette_score(X_for_score, labels, metric="euclidean") if X_for_score is not None else np.nan
        if np.isnan(sil):
            continue
        if sil > best[0]:
            best = (sil, k, labels)
    if best[1] is None:
        labels = fcluster(L, t=2, criterion="maxclust")
        sil = silhouette_score(X_for_score, labels, metric="euclidean") if X_for_score is not None else np.nan
        return labels, 2, None, sil
    return best[2], best[1], None, best[0]

In [16]:
def summarize_external(labels: np.ndarray, ext_df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    out = {}
    cl = pd.Series(labels, name="cluster")
    for name, col in ext_df.items():
        try:
            tab = pd.crosstab(cl, col)
            out[name] = tab
        except Exception:
            pass
    return out

In [17]:
def run_pipeline(csv_path: str = CSV_PATH):
    df, Z, ext_df = load_and_prepare(csv_path, FEATURE_COLS, FEATURE_COLS, EXTERNAL_LABELS)
    results = []
        

    
    for cfg in CONFIGS:
    # Ward solo tiene sentido con euclidiana
        if cfg.linkage == "ward" and cfg.distance != "euclidean":
            continue
        try:
            L, coph_d = compute_linkage(Z, cfg)
        except Exception as e:
            print(f"[ERROR] {cfg.name}: {e}")
            continue
    
    
    # Cophenetic
    coph_corr, _ = cophenet(L, coph_d)
    
    
    # Dendrograma
    plt.figure(figsize=(9, 5))
    dendrogram(L, no_labels=True)
    plt.title(f"Dendrograma – {cfg.name}")
    plt.tight_layout()
    fig_path = os.path.join(OUTPUT_DIR, f"dendrogram_{cfg.name.replace(' ', '_')}.png")
    plt.savefig(fig_path, dpi=150)
    plt.close()
    
    
    # Corte y evaluación
    labels, k, thr, sil = cut_tree(L, K_TARGET, DIST_THRESHOLD, K_RANGE, Z.values)
    
    
    # Resumen externo
    ext_tabs = summarize_external(labels, ext_df)
    
    
    # ARI vs es_justo (si es binaria y sin NA)
    ari = np.nan
    if "es_justo" in ext_df.columns:
        y = ext_df["es_justo"].dropna()
        inter = np.intersect1d(y.index, Z.index)
        if len(np.unique(y.loc[inter])) > 1:
            ari = adjusted_rand_score(y.loc[inter].astype(int), pd.Series(labels, index=Z.index).loc[inter])
        
        
        # Guardar tablas externas
    ext_paths = {}
    for name, tab in ext_tabs.items():
        p = os.path.join(OUTPUT_DIR, f"tabla_{cfg.name.replace(' ', '_')}_{name}.csv")
        tab.to_csv(p)
        ext_paths[name] = p
    
    
    results.append({
    "config": cfg.name,
    "distance": cfg.distance,
    "linkage": cfg.linkage,
    "cophenetic": coph_corr,
    "k": k,
    "threshold": thr,
    "silhouette": sil,
    "ari_vs_es_justo": ari,
    "dendrogram_path": fig_path,
    "external_tables": ext_paths,
    })
    
    
    # Ranking por silueta y cophenético
    res_df = pd.DataFrame(results)
    if not res_df.empty:
        res_df = res_df.sort_values(["silhouette", "cophenetic"], ascending=False)
        res_path = os.path.join(OUTPUT_DIR, "resumen_configuraciones.csv")
        res_df.to_csv(res_path, index=False)
        print("\nResumen guardado en:", res_path)
        print("\nTop configuraciones (por silueta, luego cophenético):\n")
        print(res_df.head(10))
        print("\nDendrogramas y tablas por configuración en:", os.path.abspath(OUTPUT_DIR))
    else:
        print("No se pudo generar ningún resultado. Verifica columnas y datos.")

In [18]:
run_pipeline(CSV_PATH)

TypeError: list indices must be integers or slices, not str

In [2]:
df = pd.read_csv("../../assets/huggingface_with_fairness.csv")

In [3]:
features = ['performance_score', 'co2_eq_emissions', 'size', 'si]
df_cluster = df[features].dropna() # Usar .dropna() para eliminar filas con datos faltantes
