In [232]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import statsmodels.api as sm

import plotly.express as px

In [233]:
# Load the feature-engineered dataset.
# low_memory=False makes pandas infer each column's dtype from the whole file
# instead of chunk by chunk, which is what raises the "mixed types" DtypeWarning.
df = pd.read_csv("final_data_with_features.csv", low_memory=False)

# Parse timestamps, keep observations from 2024-10-01 onwards, and renumber the rows once.
df['Datetime'] = pd.to_datetime(df['Datetime'])
df = df[df['Datetime'] >= '2024-10-01'].reset_index(drop=True)

# Simple period-over-period returns; the first row has no predecessor, so it is set to 0.
df['returns'] = df['Price'].pct_change().fillna(0)

# Turn +/-inf (e.g. a return computed from a zero previous price) into NaN,
# then drop every row that has a NaN in ANY column.
# The index is not renumbered afterwards, so it keeps gaps where rows were dropped.
df = df.replace([np.inf, -np.inf], np.nan).dropna()


Columns (25) have mixed types. Specify dtype option on import or set low_memory=False.





In [234]:
def calculate_cusum(data, k):
    """
    Build the one-sided upper and lower CUSUM statistics of the return series.

    Parameters:
    - data: DataFrame with a 'returns' column
    - k: per-observation allowance, indexable by position (k[i] is used for row i)

    Returns:
    - (S_hi, S_lo): two float arrays with one entry per row of `data`.
      Position 0 is left at 0; each later value is the previous sum plus
      (for S_hi) or minus (for S_lo) the current return, less k[i], floored at 0.
    """
    series = data['returns'].values
    size = len(series)
    upper = np.zeros(size)
    lower = np.zeros(size)

    for t in range(1, size):
        up_candidate = upper[t - 1] + series[t] - k[t]
        down_candidate = lower[t - 1] - series[t] - k[t]
        # A candidate that is not strictly positive (including NaN) resets the sum to 0.
        upper[t] = up_candidate if up_candidate > 0 else 0
        lower[t] = down_candidate if down_candidate > 0 else 0

    return upper, lower

def identify_regimes(data, window=24, delta=0.5, h_factor=1.0):
    """
    Label each observation as 'bullish' or 'bearish' using a CUSUM of returns.

    Parameters:
    - data: DataFrame containing a 'returns' column (copied, never modified)
    - window: number of rows in the rolling standard-deviation window
    - delta: multiplier applied to the rolling sigma to get the CUSUM allowance k
    - h_factor: multiplier applied to the rolling sigma to get the decision threshold

    Returns:
    - A copy of `data` with the added columns 'S_hi', 'S_lo' and 'regime'

    Notes:
    - A row is 'bullish' when S_hi exceeds its threshold; every other row is
      labelled 'bearish'. S_lo is stored in the output but does not take part
      in the labelling, and no 'neutral' label is ever produced.
    """
    # Work on a copy so the caller's frame is left untouched
    data = data.copy()

    # Rolling volatility; the first window-1 rows have no estimate and are back-filled
    # from the first available value. Series.bfill() replaces the deprecated
    # fillna(method='bfill') spelling.
    rolling_sigma = data['returns'].rolling(window=window).std().bfill()
    k = (delta * rolling_sigma).values

    # Upper / lower CUSUM statistics of the returns
    S_hi, S_lo = calculate_cusum(data, k)

    # Time-varying decision threshold
    rolling_h = h_factor * rolling_sigma

    data['S_hi'] = S_hi
    data['S_lo'] = S_lo

    # Default label, overridden where the upper CUSUM breaches the threshold.
    # Comparisons against a NaN threshold are False, so such rows stay 'bearish'.
    is_bullish = S_hi > rolling_h.to_numpy()
    data['regime'] = 'bearish'
    data.loc[is_bullish, 'regime'] = 'bullish'

    return data

# Annotate the frame with the CUSUM statistics and the regime label (default parameters)
df = identify_regimes(data=df)

In [235]:
import plotly.graph_objects as go

def plot_price_regime_scatter(data):
    """
    Show a marker-only scatter of 'Price' against the DataFrame index,
    with one trace per value of the 'regime' column:
      - 'bearish' in red
      - 'bullish' in blue
      - 'neutral' in gray

    A trace is added for each label even when no row carries it.
    The figure is displayed with fig.show(); nothing is returned.
    """
    # (regime label, marker colour, legend entry) in drawing order
    trace_specs = (
        ('bearish', 'red', 'Bearish'),
        ('bullish', 'blue', 'Bullish'),
        ('neutral', 'gray', 'Neutral'),
    )

    fig = go.Figure()

    for label, colour, legend_name in trace_specs:
        subset = data[data['regime'] == label]
        fig.add_trace(
            go.Scatter(
                x=subset.index,
                y=subset['Price'],
                mode='markers',
                marker=dict(color=colour),
                name=legend_name,
            )
        )

    fig.update_layout(
        title='Price vs Regime Shifts (Scatter)',
        xaxis_title='Index',
        yaxis_title='Price',
    )

    fig.show()

# Draw the regime-coloured price scatter.
# df must already carry the 'regime' column produced by identify_regimes.
plot_price_regime_scatter(data=df)


