# Boss-assignment metaheuristic with plastic neural agents (combinatorial assignment)
This notebook implements a combinatorial "boss" metaheuristic that assigns a subset of agents (with shared ANN-like optimizer architecture and varying plasticity) to a subset of tasks each iteration.

Two experimental blocks are provided:
- Numerical optimization block: Each iteration, 3 out of 4 functions are sampled from {Himmelblau(2D), Rosenbrock(10D), Rastrigin(10D), Griewank(10D)}. The boss assigns agents to maximize solution quality while penalizing the number of agents via lambda=0.2.
- ATSP block: Instances {br17, ft53, kro124p}. Each iteration, 2 out of 3 instances are sampled; boss assigns agents similarly.

We compare the proposed boss against a GA-based assignment baseline, perform 31 independent runs per configuration, report per-problem statistics, and run Wilcoxon rank-sum tests. Figures and tables are saved to results/ and figs/.

In [2]:
from __future__ import annotations
import numpy as np, pandas as pd, random, math, time, itertools, io, os, sys
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Callable, Optional
from scipy.stats import ranksums
# Reproducibility
SEED = 42
np.random.seed(SEED); random.seed(SEED)
ROOT = os.getcwd()
RESULTS_DIR = os.path.join(ROOT, 'results')
FIGS_DIR = os.path.join(ROOT, 'figs')
os.makedirs(RESULTS_DIR, exist_ok=True); os.makedirs(FIGS_DIR, exist_ok=True)
print('Results ->', RESULTS_DIR)
print('Figs    ->', FIGS_DIR)


Results -> /home/raqcoss/Shared/Notebook/Evolutionary Computing/EvoComputing/results
Figs    -> /home/raqcoss/Shared/Notebook/Evolutionary Computing/EvoComputing/figs


## Agents: ANN-like optimizers with plasticity
Agents share the same optimizer structure; plasticity p controls adaptation step size and consolidation to a memory anchor.

In [3]:
@dataclass
class Agent:
    id: int
    p: float
    dim: int
    x: np.ndarray = field(default_factory=lambda: None)
    anchor: np.ndarray = field(default_factory=lambda: None)
    ema_score: float = 0.0
    age: int = 0
    def __post_init__(self):
        if self.x is None: self.x = np.random.uniform(-2,2,size=(self.dim,))
        if self.anchor is None: self.anchor = self.x.copy()
    def step(self, f: Callable[[np.ndarray], float], steps:int, lr_base:float=0.02):
        # stochastic finite-diff gradient step scaled by plasticity; consolidate toward anchor inversely to p
        for _ in range(steps):
            eps = np.random.normal(size=self.dim); eps /= (np.linalg.norm(eps)+1e-8)
            delta = 1e-2
            f1 = f(self.x + delta*eps); f2 = f(self.x - delta*eps)
            g = (f1 - f2)/(2*delta) * eps
            lr = lr_base * (0.2 + 1.6*self.p)
            self.x -= lr * g
            lam = 0.05*(1-self.p)
            self.x = (1-lam)*self.x + lam*self.anchor
        self.anchor = (1-0.01*self.p)*self.anchor + (0.01*self.p)*self.x
        self.age += 1
    def value(self, f: Callable[[np.ndarray], float]):
        return f(self.x)


## Numerical benchmark functions

In [4]:
def himmelblau(x):
    X, Y = x[0], x[1]
    return (X*X + Y - 11)**2 + (X + Y*Y - 7)**2
def rosenbrock(x):
    return sum(100.0*(x[i+1]-x[i]**2)**2 + (1-x[i])**2 for i in range(len(x)-1))
def rastrigin(x):
    A = 10.0; n=len(x); return A*n + sum(xi*xi - A*np.cos(2*np.pi*xi) for xi in x)
def griewank(x):
    s = sum(xi*xi/4000.0 for xi in x); p=1.0
    for i,xi in enumerate(x, start=1): p*= np.cos(xi/np.sqrt(i))
    return s - p + 1.0
NUM_FUNCS = {
    'Himmelblau2': (himmelblau, 2),
    'Rosenbrock10': (rosenbrock, 10),
    'Rastrigin10': (rastrigin, 10),
    'Griewank10': (griewank, 10),
}
NUM_OPTIMA = { 'Himmelblau2':0.0, 'Rosenbrock10':0.0, 'Rastrigin10':0.0, 'Griewank10':0.0 }


