# Import packages

In [1]:
# Pytorch packages
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import transforms
from torchinfo import summary

import os
import pandas as pd

import glob
import csv
import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Plotting
import matplotlib.pyplot as plt

# Select device (CPU or GPU)

## Configure device

In [2]:
# Determine the device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


# Read the data

## Create the custom data loader

In [3]:
# Custom dataset class
class FileDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.file_list = []
        self.labels = []
        
        # The number of classes
        self.n_classes = 0

        # Extract file paths and labels
        self._extract_file_paths()
        
         # Perform label encoding as one-hot encoding
        self._encode_labels()

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        file_path = self.file_list[idx]
        label = self.labels[idx]

        # Read the file
        data = self._read_file(file_path)

        # Apply transformation if provided
        if self.transform:
            data = self.transform(data)

        return data, label

    def _extract_file_paths(self):
        label_dirs = os.listdir(self.root_dir)

        for label in label_dirs:
            label_dir = os.path.join(self.root_dir, label)
            if os.path.isdir(label_dir):
                files = glob.glob(os.path.join(label_dir, '*.csv'))
                self.file_list.extend(files)
                self.labels.extend([label] * len(files))

    def _read_file(self, file_path):
        
        signal_data = np.loadtxt(file_path, dtype=np.float32, delimiter=",")
        # Transform to torch vector and reshape to column vector
        return torch.from_numpy(signal_data)
        
        # Implement your own file reading logic here
        # For example, if you're working with CSV files, you can use pandas
        # dataframe = pd.read_csv(file_path)
        # return dataframe.values

        # In this example, we assume a simple text file and read its content
        #with open(file_path, 'r') as file:
        #    content = file.read()

        #return content
        
    def _encode_labels(self):
        
        label_encoder = LabelEncoder()
        integer_encoded = label_encoder.fit_transform(self.labels)
        
        # Print the original classes names
        #print(label_encoder.classes_)
        
        # Reshape to column vector
        integer_encoded = integer_encoded.reshape(-1, 1)

        # JCPS "sparse" deprecated in version 1.2, use "sparse_output" from version 1.4
        #onehot_encoder = OneHotEncoder(sparse=False)
        onehot_encoder = OneHotEncoder(sparse_output=False)
        
        onehot_encoded = onehot_encoder.fit_transform(integer_encoded)
        
        self.n_classes = onehot_encoded.shape[1]
        
        self.labels = torch.tensor(onehot_encoded, dtype=torch.float32)
        #self.labels = torch.tensor(onehot_encoded, dtype=torch.long)

## Define transformations for the raw data

In [4]:
# This reshape the raw data to a row vector

class ToRowVector(object):
    """Transforms the input signal to a row vector"""
    
    def __call__(self, sample):
        #reshaped_sample = sample.view(1, -1)
        #print(sample.shape[0])
        #print(sample.shape[1])
        
        # Slide the data, use only the first row
        preshaped = sample[0,:]
                        
        #reshaped_sample = sample.view(sample.shape[0]*sample.shape[1])
        reshaped_sample = preshaped.view(preshaped.shape[0])
        
        #return sample
        return reshaped_sample

## Instantiate the data loader

In [5]:
# The folder with the dataset
folder_name = "../02_python_signal_folder_sorting/sorted_signals_by_mhr"
batch_size = 5

# Create training and validation datasets
train_dataset = FileDataset(folder_name, transform=transforms.Compose([ToRowVector()]))
#train_dataset = FileDataset(folder_name)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
print("Train loader: Total number of batches {} using {} items per batch. Total samples {}".format(len(train_loader), batch_size, len(train_loader) * batch_size))
print("Test loader: Total number of batches {} using {} items per batch. Total samples {}".format(len(test_loader), batch_size, len(test_loader) * batch_size))

Train loader: Total number of batches 20 using 5 items per batch. Total samples 100
Test loader: Total number of batches 20 using 5 items per batch. Total samples 100


# Define the Neural Network

In [6]:
# Define the neural network architecture
class BaseClassifier(nn.Module):
    def __init__(self, in_dim, out_dim):
        super(BaseClassifier, self).__init__()
           
        # Define your layers here   
        
        #self.linear1 = nn.Linear(in_dim, 1024, bias=True)
        #self.linear2 = nn.Linear(1024, 256)
        #self.relu = nn.ReLU()
        #self.linear3 = nn.Linear(256, out_dim)
     
        self.linear1 = nn.Linear(in_dim, 128, bias=True)
        self.linear2 = nn.Linear(128, 32)
        self.relu = nn.ReLU()
        self.linear3 = nn.Linear(32, out_dim)

    def forward(self, x):
        
        # Define the forward pass of your networkloss.item())
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.relu(x)
        x = self.linear3(x)
                
        return x

