# Sistema ML: Detección de **Riesgo de Corrupción** en Obras Públicas (Perú)

Este notebook implementa un *pipeline* end-to-end para detectar obras públicas con **riesgo de corrupción**:
1) **Ingesta** de datos (simulada si no se detectan archivos reales).  
2) **ETL** y *feature engineering* con banderas de riesgo.  
3) Entrenamiento de **modelos** (*baseline* y árbol de decisión/ensamble).  
4) **Evaluación** con métricas y curvas.  
5) **XAI**: Importancia por permutación y **PDP/ICE**.  
6) **Exportación** de artefactos (pipeline + modelo) y función de inferencia.

> ⚠️ Reemplace los *placeholders* de rutas por sus fuentes reales (SIAF, SEACE/OSCE, INFObras, Módulos CGR/BID, etc.) cuando estén disponibles.


## Contexto y objetivo

En el marco de la **Contraloría (CGR)** y el proyecto **BID‑3**, buscamos **priorizar** y **alertar** sobre obras con probabilidad de incurrir en **riesgos de corrupción** (p. ej., adicionales y ampliaciones atípicas, fraccionamiento, sanciones previas de empresas, colusión, sobrecostos, baja competencia, etc.).

**Variable objetivo (label)**: `riesgo_corrupcion` (1 = alto riesgo, 0 = bajo riesgo).  
**Fuentes típicas:** SIAF, SEACE/OSCE, INFOBRAS, PERUCOMPRAS, SERVIR, SNCP, padrones de sanciones, y registros internos de la CGR.


In [10]:
# == 0) Config & utilidades
import json
import re
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

plt.rcParams["figure.figsize"] = (7, 5)


def find_data_root(start: Path) -> Path | None:
    for p in [start] + list(start.parents):
        if (p / "data" / "external").is_dir():
            return p / "data" / "external"
    return None


DATA_ROOT = find_data_root(Path.cwd())
# DATA_ROOT = Path(r'/ruta/absoluta/a/tu/proyecto/data/external')  # descomenta y ajusta si es necesario

print("CWD:", Path.cwd())
print("DATA_ROOT:", DATA_ROOT)

ARTIF_DIR = Path("artifacts")
ARTIF_DIR.mkdir(exist_ok=True, parents=True)


def save_df(df: pd.DataFrame, path: Path):
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        import pyarrow  # noqa

        if path.suffix.lower() != ".parquet":
            path = path.with_suffix(".parquet")
        df.to_parquet(path, index=False)
        print("[SAVE] Parquet ->", path)
    except Exception:
        if path.suffix.lower() != ".csv":
            path = path.with_suffix(".csv")
        df.to_csv(path, index=False)
        print("[SAVE] CSV ->", path)

CWD: c:\MaestriaUNI\Cursos\III-CICLO\TesisI\Solucion\Deteccion_Corrupcion\notebooks
DATA_ROOT: c:\MaestriaUNI\Cursos\III-CICLO\TesisI\Solucion\Deteccion_Corrupcion\data\external


In [11]:
# == 0.1) Diagnóstico
if DATA_ROOT is not None:
    for sub in ["obra", "empresa", "funcionario", "catalogos"]:
        p = DATA_ROOT / sub
        print(f"{sub:12s}:", p, "exists?", p.exists())
        if p.exists():
            print("  ejemplos:", [x.name for x in list(p.iterdir())[:5]])
else:
    print("No se detectó data/external. Ajusta DATA_ROOT manualmente.")

obra        : c:\MaestriaUNI\Cursos\III-CICLO\TesisI\Solucion\Deteccion_Corrupcion\data\external\obra exists? True
  ejemplos: ['Datos generales - Obra - 2025.05.15.xlsx', 'DS_DASH_Obra_1A.csv', 'DS_DASH_Obra_2A_3A.csv', 'DS_DASH_Obra_2B.csv', 'DS_DASH_Obra_3B.csv']
