In [34]:
#"CNN + BiLSTM as Encoder, Transformer as Decoder" topology
import pandas as pd
import numpy as np
import torch.nn as nn
import torch
import torch.optim as optim

In [13]:
# Load the raw gold-price history and put it in chronological order.
# A forward slash is used in the path: it works on Windows and POSIX alike and
# avoids the invalid "\G" escape sequence that the backslash form contained.
# The chain builds a fresh frame on every run, so re-executing this cell is
# safe (the earlier in-place drop failed on a second run), and the duplicated
# parse/sort/reset pass has been removed.
dataset = (
    pd.read_csv('dataset/Gold Price (2013-2023).csv')
    .drop(columns=['Vol.', 'Change %'])
    .assign(Date=lambda frame: pd.to_datetime(frame['Date'], format="%m/%d/%Y"))
    .sort_values(by='Date', ascending=True)
    .reset_index(drop=True)
)


In [14]:

# Every column except the date is numeric but stored as text with thousands
# separators; strip the commas and cast to float in one pass.
NumCols = dataset.columns.drop(['Date'])
dataset[NumCols] = (
    dataset[NumCols]
    .replace({',': ''}, regex=True)
    .astype('float64')
)
# Plain float matrix with one row per date and one column per entry of NumCols.
data_array = dataset[NumCols].to_numpy()

def extract_year(date_str):
    """Return the calendar year of a date-like value.

    Args:
        date_str: Anything ``pd.to_datetime`` can parse as a single timestamp
            (e.g. a date string).

    Returns:
        The year as an integer.
    """
    # The attribute is ``year``; the previous ``.yea`` raised AttributeError on every call.
    return pd.to_datetime(date_str).year

# Hold out calendar year 2022 as the test split; every other year is training data.
# NOTE(review): rows on either side of the 2022 gap end up adjacent in train_data,
# so a sliding window could span the gap if the data extends past 2022 - confirm.
test_data = data_array[dataset['Date'].dt.year.eq(2022)]
train_data = data_array[dataset['Date'].dt.year.ne(2022)]

In [31]:
timesteps= 15  # lookback window length: number of consecutive rows in each input sample

def create_dataset(data, lookback=1):
    """Slice a 2-D series into sliding input windows and next-step targets.

    Args:
        data: 2-D array indexed as (row, feature).
        lookback: Number of consecutive rows in each input window.

    Returns:
        Tuple ``(X, Y)`` of NumPy arrays. ``X[k]`` holds rows ``k`` to
        ``k + lookback - 1`` and ``Y[k]`` is the row immediately after that
        window. When ``data`` has no more than ``lookback`` rows both arrays
        are empty.
    """
    window_count = len(data) - lookback
    windows = [data[start:start + lookback, :] for start in range(window_count)]
    targets = [data[start + lookback, :] for start in range(window_count)]
    return np.array(windows), np.array(targets)

# Build sliding-window samples for each split with the shared lookback length.
train_X, train_Y = create_dataset(train_data, timesteps)
test_X, test_Y = create_dataset(test_data, timesteps)


print(test_Y.shape) # (samples, features): targets are single rows, only the X arrays have a timesteps axis

(245, 4)


