<a href="https://colab.research.google.com/github/paulyu8868/test/blob/main/%EB%96%A8%EC%82%AC%EC%98%A4%ED%8C%94%EB%94%A5%EB%9F%AC%EB%8B%9Dv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install yfinance pandas numpy tensorflow scikit-learn matplotlib seaborn

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler
import yfinance as yf
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns

# GPU 메모리 설정
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
   try:
       for gpu in gpus:
           tf.config.experimental.set_memory_growth(gpu, True)
   except RuntimeError as e:
       print(e)


def get_data(ticker, start, end):
    """데이터 로드 및 전처리"""
    try:
        # 데이터 다운로드
        df = yf.download(ticker, start=start, end=end)

        # 필요한 컬럼만 선택
        df = df[['Open', 'High', 'Low', 'Close']]

        # 볼린저 밴드 계산
        df['MA20'] = df['Close'].rolling(window=20).mean()
        df['STD20'] = df['Close'].rolling(window=20).std()
        df['Upper'] = df['MA20'] + (2 * df['STD20'])
        df['Lower'] = df['MA20'] - (2 * df['STD20'])

        # BB 위치 계산 - 1차원 배열로 변환
        band_range = (df['Upper'] - df['Lower']).values.flatten()
        close = df['Close'].values.flatten()
        lower = df['Lower'].values.flatten()

        # BB 위치 계산
        bb_position = np.zeros(len(df))
        mask = band_range != 0
        bb_position[mask] = ((close[mask] - lower[mask]) / band_range[mask] * 100)
        bb_position[~mask] = 50
        df['BB위치'] = bb_position

        # BB 밴드폭 계산
        ma20 = df['MA20'].values.flatten()
        bb_width = np.zeros(len(df))
        mask = ma20 != 0
        bb_width[mask] = ((band_range[mask] / ma20[mask]) * 100)
        df['BB밴드폭'] = bb_width

        # 가격 변화율
        df['Return'] = df['Close'].pct_change() * 100
        df['prev_close'] = df['Close'].shift(1)

        # 불필요한 컬럼 제거
        df = df.drop('STD20', axis=1)

        # 날짜 인덱스 변환
        df.index = pd.to_datetime(df.index).date

        # 결측값 제거
        df = df.dropna()

        return df

    except Exception as e:
        print(f"Error in get_data: {str(e)}")
        raise e


def generate_original_trades(df):
    """원래 매매 전략으로 거래 기록 생성"""
    trades = []
    position = None
    bb_targets = {
        (-float('inf'), 0): 4.0,
        (0, 20): 3.86,
        (20, 40): 3.16,
        (40, 60): 3.25,
        (60, 80): 2.35,
        (80, 100): 2.14,
        (100, float('inf')): 1.59
    }

    for i in range(1, len(df)):
        current_price = float(df['Close'].iloc[i])  # float로 변환
        prev_price = float(df['Close'].iloc[i-1])  # float로 변환

        if position is None:  # 매수 조건 확인
            if current_price <= prev_price:  # 전날 종가 대비 하락 또는 동일
                position = {
                    'entry_price': current_price,
                    'entry_date': df.index[i],
                    'entry_idx': i
                }
        else:  # 매도 조건 확인
            days_held = i - position['entry_idx']
            bb_position = float(df['BB위치'].iloc[i])  # float로 변환

            # BB 구간별 목표 수익률 확인
            for (lower, upper), target in bb_targets.items():
                if lower < bb_position <= upper:
                    target_price = position['entry_price'] * (1 + target/100)
                    if current_price >= target_price or days_held >= 30:
                        trades.append({
                            'entry_date': position['entry_date'],
                            'exit_date': df.index[i],
                            'entry_price': position['entry_price'],
                            'exit_price': current_price,
                            'days_held': days_held,
                            'profit_pct': (current_price/position['entry_price'] - 1) * 100
                        })
                        position = None
                        break

    return pd.DataFrame(trades)


