In [None]:
import sys
sys.path.append("../..")

import random
from time import sleep
from typing import List, Dict, Callable, Optional
from itertools import permutations
from datetime import datetime, timedelta
from IPython.display import display

import numpy as np
import pandas as pd
import pandas_ta as ta

import plotly.graph_objects as go
from plotly.subplots import make_subplots
from lightweight_charts import Chart

from sesto.indicators import SMA, EMA, RSI, ROC, MACD, BB, ATR
from sesto.constants import CURRENCY_PAIRS, MT5Timeframe
import sesto.metatrader.data as mtd
from sesto.plot import plot_tradingview, plot_plotly
from sesto.backtester import Backtester, Trade
from sesto.utils import get_pnl_at_price, calculate_position_size, get_price_at_pnl, calculate_commission, calculate_break_even_price, calculate_price_with_spread, calculate_liquidation_price
from sesto.fractal import high_low_finder


In [None]:
# --- Data selection ---------------------------------------------------------
# Symbols requested from the data source. A later cell removes entries from
# this list in place when a symbol comes back with no data.
PAIRS = ['BITCOIN', 'ETHEREUM', 'SOLANA']
# Symbol used for the preview table, the per-symbol trade query and the chart replay.
DISPLAY_SYMBOL = 'BITCOIN'

# Timeframes to download, and the date window passed to the fetch call.
TIMEFRAMES = [MT5Timeframe.M15]
DATA_FROM_DATE = datetime(2024, 4, 5)
DATA_TO_DATE = datetime(2024, 9, 28)

# Look-back periods; SMA, EMA, ROC, RSI, BB and ATR are each computed once per period.
MA_PERIODS = [7, 14, 21]

# NOTE(review): not referenced by any cell visible in this notebook — confirm it is still needed.
CSM_PERIOD = 21

In [None]:
# --- Backtest configuration -------------------------------------------------
# Timeframe handed to the backtester as `main_timeframe`; also used to pick the
# frame that is previewed and replayed on the chart.
MAIN_TIMEFRAME = MT5Timeframe.M15

# Starting account balance, and the capital attached to every entry the strategy emits.
INITIAL_CAPITAL = 10000
CAPITAL_PER_TRADE = 200

# Passed as `pnl_multiplier` to get_price_at_pnl to derive the take-profit and
# stop-loss prices. Presumably a fraction of the trade's capital (+50% / -25%)
# — TODO confirm against sesto.utils.get_price_at_pnl.
TP_PNL_MULTIPLIER = 0.5
SL_PNL_MULTIPLIER = -0.25

# Trailing-stop ladder. A step fires once unrealized PnL reaches
# TP_PNL_MULTIPLIER * trigger_tp_multiplier * CAPITAL_PER_TRADE; the stop is then
# moved to the price given by get_price_at_pnl for new_sl_pnl_multiplier.
# The strategy walks the steps in list order and stops at the first one that
# actually moves the stop, so keep the largest trigger first.
TRAILING_STOP_STEPS = [
    {'trigger_tp_multiplier': 0.50, 'new_sl_pnl_multiplier': 0.15},
    {'trigger_tp_multiplier': 0.25, 'new_sl_pnl_multiplier': 0.05},
    {'trigger_tp_multiplier': 0.10, 'new_sl_pnl_multiplier': 0.01},
]

# Forwarded to the backtester as `spread_multiplier` and `leverage`; LEVERAGE is
# also used by the strategy for position sizing.
# NOTE(review): units of SPREAD_MULTIPLIER (fraction of price?) are not visible here — confirm.
SPREAD_MULTIPLIER = 0.0002
LEVERAGE = 500

In [None]:
# Request candles for every configured pair and timeframe over the configured
# date window. Later cells read the frames from `mtd.data[timeframe][symbol]`;
# presumably this call is what fills that mapping — confirm in sesto.metatrader.data.
mtd.fetch_data_all_timeframes(PAIRS, TIMEFRAMES, DATA_FROM_DATE, DATA_TO_DATE)

