In [76]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import datetime
from datetime import timedelta, datetime
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go

In [77]:
class BacktestResults:
    def __init__(self, df, cash, commission):
        self.df = df
        self.cash = cash
        self.commission = commission
        self.portfolio = [self.cash]
        self.bench_port = [self.cash]
        self.pnl = []
        self.pnl_up = []
        self.pnl_down = []
        self.pnl_net = [0]
        self.num_trades = 0
        self.long_trades = 0
        self.short_trades = 0
        self.fee_static = 0
        self.fee_comp = 0
        self.entries = {}
        self.exits = {}
        self.long_open_indices = []
        self.short_open_indices = []
        self.short_squareoff_indices = []
        self.long_squareoff_indices = []
        self.position = None
        self.bench_shares = self.cash / self.df['Price'][0]

        for i in range(len(self.df)-1):
            signal = self.df['signals'].iloc[i]
            if self.portfolio[-1] <= 0:
                self.portfolio[-1] = 0
                break

            if signal == 1:  # Long signal
                if self.position is None:
                    entry = self.df['Price'][i+1]
                    shares_comp = self.portfolio[-1] / abs(entry)
                    shares_static = self.cash / abs(entry)
                    self.position = 'Long'
                    self.long_open_indices.append(i+1)
                    self.entries[i] = entry

            elif signal == -1:  # Short signal
                if self.position == 'Long':
                    exit = self.df['Price'][i+1]
                    pnl_comp = shares_comp * (exit - entry) - self.commission * shares_comp * (entry + exit)
                    pnl_static = shares_static * (exit - entry) - self.commission * shares_static * (entry + exit)
                    self.portfolio.append(self.portfolio[-1] + pnl_comp)
                    self.pnl.append(pnl_static)
                    self.pnl_net.append(self.pnl_net[-1] + pnl_static)
                    self.num_trades += 1
                    self.long_trades += 1

                    # Record PnL
                    if pnl_static > 0:
                        self.pnl_up.append(pnl_static)
                    else:
                        self.pnl_down.append(pnl_static)

                    self.position = None
                    self.long_squareoff_indices.append(i+1)
                    self.exits[i] = exit
                    self.bench_port.append(self.bench_shares * exit)

                elif self.position is None:
                    entry = self.df['Price'][i+1]
                    shares_comp = self.portfolio[-1] / abs(entry)
                    shares_static = self.cash / abs(entry)
                    self.position = 'Short'
                    self.short_open_indices.append(i+1)
                    self.entries[i] = entry

            # Handle exit conditions for short positions
            if self.position == 'Short' and signal == 1:
                exit = self.df['Price'][i+1]
                pnl_comp = shares_comp * (entry - exit) - self.commission * shares_comp * (entry + exit)
                pnl_static = shares_static * (entry - exit) - self.commission * shares_static * (entry + exit)
                self.portfolio.append(self.portfolio[-1] + pnl_comp)
                self.pnl.append(pnl_static)
                self.pnl_net.append(self.pnl_net[-1] + pnl_static)
                self.num_trades += 1
                self.short_trades += 1

                # Record PnL
                if pnl_static > 0:
                    self.pnl_up.append(pnl_static)
                else:
                    self.pnl_down.append(pnl_static)

                self.position = None
                self.short_squareoff_indices.append(i+1)
                self.exits[i] = exit
                self.bench_port.append(self.bench_shares * exit)