## ATSP instances (br17, ft53, kro124p)

In [6]:
def read_atsp(path):
    # simple TSPLIB parser supporting explicit FULL_MATRIX
    n=None; mat=[]; reading=False
    with open(path,'r') as f:
        for line in f:
            L=line.strip()
            if L.startswith('DIMENSION'): n=int(L.split(':')[1])
            if L.startswith('EDGE_WEIGHT_SECTION'): reading=True; continue
            if L.startswith('EOF'): break
            if reading:
                nums=[int(x) for x in L.split()]
                if nums: mat.extend(nums)
    W=np.array(mat,dtype=int).reshape((n,n))
    return W
def atsp_cost(W, tour):
    return sum(W[tour[i], tour[(i+1)%len(tour)]] for i in range(len(tour)))
def two_opt(tour, i, k):
    new = tour[:i] + tour[i:k+1][::-1] + tour[k+1:]
    return new
def agent_2opt_iteration(tour, W, p):
    n=len(tour)
    i = np.random.randint(0, n-1); k = np.random.randint(i+1, n)
    new = two_opt(tour, i, k)
    # accept with probability favored by higher plasticity when worse
    if atsp_cost(W,new) <= atsp_cost(W,tour): return new
    else:
        if np.random.rand() < 0.1*p: return new
    return tour
def init_tour(n):
    t=list(range(n)); random.shuffle(t); return t
ATSP_FILES = { 'br17': os.path.join(ROOT,'br17.atsp'),
               'ft53': os.path.join(ROOT,'ft53.atsp'),
               'kro124p': os.path.join(ROOT,'kro124p.atsp') }
for k,v in ATSP_FILES.items():
    if not os.path.exists(v):
        raise FileNotFoundError(f'Missing ATSP file: {v}')


## Boss assignment objective and baseline GA
Boss objective per iteration: sum of per-problem scores minus lambda * number of assigned agents per problem. lambda=0.2 as specified.

In [7]:
LAMBDA = 0.2
K_MAX = 5
def normalize_score(val, ref=0.0):
    # larger is better: invert positive values toward 0
    return 1.0/(1.0+max(0.0, val-ref))
def boss_objective(problem_keys, assign: Dict[str,List[int]], eval_fn: Callable[[str,List[int]], float]):
    total=0.0
    for pr in problem_keys:
        sel = assign.get(pr, [])
        if len(sel)==0: continue
        sc = eval_fn(pr, sel)
        total += sc - LAMBDA*len(sel)
    return total
