# DATOS DE PRUEBA

In [None]:
import pandas as pd
import numpy as np

# Leer el archivo CSV
df = pd.read_csv("C:\\Users\\roesc\\Desktop\\hospital-efficiency-dashboard_data\\df_consolidado_final_v2.csv")

# Mostrar las primeras filas del DataFrame
df.head()

# pasar valores '--' a 0 en columnas 'Bienes y servicios' y 'Remuneraciones'
df['Bienes y servicios'] = df['Bienes y servicios'].replace('--', 0)
df['Remuneraciones'] = df['Remuneraciones'].replace('--', 0)

# pasar columnas de tipo string a tipo float
cols_to_float = ['Bienes y servicios', 'Remuneraciones']
for col in cols_to_float:
    df[col] = df[col].astype(str).str.replace(',', '.').astype(float)

# contar missing en las columnas usando pandas
missing_counts = df.isnull().sum()
# mostrar los conteos de valores faltantes
print(missing_counts)

# pasar missing a 0
df.fillna(0, inplace=True)

# contar missing en las columnas usando pandas
missing_counts = df.isnull().sum()
# mostrar los conteos de valores faltantes
print(missing_counts)

df_copy = df.copy()
df_2014 = df_copy[df_copy["Año"] == 2014]
df_2016 = df_copy[df_copy["Año"] == 2016]

# print shapes
print("Shape of original DataFrame:", df.shape)
print("Shape of 2014 DataFrame:", df_2014.shape)
print("Shape of 2016 DataFrame:", df_2016.shape)

# FUNCIONES

## SFA

In [None]:
import numpy as np
import pandas as pd
from pysfa import SFA

def calculate_sfa_metrics(df: pd.DataFrame,
                          input_cols: list[str],
                          output_col: list[str],
                          te_threshold: float = 0.6,
                          fun: str = SFA.FUN_PROD,
                          method: str = SFA.TE_teJ) -> tuple[pd.DataFrame, dict]:
    """
    Ejecuta SFA sobre df y devuelve:
      - df_out: df con columna 'Eff_SFA'
      - metrics: diccionario con KPI y parámetros clave
    
    Parámetros:
    -----------
    df : DataFrame
      Datos con insumos y output.
    input_cols : lista de str
      Nombres de columnas de insumos.
    output_col : str
      Nombre de la columna de output.
    te_threshold : float
      Umbral para definir 'hospital crítico' (TE < te_threshold).
    fun : str
      Función a usar (SFA.FUN_PROD o FUN_COST).
    method : str
      Método de eficiencia (SFA.TE_teJ, TE_te, TE_teMod).
    
    Retorna:
    --------
    df_out : DataFrame
      df con la nueva columna 'Eff_SFA'.
    metrics : dict
      {
        'ET_promedio': float,      # eficiencia técnica promedio
        'pct_criticos': float,     # % de CRÍTICOS (TE < te_threshold)
        'variable_clave': str,     # insumo con β más alto y p<0.05
        'sigma2': float,           # varianza total del error
        'betas': np.ndarray,       # todos los β incl. intercept y λ
        'p_values': np.ndarray     # todos los p-values
      }
    """
    # solo conservar las filas donde los inputs y outputs son mayores que 0
    df = df[(df[input_cols] > 0).all(axis=1) & (df[output_col] > 0).all(axis=1)]

    x = np.log(df[input_cols]).to_numpy()   # aplicar logaritmo a los inputs
    y = np.log(df[output_col]).to_numpy()   # aplicar logaritmo a los outputs

    sfa = SFA.SFA(y, x, fun=fun, method=method)
    sfa.optimize()
    
    # Extraer eficiencia y añadirla
    te = np.array(sfa.get_technical_efficiency())
    df_out = df.copy()
    df_out['ET SFA'] = te
    
    # Extraer parámetros
    all_betas = np.array(sfa.get_beta())     # [β0, β1..βk, λ]
    all_pvals = np.array(sfa.get_pvalue())
    sigma2    = sfa.get_sigma2()
    
    # ET promedio y % críticos
    et_promedio = float(te.mean())
    pct_crit    = float((te < te_threshold).mean() * 100)
    
    # Determinar variable clave
    k = len(input_cols)
    betas_in = all_betas[1:1+k]
    pvals_in = all_pvals[1:1+k]
    df_coef = pd.DataFrame({
        'input':   input_cols,
        'beta':    betas_in,
        'p_value': pvals_in
    })
    df_sign = df_coef[df_coef.p_value < 0.05].copy()
    if not df_sign.empty:
        df_sign['abs_beta'] = df_sign.beta.abs()
        var_clave = df_sign.sort_values('abs_beta', ascending=False).iloc[0].input
    else:
        var_clave = None
    
    # Empaquetar métricas
    metrics = {
        'ET_promedio':    et_promedio,
        'pct_criticos':   pct_crit,
        'variable_clave': var_clave,
        'sigma2':         float(sigma2),
        'betas':          all_betas,
        'p_values':       all_pvals
    }

    # imprimir summary de sfa
    # print(sfa.summary())
    
    return df_out, metrics

