## GP

In [1]:
# Imports & utils 
from __future__ import annotations
import math, random, copy, statistics, time
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Any, Dict

def euclid(a: Tuple[float,float], b: Tuple[float,float]) -> float:
    dx, dy = a[0]-b[0], a[1]-b[1]
    return math.hypot(dx, dy)

def clamp(x, lo, hi): 
    return lo if x < lo else hi if x > hi else x

def pdiv(a, b, default=0.0):
    return a / b if abs(b) > 1e-12 else default

## Data models & constraints

In [None]:
# INPUT/CONSTRAINTS: K trucks, D drones, MT, MD, LD, r_i, TW [e_i, l_i], C1/C2, L_w
from dataclasses import dataclass, field
from typing import List, Tuple, Optional

@dataclass
class Request:
    rid: int
    xy: Tuple[float,float]
    demand: float
    r_time: float      #  r_i
    e: float           #  e_i
    l: float           #  l_i
    must_truck: bool   #  C1
    served: bool = False
    picked_time: Optional[float] = None
    depot_arrival_time: Optional[float] = None

@dataclass
class Vehicle:
    vid: int
    kind: str              # "truck" | "drone"
    cap_max: float         #  M_T hoặc M_D
    speed: float
    drone_radius: float = 0.0  # L_D
    load: float = 0.0
    pos: Tuple[float,float] = (0.0, 0.0)
    ready_time: float = 0.0
    queue: List[int] = field(default_factory=list)  # Q_k

    def is_drone(self) -> int:
        return 1 if self.kind == "drone" else 0

@dataclass
class SimConfig:
    # Quy mô & động học
    N: int = 60
    plane_size: float = 100.0
    horizon: float = 200.0
    tw_min: float = 10.0
    tw_max: float = 40.0
    d_min: float = 1.0
    d_max: float = 5.0
    # Phương tiện & ràng buộc tải/bán kính
    K_truck: int = 2
    D_drone: int = 3
    MT: float = 20.0
    MD: float = 3.0
    LD: float = 50.0
    v_truck: float = 1.0
    v_drone: float = 1.5
    # L_w: max waiting sau khi pick đến khi về depot
    Lw: float = 60.0
    seed: int = 42


## GP primitives & Dual-Tree (R,S)

In [None]:
import random
from dataclasses import dataclass
from typing import List, Dict, Tuple

GP_FUNS = [
    ("+", 2, lambda a,b: a+b),
    ("-", 2, lambda a,b: a-b),
    ("*", 2, lambda a,b: a*b),
    ("/", 2, lambda a,b: pdiv(a,b)),
    ("min", 2, lambda a,b: a if a<b else b),
    ("max", 2, lambda a,b: a if a>b else b),
    ("abs", 1, lambda a: abs(a)),
    ("neg", 1, lambda a: -a),
]

# TERMINALS (mở rộng): thêm tw_width_norm, dist_depot_norm
TERMINALS = [
    "q_ratio",
    "cap_slack_ratio",
    "demand_ratio",
    "dist_cur_req_norm",
    "dist_medQ_req_norm",
    "dist_depot_norm",      # NEW
    "tw_width_norm",        # NEW
    "is_drone",
    # sequencing extras:
    "t_travel_norm",
    "t_since_arrival_norm",
    "t_since_open_norm",
    "t_until_close_norm",
    "r_time_norm",
]

class Node:
    def __init__(self, sym: str, arity: int, func=None, term_key=None, children=None):
        self.sym = sym
        self.arity = arity
        self.func = func
        self.term_key = term_key
        self.children: List["Node"] = children if children else []

    def is_terminal(self):
        return self.arity == 0

    def eval(self, feats: Dict[str,float]) -> float:
        # GP EVAL: ánh xạ công thức cây → giá trị số trên vector đặc trưng (φ)
        if self.is_terminal():
            return feats.get(self.term_key, 0.0)
        if self.arity == 1:
            return self.func(self.children[0].eval(feats))
        elif self.arity == 2:
            return self.func(self.children[0].eval(feats), self.children[1].eval(feats))
        return 0.0

    def copy(self) -> "Node":
        return Node(self.sym, self.arity, self.func, self.term_key, [c.copy() for c in self.children])

    def all_nodes(self) -> List["Node"]:
        out = [self]
        for c in self.children:
            out.extend(c.all_nodes())
        return out

def rand_func_node() -> Node:
    sym, ar, fn = random.choice(GP_FUNS)
    return Node(sym, ar, fn, None, [])

def rand_term_node() -> Node:
    key = random.choice(TERMINALS)
    return Node(key, 0, None, key, [])

def grow_tree(max_depth: int) -> Node:
    if max_depth == 1:
        return rand_term_node()
    if random.random() < 0.5:
        return rand_term_node()
    f = rand_func_node()
    if f.arity == 1:
        f.children = [grow_tree(max_depth-1)]
    else:
        f.children = [grow_tree(max_depth-1), grow_tree(max_depth-1)]
    return f

def full_tree(max_depth: int) -> Node:
    if max_depth == 1:
        return rand_term_node()
    f = rand_func_node()
    if f.arity == 1:
        f.children = [full_tree(max_depth-1)]
    else:
        f.children = [full_tree(max_depth-1), full_tree(max_depth-1)]
    return f

def ramped_half_and_half(max_depth: int) -> Node:
    d = random.randint(2, max_depth)
    return full_tree(d) if random.random()<0.5 else grow_tree(d)

def pick_random_node(root: Node) -> Node:
    nodes = root.all_nodes()
    return random.choice(nodes)

def subtree_crossover(a: Node, b: Node, max_depth: int) -> Tuple[Node, Node]:
    ca, cb = a.copy(), b.copy()
    na = pick_random_node(ca)
    nb = pick_random_node(cb)
    na.sym, nb.sym = nb.sym, na.sym
    na.arity, nb.arity = nb.arity, na.arity
    na.func, nb.func = nb.func, na.func
    na.term_key, nb.term_key = nb.term_key, na.term_key
    na.children, nb.children = nb.children, na.children
    return ca, cb

def subtree_mutation(a: Node, max_depth: int, p=0.9) -> Node:
    ca = a.copy()
    n = pick_random_node(ca)
    repl = ramped_half_and_half(max_depth)
    n.sym, n.arity, n.func, n.term_key, n.children = repl.sym, repl.arity, repl.func, repl.term_key, repl.children
    return ca

@dataclass
class Individual:
    # DUAL-TREE POLICY: R(k,i) & S(k,i)
    route_tree: Node
    seq_tree: Node
    fitness: object = float("inf")  # float (weighted) hoặc tuple (lexicographic)

    def copy(self):
        return Individual(self.route_tree.copy(), self.seq_tree.copy(), self.fitness)

def tree_to_prefix(n: Node) -> str:
    if n.is_terminal():
        return n.term_key
    if n.arity == 1:
        return f"({n.sym} {tree_to_prefix(n.children[0])})"
    return f"({n.sym} {tree_to_prefix(n.children[0])} {tree_to_prefix(n.children[1])})"


