In [1]:
import pandas as pd
import numpy as np
import requests
import xmltodict
import json
import traceback
from pandas.tseries.offsets import MonthEnd
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import platform
from sqlalchemy import create_engine
import pymysql
from tqdm import tqdm

In [2]:
# 한글 폰트 설정
if platform.system() == 'Windows':
    plt.rc('font', family='Malgun Gothic')  # 윈도우 기본 한글 폰트
elif platform.system() == 'Darwin':  # macOS
    plt.rc('font', family='AppleGothic')
else:  # Linux (예: Google Colab)
    plt.rc('font', family='NanumGothic')

# 마이너스 깨짐 방지
plt.rcParams['axes.unicode_minus'] = False

In [3]:
def get_month_date(start, end, freq):
    mydates = pd.period_range(start, end, freq=freq)
    return mydates

def get_period_list(start, end, last_month):

    cut_num = last_month-12
    end = end+1

    period_list = []
    for y in range(start, end):
        for m in range(1,13):
            if len(str(m)) < 2:
                m = str(0)+ str(m)
            else:
                m = str(m)
            #print(str(y)+str(m))
            ym =str(y)+str(m)
            period_list.append(ym)
    period_list = period_list[:cut_num]
    return period_list

## 품목별 수출입실적 목록 검색
## 1년 12개월만 한번에 받을 수 있음
# start =시작일, end = 마지막날, 1년기간 12, 반년 6, 분기 3, hscode = 6자리 혹은 10자리

def get_country_export_by_item(start_list, end_list, hs_code):
    df_list = []
    for i, start in enumerate(start_list):
        end = end_list[i]
        service_key = '2o6NG3ixxDgGQ9S4dWUgsMac9WlxfX46%2BJvFRsAlsXQ6xVi6CZewvNJvbHd4S7exkWwt3YWoKSdwvUNb46kSTQ%3D%3D'
        url = f'https://apis.data.go.kr/1220000/Itemtrade/getItemtradeList?serviceKey={service_key}&strtYymm={start}&endYymm={end}&hsSgn={hs_code}'

        try:
            req = requests.get(url)
            json_dict = json.loads(json.dumps(xmltodict.parse(req.text), indent=4))
            items = json_dict['response']['body']['items']
            if items is None:
                print(f"⚠️ No data for HS {hs_code} from {start} to {end}")
                continue
            target_df = pd.DataFrame(items['item'])
            df_list.append(target_df)
        except Exception as e:
            print(f"❌ API 요청 실패: {hs_code} ({start} ~ {end})")
            print(traceback.format_exc())
            continue

    if df_list:
        return pd.concat(df_list, ignore_index=True)
    else:
        return pd.DataFrame()


def batch_export_by_hscode(cd_list, start_list, end_list, batch_size=20, region_name='전국'):
    all_export_q = []
    all_export_m = []
    error_list = []

    for i in range(0, len(cd_list), batch_size):
        hs_code_list = cd_list[i:i+batch_size]
        data_by_hscode = {}

        for hs_code in hs_code_list:
            try:
                target_df = get_country_export_by_item(start_list, end_list, hs_code)
                if target_df.empty:
                    print(f"⚠️ {hs_code}의 유효한 데이터가 없습니다.")
                    continue

                target_df = target_df[target_df['year'] != '총계'].copy()
                target_df['root_hs_code'] = hs_code
                data_by_hscode[hs_code] = target_df

            except Exception as e:
                print(f"❌ {hs_code} 처리 중 오류 발생:")
                print(traceback.format_exc())
                error_list.append(hs_code)
                continue

        if not data_by_hscode:
            print("⚠️ 병합할 데이터가 없습니다. 건너뜁니다.")
            continue

        try:
            merged_df = pd.concat(data_by_hscode).reset_index(drop=True)

            # 날짜 처리
            merged_df['new_date'] = pd.to_datetime(merged_df['year'].str.replace('.', '-')) + MonthEnd(0)
            merged_df.set_index('new_date', inplace=True)
            merged_df['new_year'] = merged_df.index.year
            merged_df['new_quarter'] = merged_df.index.quarter
            merged_df['new_month'] = merged_df.index.month

            # 숫자형 컬럼 변환
            numeric_cols = ['balPayments', 'expDlr', 'expWgt', 'impDlr', 'impWgt']
            for col in numeric_cols:
                if col in merged_df.columns:
                    merged_df[col] = pd.to_numeric(merged_df[col], errors='coerce')
                else:
                    merged_df[col] = 0.0

            # 정리된 컬럼
            clean_df = merged_df[['hsCode', 'new_year', 'new_quarter', 'new_month', 'statKor', 'balPayments', 'expDlr', 'expWgt', 'impDlr', 'impWgt']].copy()
            clean_df['root_hs_code'] = merged_df['root_hs_code'].values
            clean_df['region'] = region_name

            # 월별 집계
            export_df_by_m = clean_df.groupby(['root_hs_code', 'new_year', 'new_quarter', 'new_month']).agg({
                'balPayments': 'sum', 'expDlr': 'sum', 'impDlr': 'sum'
            }).reset_index()

            # 분기별 집계
            export_df_by_q = clean_df.groupby(['root_hs_code', 'new_year', 'new_quarter']).agg({
                'balPayments': 'sum', 'expDlr': 'sum', 'impDlr': 'sum'
            }).reset_index()

            export_df_by_q['region'] = region_name
            export_df_by_m['region'] = region_name

            all_export_q.append(export_df_by_q)
            all_export_m.append(export_df_by_m)

        except Exception as e:
            print("❌ 병합/정리 중 오류 발생")
            print(traceback.format_exc())
            error_list += hs_code_list

    final_q = pd.concat(all_export_q) if all_export_q else pd.DataFrame()
    final_m = pd.concat(all_export_m) if all_export_m else pd.DataFrame()

    return final_q, final_m, error_list

