In [None]:
# python 및 라이브러리 버전

# Python version: 3.12.5 (tags/v3.12.5:ff3bc82, Aug  6 2024, 20:45:27) [MSC v.1940 64 bit (AMD64)]
# pandas version: 2.2.2
# numpy version: 2.0.2
# tqdm version: 4.67.0

# ================================= Import =========================================
from __future__ import annotations
import math, random, os, hashlib
from dataclasses import dataclass
from typing import Dict, List, Tuple, Any
import pandas as pd
import numpy as np
import tqdm

# ================================= Version Check =========================================
import sys

print(f"Python version: {sys.version}\n")

print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")
print(f"tqdm version: {tqdm.__version__}")

print("\n표준 라이브러리 (hashlib, math, random, os, dataclasses, typing)는 Python 표준 라이브러리에 포함되어 있습니다.")

Python version: 3.12.5 (tags/v3.12.5:ff3bc82, Aug  6 2024, 20:45:27) [MSC v.1940 64 bit (AMD64)]

pandas version: 2.2.2
numpy version: 2.0.2
tqdm version: 4.67.0

표준 라이브러리 (hashlib, math, random, os, dataclasses, typing)는 Python 표준 라이브러리에 포함되어 있습니다.


In [6]:
# ================================= Seed 관리 =========================================
RUN_SEED = 311501450  # 기본 시드

AUTO_INCREMENT = False  # 셀/스크립트 재실행 시 시드 자동 +1
try:
    RUN_SEED
except NameError:
    RUN_SEED = np.random.randint(0, 2**31 - 1)
else:
    if AUTO_INCREMENT:
        RUN_SEED += 1

print(f"[SEED 준비] RUN_SEED = {RUN_SEED}")


[SEED 준비] RUN_SEED = 311501450


In [None]:
# ================================= Construction =========================================
"""
초기해 구성 과정
- 작업은 데드라인으로 배치 선정 후 x+y 내림차순 정렬하고 AGV는 속도·용량·최대거리로 정렬한 후, 초기 k1개는 일대일 배정.
- 남은 작업을 k2개씩 뽑아 (속도2·좌하단 → 속도1·우상단 → 전체) 순으로 완료예상시각이 가장 빠른 AGV에, 용량·거리 제약을 만족하면 배정하고 막히면 투어를 DEPOT으로 닫음.
- k1·k2를 1~15 그리드 서치로 최적 조합을 찾아 최종 결과를 산출.
"""

# --------------------------------------------------
# 기본 상수 및 거리/시간 계산
# --------------------------------------------------

# DEPOT 좌표 설정
DEPOT_XY = (0, 0)
DEPOT = "DEPOT"

# 맨해튼 거리 계산
def manhattan(a, b):
    return abs(a[0]-b[0]) + abs(a[1]-b[1])

# 이동 시간 계산
def move_time(u, v, speed):
    return manhattan(u, v) / speed

# --------------------------------------------------
# AGV/Task 데이터 불러오기 및 전처리
# --------------------------------------------------

# AGV/Task 데이터 불러오기
agv_path = "./data/agv.csv"
task_path = "./data/task.csv"
agv_df  = pd.read_csv(agv_path)
task_df = pd.read_csv(task_path)

agv_df.columns  = [c.strip().lower() for c in agv_df.columns]
task_df.columns = [c.strip().lower() for c in task_df.columns]

print("AGV 데이터:", agv_df.shape)
print("Task 데이터:", task_df.shape)
display(agv_df.head())
display(task_df.head())

# AGV/Task 딕셔너리 구성
agvs = {
    r["agv_id"]: dict(
        speed=float(r["speed_cells_per_sec"]),  # 초당 이동 칸 수
        capacity=float(r["capacity"]),          # 적재 용량
        max_distance=float(r["max_distance"])   # 최대 주행 가능 거리
    )
    for _, r in agv_df.iterrows()
}

tasks = {
    r["task_id"]: dict(
        pos=(int(r["x"]), int(r["y"])),         # 작업 위치
        service_time=float(r["service_time"]),  # 작업 서비스 시간
        deadline=float(r["deadline"]),          # 작업 데드라인
        demand=float(r["demand"])               # 작업 수요량
    )
    for _, r in task_df.iterrows()
}

# --------------------------------------------------
# 초기해 구성
# --------------------------------------------------

