# Proyecto 2 — Detección de Fraude (Pipeline Completo)
Notebook generado con preprocesamiento, SMOTE en CV, selección de características, búsqueda de hiperparámetros y evaluación.

**Requisitos**: `scikit-learn>=1.2`, `imbalanced-learn`, `pandas`, `numpy`.

**Entrada**: `creditcard.csv` (Kaggle: mlg-ulb/creditcardfraud) ubicado en `dataset/creditcard.csv`, ruta relativa al notebook.

## 1) Setup e importaciones

In [2]:

# !pip install -q scikit-learn imbalanced-learn pandas numpy
import os, numpy as np, pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import LinearSVC
from sklearn.feature_selection import SequentialFeatureSelector, RFECV, SelectKBest, mutual_info_classif, SelectFromModel
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, precision_recall_curve
RANDOM_STATE = 42


## 2) Utilidades (métricas, evaluación, preprocesador)

In [3]:

def cargar_datos(path_csv: str = "dataset/creditcard.csv") -> pd.DataFrame:
    """Read the CSV file at ``path_csv`` into a DataFrame.

    Args:
        path_csv: Location of the CSV file. Defaults to
            ``dataset/creditcard.csv``.

    Returns:
        The DataFrame produced by ``pandas.read_csv`` with its default
        options.
    """
    return pd.read_csv(path_csv)

def construir_preprocesador(feature_names):
    """Build the column-wise scaling step used at the head of every pipeline.

    Args:
        feature_names: Iterable of column names; only those starting with
            ``"V"`` are picked up from it.

    Returns:
        An unfitted ``ColumnTransformer`` that applies ``RobustScaler`` to
        ``"Time"`` and ``"Amount"``, applies ``StandardScaler`` with
        ``with_mean=False`` to the ``"V"``-prefixed columns, and drops every
        other column.
    """
    columnas_v = list(filter(lambda nombre: nombre.startswith("V"), feature_names))
    transformadores = [
        # "Time" and "Amount" are always requested, whether or not they
        # appear in feature_names.
        ("robust", RobustScaler(), ["Time", "Amount"]),
        ("std", StandardScaler(with_mean=False), columnas_v),
    ]
    return ColumnTransformer(transformadores, remainder="drop")

def metricas_desde_confusion(cm):
    """Derive summary classification metrics from a confusion matrix.

    Args:
        cm: Array exposing ``ravel()`` that flattens to exactly four counts
            in the order ``tn, fp, fn, tp``.

    Returns:
        A dict with the ratios ``exactitud``, ``sensibilidad``,
        ``especificidad``, ``precision`` and ``f1``, followed by the raw
        counts ``tp``, ``fp``, ``tn`` and ``fn`` as Python ints.
    """
    # A tiny constant in every denominator keeps empty rows/columns from
    # dividing by zero.
    eps = 1e-12
    tn, fp, fn, tp = cm.ravel()

    recall = tp / (tp + fn + eps)
    ppv = tp / (tp + fp + eps)

    resumen = {
        "exactitud": (tp + tn) / (tp + tn + fp + fn + eps),
        "sensibilidad": recall,
        "especificidad": tn / (tn + fp + eps),
        "precision": ppv,
        "f1": 2 * ppv * recall / (ppv + recall + eps),
    }
    resumen.update(tp=int(tp), fp=int(fp), tn=int(tn), fn=int(fn))
    return resumen

