In [1]:
import lightning as L


In [2]:
import torch
import torch.nn as nn

class sLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, dropout=0.0):
        super(sLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout

        self.lstms = nn.ModuleList([nn.LSTMCell(input_size if i == 0 else hidden_size, hidden_size) for i in range(num_layers)])
        self.dropout_layer = nn.Dropout(dropout)

        self.exp_forget_gates = nn.ModuleList([nn.Linear(hidden_size, hidden_size) for _ in range(num_layers)])
        self.exp_input_gates = nn.ModuleList([nn.Linear(hidden_size, hidden_size) for _ in range(num_layers)])

        self.reset_parameters()

    def reset_parameters(self):
        for lstm in self.lstms:
            nn.init.xavier_uniform_(lstm.weight_ih)
            nn.init.xavier_uniform_(lstm.weight_hh)
            nn.init.zeros_(lstm.bias_ih)
            nn.init.zeros_(lstm.bias_hh)

        for gate in self.exp_forget_gates + self.exp_input_gates:
            nn.init.xavier_uniform_(gate.weight)
            nn.init.zeros_(gate.bias)

    def forward(self, input_seq, hidden_state=None):
        print(f"input_seq: {type(input_seq)}")
        batch_size = input_seq.size(0)
        seq_length = input_seq.size(1)

        if hidden_state is None:
            hidden_state = self.init_hidden(batch_size)

        output_seq = []
        for t in range(seq_length):
            x = input_seq[:, t, :]
            new_hidden_state = []
            for i, (lstm, f_gate, i_gate) in enumerate(zip(self.lstms, self.exp_forget_gates, self.exp_input_gates)):
                if hidden_state[i][0] is None:
                    h, c = lstm(x)
                else:
                    h, c = lstm(x, (hidden_state[i][0], hidden_state[i][1]))

                f = torch.exp(f_gate(h))
                i_g = torch.exp(i_gate(h))
                c = f * c + i_g * lstm.weight_hh.new_zeros(batch_size, self.hidden_size)
                new_hidden_state.append((h, c))

                if i < self.num_layers - 1:
                    x = self.dropout_layer(h)
                else:
                    x = h
            hidden_state = new_hidden_state
            output_seq.append(x)

        output_seq = torch.stack(output_seq, dim=1)
        return output_seq, hidden_state

    def init_hidden(self, batch_size):
        hidden_state = []
        for lstm in self.lstms:
            h = torch.zeros(batch_size, self.hidden_size, device=lstm.weight_ih.device)
            c = torch.zeros(batch_size, self.hidden_size, device=lstm.weight_ih.device)
            hidden_state.append((h, c))
        return hidden_state

In [3]:
import torch
import torch.nn as nn

from src.models.xlstm.m_lstm import mLSTM
import torch.nn.functional as F
from torchmetrics import Accuracy, F1Score


class xLSTMBlock(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, dropout=0.0, bidirectional=False, lstm_type="slstm"):
        super(xLSTMBlock, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.bidirectional = bidirectional
        self.lstm_type = lstm_type

        if lstm_type == "slstm":
            self.lstm = sLSTM(input_size, hidden_size, num_layers, dropout)
        elif lstm_type == "mlstm":
            print("Warning: mLSTM is not working yet.")
            self.lstm = mLSTM(input_size, hidden_size, num_layers, dropout)
        else:
            raise ValueError(f"Invalid LSTM type: {lstm_type}")

        self.norm = nn.LayerNorm(input_size)
        self.activation = nn.GELU()
        self.dropout_layer = nn.Dropout(dropout)

        if bidirectional:
            self.proj = nn.Linear(2 * hidden_size, input_size)
        else:
            self.proj = nn.Linear(hidden_size, input_size)

        # print shapes
        print(f"input_size: {input_size}")
        print(f"hidden_size: {hidden_size}")
        print(f"num_layers: {num_layers}")
        print(f"dropout: {dropout}")
        print(f"proj: {self.proj}")

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.xavier_uniform_(self.proj.weight)
        nn.init.zeros_(self.proj.bias)

    def forward(self, input_seq, hidden_state=None):
        lstm_output, hidden_state = self.lstm(input_seq, hidden_state)
        if self.lstm_type == "slstm":
            hidden_state = [[hidden_state[i][0].detach(), hidden_state[i][1].detach()] for i in range(len(hidden_state))]

        if self.bidirectional:
            lstm_output = torch.cat((lstm_output[:, :, :self.hidden_size], lstm_output[:, :, self.hidden_size:]), dim=-1)

        output = self.activation(self.proj(lstm_output))
        output = self.norm(output + input_seq)
        output = self.dropout_layer(output)

        return output, hidden_state

class xLSTM(L.LightningModule):
    def __init__(self, optimizer, scheduler, input_size, hidden_size, output_size, num_layers, num_blocks,
                 dropout=0.0, bidirectional=False, lstm_type="slstm"):
        super().__init__()
        self.save_hyperparameters()

        self.accuracy = Accuracy(task='multiclass', num_classes=output_size)
        self.f1_score = F1Score(num_classes=output_size, average='weighted', task='multiclass')
        self.num_blocks = num_blocks
        self.lstm_type = lstm_type

        self.blocks = nn.ModuleList([
            xLSTMBlock(input_size, hidden_size, num_layers,
                       dropout, bidirectional, lstm_type)
            for i in range(num_blocks)
        ])

        self.output_layer = nn.Linear(input_size, output_size)

    def forward(self, input_seq, hidden_states=None):
        if hidden_states is None:
            hidden_states = [None] * self.num_blocks

        output_seq = input_seq
        for i, block in enumerate(self.blocks):
            output_seq, hidden_state = block(output_seq, hidden_states[i])
            if self.lstm_type == "slstm":
                hidden_states[i] = [[hidden_state[j][0].detach(), hidden_state[j][1].detach()] for j in range(len(hidden_state))]
            else:
                hidden_states[i] = hidden_state

        output_seq = output_seq[:, -1, :]
        output_seq = self.output_layer(output_seq)
        print(f"output_seq: {output_seq.shape}")
        return output_seq

    def _shared_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        preds = torch.argmax(logits, dim=1)
        y = torch.argmax(y, dim=1)
        loss = F.cross_entropy(logits, y)
        acc = self.accuracy(preds, y)
        f1 = self.f1_score(preds, y)
        return loss, acc, f1

    def training_step(self, batch, batch_idx):
        loss, acc, f1 = self._shared_step(batch, batch_idx)
        self.log_dict({"train_loss": loss, "train_acc": acc, "train_f1": f1}, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        loss, acc, f1 = self._shared_step(batch, batch_idx)
        self.log_dict({"val_loss": loss, "val_acc": acc, "val_f1": f1}, prog_bar=True)

    def configure_optimizers(self):
        optimizer = self.hparams.optimizer(params=self.trainer.model.parameters())
        scheduler = self.hparams.scheduler(optimizer, T_max=10)

        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "interval": "epoch",
                "frequency": 1,
            }
        }

In [4]:
from src.data.dataset import SensorDataModule

dataset = SensorDataModule(32, "../data/partitions", k_folds=0)
dataset.setup()

train_dataloader, val_dataloader = dataset.train_dataloader(), dataset.val_dataloader()
trainer = L.Trainer(max_epochs=5,
                     accelerator='cpu',
                     log_every_n_steps=10)


GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
/Users/dmnk/PycharmProjects/cdl1-sensor-based/.venv/lib/python3.11/site-packages/lightning/pytorch/trainer/setup.py:187: GPU available but not used. You can set it by doing `Trainer(accelerator='gpu')`.
/Users/dmnk/PycharmProjects/cdl1-sensor-based/.venv/lib/python3.11/site-packages/lightning/pytorch/trainer/connectors/logger_connector/logger_connector.py:75: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `lightning.pytorch` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by default


In [5]:

batch = next(iter(train_dataloader))


In [6]:
batch[0].shape

torch.Size([32, 251, 16])

In [7]:
model = xLSTM(input_size=16,
                         hidden_size=24,
                         output_size=dataset.num_classes,
                         num_layers=3,
                         dropout=0.2,
                         num_blocks=2, optimizer=torch.optim.Adam, scheduler=torch.optim.lr_scheduler.CosineAnnealingLR)

model.training_step(batch, 0)

input_size: 16
hidden_size: 24
num_layers: 3
dropout: 0.2
proj: Linear(in_features=24, out_features=16, bias=True)
input_size: 16
hidden_size: 24
num_layers: 3
dropout: 0.2
proj: Linear(in_features=24, out_features=16, bias=True)
input_seq: <class 'torch.Tensor'>
input_seq: <class 'torch.Tensor'>
output_seq: torch.Size([32, 5])


/Users/dmnk/PycharmProjects/cdl1-sensor-based/.venv/lib/python3.11/site-packages/lightning/pytorch/core/module.py:436: You are trying to `self.log()` but the `self.trainer` reference is not registered on the model yet. This is most likely because the model hasn't been passed to the `Trainer`


tensor(1.9942, grad_fn=<NllLossBackward0>)