## Simulator + R/S + constraints

In [None]:
import math, copy, statistics
from typing import Dict, Any, List, Tuple

def euclid(a, b):
    dx, dy = a[0]-b[0], a[1]-b[1]
    return math.hypot(dx, dy)

class Simulator:
    def __init__(self, cfg: SimConfig):
        self.cfg = cfg
        random.seed(cfg.seed)
        self.depot = (0.0, 0.0)
        self.requests: List[Request] = []
        self.vehicles: List[Vehicle] = []
        self._gen_requests()
        self._gen_vehicles()
        self.total_demand = sum(r.demand for r in self.requests)
        self.T_close = cfg.horizon

    # ---------- Build dataset ----------
    def _gen_requests(self):
        reqs = []
        for i in range(self.cfg.N):
            x = random.uniform(-self.cfg.plane_size, self.cfg.plane_size)
            y = random.uniform(-self.cfg.plane_size, self.cfg.plane_size)
            r_t = random.uniform(0, 0.8*self.cfg.horizon)
            tw_len = random.uniform(self.cfg.tw_min, self.cfg.tw_max)
            e = r_t
            l = min(self.cfg.horizon, e + tw_len)
            d = random.uniform(self.cfg.d_min, self.cfg.d_max)
            c1 = (d > self.cfg.MD) or (euclid((x,y), self.depot) > self.cfg.LD)
            reqs.append(Request(i, (x,y), d, r_t, e, l, must_truck=c1))
        reqs.sort(key=lambda r: r.r_time)
        self.requests = reqs

    def _gen_vehicles(self):
        vid = 0
        for _ in range(self.cfg.K_truck):
            self.vehicles.append(Vehicle(vid, "truck", self.cfg.MT, self.cfg.v_truck))
            vid += 1
        for _ in range(self.cfg.D_drone):
            self.vehicles.append(Vehicle(vid, "drone", self.cfg.MD, self.cfg.v_drone, drone_radius=self.cfg.LD))
            vid += 1

    # ---------- Augmentation helper (thu hẹp TW in-place) ----------
    def shrink_tw_inplace(self, sf: float):
        # [MAP] PROBLEM AUGMENTATION: l' = e + sf*(l - e), sf ∈ (0,1]
        for r in self.requests:
            r.l = r.e + sf * (r.l - r.e)

    # ---------- Feature builders ----------
    def _median_of_queue(self, veh: Vehicle) -> Tuple[float,float]:
        if not veh.queue:
            return self.depot
        xs = [self._req_map[q].xy[0] for q in veh.queue if q in self._req_map]
        ys = [self._req_map[q].xy[1] for q in veh.queue if q in self._req_map]
        if not xs:
            return self.depot
        return (statistics.median(xs), statistics.median(ys))

    def _common_feats(self, veh: Vehicle, req: Request, now: float, cur_pos=None) -> Dict[str,float]:
        pos = veh.pos if cur_pos is None else cur_pos
        dist = euclid(pos, req.xy)
        return dict(
            q_ratio=len(veh.queue)/max(1,len(self._req_list)),
            cap_slack_ratio=pdiv(max(veh.cap_max - veh.load, 0.0), max(1e-6, self.total_demand)),
            demand_ratio=req.demand/max(1e-6, self.total_demand),
            dist_cur_req_norm=dist/max(1.0, self.T_close),
            dist_medQ_req_norm=euclid(self._median_of_queue(veh), req.xy)/max(1.0, self.T_close),
            dist_depot_norm=euclid(self.depot, req.xy)/max(1.0, self.T_close),   # NEW
            tw_width_norm=(req.l - req.e)/max(1.0, self.T_close),               # NEW
            is_drone=float(veh.is_drone()),
            t_travel_norm=(dist/veh.speed)/max(1.0, self.T_close),
            t_since_arrival_norm=(now - req.r_time)/max(1.0, self.T_close),
            t_since_open_norm=(now - req.e)/max(1.0, self.T_close),
            t_until_close_norm=(req.l - now)/max(1.0, self.T_close),
            r_time_norm=(req.r_time)/max(1.0, self.T_close),
        )

    def routing_features(self, veh: Vehicle, req: Request, now: float) -> Dict[str,float]:
        return self._common_feats(veh, req, now)

    def sequencing_features(self, veh: Vehicle, req: Request, now: float, cur_pos=None) -> Dict[str,float]:
        return self._common_feats(veh, req, now, cur_pos=cur_pos)

    # ---------- Rule evaluation ----------
    def R(self, ind: "Individual", veh: Vehicle, req: Request, now: float) -> float:
        return ind.route_tree.eval(self.routing_features(veh, req, now))

    def S(self, ind: "Individual", veh: Vehicle, req: Request, now: float, cur_pos=None) -> float:
        return ind.seq_tree.eval(self.sequencing_features(veh, req, now, cur_pos=cur_pos))

    # ---------- Feasibility ----------
    def _veh_req_feasible_typecap(self, veh: Vehicle, req: Request, add_load: float = 0.0) -> bool:
        # TYPE/CAPACITY/RADIUS constraints
        if req.must_truck and veh.kind != "truck":
            return False
        if veh.kind == "drone":
            if req.demand + add_load > veh.cap_max:
                return False
            if euclid(req.xy, self.depot) > veh.drone_radius:
                return False
        else:
            if req.demand + add_load > veh.cap_max:
                return False
        return True

    def _lw_ok_for_all(self, onboard: List[Tuple[int,float]], t_back: float) -> bool:
        # L_w constraint cho TẤT CẢ gói đang onboard
        for rid, t_pick in onboard:
            if (t_back - t_pick) > self.cfg.Lw + 1e-9:
                return False
        return True

    # ---------- Core simulate with multi-pick ----------
    def simulate(self, ind: "Individual", lambda_w=0.5, objective="weighted", verbose=False) -> Dict[str, Any]:
        # deepcopy state
        reqs_sorted: List[Request] = copy.deepcopy(self.requests)
        vehs: List[Vehicle] = copy.deepcopy(self.vehicles)
        self._req_list = reqs_sorted
        self._req_map: Dict[int, Request] = {r.rid: r for r in reqs_sorted}

        depot = self.depot
        horizon = self.cfg.horizon
        now = 0.0
        appear_idx = 0
        served_cnt = 0

        while now <= horizon:
            next_req_t = reqs_sorted[appear_idx].r_time if appear_idx < len(reqs_sorted) else float("inf")
            next_veh_t = min(v.ready_time for v in vehs) if vehs else float("inf")
            next_t = min(next_req_t, next_veh_t)
            if next_t == float("inf"):
                break
            now = min(next_t, horizon)

            # (A) ARRIVALS: gán i → Q_k' theo argmax R(k,i)
            while appear_idx < len(reqs_sorted) and reqs_sorted[appear_idx].r_time <= now + 1e-9:
                r = reqs_sorted[appear_idx]
                best_k, best_score = None, -1e18
                for v in vehs:
                    if not self._veh_req_feasible_typecap(v, r, add_load=0.0):
                        continue
                    sR = self.R(ind, v, r, now)
                    if sR > best_score:
                        best_score, best_k = sR, v.vid
                if best_k is not None:
                    vehs[best_k].queue.append(r.rid)
                appear_idx += 1

            # (B) DISPATCH: mỗi xe sẵn sàng sẽ chạy 1 chuyến có thể nhặt N khách
            for v in vehs:
                if v.ready_time > now + 1e-12 or not v.queue:
                    continue

                cur_pos = v.pos
                cur_time = now
                cur_load = 0.0
                onboard: List[Tuple[int,float]] = []  # (rid, t_pick) của chuyến hiện tại

                while True:
                    # tìm i' = argmax S(k,i) thoả TW & L_w (dự báo quay về NGAY sau khi pick i')
                    choose_rid, choose_score = None, -1e18
                    for rid in list(v.queue):
                        r = self._req_map.get(rid)
                        if r is None or r.served:
                            continue
                        if not self._veh_req_feasible_typecap(v, r, add_load=cur_load):
                            continue

                        # thời gian đến i từ vị trí hiện tại
                        t_to_req = euclid(cur_pos, r.xy)/v.speed
                        t_arr = cur_time + t_to_req
                        if t_arr > r.l + 1e-9:
                            continue
                        t_pick = max(t_arr, r.e)

                        # nếu quay về ngay sau khi pick r:
                        t_to_depot = euclid(r.xy, depot)/v.speed
                        t_back_proj = t_pick + t_to_depot

                        # L_w phải thoả cho toàn bộ onboard + r
                        if not self._lw_ok_for_all(onboard + [(rid, t_pick)], t_back_proj):
                            continue

                        # điểm S(k,i) tại trạng thái hiện tại (dùng cur_pos/cur_time)
                        sS = self.S(ind, v, r, cur_time, cur_pos=cur_pos)
                        if sS > choose_score:
                            choose_score, choose_rid = sS, rid

                    if choose_rid is None:
                        break  # không còn khách nào thêm được → kết thúc ghép chuyến

                    # Thực hiện pick khách choose_rid
                    r = self._req_map[choose_rid]
                    t_to_req = euclid(cur_pos, r.xy)/v.speed
                    t_arr = cur_time + t_to_req
                    t_pick = max(t_arr, r.e)

                    # cập nhật trạng thái chuyến
                    cur_pos = r.xy
                    cur_time = t_pick
                    cur_load += r.demand
                    onboard.append((r.rid, t_pick))
                    r.served = True            # đánh dấu đã được pick
                    v.queue.remove(r.rid)      # bỏ khỏi queue để tránh chọn lại

                    # (vòng while sẽ xét thêm khách tiếp theo trên trạng thái mới)

                if not onboard:
                    # xe này chưa pick được ai lúc now; đợi sự kiện kế
                    continue

                # Quay về depot sau khi không thêm được khách nào nữa
                t_to_depot = euclid(cur_pos, depot)/v.speed
                t_back = cur_time + t_to_depot
                if not self._lw_ok_for_all(onboard, t_back):
                    # an toàn (theo logic incremental thì luôn OK), nhưng check lại để chắc
                    # nếu fail, roll back bằng cách chỉ giữ khách cuối cùng (bảo toàn ràng buộc)
                    last = onboard[-1:]
                    t_back_last = last[0][1] + euclid(self._req_map[last[0][0]].xy, depot)/v.speed
                    onboard = last
                    t_back = t_back_last

                # cập nhật trạng thái xe & yêu cầu
                v.pos = depot
                v.ready_time = t_back
                for rid, t_pick in onboard:
                    rr = self._req_map[rid]
                    rr.picked_time = t_pick
                    rr.depot_arrival_time = t_back
                served_cnt += len(onboard)
                v.load = 0.0  # reset sau khi về depot

        # ---------- Metrics ----------
        T_finish = max((v.ready_time for v in vehs), default=horizon)
        service_ratio = served_cnt / max(1, len(reqs_sorted))
        avg_d = sum(euclid(depot, r.xy) for r in reqs_sorted) / max(1, len(reqs_sorted))
        vfast = max((v.speed for v in vehs), default=1.0)
        T_ref = 1.0 + 2.0*avg_d / max(1e-9, vfast)
        f1 = T_finish / T_ref            # [MAP] f̄1
        f2 = 1.0 - service_ratio         # [MAP] f̄2

        if objective == "lexicographic":
            f = (f1, f2)                 # [MAP] ưu tiên makespan trước, rồi service
        else:
            f = lambda_w * f1 + (1.0 - lambda_w) * f2

        if verbose:
            msgf = f if isinstance(f, tuple) else f"{f:.4f}"
            print(f"Served {served_cnt}/{len(reqs_sorted)} | ratio={service_ratio:.3f} | T_finish={T_finish:.2f} | f={msgf}")
        return dict(f=f, T_finish=T_finish, service_ratio=service_ratio)


