## Installo librerie ausiliarie

In [None]:
# Install torchaudio and PyTorch Lightning into the notebook runtime.
# NOTE(review): versions are unpinned, while this notebook imports
# `pytorch_lightning.metrics` and passes `gpus=` to the Trainer; both look
# tied to older Lightning releases -- confirm and pin a compatible version.
!pip install torchaudio pytorch_lightning

## Caricamento Librerie

In [None]:
import torch, torchaudio
from torch import nn
from torch.nn import functional as F

import pytorch_lightning as pl
from pytorch_lightning.metrics import functional

import pickle
from pathlib import Path
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

from google.colab import drive

# Drop any existing Drive mount, then (re)mount Google Drive at /content/drive.
drive.flush_and_unmount()
drive.mount('/content/drive', force_remount=True)
# Last expression of the cell: displays whether a CUDA device is visible.
torch.cuda.is_available()

Drive not mounted, so nothing to flush and unmount.
Mounted at /content/drive


True

## Caricamento Dati


Per comodità, i dati sono salvati in file pickle sotto forma di tabelle hash (dizionari Python).

In [None]:
# Load the two pickled dictionaries from Drive. Context managers make sure the
# file handles are closed as soon as unpickling is done.
# NOTE: pickle can execute arbitrary code while loading; only open trusted files.
with open("drive/My Drive/EEGNET/features.pkl", "rb") as fh:
    feat = pickle.load(fh)
with open("drive/My Drive/EEGNET/eeg.pkl", "rb") as fh:
    eeg = pickle.load(fh)

# Wrap the NumPy arrays stored under 'X', 'y' and 'EEG' as tensors.
# `eeg` is deliberately rebound from the loaded dict to the tensor it contains.
X = torch.from_numpy(feat['X'])
y = torch.from_numpy(feat['y'])
eeg = torch.from_numpy(eeg['EEG'])

## Classe per la gestione dei dataset in PyTorch.

Nell'inizializzazione vengono caricati i dati grezzi; prima di fornirli in input alla rete neurale vengono applicate delle trasformazioni standard (ricampionamento, costruzione dello spettrogramma mel, conversione dell'ampiezza in dB).

In [None]:
class EEGDataset(torch.utils.data.Dataset):
    """Map-style dataset serving EEG recordings as stacked dB mel-spectrograms.

    The constructor reads ``features.pkl`` and ``eeg.pkl`` from ``path`` and
    prepares the torchaudio pipeline Resample -> MelSpectrogram ->
    AmplitudeToDB, which is applied lazily in ``__getitem__``.
    """

    def __init__(self, path: Path = Path("drive/My Drive/EEGNET"),
                 sample_rate: int = 8000):
        """Load the pickled arrays and build the transforms.

        Args:
            path: Directory containing ``features.pkl`` and ``eeg.pkl``.
            sample_rate: Target rate of the resampler, also passed to the
                mel-spectrogram transform.
        """
        # Honour the `path` argument (it used to be ignored in favour of
        # hard-coded file names); the default resolves to the same files.
        path = Path(path)

        # NOTE: pickle can execute arbitrary code; only load trusted files.
        with open(path / "features.pkl", "rb") as fh:
            feat = pickle.load(fh)
        with open(path / "eeg.pkl", "rb") as fh:
            eeg = pickle.load(fh)

        # Cast to float32 so the tensors match the default dtype of the model.
        # `X` is kept for completeness; `__getitem__` does not read it.
        self.X = torch.from_numpy(feat['X']).float()
        self.y = torch.from_numpy(feat['y']).float()
        self.eeg = torch.from_numpy(eeg['EEG']).float()

        # The source rate of 250 is hard-coded here -- presumably the EEG
        # acquisition rate in Hz; TODO confirm against the recording setup.
        self.resample = torchaudio.transforms.Resample(
            orig_freq=250, new_freq=sample_rate
        )
        self.melspec = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate)
        self.db = torchaudio.transforms.AmplitudeToDB(top_db=80)

    def __getitem__(self, index):
        """Return ``(xb, label)`` for the recording at ``index``.

        Every row of ``self.eeg[index]`` (presumably one EEG channel each) is
        reshaped to ``(1, -1)``, run through the transform pipeline, and the
        results are stacked along the first axis.
        """
        channels = self.eeg[index]
        label = self.y[index]

        spectrograms = [
            self.db(self.melspec(self.resample(channel.reshape(1, -1))))
            for channel in channels
        ]

        return torch.vstack(spectrograms), label

    def __len__(self):
        """Return the number of recordings (first axis of ``self.eeg``)."""
        return len(self.eeg)

Verifico la dimensione dei tensori di input

In [None]:
# Build the dataset and fetch its first sample to inspect the tensor shapes.
# Direct indexing fails loudly on an empty dataset instead of silently leaving
# `xb` / `yb` unbound (or stale from a previous run).
train_data = EEGDataset()
xb, yb = train_data[0]

In [None]:
# Shape of one sample after the per-row spectrograms have been stacked.
xb.shape

torch.Size([32, 128, 1199])

In [None]:
# Shape of the label that accompanies that sample.
yb.shape

## Streaming dei dati

Per usufruire degli algoritmi stocastici, i dati vengono caricati a gruppi tramite uno strumento fornito da PyTorch

In [None]:
# Three separate dataset instances, all built with the same default arguments:
# as written, training, validation and testing iterate the same recordings.
# NOTE(review): no hold-out split is applied here -- confirm whether one is intended.
train_data, val_data, test_data = (EEGDataset() for _ in range(3))

