In [None]:
from Functionbase import *

In [None]:
def ticks_to_kbars(ticks, interval = '30Min'):
    """Aggregate tick data into OHLCV bars.

    Args:
        ticks: DataFrame indexed by timestamp with a 'close' column (trade
            price) and a 'volume' column.
        interval: pandas resample frequency string for the bar width.

    Returns:
        DataFrame with columns open, close, high, low, volume (in that
        order). Bars that received no tick are dropped.
    """
    # Each bar is closed on its right edge and labelled with its left edge,
    # i.e. the bar labelled t covers the ticks in (t, t + interval].
    binning = dict(closed='right', label='left')
    price_bins = ticks['close'].resample(interval, **binning)
    volume_bins = ticks['volume'].resample(interval, **binning)

    bars = pd.DataFrame({
        'open': price_bins.first(),
        'close': price_bins.last(),
        'high': price_bins.max(),
        'low': price_bins.min(),
        'volume': volume_bins.sum(),
    })

    # Empty bins have NaN prices; removing them leaves only traded bars.
    return bars.dropna()


In [None]:
import os 
from dotenv import load_dotenv

# Load variables from a local .env file so API_KEY / SECRET_KEY are available
# through os.getenv instead of being hard-coded in the notebook.
load_dotenv()
# Shioaji client created with simulation=True.
# NOTE(review): `sj` and `sqlite3` are presumably brought in by
# `from Functionbase import *` -- confirm.
api = sj.Shioaji(simulation=True)
api.login(
    os.getenv('API_KEY'),
    os.getenv('SECRET_KEY'),
    # Callback that prints a line for each security type it is called with.
    contracts_cb= lambda security_type: print(f"{repr(security_type)} fetch done"))

# SQLite connection handed to the data helpers (get_ticks, get_stocks, ...).
connection = sqlite3.connect('data.db')

In [None]:
# Stock code and trading date used by the single-day examples that follow.
code = '2330'
date = pd.to_datetime('2024/8/1')

# Only the first element of get_ticks' return value is used here
# (presumably the tick DataFrame -- confirm against Functionbase).
ticks = get_ticks(code, date, connection, api)[0]
ticks

In [None]:
# Build bars using ticks_to_kbars' default interval.
kbars = ticks_to_kbars(ticks)
kbars

In [None]:
# Same conversion with the bar width given explicitly (30-minute bars).
kbars = ticks_to_kbars(ticks, interval='30Min')
kbars

畫K棒

In [None]:
# Default colours: rising candles are drawn in green.
import mplfinance as mpf

# Candlestick chart with a volume panel, titled "<code>, <date>".
mpf.plot(kbars, 
         title = '{}, {}'.format(code, date.date()), 
         type = 'candle',
         style = 'yahoo',
         volume = True,
         figsize = (14,8)
)


In [None]:
# Switch to the Taiwanese convention: up candles use '#E9544E' (red) and
# down candles use '#56B475' (green).

marketcolors = mpf.make_marketcolors(up = '#E9544E',down='#56B475', inherit = True)
# Custom style: the 'yahoo' base style with the market colours replaced.
style = mpf.make_mpf_style(base_mpf_style = 'yahoo', marketcolors = marketcolors)


mpf.plot(kbars, 
         title = '{}, {}'.format(code, date.date()), 
         type = 'candle',
         style = style,
         volume = True,
         figsize = (14,8)
)

3-3 單日個股當沖回測程式

