In [1]:
# Papermill 파라미터 셀
start_lat = 37.575785 # 기본값, papermill로 덮어씀
start_lon = 127.048772 # 기본값, papermill로 덮어씀
walk_km = 5  # 총 왕복 산책 거리
season = '가을'

In [2]:
# Parameters
start_lat = 37.571058144694945
start_lon = 127.0695885049667
walk_km = 2.92
season = "\uc5ec\ub984"


In [3]:
import json
import pandas as pd
import networkx as nx
import folium
from scipy.spatial import KDTree
from collections import Counter
import os

In [4]:
with open('data/04_final_data/final_edges.geojson', encoding='utf-8') as f:
    edges_geo = json.load(f)
nodes_df = pd.read_csv('data/04_final_data/final_nodes.csv')
tree_char_df = pd.read_csv('data/02_intermediate/tree-characteristics.csv')

def get_seasonal_trees(season):
    return set(tree_char_df[tree_char_df[season] == 1]['수목명'])

node_pos = {row['osmid']: (row['lat'], row['lon']) for _, row in nodes_df.iterrows()}

In [5]:
#동대문구 전체 그래프 생성
G = nx.Graph()
for feature in edges_geo['features']:
    props = feature['properties']
    u, v = props['u'], props['v']
    length = props.get('length', 1)
    type_keys = ['park', 'mountain', 'river', 'tree-line', 'road']
    edge_type = next((k for k in type_keys if props.get(k, 0) == 1), 'road') #도로 유형 속성이 1인거 찾기
    tree_names = props.get('tree', '').split() #수목명 리스트로 저장
    road_name = props.get('name', '')
    G.add_edge(u, v, length=length, type=edge_type, tree=tree_names, road_name=road_name)

#엣지 출력
#edge = list(G.edges(data=True))[0]
#print(edge)

#산책 반경에 맞는 서브그래프 생성
def filter_graph_by_bbox_and_type(G, node_pos, start_lat, start_lon, walk_km, bbox_km=2.0):
    lat_margin = bbox_km / 111.0
    lon_margin = bbox_km / 88.0
    min_lat, max_lat = start_lat - lat_margin, start_lat + lat_margin
    min_lon, max_lon = start_lon - lon_margin, start_lon + lon_margin
    nodes_in_bbox = [osmid for osmid, (lat, lon) in node_pos.items() if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon]
    subG = G.subgraph(nodes_in_bbox).copy()
    return subG

In [6]:
#매력도 계산
def get_preference_score(data, season, poi_type='default'):
    seasonal_trees = get_seasonal_trees(season)
    # poi_type에 따라 type_weight_map 다르게 적용
    if poi_type == 'park':
        type_weight_map = { 'park': 0.01, 'river': 0.3, 'mountain': 0.3, 'tree-line': 0.5, 'road': 1.5}
    elif poi_type == 'river':
        type_weight_map = { 'park': 0.3, 'river': 0.01, 'mountain': 0.3, 'tree-line': 0.5, 'road': 1.5}
    elif poi_type == 'mountain':
        type_weight_map = { 'park': 0.3, 'river': 0.3, 'mountain': 0.01, 'tree-line': 0.5, 'road': 1.5}

    type_modifier = type_weight_map.get(data.get('type')) 
    tree_modifier = 0.9 if any(t in seasonal_trees for t in data.get('tree', [])) else 1.1
    return type_modifier * tree_modifier

#실제 그래프 엣지 가중치: 매력도 * 거리
def edge_weight(u, v, season, G, poi_type='default'):
    data = G[u][v]
    preference_score = get_preference_score(data, season, poi_type)
    return preference_score * data['length']

