<center>
    
 # **ACIT 4610 Mandatory 2**
 
 ## **Multi-Objective VRP: NSGA-II vs SPEA2**
 
 ## **Group nr: 4**

</center>


## **1. Data Ingestion and Setup**

- Import libraries and dependencies
- Load instance(s)
- Build distance matrix and basic helpers


In [16]:
import math, random, time
from typing import List, Tuple, Dict


import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import vrplib


# --- Reproducibility
RNG = random.Random(42)
np.random.seed(42)


In [17]:
def load_instance(path: str):
    inst = vrplib.read_instance(path)
    capacity: int = inst['capacity']
    demands = inst['demand']
    coords = inst["node_coord"]
    depot_ids = inst['depot']

    # Handle coords as dict or ndarray
    if isinstance(coords, dict):
        ordered_ids = list(sorted(coords.keys()))  # e.g., [1,2,...,n]
        coords_arr = np.array([coords[i] for i in ordered_ids], dtype=float)
    else:  # assume it's already an ndarray (shape (n,2))
        coords_arr = np.array(coords, dtype=float)
        ordered_ids = list(range(1, len(coords_arr)+1))

    demand_arr = np.array(demands, dtype=int)

    # depot index in 0-based numpy arrays
    depot_idx = depot_ids[0] - 1  # node_id=1 → index=0

    return capacity, demand_arr, coords_arr, depot_idx, ordered_ids





def distance_matrix(coords_arr: np.ndarray) -> np.ndarray:
    n = coords_arr.shape[0]
    # Efficient vectorized build
    diffs = coords_arr[:, None, :] - coords_arr[None, :, :]
    return np.sqrt((diffs**2).sum(axis=2))
INST_SMALL = "A-n32-k5.vrp"
INST_MED = "B-n78-k10.vrp"
INST_LARGE = "X-n101-k25.vrp"

cap_s, dem_s, XY_s, dep_s, ids_s = load_instance(INST_SMALL)
cap_m, dem_m, XY_m, dep_m, ids_m = load_instance(INST_MED)
cap_l, dem_l, XY_l, dep_l, ids_l = load_instance(INST_LARGE)


D_s = distance_matrix(XY_s)
D_m = distance_matrix(XY_m)
D_l = distance_matrix(XY_l)


## **2. Representation & helpers**

# We use a giant-tour permutation of customers (exclude depot). A split procedure
# constructs feasible routes under capacity.


In [18]:
class VRPInstance:
    def __init__(self, cap, demands, D, depot=0):
        self.cap = cap
        self.demands = demands
        self.D = D # distance matrix
        self.depot = depot

    def customers(self) -> np.ndarray:# return all customer indices (excluding depot)
        n = len(self.demands)
        return np.array([i for i in range(n) if i != self.depot], dtype=int)



def split_routes(tour: np.ndarray, vrp: VRPInstance) -> List[List[int]]: 
   # filling vehicles up to capacity
    routes: List[List[int]] = []
    cur, load = [], 0 
    for node in tour: 
        d = int(vrp.demands[node]) 
        if load + d <= vrp.cap:
            cur.append(node)
            load += d
        else:
            if cur:
                routes.append(cur)
                cur, load = [node], d
            if cur:
                routes.append(cur)
    return routes




def route_distance(route: List[int], vrp: VRPInstance) -> float:
    if not route:
        return 0.0
    dep = vrp.depot
    dist = vrp.D[dep, route[0]]
    for i in range(len(route) - 1):
        dist += vrp.D[route[i], route[i+1]]
    dist += vrp.D[route[-1], dep]
    return float(dist)




def fitness(tour: np.ndarray, vrp: VRPInstance, penalty: float = 1e6) -> Tuple[float, float]: # calculate total distance and load stddev
    routes = split_routes(tour, vrp)
    loads = [int(vrp.demands[r].sum()) for r in routes]
    dists = [route_distance(r, vrp) for r in routes]

    total_dist = float(np.sum(dists))
    load_std = float(np.std(loads)) if len(loads) > 1 else 0.0

    over = sum(max(0, L - vrp.cap) for L in loads)
    if over > 0:
        total_dist += penalty * over

    return (total_dist, load_std)  # tuple, not numpy array




## **3.Genetic operators**