## GPHH training with augmentation

In [10]:
# [MAP] ALGORITHM 1: Init → Evaluate (multi-instance avg) → Variation → Elitism
# [MAP] AUGMENTATION: tạo nhiều Simulator với seed khác & TW shrink (sf ∈ [low, high])

from dataclasses import dataclass
from typing import List, Tuple, Any

@dataclass
class GPConfig:
    pop_size: int = 40
    intermed_size: int = 40
    max_depth: int = 5
    rm: float = 0.25
    rc: float = 0.65
    generations: int = 20
    lambda_w: float = 0.5
    tournament_k: int = 3
    seed: int = 999
    objective: str = "weighted"   # "weighted" | "lexicographic"

@dataclass
class AugConfig:
    num_instances: int = 5
    seed_base: int = 12345
    tw_sf_low: float = 0.6
    tw_sf_high: float = 1.0

def make_augmented_sims(base_cfg: SimConfig, aug: AugConfig) -> List[Simulator]:
    sims = []
    for i in range(aug.num_instances):
        cfg_i = copy.deepcopy(base_cfg)
        cfg_i.seed = aug.seed_base + i
        sim_i = Simulator(cfg_i)
        sf = random.uniform(aug.tw_sf_low, aug.tw_sf_high)
        sim_i.shrink_tw_inplace(sf)  # [MAP] TW shrink
        sims.append(sim_i)
    return sims

def tournament_select(pop: List["Individual"], k: int) -> "Individual":
    cand = random.sample(pop, k)
    cand.sort(key=lambda ind: ind.fitness)
    return cand[0]

