## 서울시 도로 링크 ID 수집

In [None]:
import xml.etree.ElementTree as ET
import pandas as pd
import requests
import json

api_key = '' # vworld api key 입력
domain = 'localhost' # vworld apu key 생성시 입력 domain

rows = []
for page_num in range(1,47,1):
    vword_link_api_url = f'https://api.vworld.kr/req/data?service=data&version=2.0&request=GetFeature&format=json&errorformat=json&size=1000&page={page_num}&data=LT_L_MOCTLINK&geomfilter=BOX(126.734086,37.413294,127.269311,37.715133)&columns=link_id,road_name,max_spd,rd_rank_h,rd_type_h,rest_veh_h,rest_w,rest_h,remark,ag_geom&geometry=true&attribute=true&crs=EPSG:4326&domain={domain}&key={api_key}'
    

    link_res = requests.get(vword_link_api_url) #, headers=vword_api_params)
    print(f'page number {page_num} status code: {link_res.status_code}')
    
    vword_api_data = link_res.json()
    features = vword_api_data['response']['result']['featureCollection']['features']

    # features 리스트에서 데이터 추출
    for feature in features:
        properties = feature['properties']
        geometry = feature['geometry']
        # 필요한 필드 추출
        rows.append({
            'link_id': properties['link_id'],
            'road_name': properties['road_name'],
            'geometry': geometry['coordinates'],
            'max_spd': properties['max_spd'],
            'rd_rank_h': properties['rd_rank_h'],
            'rd_type_h': properties['rd_type_h'],
            'rest_veh_h': properties['rest_veh_h'],
            'rest_w': properties['rest_w'],
            'rest_h': properties['rest_h']
        })
    print(f'finished {page_num} page.')
link_df = pd.DataFrame(rows)
link_df.to_csv('./vword_linkID_data_241001.csv', index=False, encoding='utf-8-sig')
print(f'saved complite.')

## 주차장별 1km 반경 도로 정보 수집

In [1]:
import pandas as pd
import requests
import math

In [2]:
parking_data = pd.read_csv('./parking_data_20241001_1120.csv') # 주차장 목록

In [3]:
# 사각형의 min, max 값을 계산하는 함수
def calculate_min_max(lat, lon, radius):
    lat_radian = math.radians(lat)
    
    north_lat = lat + (radius / earth_radius) * (180 / math.pi)
    south_lat = lat - (radius / earth_radius) * (180 / math.pi)
    east_lon = lon + (radius / (earth_radius * math.cos(lat_radian))) * (180 / math.pi)
    west_lon = lon - (radius / (earth_radius * math.cos(lat_radian))) * (180 / math.pi)
    
    return west_lon, south_lat, east_lon, north_lat

# 지구 반지름 (대략적인 값, 미터 단위)
earth_radius = 6378137.0

# 반경 설정 (미터 단위)
radius = 1000  # 1km 반경

In [4]:
api_key = '' # vworld api key 입력
domain = '' # vworld apu key 생성시 입력 domain

