In [None]:
import os
import torch
from torch.utils.data import Dataset

class TensorDataset(Dataset):
    """Map-style dataset over an in-memory list of records.

    Each record is indexed at positions 0-3: items 0, 1 and 2 are joined
    into one feature tensor and item 3 is returned as the target.
    """

    def __init__(self, tensor_list):
        # Keep a reference to the caller's list; nothing is copied.
        self.tensor_list = tensor_list

    def __len__(self):
        """Number of records held by the dataset."""
        return len(self.tensor_list)

    def __getitem__(self, idx):
        """Return ``(feature, target)`` for record ``idx``, both cast to float."""
        record = self.tensor_list[idx]
        # Item 2 gains a leading axis so it can be concatenated along dim 0
        # together with items 0 and 1.
        pieces = (record[0], record[1], record[2].unsqueeze(0))
        feature = torch.cat(pieces, dim=0).float()
        target = record[3].float()
        return feature, target

# Directory containing the tensors
dir_path = "/content/data/input_tensors"

# List all '.pt' files in the directory. os.listdir() returns entries in
# arbitrary order, so the names are sorted to keep the sample order (and
# therefore the split below) stable from run to run.
files = sorted(file for file in os.listdir(dir_path) if file.endswith('.pt'))

# Load all tensors into a list
# NOTE(review): torch.load unpickles the file contents; only point dir_path at
# files from a trusted source.
tensors = [torch.load(os.path.join(dir_path, file)) for file in files]
from torch.utils.data import random_split

# Create the dataset
dataset = TensorDataset(tensors)

# Determine the sizes for each split (60% / 20% / remainder); the test split
# absorbs the rounding left over by the two int() truncations.
total_size = len(dataset)
train_size = int(0.6 * total_size)
val_size = int(0.2 * total_size)
test_size = total_size - train_size - val_size

# Split the dataset with a dedicated, seeded generator so that re-running the
# notebook yields the same train/val/test partition instead of a fresh one.
split_seed = 42
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(split_seed),
)

from torch.utils.data import DataLoader

# Define batch size
batch_size = 32

# Create data loaders; only the training loader reshuffles each epoch.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch
from torch.nn.modules.transformer import TransformerEncoder, TransformerEncoderLayer
from torch.nn.modules import LayerNorm, Linear, Sequential, ReLU
import torch.nn as nn
import torch.nn.functional as F

class TransformerModel(nn.Module):
    """Regression network: linear projection -> Transformer encoder -> linear head.

    Every input row is projected to ``d_model`` features, passed through the
    encoder stack as a sequence of length one, and mapped to a single output
    value per row.
    """

    def __init__(self, input_dim=2561, d_model=512, n_head=8, n_layers=6,
                 d_inner=1024, activation="relu", dropout=0.02):
        super().__init__()

        # Template layer and final norm are built first, before the
        # registered submodules, so parameter creation order is unchanged.
        layer_template = TransformerEncoderLayer(
            d_model,
            n_head,
            dim_feedforward=d_inner,
            dropout=dropout,
            activation=activation,
        )
        final_norm = LayerNorm(d_model)

        self.inlinear = Linear(input_dim, d_model)
        self.relu = ReLU()
        self.transformerencoder = TransformerEncoder(
            layer_template, num_layers=n_layers, norm=final_norm
        )
        # Single-unit head: one regression value per input row.
        self.outlinear = Linear(d_model, 1)

    def forward(self, x):
        """Map ``x`` to one prediction per row.

        Assumes ``x`` is laid out as (batch, input_dim) -- TODO confirm
        against callers.
        """
        hidden = self.relu(self.inlinear(x))
        # Insert a length-1 axis at position 1, then swap axes 0 and 1 so the
        # new axis comes first when the tensor enters the encoder.
        sequence_first = hidden.unsqueeze(1).transpose(0, 1)
        encoded = self.transformerencoder(sequence_first)
        # Undo the swap and drop the length-1 axis again.
        flattened = encoded.transpose(0, 1).squeeze(1)
        return self.outlinear(self.relu(flattened))

In [None]:
import torch
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
from tqdm import tqdm
import logging
import matplotlib.pyplot as plt

