# Colab GA-RF (Genetic Algorithm + Random Forest)

1) GA-based feature selection → train Random Forest on selected features (GA-RF).
2) Evaluate with ROC curve and hyperparameter-tuned metrics (AUC, ACC, SEN, SPEC).


In [1]:
import os, time, pickle, json
import sys
from pathlib import Path
from collections import deque
from itertools import product

import random
import pandas as pd
import numpy as np, random

random.seed(42)
np.random.seed(42)
try:
    import cupy as cp
    cp.random.seed(42)
except Exception:
    pass

import xgboost as xgb

from sklearn.model_selection import cross_val_score
from sklearn.dummy import DummyClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import (
    StratifiedKFold,
    RepeatedStratifiedKFold,
    RandomizedSearchCV,
)
from sklearn.metrics import (
    accuracy_score,
    roc_auc_score,
    roc_curve,
    auc,
    classification_report,
    confusion_matrix,
)

from scipy.stats import randint, uniform, loguniform

import matplotlib.pyplot as plt

import cuml
from cuml.ensemble import RandomForestClassifier as CURF




In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 1. Load data
We load `X_train_xgbsel.csv`, `X_test_xgbsel.csv`, `y_train.csv`, and `y_test.csv`.  
Labels are squeezed to 1D so that scikit-learn can use them directly.  
If your labels are strings like `"Methylated"`, you can map them to integers right after loading.


In [3]:
def save_progress(run, gen, pop=None, history_score=None, history_chromo=None):
    os.makedirs(os.path.dirname(PROG_PATH), exist_ok=True)
    with open(PROG_PATH, "w") as f:
        json.dump({"run": int(run), "gen": int(gen), "ts": int(time.time())}, f)
    if pop is not None:
        np.save(GA_POP_PATH, np.array(pop, dtype=bool), allow_pickle=False)
    if history_score is not None:
        np.save(GA_SCORE_PATH, np.array(history_score, dtype=float), allow_pickle=False)
    if history_chromo is not None:
        # save chromosome history (fallback path if GA_CHROMO_PATH not defined)
        chromo_path = GA_CHROMO_PATH if "GA_CHROMO_PATH" in globals() else os.path.join(os.path.dirname(GA_SCORE_PATH), "ga_chromo.npy")
        np.save(chromo_path, np.array(history_chromo, dtype=bool), allow_pickle=False)

def load_progress():
    try:
        with open(PROG_PATH) as f:
            d = json.load(f)
        return int(d.get("run", 0)), int(d.get("gen", 0))
    except Exception:
        return 0, 0

SAVE_EVERY = 5              # save every N generations
TIME_BUDGET_HOURS = 9.0     # safe stop before Colab timeout
t0 = time.time()            # session start time

# resume cursor (requires your save_progress/load_progress utils)
start_run, start_gen = load_progress()


In [4]:
# set seeds for reproducibility
random.seed(42)
np.random.seed(42)

DATA_DIR = "/content/drive/MyDrive/UPENN-GBM/MRI/Net"

X_train = pd.read_csv(f"{DATA_DIR}/X_train.csv")
X_test  = pd.read_csv(f"{DATA_DIR}/X_test.csv")
y_train = pd.read_csv(f"{DATA_DIR}/y_train.csv").squeeze("columns")
y_test  = pd.read_csv(f"{DATA_DIR}/y_test.csv").squeeze("columns")


# quick sanity check on shapes
print("X_train:", X_train.shape)   # (n_train, d)
print("X_test: ", X_test.shape)    # (n_test, d)
print("y_train:", y_train.shape)   # (n_train,)
print("y_test: ", y_test.shape)    # (n_test,)

# --- GA will select columns from TRAIN ---
New_FS = X_train.copy()               # GA works on training features
y_trn  = y_train.reset_index(drop=True)

print("GA base features (train):", New_FS.shape)
print("GA base labels  (train):", y_trn.shape)

BASE_DIR = DATA_DIR
os.makedirs(BASE_DIR, exist_ok=True)
BEST_PATH     = f"{BASE_DIR}/ga_best.npz"
PROG_PATH     = f"{BASE_DIR}/ga_progress.json"
GA_POP_PATH   = f"{BASE_DIR}/ga_pop.npy"
GA_SCORE_PATH = f"{BASE_DIR}/ga_scores.npy"
GA_CHROMO_PATH = os.path.join(BASE_DIR, "ga_chromo.npy")

