In [20]:
import math
from datetime import datetime, timedelta
from pprint import pprint
from random import randrange

import japanize_matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import yfinance as yf
from backtesting import Backtest, Strategy
from backtesting.lib import crossover
from stockstats import StockDataFrame

sns.set(font="IPAexGothic", rc={"figure.figsize": (11, 8)})
pd.options.display.float_format = "{:6.2f}".format

In [48]:
def random_date(start, end):
    """
    This function will return a random datetime between two datetime
    objects.
    """
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)


def get_previous_datetime_by(standard_time, year):
    weeks = year * 52  # 正確には52週ではないが妥協
    return standard_time - timedelta(weeks=weeks)


def convert_df_to_stock_df(df: pd.DataFrame) -> StockDataFrame:
    sdf = df.copy()
    sdf.rename(
        columns={
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Adj Close": "amount",
            "Volume": "volume",
        },
        inplace=True,
    )
    sdf.index.names = ["date"]
    return StockDataFrame(sdf)


def MACD(arr: pd.DataFrame) -> tuple[pd.Series, pd.Series]:
    sdf = convert_df_to_stock_df(arr)
    StockDataFrame.MACD_EMA_SHORT = 12
    StockDataFrame.MACD_EMA_LONG = 26
    StockDataFrame.MACD_EMA_SIGNAL = 9
    return (sdf["macd"], sdf["macds"])


def ATR(arr: pd.DataFrame) -> pd.Series:
    sdf = convert_df_to_stock_df(arr)
    return sdf["atr"]


def SMA(arr: pd.DataFrame, payload=200) -> pd.Series:
    sdf = convert_df_to_stock_df(arr)
    return sdf["open_" + str(payload) + "_sma"]


def DC(arr: pd.DataFrame, payload=20):
    count = 0

    max = []
    min = []
    middle = []

    df = convert_df_to_stock_df(arr).copy()
    for index, data in df.iterrows():

        if payload > count:
            # 計算できていない間は反応しないように適当に広げておく
            max.append(data["high"] + 10)
            min.append(data["low"] - 10)
            middle.append((data["high"] + data["low"]) / 2)
            count += 1
            continue

        # 当日は含めないので-1しとく
        range = df[count - payload : count - 1]
        middle.append((range["high"].max() + range["low"].min()) / 2)
        max.append(range["high"].max())
        min.append(range["low"].min())
        count += 1

    df["max"] = max
    df["min"] = min
    df["middle"] = middle

    # 以下はトレンド（何回最高を更新したか）の回数

    trend_arr = []
    trend = 0
    for index, data in df.iterrows():
        if data["max"] < data["high"]:
            if trend < 0:
                trend = 0
            else:
                trend += 1

        if data["min"] > data["low"]:
            if trend > 0:
                trend = 0
            else:
                trend -= 1
        trend_arr.append(trend)
    df["trend"] = trend_arr

    return (df["min"], df["max"])  # df["middle"], df["trend"]

In [12]:
def get_random_yf_data(tickers: str, start_str: str, end_str: str, payload_year: int):
    end = random_date(
        datetime.strptime(start_str, "%Y/%m/%d %H:%M:%S"),
        datetime.strptime(end_str, "%Y/%m/%d %H:%M:%S"),
    )

    start = get_previous_datetime_by(end, payload_year)

    # Valid start and end: YYYY-MM-DD
    # Valid periods: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max
    # Valid intervals: [1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo]
    return yf.download(
        tickers=tickers,
        start=start,
        end=end,
        interval="1d",
        group_by="ticker",
    ).dropna()


def get_yf_data(tickers: str, start_str: str, end_str: str):
    return yf.download(
        tickers=tickers,
        start=start_str,
        end=end_str,
        interval="1d",
        group_by="ticker",
    ).dropna()

In [5]:
yf_datas = []
for name in range(1, 10, 1):
    yf_datas.append(
        get_random_yf_data("NDX", "2015/10/01 00:00:00", "2022/11/24 00:00:00", 2)
    )

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