empresa     : c:\MaestriaUNI\Cursos\III-CICLO\TesisI\Solucion\Deteccion_Corrupcion\data\external\empresa exists? True
  ejemplos: ['Datos generales - Empresa -2023.05.15.xlsx', 'DS_DASH_Empresa_1A.csv', 'DS_DASH_Empresa_1B.csv', 'DS_DASH_Empresa_2A.csv', 'DS_DASH_Empresa_2B.csv']
funcionario : c:\MaestriaUNI\Cursos\III-CICLO\TesisI\Solucion\Deteccion_Corrupcion\data\external\funcionario exists? True
  ejemplos: ['Datos generales - Funcionario - 2025.05.15.xlsx', 'DS_DASH_Miembro_1A.csv', 'DS_DASH_Miembro_2A.csv', 'DS_DASH_Miembro_3A.csv', 'DS_DASH_Miembro_3B.csv']
catalogos   : c:\MaestriaUNI\Cursos\III-CICLO\TesisI\Solucion\Deteccion_Corrupcion\data\external\catalogos exists? True
  ejemplos: ['Diccionario vistas dashboard

In [12]:
# == 1) Ingesta robusta
from pathlib import Path

import pandas as pd

assert DATA_ROOT is not None, "Ajusta DATA_ROOT en la celda de Config."

P_OBRA = DATA_ROOT / "obra"
P_EMP = DATA_ROOT / "empresa"
P_FUNC = DATA_ROOT / "funcionario"


def read_any(path: Path) -> pd.DataFrame:
    if path.suffix.lower() in (".xlsx", ".xls"):
        return pd.read_excel(path)
    for enc in ("utf-8-sig", "latin-1"):
        try:
            return pd.read_csv(path, encoding=enc)
        except Exception:
            pass
    return pd.read_csv(path)


def concat_many(folder: Path, patterns):
    files = []
    for pat in patterns:
        files += list(folder.glob(pat))
    files = sorted(set(files))
    if not files:
        return pd.DataFrame()
    dfs = []
    for f in files:
        try:
            df = read_any(f)
            df["_source"] = f.name
            dfs.append(df)
        except Exception as e:
            print("[WARN]", f.name, "->", e)
    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()


obra_raw = concat_many(P_OBRA, ["DS_DASH_Obra_*.*", "Datos generales*Obra*.*"])
emp_raw = concat_many(P_EMP, ["DS_DASH_Empresa_*.*", "Datos generales*Empresa*.*"])
func_raw = concat_many(P_FUNC, ["DS_DASH_Miembro_*.*", "Datos generales*Funcion*.*"])

print(
    "[Obra] shape:",
    obra_raw.shape,
    " [Empresa] shape:",
    emp_raw.shape,
    " [Func] shape:",
    func_raw.shape,
)

save_df(obra_raw, Path("data/bronze/obra_raw"))
save_df(emp_raw, Path("data/bronze/empresa_raw"))
save_df(func_raw, Path("data/bronze/funcionario_raw"))

[Obra] shape: (14344, 58)  [Empresa] shape: (8523, 29)  [Func] shape: (5629, 20)
[SAVE] Parquet -> data\bronze\obra_raw.parquet
[SAVE] CSV -> data\bronze\empresa_raw.csv
[SAVE] Parquet -> data\bronze\funcionario_raw.parquet


In [13]:
# == 2) Estandarización + limpieza (SILVER)
import pandas as pd

colmap_obras = {
    "costo_total": ["MontoContrato", "CostoTotal", "costo_total", "Monto total", "Monto_total"],
    "plazo_meses": ["PlazoMeses", "plazo_meses", "Plazo (meses)"],
    "adicionales_pct": ["AdicPct", "%Adicionales", "adicionales_pct", "Porc_Adicionales"],
    "ampliaciones": ["NroAmpliaciones", "ampliaciones"],
    "penalidades": ["NroPenalidades", "penalidades"],
    "baja_competencia": ["BajaCompetencia", "baja_competencia", "PocosPostores"],
    "consorcio": ["Consorcio", "consorcio"],
    "experiencia_entidad": ["ExperienciaEntidad", "experiencia_entidad"],
    "region_riesgo": ["RegionRiesgo", "region_riesgo"],
    "tipo_proceso": ["TipoProceso", "tipo_proceso"],
    "RUC": ["RUC", "ruc"],
    "CUI": ["CUI", "cui"],
    "riesgo_corrupcion": ["riesgo_corrupcion"],
}
colmap_empresa = {
    "RUC": ["RUC", "ruc"],
    "empresa_sancionada": ["EmpresaSancionada", "SancionOSCE", "empresa_sancionada"],
}


def standardize(df, colmap):
    if df.empty:
        return df
    out = df.copy()
    for std, variants in colmap.items():
        for v in variants:
            if v in out.columns:
                out[std] = out[v]
                break
        if std not in out.columns:
            out[std] = np.nan
    return out


obra = standardize(obra_raw, colmap_obras)
emp = standardize(emp_raw, colmap_empresa)


def to_num(x):
    if pd.isna(x):
        return np.nan
    if isinstance(x, str):
        x = x.replace("S/.", "").replace(",", "").strip()
        x = re.sub(r"[^0-9\\.-eE]", "", x)
    try:
        return float(x)
    except:
        return np.nan


for c in [
    "costo_total",
    "plazo_meses",
    "adicionales_pct",
    "ampliaciones",
    "penalidades",
    "baja_competencia",
    "consorcio",
    "experiencia_entidad",
]:
    if c in obra.columns:
        obra[c] = obra[c].map(to_num)

if "adicionales_pct" in obra.columns:
    obra.loc[obra["adicionales_pct"] > 1, "adicionales_pct"] = obra["adicionales_pct"] / 100.0


def norm_tipo(s):
    if pd.isna(s):
        return np.nan
    s = str(s).lower()
    if "directa" in s:
        return "Contratación Directa"
    if "simplificada" in s:
        return "Adjudicación Simplificada"
    return "Licitación"


def norm_region(s):
    if pd.isna(s):
        return "MEDIA"
    s = str(s).upper()
    return s if s in {"ALTA", "MEDIA", "BAJA"} else "MEDIA"


if "tipo_proceso" in obra.columns:
    obra["tipo_proceso"] = obra["tipo_proceso"].apply(norm_tipo)
obra["region_riesgo"] = obra.get("region_riesgo", "MEDIA")
obra["region_riesgo"] = obra["region_riesgo"].apply(norm_region)


def norm_ruc(x):
    s = re.sub(r"[^0-9]", "", str(x) if x is not None else "")
    return s.zfill(11) if len(s) == 11 else s


if "RUC" in obra.columns:
    obra["RUC"] = obra["RUC"].map(norm_ruc)
if "RUC" in emp.columns:
    emp["RUC"] = emp["RUC"].map(norm_ruc)

obra["empresa_sancionada"] = obra.get("empresa_sancionada", np.nan)

if not emp.empty and "RUC" in emp.columns:
    emp_aux = emp[["RUC", "empresa_sancionada"]].dropna(subset=["RUC"]).copy()
    if emp_aux["empresa_sancionada"].dtype == object:
        emp_aux["empresa_sancionada"] = (
            emp_aux["empresa_sancionada"]
            .astype(str)
            .str.strip()
            .str.lower()
            .map({"si": 1, "sí": 1, "true": 1, "1": 1, "no": 0, "false": 0, "0": 0})
            .fillna(0)
            .astype("int8")
        )
    else:
        emp_aux["empresa_sancionada"] = (
            pd.to_numeric(emp_aux["empresa_sancionada"], errors="coerce").fillna(0).astype("int8")
        )
    emp_map = (
        emp_aux.groupby("RUC", as_index=True)["empresa_sancionada"].max().astype("int8").to_dict()
    )
    obra["empresa_sancionada_emp"] = obra["RUC"].map(emp_map).fillna(0).astype("int8")
    obra["empresa_sancionada"] = (
        pd.to_numeric(obra["empresa_sancionada"], errors="coerce")
        .fillna(obra["empresa_sancionada_emp"])
        .fillna(0)
        .astype("int8")
    )
    obra.drop(columns=["empresa_sancionada_emp"], inplace=True)

for c in [
    "plazo_meses",
    "ampliaciones",
    "penalidades",
    "baja_competencia",
    "consorcio",
    "experiencia_entidad",
    "empresa_sancionada",
]:
    if c in obra.columns:
        obra[c] = pd.to_numeric(obra[c], errors="coerce", downcast="integer")
if "costo_total" in obra.columns:
    obra["costo_total"] = pd.to_numeric(obra["costo_total"], errors="coerce", downcast="float")
if "adicionales_pct" in obra.columns:
    obra["adicionales_pct"] = pd.to_numeric(
        obra["adicionales_pct"], errors="coerce", downcast="float"
    )

for c in ["region_riesgo", "tipo_proceso", "_source"]:
    if c in obra.columns and obra[c].dtype == "object":
        obra[c] = obra[c].astype("category")

print("SILVER obra shape:", obra.shape)
save_df(obra, Path("data/silver/obra_silver"))

SILVER obra shape: (14344, 72)
[SAVE] Parquet -> data\silver\obra_silver.parquet


In [14]:
# == 3) GOLD (features + etiqueta si existe)
features_needed = [
    "costo_total",
    "plazo_meses",
    "adicionales_pct",
    "ampliaciones",
    "penalidades",
    "baja_competencia",
    "empresa_sancionada",
    "consorcio",
    "experiencia_entidad",
    "region_riesgo",
    "tipo_proceso",
]
label_col = "riesgo_corrupcion"

for c in features_needed:
    if c not in obra.columns:
        obra[c] = np.nan

if label_col in obra.columns:
    obra[label_col] = pd.to_numeric(obra[label_col], errors="coerce").astype("Int64")

df = obra[features_needed + ([label_col] if label_col in obra.columns else [])].copy()
print("GOLD df shape:", df.shape)
save_df(df, Path("data/gold/obras_ml"))
df.head(3)

GOLD df shape: (14344, 12)
[SAVE] Parquet -> data\gold\obras_ml.parquet


Unnamed: 0,costo_total,plazo_meses,adicionales_pct,ampliaciones,penalidades,baja_competencia,empresa_sancionada,consorcio,experiencia_entidad,region_riesgo,tipo_proceso,riesgo_corrupcion
0,,,,,,,0,,,MEDIA,,
1,,,,,,,0,,,MEDIA,,
2,,,,,,,0,,,MEDIA,,


In [None]:
# == 4) ETL + Entrenamiento
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    PrecisionRecallDisplay,
    RocCurveDisplay,
    average_precision_score,
    brier_score_loss,
    classification_report,
    roc_auc_score,
)
from sklearn.model_selection import GridSearchCV, StratifiedKFold, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def winsorize(s: pd.Series, p_low=0.01, p_high=0.99):
    lo, hi = s.quantile(p_low), s.quantile(p_high)
    return s.clip(lo, hi)


