<a href="https://colab.research.google.com/github/robbieyyy/robbieyyy.github.io/blob/main/Optimisation_for_ABM_Calibration_Updated.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Step 1: Model Set-up

Instructions:
1. Download the datasets (CSV files) from the following links:

*   https://uk.finance.yahoo.com/quote/AAPL/history/?period1=1690416000&period2=1722038400
*   https://uk.finance.yahoo.com/quote/%5EIXIC/history/?period1=1690416000&period2=1722038400
*   https://uk.finance.yahoo.com/quote/NVDA/history/?period1=1690416000&period2=1722038400
*   https://finance.yahoo.com/quote/%5ESPX/history/?period1=1690416000&period2=1722038400
*   https://uk.finance.yahoo.com/quote/GOOG/history/?period1=1690416000&period2=1722038400
*   https://uk.finance.yahoo.com/quote/GBPUSD%3DX/history/?period1=1690416000&period2=1722038400
*   https://uk.finance.yahoo.com/quote/EURUSD%3DX/history/?period1=1690416000&period2=1722038400
*   https://uk.finance.yahoo.com/quote/AUDUSD%3DX/history/?period1=1690416000&period2=1722038400

2. Select the Files icon from the left bar
3. Find the 'content' folder and select the 3 dots
4. Upload the downloaded files into 'content' folder

In [1]:
import pandas as pd
import numpy as np

# Paths to datasets
dataset_paths = {
    'AAPL': '/content/AAPL.csv',
    'IXIC': '/content/IXIC.csv',
    'NVDA': '/content/NVDA.csv',
    'SPX': '/content/SPX.csv',
    'GOOG': '/content/GOOG.csv',
    'GBPUSD': '/content/GBPUSD=X.csv',
    'EURUSD': '/content/EURUSD=X.csv',
    'AUDUSD': '/content/AUDUSD=X.csv'
}

# Columns every dataset must provide before it is accepted
required_columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

# Load each dataset with 'Date' parsed as datetimes. A dataset is stored only
# after it has been read successfully AND contains every required column, so
# later cells never iterate over a frame that failed validation.
datasets = {}
for name, path in dataset_paths.items():
    try:
        data = pd.read_csv(path, parse_dates=['Date'])
    except FileNotFoundError:
        print(f"Error: File for {name} not found at {path}.")
        continue
    except pd.errors.ParserError:
        print(f"Error: Failed to parse the file for {name}.")
        continue
    except ValueError as e:
        # Any other ValueError raised while reading (for example a problem
        # with the 'Date' column requested in parse_dates) is reported as-is.
        print(e)
        continue

    # Check that all required columns are in the dataset
    missing_cols = [col for col in required_columns if col not in data.columns]
    if missing_cols:
        print(f"Dataset {name} is missing columns: {', '.join(missing_cols)}")
        continue

    datasets[name] = data


# Step 2: Define Agent Behaviour

In [2]:
import random
from sklearn.metrics import mean_squared_error
from gym import Env, spaces

In [3]:
class TradingEnv(Env):
    """Gym environment that walks through a price table one row per step.

    Each step advances to the next row, lets the chosen action nudge that
    row's 'Close' price by 0.5% (up on a buy, down on a sell that actually
    reduces holdings) and returns the mutated row as the observation.

    Args:
        data: DataFrame with a 'Close' column. The frame is copied, so the
            caller's data is never modified.

    Notes:
        - Rows are always addressed by position, so the frame's index labels
          do not matter.
        - `balance` only changes by `price_change * holdings`; buying does not
          debit it and selling does not credit it.
        - The declared observation space assumes values in [0, 1]; the
          observations returned are the raw row values.
    """

    def __init__(self, data):
        super().__init__()
        # Pristine copy kept so reset() can undo the price changes made by step()
        self._initial_data = data.copy()
        self.data = self._initial_data.copy()
        # Column position of 'Close', used for positional reads and writes
        self._close_col = self.data.columns.get_loc('Close')
        self.current_step = 0
        self.holdings = 0
        self.balance = 1000

        # Define action space: 0 = hold, 1 = buy, 2 = sell
        self.action_space = spaces.Discrete(3)
        # Observation space based on data columns (assuming normalized values between 0 and 1)
        self.observation_space = spaces.Box(low=0, high=1, shape=(len(data.columns),), dtype=np.float32)

    def reset(self):
        """Restore prices, step counter, holdings and balance; return the first row's values."""
        self.data = self._initial_data.copy()
        self.current_step = 0
        self.holdings = 0
        self.balance = 1000
        return self.data.iloc[self.current_step].values  # Return the initial observation

    def step(self, action):
        """Advance one row and apply `action` (0 = hold, 1 = buy, 2 = sell).

        Returns:
            (obs, reward, done, info): `obs` is the current row's values after
            the price change, `reward` is net worth minus the balance held
            before the step, `done` is True once the last row is reached and
            `info` is an empty dict.
        """
        self.current_step += 1
        done = self.current_step >= len(self.data) - 1

        row = self.current_step
        current_price = self.data.iloc[row, self._close_col]
        initial_balance = self.balance

        # Simulate price change based on action
        price_change = 0
        if action == 1:  # Buy
            self.holdings += 1
            price_change = current_price * 0.005  # Simulate a 0.5% price increase
        elif action == 2:  # Sell
            if self.holdings > 0:
                self.holdings -= 1
                price_change = -current_price * 0.005  # Simulate a 0.5% price decrease

        # Update the price positionally, matching the positional reads above
        # (a label-based write would hit the wrong row on a non-default index).
        self.data.iloc[row, self._close_col] += price_change
        self.balance += price_change * self.holdings

        # Calculate net worth and reward
        net_worth = self.balance + self.holdings * self.data.iloc[row, self._close_col]
        reward = net_worth - initial_balance

        # Observation for the next step
        obs = self.data.iloc[row].values

        return obs, reward, done, {}

    def render(self, mode='human'):
        """Placeholder: rendering is not implemented."""
        pass


  and should_run_async(code)


In [4]:
# Traders
class FundamentalTrader:
    """Trader that compares the price with a band around a fundamental value.

    Args:
        fundamental_value: Reference price the trader believes in.
        sensitivity: Half-width of the no-trade band, as a fraction of
            `fundamental_value`.
    """

    def __init__(self, fundamental_value, sensitivity=0.05):
        self.fundamental_value = fundamental_value
        self.sensitivity = sensitivity

    def decide(self, price):
        """Return 1 (buy) below the band, 2 (sell) above it, otherwise 0 (hold)."""
        # Bounds are recomputed on every call because the tuning routines
        # overwrite fundamental_value and sensitivity on existing instances.
        lower_bound = self.fundamental_value * (1 - self.sensitivity)
        if price < lower_bound:
            return 1  # Buy

        upper_bound = self.fundamental_value * (1 + self.sensitivity)
        return 2 if price > upper_bound else 0  # Sell, else Hold


