In [1]:
# 필요한 라이브러리 임포트
import pandas as pd
import numpy as np
from scipy.spatial.distance import squareform, pdist

In [3]:
# 데이터 파일 경로
data_path = './data.csv'  # 참가자가 제공받은 경로로 설정
data = pd.read_csv(data_path)

print("데이터 로드 완료!")
print(f"총 데이터 포인트 수: {len(data)}")
print(data.head())  # 데이터 확인

데이터 로드 완료!
총 데이터 포인트 수: 76
  point_id   x   y  demand
0    DEPOT   0   0       0
1  TOWN_01  34  94       2
2  TOWN_02  40  89       4
3  TOWN_03  39  95       1
4  TOWN_04  47  94       4


In [9]:
# 산타의 썰매 용량
santa_capacity = 25

# 출발점 설정 및 경로 초기화
route = ["DEPOT"]
total_distance = 0
current_capacity = santa_capacity

# 모든 포인트 간 거리 계산 (거리 행렬 생성)
points = data[['x', 'y']].values
distance_matrix = squareform(pdist(points, metric='euclidean'))
distance_df = pd.DataFrame(distance_matrix, index=data['point_id'], columns=data['point_id'])

# DEPOT과 방문해야 할 포인트 분리
remaining_points = data[data['point_id'] != 'DEPOT'].copy()
current_position = 'DEPOT'  # 출발점

print("데이터 전처리 및 초기 설정 완료!")

데이터 전처리 및 초기 설정 완료!


In [16]:
while not remaining_points.empty:
    # 현재 용량으로 방문 가능한 포인트 필터링
    feasible_points = remaining_points[remaining_points['demand'] <= current_capacity].copy()

    # 방문 가능한 포인트가 없는 경우: DEPOT으로 복귀
    if feasible_points.empty:
        # DEPOT 복귀
        total_distance += distance_df.loc[current_position, 'DEPOT']
        route.append("DEPOT")
        current_position = "DEPOT"
        current_capacity = santa_capacity
        continue

    # 가장 가까운 포인트 선택 (거리 계산 후 추가)
    feasible_points['distance'] = feasible_points['point_id'].apply(
        lambda x: distance_df.loc[current_position, x]
    )
    nearest_point = feasible_points.loc[feasible_points['distance'].idxmin()]

    # 경로 업데이트
    route.append(nearest_point['point_id'])
    total_distance += distance_df.loc[current_position, nearest_point['point_id']]
    current_position = nearest_point['point_id']
    current_capacity -= nearest_point['demand']

    # 방문한 포인트 제거
    remaining_points = remaining_points[remaining_points['point_id'] != nearest_point['point_id']].copy()

# 마지막으로 DEPOT으로 복귀
total_distance += distance_df.loc[current_position, 'DEPOT']
route.append("DEPOT")

print("탐욕 알고리즘 실행 완료!")
print(f"총 이동 거리: {total_distance}")

탐욕 알고리즘 실행 완료!
총 이동 거리: 2687.2839186154524


In [18]:
# 경로를 CSV 파일로 저장
output_file = './santa_route.csv'
route_df = pd.DataFrame(route, columns=['point_id'])
route_df.to_csv(output_file, index=False)

print(f"최종 경로 제출 Submission 파일 저장 완료: {output_file}")

최종 경로 제출 Submission 파일 저장 완료: ./santa_route.csv


In [24]:
import math
import pandas as pd

# 최대 적재 용량
CAPACITY = 25

def euclidean_distance(x1, y1, x2, y2):
    """유클리디안 거리 계산"""
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2)

def read_data(csv_path: str):
    """
    data.csv 파일을 불러와서
    각 지점 정보 (point_id, x, y, demand)를 반환한다.
    
    예시 CSV 구조:
    point_id,x,y,demand
    DEPOT,0,0,0
    TOWN_01,10,10,3
    ...
    TOWN_75, ...
    """
    df = pd.read_csv(csv_path)
    
    # point_id를 key로, (x, y, demand) 튜플을 저장
    locations = {}
    for _, row in df.iterrows():
        pid = row['point_id']
        x, y, d = row['x'], row['y'], row['demand']
        locations[pid] = (x, y, d)
    
    return locations

def compute_distance_matrix(locations):
    """
    모든 지점 쌍 간 거리 행렬을 딕셔너리로 구성
    dist[(i, j)] = i 지점에서 j 지점까지의 거리
    """
    dist = {}
    keys = list(locations.keys())
    for i in keys:
        for j in keys:
            if i == j:
                dist[(i, j)] = 0.0
            else:
                x1, y1, _ = locations[i]
                x2, y2, _ = locations[j]
                dist[(i, j)] = euclidean_distance(x1, y1, x2, y2)
    return dist

def clarke_wright_savings(locations, dist):
    """
    Clarke & Wright Savings 알고리즘 (단일 차량을 가정하지만,
    실제로는 여러 라우트가 결과로 나올 수 있음)
    
    1. 각 마을을 'DEPOT -> 마을 -> DEPOT' 형태의 독립 라우트로 초기화
    2. (i, j) 쌍에 대해 Saving = d(DEPOT, i) + d(DEPOT, j) - d(i, j) 계산
    3. Saving이 큰 순으로 정렬
    4. 순서대로 (i, j)를 병합할 수 있는지 확인:
       - i가 속한 라우트의 맨 끝(또는 시작)과 j가 속한 라우트의 맨 끝(또는 시작)이면서,
       - 용량 제약(CAPACITY) 만족 시 병합
    5. 병합이 끝나면 라우트 목록 반환
    """
    # depot 찾기 (문제에서 point_id="DEPOT" 가 유일하다 가정)
    depot = None
    for k, v in locations.items():
        if k.upper() == "DEPOT":
            depot = k
            break

    # 모든 마을 리스트(DEPOT 제외)
    town_ids = [pid for pid in locations.keys() if pid != depot]

    # 초기 라우트: 각 마을을 단독 라우트로 (DEPOT -> 마을 -> DEPOT)
    routes = {}
    route_demands = {}
    for t in town_ids:
        routes[t] = [depot, t, depot]  # 라우트
        # 각 라우트(단일 마을)의 수요
        _, _, d = locations[t]
        route_demands[t] = d

    # Saving 값 계산
    savings_list = []
    for i in town_ids:
        for j in town_ids:
            if i != j:
                s = dist[(depot, i)] + dist[(depot, j)] - dist[(i, j)]
                savings_list.append((i, j, s))
    # Saving이 큰 순으로 정렬
    savings_list.sort(key=lambda x: x[2], reverse=True)

    # Clarke & Wright 병합 과정
    for (i, j, saving) in savings_list:
        # i, j 각각 어떤 라우트에 속해 있는지 검색
        route_i_key, route_j_key = None, None
        for r_key, path in routes.items():
            if i in path:
                route_i_key = r_key
            if j in path:
                route_j_key = r_key

        # 이미 같은 라우트면 스킵
        if route_i_key == route_j_key:
            continue

        route_i = routes[route_i_key]
        route_j = routes[route_j_key]

        # i가 route_i의 끝(DEPOT 앞), j가 route_j의 시작(DEPOT 뒤)인 경우만 단순화 처리
        if route_i[-2] == i and route_j[1] == j:
            # 두 라우트의 수요 합산이 용량 이하면 병합
            if route_demands[route_i_key] + route_demands[route_j_key] <= CAPACITY:
                new_route = route_i[:-1] + route_j[1:]  # DEPOT 중복 제거
                new_demand = route_demands[route_i_key] + route_demands[route_j_key]

                # 기존 라우트 제거
                del routes[route_i_key]
                del routes[route_j_key]

                # 신규 라우트 저장 (키는 임의로 route_i_key 재활용)
                routes[route_i_key] = new_route
                route_demands[route_i_key] = new_demand

    # 최종 라우트 리스트
    final_routes = list(routes.values())
    return final_routes

def flatten_routes_for_submission(routes):
    """
    여러 라우트(예: [[DEPOT, TOWN_01, DEPOT], [DEPOT, TOWN_02, DEPOT], ...])를
    sample_submission 형식대로 1차원 리스트로 펴서 반환.
    
    sample_submission 예시:
      point_id
      DEPOT
      TOWN_01
      TOWN_03
      DEPOT
      TOWN_02
      DEPOT
    """
    path_sequence = []
    for route in routes:
        # 라우트별로 순차적으로 추가 (DEPOT 중복 가능)
        for node in route:
            path_sequence.append(node)
    return path_sequence

def main():
    # 실제 data.csv 읽기 (경로는 예시)
    locations = read_data('./data.csv')

    # 거리 행렬 계산
    dist = compute_distance_matrix(locations)

    # Clarke & Wright 적용
    routes = clarke_wright_savings(locations, dist)

    # 결과 라우트를 sample_submission 형식으로 변환
    submission_path = flatten_routes_for_submission(routes)

    # 콘솔 출력 예시
    print("=== Clarke & Wright 결과 라우트 (Multi-Route) ===")
    for idx, route in enumerate(routes):
        print(f"Route {idx+1} : {' -> '.join(route)}")

    print("\n=== sample_submission 형식 예시 ===")
    print("point_id")
    for node in submission_path:
        print(node)

    # CSV로 저장 예시
    output_file = './santa_route.csv'
    # submission_path를 DataFrame으로 만들어 저장
    route_df = pd.DataFrame({'point_id': submission_path})
    route_df.to_csv(output_file, index=False)
    print(f"\n[INFO] Route saved to {output_file}")

if __name__ == "__main__":
    main()


=== Clarke & Wright 결과 라우트 (Multi-Route) ===
Route 1 : DEPOT -> TOWN_01 -> TOWN_44 -> TOWN_05 -> TOWN_03 -> TOWN_74 -> TOWN_04 -> TOWN_02 -> DEPOT
Route 2 : DEPOT -> TOWN_06 -> TOWN_08 -> TOWN_72 -> TOWN_66 -> TOWN_71 -> TOWN_53 -> TOWN_42 -> DEPOT
Route 3 : DEPOT -> TOWN_15 -> TOWN_21 -> TOWN_24 -> TOWN_16 -> TOWN_22 -> DEPOT
Route 4 : DEPOT -> TOWN_33 -> TOWN_67 -> TOWN_55 -> TOWN_69 -> TOWN_57 -> TOWN_19 -> TOWN_61 -> DEPOT
Route 5 : DEPOT -> TOWN_28 -> TOWN_25 -> TOWN_75 -> TOWN_52 -> TOWN_59 -> TOWN_70 -> TOWN_51 -> TOWN_36 -> DEPOT
Route 6 : DEPOT -> TOWN_14 -> TOWN_20 -> TOWN_23 -> TOWN_43 -> TOWN_64 -> TOWN_54 -> DEPOT
Route 7 : DEPOT -> TOWN_34 -> TOWN_18 -> TOWN_60 -> TOWN_17 -> TOWN_09 -> TOWN_73 -> TOWN_07 -> TOWN_47 -> TOWN_38 -> DEPOT
Route 8 : DEPOT -> TOWN_58 -> TOWN_40 -> TOWN_49 -> TOWN_26 -> TOWN_27 -> TOWN_63 -> TOWN_65 -> DEPOT
Route 9 : DEPOT -> TOWN_62 -> TOWN_35 -> TOWN_46 -> TOWN_48 -> TOWN_39 -> TOWN_45 -> DEPOT
Route 10 : DEPOT -> TOWN_32 -> TOWN_41 -> TOWN_5