raw = df.copy()
for c in ["costo_total", "plazo_meses", "adicionales_pct", "ampliaciones", "penalidades"]:
    if c in raw.columns:
        raw[c] = winsorize(pd.to_numeric(raw[c], errors="coerce"))

raw["flag_adicionales_altos"] = (raw["adicionales_pct"] > 0.15).astype("Int64")
raw["flag_muchas_ampliaciones"] = (raw["ampliaciones"] >= 2).astype("Int64")
raw["flag_penalidades"] = (raw["penalidades"] >= 1).astype("Int64")
raw["flag_contratacion_directa"] = (raw["tipo_proceso"] == "Contratación Directa").astype("Int64")
raw["flag_region_alta"] = (raw["region_riesgo"] == "ALTA").astype("Int64")


def synth_label(d: pd.DataFrame, seed=42):
    rng = np.random.default_rng(seed)
    score = (
        0.5 * (d["adicionales_pct"] > 0.15).astype(int)
        + 0.4 * (d["ampliaciones"] >= 2).astype(int)
        + 0.3 * d["empresa_sancionada"].fillna(0).astype(int)
        + 0.25 * d["baja_competencia"].fillna(0).astype(int)
        + 0.2 * (d["penalidades"] >= 1).astype(int)
        + 0.15 * (d["tipo_proceso"] == "Contratación Directa").astype(int)
        + 0.1 * (d["region_riesgo"] == "ALTA").astype(int)
        + 0.05 * d["consorcio"].fillna(0).astype(int)
    )
    prob = 1 / (1 + np.exp(-(score + rng.normal(0, 0.3, len(d)))))
    return (prob > 0.55).astype(int)


