# Example 15: Classifier for Direction Prediction

- Predicting: Return direction
- Technology: Gaussian Naive Bayes classifier(GNB)
- Asset Class: US equities
- Difficulty: Hard
- Type: Full strategy

---

GNB를 사용하여 기술주의 일일 수익률의 방향을 예측하여 그 결과를 양(positive), 음(negative), 보합(flat)|으로 분류

---

주식 수익률 예측은 보통 다양한 기술 지표, 가격 정보, 뉴스 등 수많은 변수들을 입력 변수(high dimension)들을 사용하는데 GNB는 이런 고차원 정보들을 빠르게 처리하면서도 성능이 좋음
(Simplicity, 계산 Efficiency, Effectiveness, 해석력 높음)

 GNB가 입력 특성들이 정규분포를 따르고 서로 독립적이다라는 가정

 vs
 - GNB: feature engineering, processing이 필요하며 결과에 영향
 - CNN: feature 자동 추출

모델 선정 기준
- 데이터 특성(GNB는 정규분포, 독립 특성을 가진 데이터에 적합)
- 모델 가정 충족 여부(GNB는 모델 가정이 중요)
- computational resource
- 해석력 vs 예측력

---
- Model Features: 4가지 daily returns, 30일 동안의 주식 데이터 기반
- Predicted Label: 22 거래일 후의 수익률 sign, 예측값은 -1(하락), 0(보합), 1(상승) 중 하나</br>
   각 주식을 예측된 방향대로 22거래일 동안 보유(hold)하는 전략을 의미

---
논문 요약

"Forecasting the direction of stock market index movement using three data mining techniques: the case of Tehran Stock Exchange"

1. 해결하고자 하는 문제
- 전통적인 금융 예측 방식은 주가의 높은 변동성과 비선형 구조로 인해 한계가 있음.
- 시장 변화를 예측하는 정확한 모델이 필요함.
- 특히 이란 주식시장(Tehran Stock Exchange, TSE)의 일일 방향성 예측에 중점을 둠.

2. 사용한 방법
- 세 가지 분류 기법을 비교함:
    - Decision Tree (DT)
    - Random Forest (RF)
    - Naive Bayes (NB)

- 입력 변수: 10개 미시 변수 + 3개 거시 변수를 사용하여 모델 입력 구성.
    - 10개 미시경제 변수 (예: 전일 종가, 거래량 변화 등)
    - 3개 거시경제 변수 (예: 환율, 금값, 유가)

- 종속 변수 (예측 대상):
    - 다음날의 시장 지수 방향: 상승(+1), 하락(−1), 보합(0)


3. 실험 결과
전체 모델 중 의사결정나무(Decision Tree)가 가장 높은 정확도인 80.08%를 기록.

- 다른 모델의 정확도:
    - Random Forest: 78.81%
    - Naive Bayes: 73.84%

- 성능 순위: Decision Tree > Random Forest > Naive Bayes

- 모델의 성능은 사용된 변수의 종류와 수, 데이터 전처리 방식에 따라 달라질 수 있음.

---

Trading Universe
- the 10 largest stocks from the technology sector as defined by Morningstar

---

Portfolio Construction
- Model Training Time: 매주 첫 거래일(보통 월요일) 오전 9시에 모델 학습
- Portfolio Rebalancing Time: 주식시장이 열린 후 2분 뒤에 포트폴리오를 리밸런싱(재조정)
- Portfolio Weights: 모델이 수익이 날 것이라고(+1) 예측한 종목들만 골라서, 이 종목들에 동일 비중(equal weight)으로 투자

---

Tearsheet
- 5개 종목 정도를 대상으로 할 때 가장 효율적으로 투자(Sharpe Ratio가 좋았다...)
- 어떤 조합이든 손실은 나지 않았고 수익을 내긴 했다
- hyper-parameter
    - days_per_sample: 최소 요인 2개, 최대 요인 8개(노이즈 이슈), step size 1
    - universe_size: 최소 5(종목 선택을 못하고 현금 보유 가능성), 최대 25(독립 변수 증가로 모델 복잡, 학습 어려움, 노이즈 증가), step size 5
    - days_per_sample: 4일치 데이터
    - sample: 학습 샘플 100개
    - universe_size: 한 번에 한 종목만 투자 대상으로 고려