In [26]:
import math
import random
import pandas as pd

# 최대 적재 용량
CAPACITY = 25

def euclidean_distance(x1, y1, x2, y2):
    """유클리디안 거리 계산"""
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2)

def read_data(csv_path: str):
    """
    data.csv 파일을 불러와서
    각 지점 정보 (point_id, x, y, demand)를 반환한다.
    """
    df = pd.read_csv(csv_path)
    
    # point_id를 key로, (x, y, demand) 튜플을 저장
    locations = {}
    for _, row in df.iterrows():
        pid = row['point_id']
        x, y, d = row['x'], row['y'], row['demand']
        locations[pid] = (x, y, d)
    return locations

def compute_distance_matrix(locations):
    """모든 지점 쌍 간 거리 행렬을 딕셔너리로 구성 dist[(i, j)] = 거리"""
    dist = {}
    keys = list(locations.keys())
    for i in keys:
        for j in keys:
            if i == j:
                dist[(i, j)] = 0.0
            else:
                x1, y1, _ = locations[i]
                x2, y2, _ = locations[j]
                dist[(i, j)] = euclidean_distance(x1, y1, x2, y2)
    return dist

def clarke_wright_savings(locations, dist):
    """
    Clarke & Wright 알고리즘으로부터 여러 라우트(해결책)를 구함.
    (단순 버전)
    """
    # depot 찾기
    depot = None
    for k in locations.keys():
        if k.upper() == "DEPOT":
            depot = k
            break

    # 마을 목록
    town_ids = [pid for pid in locations.keys() if pid != depot]

    # 초기 라우트: 각 마을을 [DEPOT, TOWN_i, DEPOT]
    routes = {}
    route_demands = {}
    for t in town_ids:
        routes[t] = [depot, t, depot]
        _, _, d = locations[t]
        route_demands[t] = d

    # Saving 계산
    savings_list = []
    for i in town_ids:
        for j in town_ids:
            if i != j:
                s = dist[(depot, i)] + dist[(depot, j)] - dist[(i, j)]
                savings_list.append((i, j, s))
    savings_list.sort(key=lambda x: x[2], reverse=True)

    # 병합
    for (i, j, saving) in savings_list:
        route_i_key, route_j_key = None, None
        for r_key, path in routes.items():
            if i in path:
                route_i_key = r_key
            if j in path:
                route_j_key = r_key
        if route_i_key == route_j_key:
            continue

        route_i = routes[route_i_key]
        route_j = routes[route_j_key]

        # 단순화: i가 route_i의 끝(DEPOT 앞), j가 route_j의 시작(DEPOT 뒤)
        if route_i[-2] == i and route_j[1] == j:
            # 용량 제약
            if route_demands[route_i_key] + route_demands[route_j_key] <= CAPACITY:
                new_route = route_i[:-1] + route_j[1:]
                new_demand = route_demands[route_i_key] + route_demands[route_j_key]

                del routes[route_i_key]
                del routes[route_j_key]

                routes[route_i_key] = new_route
                route_demands[route_i_key] = new_demand

    final_routes = list(routes.values())
    return final_routes

# -----------------------------------------------------------------------------
#                 === 유전 알고리즘(Genetic Algorithm, GA) 부분 ===
# -----------------------------------------------------------------------------
def split_tour_into_routes(chromosome, locations, dist):
    """
    주어진 '순열(Chromosome)'을 용량 제약(CAPACITY)에 맞게
    여러 라우트로 분할한 뒤 총 이동거리를 계산/반환.
    - chromosome : [TOWN_01, TOWN_05, TOWN_02, ...]
    - return : (routes, total_distance)
      예: ( [ [DEPOT, TOWN_01, TOWN_02, DEPOT],
               [DEPOT, TOWN_05, DEPOT],
               ... ],
             123.45 )
    """
    depot = None
    for k in locations.keys():
        if k.upper() == "DEPOT":
            depot = k
            break

    routes = []
    current_route = [depot]
    current_load = 0

    for town in chromosome:
        demand = locations[town][2]
        # 만약 현재 라우트에 town을 추가해도 용량 초과가 아니라면 추가
        if current_load + demand <= CAPACITY:
            current_route.append(town)
            current_load += demand
        else:
            # 기존 라우트를 DEPOT으로 닫고
            current_route.append(depot)
            routes.append(current_route)
            # 새 라우트 시작
            current_route = [depot, town]
            current_load = demand

    # 마지막 라우트 닫기
    current_route.append(depot)
    routes.append(current_route)

    # 이제 각 라우트별로 거리 계산
    total_dist = 0.0
    for rt in routes:
        for i in range(len(rt)-1):
            a, b = rt[i], rt[i+1]
            total_dist += dist[(a, b)]

    return routes, total_dist


def create_initial_population(town_ids, pop_size, cw_routes=None):
    """
    초기 해(population) 생성
    - town_ids: 마을 목록 (ex: ['TOWN_01','TOWN_02',...])
    - pop_size: 인구(개체) 수
    - cw_routes: Clarke & Wright 결과 (옵션) 
      => C&W로 나온 라우트를 하나의 순열로 변환해 염색체에 포함 가능
    """
    population = []

    # (1) Clarke & Wright 결과 해를 1개 포함 (옵션)
    if cw_routes:
        # cw_routes는 예: [[DEPOT, TOWN_01, TOWN_02, DEPOT], [DEPOT, TOWN_03, DEPOT], ...]
        # DEPOT 제외하고 순서대로 이어붙이면 순열이 됨
        cw_chromosome = []
        for r in cw_routes:
            for node in r:
                if node.upper() != "DEPOT":
                    cw_chromosome.append(node)
        population.append(cw_chromosome)

    # (2) 나머지는 랜덤 순열
    need_more = pop_size - len(population)
    for _ in range(need_more):
        chrom = town_ids[:]
        random.shuffle(chrom)
        population.append(chrom)

    return population

def order_crossover(parent1, parent2):
    """
    Order Crossover (OX) 예시
    - parent1, parent2: 순열(리스트)
    - return: child1, child2
    """
    size = len(parent1)
    # 두 지점 잘라서 구간 설정
    start, end = sorted(random.sample(range(size), 2))
    
    child1 = [None]*size
    child2 = [None]*size

    # 자식에 부모1의 start~end 구간 복사
    child1[start:end+1] = parent1[start:end+1]
    child2[start:end+1] = parent2[start:end+1]

    # 나머지 칸을 parent2 순서대로 채우기
    pointer1 = end+1
    pointer2 = end+1

    # child1 채우기
    for i in range(size):
        idx = (end+1 + i) % size
        gene = parent2[idx]
        if gene not in child1:
            child1[pointer1 % size] = gene
            pointer1 += 1

    # child2 채우기
    for i in range(size):
        idx = (end+1 + i) % size
        gene = parent1[idx]
        if gene not in child2:
            child2[pointer2 % size] = gene
            pointer2 += 1

    return child1, child2

def mutation(chromosome, mutation_rate=0.1):
    """
    간단 스왑(Swap) 돌연변이
    """
    if random.random() < mutation_rate:
        a, b = random.sample(range(len(chromosome)), 2)
        chromosome[a], chromosome[b] = chromosome[b], chromosome[a]

def tournament_selection(pop, fitnesses, k=2):
    """
    토너먼트 선택 (k명 중 베스트 1명)
    - pop: 전체 개체 리스트
    - fitnesses: 개체별 비용(거리), 작을수록 좋음
    """
    selected = random.sample(range(len(pop)), k)
    best_idx = None
    best_cost = float('inf')
    for idx in selected:
        cost = fitnesses[idx]
        if cost < best_cost:
            best_cost = cost
            best_idx = idx
    return pop[best_idx]

def genetic_algorithm_cvrp(locations, dist, 
                           pop_size=30, 
                           max_gen=100, 
                           mutation_rate=0.1, 
                           elite_size=1):
    """
    유전 알고리즘을 이용해 CVRP 해를 탐색
    - locations, dist: 문제 정보
    - pop_size: 초기 개체 수
    - max_gen: 세대 수
    - mutation_rate: 돌연변이 확률
    - elite_size: 엘리트 보존 개수
    """
    # depot 및 town 목록
    depot = None
    for k in locations.keys():
        if k.upper() == "DEPOT":
            depot = k
            break
    town_ids = [pid for pid in locations.keys() if pid != depot]

    # (1) Clarke & Wright으로 1개의 라우트 후보를 얻어 둠
    cw_routes = clarke_wright_savings(locations, dist)
    
    # (2) 초기 해(population) 생성
    population = create_initial_population(town_ids, pop_size, cw_routes)

    best_solution = None
    best_cost = float('inf')

    for gen in range(max_gen):
        # (3) 각 개체에 대해 비용(거리) 계산
        fitnesses = []
        for chrom in population:
            _, cost = split_tour_into_routes(chrom, locations, dist)
            fitnesses.append(cost)

        # 현재 세대에서의 최고 해
        min_cost = min(fitnesses)
        min_idx = fitnesses.index(min_cost)
        if min_cost < best_cost:
            best_cost = min_cost
            best_solution = population[min_idx][:]  # 복제

        # (4) 새 세대 생성
        new_population = []

        # (4-1) 엘리트 보존
        #      - 우수한 순으로 정렬 후 elite_size만큼 복사
        sorted_pop = sorted(zip(population, fitnesses), key=lambda x: x[1])
        elites = [p[0] for p in sorted_pop[:elite_size]]
        new_population.extend(elites)

        # (4-2) 나머지 개체를 교차+변이해서 채우기
        while len(new_population) < pop_size:
            # 토너먼트 선택 2번 -> 부모1, 부모2
            parent1 = tournament_selection(population, fitnesses, k=2)
            parent2 = tournament_selection(population, fitnesses, k=2)

            child1, child2 = order_crossover(parent1, parent2)

            # 돌연변이
            mutation(child1, mutation_rate)
            mutation(child2, mutation_rate)

            new_population.append(child1)
            if len(new_population) < pop_size:
                new_population.append(child2)

        # 새 세대로 교체
        population = new_population

    # 최종 best_solution 반환
    # (best_solution은 순열 형태 -> 경로로 변환해서 반환)
    final_routes, final_dist = split_tour_into_routes(best_solution, locations, dist)
    return final_routes, final_dist

# -----------------------------------------------------------------------------
#                               === 메인 실행 ===
# -----------------------------------------------------------------------------
def flatten_routes_for_submission(routes):
    """
    여러 라우트(예: [[DEPOT, TOWN_01, DEPOT], [DEPOT, TOWN_02, DEPOT], ...])를
    sample_submission 형식대로 1차원 리스트로 펴서 반환.
    """
    path_sequence = []
    for route in routes:
        for node in route:
            path_sequence.append(node)
    return path_sequence