In [None]:
df_sfa, sfa_metrics = calculate_sfa_metrics(
    df=df_2014,
    input_cols=["Bienes y servicios", "Remuneraciones", "Dias Cama Disponibles"],
    output_col=["Quirofanos"],
    te_threshold=0.5
)

# Ejemplo de lectura de métricas:
print(f"Eficiencia promedio: {sfa_metrics['ET_promedio']:.2%}")
print(f"% críticos:         {sfa_metrics['pct_criticos']:.2f}%")
print(f"Variable clave:     {sfa_metrics['variable_clave']}")
print(f"Varianza σ²:        {sfa_metrics['sigma2']:.2f}")

## DEA

In [None]:
import numpy as np
import pandas as pd
from Pyfrontier.frontier_model import EnvelopDEA

def calculate_dea_metrics(df: pd.DataFrame,
                          input_cols: list[str],
                          output_cols: list[str],
                          orientation: str = "in",
                          rts: str = "CRS",
                          te_threshold: float = 0.6,
                          n_jobs: int = 1
                         ) -> tuple[pd.DataFrame, dict]:
    """
    Ejecuta un DEA y devuelve:
      - df_out: df con columna 'Eff_DEA'
      - metrics: {
          'ET_promedio': float,
          'pct_criticos': float,
          'variable_slack_clave': str,
          'rts': str,
          'orientation': str
        }
    
    Parámetros:
      df           : DataFrame con tus datos
      input_cols   : lista de nombres de columnas de insumos
      output_cols  : lista de nombres de columnas de outputs
      orientation  : 'in' o 'out'
      rts          : 'CRS' o 'VRS'
      te_threshold : umbral para % críticos (score < te_threshold)
    """
    df = df[(df[input_cols] > 0).all(axis=1) & (df[output_cols] > 0).all(axis=1)]

    # 1) Armar arrays
    x = df[input_cols].to_numpy()
    y = df[output_cols].to_numpy()
    
    # 2) Entrenar DEA
    dea_crs = EnvelopDEA(rts, orientation, n_jobs=n_jobs)
    dea_crs.fit(x, y)
    
    # 3) Scores y slacks
    scores_crs = np.array([res.score for res in dea_crs.result])
    # slacks: entrada [n_inputs x_slack_i] en res.x_slack
    slacks_crs = np.stack([res.x_slack for res in dea_crs.result], axis=0)  # shape (n, k)
    
    # 4) DataFrame de salida
    df_out = df.copy()
    df_out["ET DEA"] = scores_crs
    
    # 5) KPI: promedio y críticos
    et_promedio = float(scores_crs.mean())
    pct_crit    = float((scores_crs < te_threshold).mean() * 100)
    
    # 6) Variable slack clave: el input cuyo slack medio es mayor
    mean_slacks = np.nanmean(np.where(slacks_crs==0, np.nan, slacks_crs), axis=0)
    idx_max     = int(np.nanargmax(mean_slacks))
    var_slack_clave = input_cols[idx_max]
    
    # 8) Empaquetar métricas
    metrics = {
        "ET_promedio":          et_promedio,
        "pct_criticos":         pct_crit,
        "variable_slack_clave": var_slack_clave,
        "orientation":          orientation,
        "rts":                  rts
    }
    return df_out, metrics


