# Bond Yield Prediction using LSTM Encoder-Decoder Architecture

This notebook implements a deep learning approach for predicting US bond yields using an LSTM encoder-decoder architecture with professor forcing.

In [None]:
# Import libraries
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm, trange
from dotenv import load_dotenv

# Machine Learning libraries
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# Deep Learning libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split
import pytorch_lightning as pl

# Visualization libraries
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Load environment variables
load_dotenv()

# Hyperparameter optimization
from scipy.optimize import differential_evolution

# Configuration and Constants

In [None]:


# Set random seeds for reproducibility
# The same seed is applied to NumPy, PyTorch and Python's `random` module.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

# Model hyperparameters
SEQUENCE_LENGTH = 22  # rows per chunk; default `sequence_length` of prepare_sequences
INPUT_SIZE = 3  # presumably the 3 PCA components fed to the encoder; confirm
HIDDEN_SIZE = 50
NUM_LAYERS = 2
BATCH_SIZE = 50
LEARNING_RATE = 0.006
N_EPOCHS = 500
TARGET_LENGTH = 22

# Data split ratios
# NOTE(review): neither ratio is referenced by the visible cells; the split code
# hard-codes 0.8 instead of reading TRAIN_RATIO.
TRAIN_RATIO = 0.8
VAL_RATIO = 0.75  # NOTE(review): unclear whether 0.75 is the share kept for training or held out for validation; confirm

# Device configuration
# Prefer CUDA when PyTorch reports it as available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Data Loading and Initial Processing

In [None]:


def load_data():
    """Read the bond, CPI and ISM CSV files into three DataFrames.

    Each path comes from an environment variable (BOND_DATA_PATH,
    CPI_DATA_PATH, ISM_DATA_PATH) with a Colab sample-data path as fallback.

    Returns:
        tuple: ``(bonds, cpi, ism)``. Rows with any missing value are removed
        from the bond frame. In the CPI and ISM frames the ``DATE`` column is
        parsed to datetimes and renamed to ``date``.
    """

    def _read_dated_csv(env_var, fallback_path):
        # Shared recipe for the two macro series: parse DATE, then rename it.
        frame = pd.read_csv(os.getenv(env_var, fallback_path))
        frame["DATE"] = pd.to_datetime(frame["DATE"])
        return frame.rename(columns={"DATE": "date"})

    bonds = pd.read_csv(
        os.getenv('BOND_DATA_PATH', '/content/sample_data/us-government-bond.csv')
    ).dropna()
    cpi_frame = _read_dated_csv('CPI_DATA_PATH', '/content/sample_data/CORESTICKM159SFRBATL.csv')
    ism_frame = _read_dated_csv('ISM_DATA_PATH', '/content/sample_data/AMTMNO.csv')

    return bonds, cpi_frame, ism_frame

# Load the data
# Read the three raw frames once and report their (rows, columns) shapes.
df, cpi, ism = load_data()
print(f"Loaded bond data shape: {df.shape}")
print(f"Loaded CPI data shape: {cpi.shape}")
print(f"Loaded ISM data shape: {ism.shape}")

# Data Preprocessing and Feature Engineering

In [None]:

def preprocess_data(df, cpi, ism):
    """Clean the bond frame and join the ISM and CPI series onto it.

    Args:
        df: Bond data with a ``date`` column formatted ``%d/%m/%Y`` and a
            ``DivYield`` column that may carry a trailing ``%``.
        cpi: CPI frame with a datetime ``date`` column.
        ism: ISM frame with a datetime ``date`` column.

    Returns:
        pandas.DataFrame: A new merged frame with gaps back-filled and any
        still-incomplete rows dropped. The input ``df`` is not modified.
    """
    # Work on a copy: assigning columns on `df` directly would rewrite the
    # caller's frame, so a second call on the same object would see different data.
    bonds = df.copy()
    bonds['date'] = pd.to_datetime(bonds['date'], format="%d/%m/%Y")
    bonds['DivYield'] = pd.to_numeric(bonds['DivYield'].replace('%', '', regex=True))

    # Left joins keep every bond row; dates missing from ISM/CPI become NaN.
    merged = bonds.merge(ism, how="left", on="date").merge(cpi, how="left", on="date")

    # `.bfill()` replaces the deprecated `fillna(method="backfill")`.
    # NOTE(review): back-filling copies the *next* available observation into
    # earlier rows, which lets future ISM/CPI values leak into past dates.
    # Confirm whether a forward fill was intended.
    merged = merged.bfill()

    # Trailing rows with nothing after them to back-fill from are still NaN.
    return merged.dropna()

def create_features(df):
    """Standardise every feature column and keep the target unscaled.

    Args:
        df: Preprocessed frame containing ``date``, ``us_5_year_yields`` and
            the feature columns.

    Returns:
        tuple: ``(features_df, scaler)``. ``features_df`` holds the scaled
        feature columns followed by the raw ``us_5_year_yields`` column;
        ``scaler`` is the fitted ``StandardScaler``.
    """
    target_column = 'us_5_year_yields'
    without_date = df.drop(['date'], axis=1)

    # Everything except the target gets standardised.
    feature_columns = [name for name in without_date.columns if name != target_column]

    # NOTE(review): the scaler is fit on every row passed in, so it is the
    # caller's responsibility if some of those rows are later held out.
    scaler = StandardScaler()
    scaled_part = pd.DataFrame(
        scaler.fit_transform(without_date[feature_columns]),
        columns=feature_columns,
        index=without_date.index,
    )

    # Scaled features first, untouched target last.
    features_df = pd.concat([scaled_part, without_date[[target_column]]], axis=1)
    return features_df, scaler

