In [12]:
import tidal as td
import pandas as pd
import numpy as np
import json
import glob
import os
from nutrlink import NutrLink
from typing import Any, Dict, List, Union
from tidal.lake_analyzer import LakeAnalyzer
from pathlib import Path


# Shared NutrLink client used by query_nutrients_ohlcv() to fetch price tables.
# NOTE(review): the host name suggests a development endpoint — confirm this is
# the intended environment before relying on the backtest results.
nl: NutrLink = NutrLink(url="https://dev-api.ddt-dst.cc/nutrients/station")

In [13]:
def query_nutrients_ohlcv(tickers: List[str], start_date: Union[str, None] = None, end_date: Union[str, None] = None, adjusted: bool = True) -> pd.DataFrame:
    """Fetch daily OHLCV rows for ``tickers`` through the module-level ``nl`` client.

    Args:
        tickers: Instrument codes matched against the ``coid`` column.
        start_date: Optional inclusive lower bound on ``mdate``; parsed with
            ``pd.Timestamp(..., tz="UTC")``. ``None`` means no lower bound.
        end_date: Optional inclusive upper bound on ``mdate``, parsed the same
            way. ``None`` means no upper bound.
        adjusted: When True, read the ``*_adj`` price columns from
            ``tej_stock_twn_aprcd1``; otherwise read the ``*_d`` columns from
            ``tej_stock_twn_aprcd``.

    Returns:
        DataFrame indexed by (``instrument``, ``datetime``), sorted by index,
        with columns ``open``, ``high``, ``low``, ``close`` and ``volume``.
    """
    suffix = "adj" if adjusted else "d"
    table_name = "tej_stock_twn_aprcd1" if adjusted else "tej_stock_twn_aprcd"
    price_fields = ("open", "high", "low", "close")

    # Always filter by ticker; add date bounds only when they were supplied.
    filters = [("coid", "in", tickers)]
    if start_date is not None:
        filters.append(("mdate", ">=", pd.Timestamp(start_date, tz="UTC").to_pydatetime()))
    if end_date is not None:
        filters.append(("mdate", "<=", pd.Timestamp(end_date, tz="UTC").to_pydatetime()))

    data = nl.get(
        table_name,
        columns=["coid", "mdate", *[f"{field}_{suffix}" for field in price_fields], "volume"],
        filters=filters,
    )

    # NOTE(review): presumably the source reports volume in units of 1,000
    # shares — confirm against the table documentation.
    data['volume'] = data['volume'] * 1000

    # Strip the adjusted/unadjusted suffix so callers always see plain names.
    renamed_columns = {"coid": "instrument", "mdate": "datetime"}
    for field in price_fields:
        renamed_columns[f"{field}_{suffix}"] = field
    data = data.rename(columns=renamed_columns)

    # Shift back 8 hours, then express the timestamps in Asia/Taipei.
    # NOTE(review): assumes `mdate` arrives timezone-aware (tz_convert requires
    # it); for UTC input the wall-clock time is left unchanged — confirm.
    shifted = data['datetime'] - pd.Timedelta(hours=8)
    data['datetime'] = shifted.dt.tz_convert("Asia/Taipei")

    return data.set_index(["instrument", "datetime"]).sort_index()

In [14]:
# Backtest parameters.
# `extended_start_date` is one year earlier than `start_date`; the quote query
# uses it so that rolling indicators have history before the first traded day.
start_date = '2015-05-01'
extended_start_date = '2014-05-01'
end_date = '2025-04-30'

# Input folders, resolved relative to the current working directory:
#   folder_path   -> one CSV of daily light scores per industry
#   folder_path_2 -> one CSV of constituent stocks (`coid` column) per industry
folder_path = Path.cwd()/ "Textile_signal_lights" / "my_use_pca_2"
folder_path_2 = Path.cwd()/ "top_k" / "my_use"

# Full paths of every score CSV. glob returns files in a file-system dependent
# order, so sort them: the dict insertion order below decides tie-breaks when
# the strategy picks the lowest-scoring industry.
csv_files = sorted(glob.glob(os.path.join(folder_path, '*.csv')))