In [5]:
class MomentumTrader:
    """Trader that follows strictly monotonic runs in the most recent prices.

    Args:
        trend_length: Number of most recent prices that must all move in the
            same direction before the trader acts.

    Note:
        `recent_prices` persists between calls, so an instance reused across
        several simulations carries its price window over from one run to the
        next.
    """

    def __init__(self, trend_length=3):
        self.trend_length = trend_length
        self.recent_prices = []

    def decide(self, price):
        """Record `price` and return 1 (buy), 2 (sell) or 0 (hold)."""
        self.recent_prices.append(price)

        # Trim to the current window size. trend_length can be lowered from
        # outside (the tuning routines do this), so more than one old price
        # may have to be dropped at once.
        overflow = len(self.recent_prices) - self.trend_length
        if overflow > 0:
            del self.recent_prices[:overflow]

        if len(self.recent_prices) < self.trend_length:
            return 0  # Hold: window not filled yet

        pairs = list(zip(self.recent_prices, self.recent_prices[1:]))
        if not pairs:
            return 0  # Hold: fewer than two prices cannot form a trend
        if all(earlier < later for earlier, later in pairs):
            return 1  # Buy: strictly rising
        if all(earlier > later for earlier, later in pairs):
            return 2  # Sell: strictly falling
        return 0  # Hold

In [6]:
class RandomTrader:
    """Noise trader whose decision ignores the price entirely."""

    # 0 = hold, 1 = buy, 2 = sell
    _ACTIONS = [0, 1, 2]

    def decide(self, price):
        """Return one of hold (0), buy (1) or sell (2), drawn uniformly via NumPy's global RNG."""
        return np.random.choice(self._ACTIONS)


# Step 3: Calibration of Parameters




In [7]:
# Distance Function
def distance_function(real_data, synthetic_data):
    """Return the mean squared error between the 'Close' columns of two frames.

    Args:
        real_data: Frame holding the observed prices in a 'Close' column.
        synthetic_data: Frame holding the simulated prices in a 'Close' column.
    """
    return mean_squared_error(
        real_data['Close'].values,
        synthetic_data['Close'].values,
    )


def generate_synthetic_data(traders, data):
    """Simulate trading on `data` and return the resulting price series.

    A fresh TradingEnv is created from `data`. At every step each trader sees
    the current 'Close' price and votes for an action; the most frequent vote
    is applied to the environment. The loop runs until the environment
    reports that it is done.

    Args:
        traders: Objects exposing `decide(price)` that return 0 (hold),
            1 (buy) or 2 (sell).
        data: DataFrame with at least 'Date' and 'Close' columns.

    Returns:
        DataFrame with the original 'Date' column and the simulated 'Close'
        prices.

    Note:
        Traders are used as passed in; any state they keep between `decide`
        calls carries over from earlier simulations.
    """
    env = TradingEnv(data)
    state = env.reset()
    done = False

    # Locate 'Close' by name so the lookup does not depend on column order
    close_idx = data.columns.get_loc('Close')

    while not done:
        actions = [trader.decide(state[close_idx]) for trader in traders]
        # Majority vote across the traders
        action = max(set(actions), key=actions.count)
        state, _, done, _ = env.step(action)

    # Read the whole simulated price column in one go
    return pd.DataFrame({
        'Date': data['Date'],
        'Close': env.data['Close'].to_numpy(),
    })


In [8]:
# Baseline run: simulate every loaded dataset once with the default trader
# settings and report how far the synthetic prices are from the real ones.

for name, real_data in datasets.items():
    # From here on 'Close' holds the adjusted close, which is what the simulation reads
    real_data['Close'] = real_data['Adj Close']

    fundamental_trader = FundamentalTrader(fundamental_value=150)
    momentum_trader = MomentumTrader(trend_length=3)
    random_trader = RandomTrader()

    synthetic_data = generate_synthetic_data(
        [fundamental_trader, momentum_trader, random_trader],
        real_data,
    )

    # Flag runs in which the simulation left every price untouched
    if real_data['Close'].equals(synthetic_data['Close']):
        print(f"Synthetic data for {name} is identical to real data.")

    distance = distance_function(real_data, synthetic_data)
    print(f"Distance (MSE) between real and synthetic data for {name}: {distance}")

Distance (MSE) between real and synthetic data for AAPL: 0.19035480687661124
Distance (MSE) between real and synthetic data for IXIC: 1189.1838414878057
Distance (MSE) between real and synthetic data for NVDA: 0.09751629765779138
Distance (MSE) between real and synthetic data for SPX: 120.44924315827969
Distance (MSE) between real and synthetic data for GOOG: 0.28954118126302286
Distance (MSE) between real and synthetic data for GBPUSD: 2.3937435925880793e-05
Distance (MSE) between real and synthetic data for EURUSD: 1.7344663995307758e-05
Distance (MSE) between real and synthetic data for AUDUSD: 6.1485053261951485e-06


# Step 4: Parameter Tuning


In [9]:
!pip install ipython-autotime

%load_ext autotime

  and should_run_async(code)


time: 602 µs (started: 2024-09-11 16:37:40 +00:00)


## 1. Grid Search:

In [10]:
from itertools import product

def grid_search_tuning(trader_classes, param_grid, data):
    """Exhaustively evaluate every parameter combination and keep the best one.

    Args:
        trader_classes: Trader instances; index 0 receives `fundamental_value`
            and `sensitivity`, index 1 receives `trend_length`.
        param_grid: Mapping whose values, in order, are the candidate lists
            for fundamental value, sensitivity and trend length.
        data: Real price data the simulation is compared against.

    Returns:
        (best_params, best_mse): the winning parameter tuple and its MSE, or
        (None, inf) if no combination was evaluated.
    """
    lowest_mse = float('inf')  # Anything finite beats this starting value
    winner = None

    # Cartesian product of all candidate lists
    for combo in product(*param_grid.values()):
        fundamental_value, sensitivity, trend_length = combo

        # Push the candidate values onto the existing trader instances
        fundamental = trader_classes[0]
        fundamental.fundamental_value = fundamental_value
        fundamental.sensitivity = sensitivity
        trader_classes[1].trend_length = trend_length

        # Score the candidate by simulating and measuring the distance
        candidate_mse = distance_function(data, generate_synthetic_data(trader_classes, data))

        if candidate_mse < lowest_mse:
            lowest_mse, winner = candidate_mse, combo

    return winner, lowest_mse

time: 1.17 ms (started: 2024-09-11 16:37:40 +00:00)


In [None]:
# Candidate values explored by the grid search (3 x 3 x 3 combinations)
param_grid = dict(
    fundamental_value=[140, 150, 160],
    sensitivity=[0.02, 0.05, 0.1],
    trend_length=[3, 5, 7],
)