def run_solver(k1: int, k2: int):
    remaining = set(tasks.keys())  # 아직 배정되지 않은 작업 집합

    # 시드 기반 랭크 함수(재현성 확보)
    def _salt_rank(tid, seed):
        h = hashlib.sha1(f"{tid}:{seed}".encode()).hexdigest() # 동률인 작업들 간 순서 결정
        return int(h[:16], 16)
 
    # AGV 상태 클래스 
    class AGVState:

        # 상태 초기화 
        def __init__(self, agv_id, speed, capacity, max_distance):
            self.agv_id = agv_id
            self.speed = float(speed)
            self.capacity = float(capacity)
            self.max_distance = float(max_distance)

            self.route = [DEPOT]        # 현재 토큰 리스트
            self.last_xy = DEPOT_XY     # 현재 위치
            self.tour_dist = 0.0        # 현재 투어 주행 거리
            self.tour_load = 0.0        # 현재 투어 적재량
            self.tour_has_task = False  # 현재 투어의 작업 배정 여부
            self.clock = 0.0            # 현재까지 경과 시간
            self.log = []               # 현재까지 작업 수행 로그

        # 작업 추가 가능 여부(용량, 거리)
        def can_add_now(self, task):
            (tx, ty), dem = task["pos"], task["demand"]  # 작업 위치, 수요량
            d_to   = manhattan(self.last_xy, (tx, ty))   # 현재 위치 -> 작업 위치 거리
            d_back = manhattan((tx, ty), DEPOT_XY)       # 작업 위치 -> DEPOT 거리
            if self.tour_dist + d_to + d_back > self.max_distance + 1e-9:  # 최대 주행 가능 거리 초과
                return False
            if self.tour_load + dem > self.capacity + 1e-9:  # 적재 용량 초과
                return False
            return True

        # 작업 완료 예상 시각 계산
        def predict_finish_time(self, task):
            d = manhattan(self.last_xy, task["pos"])  # 현재 위치 -> 작업 위치 거리
            arrive = self.clock + d / self.speed      # 도착 시각
            finish = arrive + task["service_time"]    # 서비스 완료 시각
            return finish

        # 작업 실제로 추가
        def add_task(self, tid, task):
            (tx, ty) = task["pos"]                 # 작업 위치
            start_time = self.clock                # 작업 시작 시각
            d = manhattan(self.last_xy, (tx, ty))  # 현재 위치 -> 작업 위치 거리
            travel_t = d / self.speed              # 이동 시간
            arrive_time = start_time + travel_t    # 도착 시각
            service_t = task["service_time"]       # 서비스 시간
            finish_time = arrive_time + service_t  # 서비스 완료 시각

            # 상태 업데이트
            self.clock = finish_time          # 경과 시간 갱신
            self.last_xy = (tx, ty)           # 위치 갱신
            self.tour_dist += d               # 투어 주행 거리 갱신
            self.tour_load += task["demand"]  # 투어 적재량 갱신
            self.route.append(tid)            # 토큰 리스트 갱신
            self.tour_has_task = True         # 투어의 작업 배정 여부 갱신
            self.log.append({                 # 작업 수행 로그 갱신
                "task_id": tid,               
                "start_time": start_time,      
                "travel_time": travel_t,      
                "service_time": service_t,     
                "arrive_time": arrive_time,   
                "finish_time": finish_time,    
                "deadline": task["deadline"],  
                "pos": (tx, ty),               
            })

        # 투어 종료 처리 (DEPOT 복귀)
        def close_tour(self):
            if self.route and self.route[-1] != DEPOT:
                d_back = manhattan(self.last_xy, DEPOT_XY)  # 현재 위치 -> DEPOT 거리
                self.clock += d_back / self.speed           # 경과 시간 갱신
                self.last_xy = DEPOT_XY                     # 위치 갱신
                self.route.append(DEPOT)                    # 토큰 리스트 갱신
            self.tour_dist = 0.0       # 주행 거리 초기화
            self.tour_load = 0.0       # 적재량 초기화
            self.tour_has_task = False # 투어의 작업 배정 여부 초기화

    # 초기 상태 구성(매 실행마다 새로)
    agv_states = [AGVState(aid, spec["speed"], spec["capacity"], spec["max_distance"]) for aid, spec in agvs.items()]  

    # AGV 정렬: 속도 내림차순, 용량 내림차순, 최대 주행 가능 거리 내림차순, AGV ID 오름차순
    agv_states.sort(key=lambda a: (-a.speed, -a.capacity, -a.max_distance, str(a.agv_id)))     

    # 속도별 AGV 인덱스
    idx_speed2 = [i for i, a in enumerate(agv_states) if abs(a.speed - 2.0) < 1e-9]
    idx_speed1 = [i for i, a in enumerate(agv_states) if abs(a.speed - 1.0) < 1e-9]

    remaining = set(tasks.keys())  # 아직 배정되지 않은 작업 집합

    # ---- 유틸 ----
    # 작업 배치 구성 및 정렬
    def pick_batch_by_deadline_then_xy(remaining_ids, k):
        
        # 배치 구성: 데드라인 오름차순 + 동률 시 시드로 깨기로 k개 선택
        arr = sorted(list(remaining_ids), key=lambda tid: (tasks[tid]["deadline"], _salt_rank(tid, RUN_SEED)))
        batch = arr[:k]     
           
        # 배치 내부 정렬: x+y값 내림차순 + 동률 시 시드로 깨기                                                                                       
        batch.sort(key=lambda tid: (tasks[tid]["pos"][0] + tasks[tid]["pos"][1], -_salt_rank(tid, RUN_SEED)), reverse=True)          
        return batch

    # 작업 완료 예상 시각이 가장 빠른 AGV 선택
    def choose_agv_by_earliest_finish(task, agv_indices):
        best = None  
        for i in agv_indices:
            agv = agv_states[i]
            if not agv.can_add_now(task):         # 용량/거리 제약을 만족하지 않으면 패스
                continue
            ft = agv.predict_finish_time(task)    # 작업 완료 예상 시각
            cand = (ft, i)                        # AGV 후보 (작업 완료 예상 시각, AGV 인덱스)
            if best is None or cand < best:       # 빠른 AGV 갱신
                best = cand
        return None if best is None else best[1]  # 가장 빠른 AGV 인덱스 반환

    # ---- 1단계: 초기 일대일 배정(k1개 작업 배치 단위)----
    def initial_one_to_one_assignment():
        progress = 0 # 배정된 작업 수 초기화
        batch = pick_batch_by_deadline_then_xy(remaining, k1)  # 작업이 k1개인 정렬된 배치 구성
        
        # 정렬된 AGV들에 정렬된 배치 내 작업들을 일대일로 배정
        for i, agv in enumerate(agv_states):  
            if i >= len(batch):        # 배치 내 작업이 모두 배정되었으면 종료
                break
            tid = batch[i]
            if tid not in remaining:   # 이미 배정된 작업이면 패스
                continue
            t = tasks[tid]
            if agv.can_add_now(t):     # 용량/거리 제약을 만족하면
                agv.add_task(tid, t)   # 작업 추가
                remaining.remove(tid)  # 남은 작업 집합에서 제거
                progress += 1          # 배정된 작업 수 갱신
        return progress  # 배정된 작업 수 반환

    # ---- 2단계: 규칙 기반 배정 (k2개 작업 배치 단위) ----
    def assign_by_rules_batch():
        progress = 0  # 배정된 작업 수 초기화
        batch = pick_batch_by_deadline_then_xy(remaining, k2)  # 작업이 k2개인 정렬된 배치 구성
        
        # 정렬된 배치 내 작업들을 규칙에 따라 배정
        for tid in batch:
            if tid not in remaining:  # 이미 배정된 작업이면 패스
                continue
            t = tasks[tid]
            tx, ty = t["pos"]

            # 2-1: 속도가 2이고 현재 위치가 작업 위치의 좌하단에 있는 AGV들 중, 작업 완료 예상 시각이 가장 빠른 AGV 선택
            cand_idx = [i for i in idx_speed2
                        if tx >= agv_states[i].last_xy[0] and ty >= agv_states[i].last_xy[1]]  
            agv_chosen = choose_agv_by_earliest_finish(t, cand_idx)

            # 2-2: 속도가 1이고, 현재 위치가 작업 위치의 우상단에 있는 AGV들 중, 작업 완료 예상 시각이 가장 빠른 AGV 선택
            if agv_chosen is None:  # 2-1에서 선택된 AGV가 없으면 2-2 수행
                cand_idx = [i for i in idx_speed1
                            if tx <= agv_states[i].last_xy[0] and ty <= agv_states[i].last_xy[1]] 
                agv_chosen = choose_agv_by_earliest_finish(t, cand_idx)

            # 2-3: 모든 AGV들 중, 작업 완료 예상 시각이 가장 빠른 AGV 선택
            if agv_chosen is None:  # 2-2에서 선택된 AGV가 없으면 2-3 수행
                cand_idx = list(range(len(agv_states)))
                agv_chosen = choose_agv_by_earliest_finish(t, cand_idx)

            if agv_chosen is not None: # AGV가 선택되었으면
                agv_states[agv_chosen].add_task(tid, t)  # 작업 추가
                remaining.remove(tid)                    # 남은 작업 집합에서 제거
                progress += 1                            # 배정된 작업 수 갱신
        return progress > 0 # 배정된 작업이 있으면 True 반환

    # ---- 메인 루프: 1단계 후, 2단계 반복 ----
    def main_loop():
        made_progress = True         # 배정 진전 여부
        stalled_after_depot = False  # DEPOT 복귀 후 정체 여부

        # 1단계
        p1 = initial_one_to_one_assignment()  # 1단계 수행
        if p1 > 0:  # 배정된 작업이 있으면
            made_progress = True  # 배정 진전 여부 갱신

        # 2단계 반복
        while remaining:  # 남은 작업들에 대해
            ok = assign_by_rules_batch()  # 2단계 수행
            if ok:  # 배정된 작업이 있으면
                made_progress = True         # 배정 진전 여부 갱신
                stalled_after_depot = False  # DEPOT 복귀 후 정체 여부 초기화
            else:  # 배정된 작업이 없으면
                for agv in agv_states:       # 모든 AGV에 대해
                    if agv.tour_has_task:    # 현재 투어에 작업이 배정된 AGV가 있으면
                        agv.close_tour()     # 투어 종료 처리 (DEPOT 복귀)
                if stalled_after_depot:      # 이전에도 정체 상태였으면 루프 종료
                    break 
                stalled_after_depot = True  # DEPOT 복귀 후 정체 상태로 갱신

    # 메인 루프 실행
    main_loop()

    # ---- 결과 집계 ----
    rows = []  # 각 AGV별 토큰 리스트
    for agv in agv_states:  # 모든 AGV에 대해
        cleaned = [] 
        for tok in agv.route:
            if cleaned and cleaned[-1] == DEPOT and tok == DEPOT:  # 연속된 DEPOT는 하나만 남기기
                continue
            cleaned.append(tok) 
        if cleaned == [DEPOT]:  # DEPOT 하나만 있는 토큰은 빈 경로로
            continue
        if cleaned[-1] != DEPOT:  # 토큰의 마지막이 DEPOT가 아니면 DEPOT 추가
            cleaned.append(DEPOT)
        rows.append({"agv_id": agv.agv_id, "route": ",".join(cleaned)})  # 결과 행 추가

    out_df = pd.DataFrame(rows, columns=["agv_id","route"]).sort_values("agv_id")  # AGV ID로 정렬된 결과 데이터프레임

    # 미배정 작업 데이터프레임
    left_df = None
    if remaining:
        left_df = pd.DataFrame(
            [{"task_id": tid,                      # 작업 ID
              "deadline": tasks[tid]["deadline"],  # 데드라인
              "pos_x": tasks[tid]["pos"][0],       # 작업 X 좌표
              "pos_y": tasks[tid]["pos"][1],       # 작업 Y 좌표
              "demand": tasks[tid]["demand"]}      # 작업 수요량
             for tid in sorted(list(remaining), key=lambda x: tasks[x]["deadline"])] # 미배정 작업 데드라인 오름차순 정렬
        )

    # 지각 작업 데이터프레임
    tardy_records = []
    for agv in agv_states:
        for rec in agv.log: 
            if rec["finish_time"] > rec["deadline"]: 
                tardy_records.append({ 
                    "task_id": rec["task_id"],                         # 작업 ID
                    "agv_id": agv.agv_id,                              # 작업을 수행한 AGV ID
                    "deadline": rec["deadline"],                       # 데드라인
                    "finish_time": rec["finish_time"],                 # 실제 완료 시각
                    "lateness": rec["finish_time"] - rec["deadline"],  # 지각 시간
                    "pos_x": rec["pos"][0],                            # 작업 X 좌표
                    "pos_y": rec["pos"][1],                            # 작업 Y 좌표
                })
    tardy_df = None
    total_lateness = 0.0
    if tardy_records:
        tardy_df = pd.DataFrame(tardy_records).sort_values(["lateness","finish_time"], ascending=[False, True])  # 지각 작업 지각 시간 내림차순, 완료 시각 오름차순 정렬
        total_lateness = float(tardy_df["lateness"].sum())  # 총 지각 시간 합

    # AGV 완료시각 합(마지막 DEPOT 복귀 포함)
    sum_finish_with_depot = 0.0
    for agv in agv_states: 
        ft = agv.clock  # 마지막 작업 완료 시각
        if agv.route and agv.route[-1] != DEPOT:  # 마지막 토큰이 DEPOT가 아니면 DEPOT 복귀 시간 추가
            d_back = manhattan(agv.last_xy, DEPOT_XY)  # 마지막 위치 -> DEPOT 거리
            ft += d_back / agv.speed                   # 마지막 작업 완료 시각 + DEPOT 복귀 시간
        sum_finish_with_depot += ft  # 합산

    objective_value = total_lateness * 2 + sum_finish_with_depot  # 목적함수 값 계산

    # 결과 반환
    return {
        "k1": k1, "k2": k2,
        "objective": objective_value,
        "total_lateness": total_lateness,
        "sum_finish": sum_finish_with_depot,
        "out_df": out_df,
        "left_df": left_df,
        "tardy_df": tardy_df,
        "agv_states": agv_states,
    }