def main():
    # (1) data.csv 읽기
    locations = read_data('./data.csv')
    dist = compute_distance_matrix(locations)

    # (2) 유전 알고리즘 실행
    best_routes, best_distance = genetic_algorithm_cvrp(
        locations, 
        dist,
        pop_size=30,       # 개체 수
        max_gen=100,       # 세대 반복 수
        mutation_rate=0.1, # 돌연변이 확률
        elite_size=1       # 엘리트 보존 개수
    )

    print("=== GA 결과 ===")
    print(f"총 이동 거리: {best_distance:.2f}")
    for idx, route in enumerate(best_routes):
        print(f"Route {idx+1}: {' -> '.join(route)}")

    # (3) sample_submission 형태로 CSV 저장
    submission_path = flatten_routes_for_submission(best_routes)
    output_file = './santa_route_ga.csv'
    route_df = pd.DataFrame({'point_id': submission_path})
    route_df.to_csv(output_file, index=False)
    print(f"\n[INFO] GA-based route saved to {output_file}")

if __name__ == "__main__":
    main()


=== GA 결과 ===
총 이동 거리: 2243.95
Route 1: DEPOT -> TOWN_01 -> TOWN_44 -> TOWN_05 -> TOWN_03 -> TOWN_74 -> TOWN_04 -> TOWN_02 -> DEPOT
Route 2: DEPOT -> TOWN_06 -> TOWN_08 -> TOWN_72 -> TOWN_66 -> TOWN_71 -> TOWN_53 -> TOWN_42 -> DEPOT
Route 3: DEPOT -> TOWN_15 -> TOWN_21 -> TOWN_24 -> TOWN_16 -> TOWN_22 -> TOWN_33 -> DEPOT
Route 4: DEPOT -> TOWN_67 -> TOWN_55 -> TOWN_69 -> TOWN_57 -> TOWN_19 -> TOWN_61 -> DEPOT
Route 5: DEPOT -> TOWN_28 -> TOWN_25 -> TOWN_75 -> TOWN_52 -> TOWN_59 -> TOWN_70 -> TOWN_51 -> TOWN_36 -> DEPOT
Route 6: DEPOT -> TOWN_14 -> TOWN_20 -> TOWN_23 -> TOWN_43 -> TOWN_64 -> TOWN_54 -> DEPOT
Route 7: DEPOT -> TOWN_34 -> TOWN_18 -> TOWN_60 -> TOWN_17 -> TOWN_09 -> TOWN_73 -> TOWN_07 -> TOWN_47 -> TOWN_38 -> DEPOT
Route 8: DEPOT -> TOWN_58 -> TOWN_40 -> TOWN_49 -> TOWN_26 -> TOWN_27 -> TOWN_63 -> TOWN_65 -> DEPOT
Route 9: DEPOT -> TOWN_62 -> TOWN_35 -> TOWN_46 -> TOWN_48 -> TOWN_39 -> TOWN_45 -> DEPOT
Route 10: DEPOT -> TOWN_32 -> TOWN_41 -> TOWN_50 -> TOWN_56 -> TOWN_37 

In [28]:
import math
import random
import pandas as pd

# 최대 적재 용량
CAPACITY = 25

def euclidean_distance(x1, y1, x2, y2):
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2)

def read_data(csv_path: str):
    df = pd.read_csv(csv_path)
    locations = {}
    for _, row in df.iterrows():
        pid = row['point_id']
        x, y, d = row['x'], row['y'], row['demand']
        locations[pid] = (x, y, d)
    return locations

def compute_distance_matrix(locations):
    dist = {}
    keys = list(locations.keys())
    for i in keys:
        for j in keys:
            if i == j:
                dist[(i, j)] = 0.0
            else:
                x1, y1, _ = locations[i]
                x2, y2, _ = locations[j]
                dist[(i, j)] = euclidean_distance(x1, y1, x2, y2)
    return dist

# -----------------------------------------------------------------------------
#                      Clarke & Wright (초기해 생성용)
# -----------------------------------------------------------------------------
def clarke_wright_savings(locations, dist):
    depot = [k for k in locations.keys() if k.upper() == "DEPOT"][0]
    town_ids = [pid for pid in locations.keys() if pid != depot]

    routes = {}
    demands = {}
    for t in town_ids:
        routes[t] = [depot, t, depot]
        demands[t] = locations[t][2]  # demand

    # Saving 계산
    savings_list = []
    for i in town_ids:
        for j in town_ids:
            if i != j:
                s = dist[(depot, i)] + dist[(depot, j)] - dist[(i, j)]
                savings_list.append((i, j, s))
    savings_list.sort(key=lambda x: x[2], reverse=True)

    for (i, j, saving) in savings_list:
        if i in routes and j in routes:
            if i != j: 
                route_i = routes[i]
                route_j = routes[j]
                # 단순화: i가 route_i[-2], j가 route_j[1]
                if route_i[-2] == i and route_j[1] == j:
                    if demands[i] + demands[j] <= CAPACITY:
                        new_route = route_i[:-1] + route_j[1:]
                        new_demand = demands[i] + demands[j]
                        routes[i] = new_route
                        demands[i] = new_demand
                        del routes[j]
                        del demands[j]

    final_routes = list(routes.values())
    return final_routes


# -----------------------------------------------------------------------------
#                      GA 관련 유틸 (스플릿, 토너먼트 등)
# -----------------------------------------------------------------------------
def split_tour_into_routes(chromosome, locations, dist):
    """
    염색체(순열)을 용량 제약에 맞춰 여러 라우트로 나눈 뒤 총 이동거리 계산
    """
    depot = [k for k in locations.keys() if k.upper() == "DEPOT"][0]
    routes = []
    current_route = [depot]
    current_load = 0

    for town in chromosome:
        demand = locations[town][2]
        if current_load + demand <= CAPACITY:
            current_route.append(town)
            current_load += demand
        else:
            # 기존 라우트를 DEPOT으로 닫고
            current_route.append(depot)
            routes.append(current_route)
            # 새 라우트 시작
            current_route = [depot, town]
            current_load = demand

    # 마지막 라우트 닫기
    current_route.append(depot)
    routes.append(current_route)

    # 이동 거리 계산
    total_dist = 0.0
    for rt in routes:
        for i in range(len(rt)-1):
            a, b = rt[i], rt[i+1]
            total_dist += dist[(a, b)]
    return routes, total_dist

def tournament_selection(pop, fitnesses, k=2):
    """토너먼트 선택"""
    selected = random.sample(range(len(pop)), k)
    best_idx = None
    best_cost = float('inf')
    for idx in selected:
        if fitnesses[idx] < best_cost:
            best_cost = fitnesses[idx]
            best_idx = idx
    return pop[best_idx]


# -----------------------------------------------------------------------------
#                      다양한 Crossover 기법
# -----------------------------------------------------------------------------
def order_crossover(parent1, parent2):
    """
    Order Crossover (OX)
    1) 구간 [start, end]를 무작위로 선택
    2) 부모1의 start~end 구간을 자식1에 복사
    3) 남은 칸에 부모2의 순서를 유지하며 노드를 채워넣음
    (자식2는 반대 방향으로 진행)
    """
    size = len(parent1)
    start, end = sorted(random.sample(range(size), 2))

    child1 = [None]*size
    child2 = [None]*size

    # 부모1 구간 -> 자식1, 부모2 구간 -> 자식2
    child1[start:end+1] = parent1[start:end+1]
    child2[start:end+1] = parent2[start:end+1]

    # 자식1 나머지 칸 채우기
    ptr = (end+1) % size
    for i in range(size):
        idx = (end+1 + i) % size
        gene = parent2[idx]
        if gene not in child1:
            child1[ptr] = gene
            ptr = (ptr + 1) % size

    # 자식2 나머지 칸 채우기
    ptr = (end+1) % size
    for i in range(size):
        idx = (end+1 + i) % size
        gene = parent1[idx]
        if gene not in child2:
            child2[ptr] = gene
            ptr = (ptr + 1) % size

    return child1, child2

def pmx_crossover(parent1, parent2):
    """
    PMX (Partially Mapped Crossover)
    1) 구간 [start, end] 선택
    2) 그 구간을 parent1 -> child1, parent2 -> child2 에 복사
    3) 교차 구간 내 '맵핑' 정보를 바탕으로, 나머지 위치에서 노드를 치환
    """
    size = len(parent1)
    start, end = sorted(random.sample(range(size), 2))

    child1 = [None]*size
    child2 = [None]*size

    # 교차 구간 복사
    for i in range(start, end+1):
        child1[i] = parent1[i]
        child2[i] = parent2[i]

    # 맵핑 영역(교차 구간) 처리
    def pmx_mapping(child, other_parent):
        for i in range(start, end+1):
            gene = other_parent[i]
            if gene not in child:
                pos = i
                # 충돌이 없을 때까지 치환
                while child[pos] is not None:
                    # child[pos]가 실제로 다른 노드와 충돌날 경우
                    val = other_parent[pos]
                    pos = other_parent.index(val)
                child[pos] = gene

    pmx_mapping(child1, parent2)
    pmx_mapping(child2, parent1)

    # 나머지 None인 칸 채우기
    for i in range(size):
        if child1[i] is None:
            child1[i] = parent2[i]
        if child2[i] is None:
            child2[i] = parent1[i]

    return child1, child2


# -----------------------------------------------------------------------------
#                     GA 메인 루프 (crossover_method 파라미터)
# -----------------------------------------------------------------------------
def create_initial_population(town_ids, pop_size, cw_routes=None):
    """초기 해 생성 (Clarke & Wright 결과 + 랜덤 순열)"""
    population = []
    if cw_routes: 
        # Clarke & Wright 라우트를 단일 순열로 변환
        cw_chrom = []
        for r in cw_routes:
            for node in r:
                if node.upper() != "DEPOT":
                    cw_chrom.append(node)
        population.append(cw_chrom)

    # 나머지는 랜덤
    while len(population) < pop_size:
        chrom = town_ids[:]
        random.shuffle(chrom)
        population.append(chrom)
    return population

def mutation(chromosome, mutation_rate=0.1):
    """단순 스왑 돌연변이"""
    if random.random() < mutation_rate:
        a, b = random.sample(range(len(chromosome)), 2)
        chromosome[a], chromosome[b] = chromosome[b], chromosome[a]

def genetic_algorithm_cvrp(locations, dist, 
                           pop_size=30, 
                           max_gen=100, 
                           mutation_rate=0.1, 
                           elite_size=1,
                           crossover_method='order'):
    """
    crossover_method : 'order' 또는 'pmx' 등 선택
    """
    random.seed(0)  # 재현성 유지를 위해 (테스트 시)

    depot = [k for k in locations.keys() if k.upper() == "DEPOT"][0]
    town_ids = [pid for pid in locations.keys() if pid != depot]

    # 초기해 생성 (Clarke & Wright 1개 + 랜덤)
    cw_routes = clarke_wright_savings(locations, dist)
    population = create_initial_population(town_ids, pop_size, cw_routes)

    best_solution = None
    best_cost = float('inf')

    for gen in range(max_gen):
        # 1) 적합도 평가 (총 이동 거리)
        fitnesses = []
        for chrom in population:
            _, cost = split_tour_into_routes(chrom, locations, dist)
            fitnesses.append(cost)

        # 현재 세대에서 최고 해 갱신
        min_cost = min(fitnesses)
        min_idx = fitnesses.index(min_cost)
        if min_cost < best_cost:
            best_cost = min_cost
            best_solution = population[min_idx][:]

        # 2) 새로운 세대 구성
        new_population = []

        # (a) 엘리트 보존
        sorted_pop = sorted(zip(population, fitnesses), key=lambda x: x[1])
        elites = [p[0] for p in sorted_pop[:elite_size]]
        new_population.extend(elites)

        # (b) 나머지 개체 생성
        while len(new_population) < pop_size:
            # 부모 선택
            parent1 = tournament_selection(population, fitnesses, k=2)
            parent2 = tournament_selection(population, fitnesses, k=2)

            # crossover
            if crossover_method == 'order':
                child1, child2 = order_crossover(parent1, parent2)
            elif crossover_method == 'pmx':
                child1, child2 = pmx_crossover(parent1, parent2)
            else:
                # 기본은 order
                child1, child2 = order_crossover(parent1, parent2)

            # 돌연변이
            mutation(child1, mutation_rate)
            mutation(child2, mutation_rate)

            new_population.append(child1)
            if len(new_population) < pop_size:
                new_population.append(child2)

        # 세대 교체
        population = new_population

    # GA 종료 후 best_solution (순열) → 실제 라우트로 변환
    final_routes, final_dist = split_tour_into_routes(best_solution, locations, dist)
    return final_routes, final_dist