In [236]:
# Add 48-row rolling means of volume, price and demand.
# Each window ends at (and includes) the current row, so the first 47 rows are NaN.
rolling_window = 48
rolling_targets = (
    ('Volume', 'volume_mean'),
    ('Price', 'price_mean'),
    ('NationalDemand', 'demand_mean'),
)
for source_col, mean_col in rolling_targets:
    df[mean_col] = df[source_col].rolling(window=rolling_window).mean()

In [237]:
from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression

# Back-fill gaps in the 24h volatility column before modelling.
# The result is assigned back to the frame: a chained
# df[col].fillna(..., inplace=True) acts on an intermediate object and is not
# guaranteed to update df (it does not under pandas Copy-on-Write), and the
# method='bfill' argument is deprecated in favour of .bfill().
df['Price_volatility_24h'] = df['Price_volatility_24h'].bfill()

# Fit a two-regime Markov switching model on the volatility series, with a
# regime-specific constant (trend='c') and a regime-specific variance.
msm_model = MarkovRegression(df['Price_volatility_24h'], k_regimes=2, trend='c', switching_variance=True)
msm_results = msm_model.fit()

# Flag rows whose smoothed probability of regime 1 exceeds 0.5, stored as 0/1 integers.
# NOTE(review): the intent is 0 = low volatility, 1 = high volatility, but the fit does
# not by itself guarantee which regime index gets the higher level -- confirm against
# msm_results.params before relying on the label.
df['Volatility_Regime'] = (msm_results.smoothed_marginal_probabilities[1] > 0.5).astype(int)


An unsupported index was provided and will be ignored when e.g. forecasting.



In [238]:
# Build the 'signals' column: 1 = long entry, -1 = long exit, 0 = no action.
# A decision taken on row i is always written to row i + 1.
#
# The columns are pulled out as arrays once, so the loop does plain positional
# lookups instead of a pandas .iloc/.loc call per cell on every iteration.
settlement_period = df['SettlementPeriod'].to_numpy()
regime = df['regime'].to_numpy()
price = df['Price'].to_numpy()
volume = df['Volume'].to_numpy()
demand = df['NationalDemand'].to_numpy()
volume_mean = df['volume_mean'].to_numpy()
price_mean = df['price_mean'].to_numpy()
demand_mean = df['demand_mean'].to_numpy()

signals = np.zeros(len(df), dtype='int64')

# Position state: 0 = flat, 1 = long (this strategy never opens a short).
current_position = 0
# Price on the row where the open long was signalled; 0 while flat.
# Currently informational only: the stop-loss that used it is disabled.
entry_price = 0

# Start at 1 because row i - 1 is read; stop one short of the end because row i + 1 is written.
for i in range(1, len(df) - 1):
    period = settlement_period[i]

    # An open long seen on settlement period 47 is closed on the next row.
    if period == 47 and current_position == 1:
        signals[i + 1] = -1  # Long Exit
        current_position = 0
        entry_price = 0
        continue

    # No decisions are taken on settlement period 48.
    if period == 48:
        continue

    # Long setup: two consecutive bullish rows, volume and demand above the
    # previous row's rolling means, and a positive price below the previous row's
    # rolling mean. Comparisons against NaN rolling means are False, so no
    # entry can occur before the rolling windows are full.
    # (A Volatility_Regime == 0 filter was prototyped here and is disabled.)
    long_setup = (
        regime[i] == 'bullish' and regime[i - 1] == 'bullish'
        and volume[i] > volume_mean[i - 1]
        and price[i] < price_mean[i - 1]
        and demand[i] > demand_mean[i - 1]
        and price[i] > 0
    )

    if long_setup:
        if current_position != 1:  # Only enter when not already long
            signals[i + 1] = 1  # Long Entry
            current_position = 1
            entry_price = price[i + 1]
    elif regime[i] == 'bearish':
        if current_position == 1:  # Only exit when a long is open
            signals[i + 1] = -1  # Long Exit
            current_position = 0
            entry_price = 0  # keep the state consistent with the period-47 exit

df['signals'] = signals

In [239]:
# Keep only the columns needed from here on, then persist them.
# The row index is written as the first, unnamed CSV column.
export_columns = ['Datetime', 'SettlementPeriod', 'Price', 'Volume', 'signals']
df = df.loc[:, export_columns]

df.to_csv('cusum1.csv')

In [240]:
# Price chart with the strategy's entry and exit markers.
# The candlestick uses the same 'Price' series for open, high, low and close.
price_trace = go.Candlestick(
    x=df['Datetime'],
    open=df['Price'],
    high=df['Price'],
    low=df['Price'],
    close=df['Price'],
)
fig = go.Figure(data=[price_trace])

fig.update_layout(
    title='CUSUM Strategy Signals',
    yaxis_title='Price (GBP/MWh)',
    xaxis_title='Date',
)

# (signal value, marker colour, marker symbol, legend entry) in drawing order
marker_specs = (
    (1, 'green', 'triangle-up', 'Long Entry'),
    (-1, 'red', 'triangle-down', 'Long Exit'),
)
for signal_value, colour, symbol, label in marker_specs:
    hits = df['signals'] == signal_value
    fig.add_trace(
        go.Scatter(
            x=df.loc[hits, 'Datetime'],
            y=df.loc[hits, 'Price'],
            mode='markers',
            marker=dict(color=colour, size=10, symbol=symbol),
            name=label,
        )
    )

fig.show()