# --------------------------------------------------
# 파라미터 튜닝 (k1, k2)
# --------------------------------------------------

# 그리드 서치
best = None
for k1 in range(1, 16):  #k1의 범위: 1-15
    for k2 in range(1, 16):  #k2의 범위: 1-15
        res = run_solver(k1, k2)
        key = (res["objective"], res["total_lateness"], res["sum_finish"], k1 + k2) 
        if best is None or key < best["key"]:
            best = {"k1": k1, "k2": k2, "key": key, "res": res}  # 최적 결과 갱신

# ---------------------------------------
# 초기해 구성 실행
# ---------------------------------------

# 최적 파라미터
best_k1, best_k2 = best["k1"], best["k2"]

# 최적 결과
final = run_solver(best_k1, best_k2)

# --------------------------------------------------
# 초기해 구성 결과 출력
# --------------------------------------------------

display(final["out_df"].head())

if final["left_df"] is not None:
    print(f"[미배정] 남은 작업 수: {len(final['left_df'])}")
    display(final["left_df"])

if final["tardy_df"] is not None and len(final["tardy_df"]) > 0:
    print(f"[지각] 데드라인 내 완료 실패 작업 수: {len(final['tardy_df'])}")
    display(final["tardy_df"])
else:
    print("데드라인 내 완료 실패 작업 없음")

print(f"[SEED] {RUN_SEED}")  # <-- 추가
print(f"[AGV 완료시각 합] {final['sum_finish']:.3f}")
print(f"[지각 합] 총 lateness: {final['total_lateness']:.3f}")
print(f"[목적함수 값] lateness 합 * 2 + AGV 완료시각 합: {final['objective']:.3f}")


# 초기해 18241

AGV 데이터: (15, 4)
Task 데이터: (100, 6)


Unnamed: 0,agv_id,speed_cells_per_sec,max_distance,capacity
0,A001,2,878,7
1,A002,1,875,3
2,A003,2,864,7
3,A004,2,787,5
4,A005,2,867,5


Unnamed: 0,task_id,x,y,service_time,demand,deadline
0,T0001,45,10,31,2,436
1,T0002,27,13,83,1,356
2,T0003,3,13,120,1,395
3,T0004,37,24,70,3,573
4,T0005,30,18,55,1,404


Unnamed: 0,agv_id,route
0,A001,"DEPOT,T0098,T0092,T0047,T0057,DEPOT,T0026,T005..."
12,A002,"DEPOT,T0084,DEPOT,T0079,T0037,DEPOT,T0082,DEPOT"
1,A003,"DEPOT,T0060,T0090,T0038,DEPOT,T0015,T0035,T008..."
5,A004,"DEPOT,T0020,T0013,T0016,DEPOT,T0075,T0054,T007..."
4,A005,"DEPOT,T0033,T0008,T0022,DEPOT,T0097,T0094,DEPO..."


[지각] 데드라인 내 완료 실패 작업 수: 37