def evaluate_population_multi(sims: List[Simulator], pop: List["Individual"], cfg: GPConfig):
    for ind in pop:
        # đánh giá trung bình trên nhiều instance (robust)
        f_list, t_list, sr_list = [], [], []
        for sim in sims:
            res = sim.simulate(ind, lambda_w=cfg.lambda_w, objective=cfg.objective, verbose=False)
            f_list.append(res["f"])
            t_list.append(res["T_finish"])
            sr_list.append(res["service_ratio"])
        # gộp theo objective
        if cfg.objective == "lexicographic":
            # trung bình từng thành phần rồi dùng tuple
            f1_avg = sum(x[0] for x in f_list)/len(f_list)
            f2_avg = sum(x[1] for x in f_list)/len(f_list)
            ind.fitness = (f1_avg, f2_avg)
        else:
            ind.fitness = sum(f_list)/len(f_list)
        ind._avg_T = sum(t_list)/len(t_list)
        ind._avg_SR = sum(sr_list)/len(sr_list)

def elitism(pop: List["Individual"], new_pop: List["Individual"], pop_size: int) -> List["Individual"]:
    pool = pop + new_pop
    pool.sort(key=lambda ind: ind.fitness)
    return [p.copy() for p in pool[:pop_size]]

def random_individual(max_depth: int) -> "Individual":
    return Individual(
        route_tree=ramped_half_and_half(max_depth),
        seq_tree=ramped_half_and_half(max_depth),
        fitness=float("inf")
    )

def train_with_aug(base_cfg: SimConfig, gpcfg: GPConfig, augcfg: AugConfig):
    random.seed(gpcfg.seed)
    sims = make_augmented_sims(base_cfg, augcfg)

    # init
    P = [random_individual(gpcfg.max_depth) for _ in range(gpcfg.pop_size)]
    evaluate_population_multi(sims, P, gpcfg)
    best = min(P, key=lambda x: x.fitness).copy()

    for gen in range(1, gpcfg.generations+1):
        O: List[Individual] = []
        while len(O) < gpcfg.intermed_size:
            u = random.random()
            if u < gpcfg.rc:
                pa = tournament_select(P, gpcfg.tournament_k)
                pb = tournament_select(P, gpcfg.tournament_k)
                c1r, c2r = subtree_crossover(pa.route_tree, pb.route_tree, gpcfg.max_depth)
                c1s, c2s = subtree_crossover(pa.seq_tree, pb.seq_tree, gpcfg.max_depth)
                O.append(Individual(c1r, c1s))
                if len(O) < gpcfg.intermed_size:
                    O.append(Individual(c2r, c2s))
            elif u < gpcfg.rc + gpcfg.rm:
                p = tournament_select(P, gpcfg.tournament_k)
                mr = subtree_mutation(p.route_tree, gpcfg.max_depth)
                ms = subtree_mutation(p.seq_tree, gpcfg.max_depth)
                O.append(Individual(mr, ms))
            else:
                p = tournament_select(P, gpcfg.tournament_k)
                O.append(p.copy())

        evaluate_population_multi(sims, O, gpcfg)
        P = elitism(P, O, gpcfg.pop_size)
        cur_best = min(P, key=lambda x: x.fitness)
        if cur_best.fitness < best.fitness:
            best = cur_best.copy()
        msgf = best.fitness if isinstance(best.fitness, tuple) else f"{best.fitness:.4f}"
        print(f"[Gen {gen:02d}] best f={msgf} | avg T={getattr(best,'_avg_T',0):.2f} | avg SR={getattr(best,'_avg_SR',0):.3f}")

    # báo cáo trên 2 thứ: (1) một sim nguyên gốc (không shrink), (2) trung bình aug sims
    sim_origin = Simulator(base_cfg)  # nguyên gốc
    res_origin = sim_origin.simulate(best, lambda_w=gpcfg.lambda_w, objective=gpcfg.objective, verbose=True)

    # trung bình trên aug sims
    f_list, t_list, sr_list = [], [], []
    for sim in sims:
        res = sim.simulate(best, lambda_w=gpcfg.lambda_w, objective=gpcfg.objective, verbose=False)
        f_list.append(res["f"])
        t_list.append(res["T_finish"])
        sr_list.append(res["service_ratio"])
    if gpcfg.objective == "lexicographic":
        f1_avg = sum(x[0] for x in f_list)/len(f_list)
        f2_avg = sum(x[1] for x in f_list)/len(f_list)
        f_avg = (f1_avg, f2_avg)
    else:
        f_avg = sum(f_list)/len(f_list)

    summary = dict(
        origin=res_origin,
        aug_avg=dict(f=f_avg, T_finish=sum(t_list)/len(t_list), service_ratio=sum(sr_list)/len(sr_list))
    )
    return best, summary


In [6]:
# Cell 6 (Extended) — Quick configs + augmentation & objective

simcfg = SimConfig(
    N=60, plane_size=80.0, horizon=200.0,
    K_truck=2, D_drone=3,
    MT=20.0, MD=3.0, LD=50.0,
    v_truck=1.0, v_drone=1.6,
    Lw=60.0, seed=2025
)

gpcfg = GPConfig(
    pop_size=40, intermed_size=40,
    max_depth=5, rm=0.25, rc=0.65,
    generations=20, lambda_w=0.5,
    tournament_k=3, seed=2025,
    objective="weighted"   # đổi sang "lexicographic" nếu muốn
)

augcfg = AugConfig(
    num_instances=6,
    seed_base=777,
    tw_sf_low=0.65,
    tw_sf_high=1.0
)

print("Configs ready (simcfg, gpcfg, augcfg).")


Configs ready (simcfg, gpcfg, augcfg).


In [7]:
# Cell 7 (Extended) — Train GP with augmentation + report

import time
t0 = time.time()
best, summary = train_with_aug(simcfg, gpcfg, augcfg)
dt = time.time() - t0

print(f"\nDone in {dt:.1f}s")
print("\n== Origin (no shrink) ==")
print(summary["origin"])
print("\n== Augmented average over sims ==")
print(summary["aug_avg"])

KeyboardInterrupt: 

## Inspect best trees

In [None]:
# Cell 8 (Extended) — Inspect best GP trees (R & S)

print("Best ROUTING tree (R):")
print(tree_to_prefix(best.route_tree))
print("\nBest SEQUENCING tree (S):")
print(tree_to_prefix(best.seq_tree))

# chạy lại 1 lần trên sim gốc để xem log
sim_origin = Simulator(simcfg)
_ = sim_origin.simulate(best, lambda_w=gpcfg.lambda_w, objective=gpcfg.objective, verbose=True)


In [None]:
import matplotlib.pyplot as plt

# Demo dummy (xóa đoạn giả lập này nếu bạn đã có hist thực):
best_scalar = [1.2, 1.12, 1.05, 1.03, 1.01, 0.99]  # TODO: thay bằng lịch sử thực

plt.figure()
plt.plot(best_scalar, marker='o')
plt.xlabel("Generation")
plt.ylabel("Best fitness (scalar)")
plt.title("Evolution of Best Fitness")
plt.grid(True)
plt.show()


## tiny test

In [None]:
import math, copy, random
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict, Any