def calculate_baseline_mse(train_loader, val_loader):
    """Validation MSE of a constant predictor set to the mean training target.

    Both loaders are iterated once in full (training loader first); element 1
    of every batch is taken as that batch's targets.

    Returns:
        The value returned by ``mean_squared_error`` for the validation
        targets against an array filled with the training-target mean.
    """
    def _gather_targets(loader):
        # Pull the target tensor of each batch to host memory as NumPy and
        # join all batches along the first axis.
        per_batch = []
        for batch in loader:
            per_batch.append(batch[1].cpu().numpy())
        return np.concatenate(per_batch)

    train_mean = np.mean(_gather_targets(train_loader))
    val_targets = _gather_targets(val_loader)
    # Same shape and dtype as the validation targets, every entry = train mean.
    constant_prediction = np.full_like(val_targets, train_mean)
    return mean_squared_error(val_targets, constant_prediction)

def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs, baseline_mse):
    """Train ``model`` for ``num_epochs`` epochs, validating after each epoch.

    Args:
        model: Module invoked as ``model(inputs.float())``.
        train_loader: Iterable of batches whose first two elements are
            ``(inputs, labels)``; its ``.dataset`` length normalises the
            epoch loss.
        val_loader: Loader handed to ``validate_model`` after every epoch.
        criterion: Loss callable applied as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        num_epochs: Number of passes over ``train_loader``.
        baseline_mse: Reference MSE; only used in the log message comparing
            the epoch's training MSE against it.

    Returns:
        ``(losses, val_losses)``: per-epoch training loss and per-epoch
        validation loss, one entry per epoch each.
    """
    losses = []
    val_losses = []  # Store validation losses
    for epoch in range(num_epochs):
        # validate_model() leaves the model in eval mode, so training mode has
        # to be restored at the start of every epoch (otherwise dropout is
        # silently disabled from the second epoch on).
        model.train()
        running_loss = 0.0
        seen_items = 0
        all_labels = []
        all_predictions = []
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')
        for batch in progress_bar:
            inputs, labels = batch[:2]
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            # Align predictions with the label shape. A bare squeeze() would
            # also drop the batch axis for a batch of one, giving a 0-d tensor
            # that cannot be iterated when collecting predictions below.
            outputs = model(inputs.float()).reshape(labels.shape)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_items = inputs.size(0)
            running_loss += loss.item() * batch_items
            seen_items += batch_items
            all_predictions.extend(outputs.detach().cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            # Running averages over the samples actually processed so far
            # (exact even when the last batch is smaller than the others).
            current_loss = running_loss / seen_items
            current_mse = mean_squared_error(all_labels, all_predictions)
            current_r2 = r2_score(all_labels, all_predictions)
            progress_bar.set_postfix({
                'Loss': f'{current_loss:.4f}',
                'MSE': f'{current_mse:.4f}',
                'R^2': f'{current_r2:.4f}'
            })

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_mse = mean_squared_error(all_labels, all_predictions)
        epoch_r2 = r2_score(all_labels, all_predictions)
        logging.info(f'End of Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, MSE: {epoch_mse:.4f}, R^2: {epoch_r2:.4f}')

        if epoch_mse < baseline_mse:
            logging.info(f'Model is performing better than the baseline with MSE: {epoch_mse:.4f} < {baseline_mse:.4f}')
        else:
            logging.info(f'Model is not performing better than the baseline with MSE: {epoch_mse:.4f} >= {baseline_mse:.4f}')

        losses.append(epoch_loss)

        # At the end of each epoch, calculate validation loss
        val_loss, _, _ = validate_model(model, val_loader, criterion, device, baseline_mse)
        val_losses.append(val_loss)

    return losses, val_losses  # Return validation losses along with training losses

def validate_model(model, val_loader, criterion, device, baseline_mse):
    """Evaluate ``model`` on ``val_loader`` without gradient tracking.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        model: Module invoked as ``model(inputs.float())``.
        val_loader: Iterable yielding ``(inputs, labels)`` batches; its
            ``.dataset`` length normalises the returned loss.
        criterion: Loss callable applied as ``criterion(outputs, labels)``.
        device: Device the batch tensors are moved to.
        baseline_mse: Reference MSE; only used in the log message comparing
            the validation MSE against it.

    Returns:
        ``(avg_loss, all_labels, all_predictions)``: the sample-weighted mean
        loss and the per-sample labels and predictions as flat lists.
    """
    model.eval()
    total_loss = 0
    total_items = 0
    all_labels = []
    all_predictions = []
    progress_bar = tqdm(val_loader, desc='Validation')
    with torch.no_grad():
        for inputs, labels in progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)
            # Align predictions with the label shape. A bare squeeze() would
            # also drop the batch axis for a batch of one, giving a 0-d tensor
            # that cannot be iterated when collecting predictions below.
            outputs = model(inputs.float()).reshape(labels.shape)
            loss = criterion(outputs, labels)

            # Weight the batch loss by batch size so smaller batches do not
            # skew the average.
            total_loss += loss.item() * inputs.size(0)
            total_items += inputs.size(0)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(outputs.cpu().numpy())

            current_loss = total_loss / total_items
            current_mse = mean_squared_error(all_labels, all_predictions)
            current_r2 = r2_score(all_labels, all_predictions)
            progress_bar.set_postfix({
                'Loss': f'{current_loss:.4f}',
                'MSE': f'{current_mse:.4f}',
                'R^2': f'{current_r2:.4f}'
            })

    avg_loss = total_loss / len(val_loader.dataset)
    mse = mean_squared_error(all_labels, all_predictions)
    r2 = r2_score(all_labels, all_predictions)
    logging.info(f'Validation Loss: {avg_loss:.4f}, MSE: {mse:.4f}, R^2: {r2:.4f}')

    if mse < baseline_mse:
        logging.info(f'Model is performing better than the baseline with MSE: {mse:.4f} < {baseline_mse:.4f}')
    else:
        logging.info(f'Model is not performing better than the baseline with MSE: {mse:.4f} >= {baseline_mse:.4f}')

    return avg_loss, all_labels, all_predictions

