In [None]:
import numpy as np 
import pandas as pd 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset

from torch.utils.data import DataLoader
from torch.utils.data import random_split
from sklearn.preprocessing import StandardScaler
import torch.nn.init as init

import os

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"


In [None]:
# Load Singlescale data

dataset = pd.read_csv("Dataset/Training_Set/data1.csv", header=None)

scaler = StandardScaler().fit(dataset)
dataset_scaled = scaler.transform(dataset)

labels = pd.read_csv("Dataset/Training_Set/label.csv", header=None)

labels = labels.astype(int).values.squeeze()-1
num_classes = labels.max() + 1

def one_hot_encode(labels, num_classes):
    return np.eye(num_classes)[labels]

labels_one_hot = one_hot_encode(labels, num_classes)

dataset_tensor = torch.tensor(dataset_scaled, dtype=torch.float32)
labels_tensor = torch.tensor(labels_one_hot, dtype=torch.float32)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

dataset_tensor = dataset_tensor.to(device)
labels_tensor = labels_tensor.to(device)

print(labels_tensor.shape)  

In [None]:
class Singlescale(nn.Module):
    def __init__(self):
        super(Singlescale, self).__init__()

        self.layer1 = nn.Linear(64, 64)
        self.layer2 = nn.Linear(64, 64)
        self.layer3 = nn.Linear(64, 64)
        self.layer4 = nn.Linear(64, 64)
        self.layer5 = nn.Linear(64, 64)
        self.layer6 = nn.Linear(64, 64)
        self.layer7 = nn.Linear(64, 10)

       
        self.dropout = nn.Dropout(p=0.85)
        
        init.kaiming_normal_(self.layer1.weight)
        init.kaiming_normal_(self.layer2.weight)
        init.kaiming_normal_(self.layer3.weight)
        init.kaiming_normal_(self.layer4.weight)
        init.kaiming_normal_(self.layer5.weight)
        init.kaiming_normal_(self.layer6.weight)
        init.kaiming_normal_(self.layer7.weight)

        
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))        
        x = F.relu(self.layer3(x))
        x = self.dropout(x)
        x = F.relu(self.layer4(x))        
        x = F.relu(self.layer5(x))        
        x = F.relu(self.layer6(x))
        x = self.dropout(x)
        x = self.layer7(x) 
        return F.log_softmax(x, dim=1) 

In [None]:
class Multiscale(nn.Module):
    def __init__(self):
        super(Multiscale, self).__init__()
        self.layer1 = nn.Linear(1472, 1472)
        self.layer2 = nn.Linear(1472, 1472)
        self.layer3 = nn.Linear(1472, 1472)
        self.layer4 = nn.Linear(1472, 1472)
        self.layer5 = nn.Linear(1472, 1472) 
        self.layer6 = nn.Linear(1472, 1472) 
        self.layer7 = nn.Linear(1472, 10)

       
        self.dropout = nn.Dropout(p=0.85)
        
        init.kaiming_normal_(self.layer1.weight)
        init.kaiming_normal_(self.layer2.weight)
        init.kaiming_normal_(self.layer3.weight)
        init.kaiming_normal_(self.layer4.weight)
        init.kaiming_normal_(self.layer5.weight)
        init.kaiming_normal_(self.layer6.weight)
        init.kaiming_normal_(self.layer7.weight)

        
    def forward(self, x):
        x = F.relu(self.layer1(x))       
        x = F.relu(self.layer2(x))        
        x = F.relu(self.layer3(x))
        x = self.dropout(x)
        x = F.relu(self.layer4(x))
        x = F.relu(self.layer5(x))
        x = F.relu(self.layer6(x))
        x = self.dropout(x)
        x = self.layer10(x) 
        return F.log_softmax(x, dim=1) 

In [None]:
import matplotlib.pyplot as plt
from IPython.display import clear_output