# ... existing methods remain unchanged ...

    def trade_history(self):
        print("--- TRADE HISTORY ---")
        print()

        position = None

        for i in range(len(self.df)-1):
            if self.df['signals'][i] == 1:
                if position == None:
                    print("LONG POSITION")
                    print(self.df['Datetime'][i+1], ":", "ENTRY - BUY", "@", self.df['Price'][i+1])
                    position = 'Long'

                elif position == 'Short':
                    print(self.df['Datetime'][i+1], ":", "EXIT - BUY", "@", self.df['Price'][i+1])
                    print()
                    position = None

            elif self.df['signals'][i] == -1:
                if position == 'Long':
                    print(self.df['Datetime'][i+1], ":", "EXIT - SELL", "@", self.df['Price'][i+1])
                    print()
                    position = None

                elif position == None:
                    print("SHORT POSITION")
                    print(self.df['Datetime'][i+1], ":", "ENTRY - SELL", "@", self.df['Price'][i+1])
                    position = 'Short'

    def trade_metrics(self):
        entry_times = [key for key in self.entries.keys()]
        exit_times = [key for key in self.exits.keys()]
        trade_durations = []
        for i in range(len(entry_times)):
            trade_durations.append(exit_times[i] - entry_times[i])
        max_duration = np.max(trade_durations)
        max_duration_days = max_duration // 24
        max_duration_hours = max_duration % 24
        avg_duration = np.round(np.mean(trade_durations), 0)
        avg_duration_days = avg_duration // 24
        avg_duration_hours = avg_duration % 24
        print("Maximum Trade Duration:", max_duration_days, "days", max_duration_hours, "hours")
        print("Average Trade Duration:", avg_duration_days, "days", avg_duration_hours, "hours")
        print("Number of Trades:", self.num_trades)
        print("Winning Trades:", len(self.pnl_up))
        print("Losing Trades:", len(self.pnl_down))
        print("Win Rate:", np.round(100 * len(self.pnl_up) / len(self.pnl), 3), "%")
        print("Number of Long Trades:", self.long_trades)
        print("Number of Short Trades:", self.short_trades)

    def compounded_metrics(self):
        # Ensure datetime column is in the correct format
        self.df['Datetime'] = pd.to_datetime(self.df['Datetime'])

        # Calculate net return
        net_return = (self.portfolio[-1] / self.portfolio[0]) - 1

        # Calculate the number of years based on the length of the DataFrame
        num_years = (self.df['Datetime'].iloc[-1] - self.df['Datetime'].iloc[0]).days / 365.0

        # Annualized return calculation
        annual_return = (1 + net_return) ** (1 / num_years) - 1 if num_years > 0 else 0

        # Buy and hold return calculation
        bnh_return = (self.df['Price'].iloc[-1] / self.df['Price'].iloc[0]) - 1

        # Drawdown calculations
        dd = 1 - self.portfolio / np.maximum.accumulate(self.portfolio)
        max_draw = -np.nan_to_num(dd.max())

        print("--- COMPOUNDING ---")
        print("Initial Balance:", np.round(self.portfolio[0], 3))
        print("Final Balance:", np.round(self.portfolio[-1], 3))
        print("Peak Balance:", np.round(np.max(self.portfolio), 3))
        print("Maximum Dip:", -np.round(np.max(np.maximum.accumulate(self.portfolio) - self.portfolio), 3))
        print("Average Dip:", -np.round(np.mean(np.maximum.accumulate(self.portfolio) - self.portfolio), 3))
        print("Lowest Balance:", np.round(np.min(self.portfolio), 3))
        print("Maximum PnL:", np.round(np.diff(self.portfolio).max(), 3))
        print("Minimum PnL:", np.round(np.diff(self.portfolio).min(), 3))
        print("Net Return:", np.round(100 * net_return, 3), "%")
        print("Annualised Return:", np.round(100 * annual_return, 3), "%")
        print("Maximum Drawdown:", np.round(100 * max_draw, 3), "%")
        print("Total Transaction Fees:", np.round(self.fee_comp, 3))

    def static_metrics(self):
        bench = (self.df['Price'].iloc[-1] / self.df['Price'].iloc[0] - 1) * self.cash
        entry_times = [key for key in self.entries.keys()]
        exit_times = [key for key in self.exits.keys()]
        trade_durations = []
        for i in range(len(entry_times)):
            trade_durations.append(exit_times[i] - entry_times[i])
        dur = np.array(trade_durations) / 24
        ret = np.array(self.pnl) / 1000
        daily_ret = ret / dur
        neg_ret = daily_ret[daily_ret < 0]
        sharpe = (daily_ret.mean() * 365) / (daily_ret.std() * np.sqrt(365)) if daily_ret.std() != 0 else 0
        sortino = (daily_ret.mean() * 365) / (neg_ret.std() * np.sqrt(365)) if neg_ret.std() != 0 else 0

        print("--- STATIC ---")
        print("Capital per Trade:", self.cash)
        print("Net Profit:", np.round(self.pnl_net[-1], 3))
        print("Gross Profit:", np.round(self.pnl_net[-1] + self.fee_static, 3))
        print("Benchmark Return:", np.round(bench, 3))
        print("Sharpe Ratio:", np.round(sharpe, 3))
        print("Sortino Ratio:", np.round(sortino, 3))
        print("Average Profit/Loss per Trade:", np.round(np.mean(self.pnl), 3))

        # Check for empty pnl_up and pnl_down
        if len(self.pnl_up) > 0:
            print("Largest Win:", np.round(np.max(self.pnl_up), 3))
            print("Average Win:", np.round(np.mean(self.pnl_up), 3))
        else:
            print("Largest Win: N/A (No winning trades)")
            print("Average Win: N/A (No winning trades)")

        if len(self.pnl_down) > 0:
            print("Largest Loss:", np.round(np.min(self.pnl_down), 3))
            print("Average Loss:", np.round(np.mean(self.pnl_down), 3))
        else:
            print("Largest Loss: N/A (No losing trades)")
            print("Average Loss: N/A (No losing trades)")

        print("Total Transaction Fees:", np.round(self.fee_static, 3))


    def plot_equity_curves(self):
        fig = make_subplots(rows=2, cols=1, subplot_titles=["Static Position", "Compounded"])
        fig.add_trace(px.line(y=self.pnl_net).data[0], row=1, col=1)
        fig.add_trace(px.line(y=self.portfolio).data[0], row=2, col=1)
        fig.update_xaxes(title_text="# of Trades", row=1, col=1)
        fig.update_xaxes(title_text="# of Trades", row=2, col=1)
        fig.update_yaxes(title_text="Net Profit/Loss", row=1, col=1)
        fig.update_yaxes(title_text="Portfolio Balance", row=2, col=1)
        fig.update_layout(template="ggplot2")
        fig.update_layout(height = 900, width = 900)
        fig.show()

    def plot_drawdown_curve(self):
        dd = 100 * np.round(-(1 - self.portfolio / np.maximum.accumulate(self.portfolio)), 3)
        fig = px.bar(x=list(range(len(dd))), y=dd ,
             title='Portfolio Drawdown',
             height=600)
        fig.update_xaxes(title_text="# of Trades")
        fig.update_yaxes(title_text="Portfolio Drawdown (%)")
        fig.update_layout(template="ggplot2")
        fig.update_layout(coloraxis_colorbar=dict(title='Portfolio Drawdown (%)'))
        fig.update_layout(yaxis=dict())
        fig.show()

    def plot_long_trades(self):
        fig = px.line(self.df, y="open", x=self.df["Datetime"], title='Long Trades')
        long_open_dates = []
        long_open_prices = []
        long_close_dates = []
        long_close_prices = []
        for i in range(len(self.long_open_indices)):
            long_open_dates.append(self.df["Datetime"][self.long_open_indices[i]])
            long_open_prices.append(self.df["Price"][self.long_open_indices[i]])
        for i in range(len(self.long_squareoff_indices)):
            long_close_dates.append(self.df["Datetime"][self.long_squareoff_indices[i]])
            long_close_prices.append(self.df["Price"][self.long_squareoff_indices[i]])
        fig.add_trace(go.Scatter(x=long_open_dates, y=long_open_prices, mode='markers', marker=dict(size=20, color='green',symbol="triangle-up"), name="Long Entry"))
        fig.add_trace(go.Scatter(x=long_close_dates, y=long_close_prices, mode='markers', marker=dict(size=20, color='blue',symbol="triangle-down"), name="Long Exit"))
        fig.update_layout(template="ggplot2")
        fig.update_xaxes(title_text="Open")
        fig.show()

    def plot_short_trades(self):
        fig = px.line(self.df, y="open", x=self.df["datetime"], title='Short Trades')
        short_open_dates = []
        short_open_prices = []
        short_close_dates = []
        short_close_prices = []
        for i in range(len(self.short_open_indices)):
            short_open_dates.append(self.df["datetime"][self.short_open_indices[i]])
            short_open_prices.append(self.df["open"][self.short_open_indices[i]])
        for i in range(len(self.short_squareoff_indices)):
            short_close_dates.append(self.df["datetime"][self.short_squareoff_indices[i]])
            short_close_prices.append(self.df["open"][self.short_squareoff_indices[i]])
        fig.add_trace(go.Scatter(x=short_open_dates, y=short_open_prices, mode='markers', marker=dict(size=20, color='green',symbol="triangle-down"), name="Short Entry"))
        fig.add_trace(go.Scatter(x=short_close_dates, y=short_close_prices, mode='markers', marker=dict(size=20, color='blue',symbol="triangle-up"), name="Short Exit"))
        fig.update_layout(template="ggplot2")
        fig.update_xaxes(title_text="Open")
        fig.show()