def evaluar_modelo(nombre, y_true, y_pred, y_score=None):
    """Print an evaluation report for one model and return its metrics.

    Args:
        nombre: Label printed in the report header.
        y_true: Ground-truth binary labels (0 = negative, 1 = positive).
        y_pred: Predicted binary labels.
        y_score: Optional continuous scores for the positive class; when
            given, ROC-AUC is computed and added to the result.

    Returns:
        The dict from ``metricas_desde_confusion``, plus a ``roc_auc`` entry
        when ``y_score`` was provided and the AUC could be computed.
    """
    # Real newlines: the previous "\\n" printed a literal backslash-n.
    print(f"\n=== {nombre} ===")
    print(classification_report(y_true, y_pred, digits=4))
    # Pin the label order so the matrix is always 2x2, even when one class is
    # absent from both y_true and y_pred; the tn/fp/fn/tp unpacking needs
    # exactly four cells.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    print("Matriz de confusión:\n", cm)
    resumen = metricas_desde_confusion(cm)
    if y_score is not None:
        try:
            auc = roc_auc_score(y_true, y_score)
        except Exception as e:
            # Best effort: report the problem and return the other metrics.
            print("No fue posible calcular ROC-AUC:", e)
        else:
            resumen["roc_auc"] = auc
            print(f"ROC-AUC: {auc:.4f}")
    return resumen


## 3) Pipelines con selección de características + GridSearchCV (SMOTE en CV)

In [4]:

def pipeline_knn(feature_names):
    """Assemble the k-NN grid search: scaling, SMOTE, forward SFS, classifier.

    Args:
        feature_names: Column names forwarded to ``construir_preprocesador``.

    Returns:
        An unfitted ``GridSearchCV`` (F1 scoring, 5-fold shuffled stratified
        CV, refit enabled) over 10 combinations of ``n_neighbors`` and
        ``metric`` for the final classifier.
    """
    # NOTE(review): the selector sits after SMOTE, so it appears to be fitted
    # on resampled data — confirm this is intended.
    selector_secuencial = SequentialFeatureSelector(
        estimator=KNeighborsClassifier(n_neighbors=5),
        n_features_to_select=15,
        direction="forward",
        scoring="f1",
        n_jobs=-1,
    )
    pasos = [
        ("pre", construir_preprocesador(feature_names)),
        ("smote", SMOTE(random_state=RANDOM_STATE, k_neighbors=5)),
        ("sfs", selector_secuencial),
        ("clf", KNeighborsClassifier()),
    ]
    rejilla = {
        "clf__n_neighbors": [3, 5, 7, 9, 11],
        "clf__metric": ["euclidean", "manhattan"],
    }
    particiones = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    return GridSearchCV(
        ImbPipeline(pasos),
        rejilla,
        scoring="f1",
        cv=particiones,
        n_jobs=-1,
        refit=True,
        verbose=1,
    )

def pipeline_arbol(feature_names):
    """Assemble the decision-tree grid search: scaling, SMOTE, RFECV, tree.

    Args:
        feature_names: Column names forwarded to ``construir_preprocesador``.

    Returns:
        An unfitted ``GridSearchCV`` (F1 scoring, 5-fold shuffled stratified
        CV, refit enabled) over 45 combinations of ``max_depth``,
        ``min_samples_split`` and ``min_samples_leaf`` for the final tree.
    """
    # Recursive elimination drops 2 features per step, keeps at least 10,
    # and is scored by F1 over its own 3-fold CV.
    eliminacion = RFECV(
        estimator=DecisionTreeClassifier(random_state=RANDOM_STATE),
        step=2,
        min_features_to_select=10,
        scoring="f1",
        cv=3,
        n_jobs=-1,
    )
    pasos = [
        ("pre", construir_preprocesador(feature_names)),
        ("smote", SMOTE(random_state=RANDOM_STATE, k_neighbors=5)),
        ("rfecv", eliminacion),
        ("clf", DecisionTreeClassifier(random_state=RANDOM_STATE)),
    ]
    rejilla = {
        "clf__max_depth": [5, 10, 15, 20, None],
        "clf__min_samples_split": [2, 5, 10],
        "clf__min_samples_leaf": [1, 2, 4],
    }
    particiones = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    return GridSearchCV(
        ImbPipeline(pasos),
        rejilla,
        scoring="f1",
        cv=particiones,
        n_jobs=-1,
        refit=True,
        verbose=1,
    )

