In [2]:
import pandas as pd
import numpy as np
import pandas_ta as ta
# import matplotlib.pyplot as plt
import os

In [3]:
data_dir = "/app/bucket/data/raw_data"

In [4]:
def create_lag_feature(df, target, n_lags, freq, extend_rows=False, drop_target=True):
    """
    DataFrame의 특정 열(들)에 대해 지정된 기간만큼 Lag 변수를 생성합니다.

    Args:
        df (pd.DataFrame): 시계열 인덱스(DatetimeIndex)를 가진 원본 데이터프레임.
        target (str): Lag 변수를 생성할 열 이름.
        n_lags (int): 지연시킬 기간 (정수 시점 또는 시간 문자열).
            - int: 시계열 인덱스의 '행' 개수만큼 지연 (예: 1일, 7일).
        freq (str): df의 인덱스(datetime)의 빈도 문자열 (예: 'D', 'M', 'MS').
        extend_rows (bool, optional): True이면 데이터프레임의 마지막 날짜 이후로 n_lags 기간만큼의 새로운 인덱스를 추가합니다. 기본값은 False입니다.
        drop_target (bool, optional): True이면 원본 target 열을 삭제합니다. 기본값은 True입니다.
    
    Returns:
        new_df (pd.DataFrame): Lag 변수가 추가된 새로운 데이터프레임.
    """
    new_df = df.copy()
    new_df = new_df.sort_index()
    index_name = new_df.index.name if new_df.index.name else 'ds'

    if extend_rows:
        # 마지막 날짜 이후로 n_lags 기간만큼의 새로운 인덱스 생성
        start = new_df.index[-1] + pd.tseries.frequencies.to_offset(freq)
        future_dates = pd.date_range(start=start, periods=n_lags, freq=freq)
        new_df = pd.concat([new_df, pd.DataFrame(index=future_dates)], axis=0)
        new_df.index.name = index_name
        
    # Lag 변수명 생성 (예: 'Close_lag_1' 또는 'VIX_lag_2M')
    lag_name = f"{target}_lag_{n_lags}{freq}"
    # shift() 함수를 사용하여 Lag 변수 생성
    new_df[lag_name] = new_df[target].shift(periods=n_lags, freq=None)

    if drop_target:
        # 원본 target 열 삭제
        new_df.drop(columns=[target], inplace=True)

    return new_df

In [5]:
def MS_to_D_ffill(df):
    """
    월초(freq='MS') DatetimeIndex를 가진 DataFrame을 
    일별(Daily) 빈도로 확장하고, NaN 값을 ffill로 채웁니다.

    Args:
        df (pd.DataFrame): 월별 DatetimeIndex를 가진 DataFrame.
    
    Returns:
        new_df (pd.DataFrame): 일별 빈도로 확장되고 ffill 처리된 DataFrame.
    """

    new_df = df.copy()
    new_df = new_df.sort_index()
    
    # 1. 일별 DatetimeIndex 생성
    daily_index = pd.date_range(
        start=new_df.index.min(),
        end=new_df.index.max() + pd.tseries.frequencies.to_offset("MS"),
        freq='D' # 일별 빈도(Daily Frequency) 지정
    )
    
    # 2. Reindex (일별 인덱스로 확장)
    index_name = new_df.index.name if new_df.index.name else 'ds'
    new_df = new_df.reindex(daily_index)
    new_df.index.name = index_name
    
    # 3. ffill (Forward Fill)
    new_df = new_df.fillna(method='ffill')

    return new_df

In [None]:
# SPY
# 데이터 읽기 및 전처리
spy_df = pd.read_csv(os.path.join(data_dir, 'spy_data.csv'))
spy_df = spy_df[['datetime', 'close', 'volume']]
spy_df.rename(columns={'close': 'spy_close', 'volume': 'spy_volume'}, inplace=True)
spy_df['ds'] = pd.to_datetime(spy_df['datetime'], format='%Y-%m-%d')
spy_df.set_index('ds', inplace=True)
spy_df.drop(columns=['datetime'], inplace=True)
spy_df = spy_df.sort_index()
# 기술적 지표 추가
spy_df.ta.sma(close="spy_close", length=60, append=True)
spy_df.ta.sma(close="spy_close", length=120, append=True)
spy_df.ta.bbands(close="spy_close", length=20, std=2, append=True)
spy_df.ta.macd(close="spy_close", fast=12, slow=26, signal=9, append=True)
spy_df.ta.rsi(close="spy_close", length=14, append=True)
# Volume Lag Feature 생성
spy_df = create_lag_feature(df=spy_df, target='spy_volume', n_lags=1, freq="D", extend_rows=False, drop_target=True)
spy_df.tail(7)

