# Bloque 1 — Feature Selection con GA
Usa **DEAP** + **LogisticRegression** con CV.


In [None]:
# --- Imports base ---
import numpy as np
import random, os, joblib
import matplotlib.pyplot as plt

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.model_selection import cross_val_score, StratifiedKFold, KFold

from deap import base, creator, tools

from utils.data import load_custom_csv          # <— tu loader
from utils.ga_tools import set_seed, cv_score, penalty_k

# --- Fijar semilla para reproducibilidad ---
set_seed(42)

# --- Carga tu dataset propio ---
# target="producto" => CLASIFICACIÓN (predecir Producto_Regional)
# target="indice"   => REGRESIÓN (predecir Índice_Industrialización)
X_train, X_test, y_train, y_test, feature_names, problem_type = load_custom_csv(
    "data/productos_puno.csv",
    target="producto",      # ← cambia a "indice" si quieres regresión
    test_size=0.2,
    random_state=42,
    scale=False             # MUY IMPORTANTE: escalamos dentro de la CV con Pipeline
)

n_features = X_train.shape[1]
n_features


71

## Configuración del GA

In [4]:
# Hiperparámetros GA (valores recomendados)
POP_SIZE = 50
N_GEN    = 50
CX_PB    = 0.8     # prob. de aplicar crossover
MUT_PB   = 0.2     # prob. de aplicar mutación
INDPB    = max(1/n_features, 0.01)  # prob. flip por gen (~1 bit de media)
TOUR_SIZE = 3
ALPHA     = 0.005  # penalización por número de features
ELITISM   = 1      # nº de élites que pasan directos a la siguiente generación

def make_estimator():
    """
    Estimador del fitness con Pipeline(scaler, modelo).
    - Clasificación: LogisticRegression
    - Regresión: Ridge
    """
    if problem_type == "classification":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(max_iter=2000, solver="lbfgs"))
        ])
    else:
        return Pipeline([
            ("scaler", StandardScaler()),
            ("reg", Ridge(alpha=1.0))
        ])

def ensure_one_active(individual):
    """Asegura que al menos 1 feature esté encendida (evita máscara toda-cero)."""
    if sum(individual) == 0:
        idx = random.randrange(len(individual))
        individual[idx] = 1


## DEAP: espacio de búsqueda y fitness

In [None]:
# Definir tipos en DEAP
creator.create("FitnessMax", base.Fitness, weights=(1.0,))   # maximizamos el fitness
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)  # gen binario
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=n_features)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

def eval_individual(individual):
    """Fitness = métrica CV del estimador solo con las columnas activas – penalización por #features."""
    ensure_one_active(individual)
    mask = np.array(individual, dtype=bool)
    k = int(mask.sum())
    X_sel = X_train[:, mask]

    est = make_estimator()

    if problem_type == "classification":
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        # cambia a "balanced_accuracy" o "f1_macro" si tienes desbalance
        score = cross_val_score(est, X_sel, y_train, cv=cv, scoring="accuracy").mean()
        fit = score - penalty_k(k, ALPHA)
    else:
        cv = KFold(n_splits=5, shuffle=True, random_state=42)
        # neg_root_mean_squared_error: más alto es mejor (0 es el tope)
        score = cross_val_score(est, X_sel, y_train, cv=cv, scoring="neg_root_mean_squared_error").mean()
        fit = score - penalty_k(k, ALPHA)  # como es negativo, “más cerca de 0” ⇒ mayor fitness

    return (float(fit),)

# Operadores GA
toolbox.register("evaluate", eval_individual)
toolbox.register("mate", tools.cxOnePoint)                       # cruce 1 punto
toolbox.register("mutate", tools.mutFlipBit, indpb=INDPB)        # mutación flip bit
toolbox.register("select", tools.selTournament, tournsize=TOUR_SIZE)  # torneo


## Evolución

In [None]:
# Población inicial y evaluación
pop = toolbox.population(n=POP_SIZE)

for ind in pop:
    ind.fitness.values = toolbox.evaluate(ind)

log = []  # guardaremos estadísticas por generación