# Preprocess data
# NOTE(review): `df` is rebound to the merged result, so re-running this cell
# without re-running the load cell passes an already-merged frame back into
# preprocess_data. A distinct name for the merged frame would make the cell
# re-runnable.
df = preprocess_data(df, cpi, ism)
features_df, scaler = create_features(df)

print(f"Preprocessed data shape: {features_df.shape}")
print(f"Features: {list(features_df.columns)}")
# Last expression of the cell: shows the final rows of the feature frame.
features_df.tail()

In [None]:
#features_df = features_df.drop(columns=["deficit_as_percent_of_gdp", "daily_us_real_gdp","DivYield", "us_10_year_yields","us_1_year_yields"])
# Chronological split: the first TRAIN_RATIO share of rows trains, the rest tests.
train_size = int(len(features_df) * TRAIN_RATIO)

# The test slice starts exactly where the training slice ends. The previous
# `train_size + 1` start silently discarded the row at position `train_size`.
train_df, test_df = features_df.iloc[:train_size], features_df.iloc[train_size:]

# Separate the target column from the feature columns in each partition.
y_train = train_df['us_5_year_yields']
x_train = train_df.drop('us_5_year_yields', axis=1)
x_test = test_df.drop('us_5_year_yields', axis=1)
y_test = test_df['us_5_year_yields']
## Principal Component Analysis and Data Splitting

def perform_pca_and_split(features_df, n_components=3, train_ratio=0.8):
    """Split the frame chronologically and project the features with PCA.

    Args:
        features_df: Frame holding the feature columns plus ``us_5_year_yields``.
        n_components: Number of principal components to keep.
        train_ratio: Share of rows, taken from the start, used for training.

    Returns:
        tuple: ``(x_train_pca, x_test_pca, y_train, y_test, pca, cov_matrix)``.
        The two PCA frames have columns ``Component 1..n`` and a fresh
        0-based index; ``y_train``/``y_test`` keep the index of ``features_df``.
        ``cov_matrix`` is the covariance of the training features.
    """
    # The test slice begins at `train_size`, not `train_size + 1`: the old
    # offset dropped one row that belonged to neither partition.
    train_size = int(len(features_df) * train_ratio)
    train_df = features_df.iloc[:train_size]
    test_df = features_df.iloc[train_size:]

    # Separate features and target
    y_train = train_df['us_5_year_yields']
    x_train = train_df.drop('us_5_year_yields', axis=1)
    x_test = test_df.drop('us_5_year_yields', axis=1)
    y_test = test_df['us_5_year_yields']

    # Covariance between feature columns, computed on training rows only.
    cov_matrix = np.cov(x_train, rowvar=False)

    # Fit the projection on training rows and reuse it unchanged for the test
    # rows, so nothing about the test partition influences the components.
    pca = PCA(n_components=n_components)
    component_names = [f'Component {i+1}' for i in range(n_components)]
    x_train_pca = pd.DataFrame(pca.fit_transform(x_train), columns=component_names)
    x_test_pca = pd.DataFrame(pca.transform(x_test), columns=component_names)

    return x_train_pca, x_test_pca, y_train, y_test, pca, cov_matrix

# Perform PCA and data splitting
# This rebinds x_train/x_test/y_train/y_test, replacing the values assigned by
# the manual split earlier in this cell.
x_train, x_test, y_train, y_test, pca, cov_matrix = perform_pca_and_split(features_df)

print(f"Training data shape: {x_train.shape}")
print(f"Test data shape: {x_test.shape}")
print(f"PCA explained variance ratio: {pca.explained_variance_ratio_}")
print(f"Total explained variance: {pca.explained_variance_ratio_.sum():.3f}")

# Last expression of the cell: preview of the projected training features.
x_train.head()

# Functions for plotting

In [None]:
## Data Visualization Functions

def plot_covariance_matrix(cov_matrix):
    """Build a heatmap figure of a covariance matrix.

    Args:
        cov_matrix: 2-D array-like of covariances.

    Returns:
        plotly.graph_objects.Figure: Heatmap whose axes are labelled with
        integer feature positions. The figure is returned, not shown.
    """
    values = pd.DataFrame(cov_matrix).values
    n_rows, n_cols = values.shape

    heatmap = go.Heatmap(
        z=values,
        x=list(range(n_cols)),
        y=list(range(n_rows)),
        colorscale='Viridis',
        colorbar={'title': 'Covariance'},
    )

    layout = {
        'title': 'Feature Covariance Matrix Heatmap',
        'xaxis_title': 'Feature Index',
        'yaxis_title': 'Feature Index',
        'template': 'plotly_white',
    }

    fig = go.Figure(data=heatmap)
    fig.update_layout(**layout)
    return fig