In [19]:
def init_population(vrp: VRPInstance, pop_size: int) -> List[np.ndarray]:
    base = vrp.customers().copy()
    pop = []
    for _ in range(pop_size):
        RNG.shuffle(base)
        pop.append(base.copy())
    return pop


def order_crossover(p1: np.ndarray, p2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    n = len(p1)
    a, b = sorted(RNG.sample(range(n), 2))
    def ox(pa, pb):
        child = np.full(n, -1, dtype=int)
        child[a:b+1] = pa[a:b+1]
        fill = [x for x in pb if x not in child]
        idxs = [i for i in range(n) if child[i] == -1]
        for i, v in zip(idxs, fill):
            child[i] = v
        return child
    return ox(p1, p2), ox(p2, p1)


def swap_mutation(ind: np.ndarray, pm: float = 0.1) -> np.ndarray:
    ind = ind.copy()
    n = len(ind)
    for i in range(n):
        if RNG.random() < pm:
            j = RNG.randrange(n)
            ind[i], ind[j] = ind[j], ind[i]
    return ind

## **4. Non-dominated sorting & utilities**


In [20]:
def dominates(a: Tuple[float, float], b: Tuple[float, float]) -> bool:
    return (a[0] <= b[0] and a[1] <= b[1]) and (a != b)


def fast_nondominated_sort(F: List[Tuple[float, float]]):
    n = len(F)
    S = [set() for _ in range(n)]
    n_dom = [0]*n
    fronts = [[]]
    for p in range(n):
        for q in range(n):
            if dominates(F[p], F[q]):
                S[p].add(q)
            elif dominates(F[q], F[p]):
                n_dom[p] += 1
        if n_dom[p] == 0:
            fronts[0].append(p)
    i = 0
    while fronts[i]:
        next_front = []
        for p in fronts[i]:
            for q in S[p]:
                n_dom[q] -= 1
                if n_dom[q] == 0:
                    next_front.append(q)
        i += 1
        fronts.append(next_front)
    fronts.pop() # remove last empty
    return fronts


def crowding_distance(front_idx: List[int], F: List[Tuple[float, float]]):
    if not front_idx:
        return {}
    m = len(F[0])
    d = {i: 0.0 for i in front_idx}
    for obj in range(m):
        sorted_idx = sorted(front_idx, key=lambda i: F[i][obj])
        d[sorted_idx[0]] = d[sorted_idx[-1]] = float('inf')
        vals = [F[i][obj] for i in sorted_idx]
        vmin, vmax = min(vals), max(vals)
        denom = (vmax - vmin) or 1.0
        for k in range(1, len(sorted_idx)-1):
            i_prev, i_next = sorted_idx[k-1], sorted_idx[k+1]
            d[sorted_idx[k]] += (F[i_next][obj] - F[i_prev][obj]) / denom
    return d

## **5. NSGA-II**



In [21]:
def nsga2(vrp: VRPInstance, pop_size=80, gens=200, pm=0.1) -> Tuple[List[np.ndarray], List[Tuple[float, float]]]:
    pop = init_population(vrp, pop_size)
    Fvals = [fitness(ind, vrp) for ind in pop]

    for _ in range(gens):
        # Selection by tournament on (rank, crowding)
        fronts = fast_nondominated_sort(Fvals)
        ranks = {}
        for r, fr in enumerate(fronts):
            for i in fr:
                ranks[i] = r
        cd_cache = {}
        for fr in fronts:
            cd_cache.update(crowding_distance(fr, Fvals))

        def tournament():
            i, j = RNG.randrange(pop_size), RNG.randrange(pop_size)
            a = (ranks[i], -cd_cache.get(i, 0.0))
            b = (ranks[j], -cd_cache.get(j, 0.0))
            return pop[i] if a < b else pop[j]

        offspring = []
        while len(offspring) < pop_size:
            p1, p2 = tournament(), tournament()
            c1, c2 = order_crossover(p1, p2)
            offspring.extend([swap_mutation(c1, pm), swap_mutation(c2, pm)])
        offspring = offspring[:pop_size]
        off_F = [fitness(ind, vrp) for ind in offspring]

        # Combine and select next generation by NSGA-II environmental selection
        all_pop = pop + offspring
        all_F = Fvals + off_F
        fronts = fast_nondominated_sort(all_F)
        new_pop, new_F = [], []
        for fr in fronts:
            if len(new_pop) + len(fr) <= pop_size:
                new_pop += [all_pop[i] for i in fr]
                new_F += [all_F[i] for i in fr]
            else:
                cd = crowding_distance(fr, all_F)
                fr_sorted = sorted(fr, key=lambda i: cd[i], reverse=True)
                take = pop_size - len(new_pop)
                sel = fr_sorted[:take]
                new_pop += [all_pop[i] for i in sel]
                new_F += [all_F[i] for i in sel]
                break
        pop, Fvals = new_pop, new_F

    return pop, Fvals  # last population and objective values

## **6. SPEA2**



In [22]:
def spea2(vrp: VRPInstance, pop_size=80, archive_size=80, gens=200, pm=0.1, k_nn=1):
    def fitness(vals):
        n = len(vals)
        S = np.zeros(n)  # strength
        R = np.zeros(n)  # raw fitness
        for i in range(n):
            for j in range(n):
                if dominates(vals[i], vals[j]):
                    S[i] += 1
        for i in range(n):
            R[i] = sum(S[j] for j in range(n) if dominates(vals[j], vals[i]))
        # density via k-th NN in objective space
        M = np.zeros(n)
        dists = np.zeros((n, n))
        for i in range(n):
            for j in range(i+1, n):
                d = math.dist(vals[i], vals[j])
                dists[i, j] = dists[j, i] = d
        for i in range(n):
            neigh = np.sort(dists[i][dists[i] > 0])
            sigma = neigh[min(k_nn-1, len(neigh)-1)] if len(neigh) else 1.0
            M[i] = 1.0 / (sigma + 2.0)
        return R + M

    pop = init_population(vrp, pop_size)
    Fp = [fitness(ind, vrp) for ind in pop]
    archive, Fa = [], []

    for _ in range(gens):
        # Merge pop + archive for fitness assignment
        union = (archive + pop)
        Fu = (Fa + Fp)
        fit = fitness(Fu)
        # Environmental selection -> archive
        idx_sorted = np.argsort(fit)
        new_archive = [union[i] for i in idx_sorted[:archive_size]]
        new_Fa = [Fu[i] for i in idx_sorted[:archive_size]]
        # If not enough nondominants, truncation by k-NN distance implicitly handled above
        archive, Fa = new_archive, new_Fa

        # Mating selection from archive (binary tournament on fitness)
        def tour_from_archive():
            i, j = RNG.randrange(len(archive)), RNG.randrange(len(archive))
            a = Fa[i]
            b = Fa[j]
            # smaller fitness is better in SPEA2
            return archive[i] if (a[0]+a[1]) < (b[0]+b[1]) else archive[j]

        offspring = []
        while len(offspring) < pop_size:
            p1, p2 = tour_from_archive(), tour_from_archive()
            c1, c2 = order_crossover(p1, p2)
            offspring.extend([swap_mutation(c1, pm), swap_mutation(c2, pm)])
        pop = offspring[:pop_size]
        Fp = [fitness(ind, vrp) for ind in pop]

    return archive, Fa  # final archive and objective values

## **7. Metrics (HV, IGD) & utilities**

In [23]:
def pareto_front(F):
    nd = []
    for i, a in enumerate(F):
        a = tuple(a)  # make sure it's a tuple
        if not any(dominates(tuple(b), a) for j, b in enumerate(F) if j != i):
            nd.append(a)
    return np.array(nd, dtype=float)



def hypervolume(F: np.ndarray, ref: Tuple[float, float]) -> float:
    """2D HV for minimization. Sort by obj1 ascending; sum rectangles to ref."""
    if len(F) == 0:
        return 0.0
    P = F[np.argsort(F[:, 0])]
    hv, prev_f1 = 0.0, P[0, 0]
    prev_f2 = P[0, 1]
    hv += (ref[0] - P[0, 0]) * (ref[1] - P[0, 1])
    for i in range(1, len(P)):
        f1, f2 = P[i]
        hv += (ref[0] - f1) * max(0.0, (prev_f2 - f2))
        prev_f2 = min(prev_f2, f2)
    return max(hv, 0.0)


def igd(F_approx: np.ndarray, F_ref: np.ndarray) -> float:
    if len(F_ref) == 0 or len(F_approx) == 0:
        return float('inf')
    d = []
    for r in F_ref:
        d.append(np.min(np.linalg.norm(F_approx - r, axis=1)))
    return float(np.mean(d))

## **8. Experiment**

In [33]:
def run_experiment(vrp: VRPInstance, algo: str, seed: int = 0, **kwargs):
    RNG.seed(seed); np.random.seed(seed)
    t0 = time.time()

    if algo.lower() == "nsga2":
        pop, F = nsga2(vrp, **kwargs)
    elif algo.lower() == "spea2":
        pop, F = spea2(vrp, **kwargs)
    else:
        raise ValueError("algo must be 'nsga2' or 'spea2'")

    dt = time.time() - t0
    F = np.array(F, dtype=float)
    PF = pareto_front(F)

    worst = np.max(F, axis=0)
    ref = tuple(worst * 1.1)
    HV = hypervolume(PF, ref)

    return {
        "F": F,
        "PF": PF,
        "HV": HV,
        "time": dt,
        "ref": ref,
        "pop": pop,
        "customers": vrp.customers(),  # you’ll need to expose coordinates in VRPInstance
        "depot": vrp.depot,
        "n_vehicles": 5 if vrp.cap == 90 else (10 if vrp.cap == 200 else 25),  # known from instances
    }


## **9. Visualization**

In [25]:
def plot_fronts(res_nsga2, res_spea2, title="Pareto fronts (distance vs. load std)"):
    plt.figure(figsize=(6,4))
    plt.scatter(res_nsga2["F"][:,0], res_nsga2["F"][:,1], label="NSGA‑II (all)", alpha=0.35)
    plt.scatter(res_spea2["F"][:,0], res_spea2["F"][:,1], label="SPEA2 (all)", alpha=0.35)
    plt.scatter(res_nsga2["PF"][:,0], res_nsga2["PF"][:,1], label="NSGA‑II PF", marker="x")
    plt.scatter(res_spea2["PF"][:,0], res_spea2["PF"][:,1], label="SPEA2 PF", marker="^")
    plt.xlabel("Total distance (min)")
    plt.ylabel("Route load std (min)")
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()

# i want a plot function that shows the route in a map drawing lines on paths taken by vehicles
def plot_routes(vrp: VRPInstance, tour: np.ndarray, title="Vehicle Routes"):
    routes = split_routes(tour, vrp)
    plt.figure(figsize=(6,6))
    plt.scatter(vrp.D[:,0], vrp.D[:,1], c='blue', label='Customers')
    plt.scatter(vrp.D[vrp.depot,0], vrp.D[vrp.depot,1], c='red', label='Depot', marker='s')
    
    colors = plt.cm.get_cmap('tab20', len(routes))
    for i, route in enumerate(routes):
        route_coords = np.array([vrp.D[vrp.depot]] + [vrp.D[node] for node in route] + [vrp.D[vrp.depot]])
        plt.plot(route_coords[:,0], route_coords[:,1], color=colors(i), label=f'Route {i+1}')
    
    plt.xlabel("X Coordinate")
    plt.ylabel("Y Coordinate")
    plt.title(title)
    plt.legend()
    plt.grid()
    plt.show()


## **10. Example usage**

In [34]:
if __name__ == "__main__":
    vrp_small = VRPInstance(cap_s, dem_s, D_s, dep_s)

    # Quick demo runs; bump pop_size/gens for report-quality results
    res_n = run_experiment(vrp_small, "nsga2", seed=1, pop_size=60, gens=100, pm=0.08)
    res_s = run_experiment(vrp_small, "spea2", seed=1, pop_size=60, gens=100, pm=0.08, archive_size=60)

    print({
        "NSGA2": {"HV": round(res_n["HV"], 3), "time_s": round(res_n["time"], 2)},
        "SPEA2": {"HV": round(res_s["HV"], 3), "time_s": round(res_s["time"], 2)}
    })

    # IGD between the two approximations (symmetrized for a rough comparison)
    igd_n_s = 0.5 * (igd(res_n["PF"], res_s["PF"]) + igd(res_s["PF"], res_n["PF"]))
    print("Symmetrized IGD (NSGA‑II vs SPEA2):", round(igd_n_s, 3))

    plot_fronts(res_n, res_s, title="A‑n32‑k5: NSGA‑II vs SPEA2")
    plot_routes(vrp_small, res_n["pop"][0], title="NSGA-II Example Route")
    plot_routes(vrp_small, res_s["pop"][0], title="SPEA2 Example Route")

TypeError: spea2.<locals>.fitness() takes 1 positional argument but 2 were given