SEED = 42
RNG_PATH = "/content/drive/MyDrive/gbm-mgmt-radiomics/SVM/data/rng_state.npz"

def save_rng(path=RNG_PATH):
    py = np.array(random.getstate(), dtype=object)
    np_state = np.array(np.random.get_state(), dtype=object)
    np.savez(path, py=py, np=np_state, allow_pickle=True)

def load_rng(path=RNG_PATH):
    if not os.path.exists(path):
        return
    f = np.load(path, allow_pickle=True)
    random.setstate(tuple(f["py"]))
    np.random.set_state(tuple(f["np"]))

size  = 50
n_feat = New_FS.shape[1]   # number of features GA will see

# 1) Convert pandas DataFrames to NumPy arrays in advance
X_np = New_FS.to_numpy(dtype=np.float32)
y_np = y_trn.to_numpy()

# 2) Precompute the folds from StratifiedKFold (we’ll reuse these in fitness)
from sklearn.model_selection import StratifiedKFold
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
folds = [(train_idx, test_idx) for train_idx, test_idx in kfold.split(X_np, y_np)]


X_train: (47, 30)
X_test:  (12, 30)
y_train: (47,)
y_test:  (12,)
GA base features (train): (47, 30)
GA base labels  (train): (47,)


In [5]:
# if 'BASE_DIR' in globals() and BASE_DIR:
#     os.makedirs(BASE_DIR, exist_ok=True)

# # Collect checkpoint paths that are defined in the current runtime
# CANDIDATE_PATH_VARS = [
#     'PROG_PATH', 'GA_POP_PATH', 'GA_SCORE_PATH',
#     'GA_CHROMO_PATH', 'RNG_PATH', 'BEST_PATH'
# ]
# paths = []
# for var in CANDIDATE_PATH_VARS:
#     p = globals().get(var, None)
#     if isinstance(p, str) and p:
#         paths.append(p)

# # Delete checkpoint files (safe to run multiple times)
# for p in paths:
#     try:
#         os.remove(p)
#         print("removed:", p)
#     except FileNotFoundError:
#         pass  # it's fine if the file wasn't there

# # Clear in-memory variables so nothing leaks into the new run
# pop = None
# history_score, history_chromo = [], []

# # (Optional) Re-seed RNG for reproducibility
# np.random.seed(42)
# random.seed(42)

# # Force the resume cursor and timer to reset
# start_run, start_gen = 0, 0
# t0 = time.time()

# print("exists:", [os.path.exists(p) for p in [PROG_PATH, GA_POP_PATH, GA_SCORE_PATH, globals().get('GA_CHROMO_PATH',''), globals().get('RNG_PATH',''), BEST_PATH]])
# print("cursor:", start_run, start_gen)
# print("mem:", pop, len(history_score), len(history_chromo))

In [6]:
def save_progress(run, gen, pop=None, history_score=None, history_chromo=None):
    os.makedirs(os.path.dirname(PROG_PATH), exist_ok=True)
    with open(PROG_PATH, "w") as f:
        json.dump({"run": int(run), "gen": int(gen), "ts": int(time.time())}, f)
    if pop is not None:
        np.save(GA_POP_PATH, np.array(pop, dtype=bool), allow_pickle=False)
    if history_score is not None:
        np.save(GA_SCORE_PATH, np.array(history_score, dtype=float), allow_pickle=False)
    if history_chromo is not None:
        # save chromosome history (fallback path if GA_CHROMO_PATH not defined)
        chromo_path = GA_CHROMO_PATH if "GA_CHROMO_PATH" in globals() else os.path.join(os.path.dirname(GA_SCORE_PATH), "ga_chromo.npy")
        np.save(chromo_path, np.array(history_chromo, dtype=bool), allow_pickle=False)

def load_progress():
    try:
        with open(PROG_PATH) as f:
            d = json.load(f)
        return int(d.get("run", 0)), int(d.get("gen", 0))
    except Exception:
        return 0, 0