if "riesgo_corrupcion" in raw.columns and raw["riesgo_corrupcion"].notna().any():
    raw["riesgo_corrupcion"] = raw["riesgo_corrupcion"].fillna(0).astype(int)
else:
    print("[INFO] No hay etiqueta histórica -> se sintetiza para entrenar.")
    raw["riesgo_corrupcion"] = synth_label(raw)

target = "riesgo_corrupcion"
features = [c for c in raw.columns if c != target]
X = raw[features].copy()
y = raw[target].astype(int)

cat_cols = X.select_dtypes(include=["object", "category"]).columns.tolist()
bin_cols = [
    c for c in X.columns if X[c].dropna().nunique() == 2 and X[c].dtype not in (object, "category")
]
num_cols = [c for c in X.columns if c not in cat_cols + bin_cols]

numeric = Pipeline(steps=[("scaler", StandardScaler())])
categorical = Pipeline(steps=[("ohe", OneHotEncoder(handle_unknown="ignore"))])

preprocess = ColumnTransformer(
    [("num", numeric, num_cols), ("cat", categorical, cat_cols), ("pass", "passthrough", bin_cols)]
)

assert len(X) > 0, "Dataset vacío. Revisa DATA_ROOT/patrones."

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, stratify=y, random_state=42
)


def evaluar(modelo: Pipeline, Xtr, ytr, Xte, yte, name="modelo"):
    modelo.fit(Xtr, ytr)
    ypro = modelo.predict_proba(Xte)[:, 1]
    yhat = (ypro >= 0.5).astype(int)
    roc = roc_auc_score(yte, ypro)
    pr = average_precision_score(yte, ypro)
    print(f"== {name} ==")
    print("ROC-AUC:", round(roc, 4), " PR-AUC:", round(pr, 4))
    print("\\n", classification_report(yte, yhat, digits=4))

    fig, ax = plt.subplots()
    RocCurveDisplay.from_predictions(yte, ypro, ax=ax)
    ax.set_title(f"ROC — {name}")
    plt.show()
    fig, ax = plt.subplots()
    PrecisionRecallDisplay.from_predictions(yte, ypro, ax=ax)
    ax.set_title(f"PR — {name}")
    plt.show()
    fig, ax = plt.subplots()
    ConfusionMatrixDisplay.from_predictions(yte, yhat, ax=ax)
    ax.set_title(f"Matriz — {name}")
    plt.show()
    try:
        print("Brier:", round(brier_score_loss(yte, ypro), 4))
    except:
        pass
    return {"roc_auc": roc, "pr_auc": pr, "model": modelo}


