In [1]:
%pip install numpy pandas scikit-learn

Collecting numpy
  Downloading numpy-2.3.5-cp312-cp312-win_amd64.whl.metadata (60 kB)
Collecting pandas
  Downloading pandas-2.3.3-cp312-cp312-win_amd64.whl.metadata (19 kB)
Collecting scikit-learn
  Downloading scikit_learn-1.8.0-cp312-cp312-win_amd64.whl.metadata (11 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting scipy>=1.10.0 (from scikit-learn)
  Downloading scipy-1.16.3-cp312-cp312-win_amd64.whl.metadata (60 kB)
Collecting joblib>=1.3.0 (from scikit-learn)
  Downloading joblib-1.5.3-py3-none-any.whl.metadata (5.5 kB)
Collecting threadpoolctl>=3.2.0 (from scikit-learn)
  Downloading threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Downloading numpy-2.3.5-cp312-cp312-win_amd64.whl (12.8 MB)
   ---------------------------------------- 0.0/12.8 MB ? eta -:--:--
   -- ------------------------------------- 0.8/12.8 MB 5.6 MB/s eta 0:00:03
   ---- ----------------------------------- 1.3/12.8 MB 2.9 MB/s eta 0:00:0

In [7]:
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree

EARTH_RADIUS_M = 6371000  # meters

# -----------------------------
# 1) 유틸
# -----------------------------
def _to_float_series(s: pd.Series) -> pd.Series:
    """숫자 변환 + 결측/이상치 처리"""
    return pd.to_numeric(s, errors="coerce")

def _to_radians(df: pd.DataFrame, lat_col: str, lon_col: str) -> np.ndarray:
    return np.deg2rad(df[[lat_col, lon_col]].to_numpy())

def confidence_from_distance_m(d: float) -> float:
    # 필요하면 너 프로젝트 기준으로 가중치/구간 바꾸면 됨
    if d <= 10:  return 0.95
    if d <= 20:  return 0.80
    if d <= 40:  return 0.60
    return 0.40

# -----------------------------
# 2) 메인: 매핑 테이블 생성
# -----------------------------
def build_crosswalk_signal_map(
    signal_df: pd.DataFrame,
    crosswalk_df: pd.DataFrame,
    R_m: float = 30.0,
    top_k: int = 2,
    # 네 컬럼명 고정
    signal_id_col: str = "신호등관리번호",
    crosswalk_id_col: str = "횡단보도관리번호",
    lat_col: str = "위도",
    lon_col: str = "경도",
    # (선택) 동일 시군구 안에서만 매칭하면 오매칭 감소
    use_admin_filter: bool = True,
    sido_col: str = "시도명",
    sigungu_col: str = "시군구명",
) -> pd.DataFrame:
    sg = signal_df.copy()
    cw = crosswalk_df.copy()

    # --- 필수 컬럼 체크
    required_sg = {signal_id_col, lat_col, lon_col}
    required_cw = {crosswalk_id_col, lat_col, lon_col}
    if use_admin_filter:
        required_sg |= {sido_col, sigungu_col}
        required_cw |= {sido_col, sigungu_col}

    missing_sg = required_sg - set(sg.columns)
    missing_cw = required_cw - set(cw.columns)
    if missing_sg:
        raise ValueError(f"signal_df에 컬럼이 없음: {missing_sg}")
    if missing_cw:
        raise ValueError(f"crosswalk_df에 컬럼이 없음: {missing_cw}")

    # --- 좌표 정제
    sg[lat_col] = _to_float_series(sg[lat_col])
    sg[lon_col] = _to_float_series(sg[lon_col])
    cw[lat_col] = _to_float_series(cw[lat_col])
    cw[lon_col] = _to_float_series(cw[lon_col])

    # --- 좌표 결측 제거
    sg = sg.dropna(subset=[lat_col, lon_col, signal_id_col]).reset_index(drop=True)
    cw = cw.dropna(subset=[lat_col, lon_col, crosswalk_id_col]).reset_index(drop=True)

    # --- (선택) 보행자 신호등만 필터링하면 정확도 상승
    # 신호등구분 값이 어떤 형태인지 데이터마다 달라서 여기서는 보수적으로 주석 처리
    # 예: sg = sg[sg["신호등구분"].str.contains("보행", na=False)].copy()

    maps_all = []

    # =========================
    # A) 행정구역 단위로 쪼개서 매칭 (오매칭↓, 속도↑)
    # =========================
    if use_admin_filter:
        groups = (
            sg[[sido_col, sigungu_col]]
            .drop_duplicates()
            .itertuples(index=False, name=None)
        )
        for sido, sigungu in groups:
            sg_g = sg[(sg[sido_col] == sido) & (sg[sigungu_col] == sigungu)].reset_index(drop=True)
            cw_g = cw[(cw[sido_col] == sido) & (cw[sigungu_col] == sigungu)].reset_index(drop=True)
            if len(cw_g) == 0 or len(sg_g) == 0:
                continue
            maps_all.append(_match_one_group(
                sg_g, cw_g, R_m, top_k,
                signal_id_col, crosswalk_id_col, lat_col, lon_col,
                extra_cols={"시도명": sido, "시군구명": sigungu}
            ))
        if len(maps_all) == 0:
            return pd.DataFrame(columns=[
                signal_id_col, crosswalk_id_col, "match_method", "distance_m", "confidence"
            ] + ([sido_col, sigungu_col] if use_admin_filter else []))
        mapping = pd.concat(maps_all, ignore_index=True)

    # =========================
    # B) 전체 한 번에 매칭 (행정구역 필터 안 쓸 때)
    # =========================
    else:
        mapping = _match_one_group(
            sg, cw, R_m, top_k,
            signal_id_col, crosswalk_id_col, lat_col, lon_col,
            extra_cols=None
        )

    # --- 중복 제거: 동일 (신호등관리번호, 횡단보도관리번호)는 confidence 높은 것만
    mapping = (
        mapping.sort_values([signal_id_col, crosswalk_id_col, "confidence"], ascending=[True, True, False])
               .drop_duplicates([signal_id_col, crosswalk_id_col], keep="first")
               .reset_index(drop=True)
    )

    return mapping


def _match_one_group(
    sg: pd.DataFrame,
    cw: pd.DataFrame,
    R_m: float,
    top_k: int,
    signal_id_col: str,
    crosswalk_id_col: str,
    lat_col: str,
    lon_col: str,
    extra_cols: dict | None = None
) -> pd.DataFrame:

    # =========================
    # 1) Exact match: lat/lon 완전 동일
    # =========================
    exact_merged = sg.merge(
        cw,
        on=[lat_col, lon_col],
        how="inner",
        suffixes=("_signal", "_crosswalk"),
    )

    exact = exact_merged[[signal_id_col, crosswalk_id_col, lat_col, lon_col]].copy()
    if len(exact) > 0:
        # 좌표를 명확히 분리해서 보관
        exact["signal_lat"] = exact[lat_col]
        exact["signal_lon"] = exact[lon_col]
        exact["crosswalk_lat"] = exact[lat_col]
        exact["crosswalk_lon"] = exact[lon_col]

        exact["match_method"] = "EXACT_COORD"
        exact["distance_m"] = 0.0
        exact["confidence"] = 0.99

        if extra_cols:
            for k, v in extra_cols.items():
                exact[k] = v

        # 원래 위도/경도 컬럼은 필요 없으면 제거 가능
        exact = exact.drop(columns=[lat_col, lon_col])

    matched_signal_ids = set(exact[signal_id_col].unique()) if len(exact) > 0 else set()
    sg_rest = sg[~sg[signal_id_col].isin(matched_signal_ids)].reset_index(drop=True)

    # =========================
    # 2) Distance match: 반경 R 내 top_k
    # =========================
    if len(sg_rest) == 0:
        base_cols = [signal_id_col, crosswalk_id_col, "match_method", "distance_m", "confidence",
                     "signal_lat", "signal_lon", "crosswalk_lat", "crosswalk_lon"]
        if extra_cols:
            base_cols += list(extra_cols.keys())
        return exact[base_cols]

    cw_rad = _to_radians(cw, lat_col, lon_col)
    sg_rad = _to_radians(sg_rest, lat_col, lon_col)

    tree = BallTree(cw_rad, metric="haversine")
    radius_rad = R_m / EARTH_RADIUS_M

    ind_array, dist_array = tree.query_radius(
        sg_rad,
        r=radius_rad,
        return_distance=True,
        sort_results=True,
    )

    cw_ids = cw[crosswalk_id_col].to_numpy()
    sg_ids = sg_rest[signal_id_col].to_numpy()

    # 좌표 배열(인덱싱으로 바로 가져오기)
    cw_lats = cw[lat_col].to_numpy()
    cw_lons = cw[lon_col].to_numpy()
    sg_lats = sg_rest[lat_col].to_numpy()
    sg_lons = sg_rest[lon_col].to_numpy()

    rows = []
    for i, (cw_indices, cw_dist_rad) in enumerate(zip(ind_array, dist_array)):
        if len(cw_indices) == 0:
            continue

        cw_indices = cw_indices[:top_k]
        cw_dist_rad = cw_dist_rad[:top_k]
        d_m = cw_dist_rad * EARTH_RADIUS_M

        for idx, dist_m in zip(cw_indices, d_m):
            dist_m = float(dist_m)
            rows.append({
                signal_id_col: sg_ids[i],
                crosswalk_id_col: cw_ids[idx],

                "signal_lat": float(sg_lats[i]),
                "signal_lon": float(sg_lons[i]),
                "crosswalk_lat": float(cw_lats[idx]),
                "crosswalk_lon": float(cw_lons[idx]),

                "match_method": "DISTANCE",
                "distance_m": dist_m,
                "confidence": float(confidence_from_distance_m(dist_m)),
            })

    dist_map = pd.DataFrame(rows)
    if extra_cols and len(dist_map) > 0:
        for k, v in extra_cols.items():
            dist_map[k] = v

    # =========================
    # 합치기
    # =========================
    if len(exact) > 0 and len(dist_map) > 0:
        out = pd.concat([exact, dist_map], ignore_index=True)
    elif len(exact) > 0:
        out = exact
    else:
        out = dist_map

    return out


# -----------------------------
# 3) 사용 예시
# -----------------------------
# if __name__ == "__main__":
    # 예: CSV 로드
    # signal_df = pd.read_csv("signals.csv", encoding="utf-8")
    # crosswalk_df = pd.read_csv("crosswalks.csv", encoding="utf-8")

    # mapping_df = build_crosswalk_signal_map(
    #     signal_df, crosswalk_df,
    #     R_m=30, top_k=2,
    #     use_admin_filter=True
    # )

    # mapping_df.to_csv("crosswalk_signal_map.csv", index=False, encoding="utf-8-sig")
#    pass


In [8]:
# 사용 예시
crosswalk_df = pd.read_csv('crosswalks.csv', encoding="cp949" )
signal_df = pd.read_csv('signals.csv', encoding="cp949")
mapping_df = build_crosswalk_signal_map(
         signal_df, crosswalk_df,
         R_m=30, top_k=2,
         use_admin_filter=True
     )
mapping_df.to_csv("crosswalk_signal_map.csv", index=False, encoding="utf-8-sig")

  signal_df = pd.read_csv('signals.csv', encoding="cp949")