In [None]:
# Drop every symbol whose frame is missing or empty in a configured timeframe,
# both from the fetched data and from the PAIRS list.
for timeframe in TIMEFRAMES:
    frames_by_symbol = mtd.data[timeframe]

    # Collect first, delete afterwards: the mapping must not change size while it is iterated.
    empty_symbols = []
    for candidate, frame in frames_by_symbol.items():
        if frame is None or frame.empty:
            empty_symbols.append(candidate)

    for symbol in empty_symbols:
        del frames_by_symbol[symbol]
        if symbol in PAIRS:
            PAIRS.remove(symbol)

print(f"Remaining pairs after removing empty data: {len(PAIRS)}")

In [None]:
# Enrich every non-empty frame with the indicator set (one pass per look-back
# period) and with the fractal labels the strategy trades on.
for timeframe, pairs_data in mtd.data.items():
    for pair, df in pairs_data.items():
        # Nothing to compute for symbols without candles.
        if df is None or df.empty:
            continue

        for period in MA_PERIODS:
            # Indicators sharing the (frame, period) signature, in their original order.
            for indicator in (SMA, EMA, ROC, RSI):
                indicator(df, period)
            BB(df, period, 2)
            ATR(df, period)

        df['fractal'] = high_low_finder(df)
        print(f'Successfully calculated fractal for {pair} in {timeframe}')

display(mtd.data[MAIN_TIMEFRAME][DISPLAY_SYMBOL].head())

