# Análisis de Pensiones Colombianas

Notebook guía que documenta el pipeline completo de descarga, limpieza, enriquecimiento y modelado del dataset `uawh-cjvi` publicado en Datos Abiertos Colombia.


## Ruta de trabajo

1. Conectar con la API de Socrata y descargar la totalidad del recurso.
2. Estandarizar tipos de datos, controlar calidad y generar diccionarios de apoyo.
3. Deduplicar, detectar outliers y enriquecer el dataset con variables derivadas.
4. Exportar productos de datos (dataset limpio, resumen de ejecución y subconjuntos).
5. Construir un modelo base con árbol de decisión para explorar la relevancia de variables.


In [None]:
import time
from pathlib import Path

import numpy as np
import pandas as pd
import requests

import matplotlib.pyplot as plt
import seaborn as sns

from scipy.stats import entropy as shannon_entropy

from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import mutual_info_classif
from sklearn.inspection import permutation_importance
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.tree import DecisionTreeClassifier, plot_tree

pd.set_option("display.max_columns", None)
sns.set_theme(style="whitegrid")

BASE = "https://www.datos.gov.co"
RESOURCE = "uawh-cjvi"
URL = f"{BASE}/resource/{RESOURCE}.json"

RAW_DIR = Path("data/raw")
PROCESSED_DIR = Path("data/processed")
REPORTS_DIR = Path("data/reports")
for path in (RAW_DIR, PROCESSED_DIR, REPORTS_DIR):
    path.mkdir(parents=True, exist_ok=True)


## Descarga completa desde la API

Socrata limita las respuestas a 1000 filas; se implementa paginación con `$limit` y `$offset` para traer todo el histórico.


In [None]:
def fetch_full_dataset(limit=50_000, delay=0.3):
    pages = []
    offset = 0
    while True:
        params = {"$limit": limit, "$offset": offset}
        response = requests.get(URL, params=params, timeout=120)
        response.raise_for_status()
        chunk = response.json()
        if not chunk:
            break
        pages.append(pd.DataFrame(chunk))
        offset += limit
        print(f"Descargadas: {offset} filas…")
        time.sleep(delay)
    if pages:
        return pd.concat(pages, ignore_index=True)
    return pd.DataFrame()

try:
    total_filas = int(requests.get(f"{URL}?$select=count(*)").json()[0]["count"])
except Exception:
    total_filas = None

print(f"Total reportado por la API: {total_filas}")
df_raw = fetch_full_dataset()
print(f"Filas descargadas: {len(df_raw)}")

df = df_raw.copy()
df.head()


## Exploración inicial

Revisamos la estructura del DataFrame y una muestra de filas para validar la descarga.


In [None]:
if not df.empty:
    display(df.head())
    display(df.sample(min(5, len(df)), random_state=42))
df.info()


## Limpieza de tipos

Se fuerza `fecha` a formato `datetime` y `valor_unidad` a `float`, eliminando símbolos indeseados.


In [None]:
df["fecha"] = pd.to_datetime(df["fecha"], errors="coerce")
df["valor_unidad"] = (
    df["valor_unidad"]
    .astype(str)
    .str.replace(r"[^\d\-,\.]", "", regex=True)
    .str.replace(",", ".", regex=False)
    .astype(float)
)

df.dtypes


## Calidad de datos

Calculamos porcentaje de nulos, cardinalidades y verificamos la relación uno a uno entre códigos y etiquetas.


In [None]:
nulls = df.isna().mean().sort_values(ascending=False).mul(100).round(2)
cardinalidad = df.nunique(dropna=True).sort_values(ascending=False)

print("% de nulos por columna:")
display(nulls)

