In [1]:
import numpy as np
import pandas as pd

import json
import heapq
import requests

from time import time
from tqdm import tqdm
from urllib.parse import urlparse

# kakao API key
api_key = '43dabecbc47029b5ca73d4c599de3185'

# Data

## subway_graph
* 인접행렬(adjacency matrix) 방식으로 구현한 지하철 역 간 소요 시간 그래프

In [2]:
# 역간 소요시간 및 환승시간 반영된 인접행렬 로드
subway_matrix = pd.read_csv('./adjacency_matrix.csv')

# 인접행렬 형태로 구현된 그래프
subway_graph = subway_matrix.drop('subway_name', axis=1)
subway_graph.head()

Unnamed: 0,소요산,동두천,보산,동두천중앙,지행,덕정,덕계,양주,녹양,가능,...,하남풍산,하남시청,하남검단산,둔촌동,올림픽공원,방이,5오금,개롱,거여,마천
0,0,4,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,4,0,2,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,0,2,0,2,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,0,0,2,0,1,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,0,0,0,1,0,4,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## subway_info

In [3]:
def address_to_coordinates(address):
    '''카카오 API를 활용하여 주소에 해당하는 위도 경도 추출'''
    url = f"https://dapi.kakao.com/v2/local/search/address.json?query={address}"
    headers = {"Authorization": "KakaoAK " + api_key}
    result = json.loads(str(requests.get(url, headers=headers).text))
    try:
        coordinates = result['documents'][0]['address']
        return float(coordinates['x']), float(coordinates['y'])
    except:
        return None, None


# 호선, 역이름, 역주소 로드 (https://www.data.go.kr/data/15013205/standard.do)
subway_info = pd.read_csv('./subway_info.csv')

# 역주소 위도 경도 추출
subway_info['coordinates'] = subway_info['address'].apply(address_to_coordinates)
subway_info['coordinate_x'] = subway_info['coordinates'].apply(lambda x : x[0])
subway_info['coordinate_y'] = subway_info['coordinates'].apply(lambda x : x[1])
subway_info = subway_info.drop('coordinates', axis=1)    

In [4]:
# API 누락 역주소
manual_address_to_coordinates = {
    '덕계' : (37.818761, 127.056676),
    '도봉산' : (37.689603, 127.046347),
    '석계' : (37.615206,	127.065594),
    '성환' : (36.915781,	127.127007),
    '탕정' : (36.788272,	127.080446),
    '화정' : (37.637837,	126.832503),
    '원당' : (37.653103,	126.842891),
    '삼송' : (37.653096,	126.895559),
    '대공원' : (37.435724,	127.006474),
}

# API 누락 역주소 위도 경도 추가
for key, value in manual_address_to_coordinates.items():
    subway_info.loc[subway_info['subway_name'] == key, 'coordinate_x'] = value[0]
    subway_info.loc[subway_info['subway_name'] == key, 'coordinate_y'] = value[1]

subway_info.head()

Unnamed: 0,subway_line,subway_name,address,coordinate_x,coordinate_y
0,1,소요산,경기도 동두천시 평화로 2925(상봉암동 126),127.061213,37.947954
1,1,동두천,경기도 동두천시 평화로 2687(동두천동 245-210),127.05494,37.927837
2,1,보산,경기도 동두천시 평화로 2539(보산동),127.057237,37.914319
3,1,동두천중앙,경기도 동두천시 동두천로 228(생연동 682),127.056239,37.901806
4,1,지행,경기도 동두천시 평화로 2285(지행동),127.055664,37.891877


## heu_matrix
* A* 알고리즘 구현을 위한 휴리스틱 정보 (Euclidean, Manhatten)

In [9]:
# 역이름
subway_li = subway_graph.columns.tolist()
subway_name_li = subway_info['subway_name'].tolist()

# 인접행렬 초기화
heu_matrix_euclidean = pd.DataFrame(index=subway_name_li, columns=subway_name_li)
heu_matrix_manhatten = pd.DataFrame(index=subway_name_li, columns=subway_name_li)

# 유클리드 거리 계산 함수
def euclidean_distance(x1, y1, x2, y2):
    return np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)

# 맨하탄 거리 계산 함수
def manhattan_distance(x1, y1, x2, y2):
    return np.abs(x1 - x2) + np.abs(y1 - y2)