def plot_pca_explained_variance(pca):
    """Build a bar-plus-line figure of per-component and cumulative variance.

    Args:
        pca: Fitted object exposing ``explained_variance_ratio_``.

    Returns:
        plotly.graph_objects.Figure: Bars on the left axis (per-component
        ratio) and a line on an overlaid right axis (running total). The
        figure is returned, not shown.
    """
    ratios = pca.explained_variance_ratio_
    running_total = np.cumsum(ratios)

    bars = go.Bar(
        x=[f'PC{i+1}' for i in range(len(ratios))],
        y=ratios,
        name='Explained Variance',
        yaxis='y',
    )
    line = go.Scatter(
        x=[f'PC{i+1}' for i in range(len(running_total))],
        y=running_total,
        mode='lines+markers',
        name='Cumulative Explained Variance',
        yaxis='y2',
    )

    fig = go.Figure()
    # Bars are added first, then the cumulative line.
    for trace in (bars, line):
        fig.add_trace(trace)

    fig.update_layout(
        title='PCA Explained Variance Analysis',
        xaxis_title='Principal Components',
        yaxis={'title': 'Explained Variance', 'side': 'left'},
        yaxis2={'title': 'Cumulative Variance', 'side': 'right', 'overlaying': 'y'},
        template='plotly_white',
    )
    return fig

def plot_feature_loadings(pca, feature_names):
    """Build a bar chart of the first principal component's loadings.

    Args:
        pca: Fitted object exposing ``components_``.
        feature_names: One label per entry of ``pca.components_[0]``.

    Returns:
        plotly.graph_objects.Figure: One randomly coloured bar per feature.
        The figure is returned, not shown.
    """
    first_component = pca.components_[0]
    loadings_df = pd.DataFrame(first_component, index=feature_names, columns=['Loading'])

    # One random RGB triple per feature, drawn from NumPy's global RNG
    # (red, then green, then blue for each feature).
    colors = []
    for _ in feature_names:
        red = np.random.randint(0, 255)
        green = np.random.randint(0, 255)
        blue = np.random.randint(0, 255)
        colors.append(f'rgb({red}, {green}, {blue})')

    bars = go.Bar(
        x=loadings_df.index,
        y=loadings_df['Loading'],
        marker_color=colors,
        name='Feature Loadings',
    )
    fig = go.Figure(data=bars)

    fig.update_layout(
        title='Feature Loadings for First Principal Component',
        xaxis_title='Features',
        yaxis_title='Loading Weight',
        template='plotly_white',
        xaxis_tickangle=-45,
    )
    return fig

# Create visualizations
cov_fig = plot_covariance_matrix(cov_matrix)
pca_fig = plot_pca_explained_variance(pca)

# Get feature names for loadings plot (every column except the target)
feature_names = [col for col in features_df.columns if col != 'us_5_year_yields']
loadings_fig = plot_feature_loadings(pca, feature_names)

print("Covariance matrix shape:", cov_matrix.shape)
print("PCA loadings shape:", pca.components_.shape)

# The plot helpers only return figures, so render them explicitly. Without
# these calls the three figures were built and then never displayed.
cov_fig.show()
pca_fig.show()
loadings_fig.show()

In [None]:
# Variance ratios from the fitted PCA. These names used to exist only as
# locals inside plot_pca_explained_variance, so this cell raised NameError on
# a fresh kernel.
explained_variance = pca.explained_variance_ratio_
cumulative_variance = np.cumsum(explained_variance)

# Create the bar plot for explained variance
bar_plot = go.Bar(
    x=[f'PC{i+1}' for i in range(len(explained_variance))],
    y=explained_variance,
    name='Explained Variance'
)

# Create the line plot for cumulative explained variance
line_plot = go.Scatter(
    x=[f'PC{i+1}' for i in range(len(cumulative_variance))],
    y=cumulative_variance,
    mode='lines+markers',
    name='Cumulative Explained Variance'
)

# Combine both plots
fig = go.Figure(data=[bar_plot, line_plot])

# Update layout. Title and range live in one `yaxis` dict so the two settings
# cannot override each other.
fig.update_layout(
    title='Explained Variance by PCA Components',
    xaxis_title='Principal Components',
    yaxis=dict(title='Variance Explained', range=[0, 1]),  # ratios lie in [0, 1]
    template='plotly_white'
)

# Show plot
fig.show()


In [None]:
def random_color():
    """Return a random ``'rgb(r, g, b)'`` string, each channel drawn from 0..255 inclusive."""
    # Three draws from Python's global `random` state, in r, g, b order.
    channels = [random.randint(0, 255) for _ in range(3)]
    return 'rgb({}, {}, {})'.format(*channels)
# One random colour string per feature name.
# NOTE(review): `colors` is not read by any later cell visible here.
colors = [random_color() for _ in feature_names]

# Preparing data for LSTM

In [None]:
## Data Preparation for LSTM

def split_dataframe(df, chunk_size):
    """Cut ``df`` into consecutive, non-overlapping pieces of ``chunk_size`` rows.

    A trailing piece shorter than ``chunk_size`` is discarded.

    Args:
        df: DataFrame or Series supporting ``.iloc`` slicing.
        chunk_size: Number of rows per piece.

    Returns:
        list: The full-length pieces, in their original order.
    """
    full_chunks = []
    for start in range(0, len(df), chunk_size):
        piece = df.iloc[start:start + chunk_size]
        if len(piece) < chunk_size:
            # Only the last piece can be short; it is left out.
            continue
        full_chunks.append(piece)
    return full_chunks