baseline = Pipeline(
    [
        ("prep", preprocess),
        ("clf", LogisticRegression(max_iter=300, class_weight="balanced", solver="liblinear")),
    ]
)
res_base = evaluar(baseline, X_train, y_train, X_test, y_test, "Baseline — RegLog")

rf_pipe = Pipeline(
    [("prep", preprocess), ("rf", RandomForestClassifier(class_weight="balanced", random_state=42))]
)

param_grid = {
    "rf__n_estimators": [200, 350],
    "rf__max_depth": [None, 10],
    "rf__min_samples_split": [2, 10],
    "rf__min_samples_leaf": [1, 3],
}

cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
gs = GridSearchCV(rf_pipe, param_grid, scoring="average_precision", cv=cv, n_jobs=-1, verbose=0)
gs.fit(X_train, y_train)
best_rf = gs.best_estimator_
res_rf = evaluar(best_rf, X_train, y_train, X_test, y_test, "RandomForest (mejor)")
print("Mejores params RF:", gs.best_params_)

best = res_rf if res_rf["pr_auc"] >= res_base["pr_auc"] else res_base
best_name = "RandomForest" if best is res_rf else "RegLog"
best_model = best["model"]
print(
    "Modelo seleccionado:",
    best_name,
    "PR-AUC:",
    round(best["pr_auc"], 4),
    " ROC-AUC:",
    round(best["roc_auc"], 4),
)