from pandas.tseries.offsets import MonthEnd

def add_yoy_growth(df: pd.DataFrame, steps: int) -> pd.DataFrame:
    """
    root_hs_code별로 expDlr, impDlr의 전년동기대비 증가율(%)을 계산하고,
    'date' 컬럼을 월말 기준으로 추가하는 함수.
    월 단위 데이터는 steps=12, 분기 단위 데이터는 steps=4로 설정합니다.
    """
    df = df.copy()

    if steps == 12:
        # 월 기준: 월말 날짜 생성
        df['date'] = pd.to_datetime(df['new_year'].astype(str) + '-' + df['new_month'].astype(str) + '-01') + MonthEnd(0)

    elif steps == 4:
        # 분기 기준: 분기 마지막 월로 매핑
        end_month = df['new_quarter'].map({1: '03', 2: '06', 3: '09', 4: '12'})
        df['date'] = pd.to_datetime(df['new_year'].astype(str) + '-' + end_month + '-01') + MonthEnd(0)

    else:
        raise ValueError("steps 값은 12(월 단위) 또는 4(분기 단위)여야 합니다.")

    # 정렬 및 전년동기대비 성장률 계산
    df = df.sort_values(['root_hs_code', 'date'])

    df['expDlr_yoy'] = df.groupby('root_hs_code')['expDlr'].transform(lambda x: x.pct_change(periods=steps))
    df['impDlr_yoy'] = df.groupby('root_hs_code')['impDlr'].transform(lambda x: x.pct_change(periods=steps))

    return df

def plot_column_by_hscode(df, hs_code, col_name, start_date=None, end_date=None):
    """
    특정 root_hs_code에 대해 year_month를 X축, 지정된 컬럼(col_name)을 Y축으로 하는 라인차트를 그립니다.

    Parameters:
        df (pd.DataFrame): 'year_month', 'root_hs_code', col_name 컬럼이 포함된 DataFrame
        hs_code (str): 시각화할 root_hs_code
        col_name (str): Y축에 사용할 컬럼 이름
        start_date (str or pd.Timestamp): 시작 날짜 (예: '2020-01-01')
        end_date (str or pd.Timestamp): 종료 날짜 (예: '2024-12-31')
    """
    # 필터링 및 정렬
    target_df = df[df['root_hs_code'] == hs_code].sort_values('year_month')

    if target_df.empty:
        print(f"⚠️ root_hs_code {hs_code}에 해당하는 데이터가 없습니다.")
        return

    if col_name not in target_df.columns:
        print(f"❌ '{col_name}' 컬럼이 DataFrame에 없습니다.")
        return

    # 날짜 범위 필터링
    if start_date:
        target_df = target_df[target_df['year_month'] >= pd.to_datetime(start_date)]
    if end_date:
        target_df = target_df[target_df['year_month'] <= pd.to_datetime(end_date)]

    if target_df.empty:
        print(f"⚠️ 지정한 날짜 범위에 데이터가 없습니다.")
        return

    # Plot
    plt.figure(figsize=(12, 6))
    plt.plot(target_df['year_month'], target_df[col_name], marker='o', label=col_name)

    # 마지막 값에 텍스트 표시
    last_x = target_df['year_month'].iloc[-1]
    last_y = target_df[col_name].iloc[-1]
    plt.text(last_x, last_y, f"{last_y * 100:,.2f}%", fontsize=25, ha='left', va='bottom', color='red')

    plt.title(f"{col_name} 추이 (root_hs_code: {hs_code})")
    plt.xlabel("Year-Month")
    plt.ylabel(col_name)
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