def prepare_sequences(x_train, x_test, y_train, y_test, sequence_length=SEQUENCE_LENGTH):
    """Turn the train/test frames into sequence-first float32 tensors.

    Each input is cut into non-overlapping chunks of ``sequence_length`` rows
    (a short trailing chunk is dropped by ``split_dataframe``).

    Args:
        x_train, x_test: Feature frames, one row per time step.
        y_train, y_test: Target series aligned row-for-row with the features.
        sequence_length: Rows per chunk.

    Returns:
        tuple: ``(X_train, Y_train, X_test, Y_test)``. The X tensors are
        ``(sequence_length, n_chunks, n_features)``; the Y tensors are
        ``(sequence_length, n_chunks, 1)``.

    Raises:
        ValueError: If an input has fewer than ``sequence_length`` rows, so no
            full chunk can be formed.
    """

    def _chunks_to_tensor(data, label):
        chunks = split_dataframe(data, sequence_length)
        if not chunks:
            raise ValueError(
                f"{label} has {len(data)} rows, fewer than sequence_length={sequence_length}"
            )
        # Stack into a single ndarray first: building a tensor straight from a
        # Python list of ndarrays is slow and makes PyTorch emit a warning.
        stacked = np.stack([np.asarray(chunk) for chunk in chunks])
        return torch.tensor(stacked, dtype=torch.float32)

    X_train = _chunks_to_tensor(x_train, "x_train")
    Y_train = _chunks_to_tensor(y_train, "y_train")
    X_test = _chunks_to_tensor(x_test, "x_test")
    Y_test = _chunks_to_tensor(y_test, "y_test")

    # Chunk-first -> sequence-first: (seq_len, n_chunks, ...).
    X_train = X_train.transpose(0, 1)
    Y_train = Y_train.transpose(0, 1).reshape(sequence_length, -1, 1)
    X_test = X_test.transpose(0, 1)
    Y_test = Y_test.transpose(0, 1).reshape(sequence_length, -1, 1)
    return X_train, Y_train, X_test, Y_test
# Prepare sequences
# Uses the PCA feature frames and the raw target series from the split above.
X_train, Y_train, X_test, Y_test = prepare_sequences(x_train, x_test, y_train, y_test)
print(f"X_train shape: {X_train.shape}")
print(f"Y_train shape: {Y_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"Y_test shape: {Y_test.shape}")
# Move to device
# NOTE(review): only the training tensors are moved; X_test and Y_test stay on
# the device they were created on.
X_train = X_train.to(device)
Y_train = Y_train.to(device)

# Encoder and Decoder 

In [None]:
class LSTMEncoder(nn.Module):
    """LSTM encoder that summarises an input window into hidden and cell states.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout between stacked layers (default 0.5, the value that
            was previously hard-coded). Ignored when ``num_layers`` is 1.
    """
    def __init__(self, input_size, hidden_size, num_layers, dropout=0.5):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            # Inter-layer dropout has nothing to act on with a single layer;
            # passing 0 there avoids the warning PyTorch raises otherwise.
            dropout=dropout if num_layers > 1 else 0.0
        )

    def forward(self, x_input):
        """Encode a sequence-first batch.

        Args:
            x_input: Tensor whose first two dimensions are read as
                (seq_len, batch) and whose remaining elements form
                ``input_size`` features per step.

        Returns:
            tuple: ``(lstm_out, (h_n, c_n))``. The state pair is also stored
            on ``self.hidden``.
        """
        # reshape (not view) so inputs that are not contiguous in memory work too.
        lstm_out, self.hidden = self.lstm(
            x_input.reshape(x_input.shape[0], x_input.shape[1], self.input_size)
        )
        return lstm_out, self.hidden

    def init_hidden(self, batch_size):
        """Return zero ``(h_0, c_0)`` states of shape (num_layers, batch_size, hidden_size).

        The zeros are created on the same device and with the same dtype as
        the encoder's parameters, so they stay usable after ``.to(device)`` or
        ``.double()``.
        """
        reference = next(self.parameters())
        shape = (self.num_layers, batch_size, self.hidden_size)
        return (
            torch.zeros(shape, device=reference.device, dtype=reference.dtype),
            torch.zeros(shape, device=reference.device, dtype=reference.dtype)
        )


class LSTMDecoder(nn.Module):
    """LSTM followed by a linear head that emits one value per step."""

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.input_size, self.hidden_size, self.num_layers = input_size, hidden_size, num_layers
        # The recurrent layer is created before the head so parameter
        # initialisation order stays the same.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, dropout=0.5)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x_input, encoder_hidden_states):
        """Advance the LSTM from the given states and project to width 1.

        Args:
            x_input: Decoder input, passed to the LSTM unchanged.
            encoder_hidden_states: ``(h, c)`` pair used as the initial state.

        Returns:
            tuple: ``(output, hidden)`` where ``output`` is the linear
            projection of the LSTM output and ``hidden`` is the new state
            pair, also stored on ``self.hidden``.
        """
        recurrent_out, new_state = self.lstm(x_input, encoder_hidden_states)
        self.hidden = new_state
        return self.linear(recurrent_out), self.hidden