In [7]:
def recommend_path(G, node_pos, start_lat, start_lon, season, total_walk_km, poi_type, tolerance=0.2):
    # 시작점 노드 찾기
    node_coords = list(node_pos.values())
    node_osmids = list(node_pos.keys())
    kdtree = KDTree(node_coords)
    _, start_node_idx = kdtree.query([start_lat, start_lon])
    start_osmid = node_osmids[start_node_idx]

    # 서브그래프 생성
    subG = filter_graph_by_bbox_and_type(G, node_pos, start_lat, start_lon, total_walk_km)
    if not subG.has_node(start_osmid):
        print("error: 시작점 주변에 탐색할 경로가 없습니다.")
        return None
        
    # 경로 탐색용 가중치 적용 (매력도+거리)
    for u, v in subG.edges():
        subG[u][v]['weight'] = edge_weight(u, v, season, subG, poi_type)

    # 매력적인 산책로 찾기 (유형별 필터링 추가)
    edge_preferences = [((u, v), get_preference_score(data, season, poi_type))
                    for u, v, data in subG.edges(data=True) if data.get('type') == poi_type]
    best_edges = [edge for edge, score in sorted(edge_preferences, key=lambda x: x[1])[:max(1, int(len(edge_preferences) * 0.1))]]
    pois = {n for u, v in best_edges for n in (u, v)}
    if not pois:
        print("error: 주변에 추천할 만한 경로가 없습니다.")
        return None

    # 시작점에서 POI까지의 최단 경로 길이 계산
    try:
        path_lengths = nx.single_source_dijkstra_path_length(subG, start_osmid, weight='length')
    except nx.NetworkXNoPath:
        print(f"error: 시작점 {start_osmid}이(가) 주변 경로와 연결되어 있지 않습니다.")
        return None

    # 목표 편도 거리에 맞는 POI 후보군 필터링
    candidates = []
    target_one_way_m = total_walk_km * 1000 / 2  # 목표 거리를 절반으로 설정
    for poi in pois:
        if poi in path_lengths: #우리가 설정한 시간만에 갈수있는지
            path_length_to_poi = path_lengths[poi]
            # 편도 거리를 기준으로 비교
            if abs(path_length_to_poi - target_one_way_m) <= target_one_way_m * tolerance:
                candidates.append((poi, path_length_to_poi))

    if not candidates:
        print(f"error: {total_walk_km}km 왕복 거리에 맞는 추천 경로를 찾지 못했습니다.")
        return None

    # 목표 편도 거리에 가장 근접한 산책로를 목적지로 선택
    best_poi, _ = min(candidates, key=lambda x: abs(x[1] - target_one_way_m))
    # 가중치(매력도+거리) 반영해서 최적 경로 계산
    final_path = nx.shortest_path(subG, start_osmid, best_poi, weight='weight')
    
    return final_path

In [8]:
# 추천 유형 리스트 (예: 산, 강, 공원)
poi_types = ['mountain', 'river', 'park']
results = []

for poi_type in poi_types:
    path = recommend_path(G, node_pos, start_lat, start_lon, season, walk_km, poi_type, tolerance=0.3)
    results.append(path)

poi_tree_list = []
for idx, path_nodes in enumerate(results):
    if path_nodes:
        one_way_length = sum(G.edges[path_nodes[i], path_nodes[i+1]]['length'] for i in range(len(path_nodes)-1))
        print(f"--- {poi_types[idx]} POI 경로 추천 ---")
        print(f"편도 거리: {one_way_length/1000:.2f} km")
        print(f"왕복 시 예상 거리: {one_way_length*2/1000:.2f} km")
        round_trip_time_min = one_way_length * 2 / (4000 / 60)
        print(f"예상 소요 시간(왕복): {round_trip_time_min:.0f}분")
        poi_type = poi_types[idx]
        poi_road_name = G.edges[path_nodes[-2], path_nodes[-1]].get('road_name', '')
        if poi_road_name:
            poi_road_name = f"{poi_road_name}-{poi_type}"  # ← 타입 붙이기
        else:
            poi_road_name = poi_type  # 도로명이 없으면 타입만
        seasonal_trees_on_path = {
            tree
            for u, v in zip(path_nodes, path_nodes[1:])
            for tree in G.edges[u, v].get('tree', [])
            if tree in get_seasonal_trees(season)
        }
        if seasonal_trees_on_path:
            print(f"경로에서 볼 수 있는 계절 수목: {', '.join(seasonal_trees_on_path)}")
        # 리스트에 저장
        poi_tree_list.append([poi_road_name, list(seasonal_trees_on_path)])
    else:
        print(f'error: {poi_types[idx]} 경로를 찾지 못했습니다.')