def fit(num_epochs, model, loss_fn, optimizer, train_dataloader, val_loader):
    train_losses = []  # Stores training losses for each epoch
    val_losses = []  # Stores validation losses for each epoch

    for epoch in range(num_epochs):
        model.train()  # Ensure the model is in training mode
        epoch_training_loss = 0  # For accumulating the loss for this epoch

        for inputs, targets in train_dataloader:
            inputs, targets = inputs.float().to(device), targets.float().to(device)
            preds = model(inputs)
            loss = loss_fn(preds, targets)
            epoch_training_loss += loss.item()  # Add the loss for this batch

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            
        # Record the average training loss for this epoch
        train_losses.append(epoch_training_loss / len(train_dataloader))

        model.eval()  # Switch the model to evaluation mode
        epoch_val_loss = 0  # For accumulating the loss for this epoch

        with torch.no_grad():
            for val_input, val_targets in val_loader:
                val_input, val_targets = val_input.float().to(device), val_targets.float().to(device)
                out = model(val_input)
                val_loss = loss_fn(out, val_targets)
                epoch_val_loss += val_loss.item()  # Add the loss for this batch

        # Record the average validation loss for this epoch
        val_losses.append(epoch_val_loss / len(val_loader))

        
        clear_output(wait=True)
        print("Epoch [{}/{}], Training loss: {:.4f}, Validation Loss: {:.4f}"
              .format(epoch + 1, num_epochs, train_losses[-1], val_losses[-1]))


        plt.figure(figsize=(10, 5))
        plt.plot(train_losses, label='Training Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.show()

        def evaluate_model(model, val_dataloader, loss_fn, device):
            model.eval()  # Set the model to evaluation mode
            val_loss = 0
            correct = 0
            with torch.no_grad():  # Do not calculate gradients since we are only evaluating
                for inputs, targets in val_dataloader:
                    inputs, targets = inputs.to(device), targets.to(device)
                    outputs = model(inputs)
                    val_loss += loss_fn(outputs, targets).item()

                    # Find the predicted labels by taking the argmax of the outputs
                    _, pred = torch.max(outputs, 1)
                    _, labels = torch.max(targets, 1)
                    correct += (pred == labels).sum().item()

            val_loss /= len(val_dataloader)
            accuracy = correct / len(val_dataloader.dataset)

            return val_loss, accuracy


        val_loss, val_accuracy = evaluate_model(model, val_dataloader, loss_fn, device)
        print(f"Validation Loss: {val_loss:.4f}")
        print(f"Validation Accuracy: {val_accuracy:.4f}")

    return train_losses, val_losses  # Return the losses for each epoch

In [None]:
#Input hyperparameter 
num_epochs = 800
batch_size = 128
learning_rate= 0.001

model = Singlescale()


model.to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adadelta(model.parameters(), lr =learning_rate)
print(dataset_tensor.shape)
print(labels_tensor.shape)


# # Making sure shapes are correct and defining dataloader.
# input_np_array = np.array(dataset[:,1:-1].tolist(), dtype='float32')
# inputs = torch.from_numpy(input_np_array)
# # inputs = torch.from_numpy(np.array(dataset.iloc[:,1:-1].values, dtype='float32'))
# print(inputs.shape)



train_tensor_dataset = TensorDataset(dataset_tensor, labels_tensor)
val_size = int(0.22 * len(train_tensor_dataset))  # 22% for validation
train_size = len(train_tensor_dataset) - val_size  # 78% for training
train_data, val_data = random_split(train_tensor_dataset, [train_size, val_size])
train_dataloader = DataLoader(train_data, batch_size, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size)

# # Train the model.
train_losses,val_losses = fit(num_epochs, model, loss_fn, optimizer, train_dataloader, val_dataloader)

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from sklearn.preprocessing import StandardScaler
import numpy as np

device = torch.device("cuda")

test_data = pd.read_csv('Dataset/Test_set/test1.csv', header=None)
scaler = StandardScaler().fit(test_data)
test_scaled = scaler.transform(test_data)

test_labels = pd.read_csv('Dataset/Test_Set/label.csv', header=None)

test_labels = test_labels.astype(int).values.squeeze() - 1
num_classes = test_labels.max() + 1

def one_hot_encode(labels, num_classes):
    return np.eye(num_classes)[labels]

test_labels_one_hot = one_hot_encode(test_labels, num_classes)

test_data = torch.tensor(test_scaled, dtype=torch.float32).to(device)
test_labels = torch.tensor(test_labels_one_hot, dtype=torch.float32).to(device)

class TestDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

test_dataset = TestDataset(test_data, test_labels)
test_dataloader = DataLoader(test_dataset, batch_size=16)

def plcc(x, y):
    if isinstance(x, list):
        x = np.array(x)
    if isinstance(y, list):
        y = np.array(y)

    if len(x.shape) != 1 or len(y.shape) != 1:
        raise Exception("Please input N (* 1) vector.")
    if x.shape[0] != y.shape[0]:
        raise Exception("The lengths of 2 input vectors are not equal.")

    x = x - np.average(x)
    y = y - np.average(y)
    numerator = np.dot(x, y)
    denominator = np.sqrt(np.sum(x ** 2)) * np.sqrt(np.sum(y ** 2))
    return numerator / denominator

def srocc(x, y):
    if isinstance(x, list):
        x = np.array(x)
    if isinstance(y, list):
        y = np.array(y)

    if len(x.shape) != 1 or len(y.shape) != 1:
        raise Exception("Please input N (* 1) vector.")
    if x.shape[0] != y.shape[0]:
        raise Exception("The lengths of 2 input vectors are not equal.")

    rank_x = x.argsort().argsort()
    rank_y = y.argsort().argsort()
    return plcc(rank_x, rank_y)

model = Singlescale().to(device)


model.eval()
test_loss = 0
pred_scores = []
true_scores = []

with torch.no_grad():
    for inputs, targets in test_dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)

        test_loss += loss_fn(outputs, targets).item()

        _, pred = torch.max(outputs, 1)
        _, labels = torch.max(targets, 1)

        pred_scores.extend(pred.cpu().numpy())
        true_scores.extend(labels.cpu().numpy())