# Run the grid search on each dataset with a fresh set of traders
for name, real_data in datasets.items():
    print(f"\nGrid search for {name} dataset")

    trader_classes = [
        FundamentalTrader(fundamental_value=150),
        MomentumTrader(trend_length=3),
        RandomTrader(),
    ]
    fundamental_trader, momentum_trader, random_trader = trader_classes

    best_params, best_mse = grid_search_tuning(trader_classes, param_grid, real_data)

    print(f"Best parameters (Grid Search): {best_params}")
    print(f"Best MSE (Grid Search): {best_mse}")


Grid search for AAPL dataset
Best parameters (Grid Search): (140, 0.1, 3)
Best MSE (Grid Search): 0.0

Grid search for IXIC dataset
Best parameters (Grid Search): (140, 0.1, 5)
Best MSE (Grid Search): 81.13982310049677

Grid search for NVDA dataset
Best parameters (Grid Search): (140, 0.1, 5)
Best MSE (Grid Search): 0.04213849098155099

Grid search for SPX dataset
Best parameters (Grid Search): (160, 0.05, 3)
Best MSE (Grid Search): 0.0

Grid search for GOOG dataset
Best parameters (Grid Search): (140, 0.1, 3)
Best MSE (Grid Search): 0.014616066413562435

Grid search for GBPUSD dataset


### Visualise Volatility of IXIC and SPX

In [None]:
import yfinance as yf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from statsmodels.tsa.stattools import adfuller
from sklearn.preprocessing import StandardScaler

# Download one year of daily data for both indices from Yahoo Finance
ixic = yf.download('^IXIC', start='2023-07-27', end='2024-07-26')
spx = yf.download('^GSPC', start='2023-07-27', end='2024-07-26')

# For each index: daily returns, then their 21-day rolling standard deviation
for _frame in (ixic, spx):
    _frame['Returns'] = _frame['Close'].pct_change()
    _frame['Rolling_Volatility'] = _frame['Returns'].rolling(window=21).std()

# Draw both volatility series on a single figure
plt.figure(figsize=(10, 6))
for _frame, _label, _colour in (
    (ixic, 'IXIC Rolling Volatility (21-day)', 'blue'),
    (spx, 'SPX Rolling Volatility (21-day)', 'green'),
):
    plt.plot(_frame.index, _frame['Rolling_Volatility'], label=_label, color=_colour)
plt.title('Rolling Volatility Comparison (IXIC and SPX)')
plt.xlabel('Date')
plt.ylabel('Volatility')
plt.legend()
plt.show()

## 2. Random Search:

In [None]:
# Random Search Tuning
import random

def random_search_tuning(trader_classes, param_distributions, n_iter, data):
    """Evaluate `n_iter` randomly drawn parameter combinations and keep the best.

    Args:
        trader_classes: Trader instances; index 0 receives `fundamental_value`
            and `sensitivity`, index 1 receives `trend_length`.
        param_distributions: Mapping with candidate lists under the keys
            'fundamental_value', 'sensitivity' and 'trend_length'.
        n_iter: Number of random draws to evaluate.
        data: Real price data the simulation is compared against.

    Returns:
        (best_params, best_mse): the best sampled tuple and its MSE, or
        (None, inf) if nothing was evaluated.
    """
    lowest_mse = float('inf')
    winner = None

    for _ in range(n_iter):
        # Draw one value per parameter (with replacement across iterations)
        candidate = (
            random.choice(param_distributions['fundamental_value']),
            random.choice(param_distributions['sensitivity']),
            random.choice(param_distributions['trend_length']),
        )

        # Push the draw onto the existing trader instances
        fundamental = trader_classes[0]
        fundamental.fundamental_value = candidate[0]
        fundamental.sensitivity = candidate[1]
        trader_classes[1].trend_length = candidate[2]

        # Score the draw by simulating and measuring the distance
        candidate_mse = distance_function(data, generate_synthetic_data(trader_classes, data))

        if candidate_mse < lowest_mse:
            lowest_mse, winner = candidate_mse, candidate

    return winner, lowest_mse

In [None]:
# Candidate values the random search samples from
param_distributions = dict(
    fundamental_value=[140, 150, 160],
    sensitivity=[0.02, 0.05, 0.1],
    trend_length=[3, 5, 7],
)

# Number of random parameter draws evaluated per dataset
n_iter = 10

for name, real_data in datasets.items():
    print(f"\nRandom search for {name} dataset")

    trader_classes = [
        FundamentalTrader(fundamental_value=150),
        MomentumTrader(trend_length=3),
        RandomTrader(),
    ]
    fundamental_trader, momentum_trader, random_trader = trader_classes

    best_params, best_mse = random_search_tuning(trader_classes, param_distributions, n_iter, real_data)
    print(f"Best parameters (Random Search): {best_params}")
    print(f"Best MSE (Random Search): {best_mse}")

## 3. Bayesian Optimisation:

In [None]:
!pip install scikit-optimize

In [None]:
from skopt import gp_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args
from skopt.acquisition import gaussian_ei

# Bayesian Optimisation Tuning
def bayesian_optimization_tuning(trader_classes, data):
    """Tune the trader parameters with Gaussian-process optimisation.

    Args:
        trader_classes: Trader instances; index 0 receives `fundamental_value`
            and `sensitivity`, index 1 receives `trend_length`.
        data: Real price data the simulation is compared against.

    Returns:
        (best_params, best_mse): the best point found, as a list ordered like
        the search space, and the objective value at that point.
    """
    # Search space: one dimension per tunable parameter
    search_space = [
        Integer(140, 160, name='fundamental_value'),
        Real(0.01, 0.1, name='sensitivity'),
        Integer(3, 7, name='trend_length'),
    ]

    @use_named_args(search_space)
    def objective(fundamental_value, sensitivity, trend_length):
        # Apply the proposed point to the traders, then score it by simulation
        fundamental = trader_classes[0]
        fundamental.fundamental_value = fundamental_value
        fundamental.sensitivity = sensitivity
        trader_classes[1].trend_length = trend_length

        simulated = generate_synthetic_data(trader_classes, data)
        return distance_function(data, simulated)

    result = gp_minimize(objective, search_space, n_calls=20, random_state=42, acq_func="EI")
    return result.x, result.fun


In [None]:
# Perform Bayesian Optimisation search for each dataset
for name, real_data in datasets.items():
    print(f"\nBayesian Optimisation for {name} dataset")
    fundamental_trader = FundamentalTrader(fundamental_value=150)
    momentum_trader = MomentumTrader(trend_length=3)
    random_trader = RandomTrader()

    # Bundle this dataset's fresh traders. Without this the call below would
    # use whatever `trader_classes` an earlier cell happened to leave behind.
    trader_classes = [fundamental_trader, momentum_trader, random_trader]

    # Perform Bayesian Optimisation
    best_params, best_mse = bayesian_optimization_tuning(trader_classes, real_data)
    print(f"Best parameters (Bayesian Optimisation): {best_params}")
    print(f"Best MSE (Bayesian Optimisation): {best_mse}")


## 4. Greedy Search:

In [None]:
# Greedy Search Tuning
def greedy_search(trader_classes, real_data):
    """Evaluate a fixed set of parameter combinations and report the best one.

    Every combination of the hard-coded candidate values below is simulated,
    and the combination with the lowest MSE is printed.

    Args:
        trader_classes: Trader instances; index 0 receives `fundamental_value`
            and `sensitivity`, index 1 receives `trend_length`.
        real_data: Real price data the simulation is compared against.

    Returns:
        (best_params, best_distance): the winning
        (fundamental_value, sensitivity, trend_length) tuple and its MSE, or
        (None, inf) if no combination produced a comparable MSE.
    """
    best_distance = float('inf')
    best_params = None

    # Parameter ranges
    fundamental_values = [140, 150, 160]
    sensitivities = [0.02, 0.05, 0.1]
    trend_lengths = [3, 5, 7]

    # Iterate through all combinations
    for fundamental_value in fundamental_values:
        for trend_length in trend_lengths:
            for sensitivity in sensitivities:
                # Set trader parameters
                trader_classes[0].fundamental_value = fundamental_value
                trader_classes[0].sensitivity = sensitivity
                trader_classes[1].trend_length = trend_length

                # Generate synthetic data and calculate distance
                synthetic_data = generate_synthetic_data(trader_classes, real_data)
                distance = distance_function(real_data, synthetic_data)

                if distance < best_distance:
                    best_distance = distance
                    best_params = (fundamental_value, sensitivity, trend_length)

    # best_params stays None when no distance compared below infinity
    # (for example when every MSE is NaN); indexing it would crash.
    if best_params is None:
        print("Greedy Search: no parameter combination produced a valid MSE.")
        return best_params, best_distance

    print(f"Best parameters (Greedy Search): Fundamental value = {best_params[0]}, Sensitivity = {best_params[1]}, Trend length = {best_params[2]}")
    print(f"Best MSE (Greedy Search): {best_distance}")
    return best_params, best_distance


In [None]:
# Parameter grid for Greedy search
# NOTE: greedy_search uses its own hard-coded candidate lists; this mapping is
# kept for reference and is not passed to it.
param_distributions = {
    'fundamental_value': [140, 150, 160],
    'sensitivity': [0.02, 0.05, 0.1],
    'trend_length': [3, 5, 7]
}

for name, real_data in datasets.items():
    print(f"\nGreedy search for {name} dataset")
    fundamental_trader = FundamentalTrader(fundamental_value=150)
    momentum_trader = MomentumTrader(trend_length=3)
    random_trader = RandomTrader()

    # Bundle this dataset's fresh traders. Without this the call below would
    # use whatever `trader_classes` an earlier cell happened to leave behind.
    trader_classes = [fundamental_trader, momentum_trader, random_trader]

    # Perform Greedy Search
    greedy_search(trader_classes, real_data)


## 5. Reinforcement Learning:

In [None]:
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    Args:
        state_space: Number of states (rows of the Q-table).
        action_space: Number of actions (columns of the Q-table).
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Probability of picking a random action.
    """

    def __init__(self, state_space, action_space, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        # Q-values start at zero for every (state, action) pair
        self.q_table = np.zeros((state_space, action_space))

    def choose_action(self, state):
        """Return a random action with probability epsilon, else the greedy one."""
        row = int(state)
        exploring = np.random.uniform(0, 1) < self.epsilon
        if not exploring:
            # Exploit: action with the highest Q-value for this state
            return np.argmax(self.q_table[row])
        # Explore: uniform draw over all actions
        return np.random.choice(range(self.q_table.shape[1]))

    def update(self, state, action, reward, next_state):
        """Move Q(state, action) towards reward + gamma * max Q(next_state, .)."""
        row, next_row = int(state), int(next_state)
        current_estimate = self.q_table[row, action]
        td_target = reward + self.gamma * np.max(self.q_table[next_row])
        self.q_table[row, action] += self.alpha * (td_target - current_estimate)

In [None]:
# Simplified Trading Environment
class TradingEnvRL:
    """Minimal environment whose state is simply the current row number.

    Args:
        data: Frame with a 'Close' column; it is referenced, not copied.
    """

    def __init__(self, data):
        self.data = data
        self.current_step = 0

    def reset(self):
        """Rewind to the first row and return the initial state (0)."""
        self.current_step = 0
        return self.current_step

    def step(self, action):
        """Advance one row and return (next_state, reward, done, info).

        The reward is the change in 'Close' from the previous row to the
        current one when `action` is 1, and the negated change for any other
        action. On the final step the returned state is 0.
        """
        self.current_step += 1

        done = self.current_step >= len(self.data) - 1
        next_state = 0 if done else self.current_step

        closes = self.data['Close']
        price_change = closes.iloc[self.current_step] - closes.iloc[self.current_step - 1]
        reward = price_change if action == 1 else -price_change

        return next_state, reward, done, {}

In [None]:
def train_rl_agent(agent, episodes, trader_classes, real_data):
    """Train a tabular agent and score the trader settings it implies.

    After each episode the action with the largest summed Q-value is mapped
    to a fundamental value and a trend length, a simulation is run with those
    settings, and its MSE against `real_data` is printed.

    Args:
        agent: Object exposing `choose_action`, `update` and `q_table`.
        episodes: Number of passes over the data.
        trader_classes: Trader instances; index 0 receives `fundamental_value`
            and index 1 receives `trend_length`.
        real_data: Real price data used for training and for scoring.

    Side effects:
        Updates the module-level `best_mse` and `best_params` whenever an
        episode produces a lower MSE. If they do not exist yet they start at
        infinity and None.
    """
    global best_mse  # Best MSE seen so far, shared with the calling cell
    global best_params  # Parameters that produced best_mse

    # Avoid a NameError when the caller has not initialised the globals
    if 'best_mse' not in globals():
        best_mse = float('inf')
    if 'best_params' not in globals():
        best_params = None

    # Use the traders that were passed in rather than same-named globals
    fundamental, momentum = trader_classes[0], trader_classes[1]

    env = TradingEnvRL(real_data)

    for episode in range(episodes):
        state = env.reset()
        done = False

        while not done:
            # Agent chooses action based on current state
            action = agent.choose_action(state)
            # Takes a step in the environment and gets next state, reward, and done flag
            next_state, reward, done, _ = env.step(action)
            agent.update(state, action, reward, next_state)
            state = next_state

        # Best action overall: largest Q-value summed across all states
        best_action = int(np.argmax(agent.q_table.sum(axis=0)))
        fundamental.fundamental_value = 140 + best_action * 10  # Example adjustment
        momentum.trend_length = 3 + best_action * 2  # Example adjustment

        # Generate synthetic data and calculate the distance (MSE)
        synthetic_data = generate_synthetic_data(trader_classes, real_data)
        distance = distance_function(real_data, synthetic_data)

        # Check if this is the best MSE so far
        if distance < best_mse:
            best_mse = distance
            best_params = {"best_action": best_action}  # Example for best parameters

        print(f"Episode {episode+1}/{episodes}: Distance = {distance}")


In [None]:
for name, real_data in datasets.items():
    print(f"\nReinforcement Learning for {name} dataset")
    # One Q-table row per data row, three actions (hold / buy / sell)
    agent = QLearningAgent(state_space=len(real_data), action_space=3)  # Adjust dimensions as needed
    fundamental_trader = FundamentalTrader(fundamental_value=150)
    momentum_trader = MomentumTrader(trend_length=3)
    random_trader = RandomTrader()

    # train_rl_agent compares against the global best_mse; reset it per
    # dataset so a value left over from an earlier cell or dataset is not used.
    best_mse = float('inf')
    best_params = None

    # Each episode is one full pass over the dataset's rows
    train_rl_agent(agent, episodes=10, trader_classes=[fundamental_trader, momentum_trader, random_trader], real_data=real_data)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

# Neural Network for Deep Q-Learning
class DQNetwork(nn.Module):
    """Fully connected network mapping a state vector to one Q-value per action.

    Args:
        state_size: Number of input features.
        action_size: Number of actions (outputs).
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 24
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, x):
        """Return the Q-values for `x` (two ReLU hidden layers, linear output)."""
        hidden = nn.functional.relu(self.fc1(x))
        hidden = nn.functional.relu(self.fc2(hidden))
        return self.fc3(hidden)