In [None]:
class MyStrategy(Backtester):
    """Fractal-reversal strategy with a stepped trailing stop.

    Opens a long on a 'bottom' fractal and a short on a 'top' fractal, at most
    one position per symbol at a time. Take-profit and stop-loss prices are
    derived from TP_PNL_MULTIPLIER / SL_PNL_MULTIPLIER, and the stop is later
    tightened according to TRAILING_STOP_STEPS. There is no discretionary exit.
    """

    def entry_condition(
        self,
        symbol: str,
        time: datetime,
        row: pd.Series,
        open_trades: List[Trade],
        closed_trades: List[Trade],
        timeframe: MT5Timeframe,
    ) -> Optional[Dict]:
        """Decide whether to open a trade on this bar.

        Args:
            symbol: Symbol the bar belongs to.
            time: Timestamp of the bar (used for logging only).
            row: Bar data; reads 'close' and, when present, 'fractal'.
            open_trades: Currently open trades, used to enforce one trade per symbol.
            closed_trades: Unused.
            timeframe: Unused.

        Returns:
            A dict with 'entry_price', 'type', 'capital', 'tp_price' and
            'sl_price' when a fractal signal fires, otherwise None.
        """
        # Only one position per symbol at a time.
        if any(trade.symbol == symbol for trade in open_trades):
            return None

        # A bottom fractal is traded long, a top fractal short; anything else is no signal.
        fractal = row.get('fractal')
        if fractal == 'bottom':
            side, label = 'long', 'BOTTOM'
        elif fractal == 'top':
            side, label = 'short', 'TOP'
        else:
            return None

        entry_price = row['close']
        # Sizing and fees are only needed once a signal has actually fired.
        size = calculate_position_size(capital=CAPITAL_PER_TRADE, leverage=LEVERAGE)
        fee = calculate_commission(position_size_usd=size)

        print(f"{time} - {symbol} - ENTRY CONDITION - {label} FRACTAL DETECTED - CLOSE: ${row['close']:.3f}")

        def price_at(pnl_multiplier):
            # Price at which this position would show the requested PnL multiple.
            return get_price_at_pnl(
                pnl_multiplier=pnl_multiplier,
                order_fee=fee,
                position_size_usd=size,
                leverage=LEVERAGE,
                entry_price=entry_price,
                type=side,
            )

        sl = price_at(SL_PNL_MULTIPLIER)
        tp = price_at(TP_PNL_MULTIPLIER)

        return {
            'entry_price': entry_price,
            'type': side,
            'capital': CAPITAL_PER_TRADE,
            'tp_price': tp,
            'sl_price': sl
        }

    def exit_condition(
        self,
        trade: Trade,
        time: datetime,
        row: pd.Series,
        open_trades: List[Trade],
        closed_trades: List[Trade],
        timeframe: MT5Timeframe
    ) -> bool:
        """Never request a discretionary exit; this hook always returns False."""
        return False

    def trailing_stop(
        self,
        trade: Trade,
        time: datetime,
        row: pd.Series,
        open_trades: List[Trade],
        closed_trades: List[Trade],
        timeframe: MT5Timeframe
    ) -> None:
        """Tighten the trade's stop loss according to TRAILING_STOP_STEPS.

        Steps are checked in list order. A step applies once the trade's
        unrealized PnL reaches TP_PNL_MULTIPLIER * trigger_tp_multiplier *
        CAPITAL_PER_TRADE; the candidate stop is the price at which PnL equals
        the step's new_sl_pnl_multiplier. The stop only ever moves in the
        trade's favour (up for longs, down for shorts). The first step that
        actually moves the stop is logged and ends the search, so the list
        should be ordered from the largest trigger to the smallest.

        Args:
            trade: Open trade to adjust; `trade.sl_price` is updated in place.
            time: Unused.
            row: Bar data; 'time' is read for the log line.
            open_trades: Unused.
            closed_trades: Unused.
            timeframe: Unused.
        """
        for trailing_step in TRAILING_STOP_STEPS:
            trigger_tp_multiplier = trailing_step['trigger_tp_multiplier']
            new_sl_pnl_multiplier = trailing_step['new_sl_pnl_multiplier']
            pnl_threshold = TP_PNL_MULTIPLIER * trigger_tp_multiplier * CAPITAL_PER_TRADE

            # Written as `not >=` (rather than `<`) so a NaN PnL is skipped too.
            if not trade.unrealized_pnl >= pnl_threshold:
                continue

            # Anything that is not explicitly 'long' is handled as a short.
            is_long = trade.type == 'long'
            old_sl_price = trade.sl_price
            new_sl_price = get_price_at_pnl(
                pnl_multiplier=new_sl_pnl_multiplier,
                order_fee=trade.order_fee,
                position_size_usd=trade.position_size_usd,
                leverage=trade.leverage,
                entry_price=trade.entry_price,
                type='long' if is_long else 'short',
            )

            # Never loosen the stop: longs may only raise it, shorts may only lower it.
            tightened = new_sl_price > old_sl_price if is_long else new_sl_price < old_sl_price
            if not tightened:
                continue

            trade.sl_price = new_sl_price

            # Figures for the log line: distances from entry and PnL at each stop, as % of capital.
            capital_gain_at_current_unrealized_pnl = (trade.unrealized_pnl / trade.capital) * 100
            old_sl_price_diff = (trade.entry_price / old_sl_price - 1) * 100
            new_sl_price_diff = (trade.entry_price / trade.sl_price - 1) * 100
            pnl_at_old_sl = get_pnl_at_price(current_price=old_sl_price, entry_price=trade.entry_price, position_size_usd=trade.position_size_usd, leverage=trade.leverage, type=trade.type)
            pnl_at_new_sl = get_pnl_at_price(current_price=trade.sl_price, entry_price=trade.entry_price, position_size_usd=trade.position_size_usd, leverage=trade.leverage, type=trade.type)
            capital_loss_at_old_sl = (pnl_at_old_sl / trade.capital) * 100
            capital_loss_at_new_sl = (pnl_at_new_sl / trade.capital) * 100

            print(f"{row['time']} - {trade.symbol} - TRAILING STOP - UNREALIZED PNL: ${trade.unrealized_pnl:.3f} ({capital_gain_at_current_unrealized_pnl:.3f}% CAP) - OLD SL: ${old_sl_price:.3f} ({old_sl_price_diff:.3f}%)({capital_loss_at_old_sl:.3f}% CAP) - NEW SL: ${trade.sl_price:.3f} ({new_sl_price_diff:.3f}%)({capital_loss_at_new_sl:.3f}% CAP)")
            break  # One adjustment per call.


In [None]:
# Account and execution settings handed to the backtester alongside the price data.
backtest_options = {
    'initial_capital': INITIAL_CAPITAL,
    'main_timeframe': MAIN_TIMEFRAME,
    'spread_multiplier': SPREAD_MULTIPLIER,
    'leverage': LEVERAGE,
}

backtest = MyStrategy(mtd.data, **backtest_options)

backtest.run()

In [None]:
# Tabulate every trade the backtest recorded.
trade_log = pd.DataFrame(backtest.trade_log)

# Jupyter only renders the LAST expression of a cell, so a bare column selection
# in the middle of the cell is evaluated and thrown away. Render it explicitly,
# trimmed to the first rows to keep the output readable.
display(trade_log[['type', 'entry_price', 'tp_price', 'sl_price', 'liq_p', 'pnl', 'potential_loss_usd']].head())
# trade_log.to_csv('./log.csv')