In [None]:
def day_trading_backtest(code, date, connection, api):
    """Backtest a fixed-time day trade: buy one lot at 09:30, sell it at 13:00.

    Args:
        code: Stock code passed through to get_ticks.
        date: pandas Timestamp of the trading day (midnight).
        connection: Database connection passed through to get_ticks.
        api: Broker API client passed through to get_ticks.

    Returns:
        A one-row DataFrame describing the round trip, or an empty DataFrame
        when no complete buy/sell pair happened. (The empty case used to
        return a ``(kbars, DataFrame)`` tuple, which was inconsistent with
        the success path and with the other strategy functions.)
    """
    import pandas as pd

    ticks = get_ticks(code, date, connection, api)[0]
    # The timing logic below adds one minute to each bar label, so it only
    # makes sense on 1-minute bars; do not rely on ticks_to_kbars' default.
    kbars = ticks_to_kbars(ticks, interval='1Min')

    entry_at = date.replace(hour=9, minute=30, second=0)
    exit_at = date.replace(hour=13, minute=0, second=0)

    entry_time = None
    entry_price = 0

    exit_time = None
    exit_price = 0

    position = 0

    # Iterating label/value pairs avoids integer-position lookups on a
    # datetime-indexed Series, which pandas has deprecated.
    for bar_label, current_price in kbars['close'].items():

        # A bar labelled t covers (t, t + 1min]; its close is known at t + 1min.
        current_time = bar_label + pd.Timedelta(minutes=1)

        if position == 0 and current_time == entry_at:

            position = 1
            entry_time = current_time.time()
            entry_price = current_price

            print( '[{}] buy {} at {}'.format(current_time, code, current_price))

        elif position != 0 and current_time >= exit_at:
            # ">=" rather than "==": if no tick traded in the 12:59 bar the
            # position is still closed on the next available bar instead of
            # being left open and silently dropped from the results.
            exit_time = current_time.time()
            exit_price = current_price

            print( '[{}] sell {} at {}'.format(current_time, code, current_price))

            break

    if entry_time is None or exit_time is None:
        return pd.DataFrame()

    shares = position * 1000
    return pd.DataFrame([{
        '成交日期': date,
        '股票代號': code,
        '買進時間': entry_time,
        '買進價格': entry_price,
        '買進股數': shares,
        '買進金額': entry_price * shares,
        '賣出時間': exit_time,
        '賣出價格': exit_price,
        '賣出股數': shares,
        '賣出金額': exit_price * shares,
    }])

In [None]:
# Run the fixed-time strategy for stock 2330 on 2024/8/1.
day_trading_backtest('2330', pd.to_datetime('2024/8/1'), connection, api)

3-4 加入單一指標 (RSI)

In [None]:
import talib
from talib import abstract

In [None]:
def get_technical_indicator(kbars):
    """Add RSI, stochastic K/D, MACD and DMI columns to a bar DataFrame.

    The input frame is modified in place and also returned.

    Args:
        kbars: DataFrame with 'high', 'low' and 'close' columns.

    Returns:
        The same DataFrame with the columns rsi, k, d, macd, macdsignal,
        macdhist, dmiadx, dmiplus_di and dmiminus_di added.
    """
    high = kbars['high']
    low = kbars['low']
    close = kbars['close']

    # 14-period RSI on the close.
    kbars['rsi'] = talib.RSI(close, timeperiod=14)

    # Stochastic oscillator, 9/3/3.
    stoch_k, stoch_d = talib.STOCH(high, low, close,
                                   fastk_period=9, slowk_period=3, slowd_period=3)
    kbars['k'] = stoch_k
    kbars['d'] = stoch_d

    # MACD 12/26/9: line, signal line and histogram.
    macd_line, signal_line, histogram = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
    kbars['macd'] = macd_line
    kbars['macdsignal'] = signal_line
    kbars['macdhist'] = histogram

    # Directional movement, all with a 14-bar period.
    dmi_inputs = (high, low, close)
    kbars['dmiadx'] = talib.ADX(*dmi_inputs, timeperiod=14)
    kbars['dmiplus_di'] = talib.PLUS_DI(*dmi_inputs, timeperiod=14)
    kbars['dmiminus_di'] = talib.MINUS_DI(*dmi_inputs, timeperiod=14)

    return kbars

計算RSI指標

In [None]:
code = '2330'

tw_calendar = get_calendar('XTAI')
date = pd.to_datetime('2024/8/6')
# With a single day of data the first 14 minutes would have no RSI value,
# so the previous trading day's ticks are loaded as well.
prev_trading_date = tw_calendar.previous_close(date).date()

ticks = pd.concat([get_ticks(code, prev_trading_date, connection, api)[0],get_ticks(code, date, connection, api)[0]])
kbars = ticks_to_kbars(ticks)
kbars = get_technical_indicator(kbars)
kbars = kbars[date:] # keep only the bars from `date` onward