# 각 이름간 유클리드 거리 & 맨해튼 거리 각각 계산하여 인접행렬에 저장
for subway1 in tqdm(subway_name_li):
    for subway2 in subway_name_li:
        if subway1 == subway2:
            heu_matrix_euclidean.loc[subway1, subway2] = 0.0
            heu_matrix_manhatten.loc[subway1, subway2] = 0.0
        else:
            x1 = subway_info.loc[subway_info['subway_name'] == subway1, 'coordinate_x'].values[0]
            y1 = subway_info.loc[subway_info['subway_name'] == subway1, 'coordinate_y'].values[0]
            x2 = subway_info.loc[subway_info['subway_name'] == subway2, 'coordinate_x'].values[0]
            y2 = subway_info.loc[subway_info['subway_name'] == subway2, 'coordinate_y'].values[0]
            heu_matrix_euclidean.loc[subway1, subway2] = euclidean_distance(x1, y1, x2, y2)
            heu_matrix_manhatten.loc[subway1, subway2] = manhattan_distance(x1, y1, x2, y2)

100%|████████████████████████████████████████████████████████████████████████████████| 300/300 [01:22<00:00,  3.63it/s]


# Algorithms

## 1. Dijkstra algorithm

In [10]:
def dijkstra(adj_matrix, start_vertex, end_vertex):
    # 초기화
    n = len(adj_matrix)
    visited = [False] * n
    previous_vertex = [None] * n
    
    distance = [float('inf')] * n

    # 시작 정점의 시간를 0으로 설정
    distance[start_vertex] = 0
    
    # 우선순위 큐에 (시간, 정점) 쌍을 넣음
    pq = [(0, start_vertex)]
    
    while pq:
        # 우선순위 큐에서 시간이 가장 짧은 정점을 꺼냄
        current_distance, current_vertex = heapq.heappop(pq)
        
        # 이미 방문한 정점이면 스킵
        if visited[current_vertex]:
            continue
        
        # 현재 정점을 방문 처리
        visited[current_vertex] = True
        
        # 목적지에 도달한 경우, 탐색을 종료하고 최단 경로와 비용을 반환
        if current_vertex == end_vertex:
            path = []
            while previous_vertex[current_vertex] is not None:
                path.append(current_vertex)
                current_vertex = previous_vertex[current_vertex]
            path.append(current_vertex)
            path.reverse()
            return path, distance[end_vertex]
        
        # 현재 정점과 연결된 인접 정점들을 탐색
        for neighbor_vertex in range(n):
            weight = adj_matrix[current_vertex][neighbor_vertex]
            if weight > 0:
                # 새로운 시간을 계산
                new_distance = distance[current_vertex] + weight
                
                # 새로운 시간이 기존의 시간보다 짧으면 업데이트
                if new_distance < distance[neighbor_vertex]:
                    distance[neighbor_vertex] = new_distance
                    previous_vertex[neighbor_vertex] = current_vertex
                    heapq.heappush(pq, (new_distance, neighbor_vertex))
    
    # 목적지에 도달할 수 없는 경우
    return None, None

In [12]:
# 다익스트라 알고리즘 적용
adj_matrix = subway_graph.values
start_vertex = int(subway_li.index(input("출발지 입력: ")))
end_vertex = int(subway_li.index(input("목적지 입력: ")))

start_time = time()
shortest_path, cost = dijkstra(adj_matrix, start_vertex, end_vertex)
end_time = time()
elapsed = end_time - start_time

print(f"\n최단 경로: {[subway_li[i] for i in shortest_path]}\n")

print(f"소요 시간: {cost} 분")

print(f"다익스트라 성능: {elapsed} 초")

출발지 입력: 소요산
목적지 입력: 마천

최단 경로: ['소요산', '동두천', '보산', '동두천중앙', '지행', '덕정', '덕계', '양주', '녹양', '가능', '의정부', '회룡', '망월사', '도봉산', '도봉', '방학', '1창동', '4창동', '쌍문', '수유', '미아', '미아사거리', '길음', '성신여대입구', '한성대입구', '혜화', '4동대문', '4동대문역사문화공원', '2동대문역사문화공원', '신당', '상왕십리', '2왕십리', '5왕십리', '마장', '답십리', '장한평', '군자', '아차산', '광나루', '천호', '강동', '둔촌동', '올림픽공원', '방이', '5오금', '개롱', '거여', '마천']

소요 시간: 86 분
다익스트라 성능: 0.012998580932617188 초


## 2. A* algorithm

