In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn.functional as F

import pandas as pd

import random

torch.set_printoptions(precision=3, threshold=10000, linewidth=140)

## Data

In [2]:
class Dataset(Dataset):
    def __init__(self, features_file, label_file):
        self.features = pd.read_csv(features_file)
        self.labels = pd.read_csv(label_file)

    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        tensor_features = torch.tensor(self.features.iloc[idx].values, dtype=torch.float)
        tensor_labels = torch.tensor(self.labels.iloc[idx].values, dtype=torch.float)

        return tensor_features, tensor_labels

In [3]:
dataset = Dataset('H1_AI_Dataset/H1_Features_24.csv', 'H1_AI_Dataset/H1_Labels_S_24.csv')

In [4]:
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

In [5]:
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

## MLP

In [6]:
class MLP(nn.Module):
    def __init__(self, input_size=25, hidden_size=128, output_size=6):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, hidden_size)
        
        self.fc5 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.dropout(x, p=0.2)
        
        x = F.relu(self.fc2(x))
        x = F.dropout(x, p=0.2)
        x = F.relu(self.fc3(x))
        x = F.dropout(x, p=0.2)
        x = F.relu(self.fc4(x))
        x = F.dropout(x, p=0.2)
        
        x = self.fc5(x)  
        return x

In [7]:
class RMSE(nn.Module):
    def __init__(self, eps=1e-6):
        super().__init__()
        self.mse = nn.MSELoss()
        self.eps = eps
        
    def forward(self,yhat,y):
        loss = torch.sqrt(self.mse(yhat,y) + self.eps)
        return loss

In [8]:
# Instantiate the model, optimizer, and loss function
model = MLP()
optimizer = optim.Adam(model.parameters(), lr=0.01)
loss_function = RMSE()

In [9]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

MLP(
  (fc1): Linear(in_features=25, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=128, bias=True)
  (fc3): Linear(in_features=128, out_features=128, bias=True)
  (fc4): Linear(in_features=128, out_features=128, bias=True)
  (fc5): Linear(in_features=128, out_features=6, bias=True)
)

In [10]:
# Training loop
num_epochs = 4
for epoch in range(num_epochs):

    
    model.train()  # Set the model to training mode
    total_loss = 0
    for batch_idx, (data, targets) in enumerate(train_dataloader):
        data, targets = data.to(device), targets.to(device)

        # Forward pass
        outputs = model(data)
        loss = loss_function(outputs, targets)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {total_loss/len(train_dataloader):.4f}')

    

    # Evaluate the model on the test set
    model.eval()  # Set the model to evaluation mode
    total_test_loss = 0
    with torch.no_grad():  # No need to track gradients for validation data
        for batch_idx, (data, targets) in enumerate(test_dataloader):
            data, targets = data.to(device), targets.to(device)

            # Forward pass
            outputs = model(data)
            loss = loss_function(outputs, targets)
            total_test_loss += loss.item()

            
            # Randomly print data and targets based on percentage - on last epoch
            if (epoch == num_epochs-1) and (random.random() < 0.1):
                
                item_idx = random.randint(0, data.size(0) - 1)

                item_target = targets[item_idx]
                item_output = outputs[item_idx]
                
                print(f"Target: {item_target.cpu().numpy()}")
                print(f"Output: {item_output.cpu().numpy()}")

    print(f'Epoch [{epoch+1}/{num_epochs}], Test Loss: {total_test_loss/len(test_dataloader):.4f}')

Epoch [1/4], Training Loss: 51.9286
Epoch [1/4], Test Loss: 31.6714
Epoch [2/4], Training Loss: 52.3493
Epoch [2/4], Test Loss: 30.3493
Epoch [3/4], Training Loss: 51.8530
Epoch [3/4], Test Loss: 30.2715
Epoch [4/4], Training Loss: 51.0935
Target: [38.493057   1.2402778  0.5197222  6.557778   3.7933333  2.89     ]
Output: [ 9.031456   7.713427   7.651391  11.565087   4.7313085  6.5739675]
Target: [9.825      2.2616668  0.49944445 8.106112   3.8944445  3.0641668 ]
Output: [ 8.539504   7.532134  10.418659  11.5612135  4.348647   6.338345 ]
Target: [12.572778    0.75222224  0.5013889  22.981388    3.7572222   2.8633332 ]
Output: [ 6.108394   7.2971187  3.242443  10.577803   4.3163157  5.7599688]
Target: [11.889167    0.73694444  0.49916667  0.99916667  3.9269445   3.0072222 ]
Output: [ 7.149041   7.2513857  4.128093  11.840281   4.4177322  6.2903295]
Target: [ 0.25916666 15.881945   26.195833    1.0036111   5.641667    2.81      ]
Output: [ 7.932976   7.797027   5.7269473 11.617718   4.39