In [None]:
import warnings, os
import datetime as dt
import itertools as it
import numpy as np
import pandas as pd

# Silence every warning raised from here on (all categories, all modules).
warnings.filterwarnings('ignore')
# None lets pandas auto-detect the display width instead of using a fixed one.
pd.set_option('display.width',       None)
# None removes the cap on how many columns are rendered when a frame is shown.
pd.set_option('display.max_columns', None)

In [None]:
import MetaTrader5 as mt5
from vectorbt.portfolio import Portfolio as vbt_Portfolio

# Connection

In [None]:
# Account profiles for mt5.initialize(). Passwords are read from the environment
# (MT5_RICO_PASSWORD / MT5_CLEAR_PASSWORD) so that no secret is stored in the notebook.
RICO_REAL   = { 'login':16331371,   'password':os.environ.get('MT5_RICO_PASSWORD'),  'server':'Rico-PRD'                 }
CLEAR_REAL  = { 'login':1018156111, 'password':os.environ.get('MT5_CLEAR_PASSWORD'), 'server':'ClearInvestimentos-CLEAR' }

# Terminal executable handed to mt5.initialize() as its first argument.
PATH_7 = r'C:\Program Files\MetaTrader 5 _7\terminal64.exe'

# Fail early, with an actionable message, when the password of the account in use is missing.
if not CLEAR_REAL['password']:
    raise RuntimeError('Set the MT5_CLEAR_PASSWORD environment variable before connecting.')

# An explicit check instead of `assert`: it survives `python -O` and reports the
# terminal's own error code/description.
if not mt5.initialize(PATH_7, **CLEAR_REAL):
    raise RuntimeError(f'MetaTrader 5 initialize() failed: {mt5.last_error()}')

# Variables

In [None]:
# Bar size, expressed twice because two libraries consume it:
#   TFRAME - timeframe code looked up by MT5_LOAD_DATA
#   FREQ   - default `freq` that BACKTEST forwards to vectorbt
TFRAME  = 'M20'
FREQ    = '20min'

# Symbols to download and backtest.
SYMBOLS = [
    'WIN$N',
    'WDO$N',
]

# Data

In [None]:
def MT5_LOAD_DATA(SYMBOLS, TFRAMES, start_pos=0, count=99_999):
    """Download bars from MetaTrader 5 for every (symbol, timeframe) pair.

    Parameters
    ----------
    SYMBOLS : iterable of str
        Symbol names handed to ``mt5.copy_rates_from_pos``.
    TFRAMES : iterable of str
        Timeframe codes: 'M1', 'M5', 'M10', 'M15', 'M20', 'M30', 'H1', 'H2',
        'H3', 'H4', 'H6', 'H8', 'H12', 'D1' or 'W1'.
    start_pos, count : int
        Forwarded unchanged to ``mt5.copy_rates_from_pos``.

    Returns
    -------
    pandas.DataFrame
        All downloads stacked under a fresh 0..n-1 index. Five columns are
        prepended to whatever the terminal returns:
        'a' (1-based row number within the download), 'z' (row offset from the
        download's last row: 0 for the last row, negative before it),
        'symbol', 'tf' and 'datetime' ('time' parsed as epoch seconds).

    Raises
    ------
    KeyError
        If a timeframe code is not one of the supported codes.
    RuntimeError
        If the terminal returns no data (``None``) for a pair.
    ValueError
        If there is no (symbol, timeframe) pair to load.
    """
    supported = ('M1', 'M5', 'M10', 'M15', 'M20', 'M30',
                 'H1', 'H2', 'H3', 'H4', 'H6', 'H8', 'H12', 'D1', 'W1')

    Pipe = []
    for symbol, TF in it.product(SYMBOLS, TFRAMES):
        if TF not in supported:
            raise KeyError(TF)
        timeframe = getattr(mt5, f'TIMEFRAME_{TF}')

        rates = mt5.copy_rates_from_pos(symbol, timeframe, start_pos, count)
        if rates is None:
            # Without this check the failure would surface later as an opaque KeyError: 'time'.
            raise RuntimeError(f'No rates returned for {symbol} {TF}: {mt5.last_error()}')

        Raw = pd.DataFrame(rates)
        Raw.insert(0, 'a',          Raw.index + 1)
        Raw.insert(1, 'z',          Raw.index - Raw.index.max())
        Raw.insert(2, 'symbol',     symbol)
        Raw.insert(3, 'tf',         TF)
        Raw.insert(4, 'datetime',   pd.to_datetime(Raw['time'], unit='s'))
        Pipe.append(Raw)

    if not Pipe:
        raise ValueError('Nothing to load: SYMBOLS or TFRAMES is empty.')
    return pd.concat(Pipe, ignore_index=True)