# -----------------------------------------------------------------------------
#                               실행 예시
# -----------------------------------------------------------------------------
def flatten_routes_for_submission(routes):
    path_sequence = []
    for route in routes:
        for node in route:
            path_sequence.append(node)
    return path_sequence

def main():
    # 1) 데이터 읽기
    # (사용 환경에 맞게 CSV 경로 수정)
    locations = read_data('./data.csv')
    dist = compute_distance_matrix(locations)

    # 2) GA 실행 (crossover_method='order' 또는 'pmx')
    best_routes, best_distance = genetic_algorithm_cvrp(
        locations, dist,
        pop_size=30,
        max_gen=100,
        mutation_rate=0.1,
        elite_size=1,
        crossover_method='pmx'  # 여기서 교차 방식을 선택
    )

    print("=== GA 결과 (교차 방식: PMX) ===")
    print(f"총 이동 거리: {best_distance:.2f}")
    for i, route in enumerate(best_routes):
        print(f"Route {i+1}: {' -> '.join(route)}")

    # 3) CSV로 저장 (sample_submission 형식)
    submission_path = flatten_routes_for_submission(best_routes)
    df = pd.DataFrame({'point_id': submission_path})
    df.to_csv('./santa_route_ga.csv', index=False)
    print("\n[INFO] GA-based route saved to santa_route_ga.csv")

if __name__ == "__main__":
    main()



KeyboardInterrupt



In [None]:
import math
import random
import pandas as pd

# 최대 적재 용량
CAPACITY = 25

def euclidean_distance(x1, y1, x2, y2):
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2)

def read_data(csv_path: str):
    df = pd.read_csv(csv_path)
    locations = {}
    for _, row in df.iterrows():
        pid = row['point_id']
        x, y, d = row['x'], row['y'], row['demand']
        locations[pid] = (x, y, d)
    return locations

def compute_distance_matrix(locations):
    dist = {}
    keys = list(locations.keys())
    for i in keys:
        for j in keys:
            if i == j:
                dist[(i, j)] = 0.0
            else:
                x1, y1, _ = locations[i]
                x2, y2, _ = locations[j]
                dist[(i, j)] = euclidean_distance(x1, y1, x2, y2)
    return dist

# -----------------------------------------------------------------------------
#                      Clarke & Wright (초기해 생성용)
# -----------------------------------------------------------------------------
def clarke_wright_savings(locations, dist):
    depot = [k for k in locations.keys() if k.upper() == "DEPOT"][0]
    town_ids = [pid for pid in locations.keys() if pid != depot]

    routes = {}
    demands = {}
    for t in town_ids:
        routes[t] = [depot, t, depot]
        demands[t] = locations[t][2]  # demand

    # Saving 계산
    savings_list = []
    for i in town_ids:
        for j in town_ids:
            if i != j:
                s = dist[(depot, i)] + dist[(depot, j)] - dist[(i, j)]
                savings_list.append((i, j, s))
    savings_list.sort(key=lambda x: x[2], reverse=True)

    for (i, j, saving) in savings_list:
        if i in routes and j in routes:
            if i != j: 
                route_i = routes[i]
                route_j = routes[j]
                # 단순화: i가 route_i[-2], j가 route_j[1]
                if route_i[-2] == i and route_j[1] == j:
                    if demands[i] + demands[j] <= CAPACITY:
                        new_route = route_i[:-1] + route_j[1:]
                        new_demand = demands[i] + demands[j]
                        routes[i] = new_route
                        demands[i] = new_demand
                        del routes[j]
                        del demands[j]

    final_routes = list(routes.values())
    return final_routes


# -----------------------------------------------------------------------------
#                      GA 관련 유틸 (스플릿, 토너먼트 등)
# -----------------------------------------------------------------------------
def split_tour_into_routes(chromosome, locations, dist):
    """
    염색체(순열)을 용량 제약에 맞춰 여러 라우트로 나눈 뒤 총 이동거리 계산
    """
    depot = [k for k in locations.keys() if k.upper() == "DEPOT"][0]
    routes = []
    current_route = [depot]
    current_load = 0

    for town in chromosome:
        demand = locations[town][2]
        if current_load + demand <= CAPACITY:
            current_route.append(town)
            current_load += demand
        else:
            # 기존 라우트를 DEPOT으로 닫고
            current_route.append(depot)
            routes.append(current_route)
            # 새 라우트 시작
            current_route = [depot, town]
            current_load = demand

    # 마지막 라우트 닫기
    current_route.append(depot)
    routes.append(current_route)

    # 이동 거리 계산
    total_dist = 0.0
    for rt in routes:
        for i in range(len(rt)-1):
            a, b = rt[i], rt[i+1]
            total_dist += dist[(a, b)]
    return routes, total_dist

def tournament_selection(pop, fitnesses, k=2):
    """토너먼트 선택"""
    selected = random.sample(range(len(pop)), k)
    best_idx = None
    best_cost = float('inf')
    for idx in selected:
        if fitnesses[idx] < best_cost:
            best_cost = fitnesses[idx]
            best_idx = idx
    return pop[best_idx]


# -----------------------------------------------------------------------------
#                      다양한 Crossover 기법
# -----------------------------------------------------------------------------
def order_crossover(parent1, parent2):
    """
    Order Crossover (OX)
    1) 구간 [start, end]를 무작위로 선택
    2) 부모1의 start~end 구간을 자식1에 복사
    3) 남은 칸에 부모2의 순서를 유지하며 노드를 채워넣음
    (자식2는 반대 방향으로 진행)
    """
    size = len(parent1)
    start, end = sorted(random.sample(range(size), 2))

    child1 = [None]*size
    child2 = [None]*size

    # 부모1 구간 -> 자식1, 부모2 구간 -> 자식2
    child1[start:end+1] = parent1[start:end+1]
    child2[start:end+1] = parent2[start:end+1]

    # 자식1 나머지 칸 채우기
    ptr = (end+1) % size
    for i in range(size):
        idx = (end+1 + i) % size
        gene = parent2[idx]
        if gene not in child1:
            child1[ptr] = gene
            ptr = (ptr + 1) % size

    # 자식2 나머지 칸 채우기
    ptr = (end+1) % size
    for i in range(size):
        idx = (end+1 + i) % size
        gene = parent1[idx]
        if gene not in child2:
            child2[ptr] = gene
            ptr = (ptr + 1) % size

    return child1, child2

def pmx_crossover(parent1, parent2):
    """
    PMX (Partially Mapped Crossover)
    1) 구간 [start, end] 선택
    2) 그 구간을 parent1 -> child1, parent2 -> child2 에 복사
    3) 교차 구간 내 '맵핑' 정보를 바탕으로, 나머지 위치에서 노드를 치환
    """
    size = len(parent1)
    start, end = sorted(random.sample(range(size), 2))

    child1 = [None]*size
    child2 = [None]*size

    # 교차 구간 복사
    for i in range(start, end+1):
        child1[i] = parent1[i]
        child2[i] = parent2[i]

    # 맵핑 영역(교차 구간) 처리
    def pmx_mapping(child, other_parent):
        for i in range(start, end+1):
            gene = other_parent[i]
            if gene not in child:
                pos = i
                # 충돌이 없을 때까지 치환
                while child[pos] is not None:
                    # child[pos]가 실제로 다른 노드와 충돌날 경우
                    val = other_parent[pos]
                    pos = other_parent.index(val)
                child[pos] = gene

    pmx_mapping(child1, parent2)
    pmx_mapping(child2, parent1)

    # 나머지 None인 칸 채우기
    for i in range(size):
        if child1[i] is None:
            child1[i] = parent2[i]
        if child2[i] is None:
            child2[i] = parent1[i]

    return child1, child2


# -----------------------------------------------------------------------------
#                     GA 메인 루프 (crossover_method 파라미터)
# -----------------------------------------------------------------------------
def create_initial_population(town_ids, pop_size, cw_routes=None):
    """초기 해 생성 (Clarke & Wright 결과 + 랜덤 순열)"""
    population = []
    if cw_routes: 
        # Clarke & Wright 라우트를 단일 순열로 변환
        cw_chrom = []
        for r in cw_routes:
            for node in r:
                if node.upper() != "DEPOT":
                    cw_chrom.append(node)
        population.append(cw_chrom)

    # 나머지는 랜덤
    while len(population) < pop_size:
        chrom = town_ids[:]
        random.shuffle(chrom)
        population.append(chrom)
    return population

def mutation(chromosome, mutation_rate=0.1):
    """단순 스왑 돌연변이"""
    if random.random() < mutation_rate:
        a, b = random.sample(range(len(chromosome)), 2)
        chromosome[a], chromosome[b] = chromosome[b], chromosome[a]

def genetic_algorithm_cvrp(locations, dist, 
                           pop_size=30, 
                           max_gen=100, 
                           mutation_rate=0.1, 
                           elite_size=1,
                           crossover_method='order',
                           do_two_opt=False):
    """
    do_two_opt : True면 교차 후 2-Opt 로컬 서치 적용
    """
    import random

    depot = [k for k in locations.keys() if k.upper() == "DEPOT"][0]
    town_ids = [pid for pid in locations.keys() if pid != depot]

    # 1) 초기해 (Clarke & Wright + 랜덤)
    cw_routes = clarke_wright_savings(locations, dist)
    population = create_initial_population(town_ids, pop_size, cw_routes)

    best_solution = None
    best_cost = float('inf')

    for gen in range(max_gen):
        # (a) 적합도 계산
        fitnesses = []
        for chrom in population:
            _, cost = split_tour_into_routes(chrom, locations, dist)
            fitnesses.append(cost)

        # (b) 최고 해 갱신
        min_cost = min(fitnesses)
        min_idx = fitnesses.index(min_cost)
        if min_cost < best_cost:
            best_cost = min_cost
            best_solution = population[min_idx][:]

        # (c) 새 세대 구성
        new_population = []

        # (c1) 엘리트 보존
        sorted_pop = sorted(zip(population, fitnesses), key=lambda x: x[1])
        elites = [p[0] for p in sorted_pop[:elite_size]]
        new_population.extend(elites)

        # (c2) 자식 생성 (부모 선택→교차→2-Opt→돌연변이)
        while len(new_population) < pop_size:
            parent1 = tournament_selection(population, fitnesses, k=2)
            parent2 = tournament_selection(population, fitnesses, k=2)

            # 교차
            if crossover_method == 'order':
                child1, child2 = order_crossover(parent1, parent2)
            elif crossover_method == 'pmx':
                child1, child2 = pmx_crossover(parent1, parent2)
            else:
                child1, child2 = order_crossover(parent1, parent2)

            # ---- [새로 추가] 2-Opt Local Search 적용 ----
            if do_two_opt:
                child1 = two_opt(child1, dist, locations, max_iter=50)
                child2 = two_opt(child2, dist, locations, max_iter=50)

            # 돌연변이
            mutation(child1, mutation_rate)
            mutation(child2, mutation_rate)

            new_population.append(child1)
            if len(new_population) < pop_size:
                new_population.append(child2)

        # (d) 교체
        population = new_population

    # 최종 해
    final_routes, final_dist = split_tour_into_routes(best_solution, locations, dist)
    return final_routes, final_dist