rows = []
for idx, row in parking_data.iterrows():
    lat = row['위도']
    lon = row['경도']
    west_lon, south_lat, east_lon, north_lat = calculate_min_max(lat, lon, radius)
    min_x = west_lon
    min_y = south_lat
    max_x = east_lon
    max_y = north_lat

    box_value = f'BOX{min_x, min_y, max_x, max_y}'

    vword_link_api_url = f'https://api.vworld.kr/req/data?service=data&version=2.0&request=GetFeature&format=json&errorformat=json&size=1000&page=1&data=LT_L_MOCTLINK&geomfilter={box_value}&columns=link_id,road_name,max_spd,rd_rank_h,rd_type_h,rest_veh_h,rest_w,rest_h,remark,ag_geom&geometry=true&attribute=true&crs=EPSG:4326&domain={domain}&key={api_key}'
    
    try:
        data_res = requests.get(vword_link_api_url)
        vword_api_search_data = data_res.json()
        
        features = vword_api_search_data['response']['result']['featureCollection']['features']
        for feature in features:
            properties = feature['properties']
            geometry = feature['geometry']
            # 필요한 필드 추출
            rows.append({
                '주차장 이름' : row['주차장 이름'],
                '주차장 위도' : row['위도'],
                '주차장 경도' : row['경도'],
                'link_id': properties['link_id'],
                # 'road_name': properties['road_name'],
                'geometry': geometry['coordinates'],
                # 'max_spd': properties['max_spd'],
                # 'rd_rank_h': properties['rd_rank_h'],
                # 'rd_type_h': properties['rd_type_h'],
                # 'rest_veh_h': properties['rest_veh_h']
                # 'rest_w': properties['rest_w'],
                # 'rest_h': properties['rest_h']
            })
    except requests.exceptions.RequestException as e:
        # Handle exceptions related to the HTTP request
        print(f"Error occurred during API request for parking lot {row['주차장 이름']}: {e}")
    except ValueError as e:
        # Handle JSON parsing errors
        print(f"Error parsing JSON for parking lot {row['주차장 이름']}: {e}")
    except KeyError as e:
        # Handle missing expected keys in the API response
        print(f"Missing key {e} in API response for parking lot {row['주차장 이름']}") 

search_parking_road_data = pd.DataFrame(rows)
search_parking_road_data.to_csv('./parking_road_mapping_data_241002.csv', index=False, encoding='utf-8-sig')

## 주차장별 1km 반경 맵핑

In [120]:
import pandas as pd
import math
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from haversine import haversine, Unit
from tqdm import tqdm
tqdm.pandas()


In [121]:
parking_data = pd.read_csv('./parking_data_20241001_1120.csv')  # 주차장 목록
search_parking_road_data_cp = pd.read_csv('./parking_road_mapping_data_241002.csv', encoding='utf-8-sig')  # 주차장 주변 1km 도로 목록

In [123]:
# 위경도 정리 'lat_lon' 만들기
def re_lat_lon(geometry):
    lat_lon_pair = []
    geometry = eval(geometry)
    for geometry_list1 in geometry:
        for geometry_list2 in geometry_list1:
            lat_lon_pair.append((geometry_list2[1],geometry_list2[0]))
            
    return lat_lon_pair

search_parking_road_data_cp['lat_lon'] = search_parking_road_data_cp['geometry'].progress_apply(re_lat_lon)

In [129]:
# 주차장 주변 500m 이내 도로 맵핑
def find_nearest_for_row(row):
    # '주차장 이름'으로 필터링
    filtered_data_cp = search_parking_road_data_cp[search_parking_road_data_cp['주차장 이름'] == row['주차장 이름']]
    
    # 주차장 좌표와 비교할 좌표 (위도, 경도)
    search_point = (row['위도'], row['경도'])
    
    # 좌표 리스트 풀기
    near_points = []
    link_ids = []

    for idx, lat_lon_list in filtered_data_cp.iterrows():
        for lat_lon in lat_lon_list['lat_lon']:
            near_points.append(lat_lon)
            link_ids.append(lat_lon_list['link_id'])  # 해당 좌표에 해당하는 link_id 저장
    
    nearest_points = []

    # 각 좌표에 대해 하버사인 거리 계산
    for lat_lon, link_id in zip(near_points, link_ids):
        # 정확한 거리 계산 (단위: meters)
        distance_m = haversine(search_point, lat_lon, unit=Unit.METERS)
        
        # 500m 이내의 좌표만 필터링
        if distance_m <= 500:
            nearest_points.append({'link_id': link_id, 'distance': int(distance_m)})  # 거리 값을 m 단위로 변환하여 저장

    # 거리 기준으로 오름차순 정렬
    nearest_points = sorted(nearest_points, key=lambda x: x['distance'])

    # 동일한 link_id 중 가장 가까운 거리만 남기기
    unique_nearest_points = {}
    for point in nearest_points:
        link_id = point['link_id']
        if link_id not in unique_nearest_points:
            unique_nearest_points[link_id] = point  # 가장 가까운 거리만 저장

    # unique_nearest_points를 리스트로 변환
    nearest_points = list(unique_nearest_points.values())

    return pd.Series({
        '주차장 이름': row['주차장 이름'],
        '위도': row['위도'],
        '경도': row['경도'],
        'nearest_points': nearest_points
    })