# Instantiate the model, optimiser and hyperparameter(s)

## Set hyperparameters

In [7]:
#n_input_features = 2040000
n_input_features = 60000 # input dimension
n_output_classes = 4 # output dimension

# Create an instance of the neural network and move it to the device
net = BaseClassifier(n_input_features, n_output_classes).to(device)

#net.cuda()
#net.cpu()

# Define the loss function
# Cross-entropy
criterion = nn.CrossEntropyLoss()

# Mean Square Error
#criterion = nn.MSELoss()

# Define the number of epochs
n_epochs = 4

# Define the optimiser (with its corresponding learning rate)
learning_rate = 1e-3
# Use Adam optimiser
opt = optim.Adam(net.parameters(), lr=learning_rate)

# Stochastic Gradient Descent
#opt = optim.SGD(net.parameters(), lr=learning_rate)

# Summary of the model
for p in net.parameters():
    print(p.shape)

# Summary of the model
#summary(net, input_size = (batch_size, 2040000, 4))
summary(net, input_size = (batch_size, n_input_features))
#summary(net)

torch.Size([128, 60000])
torch.Size([128])
torch.Size([32, 128])
torch.Size([32])
torch.Size([4, 32])
torch.Size([4])


Layer (type:depth-idx)                   Output Shape              Param #
BaseClassifier                           [5, 4]                    --
├─Linear: 1-1                            [5, 128]                  7,680,128
├─ReLU: 1-2                              [5, 128]                  --
├─Linear: 1-3                            [5, 32]                   4,128
├─ReLU: 1-4                              [5, 32]                   --
├─Linear: 1-5                            [5, 4]                    132
Total params: 7,684,388
Trainable params: 7,684,388
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 38.42
Input size (MB): 1.20
Forward/backward pass size (MB): 0.01
Params size (MB): 30.74
Estimated Total Size (MB): 31.94

# Train the model

In [None]:
# Define the training method
def train(model=net,
          optimizer=opt,
          n_epochs=n_epochs,
          loss_fn=criterion,
          lr=learning_rate):
    
    # Indicate the Pytorch backend we are on training mode
    model.train()
    loss_lt = []
    
    # Training loop
    for epoch in range(n_epochs):
        running_loss = 0.0
        for batch_data, batch_labels in train_loader:
        
            # Prior any operation clear the gradient
            optimizer.zero_grad()
        
            # Move data and labels to the device
            batch_data = batch_data.to(device)
            batch_labels = batch_labels.to(device)
        
            # Forward pass
            outputs = model(batch_data)

            #print("Outputs\n")
            #print(outputs)
        
            #print("Batch labels\n")
            #print(batch_labels)
        
            # Apply the loss function
            loss = loss_fn(outputs, batch_labels)

            # Backpropagation and optimization
            loss.backward()
        
            # Perform an optimization step (this updates the weights and bias on the network)
            optimizer.step()
                    
            # Keep track of sum of loss of each batch
            running_loss+=loss.item()
                
            # Print the loss for monitoring
            print('Epoch [{}/{}], Batch Loss: {:.4f}'.format(epoch+1, n_epochs, loss.item()))
        
        # Add the cumulative loss to a list
        loss_lt.append(running_loss/len(train_loader))
        
        # Print the total loss of the epoch
        print('Epoch: {} training loss: {:.4f}'.format(epoch+1, running_loss/len(train_loader)))
        
    plt.plot([i for i in range(1, n_epochs+1)], loss_lt, label="Train")
    plt.xlabel("Epoch")
    plt.ylabel("Cross-entropy")
    plt.title("Training loss: optimiser {}, lr {:.6f}".format("Adam", lr))
    plt.legend()
    plt.show()

# Save the trained model
#torch.save(net.state_dict(), "./trained_model/model.pt')

# Call the training method
train(net, opt, n_epochs, criterion)

Epoch [1/4], Batch Loss: 114.2428
Epoch [1/4], Batch Loss: 118.9676
Epoch [1/4], Batch Loss: 115.5226
Epoch [1/4], Batch Loss: 122.5690
Epoch [1/4], Batch Loss: 49.0515
Epoch [1/4], Batch Loss: 121.3692
Epoch [1/4], Batch Loss: 157.7504
Epoch [1/4], Batch Loss: 377.1904
Epoch [1/4], Batch Loss: 194.2703
Epoch [1/4], Batch Loss: 543.5787
Epoch [1/4], Batch Loss: 116.6120
Epoch [1/4], Batch Loss: 454.6817
Epoch [1/4], Batch Loss: 556.0588
Epoch [1/4], Batch Loss: 23.7612
Epoch [1/4], Batch Loss: 903.6849
Epoch [1/4], Batch Loss: 236.4304
Epoch [1/4], Batch Loss: 257.3744
Epoch [1/4], Batch Loss: 74.5156
Epoch [1/4], Batch Loss: 658.7189
Epoch [1/4], Batch Loss: 149.1948
Epoch: 1 training loss: 267.2773
Epoch [2/4], Batch Loss: 303.6268
Epoch [2/4], Batch Loss: 476.2717
Epoch [2/4], Batch Loss: 166.1959
Epoch [2/4], Batch Loss: 94.5315
Epoch [2/4], Batch Loss: 162.5549
Epoch [2/4], Batch Loss: -0.0000
Epoch [2/4], Batch Loss: 192.0401
Epoch [2/4], Batch Loss: 1082.8665
Epoch [2/4], Batch 