In [57]:
class MyModel(nn.Module):
    """Encoder: two Conv1d layers, max-pooling, a bidirectional LSTM and a dense projection.

    Args:
        input_features: Number of features per time step (the Conv1d input channels).
        seq_len: Number of time steps in each input window. It is only used to
            size the dense layer, so inputs must have exactly this many steps.
    """

    def __init__(self, input_features, seq_len=15):
        super().__init__()
        self.conv1 = nn.Conv1d(input_features, 64, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(64, 64, kernel_size=3, padding=1)
        self.lstm = nn.LSTM(64, 64, batch_first=True, bidirectional=True)
        self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2)

        # Pooling halves the time axis (floor division) and the bidirectional
        # LSTM emits 2 * hidden_size features for each remaining step.
        steps_after_pool = seq_len // 2
        features_per_step = 2 * 64
        self.flattened_size = steps_after_pool * features_per_step
        self.flatten = nn.Flatten()
        self.dense1 = nn.Linear(self.flattened_size, 512)

    def forward(self, x):
        """Encode a batch of windows.

        Args:
            x: Tensor laid out as (batch, seq_len, input_features).

        Returns:
            Tensor of shape (batch, 512) after the final ReLU.
        """
        # Conv1d wants channels before the time axis: (batch, features, seq_len).
        channels_first = x.permute(0, 2, 1)
        conv_out = self.relu(self.conv1(channels_first))
        conv_out = self.relu(self.conv2(conv_out))
        pooled = self.maxpool(conv_out)

        # The LSTM is batch_first, so move time back in front of the channels:
        # (batch, reduced_seq_len, channels).
        time_major = pooled.permute(0, 2, 1)
        lstm_out, _ = self.lstm(time_major)

        # Concatenate all time steps per sample and project to 512 features.
        projected = self.dense1(self.flatten(lstm_out))
        return self.relu(projected)