In [None]:
df_dea, dea_metrics = calculate_dea_metrics(
    df=df_2014,
    input_cols=["Bienes y servicios", "Remuneraciones", "Dias Cama Disponibles"],
    output_cols=["Consultas"],
    orientation="in",
    rts="CRS",
    te_threshold=0.6,
    n_jobs=2
)

print(f"ET Promedio:      {dea_metrics['ET_promedio']:.2%}")
print(f"% Hosp. críticos:  {dea_metrics['pct_criticos']:.2f}%")
print(f"Variable slack clave: {dea_metrics['variable_slack_clave']}")

## DEA-M

In [None]:
import numpy as np
import pandas as pd
from Pyfrontier.frontier_model import EnvelopDEA

def calculate_dea_malmquist_simple(df_t, df_t1,
                                   input_cols, output_cols,
                                   rts="CRS", orientation="in",
                                   n_jobs: int = 1):
    """
    Índice de Malmquist simplificado = solo cambio de eficiencia
    (sin re-resolver con cross-eficiencias).
    
    Devuelve:
      • df_out: EFF_t, EFF_t1, EFFCH (EFF_t1/EFF_t)
      • summary: media de EFFCH
    """
    # 1) Índice común de hospitales
    idx = df_t.index.intersection(df_t1.index)
    df_t  = df_t.loc[idx]
    df_t1 = df_t1.loc[idx]

    # 2) Calcular EFF en cada periodo con el helper DEA
    def dea_scores(df_period):
        X = df_period[input_cols].to_numpy(dtype=float)
        Y = df_period[output_cols].to_numpy(dtype=float)
        dea = EnvelopDEA(rts, orientation, n_jobs=n_jobs)
        dea.fit(X, Y)
        return np.array([res.score for res in dea.result])

    eff_t  = dea_scores(df_t)
    eff_t1 = dea_scores(df_t1)

    # 3) Cambio de eficiencia (EFFCH)
    effch = eff_t1 / eff_t

    # 4) DataFrame de salida
    df_out = pd.DataFrame({
        "EFF_t":   eff_t,
        "EFF_t1":  eff_t1,
        "EFFCH":   effch
    }, index=idx)

    # 5) Resumen
    summary = {
        "EFFCH_mean": float(effch.mean())
    }
    return df_out, summary

In [None]:
df_malm_simp, malm_simp_meta = calculate_dea_malmquist_simple(
    df_2014, df_2016,
    input_cols  = ["Bienes y servicios", "Remuneraciones", "Dias Cama Disponibles"],
    output_cols = ["Consultas"],
    rts="CRS", orientation="in",
    n_jobs=4
)

print("Cambio de eficiencia promedio (EFFCH):",
      f"{malm_simp_meta['EFFCH_mean']:.3f}")
df_malm_simp.head()


## PCA

In [None]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from typing import List, Tuple, Dict

def run_pca(df: pd.DataFrame,
            feature_cols: List[str],
            n_components: int | None = None,
            scale: bool = True
           ) -> Tuple[pd.DataFrame, Dict]:
    """
    Ejecuta PCA sobre las columnas `feature_cols` y devuelve:
      • df_pca  : DataFrame con columnas PC1, PC2, …, PCk
      • metrics : dict con varianza explicada y matriz de cargas
    
    Parámetros
    ----------
    df           : DataFrame original
    feature_cols : columnas a incluir en el PCA (solo numéricas)
    n_components : nº de componentes (None ⇒ tantas como variables)
    scale        : estandarizar variables a media 0 y σ 1 antes de PCA
    
    Ejemplo
    -------
    df_pca, meta = run_pca(df, ["slack_bs", "slack_rem", "slack_cama"], 2)
    """
    X = df[feature_cols].to_numpy(dtype=float)
    
    # Escalado opcional
    if scale:
        X = StandardScaler().fit_transform(X)
    
    pca = PCA(n_components=n_components)
    pcs = pca.fit_transform(X)
    
    pc_cols = [f"PC{i+1}" for i in range(pcs.shape[1])]
    df_pca  = pd.DataFrame(pcs, index=df.index, columns=pc_cols)
    
    metrics = {
        "explained_variance_ratio": pca.explained_variance_ratio_.tolist(),
        "components": pd.DataFrame(pca.components_,
                                   index=pc_cols,
                                   columns=feature_cols)
    }
    return df_pca, metrics