class LSTMDecoder2(nn.Module):
    """Batch-first LSTM decoder that consumes single-value inputs.

    The ``input_size`` argument is recorded on the instance, but the LSTM is
    always built with an input width of 1.
    """

    def __init__(self, input_size, hidden_size, num_layers=1):
        super().__init__()
        self.input_size, self.hidden_size, self.num_layers = input_size, hidden_size, num_layers
        # Input width is fixed at 1 regardless of `input_size`.
        self.lstm = nn.LSTM(1, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x_input, encoder_hidden_states):
        """Advance the LSTM from the given states and project to width 1.

        Args:
            x_input: Decoder input, passed to the LSTM unchanged.
            encoder_hidden_states: ``(h, c)`` pair used as the initial state.

        Returns:
            tuple: ``(output, hidden)``; ``hidden`` is also stored on
            ``self.hidden``.
        """
        recurrent_out, new_state = self.lstm(x_input, encoder_hidden_states)
        self.hidden = new_state
        return self.linear(recurrent_out), self.hidden



# Discriminator for Professor Forcing 

In [None]:

class Discriminator(nn.Module):
    """Sequence discriminator for Professor Forcing training.

    A two-layer batch-first LSTM reads each sequence; the final hidden states
    of both layers are concatenated per sample and passed through a small MLP
    ending in a sigmoid, giving one score in (0, 1) per sequence.

    Args:
        input_size: Features per time step of the sequences being judged.
        hidden_size: Width of the LSTM hidden state.
        linear_size: Width of the hidden linear layer.
        lin_dropout: Dropout probability used inside the MLP.
    """
    def __init__(self, input_size, hidden_size, linear_size, lin_dropout):
        super().__init__()
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True
        )

        # Layer order is kept as-is so saved state-dict keys (`linears.0`,
        # `linears.5`) keep matching.
        # NOTE(review): the second ReLU/Dropout pair has no Linear in front of
        # it; a hidden layer may be missing there.
        self.linears = nn.Sequential(
            nn.Linear(hidden_size * 2, linear_size),
            nn.ReLU(),
            nn.Dropout(lin_dropout),
            nn.ReLU(),
            nn.Dropout(lin_dropout),
            nn.Linear(linear_size, 1),
            nn.Sigmoid()
        )

    def forward(self, hidden_states):
        """Score a batch of sequences.

        Args:
            hidden_states: Tensor of shape (batch, seq_len, input_size).

        Returns:
            torch.Tensor: Scores of shape (batch, 1).
        """
        batch_size = hidden_states.size(0)
        initial_hidden = self.init_hidden(batch_size)
        _, (final_h, _) = self.lstm(hidden_states, initial_hidden)

        # final_h is (num_layers, batch, hidden). Bring the batch axis to the
        # front *before* flattening; flattening the layer-first layout directly
        # would put hidden states of different samples into the same row.
        features = final_h.transpose(0, 1).reshape(batch_size, -1)

        return self.linears(features)

    def init_hidden(self, batch_size):
        """Return zero ``(h_0, c_0)`` states of shape (num_layers, batch_size, hidden_size).

        The zeros follow the device and dtype of the module's parameters.
        """
        reference = next(self.parameters())
        shape = (self.lstm.num_layers, batch_size, self.hidden_size)
        hidden_1 = torch.zeros(shape, device=reference.device, dtype=reference.dtype)
        hidden_2 = torch.zeros(shape, device=reference.device, dtype=reference.dtype)
        return (hidden_1, hidden_2)

# Initialize loss functions
# `criterion` is a mean-squared-error loss. LSTMSeq2Seq.train_model builds its
# own local MSELoss, so it does not read this module-level object.
criterion = nn.MSELoss()
# Binary cross-entropy on probabilities; read by ModelAlternate.adversarial_train.
binary_cross_entropy = nn.BCELoss()

print("Model architecture defined successfully!")

# Main LSTM Sequence-to-Sequence Model