In [55]:
class TransformerDecoderModule(nn.Module):
    """Thin wrapper around a stack of ``nn.TransformerDecoderLayer`` blocks.

    Args:
        d_model: Embedding size expected for both target and memory.
        nhead: Number of attention heads per layer.
        num_decoder_layers: How many decoder layers to stack.
        dim_feedforward: Hidden size of each layer's feed-forward sub-block.
        dropout: Dropout probability used inside each layer.

    ``batch_first`` is not set, so the layers use PyTorch's default
    sequence-first layout.
    """

    def __init__(self, d_model, nhead, num_decoder_layers, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        layer_options = dict(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        # NOTE(review): the template layer is kept as an attribute, so it is also
        # registered as a submodule of its own - confirm that is intended.
        self.decoder_layer = nn.TransformerDecoderLayer(**layer_options)
        self.transformer_decoder = nn.TransformerDecoder(
            self.decoder_layer,
            num_layers=num_decoder_layers,
        )

    def forward(self, tgt, memory=None, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Run the decoder stack; every argument is forwarded unchanged."""
        return self.transformer_decoder(
            tgt=tgt,
            memory=memory,
            tgt_mask=tgt_mask,
            memory_mask=memory_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )

In [65]:
# Define a complete model integrating both the encoder and the decoder
class CompleteModel(nn.Module):
    """CNN + BiLSTM encoder followed by a Transformer decoder and a linear head.

    Args:
        input_features: Number of features per time step; also the size of the
            prediction produced for each sample.
        seq_len: Number of time steps in each input window (forwarded to the encoder).
        d_model: Decoder embedding size. The encoder's last layer emits 512
            features, so this must stay at 512 unless the encoder is changed too.
        nhead: Number of attention heads in each decoder layer.
        num_decoder_layers: Number of stacked decoder layers.
        dim_feedforward: Hidden size of the decoder feed-forward blocks.
        dropout: Dropout probability inside the decoder.
    """

    def __init__(self, input_features, seq_len, d_model=512, nhead=8, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super(CompleteModel, self).__init__()
        self.encoder = MyModel(input_features=input_features, seq_len=seq_len)
        self.decoder = TransformerDecoderModule(d_model=d_model, nhead=nhead,
                                                num_decoder_layers=num_decoder_layers,
                                                dim_feedforward=dim_feedforward, dropout=dropout)
        self.output_layer = nn.Linear(d_model, input_features)

    def forward(self, src, tgt=None):
        """Predict the next row for every window in ``src``.

        Args:
            src: Tensor laid out as (batch, seq_len, input_features).
            tgt: Ignored. The decoder target is always generated here as zeros;
                the parameter is kept only so existing ``model(x, None)`` calls
                keep working.

        Returns:
            Tensor of shape (batch, input_features).
        """
        print(f'Input src Dimension: {src.shape}')
        memory = self.encoder(src)
        print(f'Output Dimension after Encoder: {memory.shape}')

        # The decoder is sequence-first. A 2-D (batch, d_model) tensor would be
        # read as ONE unbatched sequence of `batch` tokens, so attention would
        # mix information across samples and, with an all-zero target, produce
        # the same output for every row. Give each sample its own length-1
        # sequence instead: (1, batch, d_model).
        memory_seq = memory.unsqueeze(0)
        tgt = torch.zeros_like(memory_seq)
        output = self.decoder(tgt, memory_seq).squeeze(0)
        print(f'Output Dimension after Decoder: {output.shape}')

        output = self.output_layer(output)
        print(f'Final Output Dimension: {output.shape}')
        return output

# Initialize the complete model
# input_features=4 and seq_len=15 must match the last two axes of train_X / test_X.
model = CompleteModel(input_features=4, seq_len=15)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prepare data for PyTorch; convert numpy arrays to tensors and send to the device
# NOTE(review): the arrays are converted as-is; no scaling/normalisation step is
# visible before this point - confirm whether raw-valued inputs are intended.
train_X_tensor = torch.tensor(train_X, dtype=torch.float).to(device)
train_Y_tensor = torch.tensor(train_Y, dtype=torch.float).to(device)
test_X_tensor = torch.tensor(test_X, dtype=torch.float).to(device)
test_Y_tensor = torch.tensor(test_Y, dtype=torch.float).to(device)

# Define the loss function and optimizer
loss_function = nn.MSELoss()  # Mean Squared Error Loss for regression tasks
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
# Each epoch is a single optimisation step over the whole training tensor
# (no mini-batches), followed by an evaluation pass on the test tensor.
# NOTE(review): no random seed is set in the visible cells, so results may
# differ between runs - confirm whether reproducibility matters here.
epochs = 10  # Example number of epochs
for epoch in range(epochs):
    model.train()  # enable training-mode behaviour (e.g. dropout)
    optimizer.zero_grad()  # clear gradients left over from the previous step
    output = model(train_X_tensor, None)  # None for tgt since it's generated within the model
    loss = loss_function(output, train_Y_tensor)
    loss.backward()
    optimizer.step()

    print(f'Epoch {epoch+1}, Loss: {loss.item()}')

    
    model.eval()  # switch to inference-mode behaviour for the evaluation pass
    with torch.no_grad():  # no gradient tracking needed for evaluation
        test_output = model(test_X_tensor, None)
        test_loss = loss_function(test_output, test_Y_tensor)
        print(f'Test Loss: {test_loss.item()}')


Input src Dimension: torch.Size([2308, 15, 4])
Output Dimension after Encoder: torch.Size([2308, 512])
Output Dimension after Decoder: torch.Size([2308, 512])
Final Output Dimension: torch.Size([2308, 4])
Epoch 1, Loss: 2009982.5
Input src Dimension: torch.Size([245, 15, 4])
Output Dimension after Encoder: torch.Size([245, 512])
Output Dimension after Decoder: torch.Size([245, 512])
Final Output Dimension: torch.Size([245, 4])
Test Loss: 3245835.25
Input src Dimension: torch.Size([2308, 15, 4])
Output Dimension after Encoder: torch.Size([2308, 512])
Output Dimension after Decoder: torch.Size([2308, 512])
Final Output Dimension: torch.Size([2308, 4])
Epoch 2, Loss: 1996016.5
Input src Dimension: torch.Size([245, 15, 4])
Output Dimension after Encoder: torch.Size([245, 512])
Output Dimension after Decoder: torch.Size([245, 512])
Final Output Dimension: torch.Size([245, 4])
Test Loss: 3240528.0
Input src Dimension: torch.Size([2308, 15, 4])
Output Dimension after Encoder: torch.Size([2308