In [None]:
# == 5) XAI
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.inspection import PartialDependenceDisplay, permutation_importance

try:
    pi = permutation_importance(best_model, X_test, y_test, n_repeats=5, random_state=42, n_jobs=-1)
    importances = pi.importances_mean

    def get_feature_names(ct, input_features):
        out = []
        for name, trans, cols in ct.transformers_:
            if name == "pass" and trans == "passthrough":
                out.extend(cols if isinstance(cols, list) else [cols])
            else:
                if hasattr(trans, "get_feature_names_out"):
                    out.extend(trans.get_feature_names_out(cols))
                else:
                    out.extend(cols if isinstance(cols, list) else [cols])
        return [str(x) for x in out]

    feat_names = get_feature_names(best_model.named_steps["prep"], X_test.columns)
    idx = np.argsort(importances)[::-1][:20]
    fig, ax = plt.subplots()
    ax.bar(range(len(idx)), importances[idx])
    ax.set_xticks(range(len(idx)))
    ax.set_xticklabels(
        [feat_names[i] if i < len(feat_names) else f"f{i}" for i in idx], rotation=90
    )
    ax.set_title("Importancia por permutación (Top 20)")
    plt.tight_layout()
    plt.show()
except Exception as e:
    print("[WARN] Importancia no disponible:", e)

num_candidates = [c for c in X_test.columns if pd.api.types.is_numeric_dtype(X_test[c])][:3]
for feat in num_candidates:
    try:
        fig, ax = plt.subplots()
        PartialDependenceDisplay.from_estimator(best_model, X_test, [feat], ax=ax)
        ax.set_title(f"PDP — {feat}")
        plt.show()
    except Exception as e:
        print("[WARN] PDP falló", feat, "->", e)

In [None]:
# == 6) Exportar artefactos
from pathlib import Path

import joblib
import pandas as pd

PIPE_PATH = ARTIF_DIR / "preprocess_pipeline.joblib"
MODEL_PATH = ARTIF_DIR / "model.joblib"
META_PATH = ARTIF_DIR / "metadata.json"

joblib.dump(best_model, MODEL_PATH)
try:
    joblib.dump(best_model.named_steps["prep"], PIPE_PATH)
except Exception:
    pass

meta = {
    "model": best_name,
    "metrics": {"roc_auc": float(best["roc_auc"]), "pr_auc": float(best["pr_auc"])},
    "features": X.columns.tolist(),
    "target": "riesgo_corrupcion",
}
with open(META_PATH, "w", encoding="utf-8") as f:
    json.dump(meta, f, ensure_ascii=False, indent=2)

MODEL_PATH, META_PATH

In [None]:
# == 7) Inferencia rápida
import joblib
import pandas as pd


def predict_df(df_new: pd.DataFrame, model_path="artifacts/model.joblib"):
    model = joblib.load(model_path)
    p = model.predict_proba(df_new)[:, 1]
    out = df_new.copy()
    out["prob_riesgo"] = p
    out["pred_riesgo"] = (p >= 0.5).astype(int)
    return out


try:
    predict_df(X_test.head(3)).head()
except Exception as e:
    print("[INFO] Sin muestra:", e)