SAVE_EVERY = 5              # save every N generations
TIME_BUDGET_HOURS = 9.0     # safe stop before Colab timeout
t0 = time.time()            # session start time

# resume cursor (requires your save_progress/load_progress utils)
start_run, start_gen = load_progress()


In [7]:
def save_progress(run, gen, pop=None, history_score=None, history_chromo=None):
    os.makedirs(os.path.dirname(PROG_PATH), exist_ok=True)
    with open(PROG_PATH, "w") as f:
        json.dump({"run": int(run), "gen": int(gen), "ts": int(time.time())}, f)
    if pop is not None:
        np.save(GA_POP_PATH, np.array(pop, dtype=bool), allow_pickle=False)
    if history_score is not None:
        np.save(GA_SCORE_PATH, np.array(history_score, dtype=float), allow_pickle=False)
    if history_chromo is not None:
        # save chromosome history (fallback path if GA_CHROMO_PATH not defined)
        chromo_path = GA_CHROMO_PATH if "GA_CHROMO_PATH" in globals() else os.path.join(os.path.dirname(GA_SCORE_PATH), "ga_chromo.npy")
        np.save(chromo_path, np.array(history_chromo, dtype=bool), allow_pickle=False)

def load_progress():
    try:
        with open(PROG_PATH) as f:
            d = json.load(f)
        return int(d.get("run", 0)), int(d.get("gen", 0))
    except Exception:
        return 0, 0

SAVE_EVERY = 5              # save every N generations
TIME_BUDGET_HOURS = 9.0     # safe stop before Colab timeout
t0 = time.time()            # session start time

# resume cursor (requires your save_progress/load_progress utils)
start_run, start_gen = load_progress()


In [8]:
# from google.colab import drive
# drive.mount('/content/drive')

## 2. Dummy baselines
We train two dummy models to see the minimum performance on this split.  
`most_frequent` = always predict the majority class.  
`stratified` = predict classes according to training distribution (a bit harder baseline).


In [9]:
# dummy that always predicts the most frequent class in y_train
dummy_mf = DummyClassifier(strategy="most_frequent")
dummy_mf.fit(X_train, y_train)
y_pred_mf = dummy_mf.predict(X_test)
acc_mf = accuracy_score(y_test, y_pred_mf)
print(f"Dummy (most_frequent) accuracy on TEST: {acc_mf:.3f}")

# dummy that samples labels according to class proportion in y_train
dummy_st = DummyClassifier(strategy="stratified", random_state=42)
dummy_st.fit(X_train, y_train)
y_pred_st = dummy_st.predict(X_test)
acc_st = accuracy_score(y_test, y_pred_st)
print(f"Dummy (stratified) accuracy on TEST:   {acc_st:.3f}")

Dummy (most_frequent) accuracy on TEST: 0.500
Dummy (stratified) accuracy on TEST:   0.750


## 3. Plain Random Forest (no GA)

We now train a real model on **all 38 radiomics features** using 54 training cases.  
Because the training set is small and high-dimensional, we first run a 5-fold **stratified** cross-validation on the training set to get a more stable estimate.  



In [10]:
# # ---- User settings ----
# SEED = 42
# N_SPLITS = 5

# # ---- Preconditions (asserts to fail fast) ----
# assert "X_np" in globals() and "y_np" in globals(), "Please define X_np, y_np first."
# assert "CURF" in globals(), "Please import/alias your RF class as CURF (cuML or sklearn)."
# X_np = np.asarray(X_np)
# y_np = np.asarray(y_np).ravel()

# # ---- Fixed folds for reproducibility ----
# cv = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)
# folds = list(cv.split(X_np, y_np))

# # ---- Helpers ----
# def _safe_auc_or_acc(y_true, y_proba=None, y_pred=None):
#     """Prefer AUC if proba available; otherwise use accuracy."""
#     try:
#         if y_proba is not None:
#             return roc_auc_score(y_true, y_proba)
#     except Exception:
#         pass
#     if y_pred is None and y_proba is not None:
#         y_pred = (y_proba >= 0.5).astype(int)
#     return accuracy_score(y_true, y_pred)

# def _cap_n_bins_for_folds(n_bins, folds):
#     """For cuML: ensure n_bins ≤ min train size across folds (avoids auto-adjust)."""
#     min_train = min(len(tr) for tr, _ in folds)
#     return int(min(max(2, min_train), n_bins))