def reshape_to_long(df: pd.DataFrame) -> pd.DataFrame:
    id_vars = ['date', 'root_hs_code']
    value_vars = ['balPayments', 'expDlr', 'impDlr', 'expDlr_yoy', 'impDlr_yoy']

    long_df = df.melt(id_vars=id_vars, value_vars=value_vars,
                      var_name='indicator', value_name='value')
    return long_df

def upload_long_format_to_db(df_long: pd.DataFrame, db_info: dict, table_name: str = 'korea_monthly_trade_data', chunk_size: int = 1000):
    """
    long-format 형태의 무역 데이터를 MySQL/MariaDB에 업로드하는 함수 (tqdm 포함)

    Parameters:
        df_long (pd.DataFrame): 'date', 'root_hs_code', 'indicator', 'value' 포함
        db_info (dict): DB 접속 정보
        table_name (str): 업로드할 테이블명
        chunk_size (int): tqdm 표시용 업로드 청크 크기 (기본값: 1000)
    """

    # ✅ 날짜 포맷 정리
    df_long['date'] = pd.to_datetime(df_long['date'])

    # ✅ 결측치 및 inf 처리
    df_long = df_long.replace([np.inf, -np.inf], np.nan)
    df_long = df_long.where(pd.notnull(df_long), None)

    # ✅ SQLAlchemy 연결
    engine = create_engine(
        f"mysql+pymysql://{db_info['user']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
    )
    conn = engine.raw_connection()
    cursor = conn.cursor()

    # ✅ 테이블 생성 쿼리
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        `date` DATE,
        `root_hs_code` VARCHAR(20),
        `indicator` VARCHAR(50),
        `value` FLOAT,
        PRIMARY KEY (`date`, `root_hs_code`, `indicator`)
    );
    """
    cursor.execute(create_table_sql)
    conn.commit()

    # ✅ 기존 데이터 조회
    existing_query = f"SELECT `date`, `root_hs_code`, `indicator` FROM {table_name}"
    existing_df = pd.read_sql(existing_query, engine)
    
    # ✅ 날짜 형식을 datetime으로 변환
    existing_df['date'] = pd.to_datetime(existing_df['date'])
    
    # ✅ 중복 제거
    merged = pd.merge(df_long, existing_df, on=['date', 'root_hs_code', 'indicator'], how='left', indicator=True)
    df_to_upload = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])


    # ✅ 업로드 실행 with tqdm
    if not df_to_upload.empty:
        print(f"🚀 업로드 대상 {len(df_to_upload)}건 → chunk_size={chunk_size}")
        for i in tqdm(range(0, len(df_to_upload), chunk_size), desc="Uploading"):
            chunk = df_to_upload.iloc[i:i + chunk_size]
            chunk.to_sql(name=table_name, con=engine, if_exists='append', index=False)
        print(f"✅ 총 {len(df_to_upload)}건 업로드 완료")
    else:
        print("⚠️ 업로드할 새로운 데이터가 없습니다.")

    # ✅ 연결 종료
    cursor.close()
    conn.close()
    

In [4]:
# data = pd.read_excel(r'C:\Users\MetaM\PycharmProjects\pythonProject3\HS_Code_500\HS_Code_500.xlsx')
# cd_array = data['HS_Code'].unique()
# cd_list = cd_array.tolist()
# cd_list

In [6]:
## get hs code 

# DB 접속 정보 설정
db_info = {
    'user': 'stox7412',         # 예: 'root'
    'password': 'Apt106503!~', # 예: '1234'
    'host': '192.168.0.230',         # 예: 'localhost' 또는 IP
    'port': '3307',              # 기본 포트는 보통 3306
    'database': 'investar'        # 예: 'trade_data'
}

# SQLAlchemy 엔진 생성
engine = create_engine(
    f"mysql+pymysql://{db_info['user']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
)

# 테이블 이름
table_name = 'target_hs_code'

# 고유한 hs_code 값 추출 쿼리 실행
query = f"SELECT DISTINCT hs_code FROM {table_name}"
unique_hs_codes_df = pd.read_sql(query, con=engine)

cd_list = unique_hs_codes_df['hs_code'].unique().tolist()

In [7]:
# cd_list = ['854231', '854232', '854239']
start_list = [ '200701', '200801', '200901', 
              '201001', '201101', '201201', '201301', '201401', 
              '201501', '201601', '201701', '201801', '201901',
              '202001', '202101', '202201', '202301', '202401', '202501']
end_list = [ '200712', '200812', '200912', 
              '201012', '201112', '201212', '201312', '201412', 
              '201512', '201612', '201712', '201812', '201912',
              '202012', '202112', '202212', '202312', '202412', '202512']

export_q, export_m, error_list = batch_export_by_hscode(cd_list, start_list, end_list)

⚠️ No data for HS 121221 from 200701 to 200712
⚠️ No data for HS 121221 from 200801 to 200812
⚠️ No data for HS 121221 from 200901 to 200912
⚠️ No data for HS 121221 from 201001 to 201012
⚠️ No data for HS 121221 from 201101 to 201112
⚠️ No data for HS 220299 from 200701 to 200712
⚠️ No data for HS 220299 from 200801 to 200812
⚠️ No data for HS 220299 from 200901 to 200912
⚠️ No data for HS 220299 from 201001 to 201012
⚠️ No data for HS 220299 from 201101 to 201112
⚠️ No data for HS 220299 from 201201 to 201212
⚠️ No data for HS 220299 from 201301 to 201312
⚠️ No data for HS 220299 from 201401 to 201412
⚠️ No data for HS 220299 from 201501 to 201512
⚠️ No data for HS 271012 from 200701 to 200712
⚠️ No data for HS 271012 from 200801 to 200812
⚠️ No data for HS 271012 from 200901 to 200912
⚠️ No data for HS 271012 from 201001 to 201012
⚠️ No data for HS 271012 from 201101 to 201112
⚠️ No data for HS 293190 from 200701 to 200712
⚠️ No data for HS 293190 from 200801 to 200812
⚠️ No data fo

In [8]:
# export_df_by_m 은 월별 수출입 데이터라고 가정
export_m_with_yoy = add_yoy_growth(export_m,steps=12)
export_q_with_yoy = add_yoy_growth(export_q,steps=4)

export_m_with_yoy_resize = export_m_with_yoy[['date', 'root_hs_code', 'balPayments', 'expDlr', 'impDlr', 'expDlr_yoy', 'impDlr_yoy']].copy()
export_q_with_yoy_resize = export_m_with_yoy[['date', 'root_hs_code', 'balPayments', 'expDlr', 'impDlr', 'expDlr_yoy', 'impDlr_yoy']].copy()

export_m_with_yoy_resize['root_hs_code'] = export_m_with_yoy_resize['root_hs_code'].astype(str)
export_q_with_yoy_resize['root_hs_code'] = export_q_with_yoy_resize['root_hs_code'].astype(str)

trade_data_monthly = reshape_to_long(export_m_with_yoy_resize)
trade_data_quarterly= reshape_to_long(export_q_with_yoy_resize)

In [10]:
# DB 정보
db_info = {
    # 'host': 'hystox74.synology.me',
    'host': '192.168.0.230',
    'port': 3307,
    'user': 'stox7412',
    'password': 'Apt106503!~',
    'database': 'investar'
}

# long-format 변환
df_wide = export_m_with_yoy_resize.copy()
df_long = reshape_to_long(df_wide)

# 업로드 실행
upload_long_format_to_db(df_long, db_info, table_name = 'korea_monthly_trade_data')

🚀 업로드 대상 82415건 → chunk_size=1000


Uploading: 100%|██████████| 83/83 [00:25<00:00,  3.31it/s]

✅ 총 82415건 업로드 완료





In [11]:
df_long

Unnamed: 0,date,root_hs_code,indicator,value
0,2007-01-31,121120,balPayments,1.978305e+06
1,2007-02-28,121120,balPayments,1.146875e+06
2,2007-03-31,121120,balPayments,4.306842e+06
3,2007-04-30,121120,balPayments,2.836603e+06
4,2007-05-31,121120,balPayments,2.624726e+06
...,...,...,...,...
576915,2024-12-31,970191,impDlr_yoy,6.985456e-02
576916,2025-01-31,970191,impDlr_yoy,-4.920406e-01
576917,2025-02-28,970191,impDlr_yoy,-4.825521e-01
576918,2025-03-31,970191,impDlr_yoy,4.559442e-01


In [36]:
upload_long_format_to_db(trade_data_quarterly, db_info, table_name = 'korea_quarterly_trade_data')

🚀 업로드 대상 494625건 → chunk_size=1000


Uploading: 100%|██████████| 495/495 [00:47<00:00, 10.41it/s]

✅ 총 494625건 업로드 완료