# 파일로 저장 (예: json)
import json
with open('backend/app/services/path_reccomendation/poi_tree_list.json', 'w', encoding='utf-8') as f:
    json.dump(poi_tree_list, f, ensure_ascii=False, indent=2)
print('poi_tree_list.json 파일로 각 경로별 반환점 도로명과 계절 수목 리스트가 저장되었습니다.')

--- mountain POI 경로 추천 ---
편도 거리: 1.46 km
왕복 시 예상 거리: 2.93 km
예상 소요 시간(왕복): 44분
경로에서 볼 수 있는 계절 수목: 잣나무, 느티나무, 은행나무, 양버즘나무
--- river POI 경로 추천 ---
편도 거리: 1.99 km
왕복 시 예상 거리: 3.99 km
예상 소요 시간(왕복): 60분
경로에서 볼 수 있는 계절 수목: 은행나무, 양버즘나무
--- park POI 경로 추천 ---
편도 거리: 1.50 km
왕복 시 예상 거리: 3.01 km
예상 소요 시간(왕복): 45분
경로에서 볼 수 있는 계절 수목: 향나무, 은행나무, 양버즘나무
poi_tree_list.json 파일로 각 경로별 반환점 도로명과 계절 수목 리스트가 저장되었습니다.


In [9]:
colors = {'mountain': 'green', 'river': 'blue', 'park': 'orange'}
multi_map = folium.Map(location=[start_lat, start_lon], zoom_start=15)
folium.Marker(location=[start_lat, start_lon], popup="시작점", icon=folium.Icon(color='red')).add_to(multi_map)
for idx, path_nodes in enumerate(results):
    if path_nodes:
        end_pos = node_pos[path_nodes[-1]]
        folium.Marker(location=end_pos, popup=f"{poi_types[idx]} 반환점", icon=folium.Icon(color='blue')).add_to(multi_map)
        path_coords = [node_pos[node] for node in path_nodes]
        folium.PolyLine(locations=path_coords, color=colors[poi_types[idx]], weight=5, opacity=0.8, popup=f"{poi_types[idx]} 경로").add_to(multi_map)

multi_map.save('backend/app/services/path_reccomendation/recommended_walk_path_all.html')
print("\n'recommended_walk_path_all.html' 파일에 3가지 경로가 모두 시각화되었습니다.")


'recommended_walk_path_all.html' 파일에 3가지 경로가 모두 시각화되었습니다.


In [10]:
# 최종 3개 경로를 LineString geojson으로 저장
geojson_features = []
for idx, path_nodes in enumerate(results):
    if path_nodes:
        coords = [node_pos[node] for node in path_nodes]
        feature = {
            "type": "Feature",
            "properties": {
                "name": poi_types[idx]
            },
            "geometry": {
                "type": "LineString",
                "coordinates": [[lon, lat] for lat, lon in coords]
            }
        }
        geojson_features.append(feature)
geojson_obj = {
    "type": "FeatureCollection",
    "features": geojson_features
}
with open('backend/app/services/path_reccomendation/results_path.geojson', 'w', encoding='utf-8') as f:
    json.dump(geojson_obj, f, ensure_ascii=False, indent=2)
print('results_path.geojson 파일로 3개 경로가 LineString geojson으로 저장되었습니다.')

results_path.geojson 파일로 3개 경로가 LineString geojson으로 저장되었습니다.