def two_opt(chromosome, dist, locations, max_iter=50):
    """
    - chromosome : 마을 순열 (ex: ['TOWN_03','TOWN_05','TOWN_01',...])
    - dist       : 모든 지점 쌍의 거리 딕셔너리 dist[(i, j)]
    - locations  : {point_id: (x, y, demand), ...}
    - max_iter   : 2-Opt 시도 횟수 제한 (로컬서치 무한 반복 방지)

    [주의] 
    - 여기선 모든 마을을 '하나의 경로'로 간주하고 2-Opt를 적용(실제 VRP는 다수 라우트).
    - 엄밀한 VRP 접근은 라우트별로 2-Opt, 라우트 간 교환 등을 따로 구현해야 함.
    """

    # depot 찾기 (하지만 여긴 순열에서 depot은 제외)
    depot = [k for k in locations.keys() if k.upper() == "DEPOT"][0]
    
    best = chromosome[:]
    improved = True
    iteration = 0

    def route_cost(seq):
        # depot~town들~depot를 하나의 경로로 보고 비용 계산
        cost = 0.0
        prev = depot
        for town in seq:
            cost += dist[(prev, town)]
            prev = town
        # 마지막에 depot으로 돌아옴
        cost += dist[(prev, depot)]
        return cost

    best_cost = route_cost(best)

    while improved and iteration < max_iter:
        improved = False
        iteration += 1
        for i in range(len(best)-1):
            for j in range(i+1, len(best)):
                # [i, i+1, ..., j-1, j] 구간 뒤집기
                new_seq = best[:i] + best[i:j+1][::-1] + best[j+1:]
                new_cost = route_cost(new_seq)
                if new_cost < best_cost:
                    best = new_seq
                    best_cost = new_cost
                    improved = True
                    break  # 개선했으면 내부 루프 탈출
            if improved:
                break

    return best


# -----------------------------------------------------------------------------
#                               실행 예시
# -----------------------------------------------------------------------------
def flatten_routes_for_submission(routes):
    path_sequence = []
    for route in routes:
        for node in route:
            path_sequence.append(node)
    return path_sequence


def main():
    # CSV 읽기
    locations = read_data('./data.csv')
    dist = compute_distance_matrix(locations)

    # GA 실행 (crossover_method='order', do_two_opt=True)
    best_routes, best_distance = genetic_algorithm_cvrp(
        locations, dist,
        pop_size=30,
        max_gen=100,
        mutation_rate=0.1,
        elite_size=1,
        crossover_method='order',  # or 'pmx'
        do_two_opt=True            # ← 2-Opt 적용 
    )

    print("=== GA + 2-Opt 결과 ===")
    print(f"총 이동 거리: {best_distance:.2f}")
    for idx, route in enumerate(best_routes):
        print(f"Route {idx+1}: {' -> '.join(route)}")

    # CSV 저장
    submission_path = flatten_routes_for_submission(best_routes)
    route_df = pd.DataFrame({'point_id': submission_path})
    route_df.to_csv('./santa_route_ga_2opt.csv', index=False)
    print("[INFO] Saved: santa_route_ga_2opt.csv")


if __name__ == "__main__":
    main()


In [97]:
import math
import random
import pandas as pd

# 최대 적재 용량
CAPACITY = 25

def euclidean_distance(x1, y1, x2, y2):
    """유클리디안 거리 계산"""
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2)

def read_data(csv_path: str):
    """
    data.csv 파일을 불러와서
    각 지점 정보 (point_id, x, y, demand)를 반환한다.
    """
    df = pd.read_csv(csv_path)
    
    # point_id를 key로, (x, y, demand) 튜플을 저장
    locations = {}
    for _, row in df.iterrows():
        pid = row['point_id']
        x, y, d = row['x'], row['y'], row['demand']
        locations[pid] = (x, y, d)
    return locations

def compute_distance_matrix(locations):
    """모든 지점 쌍 간 거리 행렬을 딕셔너리 dist[(i, j)] = 거리 로 구성"""
    dist = {}
    keys = list(locations.keys())
    for i in keys:
        for j in keys:
            if i == j:
                dist[(i, j)] = 0.0
            else:
                x1, y1, _ = locations[i]
                x2, y2, _ = locations[j]
                dist[(i, j)] = euclidean_distance(x1, y1, x2, y2)
    return dist

# -----------------------------------------------------------------------------
#                 === 병렬 Clarke & Wright (PCW) 부분 ===
# -----------------------------------------------------------------------------
def parallel_clarke_wright_savings(locations, dist, alpha=1.0, beta=0.0):
    """
    병렬 Clarke & Wright 알고리즘을 이용해 여러 라우트를 생성.
    alpha, beta 파라미터로 Saving 공식을 조정.
    
    S(i, j) = d(depot, i) + d(depot, j) - alpha*d(i, j) + beta*(demand(i) + demand(j))
    
    - locations: {point_id: (x, y, demand)}
    - dist     : {(i, j): distance}
    - alpha    : 거리 가중치 (기본 1.0)
    - beta     : 수요 가중치 (기본 0.0)
    """
    # depot 찾기
    depot = None
    for k in locations.keys():
        if k.upper() == "DEPOT":
            depot = k
            break

    if depot is None:
        raise ValueError("Depot not found! 'DEPOT' key is missing in locations.")

    # 마을 목록 (DEPOT 제외)
    town_ids = [pid for pid in locations.keys() if pid != depot]

    # (1) 초기 라우트: 각 마을을 [DEPOT, TOWN_i, DEPOT] 으로 시작
    routes = {}
    route_demands = {}
    for t in town_ids:
        routes[t] = [depot, t, depot]  # 한 마을이 독립된 경로
        _, _, d = locations[t]
        route_demands[t] = d

    # (2) Saving 계산
    savings_list = []
    for i in town_ids:
        for j in town_ids:
            if i != j:
                di = dist[(depot, i)]
                dj = dist[(depot, j)]
                dij = dist[(i, j)]
                demand_i = locations[i][2]
                demand_j = locations[j][2]

                # 병렬 C&W용 Saving 공식 (alpha, beta 반영)
                s = (di + dj) - alpha * dij + beta * (demand_i + demand_j)
                savings_list.append((i, j, s))

    # Saving 내림차순 정렬
    savings_list.sort(key=lambda x: x[2], reverse=True)

    # (3) 병렬 라우트 병합
    for (i, j, s) in savings_list:
        # route_i_key, route_j_key 찾기
        route_i_key, route_j_key = None, None
        for r_key, path in routes.items():
            if i in path:
                route_i_key = r_key
            if j in path:
                route_j_key = r_key

        # 이미 같은 라우트이면 무시
        if route_i_key is None or route_j_key is None:
            continue
        if route_i_key == route_j_key:
            continue

        route_i = routes[route_i_key]
        route_j = routes[route_j_key]

        # i가 route_i에서 첫 번째 마을이거나 마지막 마을인지 확인
        # j가 route_j에서 첫 번째 마을이거나 마지막 마을인지 확인
        # (간단 버전: i가 route_i 끝, j가 route_j 시작 또는 그 반대)
        can_merge = False
        new_route = None
        new_demand = None

        # (a) i가 route_i[-2], j가 route_j[1]
        if route_i[-2] == i and route_j[1] == j:
            # 용량 체크
            if route_demands[route_i_key] + route_demands[route_j_key] <= CAPACITY:
                can_merge = True
                new_route = route_i[:-1] + route_j[1:]
                new_demand = route_demands[route_i_key] + route_demands[route_j_key]

        # (b) i가 route_i[1], j가 route_j[-2]
        if not can_merge:
            if route_i[1] == i and route_j[-2] == j:
                if route_demands[route_i_key] + route_demands[route_j_key] <= CAPACITY:
                    can_merge = True
                    new_route = route_j[:-1] + route_i[1:]
                    new_demand = route_demands[route_i_key] + route_demands[route_j_key]

        if can_merge and new_route is not None:
            # 라우트 병합
            del routes[route_i_key]
            del routes[route_j_key]
            # 새로운 key로 저장(기존 route_i_key 사용)
            routes[route_i_key] = new_route
            route_demands[route_i_key] = new_demand

    # 모든 병합 후 남은 라우트들
    final_routes = list(routes.values())
    return final_routes


# -----------------------------------------------------------------------------
#                 === VRP 해 평가용(스플릿, 총 거리 계산 등) ===
# -----------------------------------------------------------------------------
def split_tour_into_routes(chromosome, locations, dist):
    """
    주어진 '순열(Chromosome)'을 용량 제약(CAPACITY)에 맞게
    여러 라우트로 분할한 뒤 총 이동거리 계산/반환.
    
    - chromosome: [TOWN_01, TOWN_05, TOWN_02, ...]
    - return: (routes, total_distance)
      예: ( [ [DEPOT, TOWN_01, TOWN_02, DEPOT],
               [DEPOT, TOWN_05, DEPOT],
               ... ],
             123.45 )
    """
    depot = None
    for k in locations.keys():
        if k.upper() == "DEPOT":
            depot = k
            break

    routes = []
    current_route = [depot]
    current_load = 0

    for town in chromosome:
        demand = locations[town][2]
        # 용량 체크
        if current_load + demand <= CAPACITY:
            current_route.append(town)
            current_load += demand
        else:
            # 기존 루트를 DEPOT으로 닫고
            current_route.append(depot)
            routes.append(current_route)
            # 새 라우트 시작
            current_route = [depot, town]
            current_load = demand

    # 마지막 라우트 마무리
    current_route.append(depot)
    routes.append(current_route)

    # 라우트별 총 이동거리
    total_dist = 0.0
    for rt in routes:
        for i in range(len(rt) - 1):
            a, b = rt[i], rt[i+1]
            total_dist += dist[(a, b)]

    return routes, total_dist


def flatten_routes_for_submission(routes):
    """
    여러 라우트(예: [[DEPOT, TOWN_01, DEPOT], [DEPOT, TOWN_02, DEPOT], ...])를
    sample_submission 형식대로 1차원 리스트로 펴서 반환.
    """
    path_sequence = []
    for route in routes:
        for node in route:
            path_sequence.append(node)
    return path_sequence