In [None]:
class LSTMSeq2Seq(nn.Module):
    """LSTM encoder-decoder with training and prediction helpers.

    The encoder reads a window of ``input_size`` features per step. The first
    decoding step runs ``self.decoder`` on the last encoder input; every later
    step runs ``self.decoder2``, which takes a single value per step (the
    previous prediction or, under teacher forcing, the previous target).

    Args:
        input_size: Features per encoder time step.
        hidden_size: Width of all LSTM hidden states.
    """
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.encoder = LSTMEncoder(input_size=input_size, hidden_size=hidden_size, num_layers=2)
        self.decoder = LSTMDecoder(input_size=input_size, hidden_size=hidden_size, num_layers=2)
        self.decoder2 = LSTMDecoder2(input_size=1, hidden_size=hidden_size, num_layers=2)

    def _decode(self, input_batch, encoder_hidden, target_len, teacher_targets=None):
        """Roll the decoders forward ``target_len`` steps for a whole batch.

        Args:
            input_batch: Encoder input, (seq_len, batch, input_size).
            encoder_hidden: ``(h, c)`` from the encoder, each
                (num_layers, batch, hidden_size).
            target_len: Number of steps to generate.
            teacher_targets: Optional (target_len, batch, 1) tensor. When
                given, step ``t + 1`` is fed ``teacher_targets[t]``; otherwise
                it is fed the model's own output from step ``t``.

        Returns:
            torch.Tensor: Predictions of shape (target_len, batch, 1).
        """
        batch_size = input_batch.shape[1]
        if target_len <= 0:
            return input_batch.new_zeros((0, batch_size, 1))

        # Every sample keeps its own encoder state, and the LSTMs see the batch
        # as a batch. The old code passed 2-D tensors, which made the LSTM
        # treat the batch axis as time and seeded it with sample 0's state only.
        decoder_hidden = encoder_hidden
        decoder_input = input_batch[-1].unsqueeze(0)  # (1, batch, input_size)

        step_outputs = []
        for t in range(target_len):
            if t == 0:
                # Sequence-first decoder: output is (1, batch, 1).
                decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
                decoder_output = decoder_output.squeeze(0)
            else:
                # Batch-first decoder: output is (batch, 1, 1).
                decoder_output, decoder_hidden = self.decoder2(decoder_input, decoder_hidden)
                decoder_output = decoder_output.squeeze(1)
            step_outputs.append(decoder_output)

            next_value = decoder_output if teacher_targets is None else teacher_targets[t]
            decoder_input = next_value.unsqueeze(1)  # (batch, 1, 1) for decoder2

        # Stacking (rather than writing into a pre-allocated CPU tensor) keeps
        # the result on the model's device.
        return torch.stack(step_outputs, dim=0)

    def train_model(self, input_tensor, target_tensor, n_epochs, target_len, batch_size,
                   training_prediction="recursive", teacher_forcing_ratio=0.5,
                   learning_rate=0.01, dynamic_tf=False):
        """Train with Adam on MSE and save the weights afterwards.

        Args:
            input_tensor: (seq_len, n_samples, input_size) encoder inputs.
            target_tensor: (target_len, n_samples, 1) targets.
            n_epochs: Number of passes over the data.
            target_len: Number of steps to decode per sample.
            batch_size: Samples per batch. Samples beyond the last full batch
                are not used.
            training_prediction: ``"recursive"`` (feed back own predictions)
                or ``"teacher_forcing"`` (feed back the true targets).
            teacher_forcing_ratio: Accepted for interface compatibility. It is
                decremented when ``dynamic_tf`` is set but, as before, does not
                influence which inputs the decoder receives.
            learning_rate: Adam learning rate.
            dynamic_tf: See ``teacher_forcing_ratio``.

        Returns:
            The mean over epochs of the per-epoch summed batch loss.

        Raises:
            ValueError: On an unknown ``training_prediction`` or when
                ``batch_size`` exceeds the number of samples.
        """
        if training_prediction not in ("recursive", "teacher_forcing"):
            raise ValueError(f"Unknown training_prediction: {training_prediction!r}")

        losses = np.full(n_epochs, np.nan)
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        criterion = nn.MSELoss()
        n_batches = int(input_tensor.shape[1] // batch_size)
        if n_batches == 0:
            raise ValueError(
                f"batch_size={batch_size} exceeds the {input_tensor.shape[1]} available samples"
            )

        print(f"Training with {n_batches} batches")

        self.train()
        with trange(n_epochs) as tr:
            for it in tr:
                batch_loss = 0

                for b in range(n_batches):
                    # Consecutive, non-overlapping batches. The old slice
                    # `b: b + batch_size` moved by one sample per batch, so
                    # batches overlapped and most samples were never seen.
                    start = b * batch_size
                    stop = start + batch_size
                    input_batch = input_tensor[:, start:stop, :]
                    target_batch = target_tensor[:, start:stop, :]
                    target_batch = target_batch.reshape(
                        target_batch.shape[0], target_batch.shape[1], 1
                    )

                    optimizer.zero_grad()

                    # Encode, then decode with or without teacher forcing.
                    _, encoder_hidden = self.encoder(input_batch)
                    teacher = target_batch if training_prediction == "teacher_forcing" else None
                    outputs = self._decode(input_batch, encoder_hidden, target_len, teacher)

                    # Calculate loss and backpropagate
                    loss = criterion(outputs, target_batch)
                    batch_loss += loss.item()
                    loss.backward()
                    optimizer.step()

                losses[it] = batch_loss
                if dynamic_tf and teacher_forcing_ratio > 0:
                    teacher_forcing_ratio = teacher_forcing_ratio - 0.02

                tr.set_postfix(loss=f"{batch_loss:.3f}")

        # Save model
        model_save_path = os.getenv('MODEL_SAVE_PATH', 'trained_model.pth')
        torch.save(self.state_dict(), model_save_path)
        return sum(losses) / len(losses)

    def predict(self, input_tensor, target_len):
        """Generate ``target_len`` steps for every sample in ``input_tensor``.

        Runs in evaluation mode (dropout off) without tracking gradients and
        restores the previous train/eval mode afterwards.

        Args:
            input_tensor: (seq_len, batch, input_size) encoder inputs.
            target_len: Number of steps to generate.

        Returns:
            numpy.ndarray: Predictions of shape (target_len, batch, 1).
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                _, encoder_hidden = self.encoder(input_tensor)
                outputs = self._decode(input_tensor, encoder_hidden, target_len)
        finally:
            self.train(was_training)

        # .cpu() so the conversion also works when the model lives on a GPU.
        return outputs.detach().cpu().numpy()



# Alternate Sequence to Sequence Model with Professor Forcing training step

In [None]:

class ModelAlternate(LSTMSeq2Seq):
    """Encoder-decoder with an added discriminator for Professor Forcing training.

    Args:
        input_size: Features per encoder time step.
        hidden_size: Width of all LSTM hidden states.
    """
    def __init__(self, input_size, hidden_size):
        super().__init__(hidden_size=hidden_size, input_size=input_size)
        self.discriminator = Discriminator(
            input_size=1,
            hidden_size=hidden_size,
            linear_size=64,
            lin_dropout=0.5
        )
        # Per-module optimiser settings for the generator side. The parameters
        # are stored as lists: `.parameters()` returns a one-shot generator,
        # which left these groups empty after the first optimiser was built.
        self.other_params = [
            {'params': list(self.encoder.parameters()), 'lr': 0.0001},
            {'params': list(self.decoder.parameters()), 'lr': 0.0002},
            {'params': list(self.decoder2.parameters()), 'lr': 0.0003, 'weight_decay': 1e-4}
        ]

    def adversarial_train(self, learning_rate, input_tensor, target_tensor,
                         n_epochs, target_len, batch_size):
        """Alternate generator and discriminator updates on a BCE objective.

        In every batch the first ``batch_size // 2`` samples are decoded
        free-running (fed their own predictions, label 1) and the remaining
        samples are teacher-forced (fed the true targets, label 0). The
        discriminator scores each generated sequence. Even-numbered batches
        update the encoder/decoders to increase the discriminator's loss;
        odd-numbered batches update the discriminator to decrease it.

        Args:
            learning_rate: SGD learning rate of the discriminator (previously
                accepted but ignored in favour of a hard-coded 0.003).
            input_tensor: (seq_len, n_samples, input_size) encoder inputs.
            target_tensor: (target_len, n_samples, 1) targets.
            n_epochs: Number of passes over the data.
            target_len: Number of steps to decode per sample.
            batch_size: Samples per batch. Samples beyond the last full batch
                are not used.

        Returns:
            numpy.ndarray: Mean discriminator loss per epoch (NaN for an epoch
            without batches).
        """
        losses = np.full(n_epochs, np.nan)
        # Copy the group dicts: optimisers mutate the dicts they are given.
        gen_optimizer = optim.SGD([dict(group) for group in self.other_params])
        disc_optimizer = optim.SGD(self.discriminator.parameters(), lr=learning_rate)
        bce = nn.BCELoss()

        n_batches = int(input_tensor.shape[1] // batch_size)
        half = batch_size // 2  # replaces the hard-coded 25, which only fit batch_size=50
        device = input_tensor.device

        # One label per sequence: 1 = free-running half, 0 = teacher-forced half.
        labels = torch.cat([
            torch.ones(half, 1, device=device),
            torch.zeros(batch_size - half, 1, device=device)
        ], dim=0)

        self.train()
        with trange(n_epochs) as tr:
            for it in tr:
                epoch_losses = []
                for b in range(n_batches):
                    # Consecutive, non-overlapping batches (the old slice
                    # `b:b + batch_size` advanced by a single sample).
                    start = b * batch_size
                    stop = start + batch_size
                    input_batch = input_tensor[:, start:stop, :]
                    target_batch = target_tensor[:, start:stop, :]

                    gen_optimizer.zero_grad()
                    disc_optimizer.zero_grad()

                    _, decoder_hidden = self.encoder(input_batch)
                    # Keep the batch axis so every sample decodes from its own
                    # encoder state: (1, batch, input_size).
                    decoder_input = input_batch[-1].unsqueeze(0)

                    step_outputs = []
                    for t in range(target_len):
                        if t == 0:
                            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
                            decoder_output = decoder_output.squeeze(0)
                        else:
                            decoder_output, decoder_hidden = self.decoder2(decoder_input, decoder_hidden)
                            decoder_output = decoder_output.squeeze(1)
                        step_outputs.append(decoder_output)

                        # First half continues from its own prediction, second
                        # half from its *own* targets (the old code fed the
                        # first half's targets to the second half).
                        mixed = torch.cat([
                            decoder_output[:half],
                            target_batch[t, half:, :]
                        ], dim=0)
                        decoder_input = mixed.unsqueeze(1)  # (batch, 1, 1) for decoder2

                    outputs = torch.stack(step_outputs, dim=1)  # (batch, target_len, 1)

                    preds = self.discriminator(outputs)  # (batch, 1)
                    discriminator_loss = bce(preds, labels)
                    generator_loss = -discriminator_loss
                    epoch_losses.append(discriminator_loss.item())

                    if b % 2 == 0:
                        generator_loss.backward()
                        gen_optimizer.step()
                    else:
                        discriminator_loss.backward()
                        disc_optimizer.step()

                if epoch_losses:
                    losses[it] = sum(epoch_losses) / len(epoch_losses)
                    tr.set_postfix(disc_loss=f"{losses[it]:.3f}")

        # Save model
        model_save_path = os.getenv('MODEL_SAVE_PATH', 'trained_model.pth')
        torch.save(self.state_dict(), model_save_path)
        return losses

# Confirmation message printed once the class definitions in this cell have run.
print("Main model classes defined successfully!")

# Helper Functions for Manipulation

In [None]:
def list_to_numpy(list_of_lists):
    """Return a new list holding ``np.array(item)`` for every item of the input iterable."""
    return list(map(np.array, list_of_lists))

def numpy_to_torch(Xtrain, Ytrain, Xtest, Ytest):
    """Convert four array-likes to float32 PyTorch tensors.

    Returns:
        tuple: ``(X_train, Y_train, X_test, Y_test)`` tensors, in the same
        order as the arguments.
    """
    def _as_float_tensor(values):
        return torch.tensor(values).type(torch.float32)

    # Converted in argument order: X train, Y train, X test, Y test.
    return tuple(_as_float_tensor(values) for values in (Xtrain, Ytrain, Xtest, Ytest))

In [None]:

# Rebuild the LSTM tensors from the PCA feature frames and target series.
# The earlier version of this cell passed the frames straight to list_to_numpy
# (iterating a DataFrame yields its column labels, not rows) and then reshaped
# to hard-coded sizes (22, 50, 1) / (22, 200, 1), so it only worked for one
# specific dataset length. prepare_sequences does the chunking, the float32
# conversion and the (seq_len, n_chunks, ...) layout for any length.
X_train, Y_train, X_test, Y_test = prepare_sequences(x_train, x_test, y_train, y_test)

# Keep the training tensors on the same device the model will be moved to.
X_train = X_train.to(device)
Y_train = Y_train.to(device)

Y_train.shape

# Main Training 

In [None]:
# The class is named LSTMSeq2Seq; `lstm_seq2seq` is not defined in this notebook
# and raised NameError. The literals are replaced by the config-cell constants,
# which hold the same values.
model = LSTMSeq2Seq(input_size=INPUT_SIZE, hidden_size=HIDDEN_SIZE)
model = model.to(device)
loss = model.train_model(
    X_train,
    Y_train,
    n_epochs=N_EPOCHS,
    target_len=TARGET_LENGTH,
    batch_size=BATCH_SIZE,
    training_prediction='teacher_forcing',
    teacher_forcing_ratio=0.6,
    learning_rate=LEARNING_RATE,
    dynamic_tf=False,
)


In [None]:


# NOTE(review): this repeats the device selection from the configuration cell
# and moves the training tensors again. It sits after the training cell, so in
# a top-to-bottom run it has no effect on that training call.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_train = X_train.to(device)
Y_train = Y_train.to(device)


4


100%|██████████| 500/500 [02:38<00:00,  3.16it/s, loss=0.025]


# Training with Professor Forcing

In [None]:
# The class is named ModelAlternate; `model_alternate` is not defined in this
# notebook and raised NameError.
extra = ModelAlternate(input_size=INPUT_SIZE, hidden_size=HIDDEN_SIZE).to(device)

# Warm-start the adversarial model from the weights saved by train_model.
# The old line loaded them back into `model`, leaving `extra` randomly
# initialised. strict=False because the checkpoint has no discriminator weights.
checkpoint_path = os.getenv('MODEL_SAVE_PATH', 'trained_model.pth')
extra.load_state_dict(torch.load(checkpoint_path, map_location=device), strict=False)

extra.adversarial_train(
    input_tensor=X_train,
    target_tensor=Y_train,
    n_epochs=N_EPOCHS,
    target_len=TARGET_LENGTH,
    batch_size=BATCH_SIZE,
    learning_rate=0.003,
)

# Hyperparameter Optimisation

In [None]:



# Search ranges as (low, high) pairs. The optimiser call below passes them in
# the order epochs, learning_rate, batch_size, hidden_size, which is the order
# objective_function unpacks them in.
param_bounds = {
    'epochs': (400, 500),  # truncated to int by objective_function
    'learning_rate': (0.0001, 0.01),
    'batch_size': (16, 128),  # truncated to int by objective_function
    'hidden_size': (50, 300)  # truncated to int by objective_function
}

def objective_function(params):
    """Train a fresh model with the candidate hyperparameters and score it.

    Args:
        params: Sequence ``[epochs, learning_rate, batch_size, hidden_size]``
            as supplied by the optimiser; the integer-valued entries are
            truncated with ``int``.

    Returns:
        The first element of ``calculate_mse(...)`` on the validation split,
        which the optimiser minimises.
    """
    epochs = int(params[0])
    learning_rate = params[1]
    batch_size = int(params[2])
    hidden_size = int(params[3])

    # The class is named LSTMSeq2Seq; `lstm_seq2seq` raised NameError.
    model_2 = LSTMSeq2Seq(input_size=3, hidden_size=hidden_size)

    # NOTE(review): X_train_split, Y_train_split, X_val_split, Y_val_split and
    # calculate_mse are not defined in any cell visible here; they must exist
    # before this function is called.
    model_2.train_model(X_train_split, Y_train_split, n_epochs=epochs, target_len=22,
                        batch_size=batch_size, training_prediction='teacher_forcing',
                        teacher_forcing_ratio=0.6, learning_rate=learning_rate, dynamic_tf=False)

    Y_val_pred = model_2.predict(X_val_split, 22)
    mse = calculate_mse(Y_val_pred, Y_val_split)
    return mse[0]

# Kept minimal: every objective evaluation trains a full model.
num_iterations = 1  # passed as `maxiter`
popsize = 1  # passed as `popsize`
# NOTE(review): not read by any visible cell; the real number of objective
# evaluations is decided by differential_evolution and may differ - confirm.
total_evaluations = num_iterations * popsize
# Runs the search; the bounds are listed in the order objective_function expects.
result = differential_evolution(objective_function,
                                    bounds=[param_bounds['epochs'], param_bounds['learning_rate'],
                                            param_bounds['batch_size'], param_bounds['hidden_size']],
                                    strategy='best1bin', maxiter=num_iterations, popsize=popsize,
                                    tol=0.01, seed=42, disp=True)
