In [1]:
import csv
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

In [2]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Accumulators for the CSV load: the raw rows, and the shortest row length
# seen so far (starts at +inf so the first row always becomes the minimum).
min_len = float('inf')
data = list()

In [4]:
# Re-initialise the accumulators here so that re-running this cell does not
# append the whole file to `data` a second time.
data = []
min_len = float('inf')

# Each line of the file is one ';'-separated record; rows are kept as lists of
# strings and converted to floats in a later cell.
with open('../data/2023-09-04-full-spikes.csv', 'r', newline='', encoding='utf-8') as csvfile:
    spikereader = csv.reader(csvfile, delimiter=';')
    for row in spikereader:
        if not row:
            # csv.reader yields [] for a blank line; keeping it would force
            # min_len to 0 and truncate every row to nothing downstream.
            continue
        data.append(row)
        # Track the shortest row so all rows can be cut to a common length.
        min_len = min(min_len, len(row))

In [5]:
# Cut every row to the shortest row length and parse each field as a float,
# giving a rectangular list of lists.
data = [list(map(float, row[:min_len])) for row in data]

In [6]:
# Build the full data matrix as one tensor, placed on the selected device.
data_tensor = torch.tensor(data, device=device)

In [7]:
# Show the tensor followed by its shape, one per line.
print(data_tensor, data_tensor.shape, sep="\n")

tensor([[   0., 2138., 2120.,  ..., 2078., 2089., 2067.],
        [   0., 1918., 1908.,  ..., 2059., 2089., 2059.],
        [   0., 1900., 1907.,  ..., 2058., 2067., 2078.],
        ...,
        [   9., 1882., 1889.,  ..., 1960., 1960., 1951.],
        [   9.,    3., 1879.,  ..., 1993., 1979., 1970.],
        [   9., 1890.,    7.,  ..., 1951., 1978., 1979.]], device='cuda:0')
torch.Size([500, 5001])


In [8]:
class CustomTensorDataset(Dataset):
    """Dataset view over a tensor whose rows are ``[label, value_1, ..., value_n]``.

    Indexing returns ``(item, label)`` where ``item`` is the row without its
    first element and ``label`` is that first element as a 0-dim int64 tensor.
    """

    def __init__(self, data_tensor):
        """Store the backing tensor; no copy is made."""
        self.data_tensor = data_tensor

    def __len__(self):
        """Number of rows in the backing tensor."""
        return len(self.data_tensor)

    def __getitem__(self, idx):
        """Return ``(item, label)`` for row ``idx``.

        The label is produced with a dtype cast, so it stays on the same
        device as the backing tensor. This avoids depending on a module-level
        ``device`` variable and avoids a device-to-host round-trip through a
        Python int on every access. The cast truncates toward zero, the same
        as ``int()``.
        """
        sample = self.data_tensor[idx]
        label = sample[0].to(torch.long)
        item = sample[1:]
        return item, label

In [9]:
# Wrap the full data tensor in the Dataset and print the first
# (item, label) pair as a quick sanity check.
custom_dataset = CustomTensorDataset(data_tensor)
print(custom_dataset[0])

(tensor([2138., 2120., 2119.,  ..., 2078., 2089., 2067.], device='cuda:0'), tensor(0, device='cuda:0'))


In [10]:
train_size = 0.8  # Proportion of the dataset for training
# Shuffled hold-out split. random_state is fixed so the same samples land in
# train/test on every run, which keeps metrics comparable between runs.
train_dataset, test_dataset = train_test_split(
    custom_dataset,
    train_size=train_size,
    shuffle=True,
    random_state=42,
)

In [11]:
# Ten zero-initialised counters.
# NOTE(review): presumably one slot per class (num_classes is 10); this list
# is not read anywhere in the visible cells - confirm it is still needed.
number_of_values = [0 for _ in range(10)]