# -----------------------------------------------------------------------------
#                 === 유전 알고리즘(Genetic Algorithm, GA) ===
# -----------------------------------------------------------------------------
def create_initial_population(town_ids, pop_size, cw_routes=None):
    """
    초기 해(population) 생성
    - town_ids: 마을 목록 (ex: ['TOWN_01','TOWN_02',...])
    - pop_size: 인구(개체) 수
    - cw_routes: PCW 결과 (옵션) => PCW로 나온 라우트를 하나의 순열로 변환해 염색체에 포함 가능
    """
    population = []

    # (1) PCW 결과 해를 1개 포함 (옵션)
    if cw_routes:
        # cw_routes 예: [[DEPOT, TOWN_01, TOWN_02, DEPOT], [DEPOT, TOWN_03, DEPOT], ...]
        # DEPOT 제외하고 순서대로 이어붙이면 순열이 됨
        cw_chromosome = []
        for r in cw_routes:
            for node in r:
                if node.upper() != "DEPOT":
                    cw_chromosome.append(node)
        population.append(cw_chromosome)

    # (2) 나머지는 랜덤 순열
    need_more = pop_size - len(population)
    for _ in range(need_more):
        chrom = town_ids[:]
        random.shuffle(chrom)
        population.append(chrom)

    return population


def order_crossover(parent1, parent2):
    """
    Order Crossover (OX) 예시
    - parent1, parent2: 순열(리스트)
    - return: child1, child2
    """
    size = len(parent1)
    start, end = sorted(random.sample(range(size), 2))

    child1 = [None]*size
    child2 = [None]*size

    # 자식에 부모1의 [start~end] 구간 복사
    child1[start:end+1] = parent1[start:end+1]
    child2[start:end+1] = parent2[start:end+1]

    # 나머지 칸을 parent2 순서대로 채우기
    pointer1 = end + 1
    pointer2 = end + 1

    for i in range(size):
        idx = (end + 1 + i) % size
        gene = parent2[idx]
        if gene not in child1:
            child1[pointer1 % size] = gene
            pointer1 += 1

    for i in range(size):
        idx = (end + 1 + i) % size
        gene = parent1[idx]
        if gene not in child2:
            child2[pointer2 % size] = gene
            pointer2 += 1

    return child1, child2


def mutation(chromosome, mutation_rate=0.1):
    """
    간단 스왑(Swap) 돌연변이
    """
    if random.random() < mutation_rate:
        a, b = random.sample(range(len(chromosome)), 2)
        chromosome[a], chromosome[b] = chromosome[b], chromosome[a]


def tournament_selection(pop, fitnesses, k=2):
    """
    토너먼트 선택 (k명 중 베스트 1명)
    - pop: 전체 개체 리스트
    - fitnesses: 개체별 비용(거리), 작을수록 좋음
    """
    selected = random.sample(range(len(pop)), k)
    best_idx = None
    best_cost = float('inf')
    for idx in selected:
        cost = fitnesses[idx]
        if cost < best_cost:
            best_cost = cost
            best_idx = idx
    return pop[best_idx]


def genetic_algorithm_cvrp(locations, dist, 
                           pop_size=30, 
                           max_gen=100, 
                           mutation_rate=0.1, 
                           elite_size=1,
                           alpha=0.8,
                           beta=0.2):
    """
    유전 알고리즘을 이용해 CVRP 해를 탐색
    - locations, dist: 문제 정보
    - pop_size: 초기 개체 수
    - max_gen: 세대(Generation) 수
    - mutation_rate: 돌연변이 확률
    - elite_size: 엘리트 보존 개수
    - alpha, beta: 병렬 C&W에서 사용할 파라미터 (초기염색체 생성 시 이용)
    """
    # (1) depot 및 town 목록
    depot = None
    for k in locations.keys():
        if k.upper() == "DEPOT":
            depot = k
            break
    town_ids = [pid for pid in locations.keys() if pid != depot]

    # (2) 병렬 Clarke & Wright으로 1개의 라우트 후보를 얻어 둠
    pcw_routes = parallel_clarke_wright_savings(locations, dist, alpha=alpha, beta=beta)
    
    # (3) 초기 해(population) 생성
    population = create_initial_population(town_ids, pop_size, pcw_routes)

    best_solution = None
    best_cost = float('inf')

    # (4) 세대 반복
    for gen in range(max_gen):
        # (4-1) 각 개체(fitness) 계산
        fitnesses = []
        for chrom in population:
            _, cost = split_tour_into_routes(chrom, locations, dist)
            fitnesses.append(cost)

        # (4-2) 현재 세대에서의 최고 해 업데이트
        min_cost = min(fitnesses)
        min_idx = fitnesses.index(min_cost)
        if min_cost < best_cost:
            best_cost = min_cost
            best_solution = population[min_idx][:]  # 복제

        # (4-3) 새 세대 생성
        new_population = []

        # 엘리트 보존
        sorted_pop = sorted(zip(population, fitnesses), key=lambda x: x[1])
        elites = [p[0] for p in sorted_pop[:elite_size]]
        new_population.extend(elites)

        # 나머지 개체 교배+돌연변이
        while len(new_population) < pop_size:
            parent1 = tournament_selection(population, fitnesses, k=2)
            parent2 = tournament_selection(population, fitnesses, k=2)

            child1, child2 = order_crossover(parent1, parent2)

            mutation(child1, mutation_rate)
            mutation(child2, mutation_rate)

            new_population.append(child1)
            if len(new_population) < pop_size:
                new_population.append(child2)

        # 새 세대로 교체
        population = new_population

    # (5) 최종 best_solution 반환
    final_routes, final_dist = split_tour_into_routes(best_solution, locations, dist)
    return final_routes, final_dist


# -----------------------------------------------------------------------------
#                               === 메인 실행 ===
# -----------------------------------------------------------------------------
def main():
    # (1) data.csv 읽기
    csv_path = './data.csv'
    locations = read_data(csv_path)
    dist = compute_distance_matrix(locations)

    # (2) 유전 알고리즘 실행
    #     - 여기서 alpha, beta는 병렬 Clarke & Wright에 사용할 파라미터
    best_routes, best_distance = genetic_algorithm_cvrp(
        locations, 
        dist,
        pop_size=30,       # 개체(인구) 수
        max_gen=200,       # 세대 반복 수
        mutation_rate=0.1, # 돌연변이 확률
        elite_size=1,      # 엘리트 보존 개수
        alpha=1.0,         # PCW의 거리 가중치
        beta=0.0           # PCW의 수요 가중치
    )

    # (3) 결과 출력
    print("=== GA + 병렬 C&W 결과 ===")
    print(f"총 이동 거리: {best_distance:.2f}")
    for idx, route in enumerate(best_routes):
        print(f"Route {idx+1}: {' -> '.join(route)}")

    # (4) sample_submission 형태로 CSV 저장
    submission_path = flatten_routes_for_submission(best_routes)
    output_file = './santa_route_ga_imp.csv'
    route_df = pd.DataFrame({'point_id': submission_path})
    route_df.to_csv(output_file, index=False)
    print(f"\n[INFO] GA-based route saved to {output_file}")

if __name__ == "__main__":
    main()

=== GA + 병렬 C&W 결과 ===
총 이동 거리: 2243.95
Route 1: DEPOT -> TOWN_01 -> TOWN_44 -> TOWN_05 -> TOWN_03 -> TOWN_74 -> TOWN_04 -> TOWN_02 -> DEPOT
Route 2: DEPOT -> TOWN_06 -> TOWN_08 -> TOWN_72 -> TOWN_66 -> TOWN_71 -> TOWN_53 -> TOWN_42 -> DEPOT
Route 3: DEPOT -> TOWN_15 -> TOWN_21 -> TOWN_24 -> TOWN_16 -> TOWN_22 -> TOWN_33 -> DEPOT
Route 4: DEPOT -> TOWN_67 -> TOWN_55 -> TOWN_69 -> TOWN_57 -> TOWN_19 -> TOWN_61 -> DEPOT
Route 5: DEPOT -> TOWN_28 -> TOWN_25 -> TOWN_75 -> TOWN_52 -> TOWN_59 -> TOWN_70 -> TOWN_51 -> TOWN_36 -> DEPOT
Route 6: DEPOT -> TOWN_14 -> TOWN_20 -> TOWN_23 -> TOWN_43 -> TOWN_64 -> TOWN_54 -> DEPOT
Route 7: DEPOT -> TOWN_34 -> TOWN_18 -> TOWN_60 -> TOWN_17 -> TOWN_09 -> TOWN_73 -> TOWN_07 -> TOWN_47 -> TOWN_38 -> DEPOT
Route 8: DEPOT -> TOWN_58 -> TOWN_40 -> TOWN_49 -> TOWN_26 -> TOWN_27 -> TOWN_63 -> TOWN_65 -> DEPOT
Route 9: DEPOT -> TOWN_62 -> TOWN_35 -> TOWN_46 -> TOWN_48 -> TOWN_39 -> TOWN_45 -> DEPOT
Route 10: DEPOT -> TOWN_32 -> TOWN_41 -> TOWN_50 -> TOWN_56 ->

# OR-Tools

!pip install --upgrade ortools

## 최고점 재현

In [24]:
import math
import pandas as pd
from ortools.constraint_solver import pywrapcp, routing_enums_pb2

def read_data(csv_path: str):
    """데이터를 읽고 전처리"""
    data_df = pd.read_csv(csv_path)
    data_df = data_df.sort_values(by='point_id').reset_index(drop=True)
    
    # DEPOT과 마을 데이터 분리
    depot_data = data_df[data_df['point_id'] == 'DEPOT'].iloc[0]
    towns_data = data_df[data_df['point_id'] != 'DEPOT'].reset_index(drop=True)
    
    # OR-Tools 데이터 준비
    locations = [(depot_data['x'], depot_data['y'])] + list(zip(towns_data['x'], towns_data['y']))
    demands = [0] + list(towns_data['demand'])
    
    return data_df, depot_data, towns_data, locations, demands

def create_data_model(locations, demands):
    """OR-Tools 데이터 모델 생성"""
    data = {}
    data['distance_matrix'] = [
        [int(math.sqrt((x1[0] - x2[0])**2 + (x1[1] - x2[1])**2) * 1000)
         for x2 in locations]
        for x1 in locations
    ]
    data['demands'] = demands
    data['vehicle_capacities'] = [25] * 75  # 최대 75대, 각 용량 25
    data['num_vehicles'] = 75
    data['depot'] = 0
    
    return data

def solve_vrp(data, time_limit_seconds=3600):
    """VRP 해결"""
    manager = pywrapcp.RoutingIndexManager(
        len(data['distance_matrix']),
        data['num_vehicles'],
        data['depot']
    )
    routing = pywrapcp.RoutingModel(manager)
    
    # 거리 콜백
    def distance_callback(from_index, to_index):
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        return data['distance_matrix'][from_node][to_node]
    
    transit_callback_index = routing.RegisterTransitCallback(distance_callback)
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
    
    # 수요 콜백
    def demand_callback(from_index):
        from_node = manager.IndexToNode(from_index)
        return data['demands'][from_node]
    
    demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
    
    # 용량 제약 추가
    routing.AddDimensionWithVehicleCapacity(
        demand_callback_index,
        0,  # slack
        data['vehicle_capacities'],
        True,  # start cumul to zero
        'Capacity'
    )
    
    # 탐색 파라미터 설정
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = (
        routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
    )
    search_parameters.local_search_metaheuristic = (
        routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    )
    search_parameters.time_limit.seconds = time_limit_seconds
    search_parameters.log_search = True
    
    # 해 찾기
    solution = routing.SolveWithParameters(search_parameters)
    
    if not solution:
        return None, None
    
    # 결과 추출
    routes = []
    total_distance = 0
    
    for vehicle_id in range(data['num_vehicles']):
        index = routing.Start(vehicle_id)
        route = []
        route_distance = 0
        
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            route.append(node)
            previous_index = index
            index = solution.Value(routing.NextVar(index))
            if not routing.IsEnd(index):
                route_distance += data['distance_matrix'][node][manager.IndexToNode(index)] / 1000
        
        if len(route) > 1:  # 실제로 마을을 방문한 경우만
            route.append(0)  # DEPOT으로 복귀
            routes.append(route)
            total_distance += route_distance
    
    return routes, total_distance

