In [None]:
# --- 1. 라이브러리 설치 및 임포트 ---

# .env 파일을 사용하기 위한 라이브러리를 설치합니다.
#!pip install python-dotenv

import pandas as pd
import requests
import time
from tqdm import tqdm
import os
from dotenv import load_dotenv # .env 파일을 읽기 위한 라이브러리
from datetime import datetime
import re

# tqdm 라이브러리와 pandas의 연동을 설정합니다.
tqdm.pandas()

In [None]:
# --- 2. 오류 메시지 출력 함수 & 로그(출력문) 저장 함수 ---

# train.csv 인지 test.csv 인지 구분하기 위한 변수
filename = 'train'

# 오류 메시지를 빨간색으로 출력하는 함수 정의
def print_error(text):
    # ANSI 이스케이프 코드: '\033[91m'는 빨간색 출력, '\033[0m'는 색상 초기화
    print(f"\033[91m{text}\033[0m")


# 로그 파일이 저장될 디렉토리 경로 설정
LOG_DIR = "../../data/logs/geocoding_logs/"

# 로그 디렉토리가 존재하지 않으면 생성
os.makedirs(LOG_DIR, exist_ok=True)

# 로그 파일명: 실행 시간 기준으로 생성 (예: geocoding_run_20250710_112530.log)
log_filename = f"{filename}_coordinate_geocoding_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

# 전체 로그 파일 경로 구성
LOG_PATH = os.path.join(LOG_DIR, log_filename)


# 로그를 파일로 저장하고, 필요 시 콘솔에도 출력하는 함수
def write_log(message, log_path=LOG_PATH, print_also=True):
    # 로그 파일을 append 모드로 열고 메시지를 시간과 함께 기록
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | {message}\n")
    
    # 옵션에 따라 콘솔에도 출력 (기본값: True)
    if print_also:
        print(message)

In [None]:
# --- 3. .env 파일에서 API 키 로드 ---

# .env 파일에 정의된 환경 변수를 현재 세션으로 로드합니다.
load_dotenv('.env')

# API 키 관리를 위한 전역 변수
key_idx = 0
all_keys_exhausted = False

# KAKAO_REST_API_KEY1부터 KAKAO_REST_API_KEY10까지의 키를 저장할 리스트
KAKAO_KEYS = []

# 여러 개의 API 키를 리스트로 불러오기
for i in range(1, 11):  # 최대 10개 지원 (필요시 숫자 변경)
    # os.getenv() 함수를 사용하여 "KAKAO_REST_API_KEY" 라는 이름의 환경 변수 값을 가져옵니다.
    key = os.getenv(f"KAKAO_REST_API_KEY{i}")
    if key: KAKAO_KEYS.append(key)

if not KAKAO_KEYS:
    print_error("[오류] '.env 파일에 KAKAO_REST_API_KEY1~N 형식으로 키가 필요합니다.")
    write_log("[오류] '.env 파일에 KAKAO_REST_API_KEY1~N 형식으로 키가 필요합니다.",print_also=False)
else:
    write_log(f"총 {len(KAKAO_KEYS)}개의 API 키 로드 완료.")

In [None]:
# --- 4. 경로 설정 및 데이터 로드후 좌표 결측치 개수 확인 ---

# 원본 학습 데이터 경로 지정
TRAIN_DATA_PATH = f'../../data/raw/{filename}.csv'

# 처리된 데이터가 저장될 디렉토리 경로
OUTPUT_DIR = '../../data/processed/geocoding/'

NO_SEARCH_FILENAME = f'no_search_geocoding_{filename}.csv'

# 중간 저장 체크포인트 파일명
CHECKPOINT_FILENAME = f'{filename}_geocoded_checkpoint.csv'

# 최종 결과 파일명
FINAL_FILENAME = f'{filename}_geocoded.csv'