USE_TRAINING = False      # False: dùng policy thủ công (không train). True: train siêu nhẹ.
SAVE_FIG = False          # True: lưu PNG cạnh notebook.
CASES_TO_RUN = ["mini1", "mini2", "mini3"]  # chọn case để vẽ

def make_mini_cases(simcfg) -> Dict[str, List[Request]]:
    """Ba bộ ca nhỏ, trộn C1/C2 cho truck/drone cùng chạy; TW rộng rãi; r_time sớm."""
    depot = (0.0, 0.0)
    def far(p): return math.hypot(p[0]-depot[0], p[1]-depot[1])

    def adj(rid, xy, demand, r_time, open_offset, width, force_truck):
        e = r_time + open_offset
        l = min(simcfg.horizon, e + width)
        must = force_truck or (demand > simcfg.MD) or (far(xy) > simcfg.LD)
        return Request(rid, xy, demand, r_time, e, l, must)

    # Case 1: 5 điểm, 1 truck + 1 drone giải ngon
    mini1 = [
        adj(0, (18, 22), 2.0,  0.0, 0.0, 60.0, False),  # drone OK
        adj(1, (55,  8), 4.2,  0.0, 5.0, 80.0,  True),  # truck
        adj(2, (-24,15), 1.0,  0.0, 0.0, 70.0, False),  # drone OK
        adj(3, (62, 65), 2.0, 10.0, 0.0, 90.0,  True),  # truck (xa>LD)
        adj(4, (-38,-18),2.0,  0.0, 0.0, 70.0, False),  # drone OK
    ]

    # Case 2: 6 điểm, một số tới muộn hơn
    mini2 = [
        adj(0, (20,  30), 2.0,  0.0, 0.0, 60.0, False),
        adj(1, (60,  10), 4.5,  5.0, 5.0, 90.0, True ),
        adj(2, (-30, 20), 1.0,  0.0, 5.0,  70.0, False),
        adj(3, (70,  70), 2.0, 20.0, 0.0, 110.0, True ),
        adj(4, (-45,-20), 2.0,  0.0, 0.0, 60.0, False),
        adj(5, ( 10,-55), 1.5, 10.0, 0.0, 100.0, True ),
    ]

    # Case 3: 7 điểm, drone làm nhiều điểm gần, truck lấy các điểm xa/đòi tải
    mini3 = [
        adj(0,  (12,  14), 1.2,  0.0, 0.0, 60.0, False),
        adj(1,  (22,  16), 1.0,  0.0, 0.0, 60.0, False),
        adj(2,  (-18, 10), 1.8,  0.0, 0.0, 60.0, False),
        adj(3,  (58,  12), 4.0,  0.0, 5.0,  90.0, True ),
        adj(4,  (64,  62), 2.0, 10.0, 0.0, 100.0,True ),
        adj(5,  (-35,-16), 2.0,  0.0, 0.0, 70.0, False),
        adj(6,  (8,  -50), 1.5,  0.0, 0.0, 90.0, True ),
    ]

    return {"mini1": mini1, "mini2": mini2, "mini3": mini3}

# ========= 2) Tracer (mô phỏng có ghi vệt để vẽ) =========
class MiniTraceSimulator(Simulator):
    def __init__(self, cfg: SimConfig, requests: List[Request]):
        super().__init__(cfg)
        self.requests = copy.deepcopy(requests)
        self.requests.sort(key=lambda r: r.r_time)
        self.total_demand = sum(r.demand for r in self.requests)
        self.segments: Dict[int, List[Tuple[float,float,float,float,int,str,str]]] = {}

    def _seg(self, vid: int, x1: float, y1: float, x2: float, y2: float, rid: int, phase: str, kind: str):
        self.segments.setdefault(vid, []).append((x1,y1,x2,y2,rid,phase,kind))

    def simulate_trace(self, ind: "Individual", lambda_w=0.5, objective="weighted") -> Dict[str, Any]:
        reqs_sorted: List[Request] = copy.deepcopy(self.requests)
        vehs: List[Vehicle] = copy.deepcopy(self.vehicles)
        self._req_list = reqs_sorted
        self._req_map: Dict[int, Request] = {r.rid: r for r in reqs_sorted}
        self.segments.clear()

        depot = self.depot
        horizon = self.cfg.horizon
        now = 0.0
        appear_idx = 0
        served_cnt = 0

        def euclid(a, b):
            dx, dy = a[0]-b[0], a[1]-b[1]
            return math.hypot(dx, dy)

        while now <= horizon:
            next_req_t = reqs_sorted[appear_idx].r_time if appear_idx < len(reqs_sorted) else float("inf")
            next_veh_t = min(v.ready_time for v in vehs) if vehs else float("inf")
            next_t = min(next_req_t, next_veh_t)
            if next_t == float("inf"): break
            now = min(next_t, horizon)

            # Arrivals → argmax R
            while appear_idx < len(reqs_sorted) and reqs_sorted[appear_idx].r_time <= now + 1e-9:
                r = reqs_sorted[appear_idx]
                best_k, best_score = None, -1e18
                for v in vehs:
                    if not self._veh_req_feasible_typecap(v, r): continue
                    sR = self.R(ind, v, r, now)
                    if sR > best_score:
                        best_score, best_k = sR, v.vid
                if best_k is not None: vehs[best_k].queue.append(r.rid)
                appear_idx += 1

            # Dispatch multi-pick (thêm khách liên tiếp nếu còn hợp lệ)
            for v in vehs:
                if v.ready_time > now + 1e-12 or not v.queue: continue

                cur_pos, cur_time, cur_load = v.pos, now, 0.0
                onboard: List[Tuple[int,float]] = []

                while True:
                    choose_rid, choose_score = None, -1e18
                    for rid in list(v.queue):
                        r = self._req_map.get(rid)
                        if r is None or r.served: continue
                        if not self._veh_req_feasible_typecap(v, r, add_load=cur_load): continue

                        t_to_req = euclid(cur_pos, r.xy)/v.speed
                        t_arr = cur_time + t_to_req
                        if t_arr > r.l + 1e-9: continue
                        t_pick = max(t_arr, r.e)

                        t_to_depot = euclid(r.xy, depot)/v.speed
                        t_back_proj = t_pick + t_to_depot
                        # L_w cho toàn bộ onboard + r
                        ok = True
                        for rrid, tp in onboard + [(rid, t_pick)]:
                            if (t_back_proj - tp) > self.cfg.Lw + 1e-9: ok=False; break
                        if not ok: continue

                        sS = self.S(ind, v, r, cur_time, cur_pos=cur_pos)
                        if sS > choose_score:
                            choose_score, choose_rid = sS, rid

                    if choose_rid is None: break

                    r = self._req_map[choose_rid]
                    # vẽ tới khách
                    self._seg(v.vid, cur_pos[0],cur_pos[1], r.xy[0],r.xy[1], r.rid, "to_customer", v.kind)
                    # cập nhật trạng thái
                    t_to_req = euclid(cur_pos, r.xy)/v.speed
                    t_arr = cur_time + t_to_req
                    t_pick = max(t_arr, r.e)
                    cur_pos, cur_time = r.xy, t_pick
                    cur_load += r.demand
                    onboard.append((r.rid, t_pick))
                    r.served = True
                    v.queue.remove(r.rid)

                if not onboard: continue
                # về depot
                t_to_depot = euclid(cur_pos, depot)/v.speed
                t_back = cur_time + t_to_depot
                self._seg(v.vid, cur_pos[0],cur_pos[1], depot[0],depot[1], onboard[-1][0], "to_depot", v.kind)
                v.pos, v.ready_time = depot, t_back
                for rid, t_pick in onboard:
                    rr = self._req_map[rid]
                    rr.picked_time, rr.depot_arrival_time = t_pick, t_back
                served_cnt += len(onboard)
                v.load = 0.0

        # metrics & fitness (weighted)
        T_finish = max((v.ready_time for v in vehs), default=horizon)
        service_ratio = served_cnt / max(1, len(reqs_sorted))
        avg_d = sum(math.hypot(r.xy[0]-depot[0], r.xy[1]-depot[1]) for r in reqs_sorted) / max(1, len(reqs_sorted))
        vfast = max((v.speed for v in vehs), default=1.0)
        T_ref = 1.0 + 2.0*avg_d / max(1e-9, vfast)
        f1 = T_finish / T_ref
        f2 = 1.0 - service_ratio
        f  = lambda_w * f1 + (1.0 - lambda_w) * f2
        return dict(f=f, T_finish=T_finish, service_ratio=service_ratio,
                    segments=self.segments, requests=reqs_sorted, vehicles=vehs)

