In [1]:
######################## Imports #############################

# Python Imports
import time
from tqdm import tqdm
import warnings

import pandas as pd
import numpy as np
import datetime
from collections import namedtuple

# Numba Imports
from numba import njit
from numba import prange

# Plotting Imports
import plotly.graph_objects as go
from plotly import offline

# Machine Learning Imports
import optuna as opt

# Trading Imports
import talib
import vectorbtpro as vbt

# Optuna Visualizations
from optuna.visualization import plot_contour
from optuna.visualization import plot_edf
from optuna.visualization import plot_intermediate_values
from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_parallel_coordinate
from optuna.visualization import plot_param_importances
from optuna.visualization import plot_slice

In [2]:
'''
##################### Data Gathering #########################

# Define column names
column_names = ['Gmt time', 'Open', 'High', 'Low', 'Close', 'Volume']

# Fetch Locally Saved Data (1 minute)
m1_data = pd.read_csv('data/Dukascopy/EURUSD_Candlestick_1_M_BID_07.01.2023-07.07.2023.csv', 
                      delimiter=',',  # Data is comma-separated
                      names=column_names,  # Add column names
                      skiprows=1)  # Skip the first row (column names)

# Convert 'Gmt time' to datetime and set it as index
m1_data['Gmt time'] = pd.to_datetime(m1_data['Gmt time'], format='%d.%m.%Y %H:%M:%S.%f')
m1_data.set_index('Gmt time', inplace=True)

# Ensure index is a DatetimeIndex
m1_data.index = pd.DatetimeIndex(m1_data.index.values)

# Rename index
m1_data.index.name = 'Open time'

# Set data for vectorbt
m1_data = vbt.Data.from_data({'EURUSD': m1_data})
'''

"\n##################### Data Gathering #########################\n\n# Define column names\ncolumn_names = ['Gmt time', 'Open', 'High', 'Low', 'Close', 'Volume']\n\n# Fetch Locally Saved Data (1 minute)\nm1_data = pd.read_csv('data/Dukascopy/EURUSD_Candlestick_1_M_BID_07.01.2023-07.07.2023.csv', \n                      delimiter=',',  # Data is comma-separated\n                      names=column_names,  # Add column names\n                      skiprows=1)  # Skip the first row (column names)\n\n# Convert 'Gmt time' to datetime and set it as index\nm1_data['Gmt time'] = pd.to_datetime(m1_data['Gmt time'], format='%d.%m.%Y %H:%M:%S.%f')\nm1_data.set_index('Gmt time', inplace=True)\n\n# Ensure index is a DatetimeIndex\nm1_data.index = pd.DatetimeIndex(m1_data.index.values)\n\n# Rename index\nm1_data.index.name = 'Open time'\n\n# Set data for vectorbt\nm1_data = vbt.Data.from_data({'EURUSD': m1_data})\n"

In [3]:
##################### Data Gathering #########################

# Define column names
column_names = ['Gmt time', 'Open', 'High', 'Low', 'Close', 'Volume']

# Fetch Locally Saved Data (1 minute)
m1_data = pd.read_csv('data/forexsb/EURUSD1.csv', 
                      delimiter='\t',  # Data is comma-separated
                      names=column_names)

# Remove unwanted characters
m1_data['Gmt time'] = m1_data['Gmt time'].str.replace('\t', '')

# Convert 'Gmt time' to datetime
m1_data['Gmt time'] = pd.to_datetime(m1_data['Gmt time'], format='%Y-%m-%d %H:%M')

# Set 'Gmt time' as the index
m1_data.set_index('Gmt time', inplace=True)

# Ensure index is a DatetimeIndex
m1_data.index = pd.DatetimeIndex(m1_data.index.values)

# Rename index
m1_data.index.name = 'Open time'

# Set data for vectorbt
m1_data = vbt.Data.from_data({'EURUSD': m1_data})

In [4]:
## 1m data
m1_open_raw  = m1_data.get('Open')
m1_high_raw  = m1_data.get('High')
m1_low_raw   = m1_data.get('Low')
m1_close_raw = m1_data.get('Close')

