# .py 파일 테스트

In [3]:
from crypto_preprocessor import CryptoPreprocessor

# 테스트 실행
if __name__ == "__main__":
    # Run the BTC prediction preprocessing once as a smoke test.
    preprocessor = CryptoPreprocessor()
    result = preprocessor.preprocess_for_prediction('BTC')

    # Only report when the call produced something other than None.
    if result is not None:
        feature_names = list(result.columns)
        print("\n📋 전처리 결과 샘플:")
        print(result.head())
        print(f"\n피처 개수: {len(feature_names)}")
        print(f"피처 목록: {feature_names}")


🔄 BTC 전처리 시작
✅ BTC: 120일 데이터 수집 완료
🔧 기술 지표 계산 중...
🎯 주요 피처 선택 중...
📊 정규화 적용 중...
✂️  최종 피처 필터링 중...
✅ 전처리 완료: 28개 피처


📋 전처리 결과 샘플:
     Stoch_14_K_above_D  Stoch_6_K_above_D  CCI_4_overbought  Formula3_Signal  \
119                   1                  1                 0                1   

     Stoch_1_K_above_D  Stoch_4_K_above_D  is_quarter_start  \
119                  1                  1                 0   

     high_volatility_regime  is_month_end  Stoch_3_K_above_D  ...  \
119                       0             0                  1  ...   

     Stoch_5_overbought  SMI_signal  SMI_overbought_40  SMI_oversold_40  \
119                   0   -1.720824                  0                1   

     SMI_overbought_50  SMI_oversold_50  RSI_7_bullish_divergence  \
119                  0                1                         0   

     RSI_7_bearish_divergence  RSI_14_bullish_divergence  \
119                         0                          0   

     RSI_14_bearish_diverge

# 전처리 로직(.py와 같음)

In [14]:
"""
신규 데이터 전처리기 - A00_00 정확히 재현 (51개 컬럼)
"""
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta
from sklearn.preprocessing import StandardScaler
import os
import warnings
warnings.filterwarnings('ignore')