kbars

In [None]:
def day_trading_backtest_RSI(code, date, connection, api):
    """Backtest an RSI day trade: buy when RSI < 20, sell when RSI > 80.

    An open position is force-closed on the first bar that closes at or
    after 13:00. No new position is opened from 13:00 onward.

    Args:
        code: Stock code passed through to get_ticks.
        date: pandas Timestamp of the trading day (midnight).
        connection: Database connection passed through to get_ticks.
        api: Broker API client passed through to get_ticks.

    Returns:
        A one-row DataFrame describing the round trip, or an empty DataFrame
        when no complete buy/sell pair happened.
    """
    tw_calendar = get_calendar('XTAI')
    # The previous session is loaded too so the indicator already has history
    # on the first bars of `date`.
    prev_trading_date = tw_calendar.previous_close(date).date()

    ticks = pd.concat([
        get_ticks(code, prev_trading_date, connection, api)[0],
        get_ticks(code, date, connection, api)[0],
    ])
    # The timing logic below adds one minute to each bar label, so it only
    # makes sense on 1-minute bars; do not rely on ticks_to_kbars' default.
    kbars = ticks_to_kbars(ticks, interval='1Min')
    kbars = get_technical_indicator(kbars)
    kbars = kbars[date:]

    cutoff = date.replace(hour=13, minute=0, second=0)

    entry_time = None
    entry_price = 0

    exit_time = None
    exit_price = 0

    position = 0

    # Row iteration avoids integer-position lookups on a datetime-indexed
    # Series, which pandas has deprecated.
    for bar_label, bar in kbars.iterrows():

        # A bar labelled t covers (t, t + 1min]; its close is known at t + 1min.
        current_time = bar_label + pd.Timedelta(minutes=1)
        current_price = bar['close']
        rsi = bar['rsi']

        if position == 0:
            if current_time < cutoff and rsi < 20:
                position = 1
                entry_time = current_time.time()
                entry_price = current_price

                print( '[{}] buy {} at {}'.format(current_time, code, current_price))

        elif rsi > 80 or current_time >= cutoff:
            # Signal exit, or forced exit. ">=" rather than "==": if no tick
            # traded in the 12:59 bar the position is still closed on the
            # next available bar instead of being dropped from the results.
            exit_time = current_time.time()
            exit_price = current_price

            print( '[{}] sell {} at {}'.format(current_time, code, current_price))

            break

    if entry_time is None or exit_time is None:
        return pd.DataFrame()

    shares = position * 1000
    return pd.DataFrame([{
        '成交日期': date,
        '股票代號': code,
        '買進時間': entry_time,
        '買進價格': entry_price,
        '買進股數': shares,
        '買進金額': entry_price * shares,
        '賣出時間': exit_time,
        '賣出價格': exit_price,
        '賣出股數': shares,
        '賣出金額': exit_price * shares,
    }])

In [None]:
# Run the RSI strategy for stock 2317 on 2024/8/5.
day_trading_backtest_RSI('2317', pd.to_datetime('2024/8/5'), connection, api)

3-5 複合技術指標 (RSI & MACD)

計算 RSI & MACD

In [None]:
def get_technical_indicator(kbars):
    """Add RSI, stochastic K/D, MACD and DMI columns to a bar DataFrame.

    This redefinition replaces any earlier function of the same name in the
    notebook, so it computes the full indicator set: strategies that read
    'k' / 'd' keep working after a top-to-bottom run, while the RSI & MACD
    strategy simply ignores the extra columns.

    The input frame is modified in place and also returned.

    Args:
        kbars: DataFrame with 'high', 'low' and 'close' columns.

    Returns:
        The same DataFrame with the columns rsi, k, d, macd, macdsignal,
        macdhist, dmiadx, dmiplus_di and dmiminus_di added.
    """
    high, low, close = kbars['high'], kbars['low'], kbars['close']

    kbars['rsi'] = talib.RSI(close, timeperiod=14)

    # Stochastic oscillator, 9/3/3.
    kbars['k'], kbars['d'] = talib.STOCH(high, low, close,
                                         fastk_period=9, slowk_period=3, slowd_period=3)

    # MACD 12/26/9: line, signal line and histogram.
    macd, macdsignal, macdhist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)

    kbars['macd'] = macd
    kbars['macdsignal'] = macdsignal
    kbars['macdhist'] = macdhist

    # Directional movement, all with a 14-bar period.
    kbars['dmiadx'] = talib.ADX(high, low, close, timeperiod=14)
    kbars['dmiplus_di'] = talib.PLUS_DI(high, low, close, timeperiod=14)
    kbars['dmiminus_di'] = talib.MINUS_DI(high, low, close, timeperiod=14)

    return kbars