# def _mean_cv_score(params, folds):
#     """Train/eval with given params on fixed folds, return mean±std score."""
#     scores = []

#     # keep n_bins safe if provided (no harm for sklearn; ignored if unsupported)
#     p = dict(params)
#     if "n_bins" in p:
#         p["n_bins"] = _cap_n_bins_for_folds(p["n_bins"], folds)

#     for tr_idx, te_idx in folds:
#         X_tr, X_te = X_np[tr_idx], X_np[te_idx]
#         y_tr, y_te = y_np[tr_idx], y_np[te_idx]

#         rf = CURF(
#             n_estimators=p.get("n_estimators", 100),
#             max_depth=p.get("max_depth", 12),
#             max_features=p.get("max_features", "sqrt"),
#             min_samples_leaf=p.get("min_samples_leaf", 2),
#             bootstrap=p.get("bootstrap", True),
#             random_state=SEED,
#             **({ "n_bins": p["n_bins"] } if "n_bins" in p else {})
#         )
#         rf.fit(X_tr, y_tr)

#         y_proba = None
#         y_pred  = None
#         try:
#             y_proba = rf.predict_proba(X_te)[:, 1]
#         except Exception:
#             try:
#                 y_pred = rf.predict(X_te)
#             except Exception:
#                 scores.append(0.0)
#                 continue

#         scores.append(_safe_auc_or_acc(y_te, y_proba=y_proba, y_pred=y_pred))

#     scores = np.asarray(scores, dtype=float)
#     return float(scores.mean()), float(scores.std())

# # ---- Search space (lightweight but effective for GA ranking) ----
# param_space = {
#     "n_estimators":     [100,150],              # keep light for GA evaluator
#     "max_depth":        [8, 10, 12],       # shallow-ish to stabilize ranking
#     "max_features":     ["sqrt", 0.5, 0.7], # sqrt and fractions
#     "min_samples_leaf": [1, 2, 4],
#     "bootstrap":        [True],
#     "n_bins":           [16, 32],               # cuML only; sklearn will ignore via **kwargs guard
# }

# # ---- Grid search ----
# keys   = list(param_space.keys())
# combos = list(product(*[param_space[k] for k in keys]))

# best = {"mean": -1.0, "std": 1e9, "params": None}
# for vals in combos:
#     params = dict(zip(keys, vals))
#     m, s = _mean_cv_score(params, folds)
#     print(f"[GA-evaluator] {params} -> mean={m:.4f}, std={s:.4f}")

#     better = (m > best["mean"]) or (np.isclose(m, best["mean"]) and s < best["std"])
#     if better:
#         best = {"mean": m, "std": s, "params": params}

# print("\n[Chosen GA evaluator params]")
# print(best["params"], f"mean={best['mean']:.4f}, std={best['std']:.4f}")

## 4. GA-RF (paper-faithful version)

Below is a version that matches the 2022 code style much more closely.

Key points:

- we recreate their globals: `New_FS`, `y_trn`, `kfold`, `model`
- we keep their function names: `initilization_of_population`, `fitness_score`, `selection`, `crossover`, `mutation`, `generations`
- we fix only the things that would break in pandas/NumPy now:
  - `np.bool` → `bool`
  - `.iloc[train].iloc[:,chromosome]` → `.iloc[train, chromosome]`
- at the end we **actually call** `generations(...)` so you see `gen 0 ...`, `gen 1 ...` printed like their script
- this version uses **your** data: `X_train` → `New_FS`, `y_train` → `y_trn`

You can change `n_gen` or `size` later. After this, in step 5, you can take `best_chromo[-1]` and train a clean RF/XGB on that subset.


In [11]:
# -------------------------------
# 0) Reproducibility & data setup
# -------------------------------
RANDOM_STATE = 42
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)

# GA will search over training features (train-only!)
New_FS = X_train.copy()
y_trn  = y_train.reset_index(drop=True)

# Precompute numpy arrays and fixed CV folds (train-only)
X_np = New_FS.to_numpy()
y_np = y_trn.to_numpy()
cv   = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
folds = list(cv.split(X_np, y_np))

