# Análisis multivariado - Reactiva Perú 2022

**Autores:** Luis Lucero, Dayvis Quispe

**Fuente de datos (2022):** https://www.mef.gob.pe/contenidos/archivos-descarga/REACTIVA_Lista_beneficiarios_270422.xlsx


### 1. Importación y preparación de datos

In [None]:
# Librerías
import os, re, unicodedata, warnings, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.linear_model import LogisticRegressionCV
from sklearn.pipeline import Pipeline
from sklearn.metrics import (classification_report, confusion_matrix, roc_auc_score, roc_curve, auc)

warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 100)

print("Versiones -> pandas:", pd.__version__)


In [None]:
# Configuración de dataset
# Edita 'dataset_path' si tu archivo está en otra ruta o con otro nombre.
# Si lo dejas vacío, intentaremos detectar automáticamente un Excel que contenga 'reactiva' en /mnt/data
dataset_path = ""  # ejemplo: "/mnt/data/reactiva_peru_2022.xlsx"

def find_dataset(default_hint="reactiva"):
    candidates = []
    for root, _, files in os.walk("/mnt/data"):
        for f in files:
            if f.lower().endswith((".xlsx", ".xls")) and default_hint in f.lower():
                candidates.append(os.path.join(root, f))
    return sorted(candidates)

if not dataset_path or not os.path.exists(dataset_path):
    candidates = find_dataset()
    if candidates:
        dataset_path = candidates[0]

if not dataset_path or not os.path.exists(dataset_path):
    raise FileNotFoundError("No se encontró el dataset. Edita 'dataset_path' con la ruta correcta.")

print("Usando dataset:", dataset_path)

# Leer Excel
df = pd.read_excel(dataset_path)
print("Dimensiones:", df.shape)
df.head()


In [None]:
# Estandarización de nombres y alias (tildes/variantes)
def slug(s):
    s = (unicodedata.normalize("NFKD", str(s))
         .encode("ascii", "ignore").decode("ascii"))
    s = re.sub(r"[^0-9a-zA-Z]+", "_", s).strip("_").lower()
    return s

df.columns = [slug(c) for c in df.columns]

# Diccionario de alias -> claves canónicas (soporta tildes/variantes)
alias = {
    "repro": ["repro"],
    "sector_economico": ["sector_economico", "sector_economico_", "sector_economico__"],
    "departamento": ["departamento"],
    "entidad_otorgante_credito": [
        "entidad_otorgante_credito",
        "nombre_de_entidad_otorgante_del_credito",
        "entidad_otorgante_del_credito"
    ],
    "saldo_insoluto": [
        "saldo_insoluto", "saldo_insoluto_s"
    ],
    "cobertura_saldo_insoluto": [
        "cobertura_del_saldo_insoluto_s", "cobertura_saldo_insoluto"
    ],
}

# Construimos un mapa de renombrado canónico
rename_map = {}
present = set(df.columns)
for canon, variants in alias.items():
    for v in variants:
        if v in present:
            rename_map[v] = canon
df = df.rename(columns=rename_map)

print("Columnas (estandarizadas):", list(df.columns))


In [None]:
# Mapeo REPRO (SI/NO -> 1/0) y conversión de numéricos
if "repro" in df.columns:
    if df["repro"].dtype == object:
        vals = df["repro"].astype(str).str.strip().str.upper()
        df["repro"] = vals.map({"SI": 1, "NO": 0}).fillna(pd.to_numeric(vals, errors="coerce"))
    df["repro"] = pd.to_numeric(df["repro"], errors="coerce")
else:
    print("⚠️ Falta la columna 'REPRO' (o alias). No se podrá entrenar el modelo.")

# Conversión numérica de métricas monetarias si existen
for num_col in ["saldo_insoluto", "cobertura_saldo_insoluto"]:
    if num_col in df.columns:
        if df[num_col].dtype == object:
            # Quita separadores de miles y maneja decimales
            df[num_col] = (df[num_col].astype(str)
                           .str.replace(r"[\s\,](?=\d{3}(?:\D|$))", "", regex=True)
                           .str.replace(",", ".", regex=False))
        df[num_col] = pd.to_numeric(df[num_col], errors="coerce")

# Vista rápida
display(df.head())
df.describe(include="all").T.head(20)