Unnamed: 0,task_id,agv_id,deadline,finish_time,lateness,pos_x,pos_y
3,T0093,A001,582.0,829.5,247.5,31,52
2,T0012,A001,532.0,769.5,237.5,16,49
20,T0029,A012,509.0,733.0,224.0,39,21
11,T0004,A014,573.0,748.5,175.5,37,24
12,T0069,A014,662.0,833.5,171.5,43,38
25,T0088,A010,466.0,628.0,162.0,8,51
8,T0076,A003,616.0,771.5,155.5,54,59
36,T0025,A008,565.0,715.0,150.0,52,28
7,T0031,A003,557.0,701.5,144.5,51,22
16,T0064,A004,559.0,699.0,140.0,42,28


[SEED] 311501450
[AGV 완료시각 합] 11353.000
[지각 합] 총 lateness: 3444.000
[목적함수 값] lateness 합 * 2 + AGV 완료시각 합: 18241.000


In [8]:
# ================================= Improvement =========================================
"""
해 개선 과정 (VNS-SA)
- Shaking은 k강도로 AGV 간/내 작업을 무작위 재배치해 국소 최적을 벗어나도록 함.
- Local Search는 투어 내/간 연산자(Or-Opt(1~3), 2-opt, 교차교환, 부분 3-opt)로 제약을 지키며 목적함수를 즉시 개선하는 해만 채택.
- 각 반복에서 k강도로 흔들고 Local Search로 개선을 한 뒤 SA 기준(온도 냉각)으로 수용·거부하며, 더 좋은 해를 최적해로 갱신.
"""

from tqdm import tqdm

try:
    from IPython.display import display as _nb_display
    _IN_NOTEBOOK = True
except Exception:
    _IN_NOTEBOOK = False

# NOTE: Assumes these globals exist in your environment:
#   DEPOT (str), DEPOT_XY (tuple[int,int]), manhattan(a,b) -> float
#   agvs (dict), tasks (dict)

# --------------------------------------------------
# DataFrame <-> routes 변환
# --------------------------------------------------
 
# out_df -> routes: AGV별 토큰 리스트 딕셔너리 반환
def out_df_to_routes(out_df: pd.DataFrame) -> Dict[Any, List[str]]:
    routes: Dict[Any, List[str]] = {}
    for _, row in out_df.iterrows():
        agv_id = row["agv_id"]
        tokens = [t.strip() for t in str(row["route"]).split(",") if t.strip()] 
        routes[agv_id] = tokens 
    for aid in agvs.keys():
        routes.setdefault(aid, [DEPOT, DEPOT])  # 토큰이 없으면 [DEPOT, DEPOT]으로 초기화
    return routes

# routes -> out_df: AGV ID로 정렬된 데이터프레임 반환
def routes_to_out_df(routes: Dict[Any, List[str]]) -> pd.DataFrame:
    rows = []
    for agv_id, toks in sorted(routes.items(), key=lambda x: x[0]): 
        cleaned = [] 
        for t in toks:
            if cleaned and cleaned[-1] == DEPOT and t == DEPOT:  # 연속된 DEPOT는 하나만 남기기
                continue
            cleaned.append(t)
        if cleaned and cleaned[-1] != DEPOT:  # 토큰의 마지막이 DEPOT가 아니면 DEPOT 추가
            cleaned.append(DEPOT) 
        if cleaned and cleaned != [DEPOT] and cleaned != [DEPOT, DEPOT]:  # DEPOT 하나 또는 두개만 있는 토큰은 무시
            rows.append({"agv_id": agv_id, "route": ",".join(cleaned)})
    return pd.DataFrame(rows, columns=["agv_id","route"]).sort_values("agv_id") 

# --------------------------------------------------
# 투어 반환 및 평가
# --------------------------------------------------

# AGV의 토큰 리스트에서 모든 투어 구간 (start_idx, end_idx) 리스트 반환
def _all_tours_of_agv(tokens: List[str]) -> List[Tuple[int, int]]:
    tours, start = [], None
    for i, tok in enumerate(tokens):
        if tok == DEPOT:
            if start is None:
                start = i
            else:
                if i - start > 1:
                    tours.append((start, i))
                start = i
    return tours

# 투어 내 작업 리스트로부터 (주행 거리, 수요량) 계산
def tour_distance_and_demand(tour: List[str]) -> Tuple[float, float]:
    xy = [DEPOT_XY] + [tasks[t]["pos"] for t in tour] + [DEPOT_XY]
    dist = sum(manhattan(xy[i], xy[i+1]) for i in range(len(xy)-1))
    demand_sum = sum(tasks[t]["demand"] for t in tour)
    return dist, demand_sum

# --------------------------------------------------
# 캐시 관리
# --------------------------------------------------

_CACHE_EPOCH = 0

# 캐시 무효화
def cache_invalidate():
    global _CE, _FZ, _CACHE_EPOCH
    _CACHE_EPOCH += 1
    _CE = _DeltaEvaluatorCache(epoch=_CACHE_EPOCH)
    _FZ = _FeasibleCache(epoch=_CACHE_EPOCH)

# 지각 및 완료시각 평가 캐시 클래스
class _DeltaEvaluatorCache:

    # 캐시 초기화
    def __init__(self, max_entries_factor=300, epoch=0):
        self.cache, self.touch, self.t = {}, {}, 0
        self.max_entries, self.epoch = max_entries_factor * max(1, len(agvs)), epoch
    
    # 캐시 크기 조절 (LRU 기반)
    def _trim(self):
        if len(self.cache) <= self.max_entries: return
        keep_keys = {k for k, _ in sorted(self.touch.items(), key=lambda x: x[1], reverse=True)[:int(self.max_entries * 0.6)]}
        self.cache = {k:v for k,v in self.cache.items() if k in keep_keys}
        self.touch = {k:v for k,v in self.touch.items() if k in keep_keys}
    
    # AGV별 토큰 리스트에 대한 (lateness, finish_time) 계산
    def eval_agv(self, aid, toks):
        self.t += 1 
        key = (self.epoch, aid, tuple(toks))
        if key in self.cache:
            self.touch[key] = self.t
            return self.cache[key]

        speed = agvs[aid]["speed"]
        clock, lateness, last_xy = 0.0, 0.0, DEPOT_XY
        for tok in toks:
            if tok == DEPOT:
                if last_xy != DEPOT_XY:
                    clock += manhattan(last_xy, DEPOT_XY) / speed
                    last_xy = DEPOT_XY
            else:
                task = tasks[tok]
                xy, deadline, service = task["pos"], task["deadline"], task["service_time"]
                travel = manhattan(last_xy, xy) / speed
                finish = clock + travel + service
                lateness += max(0, finish - deadline)
                clock, last_xy = finish, xy

        val = (lateness, clock)
        self.cache[key], self.touch[key] = val, self.t
        self._trim()
        return val