# Model factory for GA-internal CV (avoid nested parallelism)
def make_rf(seed=42):
    return CURF(
        n_estimators=150,
        max_depth=12,
        min_samples_leaf=2,
        bootstrap=True,
        n_bins=16,
        random_state=seed,
    )
# -------------------------------
# 1) GA/global hyperparameters
# -------------------------------
size   = 50
n_feat = New_FS.shape[1]


In [12]:
# ================================
# 2. population init + fitness
# ================================
def initilization_of_population(size, n_feat, off_ratio=0.3, rng=None):
    """
    create an initial list of chromosomes
    each chromosome is a boolean mask over features
    about `off_ratio` will be OFF (False), the rest ON (True), then shuffled
    """
    if rng is None:
        rng = np.random.default_rng(42)
    population = []
    off_cnt = int(off_ratio * n_feat)
    for _ in range(size):
        chromosome = np.ones(n_feat, dtype=bool)
        chromosome[:off_cnt] = False
        rng.shuffle(chromosome)
        population.append(chromosome)
    return population



In [13]:
# ================================
# 3. GA operators (selection / crossover / mutation)
# ================================
def selection_indices(order_idx, weights, k):
    order_idx = np.asarray(order_idx)
    w = np.asarray(weights, dtype=float)
    if np.any(~np.isfinite(w)) or np.all(w <= 0):
        p = np.ones_like(order_idx, dtype=float) / len(order_idx)
    else:
        w = np.clip(w, 0.0, None)
        s = w.sum()
        p = w / s if s > 0 else np.ones_like(order_idx, dtype=float) / len(order_idx)
    return np.random.choice(order_idx, size=k, replace=True, p=p)


def crossover(p1, p2, crossover_rate):
    """
    Single-point crossover.
    Two parents → two children.
    With probability <crossover_rate> we swap tails after a random point.
    Otherwise we just copy the parents.
    """
    c1, c2 = p1.copy(), p2.copy()

    # need at least 3 genes to do a meaningful 1-point crossover
    if random.random() < crossover_rate and len(p1) > 2:
        pt = random.randint(1, len(p1) - 2)
        c1 = np.concatenate((p1[:pt], p2[pt:]))
        c2 = np.concatenate((p2[:pt], p1[pt:]))

    return [c1, c2]


def mutation(chromosome, mutation_rate):
    """
    Flip each bit with probability = mutation_rate.
    Return a NEW chromosome (don’t mutate callers' copy by surprise).
    """
    mutated = chromosome.copy()
    for i in range(len(mutated)):
        if random.random() < mutation_rate:
            mutated[i] = not mutated[i]
    return mutated

TARGET_K = 4   # 원하는 feature 개수 중심
LAMBDA_K = 0.015 # |k - 25| 한 개당 깎을 양 (0.005~0.02 사이에서 조정해보기)

def fitness_score(population, kfold=None, model=None, X=None, y=None):
    """
    evaluate each chromosome by k-fold CV accuracy (now using precomputed numpy folds)
    """
    global X_np, y_np, folds

    scores, newtp, newfp, newtn, newfn = [], [], [], [], []

    for chromosome in population:
        col_idx = np.where(chromosome)[0]

        # empty subset guard
        if col_idx.size == 0:
            scores.append(0.0)
            newtp.append(0); newfp.append(0); newtn.append(0); newfn.append(0)
            continue

        tp_list, fp_list, tn_list, fn_list, acc_list = [], [], [], [], []

        # 2) use precomputed folds
        for train_idx, test_idx in folds:
            X_tr = X_np[train_idx][:, col_idx]
            X_te = X_np[test_idx][:, col_idx]
            y_tr = y_np[train_idx]
            y_te = y_np[test_idx]

            if callable(model):
                clf = model()
            else:
                clf = model

            clf.fit(X_tr, y_tr)
            preds = clf.predict(X_te)

            tn_i, fp_i, fn_i, tp_i = confusion_matrix(
                y_te, preds, labels=[0, 1]
            ).ravel()

            tp_list.append(tp_i); fp_list.append(fp_i)
            tn_list.append(tn_i); fn_list.append(fn_i)

            acc_list.append(accuracy_score(y_te, preds))

        base_score = float(np.mean(acc_list))   # 원래 쓰던 CV accuracy
        k = col_idx.size                        # 선택된 feature 개수

        # 20개에서 멀어질수록 penalty 증가
        penalty = LAMBDA_K * abs(k - TARGET_K)
        score = base_score - penalty

        scores.append(score)
        # ---- 여기까지 새로 추가/수정 ----

        newtp.append(int(np.sum(tp_list)))
        newfp.append(int(np.sum(fp_list)))
        newtn.append(int(np.sum(tn_list)))
        newfn.append(int(np.sum(fn_list)))

    # 3) sort by score desc (아래는 그대로)
    scores = np.array(scores, dtype=float)
    population = np.array(population, dtype=object)

    total_score = scores.sum()
    if total_score == 0:
        weights = np.ones_like(scores) / len(scores)
    else:
        weights = scores / total_score

    newtp = np.array(newtp); newfp = np.array(newfp)
    newtn = np.array(newtn); newfn = np.array(newfn)

    inds = np.argsort(scores)[::-1]

    return (
        list(scores[inds]),
        list(population[inds]),
        list(weights[inds]),
        list(newtp[inds]),
        list(newfp[inds]),
        list(newtn[inds]),
        list(newfn[inds]),
    )