def pipeline_svm(feature_names):
    """Assemble the linear-SVM grid search: scaling, SMOTE, SelectKBest, SVM.

    Args:
        feature_names: Column names forwarded to ``construir_preprocesador``.

    Returns:
        An unfitted ``GridSearchCV`` (F1 scoring, 5-fold shuffled stratified
        CV, refit enabled) over 18 combinations of the classifier's ``C`` and
        ``tol`` and the selector's ``k``.
    """
    pasos = [
        ("pre", construir_preprocesador(feature_names)),
        ("smote", SMOTE(random_state=RANDOM_STATE, k_neighbors=5)),
        # k=20 is only the starting value; the grid below also searches sel__k.
        ("sel", SelectKBest(score_func=mutual_info_classif, k=20)),
        ("clf", LinearSVC(random_state=RANDOM_STATE)),
    ]
    rejilla = {
        "clf__C": [0.1, 1, 10],
        "clf__tol": [1e-3, 1e-4],
        "sel__k": [15, 20, 25],
    }
    particiones = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    return GridSearchCV(
        ImbPipeline(pasos),
        rejilla,
        scoring="f1",
        cv=particiones,
        n_jobs=-1,
        refit=True,
        verbose=1,
    )

def pipeline_rf(feature_names):
    """Assemble the random-forest grid search: scaling, SMOTE, SelectFromModel, RF.

    Args:
        feature_names: Column names forwarded to ``construir_preprocesador``.

    Returns:
        An unfitted ``GridSearchCV`` (F1 scoring, 5-fold shuffled stratified
        CV, refit enabled) over 36 combinations of ``n_estimators``,
        ``max_depth`` and ``min_samples_leaf`` for the final forest.
    """
    # A separate 200-tree forest does the selection, using the "median"
    # threshold.
    selector_por_modelo = SelectFromModel(
        RandomForestClassifier(n_estimators=200, random_state=RANDOM_STATE),
        threshold="median",
    )
    pasos = [
        ("pre", construir_preprocesador(feature_names)),
        ("smote", SMOTE(random_state=RANDOM_STATE, k_neighbors=5)),
        ("sel", selector_por_modelo),
        ("clf", RandomForestClassifier(random_state=RANDOM_STATE)),
    ]
    rejilla = {
        "clf__n_estimators": [200, 400, 600],
        "clf__max_depth": [None, 10, 15, 20],
        "clf__min_samples_leaf": [1, 2, 4],
    }
    particiones = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    return GridSearchCV(
        ImbPipeline(pasos),
        rejilla,
        scoring="f1",
        cv=particiones,
        n_jobs=-1,
        refit=True,
        verbose=1,
    )


## 4) Carga de datos y split 70/30 estratificado

In [5]:

# === Load the data ===
path_csv = "dataset/creditcard.csv"  # Adjust if the file lives somewhere else
if not os.path.exists(path_csv):
    raise FileNotFoundError("No se encontró creditcard.csv. Descárgalo de Kaggle (mlg-ulb/creditcardfraud).")

df = cargar_datos(path_csv)

# Model inputs: Time, Amount and the columns V1..V28; the target is "Class".
feature_cols = ["Time", "Amount", *(f"V{i}" for i in range(1, 29))]
X = df[feature_cols].copy()
y = df["Class"].astype(int).values

# Stratified 70/30 split so train and test keep the same fraud rate.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.30,
    stratify=y,
    random_state=RANDOM_STATE,
)

# Report the positive-class percentage overall and in each partition.
pct_total, pct_train, pct_test = (
    etiquetas.mean() * 100 for etiquetas in (y, y_train, y_test)
)
print(f"Fraude total: {pct_total:.4f}% | Train: {pct_train:.4f}% | Test: {pct_test:.4f}%")


Fraude total: 0.1727% | Train: 0.1725% | Test: 0.1732%


## 5) Entrenamiento + evaluación + resumen comparativo

In [None]:

# === Train every model with GridSearchCV, evaluate on the test split ===
modelos = {
    "k-NN": pipeline_knn(feature_cols),
    "Árbol de Decisión": pipeline_arbol(feature_cols),
    "SVM (LinearSVC)": pipeline_svm(feature_cols),
    "Random Forest": pipeline_rf(feature_cols),
}
resultados = {}

for nombre, grid in modelos.items():
    print("\n" + "="*80)
    print(f"Entrenando {nombre} + GridSearchCV ...")
    grid.fit(X_train, y_train)
    print(f"Mejores hiperparámetros ({nombre}): {grid.best_params_}")
    best = grid.best_estimator_
    y_pred = best.predict(X_test)

    # Get a continuous score for ROC-AUC: try decision_function first, then
    # the positive-class column of predict_proba. Failures are reported, not
    # silently swallowed, so a missing ROC-AUC can be diagnosed.
    y_score = None
    for metodo in ("decision_function", "predict_proba"):
        if not hasattr(best, metodo):
            continue
        try:
            salida = getattr(best, metodo)(X_test)
            y_score = salida[:, 1] if metodo == "predict_proba" else salida
        except Exception as exc:
            print(f"Aviso: {metodo} falló para {nombre}: {exc}")
        else:
            break

    resumen = evaluar_modelo(nombre, y_test, y_pred, y_score)
    resultados[nombre] = {"mejores_hiperparametros": grid.best_params_, **resumen}

import pandas as pd
print("\n" + "#"*80)
print("RESUMEN COMPARATIVO (métricas clave)")
columnas_clave = ["sensibilidad", "especificidad", "precision", "f1", "exactitud"]
# The transposed frame is object-typed because each row also carries the
# hyper-parameter dict; cast the metric columns to float so sorting and
# formatting are numeric.
resumen_df = (
    pd.DataFrame(resultados).T[columnas_clave]
    .astype(float)
    .sort_values(by="sensibilidad", ascending=False)
)
try:
    display(resumen_df)
except NameError:
    # `display` only exists inside IPython/Jupyter; fall back to plain text.
    print(resumen_df)
resumen_df.to_csv("resumen_metricas_modelos.csv", index=True)
print("CSV guardado: resumen_metricas_modelos.csv")



Entrenando k-NN + GridSearchCV ...
Fitting 5 folds for each of 10 candidates, totalling 50 fits


## 6) (Opcional) Ajuste de umbral sobre el mejor modelo

In [None]:

# === (Optional) Threshold tuning on the model with the highest sensitivity ===
# NOTE(review): both the model choice and the threshold are tuned on the test
# split, so the metrics printed here are optimistic — consider a separate
# validation split.
mejor_modelo = resumen_df.index[0]
best_estimator = modelos[mejor_modelo].best_estimator_

if hasattr(best_estimator, "predict_proba") or hasattr(best_estimator, "decision_function"):
    if hasattr(best_estimator, "predict_proba"):
        scores = best_estimator.predict_proba(X_test)[:, 1]
    else:
        scores = best_estimator.decision_function(X_test)
    precisions, recalls, thresholds = precision_recall_curve(y_test, scores)
    objetivo = 0.90
    # recalls has one more entry than thresholds (a trailing 0.0 with no
    # threshold), and recall does not increase as the threshold grows. The
    # qualifying indices therefore form a prefix that starts at 0; taking the
    # first one always chose the lowest threshold and flagged every sample as
    # positive. Take the LAST one instead: the highest threshold that still
    # reaches the target recall.
    idx = np.where(recalls[:-1] >= objetivo)[0]
    if len(idx) > 0:
        thr = thresholds[idx[-1]]
        y_pred_thr = (scores >= thr).astype(int)
        print(f"Ajuste de umbral para ~{objetivo:.2f} de sensibilidad — threshold={thr:.4f}")
        _ = evaluar_modelo(f"{mejor_modelo} (umbral ajustado)", y_test, y_pred_thr, scores)
    else:
        print("No se encontró un umbral que alcance la sensibilidad objetivo en test.")
else:
    print("El mejor modelo no expone score continuo; se omite ajuste de umbral.")