# 제약 만족 여부 캐시 클래스
class _FeasibleCache:

    # 캐시 초기화
    def __init__(self, max_entries_factor=300, epoch=0):
        self.cache, self.touch, self.t = {}, {}, 0
        self.max_entries, self.epoch = max_entries_factor * max(1, len(agvs)), epoch

    # 캐시 크기 조절 (LRU 기반)
    def _trim(self):
        if len(self.cache) <= self.max_entries: return
        keep_keys = {k for k, _ in sorted(self.touch.items(), key=lambda x: x[1], reverse=True)[:int(self.max_entries * 0.6)]}
        self.cache = {k:v for k,v in self.cache.items() if k in keep_keys}
        self.touch = {k:v for k,v in self.touch.items() if k in keep_keys}

    # AGV별 토큰 리스트 내 모든 투어가 용량/거리 제약을 만족하는지 반환
    def check_tokens(self, aid, toks):
        self.t += 1
        key = (self.epoch, aid, tuple(toks))
        if key in self.cache:
            self.touch[key] = self.t
            return self.cache[key]

        cap, dmax = agvs[aid]["capacity"], agvs[aid]["max_distance"]
        ok = True
        for s, e in _all_tours_of_agv(toks):
            dist, dem = tour_distance_and_demand(toks[s+1:e])
            if dist > dmax + 1e-9 or dem > cap + 1e-9:
                ok = False; break

        self.cache[key], self.touch[key] = ok, self.t
        self._trim()
        return ok

# 캐시 인스턴스
_CE = _DeltaEvaluatorCache()
_FZ = _FeasibleCache()

# --------------------------------------------------
# 토큰 리스트 제약 만족 여부 및 목적함수 값 평가
# --------------------------------------------------

# 평가 결과 데이터 클래스
@dataclass
class EvalResult:
    feasible: bool
    objective: float
    total_lateness: float
    sum_finish: float

# AGV별 토큰 리스트의 제약 만족 여부 및 목적함수 값 평가
def check_feasibility_and_eval(routes: Dict[Any, List[str]]) -> EvalResult:
    for aid, toks in routes.items():
        if not _FZ.check_tokens(aid, toks):
            return EvalResult(False, math.inf, math.inf, math.inf)
        
    # 제약 만족 시 지각 및 완료시각 평가
    total_lateness, sum_finish = 0.0, 0.0
    for aid, toks in routes.items():
        clean_toks = []
        if not toks or toks[0] != DEPOT: clean_toks.append(DEPOT)
        for t in toks:
            if clean_toks and clean_toks[-1] == DEPOT and t == DEPOT: continue
            clean_toks.append(t)
        if not clean_toks or clean_toks[-1] != DEPOT: clean_toks.append(DEPOT)
        if len(clean_toks) > 1:
            lat, fin = _CE.eval_agv(aid, clean_toks)
            total_lateness += lat
            sum_finish += fin

    # 목적함수 값 계산
    objective = total_lateness * 2.0 + sum_finish
    return EvalResult(True, objective, total_lateness, sum_finish)

# AGV 하나의 토큰 리스트 제약 만족 여부
def _is_feasible_one(aid, toks): return _FZ.check_tokens(aid, toks) 

# AGV 하나의 토큰 리스트 교체 후 목적함수 값 계산
def _eval_one(routes, aid, new_toks):
    old_toks = routes[aid]
    routes[aid] = new_toks
    res = check_feasibility_and_eval(routes)
    routes[aid] = old_toks
    return res.objective

# --------------------------------------------------
# Local Search 연산자
# --------------------------------------------------

# 투어 내 연속된 작업 블록을 다른 위치로 이동
def ls_intra_oropt(routes, chain_len_max=3):  # chain_len_max: 이동 작업 블록의 최대 길이
    base_obj = check_feasibility_and_eval(routes).objective  # 현재 목적함수 값 (기준값)
    # 모든 AGV에 대해
    for aid, toks in routes.items():  
        # 모든 투어에 대해
        for s, e in _all_tours_of_agv(toks):  
            tour = toks[s+1:e] 
            n = len(tour) 
            # 모든 가능한 이동 작업 블록의 길이에 대해
            for L in range(1, min(chain_len_max, n) + 1):  
                # 모든 가능한 이동 작업 블록의 시작 위치에 대해
                for i in range(n - L + 1):  
                    chain = tour[i:i+L]
                    rest = tour[:i] + tour[i+L:] 
                    for pos in range(len(rest) + 1):
                        if pos == i: continue  # 원래 시작 위치는 패스
                        new_tour = rest[:pos] + chain + rest[pos:]   # 이동 작업 블록 삽입
                        new_toks = toks[:s+1] + new_tour + toks[e:]  # 새로운 토큰 리스트 구성
                        if _is_feasible_one(aid, new_toks):  # 제약 만족 시
                            obj = _eval_one(routes, aid, new_toks)  # 새로운 목적함수 값 계산
                            if obj < base_obj - 1e-9:  # 개선되면
                                new_routes = {k: v[:] for k, v in routes.items()}  # 경로 복사
                                new_routes[aid] = new_toks                         # 경로에 변경된 토큰 리스트 반영
                                return new_routes  # 개선된 경로 반환
    return routes  # 개선된 경로 없으면 원래 경로 반환

# 투어 내 연속된 작업 블록을 역순으로 뒤집기
def ls_intra_2opt(routes):
    base_obj = check_feasibility_and_eval(routes).objective  # 현재 목적함수 값 (기준값)
    # 모든 AGV에 대해
    for aid, toks in routes.items():  
        # 모든 투어에 대해
        for s, e in _all_tours_of_agv(toks):  
            tour = toks[s+1:e]
            n = len(tour)
            if n < 2: continue  # 투어 내 작업이 2개 미만이면 패스
            # 모든 가능한 뒤집기 구간의 시작 위치에 대해
            for i in range(n - 1): 
                # 모든 가능한 뒤집기 구간의 끝 위치에 대해
                for j in range(i + 1, n): 
                    new_tour = tour[:i] + tour[i:j+1][::-1] + tour[j+1:]  # 뒤집기 적용
                    new_toks = toks[:s+1] + new_tour + toks[e:]           # 새로운 토큰 리스트 구성
                    if _is_feasible_one(aid, new_toks):  # 제약 만족 시
                        obj = _eval_one(routes, aid, new_toks)  # 새로운 목적함수 값 계산
                        if obj < base_obj - 1e-9:  # 개선되면
                            new_routes = {k: v[:] for k, v in routes.items()}  # 경로 복사
                            new_routes[aid] = new_toks                         # 경로에 변경된 토큰 리스트 반영
                            return new_routes  # 개선된 경로 반환
    return routes  # 개선된 경로 없으면 원래 경로 반환

# 두 AGV 간 연속된 작업 블록 교환
def ls_inter_cross_exchange(routes, max_len=2):
    base_obj = check_feasibility_and_eval(routes).objective # 현재 목적함수 값 (기준값)
    aids = list(routes.keys())
    # 모든 AGV 조합에 대해
    for i in range(len(aids)): 
        for j in range(i + 1, len(aids)): 
            a1, a2 = aids[i], aids[j] 
            toks1, toks2 = routes[a1], routes[a2] 
            # 모든 투어 조합에 대해
            for s1, e1 in _all_tours_of_agv(toks1):
                for s2, e2 in _all_tours_of_agv(toks2): 
                    tour1, tour2 = toks1[s1+1:e1], toks2[s2+1:e2]
                    n1, n2 = len(tour1), len(tour2) 
                    # 모든 가능한 작업 블록 길이 조합에 대해
                    for L1 in range(1, min(max_len, n1) + 1):
                        for L2 in range(1, min(max_len, n2) + 1): 
                            # 모든 가능한 시작 위치 조합에 대해
                            for i1 in range(n1 - L1 + 1): 
                                for i2 in range(n2 - L2 + 1): 
                                    b1, b2 = tour1[i1:i1+L1], tour2[i2:i2+L2]                                          # 교환할 작업 블록
                                    nt1, nt2 = tour1[:i1] + b2 + tour1[i1+L1:], tour2[:i2] + b1 + tour2[i2+L2:]        # 교환 후 작업 리스트
                                    ntoks1, ntoks2 = toks1[:s1+1] + nt1 + toks1[e1:], toks2[:s2+1] + nt2 + toks2[e2:]  # 새로운 토큰 리스트 구성
                                    if _is_feasible_one(a1, ntoks1) and _is_feasible_one(a2, ntoks2):  # 제약 만족 시
                                        r_cand = {k: v[:] for k, v in routes.items()}  # 경로 복사
                                        r_cand[a1], r_cand[a2] = ntoks1, ntoks2        # 경로에 변경된 토큰 리스트 반영
                                        obj = check_feasibility_and_eval(r_cand).objective  # 새로운 목적함수 값 계산
                                        if obj < base_obj - 1e-9:  # 개선되면
                                            return r_cand  # 개선된 경로 반환
    return routes  # 개선된 경로 없으면 원래 경로 반환