In [14]:
# ================================
# 4. full GA loop (original style)
# ================================
def generations(size, n_feat, crossover_rate, mutation_rate, n_gen):
    """
    run GA for n_gen generations in one shot
    (this is the original style)
    """
    best_chromo = []
    best_score = []

    # start with random population
    population_nextgen = initilization_of_population(size, n_feat)

    for gen in range(n_gen):
        # evaluate population
        scores, pop_after_fit, weights, tp, fp, tn, fn = fitness_score(population_nextgen)

        # best of this generation
        top_score = scores[0]
        top_chrom = pop_after_fit[0]
        print("gen", gen, "best_acc=", round(top_score, 4), "on_features=", int(top_chrom.sum()))

        # elitism (keep 2)
        elites = pop_after_fit[:2]
        k = size - 2

        # select parents
        parents = selection_indices(pop_after_fit, weights, k)

        # make children
        children = []
        for i in range(0, len(parents), 2):
            p1 = parents[i]
            p2 = parents[(i + 1) % len(parents)]
            for child in crossover(p1, p2, crossover_rate):
                child = mutation(child, mutation_rate)  # ← apply mutation return value
                children.append(child)

        # build next population
        population_nextgen = []
        for c in elites:
            population_nextgen.append(c)
        for c in children:
            if len(population_nextgen) < size:
                population_nextgen.append(c)

        # keep history
        best_chromo.append(top_chrom)
        best_score.append(top_score)

    return best_chromo, best_score


In [15]:
# ================================
# 5. single GA step (one generation)
# ================================
def ga_one_step(
    population,
    size,
    crossover_rate,
    mutation_rate,
    model,
):
    """
    Run GA for exactly 1 generation and return:
      - next_population
      - best_chromo (from current generation)
      - best_score  (from current generation)
    Uses global X_np, y_np, folds via fitness_score().
    """
    # 1) evaluate current population
    scores, pop_after_fit, weights, tp, fp, tn, fn = fitness_score(
        population,
        model=model,
        # kfold, X, y not needed anymore
    )

    # 2) best in this generation
    best_score  = scores[0]
    best_chromo = pop_after_fit[0]

    # 3) keep top-2 (elitism)
    elites = pop_after_fit[:2]
    k = size - 2

    # 4) select parents for the remaining slots
    idxs = selection_indices(np.arange(len(pop_after_fit)), weights, k)
    parents = [pop_after_fit[i] for i in idxs]

    # 5) crossover + mutation → build children
    children = []
    for i in range(0, len(parents), 2):
        p1 = parents[i]
        p2 = parents[(i + 1) % len(parents)]  # wrap around
        for child in crossover(p1, p2, crossover_rate):
            child = mutation(child, mutation_rate)  # ← apply mutation return value
            children.append(child)

    # 6) build next population (elites first)
    next_pop = []
    for e in elites:
        next_pop.append(e)
    for c in children:
        if len(next_pop) < size:
            next_pop.append(c)

    return next_pop, best_chromo, best_score


# GA BEST NPZ