# Key = file name without extension (the industry name),
# value = that industry's daily score table indexed by its first column.
industry_daily_score = {
    Path(file).stem: pd.read_csv(file, index_col=0)
    for file in csv_files
}

# Parse the index of every score table into datetimes so it can be matched
# against trading dates.
for industry, df in industry_daily_score.items():
    df.index = pd.to_datetime(df.index)

# Same deterministic ordering for the constituent files: when a stock appears
# in several industries, the last file read wins in the coid -> industry map.
csv_files_2 = sorted(glob.glob(os.path.join(folder_path_2, '*.csv')))

# Key = industry name (file stem), value = constituent table for that industry.
industry_map = {
    Path(file).stem: pd.read_csv(file)
    for file in csv_files_2
}

In [15]:
# 1. Load OHLCV data for every stock listed in any industry constituent file.
print("讀取 OHLCV 數據...")
all_coids = set()
for df in industry_map.values():
    all_coids.update(df['coid'].astype(str).tolist())  # normalise codes to str

# Sorted so the request is identical from run to run (set order is arbitrary).
stock_list = sorted(all_coids)
# Query from the extended start date so the rolling indicators below have
# history before the first backtest day.
quote_data = query_nutrients_ohlcv(stock_list, extended_start_date, end_date)


# Map each stock code to its industry. A code listed under several industries
# keeps the last industry encountered.
coid_to_industry = {}
for industry, df in industry_map.items():
    for coid in df['coid']:
        coid_to_industry[str(coid)] = industry


quote_data = quote_data.reset_index()
quote_data['date'] = pd.to_datetime(quote_data['datetime']).dt.date
quote_data['industry'] = quote_data['instrument'].map(coid_to_industry)

# 2. Attach each row's industry light score for its trading date.
# Done with one vectorised Series.map per industry instead of a row-wise
# DataFrame.apply, which performs a Python-level lookup for every quote row.
# Rows without an industry, with an industry that has no score table, or with
# a date missing from the score table are left as NaN.
# Series.map requires the score table's date index to be unique.
quote_dates = pd.to_datetime(quote_data['date'])
industry_score = pd.Series(np.nan, index=quote_data.index, dtype='float64')
for score_industry, score_df in industry_daily_score.items():
    in_industry = quote_data['industry'] == score_industry
    if in_industry.any():
        industry_score.loc[in_industry] = quote_dates[in_industry].map(score_df['燈號分數'])
quote_data['industry_score'] = industry_score

# Per-instrument indicators: 60-row mean of close (NaN until 60 rows exist)
# and rolling minimum of low over up to 252 rows.
quote_data['ma60'] = quote_data.groupby('instrument')['close'].transform(lambda x: x.rolling(window=60).mean())
quote_data['low_1y'] = quote_data.groupby('instrument')['low'].transform(lambda x: x.rolling(window=252, min_periods=1).min())

# Drop the warm-up history now that the indicators are computed.
# Note: this rebinds `start_date` from a string to a datetime.date.
start_date = pd.to_datetime(start_date).date()
quote_data = quote_data[quote_data['date'] >= start_date]

# === 3. Set the (instrument, datetime) index for DSTrader ===
quote_data = quote_data.set_index(['instrument', 'datetime'])

print("讀取基準數據...")
benchmark_data = query_nutrients_ohlcv(['0050'], start_date, end_date)
# Keep only the benchmark's close price, indexed by datetime.
if not benchmark_data.empty:
    benchmark_data = benchmark_data.loc['0050'][['close']]
else:
    print("基準數據 (0050) 為空，無法用於比較。")
    benchmark_data = None  # signal "no benchmark" to downstream cells

讀取 OHLCV 數據...
讀取基準數據...


In [16]:
# Preview the first 10 rows of the prepared quote table.
quote_data.head(10)