# Trades for the symbol shown on the chart.
trade_query = trade_log[trade_log['symbol'] == DISPLAY_SYMBOL]
trade_query.head()

In [None]:
# Ask the backtester for its report and show it as the cell output
# (the bare name below is the cell's last expression, so Jupyter renders it).
performance_report = backtest.generate_report()
performance_report
# performance_report.to_csv('./performance_report.csv')

In [None]:
# Number of logged trades for each symbol, most active first.
trades_per_symbol = trade_log['symbol'].value_counts()

# One bar per symbol, labelled with its trade count.
trade_count_bars = go.Bar(
    x=trades_per_symbol.index,
    y=trades_per_symbol.values,
    text=trades_per_symbol.values,
    textposition='auto',
)
fig = go.Figure(data=[trade_count_bars])

# Titles, size and theme.
trade_count_layout = dict(
    title='Number of Trades per Symbol',
    xaxis_title='Symbol',
    yaxis_title='Number of Trades',
    height=600,
    width=1000,
    template='plotly_dark',
)
fig.update_layout(**trade_count_layout)

fig.show()

In [None]:
# `go` (plotly.graph_objects) comes from the notebook's import cell; it is not
# re-imported here so that all dependencies stay declared in one place.

# Total realised PnL per symbol, best performer first.
pnl_per_symbol = trade_log.groupby('symbol')['pnl'].sum().sort_values(ascending=False)

# Green for net winners, red for break-even or net losers.
colors = ['green' if pnl > 0 else 'red' for pnl in pnl_per_symbol.values]

# One bar per symbol, labelled with the PnL rounded to cents.
fig = go.Figure(data=[go.Bar(
    x=pnl_per_symbol.index,
    y=pnl_per_symbol.values,
    text=pnl_per_symbol.values.round(2),
    textposition='auto',
    marker_color=colors
)])

# Titles, size and theme.
fig.update_layout(
    title='Total PnL per Symbol',
    xaxis_title='Symbol',
    yaxis_title='Total PnL ($)',
    height=600,
    width=1000,
    template='plotly_dark'
)

fig.show()

In [None]:
# PnL is realised when a trade CLOSES, so the running total has to be built in
# close-time order. Accumulating in entry-time order and plotting at close
# times mis-states equity whenever trades on different symbols overlap.
# The stable sort keeps the log's own order for trades closing on the same bar.
# NOTE(review): rows without a close_time (still-open trades, if the log contains
# any) sort last and are not plotted at a valid x position — confirm the log
# only holds closed trades.
sorted_trade_log = trade_log.sort_values('close_time', kind='stable')

# The curve starts at the initial capital, anchored at the first entry.
initial_point = pd.DataFrame({'time': [trade_log['entry_time'].min()], 'equity': [INITIAL_CAPITAL]})

# Equity after each close = initial capital + PnL realised so far.
cumulative_pnl = sorted_trade_log['pnl'].cumsum()
trade_equity = pd.DataFrame({'time': sorted_trade_log['close_time'], 'equity': INITIAL_CAPITAL + cumulative_pnl})

# Build the curve in one concat (no empty placeholder frame, which newer pandas
# warns about) and keep it time-ordered without reshuffling equal timestamps.
equity_curve = pd.concat([initial_point, trade_equity], ignore_index=True)
equity_curve = equity_curve.sort_values('time', kind='stable')

# Create the plot
fig = make_subplots(rows=1, cols=1, shared_xaxes=True)

# Add equity curve
fig.add_trace(
    go.Scatter(x=equity_curve['time'], y=equity_curve['equity'], name='Equity', line=dict(color='purple')),
    row=1, col=1
)

# Update layout
fig.update_layout(
    title='Portfolio Equity Curve',
    xaxis_title='Date',
    yaxis_title='Equity ($)',
    height=600,
    width=1000,
    showlegend=True,
    template='plotly_dark'
)

# Show the plot
fig.show()

In [None]:
# Candles of the displayed symbol on the main timeframe. The chart is seeded with
# the first candle only; the rest are kept in `df` to be streamed in one by one.
o_df = mtd.data[MAIN_TIMEFRAME][DISPLAY_SYMBOL]
df_first_row, df = o_df.iloc[:1], o_df.iloc[1:]

