In [1]:
import math
from typing import List
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
from scipy.spatial.distance import cdist
import folium

In [2]:
# 데이터 파일 경로 및 파라미터 설정
viol_file = "../Data/CCTV기반단속개별건수.csv"
cctv_file = "../Data/12_04_08_E_CCTV정보.xlsx"
coverage_ratio = 0.7
min_nodes = 50
bandwidths = [200.0, 300.0, 400.0]
penalty_factor = 5.0
time_limit = 120
num_cars = 1
start_node = 0

In [3]:
# 하버사인 거리 함수
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    R = 6371000
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2.0) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2.0) ** 2
    )
    return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# 평면 좌표 변환 함수
def to_xy(lat: np.ndarray, lon: np.ndarray, lat0: float) -> np.ndarray:
    R = 6371000
    x = np.deg2rad(lon) * R * np.cos(np.deg2rad(lat0))
    y = np.deg2rad(lat) * R
    return np.column_stack((x, y))

# KDE 모델 학습 함수
def fit_kde(coords: np.ndarray, bandwidths: List[float]) -> KernelDensity:
    sample_size = min(len(coords), 10000)
    rng = np.random.default_rng(0)
    idx = rng.choice(len(coords), size=sample_size, replace=False)
    coords_sample = coords[idx]
    params = {"bandwidth": bandwidths}
    grid = GridSearchCV(KernelDensity(kernel="gaussian"), params, cv=3, n_jobs=-1)
    grid.fit(coords_sample)
    return grid.best_estimator_

In [4]:
# 데이터 읽기 및 전처리
viol = pd.read_csv(viol_file, encoding="cp949").dropna(subset=["위도", "경도"])
cctv = pd.read_excel(cctv_file, engine="openpyxl").dropna(
    subset=["WGS84위도", "WGS84경도"]
)

# 좌표 변환 및 KDE
lat0 = float(cctv["WGS84위도"].mean())
viol_coords_xy = to_xy(viol["위도"].to_numpy(), viol["경도"].to_numpy(), lat0)
cctv_coords_xy = to_xy(
    cctv["WGS84위도"].to_numpy(), cctv["WGS84경도"].to_numpy(), lat0
)
kde = fit_kde(viol_coords_xy, bandwidths)

  warn("Workbook contains no default style, apply openpyxl's default")


In [5]:
# 밀도 가중치 계산 및 핵심 CCTV 선택
log_dens = kde.score_samples(cctv_coords_xy)
cctv["density_weight"] = np.exp(log_dens)
dens_min = float(cctv["density_weight"].min())
dens_max = float(cctv["density_weight"].max())
if dens_max - dens_min > 0:
    cctv["dens_norm"] = (cctv["density_weight"] - dens_min) / (dens_max - dens_min)
else:
    cctv["dens_norm"] = 0.0

cctv_sorted = cctv.sort_values("density_weight", ascending=False).reset_index(drop=True)
dens_cumsum = cctv_sorted["density_weight"].cumsum()
dens_total = float(cctv_sorted["density_weight"].sum())
dens_ratio = dens_cumsum / dens_total
selected_idx = np.where(dens_ratio <= coverage_ratio)[0]
if len(selected_idx) < min_nodes:
    selected_idx = np.arange(min(min_nodes, len(cctv_sorted)))
selected = cctv_sorted.loc[selected_idx].reset_index(drop=True)

# 거리 행렬 및 패널티 계산
lat_lon = selected[["WGS84위도", "WGS84경도"]].to_numpy()
lat_rad = np.deg2rad(lat_lon[:, 0])
lon_rad = np.deg2rad(lat_lon[:, 1])
lat1 = lat_rad[:, None]
lat2 = lat_rad[None, :]
lon1 = lon_rad[:, None]
lon2 = lon_rad[None, :]
dphi = lat2 - lat1
dlambda = lon2 - lon1
a = np.sin(dphi / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlambda / 2) ** 2
dist_matrix = (2 * 6371000 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))).astype(int)

avg_dist = float(dist_matrix.mean())
base_penalty = avg_dist * penalty_factor
selected["penalty"] = base_penalty * (1.0 - selected["dens_norm"])

In [None]:
# OR-Tools 최적화 및 결과 출력
from ortools.constraint_solver import pywrapcp, routing_enums_pb2

num_nodes = len(selected)
manager = pywrapcp.RoutingIndexManager(num_nodes, num_cars, start_node)
routing = pywrapcp.RoutingModel(manager)

def distance_callback(from_index, to_index):
    from_node = manager.IndexToNode(from_index)
    to_node = manager.IndexToNode(to_index)
    return int(dist_matrix[from_node, to_node])

transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

for idx in range(1, num_nodes):
    penalty_value = int(max(1, selected.loc[idx, "penalty"]))
    routing.AddDisjunction([manager.NodeToIndex(idx)], penalty_value)

params = pywrapcp.DefaultRoutingSearchParameters()
params.first_solution_strategy = routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
params.local_search_metaheuristic = routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
params.time_limit.seconds = time_limit

solution = routing.SolveWithParameters(params)

if solution:
    route = []
    total_dist = 0
    index = routing.Start(0)
    while not routing.IsEnd(index):
        node = manager.IndexToNode(index)
        route.append(node)
        prev_index = index
        index = solution.Value(routing.NextVar(index))
        total_dist += routing.GetArcCostForVehicle(prev_index, index, 0)
    route.append(manager.IndexToNode(index))
    density_sum = selected.loc[route, "density_weight"].sum()
    print(f"방문 지점 수: {len(route)}")
    print(f"총 이동거리: {total_dist/1000:.2f} km")
    print(f"방문 지점의 밀도 가중치 합계: {density_sum:.3e}")
else:
    print("솔루션을 찾지 못했습니다.")

In [None]:
# Folium을 이용한 경로 시각화 및 HTML 저장
center_lat = selected["WGS84위도"].mean()
center_lon = selected["WGS84경도"].mean()
m = folium.Map(location=[center_lat, center_lon], zoom_start=13)
route_coords = selected.loc[route, ["WGS84위도", "WGS84경도"]].values.tolist()
folium.PolyLine(route_coords, weight=5, opacity=0.7).add_to(m)
for idx in route:
    lat, lon = selected.loc[idx, ["WGS84위도", "WGS84경도"]]
    folium.Marker([lat, lon], popup=str(idx)).add_to(m)

# HTML 파일로 저장
output_html = "route_map_"+str(coverage_ratio)+".html"
m.save(output_html)
print(f"경로 지도가 '{output_html}' 파일로 저장되었습니다.")

경로 지도가 'route_map_0.7.html' 파일로 저장되었습니다.