In [None]:
def day_trading_backtest_Multi(code, date, connection, api):
    """Backtest a combined RSI + MACD-histogram day trade.

    Entry (before 13:00): the RSI of the three preceding bars dipped below 30
    and the MACD histogram crossed from negative to positive on this bar.
    Exit: the RSI of the three preceding bars exceeded 70 and the histogram
    crossed from positive to negative, or the first bar closing at or after
    13:00, whichever comes first.

    Args:
        code: Stock code passed through to get_ticks.
        date: pandas Timestamp of the trading day (midnight).
        connection: Database connection passed through to get_ticks.
        api: Broker API client passed through to get_ticks.

    Returns:
        A one-row DataFrame describing the round trip, or an empty DataFrame
        when no complete buy/sell pair happened.
    """
    tw_calendar = get_calendar('XTAI')
    # With a single day of data the first bars would have no indicator value,
    # so the previous session is loaded as well.
    prev_trading_date = tw_calendar.previous_close(date).date()

    ticks = pd.concat([
        get_ticks(code, prev_trading_date, connection, api)[0],
        get_ticks(code, date, connection, api)[0],
    ])
    # The timing logic below adds one minute to each bar label, so it only
    # makes sense on 1-minute bars; do not rely on ticks_to_kbars' default.
    kbars = ticks_to_kbars(ticks, interval='1Min')
    kbars = get_technical_indicator(kbars)

    # Keep the previous session in the frame and start the loop at the first
    # bar of `date`. Slicing the frame to `date` first made "ts - 1" on the
    # first bar wrap around to the LAST bar of the day (look-ahead), and made
    # the three-bar RSI window an empty/wrapped slice for ts < 3.
    first_bar = kbars.index.searchsorted(date)

    close = kbars['close']
    rsi = kbars['rsi']
    hist = kbars['macdhist']

    cutoff = date.replace(hour=13, minute=0, second=0)

    entry_time = None
    entry_price = 0

    exit_time = None
    exit_price = 0

    position = 0

    for ts in range(first_bar, len(kbars)):

        # A bar labelled t covers (t, t + 1min]; its close is known at t + 1min.
        current_time = kbars.index[ts] + pd.Timedelta(minutes=1)
        current_price = close.iloc[ts]

        # RSI of up to three bars before the current one (current excluded).
        recent_rsi = rsi.iloc[max(ts - 3, 0):ts]

        if ts >= 1:
            hist_prev, hist_now = hist.iloc[ts - 1], hist.iloc[ts]
            crossed_up = hist_prev < 0 and hist_now > 0
            crossed_down = hist_prev > 0 and hist_now < 0
        else:
            # No earlier bar exists, so a crossover cannot be established.
            crossed_up = crossed_down = False

        if position == 0:
            if current_time < cutoff and recent_rsi.min() < 30 and crossed_up:
                position = 1
                entry_time = current_time.time()
                entry_price = current_price

                print( '[{}] buy {} at {}'.format(current_time, code, current_price))

        elif (recent_rsi.max() > 70 and crossed_down) or current_time >= cutoff:
            # Signal exit, or forced exit. ">=" rather than "==": if no tick
            # traded in the 12:59 bar the position is still closed on the
            # next available bar instead of being dropped from the results.
            exit_time = current_time.time()
            exit_price = current_price

            print( '[{}] sell {} at {}'.format(current_time, code, current_price))

            break

    if entry_time is None or exit_time is None:
        return pd.DataFrame()

    shares = position * 1000
    return pd.DataFrame([{
        '成交日期': date,
        '股票代號': code,
        '買進時間': entry_time,
        '買進價格': entry_price,
        '買進股數': shares,
        '買進金額': entry_price * shares,
        '賣出時間': exit_time,
        '賣出價格': exit_price,
        '賣出股數': shares,
        '賣出金額': exit_price * shares,
    }])