chart = Chart()
chart.set(df_first_row)
chart.show()

def create_line(start_time, end_time, entry_price, close_price, name):
    """Return a two-row frame describing a straight segment for a chart line.

    The frame has a 'time' column holding the two endpoints' times and a column
    called `name` holding the matching prices, first endpoint first.
    """
    endpoint_times = [start_time, end_time]
    endpoint_prices = [entry_price, close_price]
    return pd.DataFrame({'time': endpoint_times, name: endpoint_prices})

# Replay the remaining candles one by one and annotate the displayed symbol's trades.

# The symbol filter does not depend on the bar, so apply it once instead of
# re-masking the whole trade log twice per candle.
symbol_trades = trade_log[trade_log['symbol'] == DISPLAY_SYMBOL]

# Pause between candles, chosen once from the main timeframe (no pause for other timeframes).
if MAIN_TIMEFRAME == MT5Timeframe.M1:
    replay_delay = 0.0025
elif MAIN_TIMEFRAME == MT5Timeframe.M5:
    replay_delay = 0.005
elif MAIN_TIMEFRAME == MT5Timeframe.M15:
    replay_delay = 0.01
elif MAIN_TIMEFRAME == MT5Timeframe.M30:
    replay_delay = 0.03
elif MAIN_TIMEFRAME == MT5Timeframe.H1:
    replay_delay = 0.05
else:
    replay_delay = None


def draw_trade_segment(name, color, start_time, end_time, start_price, end_price, style=None):
    """Add an unlabelled straight segment between two (time, price) points to the chart."""
    line_options = {'color': color, 'price_label': False, 'price_line': False}
    if style is not None:
        line_options['style'] = style
    segment = chart.create_line(name, **line_options)
    segment.set(create_line(start_time, end_time, start_price, end_price, name))


for i, row in df.iterrows():
    chart.update(row)

    # --- Trade opened on this candle -----------------------------------------
    trade_entry_query = symbol_trades[symbol_trades['entry_time'] == row['time']]

    if not trade_entry_query.empty:
        trade = trade_entry_query.iloc[0]  # Only the first match is drawn
        is_long = trade.type == 'long'
        chart.marker(
            color="white",
            text=f'{trade.type} opened at ${trade.entry_price:.3f}',
            position='below' if is_long else 'above',
            shape='arrow_up' if is_long else 'arrow_down',
        )

        # Take-profit box: the TP level plus the two vertical edges joining it to the entry price.
        draw_trade_segment('TP', 'green', trade.entry_time, trade.close_time, trade.tp_price, trade.tp_price, style='dashed')
        draw_trade_segment('tp_line_1', 'green', trade.entry_time, trade.entry_time, trade.entry_price, trade.tp_price, style='dashed')
        draw_trade_segment('tp_line_2', 'green', trade.close_time, trade.close_time, trade.entry_price, trade.tp_price, style='dashed')

        # Stop-loss box: the SL level plus the two vertical edges joining it to the entry price.
        draw_trade_segment('SL', 'red', trade.entry_time, trade.close_time, trade.sl_price, trade.sl_price, style='dashed')
        draw_trade_segment('sl_line_1', 'red', trade.entry_time, trade.entry_time, trade.entry_price, trade.sl_price, style='dashed')
        draw_trade_segment('sl_line_2', 'red', trade.close_time, trade.close_time, trade.entry_price, trade.sl_price, style='dashed')

        # Path of the trade itself, from entry to close (default line style).
        draw_trade_segment('TRADE', 'white', trade.entry_time, trade.close_time, trade.entry_price, trade.close_price)

    # --- Trade closed on this candle -----------------------------------------
    trade_close_query = symbol_trades[symbol_trades['close_time'] == row['time']]

    if not trade_close_query.empty:
        trade = trade_close_query.iloc[0]  # Only the first match is drawn
        trade_text = f'Closed at ${trade.close_price:.3f} - PNL: {trade.pnl / trade.capital * 100:.3f}% - ${trade.pnl:.3f}'
        # Green for a winning trade, red otherwise.
        chart.marker(color="green" if trade.pnl > 0 else "red", position="inside", shape="circle", text=trade_text)

    if replay_delay is not None:
        sleep(replay_delay)