In [130]:
# meta 정의
meta = pd.DataFrame({
    '주차장 이름': pd.Series(dtype='str'),
    '위도': pd.Series(dtype='float'),
    '경도': pd.Series(dtype='float'),
    'nearest_points': pd.Series(dtype='object')
})

# Dask apply 적용
ddf = dd.from_pandas(parking_data, npartitions=6)
ddf = ddf.apply(find_nearest_for_row, axis=1, meta=meta)

# ProgressBar 사용
with ProgressBar():
    result = ddf.compute()

# print(result)

## 주차장별 도로 맵핑 데이터 정리

In [179]:
# 교통 데이터 로드
test_its_traffic_data = pd.read_csv('./data/its_traffic_241001_02/its_traffic_data_2024-10-01_184218.csv', encoding='utf-8-sig')

In [180]:
# 샘플 테스트
nearest_result_search_parking_lot = result[:1]
type(nearest_result_search_parking_lot['nearest_points'])

pandas.core.series.Series

In [181]:
# 주차장 주변 도로 link_id 추출
nearest_points_str = nearest_result_search_parking_lot['nearest_points']
nearest_points = [item_2['link_id'] for item_1 in nearest_points_str for item_2 in eval(item_1)]

In [182]:
# 주차장 주변도로 교통상황 데이터프레임 생성
link_id_df = pd.DataFrame({'linkId': nearest_points})
merged_df = link_id_df.merge(test_its_traffic_data[['linkId', 'speed']], on='linkId', how='left')
merged_df.set_index('linkId', inplace=True)
merged_df.T

linkId,1090040300,1090040800,1090000602,1090000601,1090040200,1090040900,1090000501,1090000502,1090000603,1090000503,...,1090013202,1090042500,1090041400,1090041500,1090042600,1090040001,1090040101,2070000401,2070000301,1090039300
speed,22.0,12.0,23.0,29.0,8.0,22.0,20.0,28.0,28.0,38.5,...,14.0,21.0,26.0,39.0,88.0,13.0,35.0,49.0,27.5,13.0


## 테스트 시각화

In [39]:
test_filtered = search_parking_road_data_cp['주차장 이름'].value_counts() <= 10
filtered_names = test_filtered[test_filtered].index
len(filtered_names)

11

In [38]:
import folium
import webbrowser
import os

# 지도 생성 (서울 근처의 중심 좌표 설정)
map_center = [37.672, 127.06]
mymap = folium.Map(location=map_center, zoom_start=12)

for filtered_name in filtered_names:
    
    parking_lot_name = filtered_name

    test_parking_data = parking_data[parking_data['주차장 이름'] == parking_lot_name]
    test_search_parking_road_data_cp = search_parking_road_data_cp[search_parking_road_data_cp['주차장 이름'] == parking_lot_name]


    folium.Marker(location=[test_parking_data['위도'], test_parking_data['경도']],icon=folium.Icon(color='red')).add_to(mymap)

    for _, row in test_search_parking_road_data_cp.iterrows():
        lat_lon = row['lat_lon']
        for lat, lon in lat_lon:
            folium.Marker(location=[lat, lon],icon=folium.Icon(color='green')).add_to(mymap)

# HTML 파일 저장
map_file = 'test_vword_road_map.html'
mymap.save(map_file)

# 저장한 파일을 기본 브라우저로 열기
webbrowser.open('file://' + os.path.realpath(map_file))

  float(coord)
  if math.isnan(float(coord)):
  return [float(x) for x in coords]


True