In [1]:
from numba import njit,prange, get_thread_id, get_num_threads
import numpy as np
from Swarm_IQ import decode_prod_plan, compute_objective
from CLSP_instances import *
import time

In [2]:
@njit
def iterate_abc(
    X, Q, trial, fitness,
    gbest_X, gbest_Q, gbest_val,
    demand, production_times, setup_times, capacities,
    production_costs, setup_costs, inventory_costs,
    limit, K_onlookers,
    rand_partner_e, rand_phi_e, rand_flip_e,
    rand_partner_o, rand_phi_o, rand_flip_o, best_idx
):
    SN, M, T = X.shape
    best_viols = gbest_val[0, 0]
    best_obj   = gbest_val[0, 1]

    # 1. Employed bees phase
    for i in range(SN):
        k = rand_partner_e[i]
        if k == i:
            continue
        phi   = rand_phi_e[i]
        share = abs(phi)

        # generate candidate
        Xc = X[i].copy()
        Qc = Q[i].copy()

        # bit‐flip (binary part)
        for m in range(M):
            for t in range(T):
                if X[i, m, t] != X[k, m, t] and rand_flip_e[i, m, t] < (0.2*share):
                #if rand_flip_e[i, m, t] < (0.2*share):
                    Xc[m, t] = 1.0 - Xc[m, t]

        # continuous update (Q part)
        for m in range(M):
            for t in range(T):
                val = Q[i, m, t] + phi * (Q[i, m, t] - Q[k, m, t])
                if val < 0.0:
                    val = 0.0
                elif val > 1.0:
                    val = 1.0
                Qc[m, t] = val

        # evaluate candidate
        plan_c = decode_prod_plan(Xc, Qc, demand)
        fit_c  = compute_objective(
            plan_c, Xc.astype(np.float64).T,
            setup_costs, production_costs,
            production_times, setup_times,
            capacities, inventory_costs, demand
        )

        # compare & update
        if (fit_c[0] < fitness[i, 0]) or (fit_c[0] == fitness[i, 0] and fit_c[1] < fitness[i, 1]):
            X[i]          = Xc
            Q[i]          = Qc
            fitness[i, 0] = fit_c[0]
            fitness[i, 1] = fit_c[1]
            trial[i]      = 0
        else:
            trial[i] += 1

    # 2. Onlooker bees phase
    w_sum = 0.0
    w     = np.empty(SN, dtype=np.float64)
    for i in range(SN):
        w[i]   = 1.0 / (1.0 + fitness[i, 0])
        w_sum += w[i]

    for o in range(K_onlookers):
        # roulette‐wheel select source idx
        threshold = rand_phi_o[o] * w_sum
        accum     = 0.0
        idx       = 0
        for j in range(SN):
            accum += w[j]
            if accum >= threshold:
                idx = j
                break

        k = rand_partner_o[o]
        if k == idx:
            continue

        phi   = rand_phi_o[o] * 2.0 - 1.0
        share = abs(phi)

        # generate candidate
        Xc = X[idx].copy()
        Qc = Q[idx].copy()

        # bit‐flip
        for m in range(M):
            for t in range(T):
                if X[idx, m, t] != X[k, m, t] and rand_flip_o[o, m, t] <(share):
                    Xc[m, t] = 1.0 - Xc[m, t]

        # continuous update
        for m in range(M):
            for t in range(T):
                
                val = Q[idx, m, t] + phi * (Q[idx, m, t] - Q[k, m, t])
                if val < 0.0:
                    val = 0.0
                elif val > 1.0:
                    val = 1.0
                Qc[m, t] = val

        # evaluate candidate
        plan_c = decode_prod_plan(Xc, Qc, demand)
        fit_c  = compute_objective(
            plan_c, Xc.astype(np.float64).T,
            setup_costs, production_costs,
            production_times, setup_times,
            capacities, inventory_costs, demand
        )

        #  compare & update
        if (fit_c[0] < fitness[idx, 0]) or (fit_c[0] == fitness[idx, 0] and fit_c[1] < fitness[idx, 1]):

            X[idx]          = Xc
            Q[idx]          = Qc
            fitness[idx, 0] = fit_c[0]
            fitness[idx, 1] = fit_c[1]
            trial[idx]      = 0
        else:
            trial[idx] += 1

    # 3. Scout bee phase (on‐demand)
    for i in range(SN):
        if i == best_idx:          # protect only the true best slot
                continue
        if trial[i] > limit:
            for m in range(M):
                for t in range(T):
                    X[i, m, t] = 1.0 if np.random.rand() < 0.5 else 0.0
                    Q[i, m, t] = np.random.rand()
            trial[i] = 0
            # re-evaluate fully
            plan_i = decode_prod_plan(X[i], Q[i], demand)
            fit_i  = compute_objective(
                plan_i, X[i].astype(np.float64).T,
                setup_costs, production_costs,
                production_times, setup_times,
                capacities, inventory_costs, demand
            )
            fitness[i, 0] = fit_i[0]
            fitness[i, 1] = fit_i[1]

    # 4. Update global bests
    for i in range(SN):
        if (fitness[i,0]<gbest_val[0,0]) or (fitness[i,0]==gbest_val[0,0] and fitness[i,1]<gbest_val[0,1]):
            
            gbest_val[0] = fitness[i].copy()
            gbest_X[:]   = X[i].copy()
            gbest_Q[:]   = Q[i].copy()
            best_idx     = i

    return (
        X, Q, trial, fitness,
        gbest_X, gbest_Q, gbest_val, best_idx
    )