In [16]:
BEST_PATH = f"{DATA_DIR}/ga_best.npz"
TOP_K = 5

def update_top_solutions_by_run(best_path, run_id, new_score, new_mask, new_cols, top_k=5):
    """
    Keep at most `top_k` best runs (one entry per run_id).
    If file exists but has old / wrong keys, we re-initialize.
    """
    prev_run_ids, prev_scores, prev_masks, prev_cols = [], [], [], []

    if os.path.exists(best_path):
        try:
            prev = np.load(best_path, allow_pickle=True)
            # try toread modern keys
            prev_run_ids = prev["run_ids"].tolist()
            prev_scores  = prev["top_scores"].tolist()
            prev_masks   = prev["top_masks"].tolist()
            prev_cols    = prev["top_cols"].tolist()
        except KeyError:
            # old / incompatible file → start fresh
            prev_run_ids, prev_scores, prev_masks, prev_cols = [], [], [], []

    # update or append
    if run_id in prev_run_ids:
        idx = prev_run_ids.index(run_id)
        if new_score > prev_scores[idx]:
            prev_scores[idx] = float(new_score)
            prev_masks[idx]  = new_mask
            prev_cols[idx]   = new_cols
    else:
        prev_run_ids.append(run_id)
        prev_scores.append(float(new_score))
        prev_masks.append(new_mask)
        prev_cols.append(new_cols)

    # keep top_k
    order = np.argsort(prev_scores)[::-1][:top_k]

    top_run_ids = [prev_run_ids[i] for i in order]
    top_scores  = [prev_scores[i]  for i in order]
    top_masks   = [prev_masks[i]   for i in order]
    top_cols    = [prev_cols[i]    for i in order]

    # save back
    np.savez(
        best_path,
        run_ids=np.array(top_run_ids, dtype=int),
        top_scores=np.array(top_scores, dtype=float),
        top_masks=np.array(top_masks, dtype=object),
        top_cols=np.array(top_cols, dtype=object),
    )
    return top_run_ids, top_scores

## Repeated GA with stratified 5-fold CV (20×)

In [17]:
# ---- Safe defaults (define if missing) ----
N_REPEATS       = globals().get("N_REPEATS", 5)
GENS_PER_RUN    = globals().get("GENS_PER_RUN", 50)
SAVE_EVERY      = globals().get("SAVE_EVERY", 5)          # checkpoint period (generations)
TIME_BUDGET_HRS = globals().get("TIME_BUDGET_HOURS", 11)  # stop before Colab timeout
K_HISTORY       = globals().get("K", 32)                   # keep only last K items in history

assert callable(make_rf), "make_rf (model factory) is required."
assert callable(save_progress) and callable(load_progress), "save_progress/load_progress are required."

t0 = time.time()

# 1) Resume cursor (RNG + where to start)
load_rng()
start_run, start_gen = load_progress()