# Deep Q-Learning Agent
class DQNAgent:
    """Deep Q-learning agent with replay memory and a decaying exploration rate.

    Args:
        state_size: Number of features in a state vector.
        action_size: Number of available actions.
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # Replay buffer; oldest entries drop out beyond 2000
        self.gamma = 0.95    # Discount rate
        self.epsilon = 1.0   # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = DQNetwork(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return a random action with probability epsilon, else the greedy one."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        state = torch.tensor(state, dtype=torch.float32)
        # Pure inference: no gradient graph is needed to pick an action
        with torch.no_grad():
            act_values = self.model(state)
        return torch.argmax(act_values).item()

    def replay(self, batch_size):
        """Fit the network on `batch_size` random transitions, then decay epsilon.

        Raises:
            ValueError: If the memory holds fewer than `batch_size` transitions
                (raised by `random.sample`).
        """
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Bootstrap from the best Q-value of the next state; this is a
                # constant target, so no gradient graph is built for it.
                next_state = torch.tensor(next_state, dtype=torch.float32)
                with torch.no_grad():
                    target = reward + self.gamma * torch.max(self.model(next_state)).item()

            state = torch.tensor(state, dtype=torch.float32)
            # A single forward pass supplies both the prediction and, detached,
            # the base for the target vector.
            prediction = self.model(state)
            target_f = prediction.detach().clone()
            # Only the chosen action's entry differs from the prediction, so
            # only that output contributes to the loss.
            target_f[action] = target

            self.optimizer.zero_grad()
            loss = self.criterion(prediction, target_f)
            loss.backward()
            self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

def train_dqn_agent(agent, episodes, batch_size, trader_classes, real_data):
    """Train a DQN agent and score the trader settings it implies.

    Each episode walks through the data once, storing every transition in the
    agent's memory, then runs one replay pass if enough transitions are
    stored. The agent's greedy action for the final state is mapped to a
    fundamental value and a trend length, a simulation is run with those
    settings, and its MSE against `real_data` is printed.

    Args:
        agent: Object exposing `act`, `remember`, `replay`, `memory` and `model`.
        episodes: Number of passes over the data.
        batch_size: Number of stored transitions sampled per replay pass.
        trader_classes: Trader instances; index 0 receives `fundamental_value`
            and index 1 receives `trend_length`.
        real_data: Real price data used for training and for scoring.

    Side effects:
        Updates the module-level `best_mse` and `best_params` whenever an
        episode produces a lower MSE. If they do not exist yet they start at
        infinity and None.
    """
    global best_mse  # Best MSE across episodes
    global best_params

    # Avoid a NameError when the caller has not initialised the globals
    if 'best_mse' not in globals():
        best_mse = float('inf')
    if 'best_params' not in globals():
        best_params = None

    # Use the traders that were passed in rather than same-named globals
    fundamental, momentum = trader_classes[0], trader_classes[1]

    for episode in range(episodes):
        env = TradingEnvRL(real_data)
        state = np.array([env.reset()])

        for timestep in range(len(real_data)):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            next_state = np.array([next_state])
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            if done:
                print(f"Episode {episode+1}/{episodes} finished after {timestep+1} timesteps")
                break

        # If there are enough experiences in memory, start replaying
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

        # Greedy action for the final state; inference only, so no gradients
        with torch.no_grad():
            q_values = agent.model(torch.tensor(state, dtype=torch.float32))
        best_action = int(torch.argmax(q_values).item())
        fundamental.fundamental_value = 140 + best_action * 10
        momentum.trend_length = 3 + best_action * 2

        # Generate synthetic data and calculate the MSE
        synthetic_data = generate_synthetic_data(trader_classes, real_data)
        distance = distance_function(real_data, synthetic_data)

        if distance < best_mse:
            best_mse = distance
            best_params = {"best_action": best_action}

        print(f"Episode {episode+1}/{episodes}: Distance = {distance}")


In [None]:
# Best result of tabular Q-learning, keyed by dataset name
best_mse_per_dataset = {}

for name, real_data in datasets.items():
    print(f"\nReinforcement Learning for {name} dataset")

    # One Q-table row per data row, three actions
    agent = QLearningAgent(state_space=len(real_data), action_space=3)  # Adjust dimensions as needed
    fundamental_trader = FundamentalTrader(fundamental_value=150)
    momentum_trader = MomentumTrader(trend_length=3)
    random_trader = RandomTrader()

    # train_rl_agent updates these globals, so start clean for each dataset
    best_mse, best_params = float('inf'), None

    train_rl_agent(
        agent,
        episodes=10,
        trader_classes=[fundamental_trader, momentum_trader, random_trader],
        real_data=real_data,
    )

    # Record what training found for this dataset
    best_mse_per_dataset[name] = dict(best_mse=best_mse, best_params=best_params)

# Summary: best MSE per dataset once all training has finished
for name, mse_data in best_mse_per_dataset.items():
    print(f"\nReinforcement Learning for {name} dataset")
    print(f"Best MSE (Reinforcement Learning): {mse_data['best_mse']}")



# Step 5: Evaluate Performance, Calibrate Distance Function, and Visualize

In [None]:
from scipy.spatial.distance import euclidean

def evaluate_model(real_data, predicted_data, parameter_sets):
    """Score each prediction against the real data.

    Args:
        real_data: Array of real-world values.
        predicted_data: Iterable of predicted arrays, one per parameter set.
        parameter_sets: Sequence of parameter vectors, indexed in step with
            `predicted_data`.

    Returns:
        (performance_scores, distance_scores): for each prediction, its mean
        squared error against `real_data`, and the Euclidean distance between
        the matching parameter vector and `real_data`.
    """
    scored = []
    for idx, prediction in enumerate(predicted_data):
        scored.append((
            mean_squared_error(real_data, prediction),
            # Euclidean distance between this parameter vector and real_data
            euclidean(parameter_sets[idx], real_data),
        ))

    performance_scores = [mse for mse, _ in scored]
    distance_scores = [dist for _, dist in scored]
    return performance_scores, distance_scores

# Example data
real_data = np.array([1.2, 2.5, 3.8])
predicted_data = [np.array([1.1, 2.6, 3.7]), np.array([1.3, 2.4, 3.9])]
parameter_sets = [np.array([0.9, 2.7, 3.6]), np.array([1.4, 2.3, 4.0])]

performance_scores, distance_scores = evaluate_model(real_data, predicted_data, parameter_sets)

print("Performance Scores (MSE):", performance_scores)
print("Distance Scores:", distance_scores)

# Calibrate distance function (example step)
min_distance = min(distance_scores)
if min_distance == 0:
    # Dividing by zero is impossible; leave the distances unscaled
    calibrated_distances = distance_scores
    print("Min distance is zero, skipping normalization.")
else:
    # Scale so that the smallest distance becomes 1
    calibrated_distances = [d / min_distance for d in distance_scores]

print("Calibrated Distances:", calibrated_distances)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def evaluate_model(real_data, predicted_data, parameter_sets):
    """Return per-prediction MSE scores and parameter-to-data distances.

    For the i-th entry of `predicted_data`, the first list holds its mean
    squared error against `real_data`, and the second holds the Euclidean
    distance between `parameter_sets[i]` and `real_data`.
    """
    performance_scores, distance_scores = [], []

    for position, candidate in enumerate(predicted_data):
        performance_scores.append(mean_squared_error(real_data, candidate))
        distance_scores.append(euclidean(parameter_sets[position], real_data))

    return performance_scores, distance_scores

In [None]:
def visualize_performance(performance_scores, distance_scores):
    """Show three figures: performance vs. distance, then each series by index.

    Args:
        performance_scores: MSE value per parameter set.
        distance_scores: Distance value per parameter set.
    """
    # Scatter plot relating the two scores
    plt.figure(figsize=(10, 6))
    sns.scatterplot(x=distance_scores, y=performance_scores, s=100)
    plt.title("Performance vs. Distance")
    plt.xlabel("Distance Score")
    plt.ylabel("Performance Score (MSE)")
    plt.grid(True)
    plt.show()

    # One line chart per series, plotted against the parameter set index
    line_charts = (
        (performance_scores, 'b', "Performance Scores (MSE) Across Parameter Sets", "Performance Score (MSE)"),
        (distance_scores, 'r', "Distance Scores Across Parameter Sets", "Distance Score"),
    )
    for series, colour, chart_title, y_label in line_charts:
        plt.figure(figsize=(10, 6))
        plt.plot(series, marker='o', linestyle='-', color=colour)
        plt.title(chart_title)
        plt.xlabel("Parameter Set Index")
        plt.ylabel(y_label)
        plt.grid(True)
        plt.show()

In [None]:
# Calibrate distance function (example step): express every distance as a
# multiple of the smallest one, so the smallest becomes 1.
min_distance = min(distance_scores)

if min_distance != 0:
    calibrated_distances = [score / min_distance for score in distance_scores]
else:
    # Division by zero is impossible, so the raw distances are kept unchanged
    calibrated_distances = distance_scores
    print("Min distance is zero, skipping normalization.")

print("Performance Scores (MSE):", performance_scores)
print("Distance Scores:", distance_scores)
print("Calibrated Distances:", calibrated_distances)

In [None]:
# Data for plotting
mse_scores = performance_scores
# One label per evaluated parameter set. A fixed list of three labels never
# matched the two example scores, so the length check below always failed
# and nothing was plotted.
methods = [f'Method {i + 1}' for i in range(len(mse_scores))]

# Check if all lists have the same length
if len(methods) == len(mse_scores) == len(distance_scores) == len(calibrated_distances):
    plt.figure(figsize=(10, 6))

    # Three side-by-side bar charts: (values, colour, title, y-axis label)
    panels = [
        (mse_scores, 'skyblue', 'Performance Scores (MSE)', 'MSE'),
        (distance_scores, 'lightgreen', 'Distance Scores', 'Distance'),
        (calibrated_distances, 'salmon', 'Calibrated Distances', 'Calibrated Distance'),
    ]
    for position, (values, colour, panel_title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.bar(methods, values, color=colour)
        plt.title(panel_title)
        plt.ylabel(y_label)

    plt.tight_layout()
    plt.show()

else:
    print("Error: Length mismatch between 'methods' and score arrays.")

In [None]:
# Function to evaluate and plot results
def evaluate_and_plot(trader_classes, real_data, method_name, best_params):
    """Apply a parameter set to the traders, simulate, plot and score the fit.

    Args:
        trader_classes: Sequence of trader objects. The object at index 0
            receives ``fundamental_value`` and ``sensitivity``; the object at
            index 1 receives ``trend_length``.
        real_data: Observed data whose 'Date' and 'Close' columns are plotted
            (presumably a pandas DataFrame - confirm against the caller).
        method_name: Name of the optimisation method, shown in the plot title.
        best_params: Three values, unpacked in the order
            ``(fundamental_value, sensitivity, trend_length)``.

    Returns:
        The result of ``distance_function(real_data, synthetic_data)``.
    """
    value, sens, window = best_params

    # Push the chosen parameters onto the trader objects (mutates them in place)
    fundamental_trader = trader_classes[0]
    fundamental_trader.fundamental_value = value
    fundamental_trader.sensitivity = sens
    trader_classes[1].trend_length = window

    # Simulate with those parameters and score the result against the real data
    synthetic_data = generate_synthetic_data(trader_classes, real_data)
    distance = distance_function(real_data, synthetic_data)

    # Overlay the real and synthetic closing prices on one figure
    plt.figure(figsize=(10, 6))
    for frame, series_label in ((real_data, 'Real Data'), (synthetic_data, 'Synthetic Data')):
        plt.plot(frame['Date'], frame['Close'], label=series_label)
    plt.xlabel('Date')
    plt.ylabel('Closing Price')
    plt.title(f'Real vs Synthetic Data - {method_name}\nMSE: {distance:.10f}')
    plt.legend()
    plt.show()

    return distance

# Modified function for greedy search to return best parameters
def greedy_search_with_plot(trader_classes, real_data, *,
                            fundamental_values=(140, 150, 160),
                            trend_lengths=(3, 5, 7)):
    """Try every (fundamental value, trend length) pair and plot the best one.

    Each combination is written onto the traders, synthetic data is generated,
    and the combination with the smallest distance to ``real_data`` is kept.
    The winner is then passed to ``evaluate_and_plot`` under the label
    "Greedy Search".

    Args:
        trader_classes: Sequence of trader objects. Index 0 receives
            ``fundamental_value`` (its current ``sensitivity`` is kept);
            index 1 receives ``trend_length``. Both are mutated in place.
        real_data: Observed data passed to ``generate_synthetic_data`` and
            ``distance_function``.
        fundamental_values: Candidate fundamental values to try. The default
            reproduces the original hard-coded grid.
        trend_lengths: Candidate trend lengths to try. The default reproduces
            the original hard-coded grid.

    Returns:
        The distance returned by ``evaluate_and_plot`` for the best parameters.

    Raises:
        ValueError: If no combination yields a distance below infinity, i.e.
            a grid is empty or every distance is NaN or infinite.
    """
    # Materialise the grids so one-shot iterables survive the nested loops
    fundamental_values = list(fundamental_values)
    trend_lengths = list(trend_lengths)

    best_distance = float('inf')
    best_params = None

    for fundamental_value in fundamental_values:
        for trend_length in trend_lengths:
            trader_classes[0].fundamental_value = fundamental_value
            trader_classes[1].trend_length = trend_length

            synthetic_data = generate_synthetic_data(trader_classes, real_data)
            distance = distance_function(real_data, synthetic_data)

            # A NaN distance never compares as smaller, so it is skipped here
            if distance < best_distance:
                best_distance = distance
                best_params = (fundamental_value, trend_length)

    # Without this guard the code below fails with an opaque
    # "'NoneType' object is not subscriptable" error.
    if best_params is None:
        raise ValueError(
            "Greedy search found no parameter combination with a distance "
            "below infinity (empty grid, or every distance was NaN/inf)."
        )

    print(f"Greedy Search Best Params: Fundamental value = {best_params[0]}, Trend length = {best_params[1]}")
    print(f"Greedy Search Best Distance: {best_distance:.4f}")

    # Plot the results
    sensitivity = trader_classes[0].sensitivity  # Retain sensitivity value
    return evaluate_and_plot(trader_classes, real_data, "Greedy Search", (best_params[0], sensitivity, best_params[1]))

# Function to train RL agent and plot results
def train_and_plot_rl(trader_classes, agent, real_data, method_name, episodes=10):
    """Train the RL agent on ``real_data``, then plot and score the result.

    Each episode runs a fresh ``TradingEnvRL(real_data)`` to completion while
    updating ``agent``. After every episode the action with the largest
    column sum of ``agent.q_table`` is mapped onto the trader parameters.

    Args:
        trader_classes: Sequence of trader objects. Index 0 receives
            ``fundamental_value``; index 1 receives ``trend_length``. Both
            are mutated in place.
        agent: Agent exposing ``choose_action``, ``update`` and ``q_table``.
        real_data: Observed data used to build the environment and to score
            the final parameters.
        method_name: Label forwarded to ``evaluate_and_plot``.
        episodes: Number of training episodes.

    Returns:
        The distance returned by ``evaluate_and_plot`` for the final trader
        parameters. Earlier versions returned ``None``; returning the score
        lets callers report it alongside the other methods.
    """
    for _ in range(episodes):
        # A new environment is built for every episode
        env = TradingEnvRL(real_data)
        state = env.reset()
        done = False

        while not done:
            action = agent.choose_action(state)
            next_state, reward, done, _info = env.step(action)
            agent.update(state, action, reward, next_state)
            state = next_state

        # Pick the action with the largest Q-value summed over axis 0
        # (assumes q_table is laid out as states x actions - TODO confirm)
        # and map action k onto fundamental value 140 + 10k, trend length 3 + 2k.
        best_action = np.argmax(agent.q_table.sum(axis=0))
        trader_classes[0].fundamental_value = 140 + best_action * 10
        trader_classes[1].trend_length = 3 + best_action * 2

    # After training, evaluate and plot results
    sensitivity = trader_classes[0].sensitivity  # Retain sensitivity value
    return evaluate_and_plot(
        trader_classes,
        real_data,
        method_name,
        (trader_classes[0].fundamental_value, sensitivity, trader_classes[1].trend_length),
    )

# Run every calibration method on every dataset, then print a per-dataset summary.
# NOTE: the same trader objects are shared by all methods within one dataset,
# so each method starts from whatever parameters the previous one left on them.
for name, real_data in datasets.items():
    print(f"\nEvaluating performance for dataset: {name}")

    # Initialise traders with default values
    fundamental_trader = FundamentalTrader(fundamental_value=150)
    momentum_trader = MomentumTrader(trend_length=3)
    random_trader = RandomTrader()
    trader_classes = [fundamental_trader, momentum_trader, random_trader]

    # Grid search best params
    grid_best_params, _ = grid_search_tuning(trader_classes, param_grid, real_data)
    grid_mse = evaluate_and_plot(trader_classes, real_data, "Grid Search", grid_best_params)

    # Random search best params
    random_best_params, _ = random_search_tuning(trader_classes, param_distributions, n_iter, real_data)
    random_mse = evaluate_and_plot(trader_classes, real_data, "Random Search", random_best_params)

    # Bayesian optimization best params
    bayesian_best_params, _ = bayesian_optimization_tuning(trader_classes, real_data)
    bayesian_mse = evaluate_and_plot(trader_classes, real_data, "Bayesian Optimisation", bayesian_best_params)

    # Greedy search best params and plot
    greedy_mse = greedy_search_with_plot(trader_classes, real_data)

    # Reinforcement Learning
    print(f"\nReinforcement Learning for {name} dataset")
    # state_space is the number of rows in this dataset; three discrete actions
    agent = QLearningAgent(state_space=len(real_data), action_space=3)
    rl_mse = train_and_plot_rl(trader_classes, agent, real_data, "Reinforcement Learning")

    print(f"Summary for {name}:")
    print(f"Grid Search MSE: {grid_mse:.10f}")
    print(f"Random Search MSE: {random_mse:.10f}")
    print(f"Bayesian Optimisation MSE: {bayesian_mse:.10f}")
    print(f"Greedy Search MSE: {greedy_mse:.10f}")
    # train_and_plot_rl may only plot and return None; report its score
    # whenever one is returned so the summary covers every method.
    if rl_mse is not None:
        print(f"Reinforcement Learning MSE: {rl_mse:.10f}")



In [None]:
import seaborn as sns
from sklearn.model_selection import KFold


def greedy_search_with_plot(trader_classes, real_data, n_splits=5, *,
                            fundamental_values=(140, 150, 160),
                            trend_lengths=(3, 5, 7)):
    """Try every (fundamental value, trend length) pair, then cross-validate the best.

    Each combination is written onto the traders, synthetic data is generated
    from the full ``real_data``, and the combination with the smallest
    distance is kept. The winner is then scored with ``cross_validate`` under
    the label "Greedy Search".

    Args:
        trader_classes: Sequence of trader objects. Index 0 receives
            ``fundamental_value`` (its current ``sensitivity`` is kept);
            index 1 receives ``trend_length``. Both are mutated in place.
        real_data: Observed data used for the search and for cross-validation.
        n_splits: Number of folds forwarded to ``cross_validate``.
        fundamental_values: Candidate fundamental values to try. The default
            reproduces the original hard-coded grid.
        trend_lengths: Candidate trend lengths to try. The default reproduces
            the original hard-coded grid.

    Returns:
        The list of per-fold MSE values returned by ``cross_validate``.

    Raises:
        ValueError: If no combination yields a distance below infinity, i.e.
            a grid is empty or every distance is NaN or infinite.
    """
    # Materialise the grids so one-shot iterables survive the nested loops
    fundamental_values = list(fundamental_values)
    trend_lengths = list(trend_lengths)

    best_distance = float('inf')
    best_params = None

    for fundamental_value in fundamental_values:
        for trend_length in trend_lengths:
            trader_classes[0].fundamental_value = fundamental_value
            trader_classes[1].trend_length = trend_length

            synthetic_data = generate_synthetic_data(trader_classes, real_data)
            distance = distance_function(real_data, synthetic_data)

            # A NaN distance never compares as smaller, so it is skipped here
            if distance < best_distance:
                best_distance = distance
                best_params = (fundamental_value, trend_length)

    # Without this guard the code below fails with an opaque
    # "'NoneType' object is not subscriptable" error.
    if best_params is None:
        raise ValueError(
            "Greedy search found no parameter combination with a distance "
            "below infinity (empty grid, or every distance was NaN/inf)."
        )

    print(f"Greedy Search Best Params: Fundamental value = {best_params[0]}, Trend length = {best_params[1]}")
    print(f"Greedy Search Best Distance: {best_distance:.4f}")

    # Perform cross-validation using the best parameters
    fold_mse = cross_validate(trader_classes, real_data, "Greedy Search", (best_params[0], trader_classes[0].sensitivity, best_params[1]), n_splits)

    return fold_mse  # Return the list of MSE values for each fold

def train_and_plot_rl(trader_classes, agent, real_data, method_name, episodes=10, n_splits=5):
    """Train the RL agent fold by fold and collect one MSE per fold.

    For every K-fold split the agent is trained for ``episodes`` episodes on
    the training rows, the resulting trader parameters are used to generate
    synthetic data from the training rows, and the first ``len(test_data)``
    synthetic rows are scored against the test rows.

    Args:
        trader_classes: Sequence of trader objects. Index 0 receives
            ``fundamental_value``; index 1 receives ``trend_length``. Both
            are mutated in place.
        agent: Agent exposing ``choose_action``, ``update`` and ``q_table``.
            The same instance is updated in every fold; it is not reset
            between folds.
        real_data: Observed data, split positionally with ``.iloc``.
        method_name: Accepted for signature compatibility; not used here.
        episodes: Training episodes per fold.
        n_splits: Number of K-fold splits.

    Returns:
        List with one ``distance_function`` result per fold.
    """
    splitter = KFold(n_splits=n_splits)
    scores = []

    for train_idx, test_idx in splitter.split(real_data):
        # Re-index both partitions from zero
        train_data = real_data.iloc[train_idx].reset_index(drop=True)
        test_data = real_data.iloc[test_idx].reset_index(drop=True)

        for _ in range(episodes):
            # Each episode plays a fresh environment built on the training rows
            env = TradingEnvRL(train_data)
            state = env.reset()
            done = False

            while not done:
                action = agent.choose_action(state)
                next_state, reward, done, _info = env.step(action)
                agent.update(state, action, reward, next_state)
                state = next_state

            # Map the action with the largest Q-value summed over axis 0
            # onto the trader parameters (action k -> 140 + 10k and 3 + 2k)
            best_action = np.argmax(agent.q_table.sum(axis=0))
            trader_classes[0].fundamental_value = 140 + best_action * 10
            trader_classes[1].trend_length = 3 + best_action * 2

        # Synthetic data comes from the training rows and is cut down to the
        # test length before being compared with the test rows
        simulated = generate_synthetic_data(trader_classes, train_data)
        simulated = simulated.iloc[:len(test_data)].reset_index(drop=True)

        scores.append(distance_function(test_data, simulated))

    return scores

def cross_validate(trader_classes, real_data, method_name, best_params, n_splits=5):
    """Score one fixed parameter set across K folds.

    For every split the parameters are re-applied to the traders, synthetic
    data is generated from the training rows, cut down to the length of the
    test partition, and scored against the test rows.

    Args:
        trader_classes: Sequence of trader objects. Index 0 receives
            ``fundamental_value`` and ``sensitivity``; index 1 receives
            ``trend_length``. Both are mutated in place.
        real_data: Observed data, split positionally with ``.iloc``.
        method_name: Accepted for signature compatibility; not used here.
        best_params: Three values, unpacked in the order
            ``(fundamental_value, sensitivity, trend_length)``.
        n_splits: Number of K-fold splits.

    Returns:
        List with one ``distance_function`` result per fold.
    """
    scores = []
    splitter = KFold(n_splits=n_splits)

    for train_idx, test_idx in splitter.split(real_data):
        # Re-index both partitions from zero
        train_data = real_data.iloc[train_idx].reset_index(drop=True)
        test_data = real_data.iloc[test_idx].reset_index(drop=True)

        # Re-apply the parameters on every fold
        value, sens, window = best_params
        fundamental_trader = trader_classes[0]
        fundamental_trader.fundamental_value = value
        fundamental_trader.sensitivity = sens
        trader_classes[1].trend_length = window

        # Simulate on the training rows, then keep only as many rows as the
        # test partition has so the two can be compared
        simulated = (
            generate_synthetic_data(trader_classes, train_data)
            .iloc[:len(test_data)]
            .reset_index(drop=True)
        )

        scores.append(distance_function(test_data, simulated))

    return scores

# Per-method lists of fold MSE values, keyed by method name
results = {}

# Methods whose parameters were tuned earlier share the same cross-validation call
for tuned_method, tuned_params in (
    ("Grid Search", grid_best_params),
    ("Random Search", random_best_params),
    ("Bayesian Optimisation", bayesian_best_params),
):
    results[tuned_method] = cross_validate(trader_classes, real_data, tuned_method, tuned_params)

# Greedy search and RL run their own search/training before scoring the folds
results['Greedy Search'] = greedy_search_with_plot(trader_classes, real_data)
results['Reinforcement Learning'] = train_and_plot_rl(trader_classes, agent, real_data, "Reinforcement Learning")

print(f"results: {results}")

# Flatten into two parallel lists: every fold MSE and the method it belongs to
mse_values = [fold_score for fold_scores in results.values() for fold_score in fold_scores]
methods_labels = [method_label for method_label, fold_scores in results.items() for _ in fold_scores]

# Create box plot
plt.figure(figsize=(10, 6))
sns.boxplot(x=methods_labels, y=mse_values)
plt.ylabel('Mean Squared Error (MSE)')
plt.title('Box Plot of MSE Across Folds for Each Optimisation Method')
plt.grid(axis='y')
plt.show()