Unnamed: 0_level_0,Unnamed: 1_level_0,open,high,low,close,volume,date,industry,industry_score,ma60,low_1y
instrument,datetime,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2330,2015-05-04 00:00:00+08:00,110.0502,110.0502,108.1975,109.3092,30868640.0,2015-05-04,semiconductor,4.0,108.97567,84.0758
2330,2015-05-05 00:00:00+08:00,109.3092,109.6797,108.1975,108.9386,27789400.0,2015-05-05,semiconductor,4.0,108.97567,84.0758
2330,2015-05-06 00:00:00+08:00,107.827,109.6797,107.827,109.3092,18824208.0,2015-05-06,semiconductor,4.0,108.988022,84.0758
2330,2015-05-07 00:00:00+08:00,108.1975,109.3092,108.1975,108.5681,21908150.0,2015-05-07,semiconductor,4.0,109.025077,84.0758
2330,2015-05-08 00:00:00+08:00,108.1975,109.3092,108.1975,108.5681,20035646.0,2015-05-08,semiconductor,4.0,109.09301,84.0758
2330,2015-05-11 00:00:00+08:00,110.4208,110.4208,108.9386,110.0502,20402529.0,2015-05-11,semiconductor,4.0,109.167117,84.0758
2330,2015-05-12 00:00:00+08:00,108.9386,110.0502,108.9386,109.3092,24956498.0,2015-05-12,semiconductor,4.0,109.204172,84.0758
2330,2015-05-13 00:00:00+08:00,109.3092,110.4208,108.9386,109.6797,19437537.0,2015-05-13,semiconductor,4.0,109.228875,84.4382
2330,2015-05-14 00:00:00+08:00,109.6797,110.0502,108.1975,108.1975,39888654.0,2015-05-14,semiconductor,4.0,109.23505,85.163
2330,2015-05-15 00:00:00+08:00,108.9386,108.9386,107.4565,108.5681,24831890.0,2015-05-15,semiconductor,4.0,109.27828,85.163