# ========= 3) Vẽ 1 figure/ case =========
def mini_plot_trace(result: Dict[str,Any], title: str):
    fig = plt.figure()
    ax = fig.gca()
    # depot
    ax.scatter([0.0],[0.0], marker='^', s=100, label="Depot")
    ax.annotate("Depot", (0.0,0.0), xytext=(5,5), textcoords="offset points")
    # requests
    reqs = result["requests"]
    xs, ys = [r.xy[0] for r in reqs], [r.xy[1] for r in reqs]
    ax.scatter(xs, ys, marker='o', s=40, label="Requests")
    for r in reqs:
        ax.annotate(f"{r.rid}", r.xy, xytext=(3,3), textcoords="offset points")
    # segments
    segs = result["segments"]
    for vid, arr in segs.items():
        for (x1,y1,x2,y2,rid,phase,kind) in arr:
            ls = "-" if kind=="truck" else "--"
            ax.plot([x1,x2],[y1,y2], linestyle=ls, label=f"{kind} path")
    # gộp legend
    h, l = ax.get_legend_handles_labels()
    uniq = {}
    for hi, li in zip(h,l):
        if li not in uniq: uniq[li] = hi
    ax.legend(uniq.values(), uniq.keys())
    ax.set_xlabel("X"); ax.set_ylabel("Y"); ax.set_title(title); ax.grid(True)
    if SAVE_FIG:
        fn = f"{title.replace(' ','_').lower()}.png"
        plt.savefig(fn, dpi=150, bbox_inches="tight")
        print("Saved:", fn)
    plt.show()

# ========= 4) Policy: (A) thủ công rất nhanh, hoặc (B) train siêu nhẹ =========
def make_handcrafted_policy():
    """Tạo nhanh 2 cây R,S thiên về:
       - Ưu tiên drone cho điểm gần & còn TW, truck lo điểm xa/nặng.
       - Ưu tiên điểm sắp đóng TW và gần vị trí hiện tại.
       (Dùng toán tử/terminal sẵn có trong Node/GP_FUNS.)"""
    def F(sym, *chs):
        # lấy fn & arity từ GP_FUNS đã có
        fn = next((f for (name, ar, f) in GP_FUNS if name==sym), None)
        ar = next((ar for (name, ar, f) in GP_FUNS if name==sym), None)
        n = Node(sym, ar, fn, None, [])
        n.children = list(chs)
        return n
    def T(key): return Node(key, 0, None, key, [])

    # ROUTING: max( cap_slack_ratio - dist_depot_norm, (1 - is_drone) )
    # (đơn giản: drone ưa gần depot; truck được ưu tiên nếu cần)
    route = F("max",
              F("-", T("cap_slack_ratio"), T("dist_depot_norm")),
              F("-", T("abs"), T("is_drone"))  # trick nhỏ để có "1 - is_drone" ≈ ưu tiên truck
             )
    # SEQUENCING: min( t_until_close_norm + dist_cur_req_norm,
    #                  dist_medQ_req_norm + t_travel_norm )
    seq = F("min",
            F("+", T("t_until_close_norm"), T("dist_cur_req_norm")),
            F("+", T("dist_medQ_req_norm"), T("t_travel_norm"))
          )
    return Individual(route, seq)

def tiny_train_on_cases(simcfg, gpcfg, cases: Dict[str,List[Request]]):
    """Train siêu nhẹ trên đúng các case này (không augmentation)"""
    # helper nội bộ (khỏi lệ thuộc Cell 5)
    def eval_on_sims(sims, pop):
        for ind in pop:
            f_list=[]
            for s in sims:
                res = s.simulate(ind, lambda_w=gpcfg.lambda_w, objective=gpcfg.objective, verbose=False)
                f_list.append(res["f"])
            ind.fitness = sum(f_list)/len(f_list)

    # tạo các simulator từ danh sách Request
    sims=[]
    for name, reqs in cases.items():
        s = Simulator(copy.deepcopy(simcfg))
        s.requests = copy.deepcopy(reqs)
        s.requests.sort(key=lambda r: r.r_time)
        s.total_demand = sum(r.demand for r in s.requests)
        sims.append(s)

    # cấu hình train nhẹ
    tiny = copy.deepcopy(gpcfg)
    tiny.pop_size = 12
    tiny.intermed_size = 12
    tiny.generations = 6
    tiny.max_depth = 4
    tiny.tournament_k = 2
    random.seed(tiny.seed)

    P = [random_individual(tiny.max_depth) for _ in range(tiny.pop_size)]
    eval_on_sims(sims, P)
    best = min(P, key=lambda x: x.fitness).copy()

    for gen in range(1, tiny.generations+1):
        O=[]
        while len(O) < tiny.intermed_size:
            u = random.random()
            if u < 0.6:  # crossover
                pa = tournament_select(P, tiny.tournament_k)
                pb = tournament_select(P, tiny.tournament_k)
                c1r, c2r = subtree_crossover(pa.route_tree, pb.route_tree, tiny.max_depth)
                c1s, c2s = subtree_crossover(pa.seq_tree,   pb.seq_tree,   tiny.max_depth)
                O.append(Individual(c1r, c1s))
                if len(O) < tiny.intermed_size: O.append(Individual(c2r, c2s))
            elif u < 0.9:  # mutation
                p  = tournament_select(P, tiny.tournament_k)
                O.append(Individual(subtree_mutation(p.route_tree, tiny.max_depth),
                                    subtree_mutation(p.seq_tree,   tiny.max_depth)))
            else:          # reproduction
                O.append(tournament_select(P, tiny.tournament_k).copy())
        eval_on_sims(sims, O)
        # elitism
        pool = sorted(P+O, key=lambda x: x.fitness)
        P = [pool[i].copy() for i in range(tiny.pop_size)]
        if P[0].fitness < best.fitness: best = P[0].copy()
        print(f"[Tiny Gen {gen}] best f={best.fitness:.4f}")
    return best

