In [1]:
import pandas as pd
import os
import asyncio
from typing import List, Dict, Any, Optional, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
from pathlib import Path
import sys

current_file_path = Path(os.getcwd()) # Jupyter Notebook에서는 os.getcwd()가 현재 노트북 파일이 위치한 디렉토리를 반환합니다.
project_root = current_file_path.parent.parent # app/services/ -> app -> project_root
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

from app.models.customs import CountryMapping, ExportImportStatByCountry
from app.models.shared_models import COUNTRY_INFO
from app.repositories.customs_repository import ExportImportStatByCountryRepository
from app.db.base import get_main_db
from app.core.logger import get_logger

logger = get_logger()

In [2]:
async def _validate_file(
        file_path: str
)-> bool:
    return Path(file_path).exists()

In [3]:
def _find_header_idx(df: pd.DataFrame) -> Optional[int]:
    """
    DataFrame에서 헤더 행을 찾기
    기간 및 국가 컬럼이 있는 행을 찾음
    """
    for idx in range(min(20, len(df))):
        row = df.iloc[idx]
        if len(row) >= 2:
            if (str(row.iloc[0]).strip() == "기간" and 
                str(row.iloc[1]).strip() == "국가"):
                return idx
    return None

In [4]:
async def _read_excel_file(
        file_path: str
)-> pd.DataFrame:
    try:

        # 첫 번째 시트 읽기
        df = pd.read_excel(file_path, sheet_name=0, header=None)

        # 데이터 시작 위치 찾기 (헤더가 있는 행)
        header_idx = _find_header_idx(df)

        if header_idx is None:
            raise ValueError("헤더 행을 찾을 수 없습니다.")

        df = pd.read_excel(file_path, sheet_name=0, header=header_idx)

        #총계 행 제거 (첫 번째 데이터 행이 보통 총계)
        df = df[df.iloc[:, 0] != "총계"].copy()

        logger.info(f"엑셀 파일 읽기 완료: {len(df)}행")
        return df

    except Exception as e:
        logger.error(f"Error reading excel file: {e}")
        raise e

In [5]:
async def _preprocess_data(
        df: pd.DataFrame
)-> pd.DataFrame:
    try:
        #컬럼명 정리
        df.columns = df.columns.str.strip()

        # 필수 컬럼 확인
        required_cols = ['기간', '국가', '수출 금액', '수입 금액', '무역수지'] # 필수컬럼 변수명 저장
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"필수 컬럼이 누락되었습니다: {missing_cols}")
        

        # 기간 데이터 정리(년도만 추출)
        df['기간'] = df['기간'].astype(str).str.extract(r'(\d{4})')
        
        logger.info(f"데이터 전처리 완료: {len(df)}행")

        return df

    except Exception as e:
        logger.error(f"Error preprocessing data: {e}")
        raise e

In [6]:
async def _transform_country_name(
        df: pd.DataFrame,
        repository: ExportImportStatByCountryRepository
)-> pd.DataFrame:
    try:
        # 관세청 국가명 -> ISO 코드 매핑
        country_names = await repository.get_country_name_mapping()
        # ISO 코드 -> 무보 국가명 매핑
        country_iso_names = await repository.get_country_iso_mapping()


        # 관세청 국가명 -> ISO 코드 변환 함수 정의
        
        def map_korean_country_to_iso(korean_country_name: str)-> str:
            korean_country_name = korean_country_name.strip()
            return country_names.get(korean_country_name, "")
        
        # ISO 코드 -> 무보 국가명으로 변환
        def map_iso_to_mubo_country(iso_code: str)-> str:
            iso_code = iso_code.strip()
            return country_iso_names.get(iso_code, "")
        
        df['ISO코드'] = df['국가'].apply(map_korean_country_to_iso)
        df['국가'] = df['ISO코드'].apply(map_iso_to_mubo_country)

        # 매핑되지 않은 국가(ISO 코드가 None이거나 국가명이 None인 경우) 제거
        df = df[df['ISO코드'].notnull() & df['국가'].notnull()].reset_index(drop=True)
        
        logger.info(f"국가명 변환 완료: {len(df)}행")
        return df

    except Exception as e:
        logger.error(f"Error transforming country name: {e}")
        raise e




In [7]:
async def _transform_iso_mapping(
        df: pd.DataFrame,
        repository: ExportImportStatByCountryRepository
)-> pd.DataFrame:
    try:
        country_iso_names = await repository.get_country_iso_mapping()
        logger.info(f"{country_iso_names}")

        # 국가명 -> ISO 코드 변환 함수 정의
        def map_country_name(iso_code: str)-> str:
            iso_code = iso_code.strip()
            return country_iso_names.get(iso_code, "")
        
        df['국가'] = df['표준국가약식코드'].apply(map_country_name)
        
        logger.info(f"ISO -> 국가명 변환 완료: {len(df)}행")
        return df

    except Exception as e:
        logger.error(f"Error transforming country name: {e}")
        raise e

In [14]:
async def _create_final_output(
        df: pd.DataFrame
)-> pd.DataFrame:
    try:
        # 최종 형태로 데이터 변환
        final_df = df.copy()
        
        # 상수로..
        final_df = final_df.rename(columns={
            '기간': 'impexp_year',
            'ISO코드': 'impexp_nation_code',
            '국가': 'impexp_nation_nm',
            '수출 금액': 'impexp_exp_money',
            '수입 금액': 'impexp_imp_money',
            '무역수지': 'impexp_trade_rate_money'
        })

        # 필수 컬럼 확인 (모델 정의 참고)
        
        # 정렬
        final_df = final_df.sort_values(by=['impexp_year','impexp_nation_nm'])

        return final_df
    except Exception as e:
        logger.error(f"Error creating final output: {e}")
        raise e

