
# 03 · Test Assembly via Bees Algorithm (24 / 48 items)

This notebook assembles test forms using a **Bees Algorithm** metaheuristic.
The objective is to **maximize measurement quality** under simple content constraints.

**Objective (multi-criteria):**
We use a blended score for a candidate item set:
- `alpha`: Cronbach's alpha (internal consistency)
- `pc1_var`: variance explained by the first principal component of the item matrix

Objective = `w_alpha * alpha + w_pc1 * pc1_var` (default: 0.6 / 0.4)

**Constraints:**
- Target length: 24 or 48 items
- Optional content balance by `scale_id`: minimal and maximal items per scale


In [None]:

import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Try importing alpha from your module, else local fallback
try:
    from src.cleaning_psych import cronbach_alpha as _alpha
except Exception:
    def _alpha(df):
        X = df.to_numpy(dtype=float)
        col_means = np.nanmean(X, axis=0, keepdims=True)
        inds = np.where(np.isnan(X))
        if inds[0].size:
            X[inds] = np.take(col_means, inds[1])
        k = X.shape[1]
        if k < 2:
            return np.nan
        var_items = X.var(axis=0, ddof=1)
        var_total = X.sum(axis=1).var(ddof=1)
        if var_total == 0:
            return np.nan
        return float((k/(k-1)) * (1 - var_items.sum()/var_total))

def pc1_variance_explained(df):
    """Proportion of variance explained by PC1 on column-standardized data."""
    X = df.to_numpy(dtype=float)
    # impute column means
    col_means = np.nanmean(X, axis=0, keepdims=True)
    inds = np.where(np.isnan(X))
    if inds[0].size:
        X[inds] = np.take(col_means, inds[1])
    # standardize columns
    X = (X - X.mean(axis=0, keepdims=True))
    col_std = X.std(axis=0, ddof=1, keepdims=True)
    col_std[col_std == 0] = 1.0
    X = X / col_std
    # SVD
    U, S, VT = np.linalg.svd(X, full_matrices=False)
    var = (S**2)
    return float(var[0] / var.sum()) if var.sum() > 0 else 0.0

def objective(subset_cols, wide_scored, w_alpha=0.6, w_pc1=0.4):
    if len(subset_cols) < 3:
        return -np.inf, {"alpha": np.nan, "pc1_var": np.nan}
    df = wide_scored[subset_cols]
    a = _alpha(df)
    p = pc1_variance_explained(df)
    score = w_alpha * a + w_pc1 * p
    return score, {"alpha": a, "pc1_var": p}

def content_ok(subset_cols, item_stats, min_per_scale=None, max_per_scale=None):
    """Check simple content constraints per scale_id."""
    if min_per_scale is None and max_per_scale is None:
        return True
    scales = item_stats.set_index("item_id").reindex(subset_cols)["scale_id"]
    counts = scales.value_counts(dropna=False).to_dict()
    if min_per_scale:
        for sc, m in min_per_scale.items():
            if counts.get(sc, 0) < m:
                return False
    if max_per_scale:
        for sc, M in max_per_scale.items():
            if counts.get(sc, 0) > M:
                return False
    return True

def random_feasible_set(all_items, item_stats, k, min_per_scale=None, max_per_scale=None, max_tries=5000):
    """Sample a feasible set honoring content constraints."""
    items = list(all_items)
    for _ in range(max_tries):
        cand = random.sample(items, k)
        if content_ok(cand, item_stats, min_per_scale, max_per_scale):
            return cand
    # fallback: return any subset (may violate constraints)
    return random.sample(items, k)

def neighborhood_swap(current, all_items, n_swaps=1):
    """Generate a neighbor by swapping n items with the pool."""
    current_set = set(current)
    pool = list(set(all_items) - current_set)
    new = current.copy()
    for _ in range(n_swaps):
        if not new or not pool:
            break
        out_idx = random.randrange(len(new))
        in_idx = random.randrange(len(pool))
        new[out_idx], pool[in_idx] = pool[in_idx], new[out_idx]
    return new


In [None]:

# Load data
ITEM_STATS_PATH = "output/item_stats.csv"
WIDE_SCORED_PATH = "output/wide_scored.csv"

assert os.path.exists(ITEM_STATS_PATH), f"Missing {ITEM_STATS_PATH}"
assert os.path.exists(WIDE_SCORED_PATH), f"Missing {WIDE_SCORED_PATH}"

item_stats = pd.read_csv(ITEM_STATS_PATH)
wide_scored = pd.read_csv(WIDE_SCORED_PATH, index_col=0)

# Ensure item_id column exists
assert "item_id" in item_stats.columns, "item_stats must include 'item_id'"
assert "scale_id" in item_stats.columns, "item_stats must include 'scale_id'"

all_items = [c for c in wide_scored.columns if c in set(item_stats["item_id"])]
print(f"Candidate items: {len(all_items)}")


In [None]:

def bees_algorithm(
    all_items,
    item_stats,
    wide_scored,
    target_k=24,
    min_per_scale=None,
    max_per_scale=None,
    iters=120,
    n_scouts=20,
    elite_sites=3,
    best_sites=5,
    recruits_elite=25,
    recruits_best=10,
    neighborhood_size=2,
    seed=42,
    w_alpha=0.6,
    w_pc1=0.4,
):
    random.seed(seed)
    np.random.seed(seed)

    # 1) Initialize scouts (feasible sets)
    population = []
    for _ in range(n_scouts):
        subset = random_feasible_set(all_items, item_stats, target_k, min_per_scale, max_per_scale)
        score, metas = objective(subset, wide_scored, w_alpha, w_pc1)
        population.append({"subset": subset, "score": score, **metas})

    history = []
    best_global = max(population, key=lambda x: x["score"]).copy()

    for it in range(iters):
        # 2) Sort by score
        population.sort(key=lambda x: x["score"], reverse=True)

        # 3) Neighborhood search (recruitment) around top sites
        new_population = []
        for idx, sol in enumerate(population[:elite_sites + best_sites]):
            recruits = recruits_elite if idx < elite_sites else recruits_best
            local_best = sol
            for _ in range(recruits):
                neighbor = neighborhood_swap(sol["subset"], all_items, n_swaps=neighborhood_size)
                if not content_ok(neighbor, item_stats, min_per_scale, max_per_scale):
                    continue
                s, metas = objective(neighbor, wide_scored, w_alpha, w_pc1)
                if s > local_best["score"]:
                    local_best = {"subset": neighbor, "score": s, **metas}
            new_population.append(local_best)

        # 4) Random scouts to refill population
        while len(new_population) < n_scouts:
            subset = random_feasible_set(all_items, item_stats, target_k, min_per_scale, max_per_scale)
            s, metas = objective(subset, wide_scored, w_alpha, w_pc1)
            new_population.append({"subset": subset, "score": s, **metas})

        population = new_population

        # 5) Track best
        current_best = max(population, key=lambda x: x["score"]
        )
        if current_best["score"] > best_global["score"]:
            best_global = current_best.copy()

        history.append(best_global["score"])

    return best_global, history



## Content balance

You can set minimum / maximum items per `scale_id`.  
If you don't need content constraints, leave both as `None`.


In [None]:

# Example policy: minimal presence of each scale that appears in the data.
present_scales = item_stats["scale_id"].dropna().unique().tolist()
min_per_scale_24 = {sc: 1 for sc in present_scales}   # minimal presence
max_per_scale_24 = None

min_per_scale_48 = {sc: 2 for sc in present_scales}
max_per_scale_48 = None

print("Scales:", present_scales[:10], "...")


## Run Bees Algorithm — 24 items

In [None]:

best24, hist24 = bees_algorithm(
    all_items=all_items,
    item_stats=item_stats,
    wide_scored=wide_scored,
    target_k=24,
    min_per_scale=min_per_scale_24,
    max_per_scale=max_per_scale_24,
    iters=120,
    n_scouts=20,
    elite_sites=3,
    best_sites=5,
    recruits_elite=25,
    recruits_best=10,
    neighborhood_size=2,
    seed=42,
    w_alpha=0.6,
    w_pc1=0.4
)

print("Best 24-form objective:", best24["score"])
print("Alpha:", best24["alpha"], "PC1 Var:", best24["pc1_var"])

# Save selection
os.makedirs("output", exist_ok=True)
sel24 = pd.DataFrame({"item_id": best24["subset"]}).merge(item_stats, on="item_id", how="left")
sel24.to_csv("output/assembly_24.csv", index=False)
print("Saved: output/assembly_24.csv")

# Plot convergence
plt.figure()
plt.plot(range(1, len(hist24)+1), hist24, marker="o")
plt.title("Bees Algorithm — Convergence (24 items)")
plt.xlabel("Iteration")
plt.ylabel("Objective")
plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.6)
plt.show()


## Run Bees Algorithm — 48 items

In [None]:

best48, hist48 = bees_algorithm(
    all_items=all_items,
    item_stats=item_stats,
    wide_scored=wide_scored,
    target_k=48,
    min_per_scale=min_per_scale_48,
    max_per_scale=max_per_scale_48,
    iters=140,
    n_scouts=24,
    elite_sites=3,
    best_sites=6,
    recruits_elite=30,
    recruits_best=12,
    neighborhood_size=2,
    seed=123,
    w_alpha=0.6,
    w_pc1=0.4
)

print("Best 48-form objective:", best48["score"])
print("Alpha:", best48["alpha"], "PC1 Var:", best48["pc1_var"])

sel48 = pd.DataFrame({"item_id": best48["subset"]}).merge(item_stats, on="item_id", how="left")
sel48.to_csv("output/assembly_48.csv", index=False)
print("Saved: output/assembly_48.csv")

plt.figure()
plt.plot(range(1, len(hist48)+1), hist48, marker="o")
plt.title("Bees Algorithm — Convergence (48 items)")
plt.xlabel("Iteration")
plt.ylabel("Objective")
plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.6)
plt.show()


## Compare 24 vs 48 forms

In [None]:

# Compute and print summary stats
def form_summary(subset):
    cols = [c for c in subset if c in wide_scored.columns]
    df = wide_scored[cols]
    a = _alpha(df)
    p = pc1_variance_explained(df)
    return {"k": len(cols), "alpha": a, "pc1_var": p}

sum24 = form_summary(best24["subset"])
sum48 = form_summary(best48["subset"])

print("Summary 24:", sum24)
print("Summary 48:", sum48)

# Scale coverage
def coverage(subset):
    sc = item_stats.set_index("item_id").reindex(subset)["scale_id"]
    return sc.value_counts(dropna=False).to_frame("count")

cov24 = coverage(best24["subset"])
cov48 = coverage(best48["subset"])

print("
Coverage 24:")
print(cov24.sort_index())

print("
Coverage 48:")
print(cov48.sort_index())