In [17]:
class YourStrategy(td.BaseStrategy):
    """Industry light-score rotation strategy.

    On every trade callback the strategy:

    1. picks the industry with the lowest light score for the current date,
    2. derives buy and sell candidates from the per-row ``industry_score``
       column of the quote table using the rule set chosen by ``signal``,
    3. sells held sell candidates whose return is at least 50% and whose close
       is below the 60-row moving average,
    4. splits the available cash equally across new buy candidates from the
       selected industry, buying only those priced within 130% of ``low_1y``.

    Args:
        max_inst: Maximum number of instruments to hold at once.
        industry_map: dict of industry name -> DataFrame with a ``coid``
            column listing that industry's stocks.
        industry_daily_score: dict of industry name -> DataFrame indexed by
            date with a ``燈號分數`` column (the daily light score).
        signal: Key into the rule table below (1-8). An unknown value raises
            ``ValueError`` on the first ``on_trade`` call.
    """

    # Buy/sell rule sets keyed by `signal`. Every rule takes today's and
    # yesterday's quote tables and returns a boolean mask over instruments.
    # Kept at class level so the table is built once, not on every callback.
    _SIGNAL_RULES = {
        # Buy at score <= 2, sell at score >= 4.
        1: {"buy": lambda today_df, yest_df: today_df['industry_score'] <= 2,
            "sell": lambda today_df, yest_df: today_df['industry_score'] >= 4},

        # Buy at score 1, sell at score 5.
        2: {"buy": lambda today_df, yest_df: today_df['industry_score'] == 1,
            "sell": lambda today_df, yest_df: today_df['industry_score'] == 5},

        # Buy at score 1, sell at score 4.
        3: {"buy": lambda today_df, yest_df: today_df['industry_score'] == 1,
            "sell": lambda today_df, yest_df: today_df['industry_score'] == 4},

        # Buy at score 2, sell at score 5.
        4: {"buy": lambda today_df, yest_df: today_df['industry_score'] == 2,
            "sell": lambda today_df, yest_df: today_df['industry_score'] == 5},

        # Buy at score 2, sell at score 4.
        5: {"buy": lambda today_df, yest_df: today_df['industry_score'] == 2,
            "sell": lambda today_df, yest_df: today_df['industry_score'] == 4},

        # Buy when the score moves from 3 to 1 or 2, sell at score 5.
        6: {"buy": lambda today_df, yest_df: (yest_df['industry_score'] == 3) &
                                             (today_df['industry_score'].isin([1, 2])),
            "sell": lambda today_df, yest_df: today_df['industry_score'] == 5},

        # Buy when the score moves from 1 to 2, 3 or 4, sell at score 5.
        7: {"buy": lambda today_df, yest_df: (yest_df['industry_score'] == 1) &
                                             (today_df['industry_score'].isin([2, 3, 4])),
            "sell": lambda today_df, yest_df: today_df['industry_score'] == 5},

        # Buy when the score moves from 1 to 2, sell when it moves from 5 to 4.
        8: {"buy": lambda today_df, yest_df: (yest_df['industry_score'] == 1) &
                                             (today_df['industry_score'] == 2),
            "sell": lambda today_df, yest_df: (yest_df['industry_score'] == 5) &
                                              (today_df['industry_score'] == 4)},
    }

    def __init__(self, max_inst, industry_map, industry_daily_score, signal=1):
        super().__init__()
        self.max_inst = max_inst
        self.industry_map = industry_map                    # industry name -> constituent DataFrame (`coid` column)
        self.industry_daily_score = industry_daily_score    # industry name -> daily score DataFrame (index = date)
        self.last_sell_price = {}                           # instrument -> price of its most recent sell order
        self.signal = signal                                # which rule set in _SIGNAL_RULES to apply
        self.prev_quote = None                              # previous callback's quote table (None on the first one)

    def on_trade(self):
        """Evaluate today's quotes and place sell orders, then buy orders.

        Raises:
            ValueError: If ``self.signal`` is not a key of the rule table.
        """
        quote_today = self.quote()  # today's quote table, indexed by instrument code

        today = pd.to_datetime(self.datetime.date())  # current strategy date as a Timestamp

        # Use the previous callback's quotes when available. On the first
        # callback use a placeholder whose score (999) matches none of the
        # "yesterday" conditions, so transition rules cannot fire.
        if self.prev_quote is not None:
            quote_yesterday = self.prev_quote
        else:
            quote_yesterday = pd.DataFrame(index=quote_today.index)
            quote_yesterday['industry_score'] = 999

        # Remember today's quotes for the next callback.
        self.prev_quote = quote_today.copy()

        # Today's light score per industry. Industries with no row for today
        # get a very large score so they are not selected.
        industry_scores_today = {}
        for industry, df_score in self.industry_daily_score.items():
            if today in df_score.index:
                industry_scores_today[industry] = df_score.loc[today, '燈號分數']
            else:
                industry_scores_today[industry] = 9999

        # Industry with the lowest score; on ties the first one in dict order wins.
        selected_industry = min(industry_scores_today, key=industry_scores_today.get)

        if self.signal not in self._SIGNAL_RULES:
            raise ValueError("無效的 signal 參數")

        cond = self._SIGNAL_RULES[self.signal]
        buy_candidates = quote_today[cond["buy"](quote_today, quote_yesterday)]
        sell_candidates = quote_today[cond["sell"](quote_today, quote_yesterday)]
        held_stocks = set(self.positions.keys())

        # Only stocks belonging to the selected industry may be bought.
        candidate_stocks = set(self.industry_map[selected_industry]['coid'].astype(str))
        buy_candidates = buy_candidates.loc[buy_candidates.index.isin(candidate_stocks)]

        # ===== Sell block =====
        # Track which instruments actually get a sell order: a sell candidate
        # that fails the exit test below stays in the portfolio and must keep
        # occupying its slot.
        sold_today = set()
        for inst in held_stocks:
            if inst not in sell_candidates.index:
                continue

            current_price = quote_today.loc[inst, 'close']
            ma60 = quote_today.loc[inst, 'ma60']

            # Aggregate all open positions of this instrument.
            total_quantity = 0
            total_cost = 0
            for pos in self.positions[inst]:
                total_quantity += pos.quantity
                total_cost += pos.price * pos.quantity

            if total_quantity <= 0:
                continue

            avg_cost = total_cost / total_quantity
            return_pct = (current_price - avg_cost) / avg_cost

            # Exit only with at least a 50% gain and the close below the
            # 60-row moving average (a NaN average never triggers the exit).
            if return_pct >= 0.5 and current_price < ma60:
                self.place_order(inst, -total_quantity, current_price, td.OrderType.MARKET)
                self.last_sell_price[inst] = current_price
                sold_today.add(inst)

        # ===== Buy block =====
        available_cash = self.cash
        # Free slots only for instruments that were actually sold above, so
        # the number of held instruments cannot exceed max_inst.
        current_positions = len(held_stocks - sold_today)
        remaining_slots = self.max_inst - current_positions
        if remaining_slots <= 0:
            return

        buy_candidates = buy_candidates.sort_values(by='industry_score')
        buy_candidates = buy_candidates.loc[~buy_candidates.index.duplicated(keep='first')]
        buy_list = buy_candidates.loc[~buy_candidates.index.isin(held_stocks)] \
                                 .head(remaining_slots)

        if buy_list.empty:
            return

        # Equal cash allocation across the selected names.
        cash_per_stock = available_cash / len(buy_list)
        for inst in buy_list.index:
            price = buy_list.loc[inst, 'close']
            low_1y = buy_list.loc[inst, 'low_1y']
            # Buy only when the close is within 130% of the rolling low.
            if price > 0 and low_1y > 0 and price <= 1.3 * low_1y:
                quantity = int(cash_per_stock // price)
                if quantity > 0:
                    self.place_order(inst, quantity, price, td.OrderType.MARKET)

In [18]:
# Create the backtest engine (DSTrader object) with 10,000,000 initial cash,
# slip_ticks=1 and the Taiwan stock configuration.
tidal = td.Tidal(
    init_cash=10000000,
    slip_ticks=1,
    stock_config=td.StockConfig.TW,
    load_configs=True,
)

# Register the prepared quote table.
tidal.add_quote(quote_data)

# Attach the strategy: at most 20 instruments, rule set 2.
tidal.set_strategy(
    YourStrategy(
        max_inst=20,
        industry_map=industry_map,
        industry_daily_score=industry_daily_score,
        signal=2,
    )
)

# Register the metrics to collect; Portfolio receives the benchmark close
# series (or None when the benchmark query returned no rows).
tidal.add_metric(td.metric.AccountInfo())
tidal.add_metric(td.metric.PositionInfo())
tidal.add_metric(td.metric.Portfolio(benchmark_data))

# tidal.metrics['Portfolio'].report


[2025/09/10 15:48:16] root INFO Allocated remote server address: tcp://10.136.22.5:6666
[2025/09/10 15:48:16] root INFO SocketClient Initializing connection to tcp://10.136.22.5:6666
[2025/09/10 15:48:17] root INFO SocketClient tcp://10.136.22.5:6666: Starting event monitor
[2025/09/10 15:48:17] root INFO SocketClient tcp://10.136.22.5:6666: Attempting handshake
[2025/09/10 15:48:20] root INFO SocketClient tcp://10.136.22.5:6666: Successfully connected to server
[2025/09/10 15:48:21] root INFO SocketClient tcp://10.136.22.5:6666: ZMQ connection established, waiting for server response
[2025/09/10 15:48:21] root INFO SocketClient tcp://10.136.22.5:6666: Handshake completed successfully
[2025/09/10 15:48:21] root INFO SocketClient Successfully initialized connection to tcp://10.136.22.5:6666
[2025/09/10 15:48:21] root INFO Tidal client version: 1.1.69
[2025/09/10 15:48:21] root INFO Tidal server version: 1.1.69


[2025/09/10 17:54:59] root INFO SocketClient tcp://10.136.22.5:6666: Disconnected from server
[2025/09/10 17:54:59] root INFO SocketClient tcp://10.136.22.5:6666: Event monitor loop ended


In [19]:
# Run the backtest with the quotes, strategy and metrics registered above.
tidal.backtest()

Tidal Backtesting: 100%|██████████| 2439/2439 [04:46<00:00,  8.51it/s, cash=6.92e+6, pnl=1.96e+7, position_cost=4.57e+7, value=7.21e+7] 


In [20]:
# Display the per-instrument trade report produced by the backtest.
tidal.trade_report

Unnamed: 0_level_0,win_num,lose_num,trade_num,pos_num,win_rate,profit,loss,trade_cost,pnl
instrument,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
5434,1.0,0.0,1.0,0.0,1.0,10787380.0,0.0,96836.58,10787380.0
2330,2.0,0.0,2.0,1.0,1.0,9651279.0,0.0,10056710.0,9651279.0
2379,1.0,0.0,1.0,0.0,1.0,7959845.0,0.0,35017.15,7959845.0
2454,3.0,1.0,4.0,1.0,0.75,8163251.0,-468283.93673,3938704.0,7694967.0
6183,7.0,0.0,7.0,6.0,1.0,6980843.0,0.0,7865436.0,6980843.0
3029,2.0,0.0,2.0,0.0,1.0,4574782.0,0.0,46184.37,4574782.0
5410,6.0,3.0,9.0,3.0,0.666667,3942259.0,-164555.28,7736817.0,3777703.0
3034,1.0,0.0,1.0,0.0,1.0,2763565.0,0.0,17131.34,2763565.0
3010,1.0,0.0,1.0,0.0,1.0,1841418.0,0.0,18491.59,1841418.0
3702,1.0,0.0,1.0,0.0,1.0,1801066.0,0.0,18349.25,1801066.0


In [21]:
# tidal.metrics['Portfolio'].report.to_csv('/home/jovyan/business-cycle/buy_and_hold_condition/my_use_2_return_condition_5.csv', index=False)

In [22]:
# Launch the tdboard dashboard; per the captured output this starts a Flask
# server ('tidal.tdboard') listening on all addresses.
tidal.tdboard()

[2025/09/10 15:53:09] root INFO SocketClient tcp://10.136.22.2:6666: Initiating normal client shutdown
[2025/09/10 15:53:09] root INFO SocketClient tcp://10.136.22.2:6666: Closing monitor socket
[2025/09/10 15:53:09] root INFO SocketClient tcp://10.136.22.2:6666: Closing main socket
[2025/09/10 15:53:09] root INFO SocketClient tcp://10.136.22.2:6666: Normal client shutdown completed


 * Serving Flask app 'tidal.tdboard'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:46859
 * Running on http://10.136.19.4:46859
[2025/09/10 15:53:09] werkzeug INFO [33mPress CTRL+C to quit[0m
[2025/09/10 15:53:11] werkzeug INFO 10.0.10.82 - - [10/Sep/2025 15:53:11] "GET / HTTP/1.1" 200 -
[2025/09/10 15:53:11] werkzeug INFO 10.0.10.82 - - [10/Sep/2025 15:53:11] "GET /static/js/main.d754b0a3.js HTTP/1.1" 200 -
[2025/09/10 15:53:12] werkzeug INFO 10.0.10.82 - - [10/Sep/2025 15:53:12] "GET /static/css/main.bf4d504b.css HTTP/1.1" 200 -
[2025/09/10 15:53:12] werkzeug INFO 10.0.10.82 - - [10/Sep/2025 15:53:12] "GET /images/Tidal_Logo_white.png HTTP/1.1" 200 -
[2025/09/10 15:53:13] werkzeug INFO 10.0.10.82 - - [10/Sep/2025 15:53:13] "GET /api/quote/inst_list HTTP/1.1" 200 -
[2025/09/10 15:53:13] werkzeug INFO 10.0.10.82 - - [10/Sep/2025 15:53:13] "GET /api/metric/metric_list HTTP/1.1" 200 -
[2025/09/10 15:53:13] werkzeug INFO 10.0.10.82 - - [10/Sep/2025 15:53:13] "GET /api/trade/trade_report HTTP/1.1" 200