# Shuffled mini-batches of 2 for training; one sample at a time for evaluation.
train_loader = torch.utils.data.DataLoader(
    train_data, batch_size=2, shuffle=True
)
val_loader, test_loader = (
    torch.utils.data.DataLoader(split, batch_size=1)
    for split in (val_data, test_data)
)

## Definizione del Modello

Avendo calcolato lo spettrogramma del segnale EEG, possiamo utilizzare tutte le operazioni di filtraggio e sub-sampling riservate a dati definiti su un dominio bidimensionale. 

Di fatto, trattiamo lo spettrogramma del segnale EEG come se fosse un'immagine.

In [None]:
class EEGNet(pl.LightningModule):
    """Small 2-D CNN that regresses a value from stacked spectrograms.

    Four Conv2d + BatchNorm + ReLU stages with two 2x2 max-pools, followed by
    global average pooling and a single linear layer.
    """

    def __init__(self, n_classes=1, base_filters=32, in_channels=32):
        """Build the layers.

        Args:
            n_classes: Number of outputs of the final linear layer.
            base_filters: Width of the first two convolutions; the last two
                use 2x and 4x this value.
            in_channels: Number of channels of the input tensor. Defaults to
                32, the value that was previously hard-coded.
        """
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, base_filters, 11, padding=5)
        self.bn1 = nn.BatchNorm2d(base_filters)
        self.conv2 = nn.Conv2d(base_filters, base_filters, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(base_filters)
        self.pool1 = nn.MaxPool2d(2)
        self.conv3 = nn.Conv2d(base_filters, base_filters * 2, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(base_filters * 2)
        self.conv4 = nn.Conv2d(base_filters * 2, base_filters * 4, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(base_filters * 4)
        self.pool2 = nn.MaxPool2d(2)
        self.fc1 = nn.Linear(base_filters * 4, n_classes)

    def forward(self, x):
        """Return predictions of shape (batch,) when ``n_classes == 1``,
        otherwise (batch, n_classes)."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool1(x)
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool2(x)
        # Global average pooling collapses the two spatial axes to 1x1.
        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = self.fc1(x[:, :, 0, 0])
        # Squeeze only the output axis. A bare torch.squeeze() also removed
        # the batch axis for batches of one, yielding a 0-dim prediction that
        # no longer matched the (1,)-shaped target in the loss.
        return x.squeeze(-1)

    def training_step(self, batch, batch_idx):
        """Compute and log the L1 loss plus a soft penalty for predictions
        outside [0, 100]."""
        x, y = batch
        y_hat = self(x)
        lam = 1e-3  # weight of the penalty for predictions below 0
        bet = 1e-3  # weight of the penalty for predictions above 100
        # Hinge penalties: zero inside [0, 100], growing linearly outside.
        # The earlier form `bet * (y_hat - 100) - lam * y_hat` reduces to the
        # constant -100 * bet when bet == lam, so it carried no gradient.
        range_loss = bet * F.relu(y_hat - 100) + lam * F.relu(-y_hat)
        loss = F.l1_loss(y_hat, y) + range_loss.sum()
        self.log('train_loss', loss, on_step=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute the plain L1 loss and log it as 'val_loss'."""
        x, y = batch
        y_hat = self(x)
        loss = F.l1_loss(y_hat, y)
        self.log('val_loss', loss, on_epoch=True, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        """Reuse the validation logic for testing.

        NOTE: the metric is therefore logged under 'val_loss' during testing
        as well.
        """
        return self.validation_step(batch, batch_idx)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters (lr = 2e-3)."""
        return torch.optim.Adam(self.parameters(), lr=2e-3)

In [None]:
# Seed the random number generators through Lightning, using seed 0.
pl.seed_everything(0)
# Instantiate the network with its default arguments.
# The single mini-batch smoke test below is commented out, so later cells that
# use `xb` / `yb` rely on whatever values earlier cells left in the kernel.
eegnet = EEGNet()
#xb, yb = next(iter(train_loader))
#eegnet(xb).shape

Global seed set to 0


In [None]:
# Shape of the `yb` currently held in the kernel (no batch is drawn in this cell).
yb.shape

torch.Size([10])

In [None]:
# Forward pass on the `xb` currently held in the kernel; in notebook order
# this runs before `trainer.fit`, i.e. on the freshly initialised network.
eegnet(xb)

tensor([-0.4977, -0.4721, -0.5407, -0.4707, -0.5965, -0.3359, -0.9113, -0.3669,
        -0.5221, -0.6379], grad_fn=<SqueezeBackward0>)

Addestro la rete neurale per 35 epoche (test di funzionamento)

In [None]:
# Train for 35 epochs. Use one GPU when CUDA is visible and fall back to CPU
# otherwise, instead of failing on a hard-coded `gpus=1`.
trainer = pl.Trainer(
    gpus=1 if torch.cuda.is_available() else 0,
    max_epochs=35,
)
trainer.fit(eegnet, train_loader, val_loader)


In [None]:
# Draw one training batch and look at the trained network's predictions.
xb, yb = next(iter(train_loader))
# Switch to evaluation mode and disable autograd: an inspection pass should
# use the BatchNorm running statistics rather than those of a 2-sample batch,
# and should neither update those statistics nor build a gradient graph.
eegnet.eval()
with torch.no_grad():
    preds = eegnet(xb)
preds

tensor([55.0547, 51.6868], grad_fn=<SqueezeBackward0>)

In [None]:
# Display the labels held in `yb`, for comparison with the predictions.
yb

tensor([63., 61.])

In [None]:
# Evaluate the trained network on the test loader. The model variable in this
# notebook is `eegnet`; `audionet` was never defined and raised a NameError.
trainer.test(eegnet, test_loader)