In [None]:
import matplotlib.pyplot as plt

# 1) Selecciona las columnas que quieres analizar
input_cols  = ["Bienes y servicios", "Remuneraciones"]
output_cols = ["Consultas", "Quirofanos"]

# unir columnas en una sola lista
cols_pca = input_cols + output_cols

# 2) Ejecuta PCA (2 componentes para plot)
df_pca, pca_meta = run_pca(df_2014, cols_pca, n_components=2)

# 3) KPI rápidos
print("Varianza explicada:", pca_meta["explained_variance_ratio"])
print("\nMatriz de cargas:")
print(pca_meta["components"])

loadings = pca_meta["components"]   # DataFrame PC×vars
squared = loadings**2           # coef²
contribs = squared.div(squared.sum(axis=1), axis=0)

# 4) Mostrar contribuciones de cada variable a los PCs
print("\nContribuciones de variables a los PCs:")
print(contribs)

# 4) Ejemplo de visualización 2-D
plt.figure(figsize=(6,4))
plt.scatter(df_pca["PC1"], df_pca["PC2"])
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.title("Hosp. en espacio de Componentes Principales")
plt.show()


## CLUSTERIZACIÓN

In [None]:
from typing import List, Tuple, Dict
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

# ------- usa la función run_pca que ya tienes --------
def pca_kmeans(df: pd.DataFrame,
               feature_cols: List[str],
               n_components: int = 2,
               k: int | None = None,
               k_max: int = 10,
               scale: bool = True,
               random_state: int = 42
              ) -> Tuple[pd.DataFrame, Dict]:
    """
    1) Ejecuta PCA con run_pca
    2) Aplica K-means (elige k óptimo con silhouette si k=None)
    3) Devuelve df con PCs y 'cluster', y un diccionario de metadatos
    """
    # ---- PCA --------------------------------------------------
    df_pca, pca_meta = run_pca(df, feature_cols,
                               n_components=n_components,
                               scale=scale)
    pc_cols = df_pca.columns.tolist()

    # ---- elegir k si no viene fijo ---------------------------
    if k is None:
        best_k, best_score = 2, -1
        for kk in range(2, k_max + 1):
            km = KMeans(n_clusters=kk, n_init="auto", random_state=random_state)
            labels = km.fit_predict(df_pca[pc_cols])
            score = silhouette_score(df_pca[pc_cols], labels)
            if score > best_score:
                best_k, best_score = kk, score
        k = best_k
        silhouette_best = best_score
    else:
        silhouette_best = None

    # ---- K-means definitivo -----------------------------------
    kmeans = KMeans(n_clusters=k, n_init="auto", random_state=random_state)
    cluster_labels = kmeans.fit_predict(df_pca[pc_cols])

    # ---- ensamblar DataFrame de salida ------------------------
    df_out = df.copy()
    df_out = pd.concat([df_out, df_pca], axis=1)
    df_out["cluster"] = cluster_labels

    # ---- empaquetar metadatos ---------------------------------
    meta = {
        "explained_variance_ratio": pca_meta["explained_variance_ratio"],
        "components": pca_meta["components"],
        "k": k,
        "silhouette": silhouette_best,
        "cluster_centers": pd.DataFrame(kmeans.cluster_centers_,
                                        columns=pc_cols)
    }
    return df_out, meta

In [None]:
# columnas para el PCA
feature_cols = ["Bienes y servicios", "Remuneraciones",
                "Dias Cama Disponibles","ET SFA"]

# ejecutar sfa con funcion creada
df_sfa, sfa_metrics = calculate_sfa_metrics(
    df=df_2014,
    input_cols=["Bienes y servicios", "Remuneraciones", "Dias Cama Disponibles"],
    output_col=["Quirofanos"],
    te_threshold=0.5
)

