In [None]:
import pandas as pd
import numpy as np
import os
import sys

# --- 0. 경로 설정 ---
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
DATA_DIR = os.path.join(PROJECT_ROOT, "data", "processed")
FINAL_MASTER_FILE = os.path.join(DATA_DIR, "final_master_table_v2.csv")


## Phase2-A : Data Handler 정의

12개 채널의 Scale 일치를 위해 Standardization을 진행합니다.
<br>모든 12개 채널의 Scale을평균 0, 표준편차 1인 표준 정규 분포로 통일시킵니다.

In [1]:
import pandas as pd
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

# [1] --- DataHandler 클래스 정의 (V2: Scaler + zfill) ---
# (Phase 2-A와 2-C가 합쳐진 최종 버전)
import pandas as pd
import numpy as np
import os
import sys

# --- 0. 경로 설정 ---
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
DATA_DIR = os.path.join(PROJECT_ROOT, "data", "processed")
FINAL_MASTER_FILE = os.path.join(DATA_DIR, "final_master_table_v2.csv")

class DataHandler:
    """
    [V2] 표준화(Standardization)와 zfill(6)이 적용된 DataHandler.
    """
    
    def __init__(self, file_path, train_end_date='2022-12-31'):
        self.file_path = file_path
        self.train_end_date = pd.to_datetime(train_end_date)
        self.data_by_ticker = {}   # 원본 데이터
        self.scalers_by_ticker = {} # Ticker별 Scaler
        self.tickers = []
        
        self._load_and_process_data()
        self._fit_scalers()
        
    def _load_and_process_data(self):
        try:
            # 1. dtype=str로 읽기
            df = pd.read_csv(
                self.file_path, 
                parse_dates=['date'],
                dtype={'ticker': str} 
            )
            # 2. zfill(6)로 '0' 채우기
            df['ticker'] = df['ticker'].str.zfill(6)
            df = df.set_index('date')
            
            self.tickers = df['ticker'].unique()
            
            for ticker in self.tickers:
                ticker_df = df[df['ticker'] == ticker].copy()
                channel_cols = [col for col in ticker_df.columns if col not in ['ticker']]
                self.data_by_ticker[ticker] = ticker_df[channel_cols]
            
            print(f"[DataHandler V2] Success: Loaded {len(self.tickers)} tickers.")
            print(f"[DataHandler V2] Available tickers: {self.tickers}")

        except Exception as e:
            print(f"[DataHandler V2] Error loading data: {e}")

    def _fit_scalers(self):
        """
        [Data Leakage 방지] 훈련 데이터로만 Scaler를 학습(fit)
        """
        print(f"[DataHandler V2] Fitting scalers using data up to {self.train_end_date.date()}...")
        for ticker in self.tickers:
            train_data = self.data_by_ticker[ticker].loc[:self.train_end_date]
            if train_data.empty:
                print(f"  > Warning: No training data for {ticker}.")
                continue
            
            scaler = StandardScaler()
            scaler.fit(train_data) # 'fit'은 훈련 데이터로만!
            self.scalers_by_ticker[ticker] = scaler
        print("[DataHandler V2] Scalers fitted.")

    def get_scaled_data_by_ticker(self, ticker):
        """
        'transform'은 전체 데이터에 적용하여 표준화된 DF 반환
        """
        if ticker not in self.scalers_by_ticker:
            print(f"[DataHandler V2] Error: No scaler for {ticker}")
            return None
        
        original_data = self.data_by_ticker[ticker]
        scaler = self.scalers_by_ticker[ticker]
        
        scaled_data_np = scaler.transform(original_data)
        
        scaled_df = pd.DataFrame(
            scaled_data_np, 
            index=original_data.index, 
            columns=original_data.columns
        )
        return scaled_df

    def get_all_tickers(self):
        return self.tickers


## Phase2-B : 윈도우 생성기 구현
LLM에 넣기 위해 슬라이딩 윈도우 방식의 3D 데이터로 재구성합니다.
<br>(샘플 수, 120일, 12개 채널) 형태의 numpy 배열(텐서)을 만듭니다.

In [2]:
# [2] --- 윈도우 생성기 함수 정의 ---
# (Phase 2-B)

def create_sliding_windows(data, input_seq_len, output_seq_len):
    """
    DataFrame(2D)을 슬라이딩 윈도우(3D) numpy 배열로 변환합니다.
    """
    data_np = data.values
    n_samples = len(data_np)
    X, y = [], []
    
    total_len = input_seq_len + output_seq_len
    
    for i in range(n_samples - total_len + 1):
        input_window = data_np[i : i + input_seq_len]
        output_window = data_np[i + input_seq_len : i + total_len]
        X.append(input_window)
        y.append(output_window)
        
    return np.array(X), np.array(y)

# --- 3. Phase 2 파이프라인 실행 테스트 ---

# (노트북 표준 세팅 코드에서 DATA_DIR이 정의되었다고 가정)
# DATA_DIR = os.path.join(PROJECT_ROOT, "data", "processed")
MASTER_TABLE_PATH = os.path.join(DATA_DIR, "final_master_table_v2.csv")

print("--- [Phase 2-A/C] DataHandler V2 초기화 ---")
data_handler = DataHandler(MASTER_TABLE_PATH, train_end_date='2022-12-31')

# 모델 하이퍼파라미터
INPUT_SEQ_LEN = 120  # 120일 (Input)
OUTPUT_SEQ_LEN = 10  # 10일 (Target)

if '010140' in data_handler.get_all_tickers():
    print("\n--- [Phase 2-B] '010140' 윈도우 생성 테스트 ---")
    
    # 1. 표준화된 2D 데이터 가져오기
    scaled_df = data_handler.get_scaled_data_by_ticker('010140')
    
    # 2. 3D 윈도우 생성
    X_train_samsung, y_train_samsung = create_sliding_windows(
        scaled_df, 
        INPUT_SEQ_LEN, 
        OUTPUT_SEQ_LEN
    )
    
    print("\n[SUCCESS] 3D Tensors (Windows) created!")
    print(f"  > X (Input Tensors) shape: {X_train_samsung.shape}")
    print(f"  > y (Target Tensors) shape: {y_train_samsung.shape}")
else:
    print("\n[Error] '010140' Ticker not found.")

--- [Phase 2-A/C] DataHandler V2 초기화 ---
[DataHandler V2] Success: Loaded 6 tickers.
[DataHandler V2] Available tickers: ['010140' '010620' '329180' '042660' '443060' '009540']
[DataHandler V2] Fitting scalers using data up to 2022-12-31...
[DataHandler V2] Scalers fitted.

--- [Phase 2-B] '010140' 윈도우 생성 테스트 ---

[SUCCESS] 3D Tensors (Windows) created!
  > X (Input Tensors) shape: (1259, 120, 12)
  > y (Target Tensors) shape: (1259, 10, 12)