In [None]:
# Run the RSI + MACD strategy for stock 2449 on 2020/9/4.
day_trading_backtest_Multi('2449', pd.to_datetime('2020/9/4'), connection, api)

3-6 突破壓力線(Resistance Line, RL)進場

以前15分鐘的高點作為壓力線；低點作為支撐線

出場條件為: 1. 停利點 3%  2. 停損點 跌破支撐線

In [None]:
def day_trading_backtest_RL(code, date, connection, api):
    """Backtest an opening-range breakout day trade.

    The high and low of the bars labelled up to 09:14 form the resistance and
    support lines. Between 09:15 and 09:30 a close above resistance with
    cumulative volume above 2000 opens a one-lot long position. The position
    is closed when price rises above entry * 1.03, falls below support, or on
    the first bar closing at or after 13:00.

    Args:
        code: Stock code passed through to get_ticks.
        date: pandas Timestamp of the trading day (midnight).
        connection: Database connection passed through to get_ticks.
        api: Broker API client passed through to get_ticks.

    Returns:
        A one-row DataFrame describing the round trip, or an empty DataFrame
        when no complete buy/sell pair happened.
    """
    tw_calendar = get_calendar('XTAI')
    prev_trading_date = tw_calendar.previous_close(date).date()

    ticks = pd.concat([
        get_ticks(code, prev_trading_date, connection, api)[0],
        get_ticks(code, date, connection, api)[0],
    ])
    # The timing logic and the 09:14 opening-range slice below assume
    # 1-minute bars; do not rely on ticks_to_kbars' default interval.
    kbars = ticks_to_kbars(ticks, interval='1Min')
    kbars = kbars[date:]

    window_open = date.replace(hour=9, minute=15, second=0)
    window_close = date.replace(hour=9, minute=30, second=0)
    cutoff = date.replace(hour=13, minute=0, second=0)

    # The opening range does not change during the loop, so compute it once.
    opening_range = kbars.loc[:date.replace(hour=9, minute=14, second=0)]
    high_15m = opening_range['high'].max()
    low_15m = opening_range['low'].min()

    entry_time = None
    entry_price = 0

    exit_time = None
    exit_price = 0

    position = 0
    volume_today = 0
    target_price = None
    stop_price = None

    # Row iteration avoids integer-position lookups on a datetime-indexed
    # Series, which pandas has deprecated.
    for bar_label, bar in kbars.iterrows():

        # A bar labelled t covers (t, t + 1min]; its close is known at t + 1min.
        current_time = bar_label + pd.Timedelta(minutes=1)
        current_price = bar['close']
        volume_today += bar['volume']

        if position == 0:
            if (
                window_open <= current_time <= window_close and
                current_price > high_15m and
                volume_today > 2000
            ):
                position = 1
                entry_time = current_time.time()
                entry_price = current_price

                target_price = current_price * 1.03  # take profit at +3%
                stop_price = low_15m                 # stop loss at the support line

                print( '[{}] buy {} at {}'.format(current_time, code, current_price))

        elif (
            current_time >= cutoff or
            current_price > target_price or
            current_price < stop_price
        ):
            # Forced exit uses ">=" rather than "==": if no tick traded in
            # the 12:59 bar the position is still closed on the next
            # available bar instead of being dropped from the results.
            exit_time = current_time.time()
            exit_price = current_price

            print( '[{}] sell {} at {}'.format(current_time, code, current_price))

            break

    if entry_time is None or exit_time is None:
        return pd.DataFrame()

    shares = position * 1000
    return pd.DataFrame([{
        '成交日期': date,
        '股票代號': code,
        '買進時間': entry_time,
        '買進價格': entry_price,
        '買進股數': shares,
        '買進金額': entry_price * shares,
        '賣出時間': exit_time,
        '賣出價格': exit_price,
        '賣出股數': shares,
        '賣出金額': exit_price * shares,
    }])