In [12]:
# One sample per batch. The training loader reshuffles every epoch; the
# validation loader keeps a fixed order.
batch_size = 1
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
)
val_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [13]:
class LSTMClassifier(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: feature count per time step passed to ``nn.LSTM``.
        hidden_size: width of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits for ``x``.

        ``x`` gets a new axis inserted at position 2 before the LSTM, so a
        ``(batch, seq_len)`` input is fed as ``(batch, seq_len, 1)``.
        """
        # Explicit zero initial states, shaped (num_layers, batch, hidden_size).
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        initial_hidden = torch.zeros(state_shape, device=x.device)
        initial_cell = torch.zeros(state_shape, device=x.device)

        # Add the per-step feature axis expected by the batch_first LSTM.
        steps = torch.unsqueeze(x, 2)

        sequence_out, _ = self.lstm(steps, (initial_hidden, initial_cell))
        # Only the final time step feeds the classifier head.
        final_step = sequence_out[:, -1]
        return self.fc(final_step)

In [14]:
class GRUClassifier(nn.Module):
    """Stacked GRU followed by a linear layer applied to the last time step.

    Args:
        input_size: feature count per time step passed to ``nn.GRU``.
        hidden_size: width of the GRU hidden state.
        num_layers: number of stacked GRU layers.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits for ``x``.

        ``x`` gets a new axis inserted at position 2 before the GRU, so a
        ``(batch, seq_len)`` input is fed as ``(batch, seq_len, 1)``.
        """
        # Explicit zero initial state, shaped (num_layers, batch, hidden_size).
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        initial_hidden = torch.zeros(state_shape, device=x.device)

        # Add the per-step feature axis expected by the batch_first GRU.
        steps = torch.unsqueeze(x, 2)
        sequence_out, _ = self.gru(steps, initial_hidden)

        # Only the final time step feeds the classifier head.
        final_step = sequence_out[:, -1]
        return self.fc(final_step)

In [33]:
class CNNLSTMClassifier(nn.Module):
    """Two Conv1d/ReLU/MaxPool stages, then an LSTM and a linear head.

    Args:
        input_size: number of input channels for the first ``Conv1d``, i.e.
            the feature count per time step.
        hidden_size: width of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # CNN layers: each stage is an unpadded kernel-3 convolution followed
        # by a max-pool of width 2, ending with 64 channels.
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=input_size, out_channels=32, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)
        )

        # LSTM layer consumes the 64 CNN channels as per-step features.
        self.lstm = nn.LSTM(input_size=64, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # Fully connected layer
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits for ``x``.

        Accepts ``(batch, seq_len, input_size)``. A 2-D ``(batch, seq_len)``
        input is also accepted and treated as having one feature per step,
        matching how the sibling LSTM/GRU classifiers handle the batches this
        notebook's dataset yields. That case requires ``input_size == 1``.
        """
        if x.dim() == 2:
            # Add the trailing feature axis so the permute below is valid.
            x = x.unsqueeze(2)

        # Conv1d wants channels first: (batch, input_size, seq_len).
        x = x.permute(0, 2, 1)
        x = self.cnn(x)

        # Back to (batch, reduced_seq_len, 64) for the batch_first LSTM.
        x = x.permute(0, 2, 1)

        # Explicit zero initial states, shaped (num_layers, batch, hidden_size).
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device)
        c0 = torch.zeros(state_shape, device=x.device)
        out, _ = self.lstm(x, (h0, c0))

        # Classify from the last time step's output.
        out = self.fc(out[:, -1, :])
        return out

In [15]:
import numpy as np
from sklearn import metrics

In [36]:
# Hyperparameters
input_size = 1      # features per time step (the recurrent models add this axis via unsqueeze)
hidden_size = 1024  # width of the recurrent hidden state
num_layers = 2      # number of stacked recurrent layers
num_classes = 10    # number of output classes

# Alternative architectures; swap one in for the LSTM line below:
# model = GRUClassifier(input_size, hidden_size, num_layers, num_classes)
# model = CNNLSTMClassifier(5000, 32,2,num_classes)
model = LSTMClassifier(input_size, hidden_size, num_layers, num_classes).to(device)

# Cross-entropy on raw logits, Adam at lr 1e-3, and a step schedule that
# multiplies the lr by 0.1 every 15 scheduler steps.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = StepLR(optimizer=optimizer, step_size=15, gamma=1e-1)

In [37]:
num_epochs = 40
# Best validation metrics seen over all epochs.
best_acu = 0
best_auc = 0
best_f1 = 0
for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    total_train_loss = 0

    for batch_sequences, batch_labels in train_dataloader:
        optimizer.zero_grad()
        outputs = model(batch_sequences)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()
    # scheduler.step()  # left disabled as in the original run; enable to apply the StepLR decay
    # Not printed below; kept so it can be inspected after the run.
    average_train_loss = total_train_loss / len(train_dataloader)

    # ---- Validation pass ----
    model.eval()
    total_val_loss = 0
    correct_predictions = 0
    total_samples = 0
    # Collect per-batch arrays and concatenate once after the loop.
    label_chunks = []
    pred_chunks = []
    prob_chunks = []
    with torch.no_grad():
        for batch_sequences, batch_labels in val_dataloader:
            val_outputs = model(batch_sequences)
            val_loss = criterion(val_outputs, batch_labels)
            total_val_loss += val_loss.item()

            _, predicted = torch.max(val_outputs, 1)
            label_chunks.append(batch_labels.cpu().numpy())
            pred_chunks.append(predicted.cpu().numpy())
            # Probabilities must come from the validation logits. Using the
            # training-loop variable `outputs` here would score the last
            # training batch instead of this validation batch.
            prob_chunks.append(torch.softmax(val_outputs, dim=1).cpu().numpy())

            correct_predictions += (predicted == batch_labels).sum().item()
            total_samples += batch_labels.size(0)

    labels = np.concatenate(label_chunks, axis=0)
    preds = np.concatenate(pred_chunks, axis=0)
    probs = np.concatenate(prob_chunks, axis=0)

    average_val_loss = total_val_loss / len(val_dataloader)
    val_accuracy = correct_predictions / total_samples
    acu = metrics.balanced_accuracy_score(labels, preds) * 100
    f1 = metrics.f1_score(labels, preds, average='weighted')
    auc = metrics.roc_auc_score(labels, probs, multi_class='ovo')

    best_acu = max(best_acu, acu)
    best_auc = max(best_auc, auc)
    best_f1 = max(best_f1, f1)

    print(f"Epoch [{epoch+1}/{num_epochs}] - Validation Loss: {average_val_loss:.4f} - Validation Accuracy: {100*val_accuracy:.2f}% - sklearn Accuracy: {acu:.2f}%")

KeyboardInterrupt: 