# 각 파일의 전체 경로 구성
NO_SEARCH_PATH = os.path.join(OUTPUT_DIR, NO_SEARCH_FILENAME)
CHECKPOINT_PATH = os.path.join(OUTPUT_DIR, CHECKPOINT_FILENAME)
FINAL_PATH = os.path.join(OUTPUT_DIR, FINAL_FILENAME)

# --- 카카오 API URL 상수 정의 ---
KAKAO_GEOCODE_URL = "https://dapi.kakao.com/v2/local/search/address.json"

# 데이터 로드 시작 메시지 출력
write_log(f"'{TRAIN_DATA_PATH}'에서 원본 데이터를 로드합니다...")

try:
    # 원본 CSV 파일을 pandas로 읽어옴
    train_df = pd.read_csv(TRAIN_DATA_PATH, low_memory=False)
    write_log("데이터 로드 완료.")
    
    # --- 체크포인트 파일이 존재할 경우, 좌표 데이터 업데이트 ---
    if os.path.exists(CHECKPOINT_PATH):
        write_log(f"'{CHECKPOINT_PATH}' 에서 체크포인트 파일을 발견했습니다. 데이터를 불러와 좌표를 업데이트합니다.")
        try:
            # 체크포인트 파일을 불러옵니다.
            checkpoint_df = pd.read_csv(CHECKPOINT_PATH, low_memory=False)
            
            # 원본 데이터프레임의 인덱스와 체크포인트의 인덱스가 동일하다고 가정하고,
            # '좌표X'와 '좌표Y' 컬럼을 체크포인트 데이터로 덮어씁니다.
            # 이렇게 하면 이전에 지오코딩된 결과가 반영된 상태로 작업을 재개할 수 있습니다.
            if '좌표X' in checkpoint_df.columns and '좌표Y' in checkpoint_df.columns:
                train_df['좌표X'] = checkpoint_df['좌표X']
                train_df['좌표Y'] = checkpoint_df['좌표Y']
                write_log("체크포인트 파일의 좌표 데이터로 업데이트 완료.")
            else:
                print_error("[오류] 체크포인트 파일에 '좌표X' 또는 '좌표Y' 컬럼이 없습니다.")
                write_log("[오류] 체크포인트 파일에 '좌표X' 또는 '좌표Y' 컬럼이 없어 업데이트에 실패했습니다.", print_also=False)

        except Exception as e:
            print_error(f"[오류] 체크포인트 파일 로딩 또는 처리 중 오류 발생: {e}")
            write_log(f"[오류] 체크포인트 파일 처리 중 오류: {e}", print_also=False)
    
except FileNotFoundError:
    # 파일이 존재하지 않을 경우 예외 처리
    print_error(f"오류: 'except FileNotFoundError:' / '{TRAIN_DATA_PATH}' 파일을 찾을 수 없습니다.")
    write_log(f"오류: '{TRAIN_DATA_PATH}' 파일을 찾을 수 없습니다.", print_also=False)
    train_df = None
    
# 좌표X, 좌표Y 결측치 개수 출력 (isnull을 먼저 사용하고 isnull로 조회되지 않는 결측치 모두 조회)
if train_df is not None:
    missing_x_count = train_df['좌표X'].isnull().sum()
    missing_y_count = train_df['좌표Y'].isnull().sum()
    write_log(f"좌표X 결측치 개수: {missing_x_count}")
    write_log(f"좌표Y 결측치 개수: {missing_y_count}")

    # 좌표X, 좌표Y 모두 결측치인 행 개수 출력
    missing_both_count = train_df[(train_df['좌표X'].isnull()) & (train_df['좌표Y'].isnull())].shape[0]
    write_log(f"좌표X와 좌표Y 모두 결측치인 행 개수: {missing_both_count}")


In [None]:
# --- 5. 주소 생성 함수 정의 (우선순위 변경) ---