In [None]:
# Run the opening-range breakout strategy for stock 2330 on 2024/7/23.
day_trading_backtest_RL('2330', pd.to_datetime('2024/7/23'), connection, api)

3-7 多日回測程式

In [None]:
def backtest(start_date, end_date, connection, api, strategy=None):
    """Run a single-day strategy over every trading day and day-tradable stock.

    Args:
        start_date: First calendar day of the range (anything pd.date_range accepts).
        end_date: Last calendar day of the range, inclusive.
        connection: Database connection passed through to the data helpers.
        api: Broker API client passed through to the data helpers.
        strategy: Callable ``(code, date, connection, api) -> DataFrame``
            returning one row per completed trade (empty when none).
            Defaults to day_trading_backtest_KD.

    Returns:
        DataFrame with all transactions concatenated, or an empty DataFrame
        when no trade was produced.
    """
    if strategy is None:
        # Looked up at call time: day_trading_backtest_KD is defined further
        # down in this notebook, so that cell must have been run first.
        strategy = day_trading_backtest_KD

    tw_calendar = get_calendar('XTAI')

    collected = []

    for date in pd.date_range(start_date, end_date):

        # Skip days that are not exchange sessions.
        if date not in tw_calendar.opens:
            continue

        print(date)
        codes = get_stocks(date, connection)
        # Keep only stocks flagged as day-tradable. sorted(set(...)) removes
        # duplicates and gives a reproducible processing order.
        day_trading_codes = sorted({
            code for code in codes
            if get_stock(code, connection, api)[0].iloc[0]['day_trade'] == 'Yes'
        })

        for code in day_trading_codes:

            transaction = strategy(code, pd.to_datetime(date), connection, api)

            if not transaction.empty:
                collected.append(transaction)

    # Concatenate once at the end instead of growing a frame inside the loop.
    if not collected:
        return pd.DataFrame()

    return pd.concat(collected, sort = False)



In [None]:
# Multi-day backtest from 2024/8/1 to 2024/8/12.
transactions = backtest('2024/8/1','2024/8/12', connection, api)
transactions

3-8 交易成本與損益計算

In [None]:
# Commission discount factor applied to the 0.1425% rate.
discount = 0.38

# Commission = amount * 0.1425% * discount, with a minimum charge of 20,
# truncated to an integer. The buy side previously used "fee < 20", which
# capped the fee at 20 instead of enforcing the minimum; it now matches the
# sell side.
transactions['買進手續費'] = (transactions['買進金額'] * 0.001425 * discount).clip(lower=20).astype(int)
transactions['賣出手續費'] = (transactions['賣出金額'] * 0.001425 * discount).clip(lower=20).astype(int)

# Transaction tax of 0.15% on the sell amount, truncated to an integer.
transactions['交易稅'] = (transactions['賣出金額'] * 0.0015).astype(int)

# Net P&L = price difference minus both commissions and the tax.
transactions['損益'] = (transactions['賣出金額']-transactions['買進金額']) - (transactions['買進手續費'] + transactions['賣出手續費'] + transactions['交易稅'])

# Guarded so the cell can be re-run after 成交日期 has already become the index.
if '成交日期' in transactions.columns:
    transactions = transactions.set_index(['成交日期'])
transactions

合併損益進Function