# Download every configured symbol at the single configured timeframe, then preview the first rows.
Raw = MT5_LOAD_DATA(SYMBOLS, TFRAMES=[TFRAME])
Raw.head()

In [None]:
# Raw.to_csv('Storage/Raw.csv', index=False)
# Raw = pd.read_csv('Storage/Raw.csv', parse_dates=['datetime'])
# Raw.head()

In [None]:
# Always raises, so a top-to-bottom "Run All" halts at this cell and the cells
# below must be run by hand.
# NOTE(review): presumably a deliberate stop after the data download — confirm before removing.
raise Exception('STOP')

In [None]:
def FORMAT_SOURCE(Raw):
    """Project the raw download onto the title-cased columns used downstream.

    Reads 'a', 'z', 'symbol', 'tf', 'datetime', 'tick_volume', 'real_volume',
    'open', 'high', 'low' and 'close' from ``Raw``. The result keeps ``Raw``'s
    index; 'Date' and 'Time' are the calendar-date and time-of-day parts of
    'datetime', and 'Price' duplicates 'Close'.
    """
    stamps = Raw['datetime']
    closes = Raw['close']

    return pd.DataFrame({
        # Row bookkeeping and identification.
        'A':        Raw['a'],
        'Z':        Raw['z'],
        'Symbol':   Raw['symbol'],
        'TF':       Raw['tf'],
        # Timestamp and its two components.
        'Datetime': stamps,
        'Date':     stamps.dt.date,
        'Time':     stamps.dt.time,
        # Activity, plus the price series used for trading.
        'Ticks':    Raw['tick_volume'],
        'Volume':   Raw['real_volume'],
        'Price':    closes,
        # OHLC.
        'Open':     Raw['open'],
        'High':     Raw['high'],
        'Low':      Raw['low'],
        'Close':    closes,
    })

# Rename/select the raw columns into the working schema, then preview the first rows.
Src = FORMAT_SOURCE(Raw)
Src.head()

In [None]:
def CALCULATIONS(Src):
    """Return a copy of ``Src`` extended with per-session derived columns.

    Sessions are the (Symbol, Date) groups, taken in order of appearance.

    Added columns
    -------------
    'Last Time' : 'Time' of the last row of the session.
    'Exit Time' : the earlier of 'Last Time' and 18:00.
    'Day Open'  : 'Open' of the first row of the session.
    'Intra Var' : percentage change of 'Close' relative to 'Day Open'.

    ``Src`` itself is left untouched.
    """
    # A real copy: pd.DataFrame(Src) may share data with Src, which would let
    # the new columns leak into the caller's frame.
    Calc = Src.copy()

    # Built once and reused for both per-session transforms.
    sessions = Src.groupby(['Symbol', 'Date'], sort=False)

    Calc['Last Time']   = sessions['Time'].transform('last')
    # Element-wise minimum over datetime.time objects (object dtype).
    Calc['Exit Time']   = np.minimum(Calc['Last Time'], dt.time(18, 0))

    Calc['Day Open']    = sessions['Open'].transform('first')
    Calc['Intra Var']   = (Calc['Close'] / Calc['Day Open']) * 100 - 100
    return Calc

# Add the per-session columns (last/exit time, day open, intraday variation), then preview.
Calc = CALCULATIONS(Src)
Calc.head()

In [None]:
def SIGNALS(Calc):
    """Build the 0/1 entry-signal columns, one pair per 'Intra Var' threshold.

    The result starts with the 'TF' and 'Symbol' columns of ``Calc``. For every
    threshold it then holds a '... > threshold' column followed by a
    '... < threshold' column; a cell is 1 only on rows whose 'Time' is 16:40
    and whose 'Intra Var' satisfies the comparison.

    NOTE(review): the labels say '17h00' while the mask matches the 16:40 row —
    presumably the 20-minute bar stamped 16:40 closes at 17:00; confirm.
    """
    thresholds = (
        -5.00, -3.00, -1.00,
        -0.50, -0.30, -0.10,
        0.00,
        +0.10, +0.30, +0.50,
        +1.00, +3.00, +5.00,
    )

    # Both operands are identical for every threshold, so compute them once.
    on_entry_bar = Calc['Time'] == dt.time(16, 40)
    intra_var    = Calc['Intra Var']

    Sign = Calc[['TF', 'Symbol']].copy()
    for level in thresholds:
        above = on_entry_bar & (intra_var > level)
        below = on_entry_bar & (intra_var < level)
        Sign[f'17h00 + Intra Var > {level}'] = above.astype(int)
        Sign[f'17h00 + Intra Var < {level}'] = below.astype(int)

    return Sign

