### 데이터 Load 함수

load_gex_df(start_date, end_date)를 이용해서 받아오기

DB 유저 정보 하드코딩은 read-only 계정 -> 보안 및 DB 무결성 고려

In [1]:
# pip install pandas sqlalchemy pymysql

In [3]:
'''
db에서 필요한 데이터 받아오기
'''

import pandas as pd
from sqlalchemy import create_engine
import os
import pymysql
import copy
import datetime


host = "slayerzeroa.iptime.org"
user = "fdm_user"
password = ']I]7D)7VqQpO!m4X'
db_name = 'fdm'
db_port = '3306'

def load_index_minute_data(target_date: str):
    engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{db_port}/{db_name}?charset=utf8mb4")
    query = f"SELECT * FROM index_minutes_data WHERE `stck_bsop_date` = '{target_date}'"
    index_minute_df = pd.read_sql(query, engine)

    return index_minute_df


def load_gex_data(target_date: str):
    engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{db_port}/{db_name}?charset=utf8mb4")
    query = f"SELECT * FROM krx_gamma_exposure WHERE `DATE` = '{target_date}'"
    gex_df = pd.read_sql(query, engine)

    return gex_df


def load_gex_returns(target_date: str):
    """
    - load_index_minute_data(target_date)로부터 분봉 데이터를 얻는다.
    - KOSPI 및 특정 시각 이후(1521)만 필터링한다.
    - 9분 전 대비 수익률(returns) 계산 후 NaN 제거.
    - 마지막 행의 'returns' 값을 float(last_returns)로 추출한다.
    - load_gex_data(target_date)에서 GEX 데이터를 불러온 뒤 last_returns를 합쳐 반환.
    - 에러 발생 시 None을 반환.
    """
    try:
        # 1) 분봉 데이터 읽어오기 + 복사
        #    ※ 여기서 한 번 .copy() 해두면 이후 체인할당 경고가 줄어듭니다.
        index_minute_df = load_index_minute_data(target_date).copy()

        # 2) KOSPI 데이터만 필터링
        index_minute_df = index_minute_df.loc[index_minute_df['market'] == 'KOSPI'].copy()

        # 3) 시각 필터링
        index_minute_df = index_minute_df.loc[index_minute_df['stck_cntg_hour'] >= '1521'].copy()

        # 4) returns 계산 (9분 전 대비 수익률)
        #    pct_change(9)을 계산 후 바로 할당 → 필요시 .copy()는 생략 가능
        index_minute_df['returns'] = index_minute_df['stck_prpr'].pct_change(9)

        # 5) NaN 제거 (returns 컬럼 기준)
        #    dropna(subset=['returns'])를 사용하면 returns가 NaN인 행만 제거
        index_minute_df.dropna(subset=['returns'], inplace=True)

        # 6) 마지막 행(또는 특정 행)의 'returns'를 float로 변환
        #    예시로 마지막 행의 returns 값을 사용
        if len(index_minute_df) == 0:
            # 데이터가 없으면 None
            return None
        last_returns = float(index_minute_df['returns'].iloc[-1])

        # 7) GEX 데이터 불러오기 + last_returns 합치기
        gex_df = load_gex_data(target_date).copy()
        gex_df['last_returns'] = last_returns

        return gex_df

    except Exception as e:
        # 디버깅을 위해 실제 오류 메시지를 출력하고 싶다면 다음과 같이 작성
        # print(f"load_gex_returns Error: {e}")
        return None


def load_gex_df(start_date: str, end_date: str):
    """
    start_date부터 end_date까지(YYYYMMDD) 순회하며
    각 날짜의 GEX 데이터 + returns를 합쳐 만든 DF를 반환.

    예:
        gex_df = load_gex_df('20230101', '20230110')
    """
    # 날짜 문자열 -> datetime 객체로 변환
    start_dt = datetime.datetime.strptime(start_date, '%Y%m%d')
    end_dt   = datetime.datetime.strptime(end_date, '%Y%m%d')

    # 결과를 담을 리스트
    result_list = []

    # 현재 날짜 포인터
    current_dt = start_dt

    while current_dt <= end_dt:
        # YYYYMMDD 문자열로 변환
        target_date_str = current_dt.strftime('%Y%m%d')

        # load_gex_returns() 호출
        gex_data = load_gex_returns(target_date_str)
        
        if gex_data is not None:
            # gex_data 자체가 DataFrame일 것이므로 list에 append
            result_list.append(gex_data)

        # 날짜 +1일
        current_dt += datetime.timedelta(days=1)

    # 리스트가 비어있지 않다면 concat
    if len(result_list) > 0:
        # 여러 날짜의 DF를 세로로 붙이기
        final_df = pd.concat(result_list, ignore_index=True)
        return final_df
    else:
        # 해당 구간에 데이터를 하나도 못 불러온 경우
        return pd.DataFrame()    

In [None]:
start_date = '20221226'
end_date = '20241224'

##### 현재 있는 데이터 기준

df = load_gex_df(start_date, end_date)