In [9]:
async def process_data(
        file_path: str,
        file_name: str,
        dbprsr: AsyncSession,
        replace_all: bool = True,
)-> pd.DataFrame:
    try :
        file_path = Path(file_path,file_name)
        logger.info(f"관세청 수출입규모 파일 처리 시작:{file_path}")

        # repository 초기화
        expimp_repository = ExportImportStatByCountryRepository(dbprsr)

        # 1. 파일 유효성 검사
        if not await _validate_file(file_path):
            logger.error(f"파일이 존재하지 않습니다: {file_path}")
            return ValueError(f"파일이 존재하지 않습니다: {file_path}")

        # 2. 엑셀 파일 읽기
        raw_df = await _read_excel_file(file_path)

        # 3. 데이터 전처리
        processed_df = await _preprocess_data(raw_df)

        # 4. 국가명 -> ISO 코드 변환
        transformed_df = await _transform_country_name(processed_df,expimp_repository)

        # 5. 최종 형태로 변환 및 파일 생성
        final_df = await _create_final_output(transformed_df)

        # 6. 데이터 저장
        # if replace_all:
        #     await expimp_repository.replace_all_data(final_df)
        # else:
        #     await expimp_repository.insert_dataframe(final_df)

        return final_df
    except Exception as e:
        logger.error(f"Error processing {file_name}: {e}")
        raise e

In [10]:
file_path = '/appdata/storage/research/original'
file_name = '6-1. 수출입 실적(국가별)_20240613.xlsx'

In [11]:
df = await _read_excel_file(Path(file_path, file_name))
df

[32m2025-07-03 10:15:57.036[0m | [1mINFO    [0m | [36m__main__[0m:[36m_read_excel_file[0m:[36m20[0m - [1m엑셀 파일 읽기 완료: 1235행[0m


Unnamed: 0,기간,국가,수출 건수,수출 금액,수입 건수,수입 금액,무역수지
1,2019,중국,2773940,136202533,3777444,107228736,28973797
2,2019,미국,1067330,73343898,12602090,61878564,11465334
3,2019,베트남,661342,48177749,421373,21071557,27106192
4,2019,홍콩,296510,31912876,255752,1779542,30133334
5,2019,일본,1242119,28420213,1800272,47580853,-19160640
...,...,...,...,...,...,...,...
1231,2023,크리스마스 아일랜드,0,0,9,5,-5
1232,2023,피트카이른,0,0,2,0,0
1233,2023,허드 앤 맥도날드 군도,0,0,1,183,-183
1234,2023,보빗군도,0,0,1,8,-8


In [12]:
df = await _preprocess_data(df)
df

[32m2025-07-03 10:15:57.056[0m | [1mINFO    [0m | [36m__main__[0m:[36m_preprocess_data[0m:[36m18[0m - [1m데이터 전처리 완료: 1235행[0m


Unnamed: 0,기간,국가,수출 건수,수출 금액,수입 건수,수입 금액,무역수지
1,2019,중국,2773940,136202533,3777444,107228736,28973797
2,2019,미국,1067330,73343898,12602090,61878564,11465334
3,2019,베트남,661342,48177749,421373,21071557,27106192
4,2019,홍콩,296510,31912876,255752,1779542,30133334
5,2019,일본,1242119,28420213,1800272,47580853,-19160640
...,...,...,...,...,...,...,...
1231,2023,크리스마스 아일랜드,0,0,9,5,-5
1232,2023,피트카이른,0,0,2,0,0
1233,2023,허드 앤 맥도날드 군도,0,0,1,183,-183
1234,2023,보빗군도,0,0,1,8,-8


In [13]:


async def main():
    async for dbprsr in get_main_db():
        expimp_repository = ExportImportStatByCountryRepository(dbprsr)
        transformed_df = await _transform_country_name(df, expimp_repository)
        break
        
    return transformed_df
    
transformed_df = await main()
transformed_df[transformed_df['국가'] == '피트케안군도']

[32m2025-07-03 10:15:57.956[0m | [1mINFO    [0m | [36m__main__[0m:[36m_transform_country_name[0m:[36m29[0m - [1m국가명 변환 완료: 1235행[0m


Unnamed: 0,기간,국가,수출 건수,수출 금액,수입 건수,수입 금액,무역수지,ISO코드
240,2019,피트케안군도,0,0,5,5,-5,PN
493,2020,피트케안군도,0,0,1,1,-1,PN
730,2021,피트케안군도,1,0,1,4,-4,PN
986,2022,피트케안군도,0,0,1,3,-3,PN
1231,2023,피트케안군도,0,0,2,0,0,PN


In [16]:
final_df = await _create_final_output(transformed_df)
final_df

Unnamed: 0,impexp_year,impexp_nation_nm,수출 건수,impexp_exp_money,수입 건수,impexp_imp_money,impexp_trade_rate_money,impexp_nation_code
9,2019,,154656,8843499,136840,9279940,-436441,
19,2019,,66552,5297808,38989,1189057,4108751,
32,2019,,30165,2338438,787,19708,2318730,
106,2019,,1462,92323,2684,57224,35099,
118,2019,,500,54316,6489,2326881,-2272565,ETC
...,...,...,...,...,...,...,...,...
1001,2023,필리핀,398818,9008624,80313,4645089,4363535,PH
1232,2023,허드섬,0,0,1,183,-183,HM
1006,2023,헝가리,31497,6787435,23111,828869,5958566,HU
995,2023,호주,204383,17791354,588448,32822955,-15031601,AU