In [78]:
df = pd.read_csv("data/final_data_with_features.csv")

df['Datetime'] = pd.to_datetime(df['Datetime'], dayfirst=True)
df = df[df['Datetime'] >= '2024-10-01']
df = df.reset_index(drop=True)
df.reset_index(drop=True, inplace=True)

df['returns'] = df['Price'].pct_change().fillna(0)


Columns (25) have mixed types. Specify dtype option on import or set low_memory=False.



In [79]:
fig = px.line(df, x="Datetime", y="Price", title="Electricity Prices Over Time")
fig.update_xaxes(title_text="Date")
fig.update_yaxes(title_text="Price (£/MWh)")
fig.show()

In [80]:
def calculate_cusum(data, k):
    """
    Calculate CUSUM for the given dataset.

    Parameters:
    - data: DataFrame containing 'returns'
    - k: Threshold array

    Returns:
    - S_hi, S_lo arrays for regime identification
    """
    returns = data['returns'].values
    n = len(returns)
    S_hi = np.zeros(n)
    S_lo = np.zeros(n)

    # Calculate CUSUM recursively
    for i in range(1, n):
        S_hi[i] = max(0, S_hi[i-1] + returns[i] - k[i])
        S_lo[i] = max(0, S_lo[i-1] - returns[i] - k[i])

    return S_hi, S_lo