# 투어 내 네 구간 재배열
def ls_intra_3opt_partial(routes, max_span=40):
    base_obj = check_feasibility_and_eval(routes).objective  # 현재 목적함수 값 (기준값)
    # 모든 AGV에 대해
    for aid, toks in routes.items():  
        # 모든 투어에 대해
        for s, e in _all_tours_of_agv(toks):  
            tour = toks[s+1:e]  
            n = len(tour)  
            if n < 5: continue  # 투어 내 작업이 5개 미만이면 패스
            # 모든 가능한 세 분할 위치에 대해
            for a in range(n - 4):  
                for b in range(a + 1, min(n - 3, a + max_span)):
                    for c in range(b + 1, min(n - 2, b + max_span)):
                        sA, sB, sC, sD = tour[:a+1], tour[a+1:b+1], tour[b+1:c+1], tour[c+1:]                                              # 네 구간으로 분할
                        candidates = [sA + sB[::-1] + sC + sD, sA + sC + sB + sD, sA + sC[::-1] + sB + sD, sA + sB[::-1] + sC[::-1] + sD]  # 가능한 재배열 후보들
                        for cand_tour in candidates: 
                            new_toks = toks[:s+1] + cand_tour + toks[e:]  # 새로운 토큰 리스트 구성
                            if _is_feasible_one(aid, new_toks):  # 제약 만족 시
                                obj = _eval_one(routes, aid, new_toks)  # 새로운 목적함수 값 계산
                                if obj < base_obj - 1e-9:  # 개선되면
                                    new_routes = {k: v[:] for k, v in routes.items()}  # 경로 복사
                                    new_routes[aid] = new_toks                         # 경로에 변경된 토큰 리스트 반영
                                    return new_routes  # 개선된 경로 반환
    return routes  # 개선된 경로 없으면 원래 경로 반환

# --------------------------------------------------
# Shaking 연산자
# --------------------------------------------------

# 토큰 리스트에서 모든 투어 리스트 추출
def _extract_tours_for_shake(tokens: List[str]) -> List[List[str]]:
    tours, cur = [], []
    toks = tokens[:]
    if not toks or toks[0] != DEPOT: toks = [DEPOT] + toks  # 시작이 DEPOT가 아니면 DEPOT 추가
    if toks[-1] != DEPOT: toks = toks + [DEPOT]  # 끝이 DEPOT가 아니면 DEPOT 추가
    for tok in toks[1:]:  
        if tok == DEPOT: 
            if cur: tours.append(cur); cur = []  
        else: cur.append(tok)  
    return tours  

# 투어 리스트로부터 토큰 리스트 재조립
def _assemble_from_tours_for_shake(tours: List[List[str]]) -> List[str]:  
    toks = [DEPOT]  
    for seg in tours:  
        toks.extend(seg) 
        toks.append(DEPOT) 
    return toks  

# k번의 무작위 작업 이동/교환 수행
def shake(routes: Dict[Any, List[str]], k: int, rng: random.Random) -> Dict[Any, List[str]]:
    new_routes = {aid: toks[:] for aid, toks in routes.items()}
    agv_ids = list(new_routes.keys())
    for _ in range(k):  # k번 반복
        if not agv_ids: break  # AGV가 없으면 종료
        a, b = rng.choice(agv_ids), rng.choice(agv_ids)  # 두 AGV 무작위 선택

        # 같은 AGV 선택 시: 투어 내 작업 위치 무작위 이동
        if a == b:  
            tours = _extract_tours_for_shake(new_routes[a])          # AGV a의 투어 리스트 추출
            if not tours or not any(tours): continue
            ti = rng.choice([i for i, t in enumerate(tours) if t])   # 투어 무작위 선택
            i = rng.randrange(len(tours[ti]))                        # 선택된 투어 내 작업 무작위 선택
            task = tours[ti].pop(i)  
            tours[ti].insert(rng.randrange(len(tours[ti])+1), task)  # 작업을 투어 내 다른 위치에 무작위 삽입
            new_routes[a] = _assemble_from_tours_for_shake(tours)    # 토큰 리스트 재조립
        
        # 다른 AGV 선택 시: 투어 간 작업 무작위 이동
        else:  
            tours_a = _extract_tours_for_shake(new_routes.get(a, []))        # AGV a의 투어 리스트 추출
            if not tours_a or not any(tours_a): continue 
            ai = rng.choice([i for i, t in enumerate(tours_a) if t])         # 투어 무작위 선택
            task = tours_a[ai].pop(rng.randrange(len(tours_a[ai])))          # 선택된 투어 내 작업 무작위 선택 및 제거
            tours_b = _extract_tours_for_shake(new_routes.get(b, []))        # AGV b의 투어 리스트 추출
            if tours_b and any(tours_b): 
                bi = rng.choice([i for i, t in enumerate(tours_b) if t])     # 투어 무작위 선택
                tours_b[bi].insert(rng.randrange(len(tours_b[bi])+1), task)  # 작업을 투어 내 무작위 삽입
            else: tours_b = [[task]]
            new_routes[a] = _assemble_from_tours_for_shake(tours_a)          # 토큰 리스트 재조립
            new_routes[b] = _assemble_from_tours_for_shake(tours_b)         
    return new_routes  # 변경된 경로 반환

# --------------------------------------------------
# Local Search 구현
# --------------------------------------------------

def local_search_vns(routes, stats: Dict[str,int]): 
    cur_routes = {aid: toks[:] for aid, toks in routes.items()}

    # 탐색 연산자 리스트
    search_operators = [
        lambda r: ls_intra_oropt(r, chain_len_max=1),
        lambda r: ls_intra_oropt(r, chain_len_max=2),
        lambda r: ls_intra_oropt(r, chain_len_max=3),
        ls_intra_2opt,
        lambda r: ls_inter_cross_exchange(r, max_len=2),
        lambda r: ls_intra_3opt_partial(r, max_span=40)
    ]  

    while True:
        start_obj = check_feasibility_and_eval(cur_routes).objective  # 현재 목적함수 값
        improved_in_iteration = False
        # 모든 탐색 연산자에 대해
        for operator in search_operators:  
            cand_routes = operator(cur_routes)  # 연산자 적용
            stats["evals"] += 1 
            if cand_routes is not cur_routes:  # 개선되면
                cur_routes = cand_routes  # 경로 갱신
                stats["accepts"] += 1
                improved_in_iteration = True
                break  # 다시 처음 연산자부터 적용
        if not improved_in_iteration:  # 개선 없으면 종료
            break

    return cur_routes, check_feasibility_and_eval(cur_routes)  # 최종 경로 및 평가 결과 반환