def convert_routes_to_submission(routes, towns_data):
    """경로를 제출 형식으로 변환"""
    total_route_town_ids = []
    
    for route in routes:
        route_town_ids = []
        for node_idx in route:
            if node_idx == 0:
                route_town_ids.append('DEPOT')
            else:
                town_id = towns_data.iloc[node_idx-1]['point_id']
                route_town_ids.append(town_id)
        total_route_town_ids.extend(route_town_ids[:-1])  # 마지막 DEPOT 제외
    
    total_route_town_ids.append('DEPOT')  # 최종 DEPOT 추가
    return total_route_town_ids

def main():
    try:
        # 데이터 읽기
        data_df, depot_data, towns_data, locations, demands = read_data('data.csv')
        
        # 데이터 모델 생성
        data = create_data_model(locations, demands)
        
        # VRP 해결
        routes, total_distance = solve_vrp(data)
        
        if routes:
            print("\n=== 산타의 배달 경로 ===")
            
            # 경로 출력
            for i, route in enumerate(routes, 1):
                route_town_ids = []
                for node_idx in route:
                    if node_idx == 0:
                        route_town_ids.append('DEPOT')
                    else:
                        town_id = towns_data.iloc[node_idx-1]['point_id']
                        route_town_ids.append(town_id)
                print(f"Route #{i}: {' -> '.join(route_town_ids)}")
            
            print(f"\n총 이동 거리: {total_distance:.2f} km")
            
            # 제출 파일 생성
            total_route_town_ids = convert_routes_to_submission(routes, towns_data)
            result_df = pd.DataFrame({'point_id': total_route_town_ids})
            result_df.to_csv('submission_or_imp.csv', index=False)
            print("\n[INFO] 제출 파일이 생성되었습니다: submission.csv")
            
        else:
            print("해를 찾을 수 없습니다!")
            
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()


=== 산타의 배달 경로 ===
Route #1: DEPOT -> TOWN_32 -> TOWN_53 -> TOWN_66 -> TOWN_72 -> TOWN_08 -> TOWN_10 -> TOWN_38 -> DEPOT
Route #2: DEPOT -> TOWN_71 -> TOWN_23 -> TOWN_43 -> TOWN_44 -> TOWN_64 -> DEPOT
Route #3: DEPOT -> TOWN_19 -> TOWN_21 -> TOWN_55 -> TOWN_67 -> TOWN_60 -> TOWN_57 -> TOWN_69 -> DEPOT
Route #4: DEPOT -> TOWN_30 -> TOWN_22 -> TOWN_20 -> TOWN_24 -> TOWN_31 -> DEPOT
Route #5: DEPOT -> TOWN_70 -> TOWN_59 -> TOWN_51 -> TOWN_52 -> TOWN_75 -> TOWN_25 -> TOWN_49 -> TOWN_28 -> DEPOT
Route #6: DEPOT -> TOWN_34 -> TOWN_37 -> TOWN_46 -> TOWN_48 -> TOWN_54 -> TOWN_62 -> DEPOT
Route #7: DEPOT -> TOWN_04 -> TOWN_18 -> TOWN_74 -> TOWN_05 -> TOWN_01 -> TOWN_02 -> TOWN_03 -> TOWN_35 -> DEPOT
Route #8: DEPOT -> TOWN_61 -> TOWN_42 -> TOWN_13 -> TOWN_07 -> TOWN_09 -> TOWN_36 -> TOWN_40 -> TOWN_73 -> DEPOT
Route #9: DEPOT -> TOWN_65 -> TOWN_27 -> TOWN_63 -> TOWN_26 -> TOWN_45 -> DEPOT
Route #10: DEPOT -> TOWN_50 -> TOWN_56 -> TOWN_39 -> TOWN_41 -> TOWN_68 -> DEPOT
Route #11: DEPOT -> TOWN_2

In [166]:
import math
import pandas as pd

def calculate_route_distance(submission_path: str, data_path: str) -> float:
    """
    제출 파일의 경로에 대한 총 이동 거리 계산
    
    Args:
        submission_path: 제출 파일 경로 ('point_id' 컬럼을 가진 CSV)
        data_path: 원본 데이터 파일 경로 ('point_id', 'x', 'y' 컬럼을 가진 CSV)
        
    Returns:
        float: 총 이동 거리
    """
    # 데이터 로드
    submission_df = pd.read_csv(submission_path)
    data_df = pd.read_csv(data_path)
    
    # point_id를 기준으로 좌표 정보 병합
    route_df = pd.merge(
        submission_df,
        data_df[['point_id', 'x', 'y']],
        on='point_id',
        how='left'
    )
    
    # 총 거리 계산
    total_distance = 0
    for i in range(len(route_df) - 1):
        # 현재 지점과 다음 지점의 좌표
        current = route_df.iloc[i]
        next_point = route_df.iloc[i + 1]
        
        # 유클리드 거리 계산
        distance = math.sqrt(
            (current['x'] - next_point['x'])**2 + 
            (current['y'] - next_point['y'])**2
        )
        total_distance += distance
        
        # 이동 정보 출력
        print(f"{current['point_id']} -> {next_point['point_id']}: {distance:.4f}")
    
    return total_distance

def main():
    # 파일 경로
    submission_path = 'submission_or_imp.csv'  # 제출 파일
    data_path = 'data.csv'  # 원본 데이터
    
    try:
        # 총 거리 계산
        total_distance = calculate_route_distance(submission_path, data_path)
        
        print(f"\n=== 경로 거리 계산 결과 ===")
        print(f"총 이동 거리: {total_distance:.4f}")
        
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()

DEPOT -> TOWN_32: 33.3017
TOWN_32 -> TOWN_19: 69.8140
TOWN_19 -> TOWN_57: 5.0000
TOWN_57 -> TOWN_69: 4.1231
TOWN_69 -> TOWN_55: 1.0000
TOWN_55 -> TOWN_67: 14.1421
TOWN_67 -> TOWN_60: 5.6569
TOWN_60 -> DEPOT: 98.7927
DEPOT -> TOWN_40: 47.1275
TOWN_40 -> TOWN_70: 49.0918
TOWN_70 -> TOWN_59: 8.0623
TOWN_59 -> TOWN_52: 4.0000
TOWN_52 -> TOWN_75: 11.0454
TOWN_75 -> TOWN_25: 10.0499
TOWN_25 -> TOWN_28: 7.2801
TOWN_28 -> TOWN_49: 27.7308
TOWN_49 -> DEPOT: 60.6712
DEPOT -> TOWN_35: 64.8460
TOWN_35 -> TOWN_01: 36.3456
TOWN_01 -> TOWN_05: 6.0000
TOWN_05 -> TOWN_74: 7.2111
TOWN_74 -> TOWN_03: 1.4142
TOWN_03 -> TOWN_04: 8.0623
TOWN_04 -> TOWN_02: 8.6023
TOWN_02 -> TOWN_33: 34.7131
TOWN_33 -> TOWN_37: 24.1661
TOWN_37 -> DEPOT: 40.2244
DEPOT -> TOWN_22: 91.0055
TOWN_22 -> TOWN_20: 10.6301
TOWN_20 -> TOWN_23: 5.0990
TOWN_23 -> TOWN_43: 3.0000
TOWN_43 -> TOWN_64: 2.8284
TOWN_64 -> TOWN_44: 9.0000
TOWN_44 -> DEPOT: 101.6514
DEPOT -> TOWN_16: 93.0054
TOWN_16 -> TOWN_24: 7.0000
TOWN_24 -> TOWN_21: 1.0000

## 최고점 시간 +

In [None]:
import math
import pandas as pd
from ortools.constraint_solver import pywrapcp, routing_enums_pb2

def read_data(csv_path: str):
    """데이터를 읽고 전처리"""
    data_df = pd.read_csv(csv_path)
    data_df = data_df.sort_values(by='point_id').reset_index(drop=True)
    
    # DEPOT과 마을 데이터 분리
    depot_data = data_df[data_df['point_id'] == 'DEPOT'].iloc[0]
    towns_data = data_df[data_df['point_id'] != 'DEPOT'].reset_index(drop=True)
    
    # OR-Tools 데이터 준비
    locations = [(depot_data['x'], depot_data['y'])] + list(zip(towns_data['x'], towns_data['y']))
    demands = [0] + list(towns_data['demand'])
    
    return data_df, depot_data, towns_data, locations, demands

def create_data_model(locations, demands):
    """OR-Tools 데이터 모델 생성"""
    data = {}
    data['distance_matrix'] = [
        [int(math.sqrt((x1[0] - x2[0])**2 + (x1[1] - x2[1])**2) * 1000)
         for x2 in locations]
        for x1 in locations
    ]
    data['demands'] = demands
    data['vehicle_capacities'] = [25] * 75  # 최대 75대, 각 용량 25
    data['num_vehicles'] = 75
    data['depot'] = 0
    
    return data

def solve_vrp(data, time_limit_seconds=7200):
    """VRP 해결"""
    manager = pywrapcp.RoutingIndexManager(
        len(data['distance_matrix']),
        data['num_vehicles'],
        data['depot']
    )
    routing = pywrapcp.RoutingModel(manager)
    
    # 거리 콜백
    def distance_callback(from_index, to_index):
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        return data['distance_matrix'][from_node][to_node]
    
    transit_callback_index = routing.RegisterTransitCallback(distance_callback)
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
    
    # 수요 콜백
    def demand_callback(from_index):
        from_node = manager.IndexToNode(from_index)
        return data['demands'][from_node]
    
    demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
    
    # 용량 제약 추가
    routing.AddDimensionWithVehicleCapacity(
        demand_callback_index,
        0,  # slack
        data['vehicle_capacities'],
        True,  # start cumul to zero
        'Capacity'
    )
    
    # 탐색 파라미터 설정
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = (
        routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
    )
    search_parameters.local_search_metaheuristic = (
        routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    )
    search_parameters.time_limit.seconds = time_limit_seconds
    search_parameters.log_search = True
    
    # 해 찾기
    solution = routing.SolveWithParameters(search_parameters)
    
    if not solution:
        return None, None
    
    # 결과 추출
    routes = []
    total_distance = 0
    
    for vehicle_id in range(data['num_vehicles']):
        index = routing.Start(vehicle_id)
        route = []
        route_distance = 0
        
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            route.append(node)
            previous_index = index
            index = solution.Value(routing.NextVar(index))
            if not routing.IsEnd(index):
                route_distance += data['distance_matrix'][node][manager.IndexToNode(index)] / 1000
        
        if len(route) > 1:  # 실제로 마을을 방문한 경우만
            route.append(0)  # DEPOT으로 복귀
            routes.append(route)
            total_distance += route_distance
    
    return routes, total_distance