test_loss /= len(test_dataloader)
plcc_score = plcc(true_scores, pred_scores)
srocc_score = srocc(true_scores, pred_scores)

print(f"Test Loss: {test_loss:.4f}")
print(f"PLCC: {plcc_score:.4f}")
print(f"SROCC: {srocc_score:.4f}")

In [None]:
torch.save(model.state_dict(), f'model_single_{learning_rate}_{batch_size}_{num_epochs}.pt')

In [None]:
#Input hyperparameter 
num_epochs = 800
batch_size = 128
learning_rate= 0.001

model = Multiscale()


model.to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adadelta(model.parameters(), lr =learning_rate)
print(dataset_tensor.shape)
print(labels_tensor.shape)


train_tensor_dataset = TensorDataset(dataset_tensor, labels_tensor)
val_size = int(0.22 * len(train_tensor_dataset))  # 22% for validation
train_size = len(train_tensor_dataset) - val_size  # 78% for training
train_data, val_data = random_split(train_tensor_dataset, [train_size, val_size])
train_dataloader = DataLoader(train_data, batch_size, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size)

# # Train the model.
train_losses,val_losses = fit(num_epochs, model, loss_fn, optimizer, train_dataloader, val_dataloader)

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from sklearn.preprocessing import StandardScaler
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

test_data = pd.read_csv('Dataset/Test_set/test.csv', header=None)
scaler = StandardScaler().fit(test_data)
test_scaled = scaler.transform(test_data)

test_labels = pd.read_csv('Dataset/Test_Set/label.csv', header=None)

test_labels = test_labels.astype(int).values.squeeze() - 1
num_classes = test_labels.max() + 1

def one_hot_encode(labels, num_classes):
    return np.eye(num_classes)[labels]

test_labels_one_hot = one_hot_encode(test_labels, num_classes)

test_data = torch.tensor(test_scaled, dtype=torch.float32).to(device)
test_labels = torch.tensor(test_labels_one_hot, dtype=torch.float32).to(device)

class TestDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

test_dataset = TestDataset(test_data, test_labels)
test_dataloader = DataLoader(test_dataset, batch_size=16)

def plcc(x, y):
    if isinstance(x, list):
        x = np.array(x)
    if isinstance(y, list):
        y = np.array(y)

    if len(x.shape) != 1 or len(y.shape) != 1:
        raise Exception("Please input N (* 1) vector.")
    if x.shape[0] != y.shape[0]:
        raise Exception("The lengths of 2 input vectors are not equal.")

    x = x - np.average(x)
    y = y - np.average(y)
    numerator = np.dot(x, y)
    denominator = np.sqrt(np.sum(x ** 2)) * np.sqrt(np.sum(y ** 2))
    return numerator / denominator

def srocc(x, y):
    if isinstance(x, list):
        x = np.array(x)
    if isinstance(y, list):
        y = np.array(y)

    if len(x.shape) != 1 or len(y.shape) != 1:
        raise Exception("Please input N (* 1) vector.")
    if x.shape[0] != y.shape[0]:
        raise Exception("The lengths of 2 input vectors are not equal.")

    rank_x = x.argsort().argsort()
    rank_y = y.argsort().argsort()
    return plcc(rank_x, rank_y)

model.eval()
test_loss = 0
pred_scores = []
true_scores = []

with torch.no_grad():
    for inputs, targets in test_dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)

        test_loss += loss_fn(outputs, targets).item()

        _, pred = torch.max(outputs, 1)
        _, labels = torch.max(targets, 1)

        pred_scores.extend(pred.cpu().numpy())
        true_scores.extend(labels.cpu().numpy())

test_loss /= len(test_dataloader)
plcc_score = plcc(true_scores, pred_scores)
srocc_score = srocc(true_scores, pred_scores)

print(f"Test Loss: {test_loss:.4f}")
print(f"PLCC: {plcc_score:.4f}")
print(f"SROCC: {srocc_score:.4f}")

In [None]:
torch.save(model.state_dict(), f'model_multi_{learning_rate}_{batch_size}_{num_epochs}.pt')