---

시작 시: initialize() → 모델 불러오기

학습 시: _train() → 모델 업데이트

종료 시: on_end_of_algorithm() → 모델 저장

In [None]:
import numpy as np
import pandas as pd
import pickle
from sklearn.naive_bayes import GaussianNB
from sklearn.preprocessing import StandardScaler
from AlgorithmImports import *  # QuantConnect 전용 라이브러리

class GNBTradingBot(QCAlgorithm):
    def initialize(self):
        # 하이퍼파라미터 설정
        self._days_per_sample = 5                  # 시퀀스 샘플당 일 수 (open-close 수익률 개수)
        self._holding_period = 5                   # 미래 리턴을 평가할 기간
        self._samples = 20                         # 학습에 사용할 과거 샘플 수
        self._lookback = 50                        # 초기 warm-up 시 사용할 lookback 기간
        self._models_by_symbol = {}                # 심볼별 학습된 모델 저장
        self._tradable_securities = []             # 학습이 가능한 종목 목록
        self._key = "gnb_models.pkl"               # 모델 저장용 ObjectStore 키
        self.live_mode = self.LiveMode             # 라이브 트레이딩 여부

        self.set_start_date(2022, 1, 1)
        self.set_cash(100000)

        # 스케줄: 매주 월요일 9:00 학습, 9:32 트레이딩
        schedule_symbol = Symbol.create("SPY", SecurityType.EQUITY, Market.USA)
        date_rule = self.date_rules.week_start(schedule_symbol)
        self.schedule.on(date_rule, self.time_rules.at(9, 0), self._train)
        self.schedule.on(date_rule, self.time_rules.after_market_open(schedule_symbol, 2), self._trade)

        # 이전에 저장된 모델 불러오기 (Live 모드 전용)
        if self.live_mode and self.object_store.contains_key(self._key):
            self._models_by_symbol = pickle.loads(self.object_store.read_bytes(self._key))

    def on_securities_changed(self, changes):
        # 종목 추가 시: 모델 초기화 및 warm-up
        for security in changes.added_securities:
            security.model = None
            self._set_up_consolidator(security)
            self._warm_up(security)

        # 종목 제거 시: consolidator 제거
        for security in changes.removed_securities:
            self.subscription_manager.remove_consolidator(security.symbol, security.consolidator)

    def _set_up_consolidator(self, security):
        # 일간 바 수신을 위한 consolidator 등록
        security.consolidator = self.consolidate(security.symbol, Resolution.DAILY, self._consolidation_handler)

    def _consolidation_handler(self, bar):
        # 실시간 바 수신 시 호출되는 핸들러
        security = self.securities[bar.symbol]
        time = bar.end_time
        open_ = bar.open
        close = bar.close

        # open-close 수익률 계산
        open_close_return = (close - open_) / open_
        if not self._update_features(security, time, open_close_return):
            return

        # holding_period 전의 open 가격 불러오기
        opens = security.previous_opens
        prev_days = opens[opens.index <= time - timedelta(self._holding_period)]
        if len(prev_days) == 0:
            return

        open_day = prev_days.index[-1]
        prev_open = opens[open_day]
        open_open_return = (open_ - prev_open) / prev_open

        # 레이블 업데이트
        security.labels_by_day[open_day] = np.sign(open_open_return)

        # 최근 N개만 유지
        security.labels_by_day = security.labels_by_day[-self._samples:]
        security.previous_opens.loc[time] = open_
        security.previous_opens = security.previous_opens[-self._holding_period:]

    def _update_features(self, security, day, open_close_return):
        # 특징 벡터를 생성하고 업데이트
        if not hasattr(security, "roc_window"):
            security.roc_window = np.array([])

        # 가장 최신 수익률부터 앞쪽에 append
        security.roc_window = np.append(open_close_return, security.roc_window)[:self._days_per_sample]

        # 충분한 길이일 때만 기록
        if len(security.roc_window) < self._days_per_sample:
            return False

        security.features_by_day.loc[day] = security.roc_window
        security.features_by_day = security.features_by_day[-(self._samples + self._holding_period + 2):]
        return True

    def _warm_up(self, security):
        # 초기화
        security.roc_window = np.array([])
        security.previous_opens = pd.Series(dtype=float)
        security.labels_by_day = pd.Series(dtype=float)
        security.features_by_day = pd.DataFrame(
            {f"f{security.symbol.id}_t-{i}": [] for i in range(self._days_per_sample)}
        )

        # 히스토리 로딩
        history = self.history(security.symbol, self._lookback, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
        if history.empty or 'close' not in history.columns:
            self.log(f"Not enough history for {security.symbol} yet")
            return

        # open-close 수익률 및 future 수익률 계산
        history["open_close_return"] = (history["close"] - history["open"]) / history["open"]
        history = history.dropna()
        start = history.shift(self._holding_period).open
        end = history.shift(-self._holding_period).open
        history['future_return'] = (end - start) / start

        # 과거 데이터 기반 초기화
        for day, row in history.iterrows():
            security.previous_opens[day] = row.open
            if not self._update_features(security, day, row.open_close_return):
                continue
            if not pd.isnull(row.future_return):
                security.labels_by_day[day] = np.sign(row.future_return)

        # 최근만 유지
        security.labels_by_day = security.labels_by_day[-self._samples:]
        security.previous_opens = security.previous_opens[-self._holding_period:]

    def _is_ready(self, security):
        # 학습 가능한 상태인지 체크
        return security.features_by_day.shape[0] == self._samples + self._holding_period + 2

    def _train(self):
        # Gaussian Naive Bayes 모델 학습
        features = pd.DataFrame()
        labels_by_symbol = {}
        self._tradable_securities = []

        for symbol in self.universe.selected:
            security = self.securities[symbol]
            if self._is_ready(security):
                self._tradable_securities.append(security)
                features = pd.concat([features, security.features_by_day], axis=1)
                labels_by_symbol[symbol] = security.labels_by_day

        # 공통 인덱스 교집합 계산
        features.dropna(inplace=True)
        idx = set(features.index)
        for symbol, labels in labels_by_symbol.items():
            idx &= set(labels.index)
        idx = sorted(list(idx))

        # 모델 학습 및 저장
        for security in self._tradable_securities:
            symbol = security.symbol
            X = features.loc[idx]
            y = labels_by_symbol[symbol].loc[idx]
            model = GaussianNB().fit(X, y)
            security.model = model
            if self.live_mode:
                self._models_by_symbol[str(symbol.id)] = pickle.dumps(model)

    def _trade(self):
        # 예측을 통한 종목 선택 및 포트폴리오 설정
        if len(self._tradable_securities) == 0:
            return

        # 최신 특징 벡터 준비
        features = [[]]
        for security in self._tradable_securities:
            features[0].extend(security.features_by_day.iloc[-1].values)

        # 예측이 1인 종목만 매수
        long_symbols = []
        for security in self._tradable_securities:
            key = str(security.symbol.id)
            if self.live_mode and not hasattr(security, "model") and key in self._models_by_symbol:
                security.model = pickle.loads(self._models_by_symbol[key])

            if security.model and security.model.predict(features) == 1:
                long_symbols.append(security.symbol)

        # 비어있으면 리턴, 아니면 균등 분배
        if len(long_symbols) == 0:
            return

        weight = 1 / len(long_symbols)
        self.set_holdings([PortfolioTarget(symbol, weight) for symbol in long_symbols], True)

    def on_end_of_algorithm(self):
        # 라이브일 경우 모델 저장
        if self.live_mode:
            self.object_store.save_bytes(self._key, pickle.dumps(self._models_by_symbol))

    def on_splits(self, splits):
        # 액면분할 발생 시 재초기화
        for symbol, split in splits.items():
            if split.type != SplitType.SPLIT_OCCURRED:
                continue
            security = self.securities[symbol]
            self.subscription_manager.remove_consolidator(symbol, security.consolidator)
            self._set_up_consolidator(security)
            self._warm_up(security)