# ========= 5) Chạy & vẽ cho các case đã chọn =========
cases = make_mini_cases(simcfg)
policy = None
if USE_TRAINING:
    print("Tiny training on selected mini-cases ...")
    policy = tiny_train_on_cases(simcfg, gpcfg, {k:cases[k] for k in CASES_TO_RUN})
else:
    policy = make_handcrafted_policy()
    print("Using handcrafted policy (no training).")

for name in CASES_TO_RUN:
    reqs = cases[name]
    tracer = MiniTraceSimulator(simcfg, reqs)
    res = tracer.simulate_trace(policy, lambda_w=gpcfg.lambda_w, objective="weighted")
    print(f"{name}: served={res['service_ratio']*len(reqs):.0f}/{len(reqs)}, "
          f"SR={res['service_ratio']:.3f}, T_finish={res['T_finish']:.1f}, f={res['f']:.3f}")
    mini_plot_trace(res, title=f"{name} — truck(solid) & drone(dashed)")

Using handcrafted policy (no training).


## result

In [8]:
# Cell 11 — Test case builders & Trace simulator (uses GP R/S from previous cells)
# Dán sau Cell 10. Không cần sửa các cell trước.

from typing import List, Tuple, Dict, Any, Optional
import copy, math, random

# ==== 11.1. Test case builders ====

def make_fixed_requests(simcfg: SimConfig) -> List[Request]:
    """Tạo bộ yêu cầu cố định (mixed C1/C2) để có ảnh minh hoạ ổn."""
    depot = (0.0, 0.0)
    def far(p):  # khoảng cách tới depot
        return math.hypot(p[0]-depot[0], p[1]-depot[1])
    reqs = [
        # rid, xy, demand, r_time, e, l, must_truck
        Request(0, (20,  30), 2.0,  0.0, 10.0,  70.0, False),  # C2 → drone ok
        Request(1, (60,  10), 4.5,  5.0, 15.0, 120.0, True ),  # C1 (d>MD) → truck
        Request(2, (-30, 20), 1.0,  0.0,  5.0,  80.0, False),  # C2 → drone ok
        Request(3, (70,  70), 2.0, 20.0, 25.0, 160.0, True ),  # C1 (xa>LD) → truck
        Request(4, (-45,-20), 2.0,  0.0,  0.0,  90.0, False),  # C2 → drone ok
        Request(5, (10, -55), 1.5, 10.0, 15.0, 120.0, True ),  # C1 (xa≈56.8>LD) → truck
    ]
    # Chuẩn hoá must_truck theo simcfg (phòng khi đổi MD/LD):
    out = []
    for r in reqs:
        mt = r.must_truck or (r.demand > simcfg.MD) or (far(r.xy) > simcfg.LD)
        out.append(Request(r.rid, r.xy, r.demand, r.r_time, r.e, min(simcfg.horizon, r.l), mt))
    return out

def make_random_requests(simcfg: SimConfig, seed: int = 2025, N: int = 40) -> List[Request]:
    """Sinh ngẫu nhiên N yêu cầu trên mặt phẳng; C1 nếu demand>MD hoặc xa hơn LD."""
    random.seed(seed)
    reqs: List[Request] = []
    for i in range(N):
        x = random.uniform(-simcfg.plane_size, simcfg.plane_size)
        y = random.uniform(-simcfg.plane_size, simcfg.plane_size)
        r_time = random.uniform(0.0, 0.7*simcfg.horizon)
        width  = random.uniform(simcfg.tw_min, simcfg.tw_max)
        e = r_time
        l = min(simcfg.horizon, e + width)
        demand = random.uniform(simcfg.d_min, simcfg.d_max)
        mt = (demand > simcfg.MD) or (math.hypot(x, y) > simcfg.LD)
        reqs.append(Request(i, (x,y), demand, r_time, e, l, mt))
    return reqs

# ==== 11.2. Trace simulator (subclass) ====