print("
Cardinalidad por columna:")
display(cardinalidad)

if {"codigo_entidad", "nombre_entidad"}.issubset(df.columns):
    rel_entidad = df.groupby("codigo_entidad")["nombre_entidad"].nunique().sort_values(ascending=False).head()
    print("
Relación código_entidad → nombre_entidad:")
    display(rel_entidad)

if {"codigo_patrimonio", "nombre_fondo"}.issubset(df.columns):
    rel_fondo = df.groupby("codigo_patrimonio")["nombre_fondo"].nunique().sort_values(ascending=False).head()
    print("
Relación código_patrimonio → nombre_fondo:")
    display(rel_fondo)


## Diccionarios de entidades y fondos

Construimos diccionarios de referencia y los exportamos para consultas rápidas.


In [None]:
dict_entidad = (
    df[["nombre_entidad", "codigo_entidad"]]
    .drop_duplicates()
    .dropna(subset=["nombre_entidad", "codigo_entidad"])
    .set_index("nombre_entidad")["codigo_entidad"]
    .to_dict()
)

support = df[["nombre_fondo", "codigo_patrimonio"]]
support = support.drop_duplicates().dropna(subset=["nombre_fondo", "codigo_patrimonio"])
dict_fondo = support.set_index("nombre_fondo")["codigo_patrimonio"].to_dict()

pd.DataFrame(list(dict_entidad.items()), columns=["nombre_entidad", "codigo_entidad"]).to_csv(
    RAW_DIR / "entidad_codigo.csv", index=False
)
pd.DataFrame(list(dict_fondo.items()), columns=["nombre_fondo", "codigo_patrimonio"]).to_csv(
    RAW_DIR / "fondos_codigo.csv", index=False
)

print(f"Diccionario de entidades: {len(dict_entidad)} entradas")
print(f"Diccionario de fondos: {len(dict_fondo)} entradas")


## Normalización de texto

Homogeneizamos espacios y mayúsculas/minúsculas en nombres de entidad y fondo para reducir cardinalidad artificial.


In [None]:
for columna in ["nombre_entidad", "nombre_fondo"]:
    if columna in df.columns:
        df[columna] = (
            df[columna]
            .astype(str)
            .str.strip()
            .str.replace(r"\s+", " ", regex=True)
        )

df[["nombre_entidad", "nombre_fondo"]].nunique()


## Deduplicación

Se eliminan duplicados exactos y, posteriormente, duplicados conceptuales definidos por (`nombre_entidad`, `nombre_fondo`, `fecha`).


In [None]:
duplicados = df.duplicated().sum()
print(f"Filas duplicadas exactas: {duplicados}")
if duplicados > 0:
    df = df.drop_duplicates()
    print(f"Dataset tras remover duplicados exactos: {len(df)} filas")

duplicados_conceptuales = df.duplicated(subset=["nombre_entidad", "nombre_fondo", "fecha"]).sum()
print(f"Duplicados conceptuales: {duplicados_conceptuales}")
if duplicados_conceptuales > 0:
    df = df.drop_duplicates(subset=["nombre_entidad", "nombre_fondo", "fecha"], keep="first")
    print(f"Dataset tras limpieza conceptual: {len(df)} filas")


## Detección de outliers

Utilizamos el rango intercuartílico (IQR) para detectar observaciones atípicas en `valor_unidad` y creamos una bandera `es_outlier`.


In [None]:
Q1 = df['valor_unidad'].quantile(0.25)
Q3 = df['valor_unidad'].quantile(0.75)
IQR = Q3 - Q1
limite_inferior = Q1 - 1.5 * IQR
limite_superior = Q3 + 1.5 * IQR

outliers = df[(df['valor_unidad'] < limite_inferior) | (df['valor_unidad'] > limite_superior)]
porcentaje_outliers = (len(outliers) / len(df) * 100) if len(df) else 0

df['es_outlier'] = (
    (df['valor_unidad'] < limite_inferior) |
    (df['valor_unidad'] > limite_superior)
)

print(f"Límite inferior: {limite_inferior:.2f}")
print(f"Límite superior: {limite_superior:.2f}")
print(f"Outliers detectados: {len(outliers)} ({porcentaje_outliers:.2f}%)")


## Optimización y variables derivadas

Convertimos columnas de alta repetición a categoría y generamos atributos temporales y de tipología de fondo.


In [None]:
if 'nombre_entidad' in df.columns:
    df['nombre_entidad'] = df['nombre_entidad'].astype('category')
if 'nombre_fondo' in df.columns:
    df['nombre_fondo'] = df['nombre_fondo'].astype('category')
df['es_outlier'] = df['es_outlier'].astype('bool')

df['año'] = df['fecha'].dt.year
df['mes'] = df['fecha'].dt.month
df['trimestre'] = df['fecha'].dt.quarter

def clasificar_fondo(nombre_fondo: str) -> str:
    nombre = str(nombre_fondo).lower()
    if 'cesantia' in nombre:
        return 'Cesantías'
    if 'pension' in nombre:
        return 'Pensiones'
    if 'alternativo' in nombre:
        return 'Alternativo'
    return 'Otros'

df['tipo_fondo'] = df['nombre_fondo'].astype(str).apply(clasificar_fondo).astype('category')

df[['tipo_fondo', 'año', 'mes', 'trimestre']].head()


## Exportaciones principales

Generamos el dataset limpio, un resumen de la sesión y guardamos versiones sin códigos para análisis posteriores.


In [None]:
columnas_a_eliminar = [c for c in ["codigo_entidad", "codigo_patrimonio"] if c in df.columns]
df_clean = df.drop(columns=columnas_a_eliminar)

df_clean.to_csv(RAW_DIR / "pensionesLimpio.csv", index=False)
df.to_csv(PROCESSED_DIR / "pensiones_limpio_final.csv", index=False, encoding='utf-8')

resumen_limpieza = {
    'filas_finales': len(df),
    'columnas_finales': len(df.columns),
    'duplicados_eliminados': int(duplicados),
    'duplicados_conceptuales_eliminados': int(duplicados_conceptuales),
    'outliers_detectados': int(len(outliers)),
    'memoria_mb': float(df.memory_usage(deep=True).sum() / 1024**2),
    'fecha_limpieza': pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')
}

pd.Series(resumen_limpieza).to_csv(PROCESSED_DIR / "resumen_limpieza.csv")
resumen_limpieza


## Subconjuntos temáticos

Se generan CSV específicos por entidad y por tipo de fondo que facilitan análisis focalizados.


In [None]:
def guardar_subset(df_origen, columna, valores, nombre_archivo, match="exact"):
    if isinstance(valores, (list, tuple, set)):
        valores_iter = list(valores)
    else:
        valores_iter = [valores]

    if match == "contains":
        mascara = pd.Series(False, index=df_origen.index)
        for valor in valores_iter:
            mascara |= df_origen[columna].astype(str).str.contains(valor, case=False, na=False)
    else:
        mascara = df_origen[columna].isin(valores_iter)

    subset = df_origen.loc[mascara].copy()
    if columna in subset.columns:
        subset = subset.drop(columns=[columna])
    ruta = RAW_DIR / nombre_archivo
    subset.to_csv(ruta, index=False)
    print(f"{ruta}: {subset.shape[0]} filas")

guardar_subset(df_clean, "nombre_entidad", ["Skandia", "Skandia Afp - Accai"], "pensiones_skandia.csv", match="contains")
guardar_subset(df_clean, "nombre_entidad", ["Proteccion"], "pensiones_proteccion.csv", match="contains")
guardar_subset(df_clean, "nombre_entidad", ["Porvenir"], "pensiones_porvenir.csv", match="contains")
guardar_subset(df_clean, "nombre_entidad", ["Colfondos"], "colfondos_colfondos.csv", match="contains")

guardar_subset(df_clean, "nombre_fondo", ["Fondo de Cesantias Largo Plazo"], "fondo_cesantias_largo_plazo.csv")
guardar_subset(df_clean, "nombre_fondo", ["Fondo de Cesantias Corto Plazo"], "fondo_cesantias_corto_plazo.csv")
guardar_subset(df_clean, "nombre_fondo", ["Fondo de Pensiones Moderado"], "fondo_pensiones_moderado.csv")
guardar_subset(df_clean, "nombre_fondo", ["Fondo de Pensiones Conservador"], "fondo_pensiones_conservador.csv")
guardar_subset(df_clean, "nombre_fondo", ["Fondo de Pensiones Mayor Riesgo"], "fondo_pensiones_mayor_riesgo.csv")
guardar_subset(df_clean, "nombre_fondo", ["Fondo de Pensiones Retiro Programado"], "fondo_pensiones_retiro_programado.csv")
guardar_subset(df_clean, "nombre_fondo", ["Fondo de Pensiones Alternativo"], "fondo_pensiones_alternativo.csv")


## Modelo exploratorio: árbol de decisión

Se entrena un árbol de decisión poco profundo para estimar terciles de `valor_unidad` y estudiar la relevancia de los predictores disponibles.


In [None]:
df_modelo = df_clean.dropna(subset=['valor_unidad']).copy()
df_modelo['target_bin'] = pd.qcut(df_modelo['valor_unidad'], q=3, labels=['bajo', 'medio', 'alto'])

num_cols = ['valor_unidad']
cat_cols = [c for c in ['nombre_entidad', 'nombre_fondo', 'tipo_fondo'] if c in df_modelo.columns]

X = df_modelo[num_cols + cat_cols].copy()
y = df_modelo['target_bin'].astype('category')

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42, stratify=y
)

print(f"Train size: {X_train.shape}, Test size: {X_test.shape}")

def shannon_entropy_categorical(serie: pd.Series) -> float:
    counts = serie.value_counts(dropna=True)
    probabilidades = counts / counts.sum()
    return float(shannon_entropy(probabilidades, base=2))

def shannon_entropy_numeric(serie: pd.Series, bins: int = 20) -> float:
    serie = serie.dropna()
    hist, _ = np.histogram(serie, bins=bins)
    probabilidades = hist / hist.sum() if hist.sum() else np.array([1.0])
    return float(shannon_entropy(probabilidades, base=2))

entropias = {}
for col in cat_cols:
    entropias[col] = shannon_entropy_categorical(X_train[col].astype(str))
for col in num_cols:
    entropias[col] = shannon_entropy_numeric(X_train[col].astype(float), bins=20)
entropias = pd.Series(entropias).sort_values(ascending=False)

print("Entropía (bits) por variable:")
display(entropias)

preprocesador = ColumnTransformer(
    transformers=[
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse=False), cat_cols),
        ('num', 'passthrough', num_cols)
    ],
    remainder='drop'
)