df_cluster, info = pca_kmeans(df_sfa,
                              feature_cols=feature_cols,
                              n_components=3,   # sólo PC1 y PC2
                              k=None,           # deja que la función elija
                              k_max=8)

print("k elegido:", info["k"])
print("Varianza explicada:", info["explained_variance_ratio"])
print("\nMatriz de cargas:")
print(info["components"])

# 1) Agrupa y agrega
summary = df_cluster.groupby("cluster").agg(
    N_Hospitales = ("cluster", "size"),
    TE_Media     = ("ET SFA", "mean"),
    PC1_media    = ("PC1", "mean"),
    PC2_media    = ("PC2", "mean"),
).reset_index()

# 2) Formatea un poquito (opcional)
summary["TE_Media"]  = summary["TE_Media"].round(2)
summary["PC1_media"] = summary["PC1_media"].round(2)
summary["PC2_media"] = summary["PC2_media"].round(2)

# 3) Muestra
print("\nResumen de clusters:")
print(summary)

# Visualizar
import matplotlib.pyplot as plt
plt.figure(figsize=(6,4))
for cl in range(info["k"]):
    mask = df_cluster["cluster"] == cl
    plt.scatter(df_cluster.loc[mask, "PC1"],
                df_cluster.loc[mask, "PC2"],
                s=25, label=f"Cluster {cl}")
plt.xlabel("PC1"); plt.ylabel("PC2"); plt.legend()
plt.title("Hosp. agrupados por K-means en espacio PCA")
plt.show()

## ANÁLISIS DE DETERMINANTES

In [None]:
# deter_analysis.py
import pandas as pd
import numpy as np
import statsmodels.api as sm
from typing import List, Tuple, Dict

def determinant_analysis(df: pd.DataFrame,
                         dependent: str,
                         independents: List[str],
                         top_n: int = 5,
                         add_constant: bool = True
                        ) -> Tuple[pd.DataFrame, Dict]:
    """
    Ajusta un modelo OLS  y entrega la tabla de coeficientes + métricas resumidas.
    
    Parámetros
    ----------
    df          : DataFrame con los datos (sin nulos en cols usadas)
    dependent   : nombre de la columna dependiente (Y)
    independents: lista de columnas explicativas (X)
    top_n       : nº de determinantes "clave" a destacar
    add_constant: añade intercepto si True
    
    Devuelve
    --------
    coef_table : DataFrame con coef, std_err, t, p
    meta       : dict  {
                         'r2'           : float,
                         'r2_adj'       : float,
                         'top_vars'     : list[str]
                       }
    """
    # 0) Filtrar filas completas
    df_clean = df.dropna(subset=[dependent] + independents).copy()

    # 1) Matrices
    y = df_clean[dependent].astype(float).to_numpy()
    X = df_clean[independents].astype(float)
    if add_constant:
        X = sm.add_constant(X)
    
    # 2) Ajustar modelo
    model = sm.OLS(y, X).fit()
    
    # 3) Tabla de coeficientes
    coef_table = model.summary2().tables[1]                 # coef, std err, t, P>|t|
    coef_table.index.name = "variable"
    coef_table.reset_index(inplace=True)
    
    # 4) Variables “clave”  (|coef| grande & p<0.05)
    sig = coef_table[coef_table["P>|t|"] < 0.05].copy()
    sig["abs_coef"] = sig["Coef."].abs()
    sig_sorted = sig.sort_values("abs_coef", ascending=False)
    top_vars = sig_sorted["variable"].head(top_n).tolist()
    
    # 5) Métricas de resumen
    meta = {
        "r2":      model.rsquared,
        "r2_adj":  model.rsquared_adj,
        "top_vars": top_vars
    }
    return coef_table, meta


In [None]:
coef_tbl, info = determinant_analysis(
    df           = df_sample,
    dependent    = "Eff_DEA",              # o el KPI que quieras explicar
    independents = ["Bienes y servicios",
                    "Remuneraciones",
                    "Días Cama Disponibles",
                    "Consultas"],
    top_n        = 3
)

print("R²:", info["r2"]:.3f, "  –  R² ajustado:", info["r2_adj"]:.3f)
print("Determinantes clave:", info["top_vars"])
display(coef_tbl)