In [None]:
# Correlación (Spearman) entre numéricos de interés
num_pair = ["saldo_insoluto", "cobertura_saldo_insoluto"]
if all(col in df.columns for col in num_pair):
    spearman = df[num_pair].corr(method="spearman").iloc[0, 1]
    print("Correlación Spearman saldo_insoluto vs cobertura_saldo_insoluto:", spearman)
else:
    print("No se pudo calcular la correlación solicitada; faltan columnas:", [c for c in num_pair if c not in df.columns])


### 2. Analisis descriptivo

In [None]:
# 1) Distribución REPRO (tabla + gráfico circular)
if "repro" in df.columns:
    counts = df["repro"].value_counts(dropna=False).sort_index()
    print("Distribución REPRO (0/1):")
    print(counts)

    fig = plt.figure()
    labels = counts.index.astype(str)
    sizes = counts.values
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
    plt.title("Porcentaje de empresas con REPRO")
    plt.axis('equal')
    plt.show()
else:
    print("Sin 'repro', se omite el gráfico circular.")


In [None]:
# 2) Frecuencia de empresas por DEPARTAMENTO (gráfico de barras)
if "departamento" in df.columns:
    freq_depa = df["departamento"].value_counts()
    print(freq_depa.head())

    fig = plt.figure()
    freq_depa.sort_values(ascending=True).plot(kind="barh")
    plt.title("Frecuencia de empresas según región (Departamento)")
    plt.xlabel("Frecuencia de empresas")
    plt.ylabel("Región")
    plt.tight_layout()
    plt.show()

    # Porcentajes
    perc_depa = (freq_depa / freq_depa.sum() * 100).sort_values(ascending=True)
    fig = plt.figure()
    perc_depa.plot(kind="barh")
    for i, v in enumerate(perc_depa.values):
        plt.text(v, i, f"{v:.1f}%", va='center')
    plt.title("Porcentaje de empresas según región")
    plt.xlabel("Porcentaje de empresas")
    plt.ylabel("Región")
    plt.tight_layout()
    plt.show()


In [None]:
# 3) Frecuencia y porcentaje por SECTOR ECONOMICO
if "sector_economico" in df.columns:
    freq_sector = df["sector_economico"].value_counts()
    print(freq_sector.head())

    fig = plt.figure()
    freq_sector.sort_values(ascending=True).plot(kind="barh")
    plt.title("Frecuencia de empresas según sector económico")
    plt.xlabel("Frecuencia de empresas")
    plt.ylabel("Sector Económico")
    plt.tight_layout()
    plt.show()

    # Porcentajes
    perc_sector = (freq_sector / freq_sector.sum() * 100).sort_values(ascending=True)
    fig = plt.figure()
    perc_sector.plot(kind="barh")
    for i, v in enumerate(perc_sector.values):
        plt.text(v, i, f"{v:.1f}%", va='center')
    plt.title("Porcentaje de empresas según sector económico")
    plt.xlabel("Porcentaje de empresas")
    plt.ylabel("Sector Económico")
    plt.tight_layout()
    plt.show()


In [None]:
# 4) Frecuencia por sector económico y REPRO
if "sector_economico" in df.columns and "repro" in df.columns:
    crosstab = pd.crosstab(df["sector_economico"], df["repro"]).sort_index()
    print(crosstab.head())

    fig = plt.figure()
    crosstab.plot(kind="bar")
    plt.title("Frecuencia de empresas por sector económico y REPRO")
    plt.xlabel("Sector Económico")
    plt.ylabel("Frecuencia de empresas")
    plt.tight_layout()
    plt.show()


### 3. Desarrollo del modelo

In [None]:
# Variables requeridas para el modelo
needed = ["sector_economico", "entidad_otorgante_credito", "departamento", "saldo_insoluto", "repro"]
missing = [c for c in needed if c not in df.columns]
if missing:
    raise RuntimeError(f"No se puede entrenar el modelo. Faltan columnas clave: {missing}")

# Construcción de X, y
base_features = ["sector_economico", "entidad_otorgante_credito", "departamento", "saldo_insoluto"]
if "cobertura_saldo_insoluto" in df.columns:
    base_features.append("cobertura_saldo_insoluto")

X = df[base_features].copy()
y = df["repro"].astype(int)

# Split estratificado
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

cat_features = ["sector_economico", "entidad_otorgante_credito", "departamento"]
num_features = [c for c in ["saldo_insoluto", "cobertura_saldo_insoluto"] if c in X.columns]

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_features),
        ("num", StandardScaler(with_mean=False), num_features),
    ],
    remainder="drop"
)