In [13]:
def a_star(adj_matrix, start_vertex, end_vertex, heu_matrix):
    # 초기화
    n = len(adj_matrix)
    previous_vertex = [None] * n
    
    open_list = []
    close_list = []
    
    # 출발 노드로부터 현재 노드까지의 실제 비용 (거리)
    g = [float('inf')] * n
    g[start_vertex] = 0
    
    # 목적지까지의 예상 비용 (휴리스틱)
    h = heu_matrix.iloc[start_vertex, end_vertex]
    
    # 목적지까지의 총 예상 비용 (휴리스틱 + 거리)
    f = g + h
    
    # 우선순위 큐에 (총 예상 비용, 노드) 쌍을 넣음
    heapq.heappush(open_list, (f, start_vertex))
    
    while open_list:
        # 우선순위 큐에서 총 예상 시간이 가장 짧은 노드를 꺼냄
        _, current_vertex = heapq.heappop(open_list)
        
         # 목적지에 도달한 경우, 탐색을 종료하고 최단 경로와 비용을 반환
        if current_vertex == end_vertex:
            path = [current_vertex]
            while previous_vertex[current_vertex] is not None:
                current_vertex = previous_vertex[current_vertex]
                path.append(current_vertex)
            return path[::-1], g[end_vertex]
        
        # 현재 노드를 Close List에 추가
        close_list.append(current_vertex)
        
        # 인접 노드 탐색
        for neighbor_vertex in range(n):          
            weight = adj_matrix[current_vertex][neighbor_vertex]
            if (weight > 0) and (neighbor_vertex not in close_list):             
                
                # 인접 노드에서 상태값 계산
                tentative_g = g[current_vertex] + weight
                h = heu_matrix.iloc[neighbor_vertex, end_vertex]
                f = tentative_g + h
                
                # 인접 노드를 Open List에 추가
                if (neighbor_vertex not in open_list) or (tentative_g < g[neighbor_vertex]):
                    g[neighbor_vertex] = tentative_g
                    previous_vertex[neighbor_vertex] = current_vertex
                    heapq.heappush(open_list, (f, neighbor_vertex))    
                    
    # 목적지에 도달할 수 없는 경우
    return None, None

In [14]:
# A* 알고리즘 적용 - with 유클리드 휴리스틱
adj_matrix = subway_graph.values
heu_matrix = heu_matrix_euclidean

start_vertex = int(subway_li.index(input("출발지 입력: ")))
end_vertex = int(subway_li.index(input("목적지 입력: ")))

start_time = time()
shortest_patht, cost = a_star(adj_matrix, start_vertex, end_vertex, heu_matrix)
end_time = time()
elapsed = end_time - start_time

print(f"\n최단 경로: {[subway_li[i] for i in shortest_path]}\n")

print(f"소요 시간: {cost} 분")

print(f"A* with Euclidean 성능: {elapsed} 초")

출발지 입력: 소요산
목적지 입력: 마천


IndexError: index 300 is out of bounds for axis 0 with size 300

In [None]:
# A* 알고리즘 적용 - with 맨해튼 휴리스틱
adj_matrix = subway_graph.values
heu_matrix = heu_matrix_manhatten
start_vertex = int(subway_li.index(input("출발지 입력: ")))
end_vertex = int(subway_li.index(input("목적지 입력: ")))

start_time = time()
shortest_path, cost = a_star(adj_matrix, start_vertex, end_vertex, heu_matrix)
end_time = time()
elapsed = end_time - start_time

print(f"최단 경로: {[subway_li[i] for i in shortest_path]}")

print(f"소요 시간: {cost} 분")

print(f"A* with Manhatten 성능: {elapsed} 초")

## 3. Kruskal Algorithm

## 4. Prim Algorithm

# Comparative analysis

In [None]:
d

In [None]:
def test_algorithm():
    algorithm_dict = {
        0 : 'Dijkstra',
        1 : 'A*'
    }
    print(algorithm_dict)
    
    algorithm_idx = int(input("적용할 알고리즘 번호 입력: "))
    start_vertex = int(subway_name_li.index(input("출발지: ")))
    end_vertex = int(subway_name_li.index(input("도착지: ")))
    
    if algorithm_idx == 0:
        shortest_path_idx, cost = dijkstra(subway_graph.values, start_vertex, end_vertex)
        
    shortest_path = [subway_name_li[i] for i in shortest_path_idx]
    
    print(shortest_path)

test_algorithm()