class EnhancedTradingModel:
    def __init__(self, lookback_window=10):
        self.lookback_window = lookback_window
        self.feature_scaler = MinMaxScaler()
        self.model = None

    def build_model(self, input_shape):
        """모델 구축"""
        # 입력 레이어
        input_layer = Input(shape=input_shape)

        # LSTM 레이어
        lstm = LSTM(64, return_sequences=True,
                   kernel_regularizer=tf.keras.regularizers.l2(0.01))(input_layer)
        lstm = Dropout(0.3)(lstm)
        lstm = LSTM(32, kernel_regularizer=tf.keras.regularizers.l2(0.01))(lstm)
        lstm = Dropout(0.3)(lstm)

        # 공통 특성 추출
        common = Dense(32, activation='relu',
                      kernel_regularizer=tf.keras.regularizers.l2(0.01))(lstm)
        common = Dropout(0.2)(common)

        # 각 출력 브랜치
        price_dense = Dense(16, activation='relu')(common)
        price_output = Dense(1, name='price_output')(price_dense)

        timing_dense = Dense(16, activation='relu')(common)
        timing_output = Dense(1, activation='sigmoid', name='timing_output')(timing_dense)

        risk_dense = Dense(16, activation='relu')(common)
        risk_output = Dense(1, activation='sigmoid', name='risk_output')(risk_dense)

        # 모델 생성
        model = Model(
            inputs=input_layer,
            outputs=[price_output, timing_output, risk_output]
        )

        # 컴파일
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
            loss={
                'price_output': 'mse',
                'timing_output': 'binary_crossentropy',
                'risk_output': 'mse'
            },
            loss_weights={
                'price_output': 1.0,
                'timing_output': 0.7,
                'risk_output': 0.5
            },
            metrics={
                'price_output': ['mae'],
                'timing_output': ['accuracy'],
                'risk_output': ['mae']
            }
        )

        self.model = model
        return model

    def prepare_training_data(self, df, trades_df):
        """학습 데이터 준비"""
        features = []
        price_targets = []
        timing_targets = []
        risk_targets = []

        # 특성 스케일링
        feature_columns = ['Close', 'BB위치', 'BB밴드폭', 'Return']
        scaled_features = self.feature_scaler.fit_transform(df[feature_columns])
        scaled_df = pd.DataFrame(scaled_features, index=df.index, columns=feature_columns)

        # 시퀀스 데이터 생성
        for i in range(len(df) - self.lookback_window):
            # 입력 시퀀스
            sequence = scaled_df.iloc[i:i+self.lookback_window]
            features.append(sequence.values)

            # 타겟 인덱스
            target_idx = i + self.lookback_window

            # 가격 타겟
            bb_pos = float(df['BB위치'].iloc[target_idx])
            current_price = float(df['Close'].iloc[target_idx])

            # BB 구간별 목표 가격 설정
            bb_targets = {
                (-float('inf'), 0): 4.0,
                (0, 20): 3.86,
                (20, 40): 3.16,
                (40, 60): 3.25,
                (60, 80): 2.35,
                (80, 100): 2.14,
                (100, float('inf')): 1.59
            }

            target_price = current_price * 1.02  # 기본 목표 수익률
            for (lower, upper), target in bb_targets.items():
                if lower < bb_pos <= upper:
                    target_price = current_price * (1 + target/100)
                    break

            price_targets.append(target_price)

            # 타이밍 타겟
            timing_targets.append(1.0 if df.index[target_idx] in trades_df['exit_date'].values else 0.0)

            # 리스크 타겟
            bb_risk = bb_pos / 100
            vol_risk = float(df['BB밴드폭'].iloc[target_idx]) / 100
            risk_targets.append(min(1.0, bb_risk * 0.6 + vol_risk * 0.4))

        return (np.array(features),
                np.array(price_targets),
                np.array(timing_targets),
                np.array(risk_targets))

    def train(self, X, y_price, y_timing, y_risk, epochs=50, batch_size=32):
        """모델 학습"""
        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        )

        reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.2,
            patience=5,
            min_lr=0.0001
        )

        history = self.model.fit(
            X,
            {
                'price_output': y_price,
                'timing_output': y_timing,
                'risk_output': y_risk
            },
            epochs=epochs,
            batch_size=batch_size,
            validation_split=0.2,
            callbacks=[early_stopping, reduce_lr],
            verbose=1
        )
        return history

    def predict(self, sequence):
        """단일 시퀀스에 대한 예측"""
        if len(sequence.shape) == 2:
            sequence = np.expand_dims(sequence, axis=0)

        predictions = self.model.predict(sequence, verbose=0)
        return (
            predictions[0][0][0],  # price prediction
            predictions[1][0][0],  # timing prediction
            predictions[2][0][0]   # risk prediction
        )

