In [None]:
# 載入必要套件
from BackTest import ChartTrade
import mplfinance as mpf
import pandas as pd
import os
import yfinance as yf

# 自訂模組
from BackTest import Performance


# 設定在根目錄即可，之後可以改到 /data
datapath = "./"


# 更新資料源為 yahoo finance
def getDataFM(prod, st, en):
    #
    bakfile = f"data//YF_{prod}_{st}_{en}_stock_daily_adj.csv"
    if os.path.exists(bakfile):
        data = pd.read_csv(bakfile)
        data["Date"] = pd.to_datetime(data["Date"])
        data = data.set_index("Date")
    else:
        data = yf.download(f"{prod}.TW", start=st, end=en)
        data.columns = [i.lower() for i in data.columns]
        # 除錯 如果沒有資料
        if data.shape[0] == 0:
            print("沒有資料")
            return pd.DataFrame()
        # 將資料寫入備份檔
        data.to_csv(bakfile)
    return data


# 取得三大法人 證交所資料來源
def getTSEInstitutionalInvestors(prod, st, en):
    # 備份檔名
    bakfile = f"{datapath}//{prod}_{st}_{en}_TSE_InstitutionalInvestorsBuySell.csv"
    # 檢視是否有該檔案存在
    if os.path.exists(bakfile):
        # 取得檔案內容
        tmpdata = pd.read_csv(bakfile)
        tmpdata["日期"] = pd.to_datetime(tmpdata["日期"])
        tmpdata = tmpdata.set_index(tmpdata["日期"])
        tmpdata.drop("日期", axis=1, inplace=True)

    # 沒有的話就取檔案內容
    else:
        # 取得檔案內容
        tmpdata = pd.read_csv("三大法人爬蟲資料.csv", encoding="utf-8")
        tmpdata = tmpdata[
            (tmpdata["證券代號"] == prod) &
            (tmpdata["日期"] >= int(st)) &
            (tmpdata["日期"] <= int(en))
        ]
        tmpdata["日期"] = pd.to_datetime(tmpdata["日期"], format="%Y%m%d")
        tmpdata = tmpdata.set_index(tmpdata["日期"])
        tmpdata.drop("日期", axis=1, inplace=True)
        # 將資料內容轉換為數值
        for i in range(2, tmpdata.shape[1]):
            tmpdata.iloc[:, i] = tmpdata.iloc[:, i].str.replace(",", "")
            tmpdata.iloc[:, i].astype(int)
        # 將單一證券內容寫入備份檔中
        tmpdata.to_csv(bakfile)
    # 回傳資料
    return tmpdata


# 取得K線與三大法人的集合資料 透過 證交所
def getPriceAndInstInvest_TSE(prod, st, en):
    # 取得還原股價
    data1 = getDataFM(prod, st, en)
    # 取得證交所的三大法人
    st = st.replace("-", "")
    en = en.replace("-", "")
    data2 = getTSEInstitutionalInvestors(prod, st, en)
    data3 = pd.concat([data1, data2], axis=1, join="inner")
    return data3


# 取得回測資料
prod = "2618"
data = getPriceAndInstInvest_TSE(prod, "2023-12-01", "2023-12-10")

# 減法之前先轉換
data["投信買進股數"] = pd.to_numeric(data["投信買進股數"], errors="coerce")
data["投信賣出股數"] = pd.to_numeric(data["投信賣出股數"], errors="coerce")
# 計算 外資平均買賣
data["I_day"] = data["投信買進股數"] - data["投信賣出股數"]
data["I_month"] = (data["I_day"]).rolling(60).sum()
data["I_mean"] = (data["I_month"]).rolling(60).mean()


# 初始部位
position = 0
trade = pd.DataFrame()
# 開始回測
for i in range(data.shape[0] - 1):
    # 取得策略會應用到的變數
    c_time = data.index[i]
    c_high = data.loc[c_time, "high"]
    c_close = data.loc[c_time, "close"]
    c_I_month = data.loc[c_time, "I_month"]
    c_I_mean = data.loc[c_time, "I_mean"]
    # 取下一期資料做為進場資料
    n_time = data.index[i + 1]
    n_open = data.loc[n_time, "open"]

    # 進場程序
    if position == 0:
        if c_I_month > c_I_mean:
            position = 1
            order_i = i
            order_time = n_time
            order_price = n_open
            order_unit = 1
    # 出場程序
    elif position == 1:
        # 出場邏輯
        if c_I_month < c_I_mean:
            position = 0
            cover_time = n_time
            cover_price = n_open
            # 交易紀錄
            trade = trade._append(
                pd.Series(
                    [
                        prod,
                        "Buy",
                        order_time,
                        order_price,
                        cover_time,
                        cover_price,
                        order_unit,
                    ]
                ),
                ignore_index=True,
            )


# 繪製副圖
addp = []
# 外資買賣力道
addp.append(mpf.make_addplot(data["I_month"], panel=1, color="red", secondary_y=False))
addp.append(mpf.make_addplot(data["I_mean"], panel=1, color="blue", secondary_y=False))

# 績效分析
Performance(trade, "ETF")
# 繪製K線圖與交易明細
ChartTrade(data, trade, addp=addp, v_enable=False)