def convert_routes_to_submission(routes, towns_data):
    """경로를 제출 형식으로 변환"""
    total_route_town_ids = []
    
    for route in routes:
        route_town_ids = []
        for node_idx in route:
            if node_idx == 0:
                route_town_ids.append('DEPOT')
            else:
                town_id = towns_data.iloc[node_idx-1]['point_id']
                route_town_ids.append(town_id)
        total_route_town_ids.extend(route_town_ids[:-1])  # 마지막 DEPOT 제외
    
    total_route_town_ids.append('DEPOT')  # 최종 DEPOT 추가
    return total_route_town_ids

def main():
    try:
        # 데이터 읽기
        data_df, depot_data, towns_data, locations, demands = read_data('data.csv')
        
        # 데이터 모델 생성
        data = create_data_model(locations, demands)
        
        # VRP 해결
        routes, total_distance = solve_vrp(data)
        
        if routes:
            print("\n=== 산타의 배달 경로 ===")
            
            # 경로 출력
            for i, route in enumerate(routes, 1):
                route_town_ids = []
                for node_idx in route:
                    if node_idx == 0:
                        route_town_ids.append('DEPOT')
                    else:
                        town_id = towns_data.iloc[node_idx-1]['point_id']
                        route_town_ids.append(town_id)
                print(f"Route #{i}: {' -> '.join(route_town_ids)}")
            
            print(f"\n총 이동 거리: {total_distance:.2f} km")
            
            # 제출 파일 생성
            total_route_town_ids = convert_routes_to_submission(routes, towns_data)
            result_df = pd.DataFrame({'point_id': total_route_town_ids})
            result_df.to_csv('submission_or_imp_.csv', index=False)
            print("\n[INFO] 제출 파일이 생성되었습니다: submission_or_imp_.csv")
            
        else:
            print("해를 찾을 수 없습니다!")
            
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()

In [None]:
import math
import pandas as pd

def calculate_route_distance(submission_path: str, data_path: str) -> float:
    """
    제출 파일의 경로에 대한 총 이동 거리 계산
    
    Args:
        submission_path: 제출 파일 경로 ('point_id' 컬럼을 가진 CSV)
        data_path: 원본 데이터 파일 경로 ('point_id', 'x', 'y' 컬럼을 가진 CSV)
        
    Returns:
        float: 총 이동 거리
    """
    # 데이터 로드
    submission_df = pd.read_csv(submission_path)
    data_df = pd.read_csv(data_path)
    
    # point_id를 기준으로 좌표 정보 병합
    route_df = pd.merge(
        submission_df,
        data_df[['point_id', 'x', 'y']],
        on='point_id',
        how='left'
    )
    
    # 총 거리 계산
    total_distance = 0
    for i in range(len(route_df) - 1):
        # 현재 지점과 다음 지점의 좌표
        current = route_df.iloc[i]
        next_point = route_df.iloc[i + 1]
        
        # 유클리드 거리 계산
        distance = math.sqrt(
            (current['x'] - next_point['x'])**2 + 
            (current['y'] - next_point['y'])**2
        )
        total_distance += distance
        
        # 이동 정보 출력
        print(f"{current['point_id']} -> {next_point['point_id']}: {distance:.4f}")
    
    return total_distance

def main():
    # 파일 경로
    submission_path = 'submission_or_imp_.csv'  # 제출 파일
    data_path = 'data.csv'  # 원본 데이터
    
    try:
        # 총 거리 계산
        total_distance = calculate_route_distance(submission_path, data_path)
        
        print(f"\n=== 경로 거리 계산 결과 ===")
        print(f"총 이동 거리: {total_distance:.4f}")
        
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()

## 파라미터 조정

In [None]:
import math
import pandas as pd
from ortools.constraint_solver import pywrapcp, routing_enums_pb2

def read_data(csv_path: str):
    """데이터를 읽고 전처리"""
    data_df = pd.read_csv(csv_path)
    data_df = data_df.sort_values(by='point_id').reset_index(drop=True)
    
    # DEPOT과 마을 데이터 분리
    depot_data = data_df[data_df['point_id'] == 'DEPOT'].iloc[0]
    towns_data = data_df[data_df['point_id'] != 'DEPOT'].reset_index(drop=True)
    
    # OR-Tools 데이터 준비
    locations = [(depot_data['x'], depot_data['y'])] + list(zip(towns_data['x'], towns_data['y']))
    demands = [0] + list(towns_data['demand'])
    
    return data_df, depot_data, towns_data, locations, demands

def create_data_model(locations, demands):
    """OR-Tools 데이터 모델 생성"""
    data = {}
    # 정수 스케일을 유지하되, 조금 더 세밀하게(예: 1km -> 10000 스케일) 해볼 수도 있음
    # 혹은 float 대신 아래처럼 int를 쓰되, 소수점 자르는 방식을 round로 변경 가능
    data['distance_matrix'] = [
        [int(round(math.sqrt((x1[0] - x2[0])**2 + (x1[1] - x2[1])**2)*1000))
         for x2 in locations]
        for x1 in locations
    ]
    
    data['demands'] = demands
    data['vehicle_capacities'] = [25] * 75  # 최대 75대, 각 용량 25
    data['num_vehicles'] = 75
    data['depot'] = 0
    
    return data

def solve_vrp(data, time_limit_seconds=3600):
    """VRP 해결(개선판)"""
    manager = pywrapcp.RoutingIndexManager(
        len(data['distance_matrix']),
        data['num_vehicles'],
        data['depot']
    )
    routing = pywrapcp.RoutingModel(manager)
    
    # 거리 콜백
    def distance_callback(from_index, to_index):
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        return data['distance_matrix'][from_node][to_node]
    
    transit_callback_index = routing.RegisterTransitCallback(distance_callback)
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
    
    # 수요 콜백
    def demand_callback(from_index):
        from_node = manager.IndexToNode(from_index)
        return data['demands'][from_node]
    
    demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
    
    # 용량 제약 추가
    routing.AddDimensionWithVehicleCapacity(
        demand_callback_index,
        0,  # slack
        data['vehicle_capacities'],
        True,  # start cumul to zero
        'Capacity'
    )
    
    # 탐색 파라미터 설정
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    
    # === 초기 해법 전략 ===
    # 적절한 초기해법을 찾는 전략: PATH_CHEAPEST_ARC, SAVINGS, SWEEP, CHRISTOFIDES 등 다양
    # 문제에 따라 다양한 설정을 테스트해 볼 수 있음
    search_parameters.first_solution_strategy = (
        routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
    )
    
    # === 메타 휴리스틱(로컬 서치) 설정 ===
    # GUIDED_LOCAL_SEARCH, SIMULATED_ANNEALING, TABU_SEARCH 등 변경 가능
    search_parameters.local_search_metaheuristic = (
        routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    )
    
    # === 로컬 서치 오퍼레이터 강화 ===
    # 필요에 따라 활성화 가능 (성능과 시간 트레이드오프)
    search_parameters.local_search_operators.use_relocate = pywrapcp.BOOL_TRUE
    search_parameters.local_search_operators.use_exchange = pywrapcp.BOOL_TRUE
    search_parameters.local_search_operators.use_cross = pywrapcp.BOOL_TRUE
    search_parameters.local_search_operators.use_lin_kernighan = pywrapcp.BOOL_TRUE
    search_parameters.local_search_operators.use_tsp_opt = pywrapcp.BOOL_TRUE
    
    # === LNS 계열 기법 ===
    # 경로단위 LNS, TSP LNS, 비활성화 LNS 등 가능
    search_parameters.local_search_operators.use_path_lns = pywrapcp.BOOL_TRUE
    search_parameters.local_search_operators.use_tsp_lns = pywrapcp.BOOL_TRUE
    # 아래도 시도해볼 수 있음
    # search_parameters.local_search_operators.use_full_path_lns = pywrapcp.BOOL_TRUE
    
    # === 시간 제한 ===
    search_parameters.time_limit.seconds = time_limit_seconds
    search_parameters.log_search = True  # 탐색과정 로그 표시
    
    # === Guided Local Search 파라미터 튜닝 ===
    # 페널티 가중치(람다 계수). 크면 페널티가 더 세게 부여되어, 경로가 다양해짐
    search_parameters.guided_local_search_lambda_coefficient = 0.1
    
    # === 다중 스레드 병렬화(선택) ===
    # (병렬화 시 결과 재현성 보장이 안 될 수 있음)
    # search_parameters.number_of_solutions_to_collect = 1
    # search_parameters.solution_limit = 1000000000
    # search_parameters.use_cp = True  # OR-Tools 8.x 이후 버전에서만 작동
    
    # 해 찾기
    solution = routing.SolveWithParameters(search_parameters)
    
    if not solution:
        return None, None
    
    # 결과 추출
    routes = []
    total_distance = 0.0
    
    for vehicle_id in range(data['num_vehicles']):
        index = routing.Start(vehicle_id)
        route = []
        route_distance = 0
        
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            route.append(node)
            prev_index = index
            index = solution.Value(routing.NextVar(index))
            if not routing.IsEnd(index):
                next_node = manager.IndexToNode(index)
                # 다시 실제 km 단위로 환산
                route_distance += data['distance_matrix'][node][next_node] / 1000.0
        
        # 실제로 마을을 방문한 경우만 기록 (DEPOT만 방문하고 끝나는 경우 제외)
        if len(route) > 1:
            # DEPOT 복귀
            route.append(0)
            routes.append(route)
            total_distance += route_distance
    
    return routes, total_distance

def convert_routes_to_submission(routes, towns_data):
    """경로를 제출 형식으로 변환"""
    total_route_town_ids = []
    
    for route in routes:
        route_town_ids = []
        for node_idx in route:
            if node_idx == 0:
                route_town_ids.append('DEPOT')
            else:
                town_id = towns_data.iloc[node_idx-1]['point_id']
                route_town_ids.append(town_id)
        # 마지막 DEPOT 제외하고 다음 경로로 넘어감
        total_route_town_ids.extend(route_town_ids[:-1])
    
    # 최종 DEPOT 추가
    total_route_town_ids.append('DEPOT')
    return total_route_town_ids

def main():
    try:
        # 데이터 읽기
        data_df, depot_data, towns_data, locations, demands = read_data('data.csv')
        
        # 데이터 모델 생성
        data = create_data_model(locations, demands)
        
        # VRP 해결
        # 시간 제한을 늘려서 더 깊이 탐색해볼 수 있음 (예: 10800초 = 3시간 등)
        routes, total_distance = solve_vrp(data, time_limit_seconds=7200)
        
        if routes:
            print("\n=== 산타의 배달 경로 ===")
            for i, route in enumerate(routes, 1):
                route_town_ids = []
                for node_idx in route:
                    if node_idx == 0:
                        route_town_ids.append('DEPOT')
                    else:
                        town_id = towns_data.iloc[node_idx-1]['point_id']
                        route_town_ids.append(town_id)
                print(f"Route #{i}: {' -> '.join(route_town_ids)}")
            
            print(f"\n총 이동 거리(환산): {total_distance:.2f} km")
            
            # 제출 파일 생성
            total_route_town_ids = convert_routes_to_submission(routes, towns_data)
            result_df = pd.DataFrame({'point_id': total_route_town_ids})
            result_df.to_csv('submission_or_imp_2.csv', index=False)
            print("\n[INFO] 제출 파일이 생성되었습니다: submission_or_imp_2.csv")
            
        else:
            print("해를 찾을 수 없습니다!")
            
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()