def backtest_enhanced_strategy(model, test_data, initial_balance=100000):
   """향상된 전략으로 백테스트 수행"""
   trades = []
   balance = initial_balance
   position = None
   feature_columns = ['Close', 'BB위치', 'BB밴드폭', 'Return']

   # 특성 스케일링
   scaled_features = model.feature_scaler.transform(test_data[feature_columns])
   scaled_df = pd.DataFrame(scaled_features, index=test_data.index, columns=feature_columns)

   for i in range(model.lookback_window, len(test_data)):
       current_price = float(test_data['Close'].iloc[i])
       prev_price = float(test_data['Close'].iloc[i-1])
       current_date = test_data.index[i]

       # 현재 시퀀스 준비
       sequence = scaled_df.iloc[i-model.lookback_window:i].values

       if position is None:  # 매수 조건 확인
           # 모델 예측
           price_pred, timing_pred, risk_pred = model.predict(sequence)

           # 매수 조건: 타이밍 점수가 낮고, 리스크가 낮으며, 전날 대비 하락
           if (timing_pred < 0.3 and
               risk_pred < 0.5 and
               current_price <= prev_price):

               qty = int(balance / current_price)
               if qty > 0:
                   position = {
                       'entry_price': current_price,
                       'quantity': qty,
                       'entry_date': current_date,
                       'entry_idx': i,
                       'target_price': price_pred
                   }
                   balance -= current_price * qty

       else:  # 매도 조건 확인
           days_held = i - position['entry_idx']
           price_pred, timing_pred, risk_pred = model.predict(sequence)

           # 매도 조건 체크
           sell_signal = (
               timing_pred > 0.7 or  # 높은 매도 확률
               current_price >= position['target_price'] or  # 목표가 도달
               days_held >= 30 or  # 최대 보유기간
               risk_pred > 0.8  # 높은 리스크
           )

           if sell_signal:
               balance += current_price * position['quantity']
               trades.append({
                   'entry_date': position['entry_date'],
                   'exit_date': current_date,
                   'entry_price': position['entry_price'],
                   'exit_price': current_price,
                   'quantity': position['quantity'],
                   'days_held': days_held,
                   'profit_pct': (current_price / position['entry_price'] - 1) * 100
               })
               position = None

   # 최종 포지션 정리
   if position is not None:
       current_price = float(test_data['Close'].iloc[-1])
       days_held = len(test_data) - 1 - position['entry_idx']
       balance += current_price * position['quantity']
       trades.append({
           'entry_date': position['entry_date'],
           'exit_date': test_data.index[-1],
           'entry_price': position['entry_price'],
           'exit_price': current_price,
           'quantity': position['quantity'],
           'days_held': days_held,
           'profit_pct': (current_price / position['entry_price'] - 1) * 100
       })

   trades_df = pd.DataFrame(trades)
   final_return = (balance / initial_balance - 1) * 100

   return trades_df, final_return

def main():
   # 데이터 준비
   start_date = '2011-01-01'
   end_date = '2024-10-31'

   print("데이터 로딩 중...")
   df = get_data('TQQQ', start_date, end_date)

   # 학습/테스트 데이터 분할
   train_size = int(len(df) * 0.8)
   train_data = df.iloc[:train_size]
   test_data = df.iloc[train_size:]

   print(f"데이터 기간: {df.index[0]} ~ {df.index[-1]}")
   print(f"총 데이터 수: {len(df)}")
   print(f"\n학습 데이터: {train_data.index[0]} ~ {train_data.index[-1]} ({len(train_data)}일)")
   print(f"테스트 데이터: {test_data.index[0]} ~ {test_data.index[-1]} ({len(test_data)}일)")

   # 원래 전략으로 학습용 거래 기록 생성
   print("\n원래 전략으로 학습 데이터 생성 중...")
   trades_df = generate_original_trades(train_data)
   print(f"생성된 거래 기록 수: {len(trades_df)}")

   # 모델 생성 및 학습
   print("\n모델 생성 중...")
   model = EnhancedTradingModel(lookback_window=10)
   model.build_model(input_shape=(10, 4))

   print("\n학습 데이터 준비 중...")
   X, y_price, y_timing, y_risk = model.prepare_training_data(train_data, trades_df)
   print(f"학습 데이터 형태: {X.shape}")

   print("\n모델 학습 중...")
   history = model.train(X, y_price, y_timing, y_risk, epochs=50)

   # 향상된 전략으로 테스트
   print("\n테스트 데이터로 백테스트 수행 중...")
   test_trades_df, test_return = backtest_enhanced_strategy(model, test_data)

   # 결과 출력
   print("\n=== 백테스트 결과 ===")
   print(f"총 거래 횟수: {len(test_trades_df)}")
   if len(test_trades_df) > 0:
       print(f"평균 수익률: {test_trades_df['profit_pct'].mean():.2f}%")
       print(f"승률: {(test_trades_df['profit_pct'] > 0).mean() * 100:.2f}%")
       print(f"평균 보유기간: {test_trades_df['days_held'].mean():.1f}일")
   print(f"최종 수익률: {test_return:.2f}%")

   if len(test_trades_df) > 0:
       # 월별 성과 분석
       test_trades_df['yearmonth'] = pd.to_datetime(test_trades_df['exit_date']).dt.strftime('%Y-%m')
       monthly_returns = test_trades_df.groupby('yearmonth')['profit_pct'].agg(['mean', 'count'])
       print("\n=== 월별 성과 ===")
       print(monthly_returns)