Unnamed: 0_level_0,spy_close,SMA_60,SMA_120,BBL_20_2.0_2.0,BBM_20_2.0_2.0,BBU_20_2.0_2.0,BBB_20_2.0_2.0,BBP_20_2.0_2.0,MACD_12_26_9,MACDh_12_26_9,MACDs_12_26_9,RSI_14,spy_volume_lag_1D
ds,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2025-10-30,679.83002,657.74383,633.755667,653.207882,671.080997,688.954112,5.326664,0.744754,6.232236,1.311786,4.92045,59.629392,85657100.0
2025-10-31,682.06,658.573997,634.58125,653.220802,671.723496,690.22619,5.509021,0.779324,6.218361,1.038328,5.180032,61.291646,76335800.0
2025-11-03,683.34003,659.343331,635.385417,653.092614,672.309998,691.527382,5.716822,0.786981,6.238736,0.846963,5.391773,62.252429,87164100.0
2025-11-04,675.23999,659.998664,636.115833,653.417597,672.615997,691.814398,5.708577,0.568339,5.537446,0.116538,5.420907,53.245887,57315000.0
2025-11-05,677.58002,660.580165,636.841833,653.513232,672.839499,692.165766,5.744689,0.622645,5.111566,-0.247473,5.359039,55.259702,78427000.0
2025-11-06,670.31,661.003831,637.476083,653.451455,672.797,692.142546,5.750782,0.435722,4.139704,-0.975468,5.115172,48.299228,74402400.0
2025-11-07,670.96997,661.437497,638.110416,656.688077,673.694498,690.700919,5.048704,0.419897,3.383744,-1.385142,4.768886,48.928133,85035300.0


In [30]:
# CLI
# 데이터 읽기 및 전처리
cli_df = pd.read_csv(os.path.join(data_dir, 'cli_data.csv'))
cli_df = cli_df[['datetime', 'CLI']]
cli_df['ds'] = pd.to_datetime(cli_df['datetime'], format='%Y-%m-%d')
cli_df.set_index('ds', inplace=True)
cli_df.drop(columns=['datetime'], inplace=True)
cli_df = cli_df.sort_index()
# Lag Feature 생성
cli_df = create_lag_feature(df=cli_df, target='CLI', n_lags=1, freq="MS", extend_rows=True, drop_target=True)
# 월별 데이터를 일별로 확장 및 ffill
cli_df = MS_to_D_ffill(cli_df)
cli_df.tail(7)

  new_df = new_df.fillna(method='ffill')


Unnamed: 0_level_0,CLI_lag_1MS
ds,Unnamed: 1_level_1
2025-11-25,100.2963
2025-11-26,100.2963
2025-11-27,100.2963
2025-11-28,100.2963
2025-11-29,100.2963
2025-11-30,100.2963
2025-12-01,100.2963


In [33]:
# 데이터 병합 및 결측치 처리
total_df = pd.merge(spy_df,
                     cli_df,
                     left_index=True,
                     right_index=True,
                     how="left")
total_df = total_df.fillna(method="ffill")
total_df = total_df.dropna(how="any")
total_df = total_df.round(6)
print(total_df.shape)
total_df

(3868, 14)


  total_df = total_df.fillna(method="ffill")


Unnamed: 0_level_0,spy_close,SMA_60,SMA_120,BBL_20_2.0_2.0,BBM_20_2.0_2.0,BBU_20_2.0_2.0,BBB_20_2.0_2.0,BBP_20_2.0_2.0,MACD_12_26_9,MACDh_12_26_9,MACDs_12_26_9,RSI_14,spy_volume_lag_1D,CLI_lag_1MS
ds,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
2010-06-24,107.42000,114.281667,113.377000,105.188016,109.374000,113.559984,7.654440,0.266602,-0.703357,0.184935,-0.888292,40.495411,254639900.0,99.71648
2010-06-25,107.87000,114.129500,113.331500,105.045443,109.229500,113.413557,7.661039,0.337538,-0.800797,0.069997,-0.870793,42.094856,268523600.0,99.71648
2010-06-28,107.53000,113.958333,113.280667,104.886076,109.137500,113.388924,7.790950,0.310946,-0.895135,-0.019473,-0.875662,41.193901,238726500.0,99.71648
2010-06-29,104.21000,113.715833,113.201500,104.225322,108.971500,113.717678,8.710860,-0.001614,-1.223688,-0.278421,-0.945267,33.625754,169218600.0,99.71648
2010-06-30,103.22000,113.452167,113.110083,103.270937,108.616000,113.961063,9.842129,-0.004765,-1.546132,-0.480692,-1.065440,31.752420,373649500.0,99.71648
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-11-03,683.34003,659.343331,635.385417,653.092614,672.309998,691.527382,5.716822,0.786981,6.238736,0.846963,5.391773,62.252429,87164100.0,100.29630
2025-11-04,675.23999,659.998664,636.115833,653.417597,672.615997,691.814398,5.708577,0.568339,5.537446,0.116538,5.420907,53.245887,57315000.0,100.29630
2025-11-05,677.58002,660.580165,636.841833,653.513232,672.839499,692.165766,5.744689,0.622645,5.111566,-0.247473,5.359039,55.259702,78427000.0,100.29630
2025-11-06,670.31000,661.003831,637.476083,653.451455,672.797000,692.142546,5.750782,0.435722,4.139704,-0.975468,5.115172,48.299228,74402400.0,100.29630


In [34]:
# 저장
output_dir = "/app/bucket/data/processed_data"
total_df.to_csv(os.path.join(output_dir, 'prep_D_data.csv'), index=True)