# Logistic Regression con L1 + CV estratificada
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
logit_cv = LogisticRegressionCV(
    Cs=np.logspace(-3, 3, 10),
    cv=cv,
    penalty="l1",
    solver="saga",
    scoring="roc_auc",
    max_iter=5000,
    n_jobs=-1,
    refit=True
)

pipe = Pipeline([("prep", preprocess), ("clf", logit_cv)])
pipe.fit(X_train, y_train)

try:
    best_C = pipe.named_steps["clf"].C_[0]
except Exception:
    best_C = pipe.named_steps["clf"].C_.ravel()[0]
print("Mejor C (inverso de lambda) por validación cruzada:", best_C)


In [None]:
# Métricas, ROC y principales coeficientes
def get_feature_names(preprocess, X_fit):
    names = []
    for name, trans, cols in preprocess.transformers_:
        if name == "cat":
            ohe = trans
            try:
                fn = list(ohe.get_feature_names_out(cols))
            except TypeError:
                # Compatibilidad scikit-learn viejo
                fn = [f"{cols[i]}_{cat}" for i, categories in enumerate(ohe.categories_) for cat in categories]
            names.extend(fn)
        elif name == "num":
            if isinstance(cols, list):
                names.extend(cols)
            else:
                names.append(cols)
    return names

# Predicciones y métricas
y_proba = pipe.predict_proba(X_test)[:, 1]
y_pred = (y_proba >= 0.5).astype(int)

print("Matriz de confusión (test):")
print(confusion_matrix(y_test, y_pred))
print("\nReporte de clasificación (test):")
print(classification_report(y_test, y_pred, digits=3))
print("ROC AUC (test):", roc_auc_score(y_test, y_proba))

# Curva ROC
fpr, tpr, _ = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)
fig = plt.figure()
plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
plt.plot([0,1], [0,1], linestyle="--")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Curva ROC (test)")
plt.legend()
plt.tight_layout()
plt.show()

# Coeficientes principales (valor absoluto)
feature_names = get_feature_names(pipe.named_steps["prep"], X_train)
coefs = pipe.named_steps["clf"].coef_.ravel()
coef_df = pd.DataFrame({"feature": feature_names, "coef": coefs})
coef_df["abs_coef"] = coef_df["coef"].abs()
top_n = 20
top_df = coef_df.sort_values("abs_coef", ascending=False).head(top_n)
print(top_df)

# Gráfico de coeficientes
fig = plt.figure()
y_pos = np.arange(len(top_df))
plt.barh(y_pos, top_df["coef"].values)
plt.yticks(y_pos, top_df["feature"].values)
plt.gca().invert_yaxis()
plt.title(f"Top {top_n} coeficientes (L1)")
plt.xlabel("Coeficiente")
plt.tight_layout()
plt.show()


### Notas de compatibilidad

- **glmnet vs. scikit-learn**: `glmnet` usa \(\lambda\) (fuerza de regularización). En `LogisticRegressionCV`, el parámetro equivalente es \(C = 1/\lambda\). Aquí seleccionamos `C` por CV estratificada con `solver='saga'` y `penalty='l1'`.
- **Codificación y escalado**: En R se usa `model.matrix`; en Python se replicó con `OneHotEncoder` para categóricas y `StandardScaler` para numéricas dentro de `ColumnTransformer`. Esto puede cambiar el conteo/nombre de variables y, por ende, la magnitud de coeficientes.
- **Rutas y nombres de columnas**: Se agregó un diccionario de *alias* y una función que estandariza tildes y símbolos. Si un nombre no coincide, renómbralo o añade un alias.
- **Umbral de decisión**: El reporte usa umbral 0.5. Para ajustar sensibilidad/especificidad, modifica el umbral, por ejemplo:

  ```python
  y_pred = (y_proba >= 0.35).astype(int)  # ejemplo
  ```
- **Hiperparámetros**: Ajusta la malla de `Cs` (e.g., `np.logspace(-4, 2, 20)`) o `max_iter`. Si el entrenamiento no converge, aumenta `max_iter` o estandariza mejor las variables.
- **Desbalance de clases**: Considera `class_weight='balanced'` en `LogisticRegressionCV`, usar métricas como AUC-PR, o técnicas de *resampling* (SMOTE/undersampling). También puedes hacer *threshold moving* con el ROC/PR.