for run in range(start_run, N_REPEATS):
    # --- Fresh run state ---
    pop = initilization_of_population(size=size, n_feat=n_feat)
    # Keep only last K items to cap memory usage (prevents history growth)
    history_score  = deque(maxlen=K_HISTORY)
    history_chromo = deque(maxlen=K_HISTORY)

    # Track the best solution within THIS run (independent of history length)
    run_best_score = -np.inf
    run_best_mask  = None

    # --- Resume point for this run ---
    g0 = start_gen if run == start_run else 0
    if run == start_run and g0 > 0:
        try:
            # Load population; masks may be Python objects → allow_pickle=True
            pop = np.load(GA_POP_PATH, allow_pickle=True)
            try:
                _score_list  = np.load(GA_SCORE_PATH,  allow_pickle=True).tolist()
                _chromo_list = np.load(GA_CHROMO_PATH, allow_pickle=True).tolist()
                # Refill deques with last K entries only
                for s in _score_list[-K_HISTORY:]:
                    history_score.append(float(s))
                for c in _chromo_list[-K_HISTORY:]:
                    history_chromo.append(np.array(c, dtype=bool))
                # Rebuild run-best once from loaded history
                if len(history_score) and len(history_chromo):
                    j = int(np.argmax(history_score))
                    run_best_score = float(history_score[j])
                    run_best_mask  = np.array(history_chromo[j], dtype=bool)
            except Exception:
                pass

            print(f"[resume] run={run}, start_gen={g0} (loaded; K={K_HISTORY})")
        except Exception as e:
            print("[resume] failed to load pop/history:", e)
    else:
        # Initial snapshot for a fresh run (empty histories)
        save_progress(run, 0, pop=pop,
                      history_score=list(history_score),
                      history_chromo=list(history_chromo))

    # --- Main GA loop for this run ---
    for gen in range(g0, GENS_PER_RUN):
        # Single GA step (must return updated population, gen-best mask, gen-best score)
        pop, best_mask_gen, best_score_gen = ga_one_step(
            pop,
            size=size,
            crossover_rate=0.8,
            mutation_rate=0.05,
            model=make_rf,   # factory: creates a fresh estimator per use
        )

        # Append current generation best into capped histories
        history_chromo.append(best_mask_gen)
        history_score.append(float(best_score_gen))

        # Update run-best immediately (robust even if history is truncated)
        if best_score_gen > run_best_score:
            run_best_score = float(best_score_gen)
            run_best_mask  = np.array(best_mask_gen, dtype=bool)

        # Periodic checkpoint with minimal necessary payload
        if gen % SAVE_EVERY == 0:
            save_progress(run, gen,
                          pop=pop,
                          history_score=list(history_score),
                          history_chromo=list(history_chromo))
            save_rng()

        # Safe stop before notebook time limit
        if (time.time() - t0) > TIME_BUDGET_HRS * 3600:
            save_progress(run, gen,
                          pop=pop,
                          history_score=list(history_score),
                          history_chromo=list(history_chromo))
            save_rng()
            print("⏱️ Time budget reached — checkpoint saved. Stop and resume later.")
            sys.exit(0)

    # --- Finalize this run based on run-best (not dependent on full history) ---
    if run_best_mask is None:
        # Extremely rare: if never updated, fall back to the best in current (capped) history
        j = int(np.argmax(history_score))
        run_best_score = float(history_score[j])
        run_best_mask  = np.array(history_chromo[j], dtype=bool)

    best_cols = np.where(run_best_mask)[0]

    # Update top solutions (user-implemented function)
    run_ids, top_scores = update_top_solutions_by_run(
        BEST_PATH,
        run_id=run,
        new_score=run_best_score,
        new_mask=run_best_mask,
        new_cols=best_cols,
        top_k=20,
    )

    # Final snapshot for this run (still minimal)
    save_progress(run, GENS_PER_RUN,
                  pop=pop,
                  history_score=list(history_score),
                  history_chromo=list(history_chromo))
    save_rng()

    print("top runs:", run_ids)
    print("top scores:", top_scores)

[resume] run=4, start_gen=50 (loaded; K=32)
top runs: [3, 0, 1, 4, 2]
top scores: [0.7738888888888888, 0.7638888888888888, 0.7588888888888888, 0.7438888888888888, 0.7416666666666666]


# Show the result

In [18]:
# 0) load GA results produced earlier (contains top-k GA masks and their CV scores)
GA_BEST_PATH  = f"{DATA_DIR}/ga_best.npz"
GA_POP_PATH   = f"{DATA_DIR}/ga_pop.npy"
GA_SCORE_PATH = f"{DATA_DIR}/ga_scores.npy"

best = np.load(GA_BEST_PATH, allow_pickle=True)
run_ids    = best["run_ids"].tolist()
top_scores = best["top_scores"].tolist()
top_cols   = best["top_cols"].tolist()

print("=== TOP RUNS ===")
for rid, s, cols in zip(run_ids, top_scores, top_cols):
    print(f"run {rid}: score={s:.4f}, cols(first 20)={cols[:20]}, total={len(cols)}")

=== TOP RUNS ===
run 3: score=0.7739, cols(first 20)=[ 2 10 12], total=3
run 0: score=0.7639, cols(first 20)=[0, 4, 8, 10, 12, 22, 28], total=7
run 1: score=0.7589, cols(first 20)=[ 2  8  9 12 13 29], total=6
run 4: score=0.7439, cols(first 20)=[ 2  5  8  9 12 21 28], total=7
run 2: score=0.7417, cols(first 20)=[ 5  8 10 14 19 22 29], total=7