# Derive the 0/1 entry-signal columns from the calculated frame, then preview.
Sign = SIGNALS(Calc)
Sign.head()

# Backtest

In [None]:
def COMBINE_PARAMS(Calc, Sign):
    """Enumerate every backtest configuration to run.

    The grid is the Cartesian product, in this order, of the distinct 'TF'
    values of ``Calc``, the distinct 'Symbol' values of ``Calc``, the signal
    columns of ``Sign`` (every column except 'TF' and 'Symbol') and the two
    directions 'Long' and 'Short'.

    Returns
    -------
    pandas.DataFrame
        Columns 'TF', 'Symbol', 'Entry', 'Direct' and 'Name'. 'Name' is
        '<Symbol> - <Entry> - <Direct>' with '<' and '>' in the entry label
        spelled out as 'lt' and 'gt'.
    """
    TFRAMES = list(Calc['TF'].unique())
    SYMBOLS = list(Calc['Symbol'].unique())
    # Select signal columns by name rather than by position.
    COLS    = [col for col in Sign.columns if col not in ('TF', 'Symbol')]
    DIRECTS = ['Long', 'Short']

    Params = pd.DataFrame(
        list(it.product(TFRAMES, SYMBOLS, COLS, DIRECTS)),
        columns=['TF', 'Symbol', 'Entry', 'Direct'],
    )

    # Vectorised string building. The original f-string reused single quotes
    # inside a single-quoted f-string, which only parses on Python 3.12+.
    entry_label = (
        Params['Entry']
        .str.replace('<', 'lt', regex=False)
        .str.replace('>', 'gt', regex=False)
    )
    Params['Name'] = Params['Symbol'] + ' - ' + entry_label + ' - ' + Params['Direct']
    return Params

# Build the grid of (timeframe, symbol, entry signal, direction) configurations, then preview.
Params = COMBINE_PARAMS(Calc, Sign)
Params.head()

In [None]:
def vbt_format_result(res):
    """Flatten the metrics of a backtest result into one ordered dict.

    ``res`` must expose the metric methods called below (presumably a vectorbt
    Portfolio — confirm against the caller). Series-valued metrics are expanded
    into one key per entry under a dotted prefix; scalar metrics are stored
    under the name of the method that produced them.
    """
    def scalars(*names):
        # Call each zero-argument metric method, keyed by its own name.
        return {name: getattr(res, name)() for name in names}

    summary = {}

    # describe() of the returns, then of the returns with the leading and
    # trailing zeros stripped (np.trim_zeros does not touch interior zeros).
    summary.update(res.returns().describe().add_prefix('returns_describe.'))
    summary.update(np.trim_zeros(res.returns()).describe().add_prefix('nonzero_returns_describe.'))

    summary['alpha'] = res.alpha()
    summary['annual_returns'] = res.annual_returns().get(0)
    summary.update(scalars(
        'annualized_return',
        'annualized_volatility',
        'beta',
        'calmar_ratio',
        'capture',
        'cond_value_at_risk',
        'down_capture',
        'downside_risk',
        'final_value',
        'max_drawdown',
        'omega_ratio',
        'position_coverage',
    ))

    summary.update(res.returns_stats().add_prefix('returns_stats.'))
    summary.update(scalars('sharpe_ratio', 'sortino_ratio'))

    summary.update(res.stats().add_prefix('stats.'))
    summary.update(scalars(
        'tail_ratio',
        'total_benchmark_return',
        'total_profit',
        'total_return',
        'up_capture',
        'value_at_risk',
    ))
    return summary

In [None]:
def format_trades_results(res, direction):
    """Summarise trade results, in price points, as a labelled Series.

    Points are ``(exit_price - entry_price) * direction``, taken from
    ``res.trades`` and converted with ``.to_pd()``; ``direction`` is a
    multiplier (the caller passes +1 for long and -1 for short).
    A trade with exactly zero points counts as a winner.

    NOTE(review): counts rely on ``count()`` and comparisons skipping NaN —
    presumably ``to_pd()`` leaves NaN where there is no trade; confirm.
    """
    trades = res.trades
    points = (trades.exit_price - trades.entry_price).to_pd() * direction

    is_winner = points >= 0
    is_loser  = points <  0
    profits   = points[is_winner]
    losses    = points[is_loser]

    summary = {
        'Trades Total':     points.count(),
        'Trades Winners':   is_winner.sum(),
        'Trades Losers':    is_loser.sum(),

        'Total PnL (pts)':  points.sum(),
        'Avg PnL (pts)':    points.mean(),

        'Max Profit (pts)': profits.max(),
        'Avg Profit (pts)': profits.mean(),
        'Avg Loss (pts)':   losses.mean(),
        'Max Loss (pts)':   losses.min(),
    }
    return pd.Series(summary)