# --------------------------------------------------
# VNS-SA (Simulated Annealing) 구현
# --------------------------------------------------

# 흔들기 강도 k를 정하는 클래스
class _KSchedule:

    # k-스케줄러 초기화
    def __init__(self, *, mode: str, min_k: int, max_k: int, iters: int, adaptive_stall_step: int = 25):
        self.mode, self.min_k, self.max_k, self.iters = mode, int(min_k), int(max_k), max(1, int(iters))  # k 범위 및 최대 반복 횟수
        self.adaptive_stall_step = max(1, int(adaptive_stall_step))                                       # k가 증가하는 iteration 간격
        self._cur_k, self._last_accept_iter = self.min_k, 0                                               # 현재 k 값 및 마지막 수용된 반복 횟수

    # 해가 수용되었음을 알림
    def notify_acceptance(self, it_now: int):
        self._last_accept_iter = it_now  # 마지막 수용된 반복 횟수 갱신
        if self.mode == "adaptive": self._cur_k = max(self.min_k, self._cur_k - 1)  # 해가 수용되면 k 한 단계 감소

    # 현재 반복 횟수에 대한 k 값 반환
    def get_k(self, it_now: int) -> int:
        # adaptive 모드: 마지막 수용 이후 경과한 단계 수에 따라 k 조정하여 반환
        if self.mode == "adaptive":
            steps = (it_now - self._last_accept_iter) // self.adaptive_stall_step # 마지막 수용 이후 경과한 단계 수
            k = self.min_k + steps                                                # 목표 k 값
            if k > self._cur_k: self._cur_k += 1    # 목표 k가 증가했으면(개선 없으면) 현재 k도 한 단계 증가
            elif k < self._cur_k: self._cur_k -= 1  # 목표 k가 감소했으면(개선 있으면) 현재 k도 한 단계 감소
            self._cur_k = max(self.min_k, min(self.max_k, self._cur_k))  # k 범위 내로 조정
            return self._cur_k  
        # cyclic 모드: 반복 횟수에 따라 순환적으로 k 반환
        span = self.max_k - self.min_k
        if span <= 0: return self.min_k
        return self.min_k + ((it_now - 1) % (span + 1))

# VNS-SA 기반 해 개선 메인 함수
def run_vns_improvement(agvs: Dict, tasks: Dict, out_df: pd.DataFrame,
                        min_k: int = 1, max_k: int = 6, mode: str = "adaptive",
                        adaptive_stall_step: int = 25,
                        iters: int = 4000, seed: int | None = None,
                        verbose: bool = True,
                        temp0: float = 100.0, cooling: float = 0.999):

    # 시드 설정
    cache_invalidate()  # 캐시 무효화
    seed_used = seed if seed is not None else int.from_bytes(os.urandom(16), 'big')  # 시드 결정
    random.seed(seed_used)                                                           # 전역 RNG 동일 시드 고정
    rng = random.Random(seed_used)                                                   # 지역 RNG
    print(f"[VNS-SA] Seed: {seed_used}") 
    
    # 초기해 설정 및 평가
    base_routes = out_df_to_routes(out_df)               # 초기해 설정
    base_eval = check_feasibility_and_eval(base_routes)  # 초기해 평가
    if not base_eval.feasible:
        print("초기 해가 제약조건을 위반하여 실행할 수 없습니다.")
        return {}
    stats = {"evals": 1, "accepts": 0, "best_updates": 0}

    # 최적해 및 현재해 초기화
    best_routes = {aid: toks[:] for aid, toks in base_routes.items()}     # 최적해
    best_eval = base_eval 
    current_routes = {aid: toks[:] for aid, toks in base_routes.items()}  # 현재해
    current_eval = base_eval
    T = temp0                                                             # 초기 온도

    # k-스케줄러 초기화
    ksch = _KSchedule(mode=mode, min_k=min_k, max_k=max_k, iters=iters, adaptive_stall_step=adaptive_stall_step)

    # 메인 반복문
    pbar = tqdm(range(1, iters + 1), desc=f"VNS-SA (Best: {best_eval.objective:.2f})", disable=not verbose)
    for it in pbar:
        
        # Shaking
        k = ksch.get_k(it)                      # k 값 결정
        shaken = shake(current_routes, k, rng)  # 현재해 흔들기
        
        # Local Search
        loc_routes, loc_eval = local_search_vns(shaken, stats)
        
        # 제약 만족 시 SA
        if loc_eval.feasible:
            delta = loc_eval.objective - current_eval.objective
            if delta < 0 or (T > 0 and rng.random() < math.exp(-delta / T)):  # SA 수용 기준: 더 좋은 해는 항상 수용, 나쁜 해는 확률적으로 수용
                current_routes, current_eval = loc_routes, loc_eval
                stats["accepts"] += 1
                ksch.notify_acceptance(it)  # k-스케줄러에 '수용' 사실을 알림
                
                # 현재 해가 최적해보다 좋으면 최적해 갱신
                if current_eval.objective < best_eval.objective - 1e-9:
                    best_routes, best_eval = {k:v[:] for k,v in current_routes.items()}, current_eval
                    stats["best_updates"] += 1
                    pbar.set_description(f"VNS-SA (Best: {best_eval.objective:.2f})")

        # 온도 냉각
        T *= cooling
        pbar.set_postfix(Temp=f"{T:.2f}")
        if it % 1000 == 0:
            print(
                f"[Iter {it}] "
                f"BestObj={best_eval.objective:.2f}, "
                f"CurObj={current_eval.objective:.2f}, "
                f"Temp={T:.2f}, "
                f"Accepts={stats['accepts']}, BestUpdates={stats['best_updates']}"
            )
    
    # 결과 반환
    return {
        "old_obj": float(base_eval.objective),
        "new_obj": float(best_eval.objective),
        "improved": best_eval.objective + 1e-9 < base_eval.objective,
        "out_df": routes_to_out_df(best_routes),
        "stats": stats,
        "detail": {
            "old": {"total_lateness": float(base_eval.total_lateness), "sum_finish": float(base_eval.sum_finish)},
            "new": {"total_lateness": float(best_eval.total_lateness), "sum_finish": float(best_eval.sum_finish)},
        },
        "seed": int(seed_used),
    }

# --------------------------------------------------
# VNS-SA 실행
# --------------------------------------------------

_improved = run_vns_improvement(
    agvs, tasks, final["out_df"],
    mode="adaptive",
    min_k=1, max_k=6,
    adaptive_stall_step=50,
    iters=10000,
    seed=42,
    temp0=150,     
    cooling=0.9995  
)

# --------------------------------------------------
# VNS-SA 결과 출력
# --------------------------------------------------

print(f"\n[VNS-SA] Objective: {_improved['old_obj']:.3f} -> {_improved['new_obj']:.3f}",
      "(improved)" if _improved["improved"] else "(no change)")