# Continuous indexing for 1m data
m1_open  = m1_open_raw.resample('1T').interpolate()
m1_high  = m1_high_raw.resample('1T').interpolate()
m1_low   = m1_low_raw.resample('1T').interpolate()
m1_close = m1_close_raw.resample('1T').interpolate()

# Continuous indexing for 15m data
m15_open = m1_close.resample('15T').first()
m15_high = m1_high.resample('15T').max()
m15_low = m1_low.resample('15T').min()
m15_close = m1_close.resample('15T').last()

# Continuous indexing for 1h data
h1_open = m1_open.resample('60T').first()
h1_high = m1_high.resample('60T').max()
h1_low = m1_low.resample('60T').min()
h1_close = m1_close.resample('60T').last()

In [5]:
##################### Indicator Functions #######################

####### SuperTrend Indicator Helper Functions ###################
def st_get_basic_bands(high, low, close, st_multiplier, st_period):
    st_medprice = vbt.talib('MEDPRICE').run(high, low, skipna=True).real.to_numpy().astype(np.float64)
    st_atr = vbt.talib('ATR').run(high, low, close, timeperiod=st_period, skipna=True).real.to_numpy().astype(np.float64)
    st_basic_upper, st_basic_lower = st_get_basic_bands_nb(st_atr, st_medprice, st_multiplier, st_period)
    return st_basic_upper, st_basic_lower

@njit(nogil=True)
def st_get_basic_bands_nb(st_atr, st_medprice, st_multiplier, st_period):
    st_matr = st_multiplier * st_atr
    st_basic_upper = st_medprice + st_matr
    st_basic_lower = st_medprice - st_matr
    return st_basic_upper, st_basic_lower

@njit(nogil=True)
def st_get_final_bands(close, st_basic_upper, st_basic_lower): 
    
    st_trend = np.full(close.shape, np.nan)  
    st_dir = np.full(close.shape, 1)
    st_long = np.full(close.shape, np.nan)
    st_short = np.full(close.shape, np.nan)
    
    for i in range(1, close.shape[0]):
        if close[i] > st_basic_upper[i - 1]:
            st_dir[i] = 1
        elif close[i] < st_basic_lower[i - 1]:
            st_dir[i] = -1
        else:
            st_dir[i] = st_dir[i - 1]
            if st_dir[i] > 0 and st_basic_lower[i] < st_basic_lower[i - 1]:
                st_basic_lower[i] = st_basic_lower[i - 1]
            if st_dir[i] < 0 and st_basic_upper[i] > st_basic_upper[i - 1]:
                st_basic_upper[i] = st_basic_upper[i - 1]
        if st_dir[i] > 0:
            st_trend[i] = st_long[i] = st_basic_lower[i]
        else:
            st_trend[i] = st_short[i] = st_basic_upper[i]    
    return st_trend, st_dir, st_long, st_short 



In [6]:
#################### Clean Signals ############################

@njit(nogil=True, parallel=True)
def clean_signals_nb(m1_high_np, m1_low_np, m1_open_np, st_dir_m1):  
    
    # Initialize signal arrays
    long_entry = np.full_like(m1_open_np, False, dtype=np.bool_)
    long_exit = np.full_like(m1_open_np, False, dtype=np.bool_)
    
    short_entry = np.full_like(m1_open_np, False, dtype=np.bool_)
    short_exit = np.full_like(m1_open_np, False, dtype=np.bool_)
    
    for col in prange(m1_open_np.shape[1]):

        for i in range(m1_open_np.shape[0]):

            ######################## LONGS ############################

            # Long Entries
            if (st_dir_m1[i, col] == 1.0 and st_dir_m1[i - 1, col] == -1.0):
                short_exit[i, col] = True
                long_entry[i, col] = True

            ######################## SHORTS ############################
            
            # Short Entries
            if (st_dir_m1[i, col] == -1.0 and st_dir_m1[i - 1, col] == 1.0):
                long_exit[i, col] = True
                short_entry[i, col] = True
           
            
    # Returns
    return long_entry, long_exit, short_entry, short_exit