class TraceSimulator(Simulator):
    """
    Simulator chuyên để vẽ:
    - Dùng đúng chính sách R/S & ràng buộc như Simulator.simulate (multi-pick).
    - Ghi lại các đoạn thẳng di chuyển để vẽ: segments[vid] = [(x1,y1,x2,y2,rid,phase,kind), ...]
    - Không ảnh hưởng tới training vì chạy độc lập.
    """
    def __init__(self, cfg: SimConfig, requests: List[Request]):
        super().__init__(cfg)
        # Ghi đè bộ requests theo đầu vào
        self.requests = copy.deepcopy(requests)
        self.requests.sort(key=lambda r: r.r_time)
        self.total_demand = sum(r.demand for r in self.requests)
        # buffer vẽ
        self.segments: Dict[int, List[Tuple[float,float,float,float,int,str,str]]] = {}

    def _seg(self, vid: int, x1: float, y1: float, x2: float, y2: float, rid: int, phase: str, kind: str):
        self.segments.setdefault(vid, []).append((x1,y1,x2,y2,rid,phase,kind))

    def simulate_trace(self, ind: "Individual", lambda_w=0.5, objective="weighted", verbose=False) -> Dict[str, Any]:
        # copy state (giống simulate gốc)
        reqs_sorted: List[Request] = copy.deepcopy(self.requests)
        vehs: List[Vehicle] = copy.deepcopy(self.vehicles)
        self._req_list = reqs_sorted
        self._req_map: Dict[int, Request] = {r.rid: r for r in reqs_sorted}
        self.segments.clear()

        depot = self.depot
        horizon = self.cfg.horizon
        now = 0.0
        appear_idx = 0
        served_cnt = 0

        def euclid(a, b):
            dx, dy = a[0]-b[0], a[1]-b[1]
            return math.hypot(dx, dy)

        while now <= horizon:
            next_req_t = reqs_sorted[appear_idx].r_time if appear_idx < len(reqs_sorted) else float("inf")
            next_veh_t = min(v.ready_time for v in vehs) if vehs else float("inf")
            next_t = min(next_req_t, next_veh_t)
            if next_t == float("inf"):
                break
            now = min(next_t, horizon)

            # (A) arrivals: gán theo argmax R
            while appear_idx < len(reqs_sorted) and reqs_sorted[appear_idx].r_time <= now + 1e-9:
                r = reqs_sorted[appear_idx]
                best_k, best_score = None, -1e18
                for v in vehs:
                    if not self._veh_req_feasible_typecap(v, r):
                        continue
                    sR = self.R(ind, v, r, now)
                    if sR > best_score:
                        best_score, best_k = sR, v.vid
                if best_k is not None:
                    vehs[best_k].queue.append(r.rid)
                appear_idx += 1

            # (B) dispatch: mỗi xe có thể nhặt nhiều điểm rồi về depot
            for v in vehs:
                if v.ready_time > now + 1e-12 or not v.queue:
                    continue

                cur_pos = v.pos
                cur_time = now
                cur_load = 0.0
                onboard: List[Tuple[int,float]] = []

                while True:
                    choose_rid, choose_score = None, -1e18
                    for rid in list(v.queue):
                        r = self._req_map.get(rid)
                        if r is None or r.served:
                            continue
                        if not self._veh_req_feasible_typecap(v, r, add_load=cur_load):
                            continue

                        # thời gian đến từ cur_pos
                        t_to_req = euclid(cur_pos, r.xy)/v.speed
                        t_arr = cur_time + t_to_req
                        if t_arr > r.l + 1e-9:
                            continue
                        t_pick = max(t_arr, r.e)

                        # về depot ngay sau khi pick r (để check Lw dự báo)
                        t_to_depot = euclid(r.xy, depot)/v.speed
                        t_back_proj = t_pick + t_to_depot

                        # kiểm tra Lw cho toàn bộ onboard + r
                        ok = True
                        for rrid, tp in onboard + [(rid, t_pick)]:
                            if (t_back_proj - tp) > self.cfg.Lw + 1e-9:
                                ok = False; break
                        if not ok:
                            continue

                        sS = self.S(ind, v, r, cur_time, cur_pos=cur_pos)
                        if sS > choose_score:
                            choose_score, choose_rid = sS, rid

                    if choose_rid is None:
                        break

                    # Vẽ đoạn: cur_pos -> r.xy
                    r = self._req_map[choose_rid]
                    x1,y1 = cur_pos
                    x2,y2 = r.xy
                    self._seg(v.vid, x1,y1,x2,y2, r.rid, "to_customer", v.kind)

                    # cập nhật trạng thái chuyến
                    t_to_req = euclid(cur_pos, r.xy)/v.speed
                    t_arr = cur_time + t_to_req
                    t_pick = max(t_arr, r.e)

                    cur_pos = r.xy
                    cur_time = t_pick
                    cur_load += r.demand
                    onboard.append((r.rid, t_pick))
                    r.served = True
                    v.queue.remove(r.rid)

                if not onboard:
                    continue

                # Vẽ đoạn: last -> depot
                t_to_depot = euclid(cur_pos, depot)/v.speed
                t_back = cur_time + t_to_depot
                self._seg(v.vid, cur_pos[0],cur_pos[1], depot[0],depot[1], onboard[-1][0], "to_depot", v.kind)

                v.pos = depot
                v.ready_time = t_back
                for rid, t_pick in onboard:
                    rr = self._req_map[rid]
                    rr.picked_time = t_pick
                    rr.depot_arrival_time = t_back
                served_cnt += len(onboard)
                v.load = 0.0

        # Metrics (giống simulate gốc)
        T_finish = max((v.ready_time for v in vehs), default=horizon)
        service_ratio = served_cnt / max(1, len(reqs_sorted))
        avg_d = sum(math.hypot(r.xy[0]-depot[0], r.xy[1]-depot[1]) for r in reqs_sorted) / max(1, len(reqs_sorted))
        vfast = max((v.speed for v in vehs), default=1.0)
        T_ref = 1.0 + 2.0*avg_d / max(1e-9, vfast)
        f1 = T_finish / T_ref
        f2 = 1.0 - service_ratio
        f = (f1, f2) if objective == "lexicographic" else (lambda_w * f1 + (1.0 - lambda_w) * f2)

        if verbose:
            msgf = f if isinstance(f, tuple) else f"{f:.4f}"
            print(f"[TRACE] served={served_cnt}/{len(reqs_sorted)} | SR={service_ratio:.3f} | T={T_finish:.1f} | f={msgf}")

        return dict(
            f=f, T_finish=T_finish, service_ratio=service_ratio,
            segments=self.segments,  # dict[vid] -> list of segments
            requests=reqs_sorted,    # để vẽ nhãn
            vehicles=vehs            # để biết loại (truck/drone)
        )


In [9]:
# Cell 12 — Plotting & quick demos (fixed + random)
# Yêu cầu: matplotlib đã import ở cell trước.

import matplotlib.pyplot as plt

def plot_trace(result: Dict[str,Any], title: str = "Truck & Drone routes"):
    reqs = result["requests"]
    segs = result["segments"]
    # figure đơn (theo yêu cầu)
    fig = plt.figure()
    ax = fig.gca()

    # depot
    ax.scatter([0.0],[0.0], marker='^', s=100, label="Depot")
    ax.annotate("Depot", (0.0,0.0), xytext=(5,5), textcoords="offset points")

    # requests
    xs, ys = zip(*[r.xy for r in reqs]) if reqs else ([],[])
    ax.scatter(xs, ys, marker='o', s=40, label="Requests")
    for r in reqs:
        ax.annotate(f"{r.rid}", r.xy, xytext=(3,3), textcoords="offset points")

    # segments
    for vid, arr in segs.items():
        # tìm loại xe từ bất kỳ segment (phase có 'kind' đã lưu)
        for (x1,y1,x2,y2,rid,phase,kind) in arr:
            ls = "-" if kind=="truck" else "--"
            ax.plot([x1,x2],[y1,y2], linestyle=ls, label=f"{kind} path")

    # gộp legend (tránh trùng)
    handles, labels = ax.get_legend_handles_labels()
    uniq = dict()
    for h, lb in zip(handles, labels):
        if lb not in uniq: uniq[lb] = h
    ax.legend(uniq.values(), uniq.keys())

    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.set_title(title)
    ax.grid(True)
    plt.show()

# ==== Demos ====

try:
    fixed_reqs = make_fixed_requests(simcfg)  
    tracer = TraceSimulator(simcfg, fixed_reqs)
    policy = best if "best" in globals() else random_individual(max_depth=5)
    res_fix = tracer.simulate_trace(policy, lambda_w=gpcfg.lambda_w, objective=gpcfg.objective, verbose=True)
    plot_trace(res_fix, title="Fixed case — routes (truck: solid, drone: dashed)")
except Exception as e:
    print("Fixed demo error:", e)

try:
    rnd_reqs = make_random_requests(simcfg, seed=2026, N=40)
    tracer2 = TraceSimulator(simcfg, rnd_reqs)
    policy2 = best if "best" in globals() else random_individual(max_depth=5)
    res_rnd = tracer2.simulate_trace(policy2, lambda_w=gpcfg.lambda_w, objective=gpcfg.objective, verbose=True)
    plot_trace(res_rnd, title="Random case — routes")
except Exception as e:
    print("Random demo error:", e)


KeyboardInterrupt: 