In [None]:
def backtest(start_date, end_date, connection, api, discount = 0.38, strategy = None):
    """Run a single-day strategy over a date range and add costs and net P&L.

    Args:
        start_date: First calendar day of the range (anything pd.date_range accepts).
        end_date: Last calendar day of the range, inclusive.
        connection: Database connection passed through to the data helpers.
        api: Broker API client passed through to the data helpers.
        discount: Factor applied to the 0.1425% commission rate.
        strategy: Callable ``(code, date, connection, api) -> DataFrame``
            returning one row per completed trade (empty when none).
            Defaults to day_trading_backtest_RSI.

    Returns:
        DataFrame indexed by 成交日期 with the transaction columns plus
        買進手續費, 賣出手續費, 交易稅 and 損益; an empty DataFrame when no
        trade was produced.
    """
    if strategy is None:
        strategy = day_trading_backtest_RSI

    tw_calendar = get_calendar('XTAI')

    collected = []

    for date in pd.date_range(start_date, end_date):

        # Skip days that are not exchange sessions.
        if date not in tw_calendar.opens:
            continue

        codes = get_stocks(date, connection)
        # Keep only stocks flagged as day-tradable. sorted(set(...)) removes
        # duplicates and gives a reproducible processing order.
        day_trading_codes = sorted({
            code for code in codes
            if get_stock(code, connection, api)[0].iloc[0]['day_trade'] == 'Yes'
        })

        for code in day_trading_codes:

            transaction = strategy(code, pd.to_datetime(date), connection, api)

            if not transaction.empty:
                collected.append(transaction)

    if not collected:
        return pd.DataFrame()

    # Concatenate once at the end instead of growing a frame inside the loop.
    transactions = pd.concat(collected, sort = False)

    # Commission = amount * 0.1425% * discount, minimum 20, truncated to int.
    commission_rate = 0.001425 * discount
    transactions['買進手續費'] = (transactions['買進金額'] * commission_rate).clip(lower=20).astype(int)
    transactions['賣出手續費'] = (transactions['賣出金額'] * commission_rate).clip(lower=20).astype(int)

    # Transaction tax of 0.15% on the sell amount, truncated to int.
    transactions['交易稅'] = (transactions['賣出金額'] * 0.0015).astype(int)

    # Net P&L = price difference minus both commissions and the tax.
    transactions['損益'] = (transactions['賣出金額']-transactions['買進金額']) - (transactions['買進手續費'] + transactions['賣出手續費'] + transactions['交易稅'])

    transactions['成交日期'] = pd.to_datetime(transactions['成交日期']).dt.date
    transactions = transactions.set_index(['成交日期'])

    return transactions



In [None]:
# Multi-day backtest from 2024/7/25 to 2024/8/5, using the default discount.
transactions = backtest('2024/7/25','2024/8/5', connection, api)
transactions

績效指標計算

In [None]:
# Resampling needs a DatetimeIndex; the index holds plain dates at this point.
transactions.index = pd.to_datetime(transactions.index, format ='%Y-%m-%d')
# errors='ignore' keeps the cell re-runnable once the columns are gone.
transactions = transactions.drop(columns=['買進時間','賣出時間'], errors='ignore')
# Per-day mean of every numeric column (D: Daily ; M: Monthly).
# numeric_only=True is required because the string column 股票代號 cannot be
# averaged and makes recent pandas versions raise.
daily_profit_loss = transactions.resample('D').mean(numeric_only=True)
# Number of transactions per day.
daily_profit_loss['交易次數'] = transactions.groupby('成交日期').size()
columns_to_select = ['買進金額', '買進手續費', '賣出金額', '賣出手續費', '交易稅', '損益', '交易次數']
# Days without any transaction are all-NaN after resampling; drop them.
daily_profit_loss = daily_profit_loss[columns_to_select].dropna()

#transactions
daily_profit_loss

In [None]:
# Sum of the per-day 損益 values held in daily_profit_loss.
profit_loss = daily_profit_loss['損益'].sum()
# NOTE: only the last expression of a cell is displayed, so this bare name
# produces no output.
profit_loss

# Share of individual transactions with a positive 損益, rounded to 3 decimals.
win_rate = round(len(transactions[transactions['損益'] > 0])/(len(transactions)),3)
win_rate

In [None]:
# Maximum consecutive loss: the most negative running total over any streak
# of consecutive losing days.

con_loss = 0
max_loss = 0

# The loop variable is deliberately NOT called profit_loss, so the total
# stored under that name by an earlier cell is not overwritten.
for day_pl in daily_profit_loss['損益']:

    if day_pl < 0:
        con_loss += day_pl
        max_loss = min(max_loss, con_loss)
    else:
        # A non-losing day ends the streak.
        con_loss = 0

