# ToDo

2/22時点でののmodel.py([taro-kuroda-5228/auto_trading_submodule](https://github.com/taro-kuroda-5228/auto_trading_submodule))のModelクラスのリファクタリングを行う。

観点は下記の通り。

- Modelクラスの責務を分割
- DRY
- モデル選択を行えるクラスの作成

# auto_trading_submoduleの概要

auto_trading_submoduleでは、Yahoo Finance API2を使用して、将来（明日、1週間後、1ヶ月後）の株価のup/downを予測する。

株価を予測するまでの概要は下記の通りである。

1. 予測対象となる銘柄と、上場国を指定 -> Nameクラス
2. Yahoo Financeより、予測したい銘柄の株価情報（始値、終値、安値、高値、出来高）を取得 -> Symbol Dataクラス
3. 2で取得したデータをテーブルデータに整形 -> Raw Dataクラス
4. 予測したい銘柄の特徴量のテーブルデータを作成するために、1~3を実行
5. 予測したい銘柄、特徴量のテーブルデータを結合しデータマートを作成 -> Datamartクラス
6. データマートから学習・検証データセットを作成
7. 各データセットで学習済みモデル、予測値を算出 -> Modelクラス
8. 予測値を使用して、評価指標を算出 -> Scoreクラス
9. 最良の評価指標を算出した学習済みモデルを選択 -> Model Selectionクラス
10. 9の学習済みモデル使用して、将来の株価のup/downを予測する。-> TODO: future coding

# 準備

In [1]:
from typing import Dict, Tuple

import lightgbm as lgb
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    auc,
    f1_score,
    log_loss,
    precision_recall_curve,
    precision_score,
    recall_score,
    roc_curve,
)
from sklearn.model_selection import TimeSeriesSplit

from datamart import Datamart
from feature import Feature
from name import Name
from raw_data import RawData
from symbol_data import SymbolData

# Main

## データマート作成

In [2]:
def create_datamart(name: str) -> pd.DataFrame:
    """終値 x 5日分のラグ x 1日後の予測のためのデータマート作成"""
    symbol_data = SymbolData(name).symbol_data
    raw_data = RawData(symbol_data).raw_data
    return Datamart(raw_data, "close", 5, 1, name).datamart


datamart_msft = create_datamart("msft")
datamart_dia = create_datamart("dia")
datamart_spy = create_datamart("spy")

datamart = pd.concat(
    [
        datamart_msft,
        datamart_dia.drop("target", axis=1),
        datamart_spy.drop("target", axis=1),
    ],
    axis=1,
)

# NOTE: 使用していない不要なクラスは削除しましょう。

## データセット作成

In [3]:
# NOTE: model.pyにおけるfold_splitなどを分割

In [4]:
def X_y_split(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
    """データマートを特徴量、目的変数に分割"""
    df = df.iloc[::-1]
    df.reset_index(inplace=True, drop=True)
    X = df.iloc[:, 1:]
    y = df["target"]
    return X, y


def train_test_time_series_split(
    X: pd.DataFrame, y: pd.Series
) -> Tuple[list, list, list, list]:
    """特徴量、目的変数を時系列 x train x testで分割
    
       Note:
           X_y_split関数でデータマートを特徴量と目的変数に分割した後、
           特徴量と目的変数を学習データセットと検証データセットに分割する。
           さらにこの関数では、学習データセットと検証データセットを5組生成する。
           
           この関数の具体的な機能は、例えばデータマートの期間が2021/01 ~ 2021/08であり、日次データであるとすれば、
               - データセット1（学習データセット1, 検証データセット1） -> 2021/01~2021/03, 2021/04
               - データセット1（学習データセット2, 検証データセット2） -> 2021/01~2021/04, 2021/05
               - データセット1（学習データセット3, 検証データセット3） -> 2021/01~2021/05, 2021/06
               - データセット1（学習データセット4, 検証データセット4） -> 2021/01~2021/06, 2021/07
               - データセット1（学習データセット5, 検証データセット5） -> 2021/01~2021/07, 2021/08
           という5組のデータセットを生成するという機能である。各学習・検証データセットはそれぞれ、X,yを持つ。
           l_X_train, l_X_test, l_y_train, l_y_test
    """
    folds = TimeSeriesSplit(n_splits=5)
    l_X_train = []
    l_X_test = []
    l_y_train = []
    l_y_test = []
    for train_index, test_index in folds.split(X):
        X_train, X_test = (
            X.iloc[train_index,],
            X.iloc[test_index,],
        )
        y_train, y_test = y[train_index], y[test_index]
        l_X_train.append(X_train)
        l_X_test.append(X_test)
        l_y_train.append(y_train)
        l_y_test.append(y_test)
    return l_X_train, l_X_test, l_y_train, l_y_test


X, y = X_y_split(datamart)
l_X_train, l_X_test, l_y_train, l_y_test = train_test_time_series_split(X, y)

## モデル選択

In [5]:
class Model:
    """学習・予測"""

    def __init__(
        self,
        model_str: str,
        l_X_train: list,
        l_X_test: list,
        l_y_train: list,
        l_y_test: list,
    ):
        self.model_str = model_str
        self.l_X_train = l_X_train
        self.l_X_test = l_X_test
        self.l_y_train = l_y_train
        self.l_y_test = l_y_test  # NOTE: Modelクラス内では使用しないが、Scoreクラス内で使用する。

    @property
    def clf(self):
        if self.model_str == "lgb":
            return lgb.LGBMClassifier()
        elif self.model_str == "lr":
            return LogisticRegression(
                max_iter=1500
            )  # NOTE: 2/22時点でConvergence Warningが送出されないことを確認済み
        else:
            raise Exception(f"想定していないモデル({model_str})が指定されています。")

    def main(self) -> Dict[str, list]:
        """各データセットの学習済みモデル・予測値・予測確率(Positive & Negative *1)・予測確率（Positive *2）を算出。
        
           Note:
               *1, 後続のlog_loss算出に使用
               *2, 後続のauc算出に使用
        """
        self.l_clf = []
        self.l_pred = []
        self.l_prob = []
        self.l_prob_posi = []
        for X_train, X_test, y_train in zip(
            self.l_X_train, self.l_X_test, self.l_y_train
        ):
            clf = self.clf
            clf.fit(X_train, y_train)
            y_pred = clf.predict(X_test)
            y_prob = clf.predict_proba(X_test)
            y_prob_posi = y_prob[:, 1]
            self.l_clf.append(clf)
            self.l_pred.append(y_pred)
            self.l_prob.append(y_prob)
            self.l_prob_posi.append(y_prob_posi)

In [6]:
class Score:
    """予測後の評価指標"""

    def __init__(
        self, model: Model,
    ):
        self.model = model

    @property
    def l_idx_accuracy(self):
        l = []
        for true, pred in zip(self.model.l_y_test, self.model.l_pred):
            l.append(accuracy_score(y_true=true, y_pred=pred))
        return l

    @property
    def l_idx_precision(self):
        l = []
        for true, pred in zip(self.model.l_y_test, self.model.l_pred):
            l.append(precision_score(y_true=true, y_pred=pred))
        return l

    @property
    def l_idx_recall(self):
        l = []
        for true, pred in zip(self.model.l_y_test, self.model.l_pred):
            l.append(recall_score(y_true=true, y_pred=pred))
        return l

    @property
    def l_idx_f1(self):
        l = []
        for true, pred in zip(self.model.l_y_test, self.model.l_pred):
            l.append(f1_score(y_true=true, y_pred=pred))
        return l

    @property
    def l_idx_log_loss(self):
        l = []
        for true, prob in zip(self.model.l_y_test, self.model.l_prob):
            l.append(-1 * log_loss(y_true=true, y_pred=prob))
        return l

    @property
    def l_idx_roc_auc(self):
        l = []
        for true, prob_posi in zip(self.model.l_y_test, self.model.l_prob_posi):
            fpr, tpr, _ = roc_curve(y_true=true, y_score=prob_posi)
            l.append(auc(fpr, tpr))
        return l

    @property
    def l_idx_pr_auc(self):
        l = []
        for true, prob_posi in zip(self.model.l_y_test, self.model.l_prob_posi):
            pr_precision, pr_recall, _ = precision_recall_curve(
                y_true=true, probas_pred=prob_posi
            )
            l.append(auc(pr_recall, pr_precision))
        return l

In [7]:
class ModelSelection:
    """最適なモデル選択"""

    def __init__(
        self,
        l_X_train: list,
        l_X_test: list,
        l_y_train: list,
        l_y_test: list,
        *model_str,
    ):
        self.model_str = list(model_str)
        self.l_X_train = l_X_train
        self.l_X_test = l_X_test
        self.l_y_train = l_y_train
        self.l_y_test = l_y_test  # NOTE: Modelクラス内では使用しないが、Scoreクラス内で使用する。

        self.l_score = []  # NOTE: あるモデルのScoreインスタンス
        self.l_average_score = []  # NOTE: あるモデルが持つ複数の評価指標の平均値
        self.l_ensemble_score = []
        self.l_l_clf = []  # NOTE: 全ての学習済みモデル

        self.n_split = 5

    def create_df_idx(self, score: Score):
        """あるモデルの評価指標のテーブル"""
        v = [getattr(score, attr) for attr in dir(score) if attr.startswith("l_idx")]
        idx = [
            attr.replace("l_idx_", "")
            for attr in dir(score)
            if attr.startswith("l_idx")
        ]
        col = [
            order for order in range(1, self.n_split + 1)
        ]  # NOTE: n_splitはTimeSeriesSplitによる分割回数
        return pd.DataFrame(v, index=idx, columns=col)

    def create_average_score(self, df_idx):
        """ある指標の中でどのモデルが良かったかの判断指標"""
        return df_idx.mean(axis=1)

    def create_ensemble_score(self, df_idx):
        """あるモデルの中で何番目の分割が良かったかの判断指標"""
        return df_idx.mean(axis=0)

    def _calc(self):
        # NOTE: アンダーバー1つはクラス内部用。外部からアクセスしない。
        # .      2つは外部からアクセスできない。
        """最良のモデル選択に使用する評価指標の数値を算出"""
        for model_str in self.model_str:
            model = Model(model_str, l_X_train, l_X_test, l_y_train, l_y_test)
            model.main()
            score = Score(model)
            df_idx = self.create_df_idx(score)
            average_score = self.create_average_score(df_idx)
            ensemble_score = self.create_ensemble_score(df_idx)
            self.l_average_score.append(average_score.sum())
            self.l_ensemble_score.append(ensemble_score.tolist())
            self.l_score.append(score)
            self.l_l_clf.append(model.l_clf)

    def best_model(self, verbose: bool = False):
        self._calc()
        max_ = max(self.l_average_score)
        self.idx_max_model = self.l_average_score.index(max_)

        max_ = max(self.l_ensemble_score[self.idx_max_model])
        self.idx_max_split = self.l_ensemble_score[self.idx_max_model].index(max_)

        if verbose:
            print(
                f"""best_model: {self.model_str[self.idx_max_model]}\nbest_split: {self.idx_max_split}"""
            )

        return self.l_l_clf[self.idx_max_model][self.idx_max_split]

In [8]:
model_selection = ModelSelection(l_X_train, l_X_test, l_y_train, l_y_test, "lgb", "lr")

In [9]:
model_selection.best_model(verbose=True)

best_model: lr
best_split: 3


LogisticRegression(max_iter=1500)