In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchmetrics


class LinearWeightBlock(nn.Module):
    """Convolutional down-sampler followed by a linear projection, a Transformer
    encoder and a bilinear mixing layer.

    Pipeline (see ``forward``):
        1. Two down-sampling stages. Each stage runs two parallel ``Conv1d``
           branches (GELU + LayerNorm), sums them and applies a stride-2
           ``Conv1d`` that halves the temporal length.
        2. A linear layer projecting the remaining temporal axis
           (``lenght_sequence // 4``) to ``dim_embedding // 8 + 1`` features.
        3. A 2-layer Transformer encoder attending over the channel axis.
        4. A bilinear layer combining the encoder output with its input,
           followed by GELU.

    Args:
        lenght_sequence: Temporal length of the input. Must be divisible by 4
            so that the two stride-2 convolutions produce exactly
            ``lenght_sequence // 2`` and ``lenght_sequence // 4`` steps, which
            is what the LayerNorm layers are sized for.
        in_channels: Number of input channels.
        n_head: Number of attention heads; must divide
            ``(lenght_sequence // 4) // 8 + 1``.

    Raises:
        ValueError: If ``lenght_sequence`` is not divisible by 4.
    """

    def __init__(self, lenght_sequence, in_channels, n_head = 2) -> None:
        super().__init__()

        if lenght_sequence % 4 != 0:
            raise ValueError(
                f"lenght_sequence must be divisible by 4, got {lenght_sequence}"
            )

        ##################################################
        # 1. Convolutional Block
        ##################################################

        self.in_channels = in_channels

        quarter_channels = in_channels // 4
        half_channels = in_channels // 2

        # Stage 1: two parallel length-preserving branches, then a stride-2 merge.
        self.conv_11 = nn.Conv1d(in_channels, quarter_channels, kernel_size=3, stride=1, padding=1)
        self.conv_12 = nn.Conv1d(in_channels, quarter_channels, kernel_size=3, stride=1, padding=1)
        self.conv_11_12 = nn.Conv1d(quarter_channels, half_channels, kernel_size=3, stride=2, padding=1)

        # Stage 2: same layout, channel count stays at in_channels // 2.
        self.conv_21 = nn.Conv1d(half_channels, half_channels, kernel_size=3, stride=1, padding=1)
        self.conv_22 = nn.Conv1d(half_channels, half_channels, kernel_size=3, stride=1, padding=1)
        self.conv_21_22 = nn.Conv1d(half_channels, half_channels, kernel_size=3, stride=2, padding=1)

        # Layer norms act on the temporal (last) axis, whose size halves per stage.
        self.lenght_sequence = lenght_sequence

        self.layer_norm_1 = nn.LayerNorm(self.lenght_sequence)
        self.layer_norm_11_12 = nn.LayerNorm(self.lenght_sequence//2)

        self.layer_norm_2 = nn.LayerNorm(self.lenght_sequence//2)
        self.layer_norm_21_22 = nn.LayerNorm(self.lenght_sequence//4)

        ##################################################
        # 2. Linear Weighting Block
        ##################################################

        # Temporal length left after the two stride-2 convolutions. This was
        # hard-coded to 300, which only matched lenght_sequence == 1200.
        self.dim_embedding = self.lenght_sequence // 4
        token_dim = self.dim_embedding // 8 + 1

        self.linear = nn.Linear(self.dim_embedding, token_dim)

        ##################################################
        # 3. Transformer Encoder Block
        ##################################################

        # batch_first=True: the encoder receives (batch, channels, token_dim).
        # With the default batch_first=False the first axis is treated as the
        # sequence, so attention would mix different samples of the batch.
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=token_dim, nhead=n_head, batch_first=True),
            num_layers=2,
        )

        ##################################################
        # 4. Residual Bilinear Block
        ##################################################

        self.bilinear = nn.Bilinear(token_dim, token_dim, token_dim)

        self.dropout = nn.Dropout(0.1)

        ##################################################
        # - Layer Norm
        # - GELU
        ##################################################

        self.gelu = nn.GELU()

        self.layer_norm = nn.LayerNorm(token_dim)

    def forward(self, x):
        """Encode a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, in_channels, lenght_sequence)``.

        Returns:
            Tensor of shape
            ``(batch, in_channels // 2, (lenght_sequence // 4) // 8 + 1)``.
        """
        # down sampling 1 (both branches share layer_norm_1)
        branch_a = self.layer_norm_1(self.gelu(self.conv_11(x)))
        branch_b = self.layer_norm_1(self.gelu(self.conv_12(x)))
        x = self.layer_norm_11_12(self.conv_11_12(branch_a + branch_b))

        # down sampling 2 (both branches share layer_norm_2)
        branch_a = self.layer_norm_2(self.gelu(self.conv_21(x)))
        branch_b = self.layer_norm_2(self.gelu(self.conv_22(x)))
        x = self.layer_norm_21_22(self.conv_21_22(branch_a + branch_b))

        # linear weighting block: project the temporal axis to token_dim
        x = self.linear(x)
        x = self.layer_norm(x)

        # transformer encoder block: attention over the channel axis
        x_0 = self.transformer_encoder(x)

        # residual bilinear block
        # NOTE(review): the first operand is x_0 + dropout(x_0), not
        # x + dropout(x_0); kept as written -- confirm this is intended.
        x = self.bilinear(x_0 + self.dropout(x_0), x)
        x = self.gelu(x)

        return x
    
    
class Classifier(nn.Module):
    """Classification head on top of ``LinearWeightBlock``.

    The block output is flattened, projected to ``lenght_sequence // 32``
    features and layer-normalised. An embedding of the subject information is
    added as a shift, and a two-layer MLP produces ``n_class`` logits.

    Args:
        lenght_sequence: Temporal length of the input sequences.
        in_channels: Number of input channels.
        n_head: Number of attention heads forwarded to ``LinearWeightBlock``.
        n_class: Number of output classes.
    """

    def __init__(self, lenght_sequence = 1200, in_channels = 235, n_head = 2, n_class = 15) -> None:
        super().__init__()

        self.in_channels = in_channels
        self.n_class = n_class

        # Layer widths shared by several modules below.
        width = lenght_sequence // 32
        head_width = lenght_sequence // 16
        # Expected flattened size of the block output:
        # (in_channels // 2) channels x ((lenght_sequence // 4) // 8 + 1) features.
        flat_features = ((lenght_sequence // 4) // 8 + 1) * (in_channels // 2)

        self.linear_weight_block = LinearWeightBlock(lenght_sequence, in_channels, n_head)

        self.flatten = nn.Flatten(start_dim=1, end_dim=-1)

        self.linear_transformation = nn.Linear(flat_features, width)

        self.layer_norm = nn.LayerNorm(width)

        # Embeds the subject information (last dimension of size 4) into `width`.
        self.mlp_subject_info = nn.Sequential(
            nn.Linear(4, width),
            nn.GELU(),
            nn.LayerNorm(width),
            nn.Linear(width, width),
        )

        # Final head producing one logit per class.
        self.mlp = nn.Sequential(
            nn.Linear(width, head_width),
            nn.GELU(),
            nn.LayerNorm(head_width),
            nn.Linear(head_width, n_class),
        )

    def forward(self, x, subject_info):
        """Return class logits for ``x`` conditioned on ``subject_info``.

        Args:
            x: Input passed to ``LinearWeightBlock``.
            subject_info: Tensor whose last dimension has size 4.

        Returns:
            Tensor whose last dimension has size ``n_class``.
        """
        features = self.flatten(self.linear_weight_block(x))
        features = self.layer_norm(self.linear_transformation(features))
        # The subject-info embedding is added as a shift of the sequence features.
        subject_shift = self.mlp_subject_info(subject_info)
        return self.mlp(features + subject_shift)
    
    
class LitClassifier(pl.LightningModule):
    """Lightning wrapper around ``Classifier``.

    Trains with cross-entropy and logs loss, accuracy, F1, precision and recall
    for the training and validation stages under the ``train_`` / ``val_``
    prefixes.

    Args:
        lenght_sequence: Forwarded to ``Classifier``.
        in_channels: Forwarded to ``Classifier``.
        n_head: Forwarded to ``Classifier``.
        n_class: Number of classes, forwarded to ``Classifier`` and the metrics.
    """

    def __init__(self, lenght_sequence = 1200, in_channels = 235, n_head = 2, n_class = 15) -> None:
        super().__init__()

        self.model = Classifier(lenght_sequence, in_channels, n_head, n_class)

        self.loss = nn.CrossEntropyLoss()

        # All metrics share the same multiclass configuration.
        metric_kwargs = {"task": "multiclass", "num_classes": n_class}
        self.accuracy = torchmetrics.Accuracy(**metric_kwargs)
        self.f1 = torchmetrics.F1Score(**metric_kwargs)
        self.precision = torchmetrics.Precision(**metric_kwargs)
        self.recall = torchmetrics.Recall(**metric_kwargs)

    def forward(self, x, subject_info):
        """Delegate to the wrapped ``Classifier``."""
        return self.model(x, subject_info)

    def _run_step(self, batch, stage):
        """Compute the loss for ``batch`` and log loss and metrics under ``stage``.

        ``batch`` unpacks to ``(x, subject_info, y)``. The loss receives ``y``
        as provided; the metrics receive ``argmax(y, dim=-1)``.
        Presumably ``y`` holds one-hot / per-class scores -- verify against
        the data pipeline.
        """
        inputs, subject_info, targets = batch

        logits = self(inputs, subject_info)
        loss = self.loss(logits, targets)

        # Metrics are fed class indices rather than per-class targets.
        target_idx = torch.argmax(targets, dim=-1)

        self.log(f"{stage}_loss", loss)
        tracked = (
            ("acc", self.accuracy),
            ("f1", self.f1),
            ("precision", self.precision),
            ("recall", self.recall),
        )
        for name, metric in tracked:
            self.log(f"{stage}_{name}", metric(logits, target_idx))

        return loss

    def training_step(self, batch, batch_idx):
        """Run one training batch; logs the ``train_*`` values and returns the loss."""
        return self._run_step(batch, "train")

    def validation_step(self, batch, batch_idx):
        """Run one validation batch; logs the ``val_*`` values and returns the loss."""
        return self._run_step(batch, "val")

    # No test_step is defined: this module only implements the train and
    # validation stages.

    def configure_optimizers(self):
        """AdamW (lr=1e-3) with a cosine-annealing schedule (T_max=10)."""
        adamw = torch.optim.AdamW(self.parameters(), lr=1e-3)
        cosine = torch.optim.lr_scheduler.CosineAnnealingLR(adamw, T_max=10)
        return [adamw], [cosine]

In [17]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.dataset import random_split

# Load the preprocessed tensors.
# NOTE: torch.load unpickles its input -- only load files from a trusted source.
data = torch.load('./data/emotion_recognition_preprocessed_data/preprocessed/data.pt')
info = torch.load('./data/emotion_recognition_preprocessed_data/preprocessed/info.pt').type(torch.float32)
labels = torch.load('./data/emotion_recognition_preprocessed_data/preprocessed/labels.pt').type(torch.long)

# One-hot float targets over 15 classes; CrossEntropyLoss receives them as
# per-class targets and the Lightning module takes their argmax for metrics.
labels = torch.nn.functional.one_hot(labels, num_classes=15).type(torch.float32)

BACH_SIZE = 256

dataset = TensorDataset(data, info, labels)

# 80/20 train/validation split, seeded so the split is reproducible.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42))

train_loader = DataLoader(train_dataset, batch_size=BACH_SIZE, shuffle=True)
# Validation batches are not shuffled: evaluation order should be deterministic.
val_loader = DataLoader(val_dataset, batch_size=BACH_SIZE, shuffle=False)



In [18]:


from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

# The Trainer places the module on the selected accelerator, so no explicit
# .cuda() call is needed (it would fail on a machine without CUDA).
net = LitClassifier()

# NOTE(review): patience=150 exceeds max_epochs=10, so early stopping cannot
# trigger within this run -- confirm the intended values.
early_stopping = EarlyStopping('val_loss', patience=150, verbose=True, mode='min')
# Keeps the single best checkpoint (lowest val_loss) under ./models.
modelCheckPoint = ModelCheckpoint(monitor='val_loss', save_top_k=1, mode='min', dirpath='./models', filename='model-{epoch:02d}-{val_loss:.2f}')

# The checkpoint callback must be registered with the Trainer to take effect.
trainer = pl.Trainer(max_epochs=10, accelerator='auto', callbacks=[early_stopping, modelCheckPoint])

trainer.fit(net, train_loader, val_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type                | Params
--------------------------------------------------
0 | model     | Classifier          | 794 K 
1 | loss      | CrossEntropyLoss    | 0     
2 | accuracy  | MulticlassAccuracy  | 0     
3 | f1        | MulticlassF1Score   | 0     
4 | precision | MulticlassPrecision | 0     
5 | recall    | MulticlassRecall    | 0     
--------------------------------------------------
794 K     Trainable params
0         Non-trainable params
794 K     Total params
3.177     Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn(
  rank_zero_warn(
  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Metric val_loss improved. New best score: 2.713


Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Metric val_loss improved by 0.002 >= min_delta = 0.0. New best score: 2.711


Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Metric val_loss improved by 0.001 >= min_delta = 0.0. New best score: 2.709


Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.


In [18]:
# Scratch check: lenght_sequence // 16 for the default lenght_sequence=1200,
# i.e. the hidden width of Classifier.mlp.
1200//16

75

In [19]:
# Open TensorBoard inline on the lightning_logs/ directory.
# To stop a stale TensorBoard server, kill the pid reported in the cell output,
# e.g. `!kill <pid>`.
# Alternative to the reload below: %load_ext tensorboard
%reload_ext tensorboard
%tensorboard --logdir lightning_logs/

Reusing TensorBoard on port 6006 (pid 312), started 0:03:03 ago. (Use '!kill 312' to kill it.)