for gen in range(N_GEN):
    # Elitismo: mejores que pasan directos
    elites = tools.selBest(pop, ELITISM) if ELITISM > 0 else []

    # Selección + clonación
    offspring = toolbox.select(pop, len(pop) - ELITISM)
    offspring = list(map(toolbox.clone, offspring))

    # Crossover
    for i in range(0, len(offspring), 2):
        if i + 1 < len(offspring) and random.random() < CX_PB:
            toolbox.mate(offspring[i], offspring[i+1])
            if "fitness" in offspring[i].__dict__: del offspring[i].fitness.values
            if "fitness" in offspring[i+1].__dict__: del offspring[i+1].fitness.values

    # Mutación
    for i in range(len(offspring)):
        if random.random() < MUT_PB:
            toolbox.mutate(offspring[i])
            if "fitness" in offspring[i].__dict__: del offspring[i].fitness.values

    # Re-evaluar hijos inválidos
    invalid = [ind for ind in offspring if not ind.fitness.valid]
    for ind in invalid:
        ind.fitness.values = toolbox.evaluate(ind)

    # Nueva población
    pop = elites + offspring

    # Stats de la generación
    fits = np.array([ind.fitness.values[0] for ind in pop], dtype=float)
    log.append({
        "gen": gen + 1,
        "avg": float(fits.mean()),
        "min": float(fits.min()),
        "max": float(fits.max())
    })

# Mejor individuo final
best = tools.selBest(pop, 1)[0]
best_mask = np.array(best, dtype=bool)
best_k = int(best_mask.sum())
best_fitness = float(best.fitness.values[0])

best_k, best_fitness


## Resultados y comparación

In [None]:
# Baseline: TODAS las features (CV)
est_all = make_estimator()

if problem_type == "classification":
    cv_all = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    baseline_cv = cross_val_score(est_all, X_train, y_train, cv=cv_all, scoring="accuracy").mean()
else:
    cv_all = KFold(n_splits=5, shuffle=True, random_state=42)
    baseline_cv = cross_val_score(est_all, X_train, y_train, cv=cv_all, scoring="neg_root_mean_squared_error").mean()

# Seleccionadas por GA
X_sel = X_train[:, best_mask]
est_sel = make_estimator()

if problem_type == "classification":
    selected_cv = cross_val_score(est_sel, X_sel, y_train, cv=cv_all, scoring="accuracy").mean()
else:
    selected_cv = cross_val_score(est_sel, X_sel, y_train, cv=cv_all, scoring="neg_root_mean_squared_error").mean()

selected_features = list(feature_names[best_mask])

print(f"Features seleccionadas (k={best_k}):")
for f in selected_features:
    print(" -", f)

print("\nMétrica CV")
if problem_type == "classification":
    print(f"Baseline (todas, accuracy): {baseline_cv:.4f}")
    print(f"Seleccionadas (accuracy):   {selected_cv:.4f}")
else:
    print(f"Baseline (todas, -RMSE): {baseline_cv:.4f}  (más cerca de 0 es mejor)")
    print(f"Seleccionadas (-RMSE):   {selected_cv:.4f}  (más cerca de 0 es mejor)")


## Curva de mejor fitness por generación

In [None]:
gens = [r["gen"] for r in log]
best_curve = [r["max"] for r in log]

plt.figure()
plt.plot(gens, best_curve, marker="o")
plt.xlabel("Generación")
plt.ylabel("Mejor fitness")
plt.title("Evolución del mejor fitness")
plt.grid(True)
plt.show()


## Guardado opcional con joblib

In [None]:
os.makedirs("results", exist_ok=True)

out = {
    "problem_type": problem_type,
    "best_mask": best_mask,
    "selected_features": selected_features,
    "best_fitness": best_fitness,
    "baseline_cv": float(baseline_cv),
    "selected_cv": float(selected_cv),
    "log": log,
    "ga_params": {
        "POP_SIZE": POP_SIZE, "N_GEN": N_GEN, "CX_PB": CX_PB,
        "MUT_PB": MUT_PB, "INDPB": INDPB, "TOUR_SIZE": TOUR_SIZE,
        "ALPHA": ALPHA, "ELITISM": ELITISM
    }
}

joblib.dump(out, "results/bloque1_selection.joblib")
"Guardado en results/bloque1_selection.joblib"