# Test the model

In [46]:
# Define a helper function to generate a one-hot encoding at the position of the maximum value
def generate_max_indices_tensor(input_tensor):
    # Compute the maximum along each row
    max_values, max_indices = torch.max(input_tensor, dim=1)
    
    # Create a tensor of zeros with the same shape as the input tensor
    output_tensor = torch.zeros_like(input_tensor)
    
    # Set ones at the indices of the maximum values
    output_tensor.scatter_(1, max_indices.unsqueeze(1), 1)
    
    return output_tensor

# Define a helper function that returns a one tensor if the input tensors are equal
def compare_tensors(tensor1, tensor2):
    
    # Ensure both tensors have the same shape
    assert tensor1.shape == tensor2.shape, "Both tensors should have the same dimensions."

    # Calculate element-wise equality and count equal rows
    row_equality = torch.all(tensor1 == tensor2, dim=1)
    equal_rows = torch.sum(row_equality).item()

    # Count different rows
    different_rows = tensor1.shape[0] - equal_rows
    
    return torch.tensor([equal_rows, different_rows])
        
# Define the testing method
def test(model=net,
        loss_fn=criterion):
    
    # Indicate the Pytorch backend we are on testing mode
    model.eval()
    accuracy = 0.0
    total_loss = 0.0
    
    batch_loss = 0.0
    batch_accuracy = 0.0
    
    # Use no grad to reduce memory and computation cost
    with torch.no_grad():
                
        for batch_data, batch_labels in test_loader:
            
            # Move data and labels to the device
            batch_data = batch_data.to(device)
            batch_labels = batch_labels.to(device)
            
            # Forward pass
            outputs = model(batch_data)
            
            #print("These are the outputs")
            #print(outputs)
            
            #print("These are the batch labels")
            #print(batch_labels)
            
            # Compute the loss
            batch_loss = loss_fn(outputs, batch_labels)
            
            # Add up the loss
            total_loss+=batch_loss.item()
            
            # Compute the one-hot enconding version
            one_hot_output = generate_max_indices_tensor(outputs)
            
            #print("These are the one-hot outputs")
            #print(one_hot_output)
            
            # Compute accuracy
            batch_accuracy = compare_tensors(one_hot_output, batch_labels)[0]
            #batch_accuracy = torch.sum(compare_tensors(one_hot_output, batch_labels))
            #print(compare_tensors(one_hot_output, batch_labels))
            
            accuracy+=batch_accuracy
            
            print("Test batch loss: {:.4f}, test batch accuracy: {:.4f}".format(
            batch_loss/batch_size,
            batch_accuracy*100.0/batch_size))
            
        print("Test loss: {:.4f}, test accuracy: {:.4f}".format(
            total_loss/(len(test_loader)*batch_size),
            accuracy*100.0/(len(test_loader)*batch_size)))

# Call the test method
test(net, criterion)

Test batch loss: -0.0000, test batch accuracy: 100.0000
Test batch loss: -0.0000, test batch accuracy: 100.0000
Test batch loss: -0.0000, test batch accuracy: 100.0000
Test batch loss: 0.2313, test batch accuracy: 96.0000
Test loss: 0.0578, test accuracy: 99.0000


# Code testing

In [None]:
for batch_data, batch_labels in train_loader:
    print(batch_data.shape)
    print(batch_labels)
    print(batch_labels.shape)

In [None]:
signal_data = np.loadtxt("../02_python_signal_folder_sorting/sorted_signals_by_mhr/70_74/nifecg.0003.fs_1000_mhr_72_fhr_132.csv", dtype=np.float32, delimiter=",")
signal_data_torch = torch.from_numpy(signal_data)
signal_data_torch

In [None]:
## One-hot enconding
encoder = OneHotEncoder

In [None]:
x = torch.rand(3,4)
print(x)
idx, x_max = x.max(dim=1)
print(x_max)
x_arg_max = torch.argmax(x, 1)
print(x_arg_max)

In [None]:
# Determine the device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device