X_train_enc = preprocesador.fit_transform(X_train)
X_test_enc = preprocesador.transform(X_test)

cat_feature_names = list(preprocesador.named_transformers_['cat'].get_feature_names_out(cat_cols)) if cat_cols else []
feature_names = cat_feature_names + num_cols

mi_scores = mutual_info_classif(X_train_enc, y_train.cat.codes, random_state=42)
mi_series = pd.Series(mi_scores, index=feature_names).sort_values(ascending=False)
print("Información mutua con la clase:")
display(mi_series.head(20))

arbol = DecisionTreeClassifier(
    criterion='entropy',
    max_depth=3,
    min_samples_leaf=50,
    random_state=42
)

pipeline = Pipeline([
    ('prep', preprocesador),
    ('tree', arbol)
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)

print("Reporte de clasificación:")
print(classification_report(y_test, y_pred))
print("Matriz de confusión:")
print(confusion_matrix(y_test, y_pred))

arbol_vis = DecisionTreeClassifier(
    criterion='entropy',
    max_depth=3,
    min_samples_leaf=50,
    random_state=42
)
arbol_vis.fit(X_train_enc, y_train)

plt.figure(figsize=(18, 8))
plot_tree(
    arbol_vis,
    feature_names=feature_names,
    class_names=list(y_train.cat.categories),
    filled=True,
    rounded=True,
    impurity=True
)
plt.title("Árbol de decisión (criterio = entropía)")
plt.show()

importancias_arbol = pd.Series(arbol_vis.feature_importances_, index=feature_names)
print("Importancias por reducción de entropía:")
display(importancias_arbol.sort_values(ascending=False).head(20))

importancias_perm = permutation_importance(
    pipeline,
    X_test,
    y_test,
    n_repeats=10,
    random_state=42,
    n_jobs=-1
)
importancias_perm_df = pd.Series(importancias_perm.importances_mean, index=num_cols + cat_cols).sort_values(ascending=False)
print("Importancias por permutación:")
display(importancias_perm_df.head(20))


## Exportes analíticos

Guardamos los resultados del análisis de importancia de variables y el gráfico del árbol para documentar el modelo exploratorio.


In [None]:
mi_series.head(20).to_csv(REPORTS_DIR / "top20_info_mutua.csv")
importancias_arbol.sort_values(ascending=False).to_csv(REPORTS_DIR / "importancias_arbol_entropy.csv")
importancias_perm_df.to_csv(REPORTS_DIR / "importancias_permutacion.csv")

plt.figure(figsize=(18, 8))
plot_tree(
    arbol_vis,
    feature_names=feature_names,
    class_names=list(y_train.cat.categories),
    filled=True,
    rounded=True,
    impurity=True
)
plt.title("Árbol de decisión (criterio = entropía)")
plt.savefig(REPORTS_DIR / "arbol_decision_entropy.png", dpi=300, bbox_inches='tight')
plt.close()


## Próximos pasos

- Incorporar visualizaciones adicionales (series de tiempo, boxplots por tipo de fondo) sobre `df_clean`.
- Ajustar hiperparámetros o probar modelos alternativos (p. ej. `RandomForestClassifier`) manteniendo la misma matriz de diseño.
- Automatizar la actualización periódica del dataset mediante tareas programadas.