def create_jibeon_address(row):
    """
    '시군구'와 '번지'를 조합하여 번지 주소를 생성합니다.
    '번지'가 유효한 문자열일 경우에만 주소를 반환하고, 아니면 None을 반환합니다.
    """
    if isinstance(row['번지'], str) and row['번지'].strip():
        return f"{row['시군구']} {row['번지']}"
    return None

def create_doro_address(row):
    """
    '시군구'와 '도로명'을 조합하여 도로명 주소를 생성합니다.
    '도로명'이 유효한 문자열일 경우에만 주소를 반환하고, 아니면 None을 반환합니다.
    """
    if isinstance(row['도로명'], str) and row['도로명'].strip():
        return f"{row['시군구']} {row['도로명']}"
    return None

def remove_dong(address):
    """
    주소 문자열에서 '동/가/로'로 끝나는 세 번째 부분을 제거합니다.
    이 행정구역 명칭이 주소 검색을 방해하는 경우가 있기 때문입니다.
    예: '서울특별시 강남구 개포동 언주로 21' → '서울특별시 강남구 언주로 21'
    """
    if not isinstance(address, str):
        return address
        
    try:
        parts = address.split()
        # 도로명 주소의 'OO길'을 제거하지 않도록 '동', '가', '로'만 대상으로 한정합니다.
        if len(parts) > 2 and (parts[2].endswith(('동', '가', '로'))):
            return ' '.join(parts[:2] + parts[3:])
    except Exception as e:
        write_log(f"[오류] 주소 변형 중 오류 발생: {e}")
        
    # 변형에 실패하거나 조건에 맞지 않으면 원본 주소 반환
    return address

In [None]:
# --- 6. 지오코딩 및 주소 생성 함수 정의 (검색 로직 변경) ---

