In [1]:
import warnings
warnings.filterwarnings("ignore")

from datetime import datetime, timedelta
from pandas import DataFrame, concat, date_range, ExcelWriter, to_datetime
import os
from numpy import isnan, inf, mean
import time
import json
from Calculator import Calculator

from utils import getSchema, changedType
from BacktestBaseFunc import *


parent = os.path.dirname(os.path.abspath("__file__"))
output_path = os.path.join(parent, "Output", "HundredBreakOut")
if not os.path.isdir(output_path):
    os.makedirs(output_path)




    Please use `mplfinance` instead (no hyphen, no underscore).

    To install: `pip install --upgrade mplfinance` 

   For more information, see: https://pypi.org/project/mplfinance/




In [3]:
bt_list = StockList()

In [7]:
def HundredBreakout(df:DataFrame, **kwargs):
    entry_date = None
    exit_date = None
    entry_price = 0
    exit_price = 0
    max_price = 0
    min_price = inf
    sig = pos = 0
    num_breakout_day = int(kwargs.get('num_breakout_day', 100))
    take_profit = float(kwargs.get('take_profit', .1))
    stop_loss = float(kwargs.get('stop_loss', .1))
    
    result = []
    for i, row in enumerate(df.itertuples()):
        if i < num_breakout_day: continue
        # Check Signal without pos
        if not sig and not pos:
            last_highest_close = df.iloc[i-num_breakout_day:i].Close.max()
#             last_highest_close = df.iloc[-num_breakout_day-i:-i].Close.max()
            if row.Close > last_highest_close:
                sig = -1
        # Entry Market
        elif sig and not pos:
#             if row.Ticker == '1538':print(row)
            if not row.Volume or isnan(row.Open): continue
#             print(row)
            pos, sig = sig, 0
            max_price = min_price = entry_price = row.Open
            entry_date = row.Date
            max_price = max(row.High, max_price)
            min_price = min(row.Low, min_price)
        # Check Signal with pos
        elif not sig and pos:
            max_price = max(row.High, entry_price)
            min_price = min(row.Low, entry_price)
            if (row.High / entry_price - 1 >= take_profit):
                sig = -pos
#                 exit_price = row.High
            if (row.Low / entry_price - 1 <= -stop_loss):
                sig = -pos
#                 exit_price = row.Low
        # Exit Market
        elif sig and pos:
            
            if not row.Volume or isnan(row.Open): continue
#             print(row)
            exit_price = row.Open
            exit_date = row.Date
            res = CreateTradeLog(entry_date, exit_date, entry_price, exit_price, max_price, min_price, pos)
            result.append(res)
            entry_date = None
            exit_date = None
            entry_price = 0
            exit_price = 0
            max_price = 0
            min_price = inf
            sig = pos = 0
        if i == df.shape[0]-1 and pos:
            exit_price = row.Close
            exit_date = row.Date
            res = CreateTradeLog(entry_date, exit_date, entry_price, exit_price, max_price, min_price, pos)
            result.append(res)
            entry_date = None
            exit_date = None
            entry_price = 0
            exit_price = 0
            max_price = 0
            min_price = inf
            sig = pos = 0
#     print()
            
    return result
            
            

In [8]:
results = {}
for ticker_info in bt_list:
    print(f"============= Backtest {ticker_info['Ticker']}=============")
    results[ticker_info['Ticker']] = Backtest(HundredBreakout, ticker_info['Ticker'], bt_period=20)
#     break











ValueError: cannot convert float NaN to integer

# Summary

In [9]:
summary_ = []
bt_day = datetime.today()
bt_day_str = bt_day.strftime("%Y%m%d")
for ticker, result in results.items():
    total_cost = 0
#     total_pnl = 0
    total_net = 0
    trade_num = len(result)
    win_num = 0
    loss_num = 0
    hold_period = []
    for res in result:
        total_net += res['Net']
        total_cost += res['TotalCost']
        win_num += int(res['Net'] > 0)
        loss_num += int(res['Net'] < 0)
        hold_period.append(res['HoldingPeriod'])
    summary_.append({
        '代號':ticker,
        '總成本':total_cost,
        '總損益(淨)':total_net,
        '獲利次數':win_num,
        '損失次數':loss_num,
        '總交易次數':trade_num,
        '勝率%':round(win_num / trade_num*100,2) if trade_num else 0,
        '平均持倉時間(日)':mean(hold_period)
    })
#     if result:
#         DataFrame(result).to_csv(os.path.join(output_path, f'{ticker}_{bt_day_str}.csv'), index=False)
        

In [10]:
sum_df = DataFrame(summary_)#.sort_values(['勝率%','總交易次數'], ascending=False)

In [11]:
sum_df['總損益(淨)'].sum()

-24290492

In [12]:
sum_df.to_csv(os.path.join(output_path, 'Summary.csv'),index=False, encoding='utf-8-sig')

In [13]:
sum_df_prob_sorted = sum_df[sum_df['勝率%'] > 60] # sum_df.sort_values(['勝率%'], ascending=False)
sum_df_trade_num_sorted = sum_df[sum_df.總交易次數 >= 20]#sum_df.sort_values(['總交易次數'], ascending=False)
sum_df_holding_period_sorted = sum_df[sum_df['平均持倉時間(日)'] <= 40]

In [14]:
top_num = 100
prob_Ticker = sum_df_prob_sorted.代號 # .iloc[:top_num]
trade_num_Ticker = sum_df_trade_num_sorted.代號 # .iloc[:top_num]
holding_period_Ticker = sum_df_holding_period_sorted.代號
suitable_Ticker = list(set(prob_Ticker).intersection(trade_num_Ticker).intersection(holding_period_Ticker))

In [15]:
print(sum_df[sum_df.代號.isin(suitable_Ticker)].shape)
sum_df[sum_df.代號.isin(suitable_Ticker)].sort_values("勝率%", ascending=False) # ['總損益(淨)'].sum()

(16, 8)


Unnamed: 0,代號,總成本,總損益(淨),獲利次數,損失次數,總交易次數,勝率%,平均持倉時間(日)
625,3694,3910,26540,20,8,28,71.43,13.5
335,2424,5547,27003,29,14,43,67.44,23.302326
181,1762,6775,44325,15,8,23,65.22,17.086957
260,2312,3360,16900,27,15,42,64.29,38.619048
374,2477,7924,47836,35,20,55,63.64,24.345455
754,6155,8744,21856,29,17,46,63.04,31.652174
560,3164,8338,-2838,22,13,35,62.86,24.314286
897,8940,5549,20151,25,15,40,62.5,28.4
873,8271,5830,25320,18,11,29,62.07,38.344828
83,1456,3979,4611,37,23,60,61.67,13.516667
