# Imports

In [None]:
# Importing essential libraries for backtesting trading strategies using Backtesting.py
import backtesting
from backtesting import Strategy, Backtest
from backtesting.lib import crossover, plot_heatmaps
from backtesting.test import SMA

# Importing common data manipulation and analysis libraries: Pandas and NumPy
import pandas as pd
import numpy as np

# Importing TA-Lib for technical indicators and seaborn for visualizations
import talib
import seaborn as sns

# Importing standard libraries for time handling and unique identifier generation
import time
import datetime
import uuid
from IPython.display import clear_output

# Importing utilities for date manipulation and generating combinations of parameters
from dateutil.relativedelta import relativedelta
from itertools import product

# Setting up logging for monitoring the script's execution
import logging
logging.basicConfig(level=logging.INFO)

# Per-column aggregation rules used whenever OHLCV candles are resampled
# to a coarser timeframe (passed to ``DataFrame.resample(...).agg``).
ohlcv_agg = dict(
    Open='first',
    High='max',
    Low='min',
    Close='last',
    Volume='sum',
)

## initialize data

In [None]:
# Load the 1-minute BTCUSDT candles; the first CSV column becomes the
# (date-parsed) index.
df = pd.read_csv('./backtest_data/BTCUSDT_1m.csv', index_col=0, parse_dates=True)

# Resample the dataframe to 15-minute candles using the OHLCV aggregation rules.
df = df.resample('15T').agg(ohlcv_agg)
# Divide every column by 1e3, then overwrite Volume with the pre-division
# volume multiplied by 1e3 (original note: "fraction BTC currency by 1e-3";
# presumably prices end up quoted per 0.001 BTC - TODO confirm).
df = (df / 1e3).assign(Volume=df.Volume * 1e3)
# Forward-fill gaps in the first four columns (Open, High, Low, Close after
# the aggregation above); Volume is left untouched.
df.iloc[:, :4] = df.iloc[:, :4].ffill()

# EURUSD data (alternative dataset, currently disabled)
# df = pd.read_csv('./backtest_data/mt4_eurusd.csv', parse_dates=True, index_col=0)

In [None]:
def swings(length, df, prefix='', suffix=''):
    """
    Detect swing highs/lows and attach them to ``df`` as new columns.

    A bar ``length`` rows back is compared against the rolling extreme of the
    most recent ``length`` bars: if its High exceeds the rolling max the state
    becomes 0, else if its Low is below the rolling min the state becomes 1,
    otherwise the previous state is carried forward. A new swing top is
    recorded on the row where the state switches to 0, a new swing bottom
    where it switches to 1.

    Args:
    - length: Window length (in rows) used for the rolling max/min and the look-back shift.
    - df: DataFrame with at least 'High' and 'Low' columns. It is modified in place.
    - prefix: Optional prefix for the generated column names.
    - suffix: Optional suffix for the generated column names.

    Returns:
    - The same DataFrame with four added columns: ``{prefix}top{suffix}`` and
      ``{prefix}btm{suffix}`` (price of the latest swing high/low) plus
      ``{prefix}top_x{suffix}`` and ``{prefix}btm_x{suffix}`` (row position of
      that swing bar, cast to int). Gaps are forward- then backward-filled.
    """
    past_high = df['High'].shift(length)
    past_low = df['Low'].shift(length)

    # The bar `length` rows ago pierced the extreme of the latest window.
    broke_up = past_high > df['High'].rolling(length).max()
    broke_down = past_low < df['Low'].rolling(length).min()

    # 0 after an upward break, 1 after a downward break, carried forward in between.
    state = pd.Series(np.where(broke_up, 0, np.where(broke_down, 1, np.nan))).ffill()
    previous_state = state.shift(1)
    is_new_top = (state == 0) & (previous_state != 0)
    is_new_btm = (state == 1) & (previous_state != 1)

    # Row position of the bar `length` rows before each row.
    swing_positions = np.arange(-length, len(df) - length)

    price_cols = [f'{prefix}top{suffix}', f'{prefix}btm{suffix}']
    x_cols = [f'{prefix}top_x{suffix}', f'{prefix}btm_x{suffix}']

    df[price_cols[0]] = np.where(is_new_top, past_high, np.nan)
    df[price_cols[1]] = np.where(is_new_btm, past_low, np.nan)
    df[x_cols[0]] = np.where(is_new_top, swing_positions, np.nan)
    df[x_cols[1]] = np.where(is_new_btm, swing_positions, np.nan)

    # Propagate the latest swing forward, then back-fill the leading rows.
    all_cols = price_cols + x_cols
    df[all_cols] = df[all_cols].ffill()
    df[all_cols] = df[all_cols].bfill()
    df[x_cols] = df[x_cols].astype(int)
    return df

In [None]:
def initialize_data(df, temporalities, from_date, to_date, drop_weekends=False):
    """
    Build the multi-temporality DataFrame consumed by the backtest.

    The input candles are cut to the requested date range and resampled to the
    base temporality (key "1"). For every temporality the swing and
    internal-swing columns are computed on its own resampled candles and merged
    back onto the base frame (suffixed with ``_<key>``). For temporalities "2"
    and "3" an ATR(200) column is added and the swing x-coordinates are rescaled
    to base-temporality bars. Finally a boolean ``new_<col>`` column marks rows
    where a swing price column changes value.

    Args:
    - df: DataFrame with OHLCV data on a datetime index.
    - temporalities: Dict keyed by "1", "2", "3"; each value holds at least
      't' (minutes per candle), 'sp' (swing period) and 'ip' (internal swing period).
    - from_date, to_date: Bounds used to slice ``df``.
    - drop_weekends: Accepted for interface compatibility; not used by this function.

    Returns:
    - The merged DataFrame with swing, ATR and change-flag columns.
    """
    base_minutes = temporalities["1"]["t"]
    in_range = df[from_date:to_date]
    # Shift by one bar so every row only carries the previously completed candle.
    df_base = in_range.resample(f'{base_minutes}T').agg(ohlcv_agg).copy().shift(1)

    for key, spec in temporalities.items():
        # Undo the shift, resample to this temporality, then shift again.
        candles = df_base.shift(-1).resample(f'{spec["t"]}T').agg(ohlcv_agg).copy().shift(1)

        candles = swings(spec['sp'], candles)
        candles = swings(spec['ip'], candles, prefix='i').add_suffix(f'_{key}')
        df_base = df_base.merge(candles, left_index=True, right_index=True, how='left').ffill()

    # ATR columns and x-coordinate adjustment for the higher temporalities
    for t in ['2', '3']:
        minutes = temporalities[t]['t']
        candles = df_base.shift(-1).resample(f'{minutes}T').agg(ohlcv_agg).copy()
        atr = talib.ATR(candles['High'], candles['Low'], candles['Close'], timeperiod=200)
        atr = atr.rename(f'atr_{t}').shift(1)
        df_base = df_base.merge(atr, right_index=True, left_index=True, how='left')

        # Convert swing positions from this temporality's bars to base bars.
        swing_x = [f'top_x_{t}', f'btm_x_{t}']
        inner_x = [f'itop_x_{t}', f'ibtm_x_{t}']
        for x_cols in (swing_x, inner_x):
            df_base[x_cols] *= (minutes / base_minutes)
            df_base[x_cols] = df_base[x_cols].astype(dtype="Int64")

    # Flag rows where a swing price column (…btm_<k> / …top_<k>) changes value.
    swing_price_cols = [
        name for name in df_base.columns
        if name.endswith(('btm', 'top'), -5, -2)
    ]
    for col in swing_price_cols:
        df_base[f'new_{col}'] = df_base[col].diff().fillna(0).astype(bool)

    return df_base

# define strategy

In [None]:
def find_ob(temp_df, atr, ob_type, t = '2'):
    """
    Locate the order-block candle inside ``temp_df``.

    Only candles whose range (``High_<t> - Low_<t>``) is smaller than twice the
    ATR are considered. For a truthy ``ob_type`` the candidate with the lowest
    ``Low_<t>`` is chosen, otherwise the one with the highest ``High_<t>``.

    Args:
    - temp_df: DataFrame with ``High_<t>`` and ``Low_<t>`` columns.
    - atr: Series of ATR values sharing ``temp_df``'s index.
    - ob_type: Truthy selects by lowest low, falsy by highest high.
    - t: Temporality key used to build the column names, defaulting to '2'.

    Returns:
    - ``(min_val, max_val, ob_creation_time, ob_time, ob_type)`` where
      ``ob_time`` is the index label of the chosen candle and
      ``ob_creation_time`` is the last index label of ``temp_df``.
      When ``atr`` is empty or no candle passes the ATR filter the default
      ``(np.inf, 0, 0, 0, ob_type)`` is returned.
    """
    not_found = (np.inf, 0, 0, 0, ob_type)

    # Early return with default values if the ATR series is empty
    if atr.shape[0] < 1:
        return not_found

    high_col = f'High_{t}'
    low_col = f'Low_{t}'

    # Keep only candles that are not abnormally large relative to volatility.
    candidates = temp_df[(temp_df[high_col] - temp_df[low_col]) < atr * 2]
    if len(candidates) == 0:
        return not_found

    # "Not found" is decided by the emptiness check above rather than by the
    # truthiness of the label, so a candle whose index label is 0 (e.g. on an
    # integer index) is still a valid result.
    if ob_type:
        ob_time = candidates[low_col].idxmin()
    else:
        ob_time = candidates[high_col].idxmax()

    # Extracting the low and high of the selected candle
    selected = temp_df.loc[ob_time]
    min_val = selected[low_col]
    max_val = selected[high_col]
    ob_creation_time = temp_df.index[-1]
    return min_val, max_val, ob_creation_time, ob_time, ob_type

In [None]:
def calc_order(min_val, max_val, ob_type, sl_percent, risk_ratio):
    """
    Derive limit, stop-loss and take-profit prices from an order block.

    Args:
    - min_val: Low of the order block.
    - max_val: High of the order block.
    - ob_type: Truthy for the block entered at ``max_val`` (stop below, target
      above); falsy for the block entered at ``min_val`` (stop above, target below).
    - sl_percent: Fraction of the block width used as stop-loss distance.
    - risk_ratio: Multiple of the stop-loss distance used as take-profit distance.

    Returns:
    - A tuple ``(limit, ob_width, stop_loss, take_profit)``.
    """
    if ob_type:
        direction, limit = 1, max_val
    else:
        direction, limit = -1, min_val

    ob_width = max_val - min_val
    sl_distance = ob_width * sl_percent

    stop_loss = limit - sl_distance * direction
    take_profit = limit + sl_distance * risk_ratio * direction
    return limit, ob_width, stop_loss, take_profit

In [None]:
class TrailingStrategy(Strategy):
    """
    Strategy base class that tightens stop losses when an inner structure
    completes on the current bar.

    ``next`` reads ``self.inner_structure_bull_1`` and
    ``self.inner_structure_bear_1``. Neither attribute is created in this
    class, so a subclass (or the caller) must provide them as sequences of
    dicts containing at least the keys ``'end_time'`` and ``'price'``.
    """

    def init(self):
        super().init()

    def set_trailing_sl(self, sl_amount: float = 6):
        """
        Store the trailing stop-loss amount.

        The value is only stored on the instance; ``next`` in this class does
        not read it.
        """
        self.__sl_amount = sl_amount

    def next(self):
        """Update stop losses of open trades from the latest bull/bear structures."""
        super().next()
        self._trail_stops(self.inner_structure_bull_1, is_long=True)
        self._trail_stops(self.inner_structure_bear_1, is_long=False)

    def _trail_stops(self, structures, is_long):
        """
        Tighten the stops of long (``is_long=True``) or short trades.

        Nothing happens unless the newest structure ended on the current bar.
        A trade whose stop is still on the losing side of its entry price is
        moved to break-even; otherwise the stop is trailed to the price of the
        previous structure (never loosened).
        """
        if len(structures) == 0:
            return
        if not (structures[-1]['end_time'] == self.data.index[-1]):
            return

        # Long stops only move up, short stops only move down.
        tighten = max if is_long else min

        for trade in self.trades:
            same_side = trade.is_long if is_long else trade.is_short
            if not same_side:
                continue

            if is_long:
                stop_behind_entry = trade.entry_price > trade.sl
            else:
                stop_behind_entry = trade.entry_price < trade.sl

            if stop_behind_entry:
                logging.debug((trade.entry_price, trade.sl, structures[-1]['price']))
                trade.sl = tighten(trade.entry_price, trade.sl)
            elif len(structures) > 1:
                # Uses the list for this trade's own direction; the bear branch
                # previously logged the bull list here. With a single
                # structure there is no previous one to trail to, so the stop
                # is left unchanged instead of raising IndexError.
                previous_price = structures[-2]['price']
                logging.debug((trade.entry_price, trade.sl, previous_price))
                trade.sl = tighten(trade.sl, previous_price)

In [None]:
## OGSmartMoneyConcepts Strategy

def linear(x):
    """Return ``x`` unchanged; used as the function argument of ``self.I`` to register an existing array as an indicator."""
    return x

class OGSmartMoneyConcepts(Strategy):
    """
    Market-structure / order-block strategy working on several temporalities.

    On every bar ``next`` (1) latches the ``new_*`` swing-change flags,
    (2) records internal and swing structures (BOS / CHoCH) when the close
    crosses a swing level, (3) derives an order block for each new swing
    structure, (4) marks order blocks as mitigated and (5) submits limit orders
    for newly created order blocks.

    External requirements visible from the code:
    - a module-level ``temporalities`` dict (keys used: ``'i_structure'``,
      ``'sw_structure'``, ``'sw_ob'``) is read as a global inside ``next``;
    - the timing accumulators ``t_init``, ``t_init2``, ``t_inner``,
      ``t_inner2``, ``t_sw`` and ``t_sw2`` are not defined in this class and
      must be assigned before the backtest runs;
    - the data must contain the swing, ``new_*`` and ``atr_*`` columns read below.
    """
    # --- tunable parameters -------------------------------------------------
    n_ob = 5  # not read by init()/next() below
    risk_ratio = 200  # divided by 100 in init(); take-profit multiple of the stop distance
    sl_percent = 100  # divided by 100 in init(); stop distance as a fraction of the block width
    swing_p = 50  # stored on self.data.len_back in init()
    risk_pct = 10  # divided by 1000 in init(); fraction of equity used for sizing
    margin = 1/5  # used to cap the position size (equity / margin)
    long_trading = True  # not read by init()/next() below
    short_trading = True  # not read by init()/next() below
    order_times_2 = {}  # not read by init()/next() below
    order_duration = 48  # not read by init()/next() below
    order_creation_time = 0  # not read by init()/next() below
    order_cancel_time = 0  # not read by init()/next() below
    ob_time = 0  # not read by init()/next() below
    trailing_sl = True  # not read by init()/next() below
    revancha = False  # not read by init()/next() below
    ob_mitigation = True  # not read by init()/next() below
    # --- bookkeeping tables ---------------------------------------------------
    # These DataFrames are class attributes: rows appended through ``.loc`` in
    # next() mutate the shared class-level objects, so they persist between
    # runs unless they are reassigned beforehand.
    all_order_blocks = pd.DataFrame(columns=['min_val', 'max_val', 'ob_creation_time', 'ob_time', 'ob_type', 'limit', 'ob_width', 'stop_loss', 'take_profit', 'id', 'not_mitigated', 'order_placed', 'mitigation_time', 'temporality'])
    order_blocks = pd.DataFrame(columns=['min_val', 'max_val', 'ob_creation_time', 'ob_time', 'ob_type', 'limit', 'ob_width', 'stop_loss', 'take_profit', 'id', 'not_mitigated', 'order_placed', 'mitigation_time', 'temporality'])
    inner_structure = pd.DataFrame(columns=['structure_type', 'start_time', 'end_time', 'price', 'trend', 'type', 'temporality'])
    sw_structure = pd.DataFrame(columns=['structure_type', 'start_time', 'end_time', 'price', 'trend', 'type', 'temporality'])
    # ``trends`` and ``trends1`` are not read by init()/next() below.
    trends = pd.DataFrame(False, index=[1, 2, 3], columns=['itop', 'itop_cross', 'ibtm', 'ibtm_cross', 'itrend', 'top', 'top_cross', 'btm', 'btm_cross', 'trend'])
    trends1 = np.full((3,6), 1)
    trends1[:,[2,4]]=0

    # Executed once, when the class body is evaluated (not per instance).
    logging.info('Parameters initialized successfully.')

    def init(self):
        """Rescale the percentage parameters and reset per-run state and indicators."""

        # Running bar counter exposed as an indicator (0, 1, 2, ...).
        self.bar_index = self.I(linear, np.arange(len(self.data.Close)))

        self.data.len_back = self.swing_p
        # Convert the percentage-style parameters into fractions.
        self.risk_ratio /= 100
        self.sl_percent /= 100
        self.risk_pct /= 1000
        # "cross" flags: True while the current swing level has not yet been
        # broken by price; reset to False in next() once a break is recorded.
        self.itop_cross_1 = True
        self.ibtm_cross_1 = True
        self.top_cross_1 = True
        self.btm_cross_1 = True
        self.itop_cross_2 = True
        self.ibtm_cross_2 = True
        self.top_cross_2 = True
        self.btm_cross_2 = True
        self.itop_cross_3  =True 
        self.ibtm_cross_3  =True 
        self.btm_cross_3  = True
        self.top_cross_3  = True
        # Trend per temporality: 0 = undefined, set to 1 / -1 in next().
        self.trend_1 = 0
        self.itrend_1 = 0
        self.trend_2 = 0
        self.itrend_2 = 0
        self.trend_3 = 0
        self.itrend_3 = 0
        self.n_ob_1 = []
        self.n_ob_2 = []
        self.n_ob_3 = []

        
        # Register the temporality-3 swing levels as indicators (price levels
        # are drawn over the candles, x positions are not overlaid).
        self.btm_3 = self.I(linear, self.data.btm_3, overlay=True)
        self.top_3 = self.I(linear, self.data.top_3, overlay=True)
        self.btm_x_3 = self.I(linear, self.data.btm_x_3)
        self.top_x_3 = self.I(linear, self.data.top_x_3)
        


    def next(self):
        """Process one bar: update structures, order blocks, mitigation and orders."""

        # print progress (only on bars whose timestamp has hour == 0 and minute == 0)
        if self.data.index[-1].hour + self.data.index[-1].minute == 0:
            clear_output()
            print(self.data.index[-1], self.t_init, self.t_init2, self.t_inner, self.t_inner2, self.t_sw, self.t_sw2)
        
        start_time1 = time.time()

        # Latch the swing-change flags: once a new swing level appears the
        # corresponding cross flag stays True until a break consumes it.
        self.itop_cross_1 = self.itop_cross_1 or self.data['new_itop_1'][-1]
        self.ibtm_cross_1 = self.ibtm_cross_1 or self.data['new_ibtm_1'][-1]
        self.top_cross_1 = self.top_cross_1 or self.data['new_top_1'][-1]

        self.itop_cross_2 = self.itop_cross_2 or self.data['new_itop_2'][-1]
        self.ibtm_cross_2 = self.ibtm_cross_2 or self.data['new_ibtm_2'][-1]
        self.top_cross_2 = self.top_cross_2 or self.data['new_top_2'][-1]
        self.btm_cross_2 = self.btm_cross_2 or self.data['new_btm_2'][-1]
        self.btm_cross_1 = self.btm_cross_1 or self.data['new_btm_1'][-1]
        
        self.itop_cross_3  = self.itop_cross_3 or self.data['new_itop_3'][-1]
        self.ibtm_cross_3  = self.ibtm_cross_3 or self.data['new_ibtm_3'][-1]
        self.btm_cross_3  = self.btm_cross_3 or self.data['new_btm_3'][-1]
        self.top_cross_3  = self.top_cross_3 or self.data['new_top_3'][-1]


        # Per-bar event flags.
        self.new_i_structure=False
        self.new_sw_structure=False
        self.new_ob=False
        self.t_init += time.time()-start_time1
        start_time2 = time.time()

        
        self.t_init2 += time.time()-start_time2
        
        # generate new structure and order blocks for all 3 temporalities
        # (``temporalities`` is resolved as a module-level global)
        for key in temporalities.keys():
            for direction in ['top', 'btm']:

                start_time = time.time()
                # calculate structure
                if temporalities[key]['i_structure']:
                    # Close column name: 'Close_1' is mapped to plain 'Close'.
                    # 'top': close crosses above the level; 'btm': level crosses above the close.
                    price_cross = crossover(self.data[f'Close_{key}'.replace('_1','')], self.data[f'i{direction}_{key}']) if direction=='top' \
                        else crossover(self.data[f'i{direction}_{key}'], self.data[f'Close_{key}'.replace('_1','')])
                    # NOTE(review): the last condition always compares itop_1 with
                    # top_1, regardless of `key` and `direction` - confirm whether
                    # i{direction}_{key} vs {direction}_{key} was intended.
                    if price_cross and getattr(self, f'i{direction}_cross_{key}') and (self.data.itop_1[-1] != self.data.top_1[-1]):
                        # set internal top/btm price change to false
                        setattr(self, f'i{direction}_cross_{key}', False)
                        # structure type depending on previous trend
                        
                        structure_type = 'BOS' if getattr(self, f'itrend_{key}')*(1 if direction=='top' else -1) > 0 else 'CHoCH'
                        # set new internal trend depending on top/btm
                        setattr(self, f'itrend_{key}', 1 if direction=='top' else -1)
                        self.t_inner += time.time()-start_time
                        start_time = time.time()
                        
                        # Append one row describing the broken internal level.
                        self.inner_structure.loc[len(self.inner_structure)] = dict(structure_type=structure_type
                                                        , start_time=self.data.index[self.data[f'i{direction}_x_{key}'][-1]]
                                                        , end_time=self.data.index[-1]
                                                        , price=self.data[f'i{direction}_{key}'][-1]
                                                        , trend=getattr(self, f'itrend_{key}')
                                                        , type= 1 if direction=='top' else 0
                                                        , temporality=key)
                        self.new_i_structure = True
                self.t_inner2 += time.time()-start_time
                start_time = time.time()
                if temporalities[key]['sw_structure'] or temporalities[key]['sw_ob']:
                    price_cross = crossover(self.data[f'Close_{key}'.replace('_1','')], self.data[f'{direction}_{key}']) if direction=='top' \
                        else crossover(self.data[f'{direction}_{key}'], self.data[f'Close_{key}'.replace('_1','')])
                    if price_cross and getattr(self, f'{direction}_cross_{key}'):
                        # Diagnostic output emitted for every swing break.
                        print(direction, self.data.index[-1], self.data[f'{direction}_x_{key}'][-1], self.data.index[self.data[f'{direction}_x_{key}'][-1]], self.data[f'i{direction}_{key}'][-1])
                        # set top/btm price change to false
                        setattr(self, f'{direction}_cross_{key}', False)
                        # structure type depending on previous trend
                        structure_type = 'BOS' if getattr(self, f'trend_{key}')*(1 if direction=='top' else -1) > 0 else 'CHoCH'
                        # set new trend depending on top/btm
                        # print(getattr(self, f'trend_{key}'), getattr(self, f'trend_{key}')*(1 if direction=='top' else -1), structure_type)
                        setattr(self, f'trend_{key}', 1 if direction=='top' else -1)
                        self.t_sw += time.time()-start_time
                        start_time = time.time()
                        # Append one row describing the broken swing level.
                        self.sw_structure.loc[len(self.sw_structure)] = dict(structure_type=structure_type
                                                        , start_time=self.data.index[self.data[f'{direction}_x_{key}'][-1]]
                                                        , end_time=self.data.index[-1]
                                                        , price=self.data[f'{direction}_{key}'][-1]
                                                        , trend=getattr(self, f'trend_{key}')
                                                        , type= 1 if direction=='top' else 0
                                                        , temporality=key)
                        self.new_sw_structure = True

                        # find OB
                        if temporalities[key]['sw_ob']:
                            # Bars elapsed since the swing bar; the slice passed to
                            # find_ob ends at -1, i.e. it excludes the current bar,
                            # and rows without an ATR value are dropped.
                            bars_back = int(self.bar_index[-1]-self.data[f'{direction}_x_{key}'][-1])
                            min_val, max_val, ob_creation_time, ob_time, ob_type = find_ob(self.data.df[-bars_back+1:-1].dropna(subset=f'atr_{key}'), self.data[f'atr_{key}'].s[-bars_back+1:-1].dropna(), ob_type=1 if direction=='top' else 0, t=key)

                            # find_ob signals "nothing found" with min_val == np.inf
                            if min_val < np.inf:
                                # calculate order block bounds
                                limit, ob_width, stop_loss, take_profit = calc_order(min_val, max_val, ob_type, self.sl_percent, self.risk_ratio)

                                # calculate order size
                                # NOTE(review): `size` and `max_size` are not used in
                                # this branch; they are recomputed when the order is placed.
                                size = (self.equity * self.risk_pct) // (ob_width*self.sl_percent)
                                max_size = (self.equity / self.margin) // limit
                                # NOTE(review): `id` shadows the builtin of the same name.
                                id = str(uuid.uuid4())

                                # append order block to list
                                # (the same record goes to the active table and to the full history)
                                self.order_blocks.loc[len(self.order_blocks)] = dict(min_val=min_val
                                                            , max_val=max_val
                                                            , ob_creation_time=ob_creation_time
                                                            , ob_time=ob_time
                                                            , ob_type=ob_type
                                                            , limit=limit
                                                            , ob_width=ob_width
                                                            , stop_loss=stop_loss
                                                            , take_profit=take_profit
                                                            , id=id
                                                            , not_mitigated=True
                                                            , order_placed=False
                                                            , mitigation_time=np.nan
                                                            , temporality=key)
                                self.all_order_blocks.loc[len(self.all_order_blocks)] = dict(min_val=min_val
                                                            , max_val=max_val
                                                            , ob_creation_time=ob_creation_time
                                                            , ob_time=ob_time
                                                            , ob_type=ob_type
                                                            , limit=limit
                                                            , ob_width=ob_width
                                                            , stop_loss=stop_loss
                                                            , take_profit=take_profit
                                                            , id=id
                                                            , not_mitigated=True
                                                            , order_placed=False
                                                            , mitigation_time=np.nan
                                                            , temporality=key)
                                self.new_ob = True


                    self.t_sw2 += time.time()-start_time

        # check if order block is mitigated
        # A block of type 0 is mitigated when the close is above its stop loss,
        # a block of type 1 when the close is below its stop loss.
        for key in temporalities.keys():
            if temporalities[key]['sw_ob'] and not pd.isnull(self.data[f'atr_{key}'][-1]):

                    self.order_blocks.loc[self.order_blocks['not_mitigated']
                                            & (self.order_blocks['temporality']==key)
                                            & (
                                            (self.order_blocks['stop_loss']<self.data[f'Close_{key}'][-1]) & (self.order_blocks['ob_type']==0)
                                            | (self.order_blocks['stop_loss']>self.data[f'Close_{key}'][-1]) & (self.order_blocks['ob_type']==1)
                                            ), ['not_mitigated', 'mitigation_time']] = [False, self.data.index[-1]]
        
        # create new limit order if there is a new order block in the shortest temporality
        # NOTE(review): the filter requires ob_creation_time == current bar, while
        # find_ob is fed a slice that excludes the current bar - verify that this
        # condition can actually match.
        # NOTE(review): 'order_placed' is never set to True in this class, and an
        # order is submitted for every matching temporality-'3' block (no break),
        # so one block may produce several orders - confirm intent.
        if self.new_ob:
            for index, ob in self.order_blocks.loc[self.order_blocks['not_mitigated'] & (self.order_blocks['temporality']=='2') & (~self.order_blocks['order_placed']) & (self.order_blocks['ob_creation_time'] == self.data.index[-1])].iterrows():
                for long_index, long_ob in self.order_blocks.loc[self.order_blocks['not_mitigated']& (self.order_blocks['temporality']=='3')& (self.order_blocks['ob_type']==ob['ob_type'])].iterrows():    
                    
                    # Trade only when the entry lies between the limit and the
                    # take profit of the higher-temporality block (either orientation).
                    if (long_ob['take_profit'] >= ob['limit'] >= long_ob['limit']) or (long_ob['take_profit'] <= ob['limit'] <= long_ob['limit']):
                        # Size from risked equity per unit of stop distance, capped by margin.
                        size = (self.equity * self.risk_pct) // (ob['ob_width']*self.sl_percent)
                        max_size = (self.equity / self.margin) // ob['limit']
                        # A negative size is passed to buy() for ob_type 0.
                        # NOTE(review): confirm that the installed backtesting
                        # version accepts the `id` keyword on Strategy.buy.
                        order = self.buy(
                                size=min(size, max_size) * (1 if ob['ob_type'] else -1)
                                , limit=ob['limit']
                                , sl=ob['stop_loss']
                                , tp=ob['take_profit'] 
                                , id=ob['id']
                            )
            self.new_ob = False

# RUN Strategy

## OGSmartMoneyConcepts Strategy

The OGSmartMoneyConcepts strategy is an automated trading strategy designed for use with the Backtesting.py library. This strategy utilizes a sophisticated method to identify trading opportunities based on market swings, order blocks, and trend structures. It is equipped to handle multiple temporalities and offers flexibility for both long and short trading positions.

### Strategy Overview

The strategy's core is based on the identification and utilization of market structures such as internal and external swings, order blocks, and trends. It dynamically adjusts to market conditions, aiming to capitalize on both bullish and bearish market movements.

### Key Parameters

- `n_ob`: The number of order blocks to consider.
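- `risk_ratio`: The reward-to-risk multiple for each trade, expressed in percent (e.g. `200` places the take profit at twice the stop-loss distance).
- `sl_percent`: The stop-loss distance as a percentage of the order-block width (e.g. `100` places the stop one full block width away from the limit price).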
- `swing_p`: The period for swing calculation.
- `risk_pct`: The share of equity risked on each trade, expressed in tenths of a percent (`init` divides it by 1000, so `10` means 1% of equity).
- `margin`: The margin requirement for the trade.
- `long_trading` and `short_trading`: Flags to enable or disable long and short trading.
- `order_duration`: The duration for which an order remains valid.
- `trailing_sl`: Enables trailing stop loss if set to `True`.

### Initialization (`init` method)

The `init` method sets up various indicators and internal parameters required for the strategy. It initializes the structures for tracking trends, swing points, and establishes parameters for trade execution.

### Trade Execution (`next` method)

The `next` method is where the strategy evaluates market conditions and decides on trade actions. It involves the following steps:

1. **Progress Tracking**: Displays the progress of the strategy.
2. **Cross Detection**: Identifies crosses between price and swing points.
3. **Structure Identification**: Determines if there are new internal or external structures based on the current market condition.
4. **Order Block Identification**: Finds new order blocks based on the structures and market swings.
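5. **Order Placement**: Places a new limit order for each unmitigated order block of temporality `'2'` created on the current bar, provided its limit price lies between the limit and the take profit of an unmitigated temporality-`'3'` order block of the same type.
6. **Order Mitigation Check**: Marks existing order blocks as mitigated once the closing price moves beyond their stop loss. Note that in the code this check runs before order placement.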

### Utilizing the Strategy

To utilize the OGSmartMoneyConcepts strategy:

1. **Data Preparation**: Ensure that the data fed into the `initialize_data` function includes all necessary OHLCV (Open, High, Low, Close, Volume) columns.
2. **Strategy Configuration**: Set the strategy parameters according to your trading preferences and risk management rules.
3. **Backtesting**: Use the Backtesting.py framework to backtest the strategy against historical data.
4. **Analysis and Adjustment**: Analyze the backtesting results and adjust the strategy parameters as needed for optimal performance.

In [None]:
# Empty module-level containers for run diagnostics, plus the current order-block id.
problematic_orders, problematic_trades, closed_t = [], [], []
ob_id = ''

In [None]:
# initialize run parameters
year = 2022
# Per-temporality configuration: 't' = candle length in minutes, 'sp' = swing
# period, 'ip' = internal swing period; the boolean flags switch the structure
# and order-block detection on or off for that temporality.
temporalities = {'1':{'t':15, 'sp':50, 'ip':5, 'i_structure':False, 'sw_structure':False, 'iob':False, 'sw_ob':False}
                , '2':{'t':15, 'sp':30, 'ip':5, 'i_structure':False, 'sw_structure':False, 'iob':False, 'sw_ob':True}
                , '3':{'t':120, 'sp':20, 'ip':5, 'i_structure':False, 'sw_structure':True, 'iob':False, 'sw_ob':True}
                }
# Timing accumulators (module-level copies; the strategy's own are set below).
t_init = 0
t_inner = 0
t_inner2 = 0
t_sw = 0
t_sw2 = 0
t_init2 = 0
short_trading = True
long_trading = True #not trade_strategy.short_trading
trailing_sl = False
ob_list_2 = []
ob_df_2 = pd.DataFrame(columns=['min_val', 'max_val', 'ob_creation_time', 'ob_time', 'ob_type', 'limit', 'ob_width', 'stop_loss', 'take_profit', 'id', 'valid'])
structure_list_bear_1 = []
structure_list_bull_1 = []
structure_list_3 = []
atrs = []
# Backtest window: four months starting on April 20th of `year`
# (previously the literal 2022 was hard-coded here and `year` was ignored).
from_date = f'{year}-04-20 00:00:00'
to_date = str(pd.Timestamp(from_date)+relativedelta(months=4)+datetime.timedelta(days=0)-datetime.timedelta(seconds=1))
df_1 = initialize_data(df, temporalities, from_date, to_date, drop_weekends=False).dropna(subset=['Open', 'High', 'Low', 'Close'])


# Configure the strategy by assigning class attributes before the run.
trade_strategy = OGSmartMoneyConcepts
# Timing accumulators read and incremented inside the strategy's next().
trade_strategy.t_init=0
trade_strategy.t_inner=0
trade_strategy.t_inner2=0
trade_strategy.t_sw=0
trade_strategy.t_sw2=0
trade_strategy.t_init2=0

trade_strategy.risk_ratio = 200
trade_strategy.sl_percent = 100
trade_strategy.risk_pct = 10
trade_strategy.margin = 1/7
trade_strategy.short_trading = short_trading
trade_strategy.long_trading = long_trading #not trade_strategy.short_trading
trade_strategy.trailing_sl = trailing_sl
trade_strategy.order_duration = 48
trade_strategy.revancha = False
trade_strategy.ob_mitigation = True


# Fresh bookkeeping tables so rows from a previous run are not carried over.
trade_strategy.all_order_blocks = pd.DataFrame(columns=['min_val', 'max_val', 'ob_creation_time', 'ob_time', 'ob_type', 'limit', 'ob_width', 'stop_loss', 'take_profit', 'id', 'not_mitigated', 'order_placed', 'mitigation_time', 'temporality'])
trade_strategy.order_blocks = pd.DataFrame(columns=['min_val', 'max_val', 'ob_creation_time', 'ob_time', 'ob_type', 'limit', 'ob_width', 'stop_loss', 'take_profit', 'id', 'not_mitigated', 'order_placed', 'mitigation_time', 'temporality'])
trade_strategy.inner_structure = pd.DataFrame(columns=['structure_type', 'start_time', 'end_time', 'price', 'trend', 'type', 'temporality'])
trade_strategy.sw_structure = pd.DataFrame(columns=['structure_type', 'start_time', 'end_time', 'price', 'trend', 'type', 'temporality'])
trade_strategy.trends = pd.DataFrame(False, index=[1, 2, 3], columns=['itop', 'itop_cross', 'ibtm', 'ibtm_cross', 'itrend', 'top', 'top_cross', 'btm', 'btm_cross', 'trend'])
trade_strategy.trends1 = np.full((3,6), 1)
trade_strategy.trends1[:,[2,4]]=0


ob_list = []
# Run the backtest; the broker margin matches the strategy's `margin` (1/7).
bt = Backtest(
    df_1,
    trade_strategy,
    cash=500000.0,
    commission=0.0006,
    margin=1/7,
    trade_on_close=True,
)
stats = bt.run()
print(stats)

# Write the interactive plot to an HTML file named after the run configuration.
bt.plot(resample=False, filename=f'SmartMoneyConcepts_{from_date[:10]}-{to_date[:10]}dates_{temporalities["2"]["sp"]}-{temporalities["3"]["sp"]}swing_{temporalities["2"]["t"]}-{temporalities["3"]["t"]}candles.html', plot_equity=True, plot_return=False, superimpose=f"{temporalities['3']['t']}T", show_legend=True, smooth_equity=True, plot_pl=False, relative_equity=False, open_browser=True)


# OPTIMIZATION



## Optimization for SmartMoneyConcepts Strategy

This section iterates over different combinations of temporalities and strategy-specific parameters. It employs a grid search optimization technique provided by the Backtesting.py library.

### Key Components

- **Temporalities**: Configures different time frames (`t_1`, `t_2`, `t_3`) for the strategy, each with its own candle length (`t`, in minutes), swing period (`sp`) and internal-swing period (`ip`).
- **Strategy Parameters**: Sets parameters such as `risk_ratio`, `sl_percent` (stop loss percentage), `risk_pct`, `margin`, `short_trading`, `long_trading`, `trailing_sl`, `order_duration`, `revancha`, and `ob_mitigation`.
- **Backtesting Setup**: Initializes the backtesting environment with a starting cash balance, commission rate, and margin requirement.
- **Optimization Process**: Iterates over specified ranges of parameters like `risk_ratio`, `sl_percent`, `order_duration`, and `revancha` to find the optimal combination.
- **Results Management**: Stores and outputs the results of each optimization run, both in a DataFrame (`results`) and a CSV file for further analysis.

### Script Execution Steps

1. **Parameter Initialization**: Set initial temporalities and trading flags.
2. **Date Range Setup**: Define the date range for the backtesting data.
3. **Optimization Loop**: Iterate over combinations of temporalities.
   - Inside each iteration, configure the strategy with the chosen temporalities.
   - Initialize the backtest with `SmartMoneyConcepts` strategy.
   - Perform optimization using `bt.optimize()`.
   - Store and log results for each combination.
4. **Results Storage**: Collect results in a DataFrame and export to a CSV file for further analysis.
5. **Best Results Identification**: Maintain a list of best-performing parameter combinations for reference.

### How to Use the Script

1. **Prepare Data**: Ensure your dataset (OHLCV format) is ready and covers the required date range.
2. **Configure Initial Parameters**: Set initial values for temporalities and other strategy parameters.
3. **Run the Script**: Execute the script to start the optimization process.
4. **Analyze Results**: After completion, analyze the results stored in the DataFrame and CSV file.
5. **Adjust Strategy**: Use insights from the optimization to fine-tune your trading strategy.

In [None]:
# One-month window: starts one month after 2022-04-01 and ends one second
# before the following month begins.
from_date = pd.Timestamp('2022-04-01 00:00:00') + relativedelta(months=1)
to_date = from_date + relativedelta(months=1)-relativedelta(seconds=1)

In [None]:
# Candidate values for each optimised parameter.
risk_ratio = range(150, 251, 25)
sl_percent = range(100, 121, 10)
trailing_sl = [True, False]  # disabled for now
order_duration = range(36, 61, 12)
revancha = [True, False]
mitigacion = [True, False]
guia = [True, False]  # for now we test with True

# Count the grid points over risk_ratio x sl_percent x trailing_sl x
# order_duration x revancha (mitigacion and guia are not part of the count).
k = 0
for t_1, t_2, t_3, t_4, t_5 in product(risk_ratio, sl_percent, trailing_sl, order_duration, revancha):
    k += 1
print("Expected runs: ",k)

In [None]:
from itertools import product

# Initialise run parameters.
# Placeholder temporalities: every slot is overwritten inside the loop below.
temporalities = {
    '1': {'t': 1, 'sp': 50, 'ip': 5},
    '2': {'t': 5, 'sp': 50, 'ip': 5},
    '3': {'t': 15, 'sp': 50, 'ip': 5},
}
short_trading = True
long_trading = True
trailing_sl = False

# Backtest window: twelve months from 2022-06-01, up to the last second
# before the thirteenth month begins.
from_date = pd.Timestamp('2022-06-01 00:00:00')
to_date = from_date + relativedelta(months=12) - relativedelta(seconds=1)
run_label = f"{from_date.month}-{from_date.year}"

results = pd.DataFrame()
best_results = []
i = 1

# Every (t, sp, ip) candidate for each of the three temporality slots.
# Materialised up front so the progress line can report the real number of
# remaining runs without relying on a counter defined in another cell.
temporality_grid = list(product(
    product([1], [10], [5]),
    product([5], [10], [5]),
    product([15], [10], [5]),
))
total_runs = len(temporality_grid)

# Optimise the strategy once per temporality combination.
for t_1, t_2, t_3 in temporality_grid:
    clear_output()
    print(f"run number: {i}, {total_runs - i} to go", t_1, t_2, t_3)
    i += 1

    # Fresh, empty order-block table for this run.
    # NOTE(review): not read again in this cell; presumably consumed by the
    # strategy through the notebook namespace. Confirm.
    ob_df_2 = pd.DataFrame(columns=['min_val', 'max_val', 'ob_creation_time', 'ob_time', 'ob_type', 'limit', 'ob_width', 'stop_loss', 'take_profit', 'id', 'valid'])

    temporalities["1"] = {'t': t_1[0], 'sp': t_1[1], 'ip': t_1[2]}
    temporalities["2"] = {'t': t_2[0], 'sp': t_2[1], 'ip': t_2[2]}
    temporalities["3"] = {'t': t_3[0], 'sp': t_3[1], 'ip': t_3[2]}

    # Build the input frame and discard bars with missing OHLC values.
    df_1 = initialize_data(df, temporalities, from_date, to_date).dropna(subset=['Open', 'High', 'Low', 'Close'])

    # Baseline parameters are set as class attributes on the strategy; the
    # dimensions passed to bt.optimize() below are varied on top of them.
    trade_strategy = SmartMoneyConcepts
    trade_strategy.risk_ratio = 275
    trade_strategy.sl_percent = 110
    trade_strategy.risk_pct = 10
    trade_strategy.margin = 1/7
    trade_strategy.short_trading = short_trading
    trade_strategy.long_trading = long_trading
    trade_strategy.trailing_sl = trailing_sl
    trade_strategy.order_duration = 48
    trade_strategy.revancha = False
    trade_strategy.ob_mitigation = True

    # NOTE(review): reset per run but not read in this cell; presumably
    # filled by the strategy. Confirm.
    ob_list = []
    bt = Backtest(
        df_1,
        trade_strategy,
        cash=5000.0,
        commission=0.0006,
        margin=1/7,
        trade_on_close=True,
    )
    # Dimensions currently switched off in the search: n_ob, trailing_sl,
    # long_trading / short_trading, swing_p, mitigacion.
    stats, heatmap = bt.optimize(
        risk_ratio=range(150, 251, 25),
        sl_percent=range(100, 121, 10),
        order_duration=range(36, 61, 12),
        revancha=[True, False],
        maximize='Equity Final [$]',
        max_tries=150,
        random_state=0,
        return_heatmap=True,
    )

    # Flatten the heatmap (parameters become columns) and tag every row with
    # the temporality combination and the window label of this run.
    htmp = heatmap.reset_index()
    htmp['t_1'] = [(t_1)]*len(htmp)
    htmp['t_2'] = [(t_2)]*len(htmp)
    htmp['t_3'] = [(t_3)]*len(htmp)
    htmp['date'] = run_label

    results = pd.concat([results, htmp])
    # df_1 is intentionally left bound: later cells read it, and the next
    # iteration rebinds the name anyway.

    best_results.append((stats, t_1, t_2, t_3, run_label))
    # Rewrite the CSV after every run so partial results survive an interruption.
    results.to_csv("mid_temp_optimization_w_mitigation.csv", index=False)


In [None]:
# Display the accumulated optimisation results table.
results

In [None]:
# Drop the bookkeeping columns; what remains is the optimised parameters
# followed by the maximised metric as the last column.
heatmap_frame = results.drop(['t_1', 't_2', 't_3', 'date'], axis=1)
# Index by whichever parameters were actually optimised instead of a
# hard-coded list, so the plot keeps working when dimensions are toggled.
heatmap_params = list(heatmap_frame.columns[:-1])
plot_heatmaps(heatmap_frame.set_index(heatmap_params).iloc[:, 0], agg='std')

## save trades as CSV

In [None]:
# Build the order-block table and pair each block with the trade whose entry
# price equals the block's limit price (unmatched blocks keep NaN trade fields).
order_blocks = pd.DataFrame(data=ob_list_2)
all_trades = stats._trades
trade_ob_comparison = pd.merge(
    order_blocks,
    all_trades,
    how='left',
    left_on='limit',
    right_on='EntryPrice',
)

# NOTE(review): despite its name, empty_ob keeps every row of the comparison
# table and only drops the columns that contain at least one NaN.
nan_free_columns = trade_ob_comparison.columns[trade_ob_comparison.notna().all()]
empty_ob = trade_ob_comparison[nan_free_columns]

# Trades whose entry price never appears in the comparison table.
entry_is_matched = all_trades['EntryPrice'].isin(trade_ob_comparison['EntryPrice'])
unassigned_trades = all_trades[~entry_is_matched].copy()

# Cross join (constant helper key) of those rows with the unassigned trades,
# plus the absolute distance between entry price and limit price.
temp_df = (
    empty_ob.assign(key=1)
    .merge(unassigned_trades.assign(key=1), on='key')
    .drop('key', axis=1)
)
temp_df['price_diff'] = (temp_df['EntryPrice'] - temp_df['limit']).abs()

# Attach the OHLC bar found at each trade's entry time.
entry_bars = df_1[['Open', 'High', 'Low', 'Close']]
trade_ob_comparison = trade_ob_comparison.merge(entry_bars, left_on='EntryTime', right_index=True, how='left')
trade_ob_comparison['ob_width'] = trade_ob_comparison['max_val'] - trade_ob_comparison['min_val']
trade_ob_comparison['risked_equity'] = trade_ob_comparison['ob_width'] * trade_ob_comparison['Size']

# A level is flagged invalid when the entry bar already trades through it:
# for the stop, High above it on ob_type 0 or Low below it on ob_type 1;
# for the target, the same test with the two ob_type values swapped.
ob_is_type_0 = trade_ob_comparison['ob_type'] == 0
ob_is_type_1 = trade_ob_comparison['ob_type'] == 1
entry_bar_high = trade_ob_comparison['High']
entry_bar_low = trade_ob_comparison['Low']
stop_level = trade_ob_comparison['stop_loss']
target_level = trade_ob_comparison['take_profit']
trade_ob_comparison['valid_sl'] = ~(
    ((entry_bar_high > stop_level) & ob_is_type_0)
    | ((entry_bar_low < stop_level) & ob_is_type_1)
)
trade_ob_comparison['valid_tp'] = ~(
    ((entry_bar_high > target_level) & ob_is_type_1)
    | ((entry_bar_low < target_level) & ob_is_type_0)
)

# First export keeps the scaled units used during the backtest; the second
# multiplies prices by 1e3 and divides sizes by 1e3 before exporting again.
trade_ob_comparison.to_csv('trade_ob_comparison_1e-3.csv', index=False)
scaled_price_columns = ['min_val', 'max_val', 'EntryPrice', 'ExitPrice', 'stop_loss', 'take_profit', 'limit', 'ob_width']
trade_ob_comparison[scaled_price_columns] = trade_ob_comparison[scaled_price_columns]*1e3
trade_ob_comparison['Size'] /= 1e3
trade_ob_comparison.to_csv('trade_ob_comparison.csv', index=False)

In [None]:
# Build a DataFrame from the order-block list.
# NOTE(review): this cell repeats the previous cell's code statement for statement.
order_blocks = pd.DataFrame(data=ob_list_2)
# NOTE(review): a bare expression that is not the last statement of a notebook
# cell displays nothing; the same applies to the bare `trade_ob_comparison` below.
order_blocks.tail()

# Left-join order blocks with trades where the block's limit price equals the
# trade's entry price exactly; blocks without a match keep NaN trade fields.
trade_ob_comparison = order_blocks.merge(stats._trades, how='left', left_on='limit', right_on='EntryPrice')

# Order blocks without a matching trade, with their all-NaN columns removed.
# NOTE(review): this value of empty_ob is overwritten a few lines below before
# it is ever read.
empty_ob = trade_ob_comparison[trade_ob_comparison['EntryPrice'].isna()].copy()
empty_ob = empty_ob[empty_ob.columns[~empty_ob.isna().all()]]
trade_ob_comparison

# Rebind empty_ob to ALL rows of the comparison table, keeping only the
# columns that contain no NaN at all.
empty_ob = trade_ob_comparison[trade_ob_comparison.columns[~trade_ob_comparison.isna().any()]]

# Trades whose entry price does not appear in the comparison table.
unassigned_trades = stats._trades[~stats._trades['EntryPrice'].isin(trade_ob_comparison['EntryPrice'])].copy()

# Cross join (constant helper key) of empty_ob with the unassigned trades,
# then the absolute distance between each entry price and each limit price.
temp_df = empty_ob.assign(key=1).merge(unassigned_trades.assign(key=1), on='key').drop('key', axis=1)
temp_df['price_diff'] = (temp_df['EntryPrice'] - temp_df['limit']).abs()

# Attach the OHLC bar at each trade's entry time and compute the block width.
trade_ob_comparison = trade_ob_comparison.merge(df_1[['Open', 'High', 'Low', 'Close']], left_on='EntryTime', right_index=True, how='left')
trade_ob_comparison['ob_width'] = trade_ob_comparison['max_val'] - trade_ob_comparison['min_val']

# Risked equity is block width times trade size. A stop is flagged invalid when
# the entry bar's High exceeds it (ob_type 0) or its Low is below it (ob_type 1);
# the take-profit check is the same test with the two ob_type values swapped.
trade_ob_comparison['risked_equity'] = trade_ob_comparison['ob_width'] * trade_ob_comparison['Size']
trade_ob_comparison['valid_sl'] = ~(((trade_ob_comparison['High'] > trade_ob_comparison['stop_loss']) & (trade_ob_comparison['ob_type'] == 0)) | ((trade_ob_comparison['Low'] < trade_ob_comparison['stop_loss']) & (trade_ob_comparison['ob_type'] == 1)))
trade_ob_comparison['valid_tp'] = ~(((trade_ob_comparison['High'] > trade_ob_comparison['take_profit']) & (trade_ob_comparison['ob_type'] == 1)) | ((trade_ob_comparison['Low'] < trade_ob_comparison['take_profit']) & (trade_ob_comparison['ob_type'] == 0)))

# Export once in the scaled units used during the backtest, then multiply the
# price columns by 1e3, divide Size by 1e3, and export again.
trade_ob_comparison.to_csv('trade_ob_comparison_1e-3.csv', index=False)
trade_ob_comparison[['min_val', 'max_val', 'EntryPrice', 'ExitPrice', 'stop_loss', 'take_profit', 'limit', 'ob_width']] = trade_ob_comparison[['min_val', 'max_val', 'EntryPrice', 'ExitPrice', 'stop_loss', 'take_profit', 'limit', 'ob_width']] * 1e3
trade_ob_comparison['Size'] /= 1e3
trade_ob_comparison.to_csv('trade_ob_comparison.csv', index=False)


# PLOT FINANCE

In [None]:
def draw_structure(fig, structure_df, prefix='i'):
    """
    Draw one horizontal line per structure row on a figure.

    Rows whose `<prefix>trend` value is 1 are drawn in green first, then rows
    whose value is -1 are drawn in red; rows with any other value are skipped.
    Lines use the 'dashdot' pattern when `structure_type` is 'CHoCH' and
    'dot' otherwise.

    Args:
    - fig: Object exposing `add_shape(**kwargs)`; it is modified in place.
    - structure_df: DataFrame with the columns `<prefix>trend`, `start_time`,
      `end_time`, `price` and `structure_type`.
    - prefix: Prefix of the trend column name ('i' selects `itrend`,
      '' selects `trend`).

    Returns:
    - None.
    """
    trend_column = f'{prefix}trend'
    # One pass per (trend value, colour) pair keeps the original draw order:
    # every green line is added before any red one.
    for trend_value, color in ((1, 'green'), (-1, 'red')):
        selected = structure_df[structure_df[trend_column] == trend_value]
        for _, item in selected.iterrows():
            fig.add_shape(
                type="line",
                x0=item['start_time'], x1=item['end_time'],
                # Same price at both ends makes the line horizontal.
                y0=item['price'], y1=item['price'],
                line=dict(
                    width=2,
                    dash='dashdot' if item['structure_type'] == 'CHoCH' else 'dot',
                    color=color,
                ),
            )

In [None]:
# Work on a copy of the order-block table stored on the strategy.
order_blocks = trade_strategy.order_blocks.copy()
# NOTE(review): only the last expression of a notebook cell is displayed, so
# this head() call shows nothing; the tail() below is what gets rendered.
order_blocks.head(1)
order_blocks.tail(1)

In [None]:
# Smallest and largest ob_creation_time among the order blocks.
order_blocks.ob_creation_time.min(), order_blocks.ob_creation_time.max()

In [None]:
# Outer-join order blocks with trades on the block id, keeping blocks that
# have no trade as well as trades that reference no known block.
trade_ob_comparison = pd.merge(
    order_blocks,
    stats._trades,
    how='outer',
    left_on='id',
    right_on='OrderBlock_id',
)

In [None]:
# Pair every row with every row of the same table (cross join through a
# constant helper key); columns of the right-hand copy get the '_long' suffix.
keyed_comparison = trade_ob_comparison.assign(key=1)
trade_ob_comparison = keyed_comparison.merge(
    keyed_comparison,
    on='key',
    suffixes=['', '_long'],
)

In [None]:
# The row's limit/take-profit range lies inside the paired ('_long') row's
# range, written once for each price direction.
nested_upward = (
    (trade_ob_comparison['limit'] >= trade_ob_comparison['limit_long'])
    & (trade_ob_comparison['take_profit'] <= trade_ob_comparison['take_profit_long'])
)
nested_downward = (
    (trade_ob_comparison['limit'] <= trade_ob_comparison['limit_long'])
    & (trade_ob_comparison['take_profit'] >= trade_ob_comparison['take_profit_long'])
)
# Temporality rule: the row is temporality '3', or it is temporality '2'
# paired with a temporality '3' row.
temporality_rule = (
    (trade_ob_comparison['temporality'] == '3')
    | ((trade_ob_comparison['temporality_long'] == '3') & (trade_ob_comparison['temporality'] == '2'))
)
# `&` binds tighter than `|`, so the nesting alternatives must be grouped
# explicitly for the temporality rule to apply to both of them; without the
# parentheses every row paired with itself passes the filter.
trade_ob_comparison = trade_ob_comparison[
    (nested_upward | nested_downward) & temporality_rule
].drop_duplicates(subset='id')

In [None]:
# Number of remaining rows per temporality value.
trade_ob_comparison['temporality'].value_counts()

In [None]:
import plotly.graph_objects as go

# Plot from a copy of the backtest frame.
df_prueba = df_1.copy()
candlestick = go.Candlestick(
    x=df_prueba.index,
    open=df_prueba['Open'],
    high=df_prueba['High'],
    low=df_prueba['Low'],
    close=df_prueba['Close'],
)

# The y axis is explicitly left unlocked (fixedrange=False).
chart_layout = go.Layout(yaxis={'fixedrange': False})
fig = go.Figure(data=[candlestick], layout=chart_layout)

In [None]:
# Rows of the strategy's sw_structure table whose temporality is '3'.
trade_strategy.sw_structure[trade_strategy.sw_structure['temporality']=='3']

In [None]:
# Rows of the strategy's order_blocks table whose temporality is '3'.
trade_strategy.order_blocks[trade_strategy.order_blocks['temporality']=='3']

In [None]:
# Smallest ob_creation_time in the comparison table.
trade_ob_comparison.ob_creation_time.min()

In [None]:
# Overlay every row of the comparison table on the existing figure as three
# rectangles: the order block itself, its limit-to-stop-loss zone and its
# limit-to-take-profit zone. The figure is then written to an HTML file.

# Loop invariants: last bar of the data and the maximum lifetime of a zone.
last_bar_time = df_1.index.max()
zone_lifetime = datetime.timedelta(hours=trade_strategy.order_duration*10)

for block_idx, block in trade_ob_comparison.iterrows():
    is_temporality_3 = block['temporality'] == '3'

    # Right edge shared by both zones: the mitigation time when there is one,
    # otherwise creation time plus the lifetime, capped at the last bar.
    if pd.isnull(block['mitigation_time']):
        # NOTE(review): position 2 is presumably the 'ob_creation_time'
        # column; confirm and switch to the label. `.iloc` keeps the original
        # positional lookup, which plain `block[2]` no longer guarantees on a
        # label-indexed Series in recent pandas.
        zone_end = min(block.iloc[2] + zone_lifetime, last_bar_time)
    else:
        zone_end = block['mitigation_time']

    # The order block itself.
    fig.add_shape(
        type="rect",
        x0=block['ob_time'], x1=block['ob_creation_time'],
        y0=block['min_val'], y1=block['max_val'],
        fillcolor='blue',
        opacity=0.2,
        line=dict(
            width=0,
            color='blue',
        ),
    )

    # Zone between the limit price and the stop loss.
    fig.add_shape(
        type="rect",
        x0=block['ob_creation_time'], x1=zone_end,
        y0=block['limit'], y1=block['stop_loss'],
        fillcolor='cyan' if is_temporality_3 else 'red',
        opacity=0.3,
        line=dict(
            width=0,
            color='red',
        ),
    )

    # Zone between the limit price and the take profit.
    fig.add_shape(
        type="rect",
        x0=block['ob_creation_time'], x1=zone_end,
        y0=block['limit'], y1=block['take_profit'],
        fillcolor='cyan' if is_temporality_3 else 'green',
        opacity=0.5 if is_temporality_3 else 0.3,
        line=dict(
            width=0,
            color='green',
        ),
    )
fig.write_html('candlestick.html', auto_open=True, )

In [None]:
# Work on a copy of the strategy's inner-structure table. (An earlier
# assignment built from structure_list_bear_1 + structure_list_bull_1 was
# overwritten immediately by this line and has been removed.)
structure_df = trade_strategy.inner_structure.copy()

In [None]:
# Cross join (constant helper key) of the comparison table with the structure
# rows. `axis` is passed by keyword: pandas 2.x no longer accepts it
# positionally in DataFrame.drop.
temp_df = (
    trade_ob_comparison.assign(key=1)
    .merge(structure_df.assign(key=1), on='key')
    .drop('key', axis=1)
)

# Keep the structure segments related to a trade in time: the segment lies
# strictly inside the trade's lifetime, or the trade's entry time or exit time
# falls strictly inside the segment. One row is kept per distinct segment.
segment_inside_trade = (
    (temp_df['EntryTime'] < temp_df['start_time'])
    & (temp_df['end_time'] < temp_df['ExitTime'])
)
entry_inside_segment = (
    (temp_df['EntryTime'] > temp_df['start_time'])
    & (temp_df['EntryTime'] < temp_df['end_time'])
)
exit_inside_segment = (
    (temp_df['ExitTime'] > temp_df['start_time'])
    & (temp_df['ExitTime'] < temp_df['end_time'])
)
structure_df = temp_df[
    segment_inside_trade | entry_inside_segment | exit_inside_segment
].drop_duplicates(subset=['start_time', 'end_time'])

In [None]:
# Display the strategy's sw_structure table.
trade_strategy.sw_structure

In [None]:
# Draw the strategy's sw_structure rows on the figure; the empty prefix makes
# draw_structure read the plain `trend` column.
structure_df_3 = trade_strategy.sw_structure.copy()
draw_structure(fig, structure_df_3, prefix='')

In [None]:
# Write the figure to candlestick.html; auto_open=True asks for the file to be opened afterwards.
fig.write_html('candlestick.html', auto_open=True, )

In [None]:
# Inspect 2023-05-28 15:00-21:00: every column whose name contains '_3', plus Close.
df_1['2023-05-28 15:00:00':'2023-05-28 21:00:00'][df_1.columns[df_1.columns.str.contains('_3')].to_list()+ ['Close']]

In [None]:
# Rows selected by the new_top_3 mask, restricted to columns whose name contains '_3'.
df_1[df_1['new_top_3']][df_1.columns[df_1.columns.str.contains('_3')]]

In [None]:
# Display the strategy's sw_structure table.
trade_strategy.sw_structure