# API를 호출하여 좌표를 가져오는 내부 함수
def get_coordinates_from_api(address, key_index):
    global KAKAO_KEYS
    api_key = KAKAO_KEYS[key_index]
    headers = {"Authorization": f"KakaoAK {api_key}"}
    params = {"query": address}

    try:
        response = requests.get(KAKAO_GEOCODE_URL, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        if data['documents']:
            return float(data['documents'][0]['x']), float(data['documents'][0]['y'])
        return None
    
    except requests.exceptions.HTTPError as e:
        if e.response.status_code in [403, 429]: # Quota Exceeded or Forbidden
            return "CHANGE_KEY"
        write_log(f"[실패] API 요청 실패 (HTTP 오류): {e}, 주소: '{address}'")
        return "API_ERROR"
        
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
        write_log(f"[네트워크 오류] 인터넷 연결이 끊겼거나 응답 시간이 초과되었습니다. 주소: '{address}'")
        time.sleep(5)
        return "RETRY"
        
    except Exception as e:
        write_log(f"[실패] 알 수 없는 오류 발생: {e}, 주소: '{address}'")
        return "API_ERROR"

# 여러 API 키와 주소 유형을 순환하며 지오코딩을 시도하는 메인 함수
def geocode_address(address_tuple):
    global key_idx, all_keys_exhausted
    
    jibeon_address, doro_address = address_tuple
    
    # 주소가 모두 없는 경우
    if not jibeon_address and not doro_address:
        return None

    start_key_idx = key_idx
    retry_attempts = 0

    # 시도할 주소 리스트 (우선순위: 번지 -> 도로명 -> '동'제거 도로명)
    addresses_to_try = []
    if jibeon_address:
        addresses_to_try.append(("번지", jibeon_address))
    if doro_address:
        addresses_to_try.append(("도로명", doro_address))
        short_doro_address = remove_dong(doro_address)
        if short_doro_address != doro_address:
            addresses_to_try.append(("'동'제거 도로명", short_doro_address))


    # 각 주소 유형에 대해 API 호출 시도
    for address_type, address in addresses_to_try:
        while True:
            if all_keys_exhausted:
                return None

            result = get_coordinates_from_api(address, key_idx)

            if result == "CHANGE_KEY":
                key_idx = (key_idx + 1) % len(KAKAO_KEYS)
                if key_idx == start_key_idx:
                    all_keys_exhausted = True
                    write_log("모든 API 키가 소진되었습니다.")
                    break
                time.sleep(1)
                continue

            elif result == "RETRY":
                retry_attempts += 1
                if retry_attempts >= 5:
                    write_log(f"[재시도 실패] 주소: '{address}' - 네트워크 오류가 계속 발생하여 건너뜁니다.")
                    break 
                time.sleep(5)
                continue

            elif result == "API_ERROR":
                break # 다음 주소 유형으로 넘어감

            # 좌표를 성공적으로 찾은 경우
            elif isinstance(result, tuple):
                write_log(f"[성공] '{address}' ({address_type}) 검색으로 좌표를 찾았습니다.", print_also=False)
                return result

            # result가 None일 경우 (API는 성공했으나 검색 결과가 없는 경우)
            else:
                write_log(f"[실패] API 검색 결과 없음: '{address}' ({address_type})", print_also=False)
                break # 다음 주소 유형으로 넘어감
    
    # 모든 시도가 실패한 경우
    return None

In [None]:
# --- 7. 결측치 채우기 실행 ---

if train_df is not None and KAKAO_KEYS:
    df_to_geocode = train_df[train_df['좌표X'].isnull()].copy()
    
    if not df_to_geocode.empty:
        write_log(f"\\n총 {len(df_to_geocode)}개의 좌표 결측치에 대한 작업을 시작합니다...")

        # 7.1. 주소 생성 및 주소 없는 행 분리/저장
        df_to_geocode['jibeon_address'] = df_to_geocode.apply(create_jibeon_address, axis=1)
        df_to_geocode['doro_address'] = df_to_geocode.apply(create_doro_address, axis=1)

        # 번지와 도로명 주소가 모두 없는 행을 식별
        no_address_mask = df_to_geocode['jibeon_address'].isnull() & df_to_geocode['doro_address'].isnull()
        no_address_rows = df_to_geocode[no_address_mask]

        if not no_address_rows.empty:
            os.makedirs(OUTPUT_DIR, exist_ok=True)
            # 주소 없는 행을 별도 파일로 저장 (임시 컬럼 제외)
            no_address_rows.drop(columns=['jibeon_address', 'doro_address']).to_csv(NO_SEARCH_PATH, index=False)
            write_log(f"{len(no_address_rows)}개의 주소 없는 행을 '{NO_SEARCH_PATH}'에 저장했습니다.")
        
        # 주소가 하나라도 있는 행만 남겨서 지오코딩 진행
        df_to_geocode = df_to_geocode[~no_address_mask].copy()
        
        # 7.2. 고유 주소 조합에 대한 API 호출 (캐싱)
        df_to_geocode['address_tuple'] = list(zip(df_to_geocode['jibeon_address'], df_to_geocode['doro_address']))
        unique_address_tuples = df_to_geocode['address_tuple'].unique()
        
        write_log(f"실제 API 호출 대상 고유 주소 조합 개수: {len(unique_address_tuples)}")
        
        address_cache = {}
        processed_count = 0

        for addr_tuple in tqdm(unique_address_tuples, desc="Geocoding Progress"):
            if all_keys_exhausted:
                print_error("\\n!! 모든 키 사용량 소진: 작업 중단 !!")
                write_log("\\n모든 키 사용량 소진: 작업 중단", print_also=False)
                break
            
            coords = geocode_address(addr_tuple)
            if coords:
                address_cache[addr_tuple] = coords
            
            processed_count += 1
            
            # 7.3. 1000개 단위로 임시 저장 (체크포인트)
            if processed_count > 0 and processed_count % 1000 == 0:
                write_log(f"\\n--- {processed_count}개 처리 완료. 체크포인트를 저장합니다 ---")
                
                # 캐시된 주소 조합을 기반으로 좌표 매핑
                temp_coords = df_to_geocode['address_tuple'].map(address_cache)
                valid_coords_map = temp_coords.dropna()

                # 원본 train_df의 해당 인덱스에 좌표 반영
                train_df.loc[valid_coords_map.index, '좌표X'] = [x[0] for x in valid_coords_map]
                train_df.loc[valid_coords_map.index, '좌표Y'] = [x[1] for x in valid_coords_map]

                train_df.to_csv(CHECKPOINT_PATH, index=False)
                write_log(f"체크포인트 저장 완료: '{CHECKPOINT_PATH}'")

        # 7.4. 캐싱된 결과 매핑 및 최종 업데이트
        write_log("\\n최종 결과 업데이트를 시작합니다...")
        
        final_coords_map = df_to_geocode['address_tuple'].map(address_cache)
        valid_final_coords = final_coords_map.dropna()

        train_df.loc[valid_final_coords.index, '좌표X'] = [x[0] for x in valid_final_coords]
        train_df.loc[valid_final_coords.index, '좌표Y'] = [x[1] for x in valid_final_coords]
        write_log("결측치 업데이트 완료!")
        
        # 7.5. 최종 검색 실패 행 식별
        cached_tuples = set(address_cache.keys())
        all_geocoding_tuples = set(unique_address_tuples)
        failed_tuples = all_geocoding_tuples - cached_tuples
        
        if failed_tuples:
            failed_rows = df_to_geocode[df_to_geocode['address_tuple'].isin(failed_tuples)].copy()
            # 임시로 사용한 컬럼 제거
            failed_rows.drop(columns=['jibeon_address', 'doro_address', 'address_tuple'], inplace=True)
            write_log(f"총 {len(failed_rows)}개 주소의 좌표 검색에 최종 실패했습니다.")
        else:
            # 실패한 행이 없을 경우 failed_rows를 빈 데이터프레임으로 초기화
            failed_rows = pd.DataFrame()

    else:
        write_log("\\n좌표에 결측치가 없어 추가 작업을 진행하지 않습니다.")
        # failed_rows 변수가 정의되지 않는 경우를 대비해 초기화
        failed_rows = pd.DataFrame()

In [None]:
# --- 8. 결과 확인 및 저장 ---

# train_df가 정상적으로 존재할 경우에만 저장 진행
if train_df is not None:
    if not failed_rows.empty:
        # 1. 실패 행을 별도 CSV 파일로 저장
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        failed_rows.to_csv(NO_SEARCH_PATH, index=False)
        write_log(f"검색 실패 행 {len(failed_rows)}개를 '{NO_SEARCH_PATH}'에 저장했습니다.")
        
        # 2. 원본 데이터프레임에서 실패 행 삭제
        train_df.drop(index=failed_rows.index, inplace=True)
        write_log(f"최종 데이터에서 검색 실패 행 {len(failed_rows)}개를 삭제했습니다.")
    
    # 좌표X의 남은 결측치 개수를 최종 확인
    final_missing_count = train_df['좌표X'].isnull().sum()
    write_log(f"\n작업 후 남은 '좌표X'의 결측치 개수: {final_missing_count}")

    # 저장 디렉토리가 없을 경우 생성
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # 전체 train_df를 최종 결과 파일로 저장
    train_df.to_csv(FINAL_PATH, index=False)
    write_log(f"모든 작업이 완료되었습니다. 최종 데이터가 '{FINAL_PATH}' 경로에 저장되었습니다.")

else:
    # train_df가 None이면 에러 메시지 출력 후 저장하지 않음
    print_error("\n[작업 중단] 데이터 파일이 없어 최종 저장을 진행할 수 없습니다.")
    write_log("[작업 중단] 데이터 파일이 없어 최종 저장을 진행할 수 없습니다.", print_also=False)