class NewDataPreprocessor:
    """Turn freshly downloaded OHLCV data into the A00_00 feature set.

    Pipeline (see ``preprocess``): download daily bars, compute indicators
    and keep the selected columns, standardise the continuous features per
    symbol, drop the columns that are not fed to the model, and write one
    CSV row per symbol.
    """

    # Raw columns that are always carried through feature generation.
    _ESSENTIAL_COLS = ('Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Symbol')

    # The "Top20" indicator columns kept by the A00_00 step.
    _TOP20_FEATURES = (
        'Stoch_14_K_above_D', 'Stoch_6_K_above_D', 'CCI_4_overbought', 'Formula3_Signal',
        'Stoch_1_K_above_D', 'Stoch_4_K_above_D', 'is_quarter_start', 'high_volatility_regime',
        'is_month_end', 'year', 'Stoch_3_K_above_D', 'Stoch_K_3',
        'is_month_start', 'Momentum_Signal', 'day_of_week', 'Volatility_7d',
        'month_sin', 'CCI_3', 'dow_sin', 'Stoch_5_overbought',
    )

    # Flag / categorical / cyclic features that must not be standardised.
    _SIGNAL_FEATURES = (
        'is_month_start', 'is_month_end', 'is_quarter_start', 'is_quarter_end',
        'Formula3_Signal', 'Momentum_Signal', 'high_volatility_regime',
        'Stoch_1_K_above_D', 'Stoch_3_K_above_D', 'Stoch_4_K_above_D', 'Stoch_5_K_above_D',
        'Stoch_6_K_above_D', 'Stoch_14_K_above_D', 'Stoch_1_overbought', 'Stoch_3_overbought',
        'Stoch_4_overbought', 'Stoch_5_overbought', 'Stoch_6_overbought', 'Stoch_14_overbought',
        'Stoch_1_oversold', 'Stoch_3_oversold', 'Stoch_4_oversold', 'Stoch_5_oversold',
        'Stoch_6_oversold', 'Stoch_14_oversold',
        'CCI_3_overbought', 'CCI_3_oversold', 'CCI_4_overbought', 'CCI_4_oversold',
        'SMI_overbought_40', 'SMI_oversold_40', 'SMI_overbought_50', 'SMI_oversold_50',
        'RSI_7_bullish_divergence', 'RSI_7_bearish_divergence',
        'RSI_14_bullish_divergence', 'RSI_14_bearish_divergence',
        'month_sin', 'month_cos', 'dow_sin', 'dow_cos',
    )

    # Raw and calendar columns that are also left unscaled.
    _UNSCALED_BASE_COLS = (
        'Date', 'Symbol', 'Open', 'High', 'Low', 'Close', 'Volume',
        'year', 'month', 'quarter', 'day_of_week',
    )

    # Columns removed before the frame is handed to the model.
    _TRAINING_EXCLUDE = frozenset((
        'Open', 'High', 'Low', 'Close', 'Volume',
        'Date', 'Symbol',
        'Future_Return_1d', 'Future_Return_3d', 'Future_Return_7d',
        'Future_Return_14d', 'Future_Return_30d', 'Future_Return_90d',
        'Future_Label_1d', 'Future_Label_3d', 'Future_Label_7d',
        'Future_Label_14d', 'Future_Label_30d', 'Future_Label_90d',
        'SMI_ema', 'SMI', 'SMI_normalized',
        'realized_volatility_7', 'realized_volatility_30',
        'quarter', 'dow_cos', 'year',
    ))

    def __init__(self, output_dir='./processed_data'):
        """Store ``output_dir`` and create the directory if it is missing."""
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

    # ===== Data collection =====
    def fetch_data(self, symbols=('BTC-USD', 'ETH-USD'), days=120):
        """Download recent daily bars for each ticker via yfinance.

        Args:
            symbols: Iterable of yfinance tickers (e.g. ``'BTC-USD'``).
            days: Size of the look-back window, in days, ending now.

        Returns:
            One DataFrame with the bars of every ticker that returned data,
            stacked in the order given, with a ``Symbol`` column holding the
            ticker without its ``-USD`` suffix. Empty DataFrame if nothing
            could be downloaded.
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        print(f"\n📥 데이터 수집: {start_date.date()} ~ {end_date.date()}")

        all_data = []
        for symbol in symbols:
            short_name = symbol.replace('-USD', '')
            # Best effort: a failing ticker is reported and skipped so the
            # remaining tickers are still processed.
            try:
                ticker = yf.Ticker(symbol)
                df = ticker.history(start=start_date, end=end_date)

                if not df.empty:
                    df = df.reset_index()
                    df['Symbol'] = short_name
                    all_data.append(df)
                    print(f"  ✅ {short_name}: {len(df)}일")
            except Exception as e:
                print(f"  ❌ {symbol}: {e}")

        if not all_data:
            return pd.DataFrame()

        return pd.concat(all_data, ignore_index=True)

    # ===== A00_00 feature generation (compute every indicator) =====
    def calculate_all_features(self, df):
        """Compute all indicators, then keep Top20 + SMI + RSI divergence + Future.

        Every rolling / shifted quantity is computed per ``Symbol`` so that
        the rows of one symbol never influence another symbol.

        Args:
            df: Frame with ``Date``, ``Open``, ``High``, ``Low``, ``Close``,
                ``Volume`` and ``Symbol`` columns, rows of each symbol in
                chronological order. The input is not modified.

        Returns:
            A new DataFrame containing the essential raw columns followed by
            the selected feature columns; NaNs are forward-filled within
            each symbol and any remaining NaN is set to 0.
        """
        print("\n🔧 A00_00 피처 생성 중...")

        # Work on a copy so the caller's frame does not pick up the many
        # intermediate indicator columns created below.
        df = df.copy()
        symbol_key = df['Symbol']

        # 1. Basic return / volume features
        df['Close_return'] = df.groupby('Symbol')['Close'].pct_change()
        df['Close_log_return'] = df.groupby('Symbol')['Close'].transform(lambda x: np.log(x / x.shift(1)))
        df['Volume_normalized'] = df.groupby('Symbol')['Volume'].transform(
            lambda x: (x - x.rolling(30).mean()) / x.rolling(30).std()
        )
        df['Volume_log_ratio'] = df.groupby('Symbol')['Volume'].transform(lambda x: np.log(x / x.shift(1)))

        # 2. Stochastic oscillator (needed by Top20)
        stoch_configs = [(1, 3), (3, 3), (4, 3), (5, 3), (6, 3), (14, 3)]
        for k_period, d_period in stoch_configs:
            low_k = df.groupby('Symbol')['Low'].transform(lambda x: x.rolling(k_period).min())
            high_k = df.groupby('Symbol')['High'].transform(lambda x: x.rolling(k_period).max())
            range_k = high_k - low_k

            k_col = f'Stoch_K_{k_period}'
            d_col = f'Stoch_D_{k_period}'
            # A zero (or still-NaN) range falls back to the neutral value 50.
            df[k_col] = np.where(range_k > 0, 100 * ((df['Close'] - low_k) / range_k), 50)
            df[d_col] = df.groupby('Symbol')[k_col].transform(lambda x: x.rolling(d_period).mean())
            df[f'Stoch_{k_period}_K_above_D'] = (df[k_col] > df[d_col]).astype(int)
            df[f'Stoch_{k_period}_overbought'] = (df[k_col] > 80).astype(int)
            df[f'Stoch_{k_period}_oversold'] = (df[k_col] < 20).astype(int)

        # 3. CCI (needed by Top20)
        tp = (df['High'] + df['Low'] + df['Close']) / 3
        for period in (3, 4):
            cci_ma = tp.groupby(symbol_key).transform(lambda x: x.rolling(period).mean())
            cci_mad = tp.groupby(symbol_key).transform(
                lambda x: x.rolling(period).apply(lambda y: np.mean(np.abs(y - np.mean(y))))
            )
            # A zero (or still-NaN) mean deviation yields CCI = 0.
            df[f'CCI_{period}'] = np.where(cci_mad > 0, (tp - cci_ma) / (0.015 * cci_mad), 0)
            df[f'CCI_{period}_overbought'] = (df[f'CCI_{period}'] > 100).astype(int)
            df[f'CCI_{period}_oversold'] = (df[f'CCI_{period}'] < -100).astype(int)

        # 4. Volatility (needed by Top20); scaled by sqrt(365)
        df['Volatility_7d'] = df.groupby('Symbol')['Close_return'].transform(lambda x: x.rolling(7).std()) * np.sqrt(365)
        vol_q80 = df.groupby('Symbol')['Volatility_7d'].transform(lambda x: x.rolling(30).quantile(0.8))
        df['high_volatility_regime'] = (df['Volatility_7d'] > vol_q80).astype(int)

        df['realized_volatility_7'] = df.groupby('Symbol')['Close_log_return'].transform(lambda x: x.rolling(7).std()) * np.sqrt(365)
        df['realized_volatility_30'] = df.groupby('Symbol')['Close_log_return'].transform(lambda x: x.rolling(30).std()) * np.sqrt(365)

        # 5. Momentum signals (needed by Top20)
        # Momentum_Signal: 1 = neutral, 2 = 7-day return just crossed above
        # +10%, 0 = just crossed below -10%.
        df['Return_7d'] = df.groupby('Symbol')['Close'].pct_change(7)
        prev_return_7d = df.groupby('Symbol')['Return_7d'].shift(1)
        df['Momentum_Signal'] = 1
        strong_momentum_up = (df['Return_7d'] > 0.1) & (prev_return_7d <= 0.1)
        strong_momentum_down = (df['Return_7d'] < -0.1) & (prev_return_7d >= -0.1)
        df.loc[strong_momentum_up, 'Momentum_Signal'] = 2
        df.loc[strong_momentum_down, 'Momentum_Signal'] = 0

        # Formula3_Signal: 3-day move beyond +/-5% on above-average volume.
        df['Price_change_3d'] = df.groupby('Symbol')['Close'].pct_change(3)
        volume_ma = df.groupby('Symbol')['Volume'].transform(lambda x: x.rolling(7).mean())
        heavy_volume = df['Volume'] > volume_ma
        df['Formula3_Signal'] = 1
        df.loc[(df['Price_change_3d'] > 0.05) & heavy_volume, 'Formula3_Signal'] = 2
        df.loc[(df['Price_change_3d'] < -0.05) & heavy_volume, 'Formula3_Signal'] = 0

        # 6. Calendar features (needed by Top20)
        df['Date'] = pd.to_datetime(df['Date'])
        dates = df['Date'].dt
        df['year'] = dates.year
        df['month'] = dates.month
        df['quarter'] = dates.quarter
        df['day_of_week'] = dates.dayofweek
        df['is_month_start'] = dates.is_month_start.astype(int)
        df['is_month_end'] = dates.is_month_end.astype(int)
        df['is_quarter_start'] = dates.is_quarter_start.astype(int)
        df['is_quarter_end'] = dates.is_quarter_end.astype(int)
        df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
        df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
        df['dow_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
        df['dow_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)

        # 7. SMI: double-smoothed distance of Close from the range midpoint,
        # relative to the double-smoothed range.
        k_length, d_length, ema_length = 10, 3, 10
        ll = df.groupby('Symbol')['Low'].transform(lambda x: x.rolling(k_length).min())
        hh = df.groupby('Symbol')['High'].transform(lambda x: x.rolling(k_length).max())
        diff = hh - ll
        rdiff = df['Close'] - (hh + ll) / 2

        def _double_ema(series):
            # Apply the d_length EMA twice, independently for each symbol.
            once = series.groupby(symbol_key).transform(lambda x: x.ewm(span=d_length, adjust=False).mean())
            return once.groupby(symbol_key).transform(lambda x: x.ewm(span=d_length, adjust=False).mean())

        avgrel = _double_ema(rdiff)
        avgdiff = _double_ema(diff)

        df['SMI'] = np.where(avgdiff != 0, (avgrel / (avgdiff / 2)) * 100, 0)
        df['SMI_signal'] = df.groupby('Symbol')['SMI'].transform(lambda x: x.ewm(span=d_length, adjust=False).mean())
        df['SMI_ema'] = df.groupby('Symbol')['SMI'].transform(lambda x: x.ewm(span=ema_length, adjust=False).mean())
        df['SMI_overbought_40'] = (df['SMI_signal'] > 40).astype(int)
        df['SMI_oversold_40'] = (df['SMI_signal'] < -40).astype(int)
        df['SMI_overbought_50'] = (df['SMI_signal'] > 50).astype(int)
        df['SMI_oversold_50'] = (df['SMI_signal'] < -50).astype(int)
        df['SMI_normalized'] = np.clip(df['SMI'] / 100, -1, 1)

        # 8. RSI and RSI divergence
        delta = df.groupby('Symbol')['Close'].diff()
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)
        for period in (3, 7, 14, 30):
            avg_gain = gains.groupby(symbol_key).transform(lambda x: x.rolling(period).mean())
            avg_loss = losses.groupby(symbol_key).transform(lambda x: x.rolling(period).mean())
            rs = avg_gain / avg_loss
            df[f'RSI_{period}'] = 100 - (100 / (1 + rs))

        # Lags are taken within each symbol so a divergence can never be
        # triggered by comparing against another symbol's prices.
        close_lag5 = df.groupby('Symbol')['Close'].shift(5)
        close_lag10 = df.groupby('Symbol')['Close'].shift(10)
        price_lower = (df['Close'] < close_lag5) & (close_lag5 < close_lag10)
        price_higher = (df['Close'] > close_lag5) & (close_lag5 > close_lag10)
        for period in (7, 14):
            rsi = df[f'RSI_{period}']
            rsi_lag5 = rsi.groupby(symbol_key).shift(5)
            rsi_lag10 = rsi.groupby(symbol_key).shift(10)
            rsi_higher = (rsi > rsi_lag5) & (rsi_lag5 > rsi_lag10)
            rsi_lower = (rsi < rsi_lag5) & (rsi_lag5 < rsi_lag10)
            df[f'RSI_{period}_bullish_divergence'] = (price_lower & rsi_higher).astype(int)
            df[f'RSI_{period}_bearish_divergence'] = (price_higher & rsi_lower).astype(int)

        # 9. Future returns / labels: placeholders only (unknown for new data)
        for period in (1, 3, 7, 14, 30, 90):
            df[f'Future_Return_{period}d'] = 0  # dummy
            df[f'Future_Label_{period}d'] = 1  # dummy

        # 10. NaN handling
        future_cols = [col for col in df.columns if 'Future' in col]
        for col in future_cols:
            df[col] = df[col].fillna(1 if 'Label' in col else 0)
        # Forward-fill inside each symbol only: a frame-wide ffill would copy
        # the previous symbol's last values into the warm-up rows of the
        # next symbol. Whatever is still NaN (warm-up rows) becomes 0.
        filled = df.groupby('Symbol', sort=False).ffill()
        df[filled.columns] = filled
        df = df.fillna(0)

        # 11. Keep Top20 + SMI + RSI divergence + Future only (A00_00 style)
        smi_cols = [col for col in df.columns if 'SMI' in col]
        rsi_divergence_cols = [col for col in df.columns if 'divergence' in col]
        available_top20 = [col for col in self._TOP20_FEATURES if col in df.columns]

        selected_cols = (
            list(self._ESSENTIAL_COLS) + available_top20 + smi_cols
            + rsi_divergence_cols + future_cols
        )
        df = df[selected_cols].copy()

        print(f"  ✅ A00_00 완료: {df.shape} (51개 컬럼)")

        return df

    # ===== B00_00 normalisation =====
    def apply_normalization(self, df):
        """Standardise the continuous feature columns separately per symbol.

        Raw price columns, calendar columns, flag/signal columns and every
        ``Future*`` column are left untouched; each remaining numeric column
        is fitted and transformed with its own ``StandardScaler`` per symbol.

        Note: ``df`` is updated in place and also returned.
        """
        print("\n📊 B00_00 정규화 중...")

        exclude_cols = set(self._UNSCALED_BASE_COLS) | set(self._SIGNAL_FEATURES)
        exclude_cols.update(col for col in df.columns if 'Future' in col)

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        scale_cols = [col for col in numeric_cols if col not in exclude_cols]

        if scale_cols:
            for symbol in df['Symbol'].unique():
                mask = df['Symbol'] == symbol
                scaler = StandardScaler()
                df.loc[mask, scale_cols] = scaler.fit_transform(df.loc[mask, scale_cols].fillna(0))
                print(f"  ✅ {symbol}: {len(scale_cols)}개 정규화")

        return df

    # ===== Final feature filtering =====
    def filter_final_features(self, df):
        """Return a copy of ``df`` without the columns excluded from training.

        Remaining NaN and +/-inf values are replaced with 0.
        """
        print("\n🎯 최종 피처 필터링 중...")

        final_features = [col for col in df.columns if col not in self._TRAINING_EXCLUDE]
        df_final = df[final_features].copy()
        df_final = df_final.fillna(0).replace([np.inf, -np.inf], 0)

        print(f"  ✅ 최종 피처: {len(final_features)}개")

        return df_final

    # ===== Full pipeline =====
    def preprocess(self, symbols=('BTC-USD', 'ETH-USD'), target_date='2024-09-28'):
        """Run the whole pipeline and save one feature row per symbol.

        Args:
            symbols: Iterable of yfinance tickers to download.
            target_date: ``'YYYY-MM-DD'`` date whose row should be extracted.
                When a symbol has no row for that date, its latest available
                row is used instead.

        Returns:
            Dict mapping the short symbol name to a single-row DataFrame of
            final features; empty dict when no data could be downloaded.
        """
        print("\n" + "="*60)
        print("🚀 신규 데이터 전처리 파이프라인")
        print("="*60)

        # 1. Data collection
        df = self.fetch_data(symbols)
        if df.empty:
            return {}

        # 2. A00_00: feature generation
        df = self.calculate_all_features(df)

        # 3. B00_00: normalisation
        df = self.apply_normalization(df)

        # 4. Final filtering (shares its index with df)
        df_final = self.filter_final_features(df)

        # 5. Pick the row for target_date, falling back to the latest row
        df['Date'] = pd.to_datetime(df['Date'])
        target_mask = df['Date'].dt.date == pd.to_datetime(target_date).date()

        results = {}
        for symbol in df['Symbol'].unique():
            symbol_mask = df['Symbol'] == symbol
            combined_mask = target_mask & symbol_mask

            if combined_mask.any():
                idx = df.index[combined_mask][0]
                results[symbol] = df_final.loc[[idx]]
                print(f"  ✅ {symbol}: {target_date} 추출")
            else:
                latest_idx = df.index[symbol_mask][-1]
                results[symbol] = df_final.loc[[latest_idx]]
                latest_date = df.loc[latest_idx, 'Date'].date()
                print(f"  ⚠️  {symbol}: {latest_date} 사용")

        # 6. Save
        # NOTE(review): the file name is built from target_date even when the
        # fallback row of a different date was used above; kept as-is because
        # downstream readers may rely on this naming.
        for symbol, data in results.items():
            filename = f'{symbol}_preprocessed_{target_date.replace("-", "")}.csv'
            filepath = os.path.join(self.output_dir, filename)
            data.to_csv(filepath, index=False)
            print(f"  💾 {filepath}")

        print("\n" + "="*60)
        print("✅ 전처리 완료!")
        print(f"A00_00 → 51개 컬럼 → 정규화 → 최종 필터링")
        print("="*60)

        return results




In [15]:
# 실행
if __name__ == "__main__":
    # Run the full pipeline for BTC and ETH and write the CSVs under
    # ./processed_data.
    preprocessor = NewDataPreprocessor(output_dir='./processed_data')
    results = preprocessor.preprocess(symbols=['BTC-USD', 'ETH-USD'], target_date='2024-09-28')

    # Summarise each result: its shape and the first ten column names.
    for symbol, data in results.items():
        leading_columns = list(data.columns[:10])
        print(f"\n{symbol}: {data.shape}, 컬럼: {leading_columns}...")


🚀 신규 데이터 전처리 파이프라인

📥 데이터 수집: 2025-05-31 ~ 2025-09-28
  ✅ BTC: 120일
  ✅ ETH: 120일

🔧 A00_00 피처 생성 중...
  ✅ A00_00 완료: (240, 51) (51개 컬럼)

📊 B00_00 정규화 중...
  ✅ BTC: 7개 정규화
  ✅ ETH: 7개 정규화

🎯 최종 피처 필터링 중...
  ✅ 최종 피처: 28개
  ⚠️  BTC: 2025-09-27 사용
  ⚠️  ETH: 2025-09-27 사용
  💾 ./processed_data/BTC_preprocessed_20240928.csv
  💾 ./processed_data/ETH_preprocessed_20240928.csv

✅ 전처리 완료!
A00_00 → 51개 컬럼 → 정규화 → 최종 필터링

BTC: (1, 28), 컬럼: ['Stoch_14_K_above_D', 'Stoch_6_K_above_D', 'CCI_4_overbought', 'Formula3_Signal', 'Stoch_1_K_above_D', 'Stoch_4_K_above_D', 'is_quarter_start', 'high_volatility_regime', 'is_month_end', 'Stoch_3_K_above_D']...

ETH: (1, 28), 컬럼: ['Stoch_14_K_above_D', 'Stoch_6_K_above_D', 'CCI_4_overbought', 'Formula3_Signal', 'Stoch_1_K_above_D', 'Stoch_4_K_above_D', 'is_quarter_start', 'high_volatility_regime', 'is_month_end', 'Stoch_3_K_above_D']...