In [7]:
######################### SuperTrend Indicator ############################

expr = """
SuperTrend[st]:

# SuperTrend Indicator
basic_upper, basic_lower = st_get_basic_bands(@in_h1_high, @in_h1_low, @in_h1_close, @p_st_multiplier, @p_st_period)
trend, dir, long, short = st_get_final_bands(@in_h1_close, basic_upper, basic_lower)

# Returns
trend, dir, long, short

"""

SuperTrend = vbt.IF.from_expr(
    expr,
    takes_1d=True,

    # SuperTrend Indicator
    st_period=7,
    st_multiplier=5,
    st_get_basic_bands=st_get_basic_bands,
    st_get_final_bands=st_get_final_bands,
)

In [8]:
st_multiplier = 2.0 #trial.suggest_float("st_multiplier", 1.0, 20.0, step=0.1) 
st_period = 200 #trial.suggest_int("st_period", 2, 1000)

st = SuperTrend.run(h1_high, h1_low, h1_close, # calls h1_high, h1_low, h1_close
                    st_multiplier=st_multiplier,
                    st_period=st_period,                        
                    execute_kwargs=dict(                                        
                                    engine='dask',
                                    chunk_len='auto',
                                    show_progress=True
                                    ))

  0%|          | 0/1 [00:00<?, ?it/s]

In [9]:
##################### Ensure 2-Dim Arrays #######################

def ensure_2d(arr):
    if arr.ndim == 1:
        return arr.reshape(-1, 1)
    return arr

In [10]:
######################## Resample Everything To 1 Min ##########################

# Use some placeholder value
placeholder = -99999

#################### Nan to Placeholder ########################

st_dir = st.dir.fillna(placeholder)
st_long = st.long.fillna(placeholder)
st_short = st.short.fillna(placeholder)

#################### Reindex To 1 Min ##########################

st_dir_m1 = st_dir.reindex(m1_open.index).ffill()
st_dir_m1 = st_dir_m1.bfill()
st_long_m1 = st_long.reindex(m1_open.index).ffill()
st_long_m1 = st_long_m1.bfill()
st_short_m1 = st_short.reindex(m1_open.index).ffill()
st_short_m1 = st_short_m1.bfill()

#################### Placeholder To NaN ########################

st_dir_m1 = st_dir_m1.replace(placeholder, np.nan)
st_short_m1 = st_short_m1.replace(placeholder, np.nan)
st_long_m1 = st_long_m1.replace(placeholder, np.nan)    

#################### Convert to Numpy ##########################

st_dir_m1 = st_dir_m1.to_numpy()

#################### 1 Minute OHLC Data ########################

m1_open_np = m1_open.to_numpy()
m1_high_np = m1_high.to_numpy()
m1_low_np = m1_low.to_numpy()
m1_close_np = m1_close.to_numpy()

################### Ensure 2-Dim Array #########################

st_dir_m1 = ensure_2d(st_dir_m1)
m1_open_np = ensure_2d(m1_open_np)
m1_high_np = ensure_2d(m1_high_np)
m1_low_np = ensure_2d(m1_low_np)
m1_close_np = ensure_2d(m1_close_np)

In [11]:
#################### Clean Signals Call ############################

for col in tqdm(range(m1_open_np.shape[1]), desc='Processing columns'): 
    
    long_entry, \
    long_exit, \
    short_entry, \
    short_exit = clean_signals_nb(m1_high_np, m1_low_np, m1_open_np, st_dir_m1)

Processing columns: 100%|██████████| 1/1 [00:01<00:00,  1.13s/it]


In [12]:
######################### Portfolio Wrapper #########################

pf = vbt.Portfolio.from_signals(
    close=m1_close,
    entries=long_entry,
    exits=long_exit,
    short_entries=short_entry,
    short_exits=short_exit,
    fees=0.00005,
    slippage=0.0000269,
    init_cash=15000,
    freq='1m',
    orders_cls=vbt.FSOrders
)