In [None]:
def _to_csv(Df, folder, file):
    path = os.path.join(folder, file)
    os.makedirs(folder, exist_ok=True)
    Df.to_csv(path, index=False, header=(not os.path.exists(path)), mode='a')

In [None]:
def BACKTEST(Calc, Sign, Params, FREQ=FREQ): 
    """Run one signal backtest per row of ``Params`` and record the outcome.

    For every row the matching ('TF', 'Symbol') slices of ``Calc`` and ``Sign``
    are combined into price / entry / exit series indexed by 'Datetime':
    the entry is the signal column named by the row's 'Entry', and the exit is
    every bar whose 'Time' equals its 'Exit Time'. The series are simulated
    with ``vbt_Portfolio.from_signals`` as a long or a short strategy.

    Parameters
    ----------
    Calc, Sign : pandas.DataFrame
        Calculated frame and signal frame; both need 'TF' and 'Symbol', and
        their slices are paired row by row after resetting the index.
    Params : pandas.DataFrame
        Rows with 'TF', 'Symbol', 'Entry', 'Direct' ('Long' or 'Short') and 'Name'.
    FREQ : str
        Forwarded as ``freq`` to ``vbt_Portfolio.from_signals``.

    Side effects
    ------------
    Appends one row per configuration to 'Storage/Result.csv', creates the
    'Charts' folder, shows one figure per configuration and prints progress.

    Raises
    ------
    KeyError
        If a row's 'Direct' is neither 'Long' nor 'Short'; raised before that
        row's simulation is run.
    """

    # ================ Vars ================ #
    # direction -> (PnL sign, from_signals keyword for entries, keyword for exits)
    DIRECTIONS = {
        'Long':  (+1, 'entries',       'exits'),
        'Short': (-1, 'short_entries', 'short_exits'),
    }

    Calc_group = Calc.groupby(['TF','Symbol'])
    Sign_group = Sign.groupby(['TF','Symbol'])

    # Loop-invariant: the output folder only needs to be created once.
    os.makedirs('Charts', exist_ok=True)


    # ================ Main ================ #
    for i, row in Params.iterrows():
        TFRAME  = row['TF']
        SYMBOL  = row['Symbol']
        ENTRY   = row['Entry']
        DIRECT  = row['Direct']
        NAME    = row['Name']

        # Validate the direction up front instead of after the simulation has run.
        pnl_sign, entry_kw, exit_kw = DIRECTIONS[DIRECT]


        # ================ Input Data ================ #
        Calc_by = Calc_group.get_group((TFRAME,SYMBOL)) .reset_index(drop=True)
        Sign_by = Sign_group.get_group((TFRAME,SYMBOL)) .reset_index(drop=True)

        Input = pd.DataFrame()
        Input['Price']  =  Calc_by['Price']
        Input['Entry']  =  Sign_by[ENTRY]
        Input['Exit']   = (Calc_by['Time'] == Calc_by['Exit Time'])
        Input.index     =  Calc_by['Datetime']


        # ================ Compute Backtest ================ #
        res1 = vbt_Portfolio.from_signals(
            close=Input['Price'],
            freq=FREQ,
            **{ entry_kw: Input['Entry'], exit_kw: Input['Exit'] },
        )


        # ================ Save Result ================ #
        # res2 = vbt_format_result(res1)
        res3 = format_trades_results(res1, direction=pnl_sign)
        _to_csv(Df=pd.DataFrame([{ **row, **res3 }]), folder='Storage/', file='Result.csv')


        # ================ Plot Chart ================ #
        fig = res1.plot()
        fig.update_layout(title={'text':f'<b>{NAME}</b>'})
        # fig.write_image(f'Charts/{NAME}.png')
        # fig.write_html(f'Charts/{NAME}.html', include_plotlyjs='cdn')
        fig.show()


        # ================ Log ================ #
        print(i, NAME)

# BACKTEST(Calc, Sign, Params)

# Drafts

In [None]:
# Df = pd.DataFrame()
# Df['Value'] = [10,20,30,40,50,60]
# Df['Open']  = [ 0, 1, 0, 0, 0, 0]
# Df['Close'] = [ 0, 0, 0, 1, 0, 0]
# Df

# res    = vbt_Portfolio.from_signals(close=Df['Value'], entries=Df['Open'], exits=Df['Close'])
# trades = res.trades