print('最大連續虧損', max_loss)

In [None]:
# Equity curve: cumulative daily P&L over time. The value is on the y-axis,
# so 'profit loss' labels the y-axis (it was previously set as the x label).
cumulative_pl = daily_profit_loss['損益'].cumsum()
cumulative_pl.plot(title = 'Backtest', figsize=(15,8)).set(xlabel = 'date', ylabel = 'profit loss');

My IDEA

1. KD-MACD-DMI

In [None]:
def day_trading_backtest_KD(code, date, connection, api):
    """Backtest a stochastic (K/D) day trade.

    Entry (before 13:00): K is below 20 on this bar and K was above D on the
    previous bar. Exit: K is above 80 and below D on this bar, or the first
    bar closing at or after 13:00, whichever comes first.

    Requires get_technical_indicator to add 'k' and 'd' columns.

    Args:
        code: Stock code passed through to get_ticks.
        date: pandas Timestamp of the trading day (midnight).
        connection: Database connection passed through to get_ticks.
        api: Broker API client passed through to get_ticks.

    Returns:
        A one-row DataFrame describing the round trip, or an empty DataFrame
        when no complete buy/sell pair happened.
    """
    tw_calendar = get_calendar('XTAI')
    # The previous session is loaded too so the indicators already have
    # history on the first bars of `date`.
    prev_trading_date = tw_calendar.previous_close(date).date()

    ticks = pd.concat([
        get_ticks(code, prev_trading_date, connection, api)[0],
        get_ticks(code, date, connection, api)[0],
    ])
    # The timing logic below adds one minute to each bar label, so it only
    # makes sense on 1-minute bars; do not rely on ticks_to_kbars' default.
    kbars = ticks_to_kbars(ticks, interval='1Min')
    kbars = get_technical_indicator(kbars)

    # Keep the previous session in the frame and start the loop at the first
    # bar of `date`. Slicing the frame to `date` first made "ts - 1" on the
    # first bar wrap around to the LAST bar of the day (look-ahead).
    first_bar = kbars.index.searchsorted(date)

    close = kbars['close']
    k = kbars['k']
    d = kbars['d']

    cutoff = date.replace(hour=13, minute=0, second=0)

    entry_time = None
    entry_price = 0

    exit_time = None
    exit_price = 0

    position = 0

    for ts in range(first_bar, len(kbars)):

        # A bar labelled t covers (t, t + 1min]; its close is known at t + 1min.
        current_time = kbars.index[ts] + pd.Timedelta(minutes=1)
        current_price = close.iloc[ts]

        k_now = k.iloc[ts]
        d_now = d.iloc[ts]
        # Without an earlier bar the previous-bar condition cannot hold.
        prev_k_above_d = ts >= 1 and k.iloc[ts - 1] > d.iloc[ts - 1]

        if position == 0:
            if current_time < cutoff and k_now < 20 and prev_k_above_d:
                position = 1
                entry_time = current_time.time()
                entry_price = current_price

                print( '[{}] buy {} at {}'.format(current_time, code, current_price))

        elif (k_now > 80 and k_now < d_now) or current_time >= cutoff:
            # Signal exit, or forced exit. ">=" rather than "==": if no tick
            # traded in the 12:59 bar the position is still closed on the
            # next available bar instead of being dropped from the results.
            exit_time = current_time.time()
            exit_price = current_price

            print( '[{}] sell {} at {}'.format(current_time, code, current_price))

            break

    if entry_time is None or exit_time is None:
        return pd.DataFrame()

    shares = position * 1000
    return pd.DataFrame([{
        '成交日期': date,
        '股票代號': code,
        '買進時間': entry_time,
        '買進價格': entry_price,
        '買進股數': shares,
        '買進金額': entry_price * shares,
        '賣出時間': exit_time,
        '賣出價格': exit_price,
        '賣出股數': shares,
        '賣出金額': exit_price * shares,
    }])

In [None]:
# Run the K/D strategy for stock 2317 on 2024/8/5.
day_trading_backtest_KD('2317', pd.to_datetime('2024/8/5'), connection, api)