In [3]:
class ABCAlgorithm:
    def __init__(
        self, n_bees, config, limit, K_onlookers):
        
        # Problem config
        self.cfg = config()
        self.SN = n_bees
        self.M = self.cfg.M
        self.T = self.cfg.T

        # Scout threshold
        self.limit = limit
        # number of onlooker 
        self.K = K_onlookers 

        # Initialize solution arrays
        self.X = np.random.randint(0, 2, size=(self.SN, self.M, self.T)).astype(np.float64)
        self.Q = np.random.rand(self.SN, self.M, self.T)

        # Trial counters
        self.trial = np.zeros(self.SN, dtype=np.int32)

        # Evaluate initial fitness
        self.fitness = np.full((self.SN, 2), np.inf)
        for i in range(self.SN):
            plan = decode_prod_plan(self.X[i], self.Q[i], self.cfg.demand)
            fit = compute_objective(
                plan, self.X[i].T,
                self.cfg.setup_costs, self.cfg.production_costs,
                self.cfg.production_times, self.cfg.setup_times,
                self.cfg.capacities, self.cfg.inventory_costs,
                self.cfg.demand
            )
            self.fitness[i, 0] = fit[0]
            self.fitness[i, 1] = fit[1]


        # Global best iniitalization
        # find minimal constraint violations
        min_viols = np.min(self.fitness[:, 0])
        # indices with minimal violations
        candidates = np.where(self.fitness[:, 0] == min_viols)[0]
        # among them, pick the one with minimal objective cost
        obj_vals = self.fitness[candidates, 1]
        self.best_idx = candidates[np.argmin(obj_vals)]
        # set global best value, X, and Q
        self.gbest_val = self.fitness[self.best_idx:self.best_idx+1].copy()
        self.gbest_X = self.X[self.best_idx].copy()
        self.gbest_Q = self.Q[self.best_idx].copy()   
        
        
    def step(self):
        # pre-generate random values
        self.rand_partner_e = np.random.randint(0,self.SN,self.SN)
        self.rand_phi_e     = np.random.rand(self.SN)*2-1
        self.rand_flip_e    = np.random.rand(self.SN,self.M,self.T)
        self.rand_partner_o = np.random.randint(0,self.SN,self.K)
        self.rand_phi_o     = np.random.rand(self.K)
        self.rand_flip_o    = np.random.rand(self.K,self.M,self.T)
    
        (self.X, self.Q, self.trial, self.fitness,
         self.gbest_X, self.gbest_Q, self.gbest_val, self.best_idx
        ) = iterate_abc(
            self.X, self.Q, self.trial, self.fitness,
            self.gbest_X, self.gbest_Q, self.gbest_val,
            self.cfg.demand, self.cfg.production_times, self.cfg.setup_times,
            self.cfg.capacities, self.cfg.production_costs, self.cfg.setup_costs,
            self.cfg.inventory_costs, self.limit, self.K,
            self.rand_partner_e, self.rand_phi_e, self.rand_flip_e,
            self.rand_partner_o, self.rand_phi_o, self.rand_flip_o, self.best_idx
        )
    
    def optimize(self, n_iter):
        for _ in range(n_iter):
            self.step()
        return decode_prod_plan(self.gbest_X, self.gbest_Q, self.cfg.demand), self.gbest_val
    


In [4]:

results = []
bad_results = []
for seed in range(1):
    np.random.seed(seed)
    # instantiate swarm
    start = time.perf_counter()
    abc = ABCAlgorithm(
        n_bees=100,
        config=CLSP2,
        limit=100,
        K_onlookers= 100
    )
    abc.optimize(10000)
    end = time.perf_counter()
    print(f"Elapsed time: {end - start:.4f} seconds")
    # retrieve best plan and value
    plan = decode_prod_plan(abc.gbest_X, abc.gbest_Q, abc.cfg.demand)
    best_val = abc.gbest_val
    end = time.perf_counter()

    if best_val[0, 0] == 0.0:
        results.append(best_val[0, 1])
    else:
        bad_results.append((plan, best_val, abc.gbest_Q, abc.gbest_X))

print(results)
print('median', np.median(results))
print('infeasible', len(bad_results))


Elapsed time: 14.5101 seconds
[6821.0]
median 6821.0
infeasible 0