def ga_assign(problem_keys, agent_ids, eval_fn, iters=50, pop=40, pm=0.1):
    # chromosome: for each problem, a subset of agents (size up to K_MAX)
    probs=list(problem_keys)
    A=len(agent_ids)
    def random_ind():
        ind={pr: np.random.choice(agent_ids, size=np.random.randint(1, min(K_MAX,A)+1), replace=False).tolist() for pr in probs}
        return ind
    def mutate(ind):
        new={pr: list(sel) for pr,sel in ind.items()}
        pr=random.choice(probs)
        if np.random.rand()<0.5 and len(new[pr])>0:
            new[pr].pop(np.random.randint(0,len(new[pr])))
        else:
            cand=[a for a in agent_ids if a not in new[pr]]
            if len(cand)>0 and len(new[pr])<K_MAX:
                new[pr].append(random.choice(cand))
        return new
    P=[random_ind() for _ in range(pop)]
    def fit(ind): return boss_objective(probs, ind, eval_fn)
    for _ in range(iters):
        scores=[fit(ind) for ind in P]
        idx=np.argsort(scores)[-pop//2:]
        elites=[P[i] for i in idx]
        children=[]
        while len(children)<pop-len(elites):
            p=random.choice(elites); c={pr: list(sel) for pr,sel in p.items()}
            if np.random.rand()<pm: c=mutate(c)
            children.append(c)
        P=elites+children
    best=max(P, key=fit)
    return best


## Numerical block: 31 runs, boss vs GA baseline

In [8]:
def run_numeric_block(n_runs=31, T=100, seed0=123):
    results=[]
    keys=list(NUM_FUNCS.keys())
    for r in range(n_runs):
        rng = np.random.RandomState(seed0+r)
        # build agents with diverse plasticity
        agents=[Agent(id=i, p=float(i)/(12-1), dim=NUM_FUNCS['Rosenbrock10'][1]) for i in range(12)]
        # per-problem agent positions carry correct dims
        for a in agents: a.x = np.random.uniform(-3,3,size=(a.dim,)); a.anchor=a.x.copy()
        def eval_assign(pr, sel_ids):
            f,dim = NUM_FUNCS[pr]
            best=np.inf
            for aid in sel_ids:
                ag=agents[aid]
                # ensure correct dim view
                if ag.dim!=dim:
                    ag.dim=dim; ag.x=np.random.uniform(-3,3,size=(dim,)); ag.anchor=ag.x.copy()
                steps=100 if dim==2 else 200
                ag.step(f, steps=steps, lr_base=0.02)
                val=ag.value(f)
                if val<best: best=val
            sc=normalize_score(best, ref=0.0)
            return sc
        # iterate
        for t in range(T):
            probs=rng.choice(keys, size=3, replace=False)
            # GA baseline assignment
            ga_ind = ga_assign(probs, list(range(len(agents))), eval_assign, iters=20, pop=20, pm=0.2)
            ga_fit = boss_objective(probs, ga_ind, eval_assign)
            # Proposed boss: greedy marginal-gain selection per problem under K_MAX
            boss_ind={}
            for pr in probs:
                used=[]
                for _ in range(K_MAX):
                    best_gain=-1e9; best_a=None
                    for aid in range(len(agents)):
                        if aid in used: continue
                        g = eval_assign(pr, used+[aid]) - LAMBDA*len(used+[aid])
                        g_prev = eval_assign(pr, used) - LAMBDA*len(used) if len(used)>0 else 0.0
                        marg = g - g_prev
                        if marg>best_gain: best_gain=marg; best_a=aid
                    if best_gain>0: used.append(best_a)
                boss_ind[pr]=used
            boss_fit = boss_objective(probs, boss_ind, eval_assign)
            # record per problem best values (lower is better)
            for pr in probs:
                f,dim = NUM_FUNCS[pr]
                best_val = np.inf
                # best among assigned agents (either boss or GA)
                for ind in (boss_ind.get(pr,[]), ga_ind.get(pr,[])):
                    for aid in ind:
                        ag=agents[aid]
                        if ag.dim!=dim: continue
                        val=ag.value(f); best_val=min(best_val,val)
                results.append({
                    'run': r, 'iter': t, 'block':'numeric', 'problem': pr,
                    'boss_fit': boss_fit, 'ga_fit': ga_fit,
                    'boss_agents': len(boss_ind.get(pr,[])), 'ga_agents': len(ga_ind.get(pr,[])),
                    'best_val': best_val
                })
    df=pd.DataFrame(results)
    out=os.path.join(RESULTS_DIR,'numeric_boss_vs_ga.csv')
    df.to_csv(out,index=False)
    return df


## ATSP block: 31 runs, boss vs GA baseline

In [9]:
def run_atsp_block(n_runs=31, T=100, seed0=456):
    Wmaps={k: read_atsp(ATSP_FILES[k]) for k in ['br17','ft53','kro124p']}
    results=[]
    keys=['br17','ft53','kro124p']
    for r in range(n_runs):
        rng=np.random.RandomState(seed0+r)
        agents=[ {'id':i,'p':float(i)/(12-1)} for i in range(12)]
        tours={ k: {a['id']: init_tour(Wmaps[k].shape[0]) for a in agents} for k in keys }
        def eval_assign(pr, sel_ids):
            W=Wmaps[pr]
            best=np.inf
            iters = 500 if pr=='br17' else (1500 if pr=='ft53' else 3000)
            for aid in sel_ids:
                t=tours[pr][aid]
                for _ in range(iters):
                    t = agent_2opt_iteration(t, W, p=agents[aid]['p'])
                tours[pr][aid]=t
                val=atsp_cost(W,t)
                if val<best: best=val
            # normalize: invert (1/(1+best)) so higher is better
            return 1.0/(1.0+best)
        for t in range(T):
            probs=rng.choice(keys, size=2, replace=False)
            ga_ind = ga_assign(probs, [a['id'] for a in agents], eval_assign, iters=20, pop=20, pm=0.2)
            ga_fit = boss_objective(probs, ga_ind, eval_assign)
            boss_ind={}
            for pr in probs:
                used=[]
                for _ in range(K_MAX):
                    best_gain=-1e9; best_a=None
                    for aid in [a['id'] for a in agents]:
                        if aid in used: continue
                        g = eval_assign(pr, used+[aid]) - LAMBDA*len(used+[aid])
                        g_prev = eval_assign(pr, used) - LAMBDA*len(used) if len(used)>0 else 0.0
                        marg = g - g_prev
                        if marg>best_gain: best_gain=marg; best_a=aid
                    if best_gain>0: used.append(best_a)
                boss_ind[pr]=used
            boss_fit = boss_objective(probs, boss_ind, eval_assign)
            for pr in probs:
                W=Wmaps[pr]
                best_val=np.inf
                for aid in (boss_ind.get(pr,[])+ga_ind.get(pr,[])):
                    val=atsp_cost(W, tours[pr][aid])
                    best_val=min(best_val,val)
                results.append({'run':r,'iter':t,'block':'atsp','problem':pr,
                                'boss_fit':boss_fit,'ga_fit':ga_fit,
                                'boss_agents':len(boss_ind.get(pr,[])),'ga_agents':len(ga_ind.get(pr,[])),
                                'best_val':best_val})
    df=pd.DataFrame(results)
    out=os.path.join(RESULTS_DIR,'atsp_boss_vs_ga.csv')
    df.to_csv(out,index=False)
    return df


## Analysis: statistics, Wilcoxon tests, and plots

In [None]:
def summarize_block(df, block):
    d=df[df['block']==block]
    # final iteration per run-problem
    last = d.groupby(['run','problem']).apply(lambda x: x.sort_values('iter').iloc[-1]).reset_index(drop=True)
    # stats
    stats=[]
    for pr,grp in last.groupby('problem'):
        vals=grp['best_val'].values
        stats.append({'problem':pr,'n':len(vals),'mean':np.mean(vals),'median':np.median(vals),'std':np.std(vals)})
    s=pd.DataFrame(stats)
    s.to_csv(os.path.join(RESULTS_DIR,f'{block}_summary.csv'), index=False)
    # Wilcoxon rank-sum comparing boss_fit vs ga_fit across runs (take last iter per run averaged across problems)
    agg = d.groupby(['run','iter']).agg(boss=('boss_fit','mean'), ga=('ga_fit','mean')).reset_index()
    w = agg.groupby('run').apply(lambda x: x.sort_values('iter').iloc[-1]).reset_index(drop=True)
    stat,p = ranksums(w['boss'], w['ga'])
    pd.DataFrame([{'block':block,'stat':stat,'pvalue':p}]).to_csv(os.path.join(RESULTS_DIR,f'{block}_wilcoxon.csv'), index=False)
    # Plots: boxplot of best_val per problem
    plt.figure(figsize=(10,4))
    last.boxplot(column='best_val', by='problem')
    plt.title(f'{block} best final values by problem'); plt.suptitle('')
    plt.ylabel('best value (lower better)')
    plt.tight_layout(); plt.savefig(os.path.join(FIGS_DIR,f'{block}_boxplot.png'), dpi=150)
    # Agents vs performance scatter
    plt.figure(figsize=(6,4))
    plt.scatter(d['boss_agents'], d['boss_fit'], s=10, alpha=0.4, label='boss')
    plt.scatter(d['ga_agents'], d['ga_fit'], s=10, alpha=0.4, label='ga')
    plt.xlabel('# agents'); plt.ylabel('fit'); plt.legend();
    plt.tight_layout(); plt.savefig(os.path.join(FIGS_DIR,f'{block}_agents_vs_fit.png'), dpi=150)
    return s, w


In [None]:
# Excecution
numeric_df = run_numeric_block(T=10)

In [None]:
atsp_df = run_atsp_block(T=10)

In [None]:
summarize_block(numeric_df,'numeric'); summarize_block(atsp_df,'atsp')