if __name__ == "__main__":
   main()



[*********************100%***********************]  1 of 1 completed

데이터 로딩 중...
데이터 기간: 2011-01-31 ~ 2024-10-30
총 데이터 수: 3461

학습 데이터: 2011-01-31 ~ 2022-01-27 (2768일)
테스트 데이터: 2022-01-28 ~ 2024-10-30 (693일)

원래 전략으로 학습 데이터 생성 중...



  df = df.drop('STD20', axis=1)
  current_price = float(df['Close'].iloc[i])  # float로 변환
  prev_price = float(df['Close'].iloc[i-1])  # float로 변환


생성된 거래 기록 수: 252

모델 생성 중...

학습 데이터 준비 중...


  current_price = float(df['Close'].iloc[target_idx])


학습 데이터 형태: (2758, 10, 4)

모델 학습 중...
Epoch 1/50
[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 24ms/step - loss: 48.1245 - price_output_mae: 5.0600 - risk_output_mae: 0.1764 - timing_output_accuracy: 0.5175 - val_loss: 1994.1558 - val_price_output_mae: 39.1590 - val_risk_output_mae: 0.1804 - val_timing_output_accuracy: 0.8913 - learning_rate: 0.0010
Epoch 2/50
[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 15ms/step - loss: 29.8602 - price_output_mae: 4.3589 - risk_output_mae: 0.1799 - timing_output_accuracy: 0.9059 - val_loss: 1494.3676 - val_price_output_mae: 32.3564 - val_risk_output_mae: 0.1919 - val_timing_output_accuracy: 0.8913 - learning_rate: 0.0010
Epoch 3/50
[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - loss: 4.6944 - price_output_mae: 1.2332 - risk_output_mae: 0.1705 - timing_output_accuracy: 0.9071 - val_loss: 1289.6663 - val_price_output_mae: 29.2893 - val_risk_output_mae: 0.1696 - val_timing_output_accuracy:

  current_price = float(test_data['Close'].iloc[i])
  prev_price = float(test_data['Close'].iloc[i-1])



=== 백테스트 결과 ===
총 거래 횟수: 201
평균 수익률: 0.34%
승률: 51.24%
평균 보유기간: 1.2일
최종 수익률: 63.92%

=== 월별 성과 ===
               mean  count
yearmonth                 
2022-02   -0.471663      4
2022-03    1.663769      9
2022-04   -0.469480      8
2022-05    2.290982      8
2022-06   -1.370264      7
2022-07    3.247331      6
2022-08   -0.396069      9
2022-09   -1.871196      4
2022-10   -1.478344      5
2022-11    3.431971      7
2022-12    0.300999      3
2023-01   -0.913289      3
2023-02   -0.065476      6
2023-03   -0.373736      6
2023-04   -0.437190      7
2023-05    0.066316      6
2023-06    0.318974      6
2023-07   -1.163012      6
2023-08   -0.580525      8
2023-09   -0.669371      7
2023-10   -1.543656      6
2023-11    2.598083      5
2023-12    1.607056      5
2024-01   -1.500019      5
2024-02    0.548922      6
2024-03   -1.088823      5
2024-04    1.275813      8
2024-05   -1.454971      5
2024-06    0.756114      4
2024-07    0.993160      7
2024-08    1.314120      7
2024-09   

  current_price = float(test_data['Close'].iloc[-1])