def identify_regimes(data, window=48, delta=0.5, h_factor=1.0):
    """
    Identify regimes based on CUSUM.
    """
    # Create a copy and ensure we have returns
    data = data.copy()

    # Calculate rolling standard deviation and thresholds
    rolling_sigma = data['returns'].rolling(window=window).std().fillna(method='bfill')
    k = (delta * rolling_sigma).values

    # Calculate CUSUM
    S_hi, S_lo = calculate_cusum(data, k)

    # Define regime thresholds
    rolling_h = h_factor * rolling_sigma

    # Identify regimes
    data['S_hi'] = S_hi
    data['S_lo'] = S_lo
    data['regime'] = 'neutral'
    data.loc[S_hi > rolling_h, 'regime'] = 'bullish'
    data.loc[S_lo > rolling_h, 'regime'] = 'bearish'

    return data

df = identify_regimes(df)

In [81]:
df['volume_mean'] = df['Volume'].rolling(window=48).mean()
df['price_mean'] = df['Price'].rolling(window=48).mean()
df['demand_mean'] = df['NationalDemand'].rolling(window=48).mean()

In [82]:
df['signals'] = 0

current_position = 0
entry_price = 0

for i in range(1, len(df) - 1):
    if df['SettlementPeriod'].iloc[i] == 47 and current_position != 0:
        if current_position == 1:
            df.loc[df.index[i], 'signals'] = -1
            current_position = 0
            continue

        if current_position == -1:
            df.loc[df.index[i], 'signals'] = 1
            current_position = 0
            continue

    if df['SettlementPeriod'].iloc[i] == 48:
        continue


    if (df['regime'].iloc[i] == 'bullish' and df['regime'].iloc[i - 1] == 'bullish' and
        df['Volume'].iloc[i] > df['volume_mean'].iloc[i-1] and
        df['Price'].iloc[i] < df['price_mean'].iloc[i-1] and
        df['NationalDemand'].iloc[i] > df['demand_mean'].iloc[i-1] and
        df['Price'].iloc[i] > 0):
        if current_position != 1:
            df.loc[df.index[i], 'signals'] = 1
            current_position = 1

    elif (df['regime'].iloc[i] == 'bearish' and df['regime'].iloc[i - 1] == 'bearish' and
        df['Price'].iloc[i] > df['price_mean'].iloc[i-1] and
        df['NationalDemand'].iloc[i] < df['demand_mean'].iloc[i-1] and
        df['Price'].iloc[i] > 0):
        if current_position != -1:
            df.loc[df.index[i], 'signals'] = -1
            current_position = -1

    elif df['regime'].iloc[i] != 'bullish' and current_position == 1:
        df.loc[df.index[i + 1], 'signals'] = -1
        current_position = 0

    elif df['regime'].iloc[i] != 'bearish' and current_position == -1:
        df.loc[df.index[i], 'signals'] = 1
        current_position = 0

# df[['Datetime','SettlementPeriod','Price','Volume','signals']].to_csv('data/trade__signals.csv')

In [83]:
signals = pd.read_csv('data/trade_signals.csv')
columns_to_keep = ['Datetime', 'Price', 'Volume', 'signals']
signals = signals[columns_to_keep]
res = BacktestResults(signals, cash = 10000, commission = 0)

In [84]:
res.trade_metrics()

Maximum Trade Duration: 1 days 22 hours
Average Trade Duration: 0.0 days 17.0 hours
Number of Trades: 20
Winning Trades: 13
Losing Trades: 7
Win Rate: 65.0 %
Number of Long Trades: 11
Number of Short Trades: 9


In [85]:
res.compounded_metrics()

--- COMPOUNDING ---
Initial Balance: 10000
Final Balance: 240743.892
Peak Balance: 240743.892
Maximum Dip: -10788.244
Average Dip: -1647.896
Lowest Balance: 9106.751
Maximum PnL: 100490.842
Minimum PnL: -10788.244
Net Return: 2307.439 %
Annualised Return: 34785783.206 %
Maximum Drawdown: -21.296 %
Total Transaction Fees: 0


In [86]:
res.plot_equity_curves()