def test_model(model, test_loader, criterion, device):
    model.eval()
    total_loss = 0
    total_items = 0
    all_labels = []
    all_predictions = []
    progress_bar = tqdm(test_loader, desc='Testing')
    with torch.no_grad():
        for inputs, labels in progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs.float()).squeeze()
            loss = criterion(outputs, labels)

            total_loss += loss.item() * inputs.size(0)
            total_items += inputs.size(0)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(outputs.cpu().numpy())

            current_loss = total_loss / total_items
            current_mse = mean_squared_error(all_labels, all_predictions)
            current_r2 = r2_score(all_labels, all_predictions)
            progress_bar.set_postfix({
                'Loss': f'{current_loss:.4f}',
                'MSE': f'{current_mse:.4f}',
                'R^2': f'{current_r2:.4f}'
            })

    avg_loss = total_loss / len(test_loader.dataset)
    mse = mean_squared_error(all_labels, all_predictions)
    r2 = r2_score(all_labels, all_predictions)
    logging.info(f'Test Loss: {avg_loss:.4f}, MSE: {mse:.4f}, R^2: {r2:.4f}')
    print(f'Test Loss: {avg_loss:.4f}, MSE: {mse:.4f}, R^2: {r2:.4f}')

    return all_labels, all_predictions

# Reference point: validation MSE of a constant mean predictor, computed
# before any training takes place.
baseline_mse = calculate_baseline_mse(train_loader, val_loader)
print(f'Baseline MSE: {baseline_mse:.4f}')

# Example usage
num_epochs = 20

# Send INFO-level records from the training helpers to a log file.
logging.basicConfig(filename='training.log', level=logging.INFO)

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(input_dim=2561).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# Fit, then score the final weights on the validation and test splits.
train_losses, val_losses = train_model(
    model, train_loader, val_loader, criterion, optimizer, device, num_epochs, baseline_mse
)
val_loss, val_labels, val_predictions = validate_model(
    model, val_loader, criterion, device, baseline_mse
)
test_labels, test_predictions = test_model(model, test_loader, criterion, device)

# Learning curves: one point per epoch for each split.
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
loss_ax.plot(train_losses, label='Training Loss')
loss_ax.plot(val_losses, label='Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
plt.show()

# Residuals (actual minus predicted) against the predicted value; the red
# line marks zero error.
residuals = np.asarray(test_labels) - np.asarray(test_predictions)
resid_fig, resid_ax = plt.subplots(figsize=(10, 5))
resid_ax.scatter(test_predictions, residuals)
resid_ax.set_xlabel('Predicted')
resid_ax.set_ylabel('Residuals')
resid_ax.axhline(y=0, color='r', linestyle='-')
resid_ax.set_title('Residual Plot')
plt.show()

# Actual vs predicted values on the test split.
scatter_fig, scatter_ax = plt.subplots(figsize=(10, 5))
scatter_ax.scatter(test_labels, test_predictions)
scatter_ax.set_xlabel('Actual')
scatter_ax.set_ylabel('Predicted')
scatter_ax.set_title('Actual vs Predicted Plot')
plt.show()