In [69]:
class My_Strategy(Strategy):
    # 1week = 5day (取引所の営業日)
    entry_payload_day = 20
    sma_payload = 200

    stop_loss = 5
    atr_not_trade = 6

    day = 0

    def init(self):
        self.atr = self.I(ATR, self.data.df)

        (self.dc_min, self.dc_max) = self.I(DC, self.data.df, self.entry_payload_day)

        (self.dc_half_min, self.dc_half_max) = self.I(
            DC, self.data.df, int(self.entry_payload_day / 2)
        )

        self.sma = self.I(SMA, self.data.df, self.sma_payload)

    def atr_unit(self):
        # 変数は1ATRが与える影響度: 0.01 ⇨ 1ATRで総資産の1%の変化
        unit = math.floor((self.equity * 10) / (self.atr[-1] * self.data.Close[-1]))
        # print(unit)
        return unit

    def next(self):
        self.day += 1

        # 計算できていない場合トレードしない
        if len(self.data.index) < self.entry_payload_day:
            return

        # 手仕舞い (半分のDCの高値または安値を下回った場合)
        if (self.dc_half_min[-1] > self.data.Low[-1] and self.position.is_long) or (
            self.data.High[-1] > self.dc_half_max[-1] and self.position.is_short
        ):
            self.position.close()
            return

        # 手仕舞い（SMA）
        if (self.sma[-1] > self.data.Close[-1] and self.position.is_long) or (
            self.sma[-1] < self.data.Close[-1] and self.position.is_short
        ):
            self.position.close()
            return

        if self.atr[-1] > self.atr_not_trade:
            return

        # 買い注文
        if (
            self.sma[-1] < self.data.Close[-1]
            and self.data.Close[-1] > self.dc_max[-1]
            and not self.position.is_long
        ):
            self.buy(sl=self.data.Close[-1] * (1 - (self.stop_loss / 100)))  # 損切り
            return

        # 売り注文
        if (
            self.sma[-1] > self.data.Close[-1]
            and self.dc_min[-1] > self.data.Close[-1]
            and not self.position.is_short
        ):
            self.sell(sl=self.data.Close[-1] * (1 + (self.stop_loss / 100)))  # 損切り
            return

In [8]:
result = []
for yf_data in yf_datas:
    bt = Backtest(
        yf_data,
        My_Strategy,
        cash=1000000000,
        commission=0.003,
        exclusive_orders=True,
    )
    result.append(bt.run())

print(pd.DataFrame(result)["SQN"])
print(pd.DataFrame(result)["SQN"].mean())

0     0.28
1     0.19
2    -1.30
3    -1.69
4    -1.59
5     0.58
6    -1.46
7     0.14
8     1.31
Name: SQN, dtype: float64
-0.39281038098027454


In [9]:
print(pd.DataFrame(result).iloc[8])

Start                                                   2018-09-06 00:00:00
End                                                     2020-09-02 00:00:00
Duration                                                  727 days 00:00:00
Exposure Time [%]                                                     75.90
Equity Final [$]                                              1430893639.93
Equity Peak [$]                                               1430893639.93
Return [%]                                                            43.09
Buy & Hold Return [%]                                                 66.65
Return (Ann.) [%]                                                     19.71
Volatility (Ann.) [%]                                                 28.13
Sharpe Ratio                                                           0.70
Sortino Ratio                                                          1.27
Calmar Ratio                                                           1.29
Max. Drawdow

In [70]:
bt = Backtest(
    get_yf_data("SPY", "2015-11-06", "2022-12-02"),
    My_Strategy,
    cash=10000000,
    commission=0.003,
    exclusive_orders=True,
)

# 最適化
optimize = bt.optimize(
    maximize="SQN",
    stop_loss=range(2, 8, 1),
    entry_payload_day=[20, 55],
    sma_payload=range(50, 200, 50),
    atr_not_trade=range(4, 8, 1),
)
bt.plot()
print(optimize)
print(optimize._strategy)

# 出力
# output = bt.run()
# print(output)
# bt.plot()

[*********************100%***********************]  1 of 1 completed


Start                     2015-11-06 00:00:00
End                       2022-11-18 00:00:00
Duration                   2569 days 00:00:00
Exposure Time [%]                       51.24
Equity Final [$]                  11700259.23
Equity Peak [$]                   12358113.16
Return [%]                              17.00
Buy & Hold Return [%]                   88.55
Return (Ann.) [%]                        2.26
Volatility (Ann.) [%]                    8.72
Sharpe Ratio                             0.26
Sortino Ratio                            0.36
Calmar Ratio                             0.16
Max. Drawdown [%]                      -14.44
Avg. Drawdown [%]                       -2.33
Max. Drawdown Duration      857 days 00:00:00
Avg. Drawdown Duration       88 days 00:00:00
# Trades                                   19
Win Rate [%]                            42.11
Best Trade [%]                          18.69
Worst Trade [%]                         -2.86
Avg. Trade [%]                    