In [21]:
pf.trades.win_rate # Return as Objective

Close    0.795918
Name: win_rate, dtype: float64

In [14]:
pf.stats()

Start                         2022-12-29 08:46:00+00:00
End                           2023-07-13 06:59:00+00:00
Period                                195 days 22:14:00
Start Value                                     15000.0
Min Value                                  14965.582623
Max Value                                  27312.805092
End Value                                   27308.88573
Total Return [%]                              82.059238
Benchmark Return [%]                           4.846233
Total Time Exposure [%]                       93.806489
Max Gross Exposure [%]                       100.459205
Max Drawdown [%]                               0.824219
Max Drawdown Duration                   4 days 16:32:00
Total Orders                                        196
Total Fees Paid                              405.901219
Total Trades                                        196
Win Rate [%]                                  79.487179
Best Trade [%]                                  

In [15]:
    # SuperTrend
    st_long_df = pd.Series(st_long_m1, index=m1_open.index)
    st_short_df = pd.Series(st_short_m1, index=m1_open.index)
    
    long_entry_df = pd.DataFrame(long_entry, index=m1_open.index, columns=['EURUSD'])
    long_exit_df = pd.DataFrame(long_exit, index=m1_open.index, columns=['EURUSD'])
    short_entry_df = pd.DataFrame(short_entry, index=m1_open.index, columns=['EURUSD'])
    short_exit_df = pd.DataFrame(short_exit, index=m1_open.index, columns=['EURUSD'])
    
    

In [16]:
symbol = "EURUSD"
index = "EURUSD" # Backtest index
start_date = "2023-05-23 00:00:00+00:00"
end_date = "2023-05-26 23:59:00+00:00"

# Create a DataFrame with OHLC data
filtered_data = pd.DataFrame({
    'open': m1_open.loc[start_date:end_date],
    'high': m1_high.loc[start_date:end_date],
    'low': m1_low.loc[start_date:end_date],
    'close': m1_close.loc[start_date:end_date]
})

# Create a candlestick chart
fig = go.Figure(data=[go.Candlestick(x=filtered_data.index,
                open=filtered_data['open'],
                high=filtered_data['high'],
                low=filtered_data['low'],
                close=filtered_data['close'])])

pf.orders.loc[start_date:end_date].plot(plot_close=False, fig=fig)

# Update layout
fig.update_layout(
    xaxis_rangeslider_visible=False,
    dragmode='zoom'
)

# Calculate offset as a fraction of the y-axis range
y_range = filtered_data.max() - filtered_data.min()
offset_fraction = 0.01  # 5% offset
offset_value = y_range * offset_fraction

fig.add_trace(
    go.Scatter(
        x=st_long_df.loc[start_date:end_date].index,
        y=st_long_df.loc[start_date:end_date].values,
        mode='lines',
        name='long supertrend',
        line=dict(color='limegreen', width=1)
    )
)
fig.add_trace(
    go.Scatter(
        x=st_short_df.loc[start_date:end_date].index,
        y=st_short_df.loc[start_date:end_date].values,
        mode='lines',
        name='short supertrend',
        line=dict(color='orange', width=1)
    )
)

long_entry_df[index].loc[start_date:end_date].vbt.signals.plot_as_markers(
    y=m1_close.loc[start_date:end_date].values,
    trace_kwargs=dict(marker=dict(symbol="triangle-up", color="limegreen", size=10), name='long entry'),
    fig=fig
)
long_exit_df[index].loc[start_date:end_date].vbt.signals.plot_as_markers(
    y=m1_close.loc[start_date:end_date].values,
    trace_kwargs=dict(marker=dict(symbol="triangle-down", color="red", size=10), name='long exit'),
    fig=fig
)

# Change the theme to dark
fig.update_layout(
    width=1800,
    height=950,
    template='plotly_dark')

# Show the figure
offline.plot(fig)

'temp-plot.html'