print(f"[VNS-SA] New Lateness*2: {_improved['detail']['new']['total_lateness']*2:.3f}, Sum Finish: {_improved['detail']['new']['sum_finish']:.3f}")
if _IN_NOTEBOOK:
    _nb_display(_improved["out_df"].head())


[VNS-SA] Seed: 42


VNS-SA (Best: 12970.00):  10%|█         | 1001/10000 [21:33<2:39:06,  1.06s/it, Temp=90.92] 

[Iter 1000] BestObj=12970.00, CurObj=13221.00, Temp=90.97, Accepts=14453, BestUpdates=19


VNS-SA (Best: 12874.00):  20%|██        | 2000/10000 [47:05<7:19:39,  3.30s/it, Temp=55.17] 

[Iter 2000] BestObj=12874.00, CurObj=13055.00, Temp=55.17, Accepts=27579, BestUpdates=21


VNS-SA (Best: 12811.00):  30%|███       | 3000/10000 [1:05:22<1:34:43,  1.23it/s, Temp=33.46]

[Iter 3000] BestObj=12811.00, CurObj=12871.00, Temp=33.46, Accepts=39414, BestUpdates=24


VNS-SA (Best: 12709.00):  40%|████      | 4000/10000 [1:23:47<1:26:57,  1.15it/s, Temp=20.29]

[Iter 4000] BestObj=12709.00, CurObj=12730.00, Temp=20.29, Accepts=52293, BestUpdates=37


VNS-SA (Best: 12581.00):  50%|█████     | 5000/10000 [1:39:36<2:08:03,  1.54s/it, Temp=12.31]

[Iter 5000] BestObj=12581.00, CurObj=12719.00, Temp=12.31, Accepts=63212, BestUpdates=44


VNS-SA (Best: 12581.00):  60%|██████    | 6000/10000 [2:01:19<40:48,  1.63it/s, Temp=7.46]   

[Iter 6000] BestObj=12581.00, CurObj=12669.00, Temp=7.46, Accepts=75732, BestUpdates=44


VNS-SA (Best: 12527.00):  70%|███████   | 7001/10000 [2:18:30<48:09,  1.04it/s, Temp=4.52]  

[Iter 7000] BestObj=12527.00, CurObj=12527.00, Temp=4.53, Accepts=86997, BestUpdates=48


VNS-SA (Best: 12509.00):  80%|████████  | 8000/10000 [2:35:48<28:27,  1.17it/s, Temp=2.74]  

[Iter 8000] BestObj=12509.00, CurObj=12509.00, Temp=2.74, Accepts=98212, BestUpdates=51


VNS-SA (Best: 12509.00):  90%|█████████ | 9000/10000 [2:52:21<22:42,  1.36s/it, Temp=1.66]

[Iter 9000] BestObj=12509.00, CurObj=12509.00, Temp=1.66, Accepts=109678, BestUpdates=51


VNS-SA (Best: 12509.00): 100%|██████████| 10000/10000 [3:08:56<00:00,  1.13s/it, Temp=1.01]

[Iter 10000] BestObj=12509.00, CurObj=12509.00, Temp=1.01, Accepts=120744, BestUpdates=51

[VNS-SA] Objective: 18241.000 -> 12509.000 (improved)
[VNS-SA] New Lateness*2: 2177.000, Sum Finish: 10332.000





Unnamed: 0,agv_id,route
0,A001,"DEPOT,T0008,T0074,T0027,T0001,DEPOT,T0091,T007..."
1,A002,"DEPOT,T0014,T0003,DEPOT,T0080,T0028,DEPOT"
2,A003,"DEPOT,T0017,T0035,T0046,DEPOT,T0098,T0058,T004..."
3,A004,"DEPOT,T0005,T0077,T0059,DEPOT,T0090,T0079,DEPO..."
4,A005,"DEPOT,T0002,T0070,T0033,DEPOT,T0067,T0071,DEPO..."


In [9]:
# --------------------------------------------------
# VNS-SA 누적 실행
# --------------------------------------------------

k = 2
for run_idx in range(1, k + 1):
    print(f"\n===== [Run {run_idx}/{k}] 누적 개선 =====")

    _improved = run_vns_improvement(
        agvs, tasks, _improved["out_df"],  # 이전 해에서 계속 개선
        mode="adaptive",
        min_k=1, max_k=6,
        adaptive_stall_step=50,
        iters=2000,
        seed=42 + run_idx,
        temp0=20, 
        cooling=0.9994
    )

# --------------------------------------------------
# VNS-SA 누적 결과 출력
# --------------------------------------------------

print(f"\n✅ 최종 결과: {_improved['new_obj']:.3f}")



===== [Run 1/2] 누적 개선 =====
[VNS-SA] Seed: 43


VNS-SA (Best: 12509.00):  50%|█████     | 1001/2000 [15:37<08:22,  1.99it/s, Temp=10.97] 

[Iter 1000] BestObj=12509.00, CurObj=12530.00, Temp=10.97, Accepts=10918, BestUpdates=0


VNS-SA (Best: 12509.00): 100%|██████████| 2000/2000 [31:44<00:00,  1.05it/s, Temp=6.02] 


[Iter 2000] BestObj=12509.00, CurObj=12518.00, Temp=6.02, Accepts=22556, BestUpdates=0

===== [Run 2/2] 누적 개선 =====
[VNS-SA] Seed: 44


VNS-SA (Best: 12463.00):  50%|█████     | 1001/2000 [15:31<08:35,  1.94it/s, Temp=10.97] 

[Iter 1000] BestObj=12463.00, CurObj=12463.00, Temp=10.97, Accepts=11879, BestUpdates=2


VNS-SA (Best: 12457.00): 100%|██████████| 2000/2000 [30:53<00:00,  1.08it/s, Temp=6.02] 

[Iter 2000] BestObj=12457.00, CurObj=12457.00, Temp=6.02, Accepts=23896, BestUpdates=3

✅ 최종 결과: 12457.000





In [10]:
# --------------------------------------------------
# 작업 누락 확인
# --------------------------------------------------

def count_tasks_in_routes(routes_dict):
    task_set = set()
    for agv_id, toks in routes_dict.items():
        for task in toks:
            if task != DEPOT:
                task_set.add(task)
    return len(task_set)

# 최종 결과 DataFrame을 routes 딕셔너리 형태로 변환
final_routes_dict = out_df_to_routes(_improved["out_df"])

# 총 작업 수 계산
total_assigned_tasks = count_tasks_in_routes(final_routes_dict)

print("\n--- 최종 태스크 할당 검증 ---")
print(f"최종적으로 할당된 고유한 태스크의 총 개수: {total_assigned_tasks}개")

if total_assigned_tasks == 100:
    print("✅ 모든 태스크(100개)가 누락 없이 할당되었습니다.")
else:
    print(f"⚠️ 경고: 최종 태스크 수가 100개가 아닙니다. (현재 {total_assigned_tasks}개)")


--- 최종 태스크 할당 검증 ---
최종적으로 할당된 고유한 태스크의 총 개수: 100개
✅ 모든 태스크(100개)가 누락 없이 할당되었습니다.


In [11]:
# --------------------------------------------------
# 최종해 csv파일로 내보내기
# --------------------------------------------------
_improved["out_df"].to_csv("./data/final_submission.csv", index=False)