In [None]:
#|default_exp models.TransformerRNNPlus

# TransformerRNNPlus

This is a PyTorch implementation of a Transformer + RNN created by Ignacio Oguiza (oguiza@timeseriesAI.co), inspired by the code created by Baurzhan Urazalinov (https://www.kaggle.com/baurzhanurazalinov).

Baurzhan Urazalinov won a Kaggle competition (Parkinson's Freezing of Gait Prediction: Event detection from wearable sensor data - 2023) using the following original TensorFlow code:

* https://www.kaggle.com/code/baurzhanurazalinov/parkinson-s-freezing-defog-training-code
* https://www.kaggle.com/code/baurzhanurazalinov/parkinson-s-freezing-tdcsfog-training-code
* https://www.kaggle.com/code/baurzhanurazalinov/parkinson-s-freezing-submission-code

I'd like to congratulate Baurzhan for winning this competition, and for sharing the code he used.

In [None]:
#|export
import torch
from torch import nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from collections import OrderedDict
from tsai.models.layers import lin_nd_head

In [None]:
from tsai.models.utils import count_parameters

In [None]:
# Sanity check: a single batch-first transformer encoder layer keeps the input shape.
t = torch.rand(4, 864, 54)  # (batch_size, seq_len, d_model)
encoder_layer = torch.nn.TransformerEncoderLayer(
    d_model=54,
    nhead=6,
    dim_feedforward=2048,
    dropout=0.1,
    activation="relu",
    layer_norm_eps=1e-05,
    batch_first=True,
    norm_first=False,
)
print(encoder_layer(t).shape)
print(count_parameters(encoder_layer))

torch.Size([4, 864, 54])
235382


In [None]:
#|export
class _TransformerRNNEncoder(nn.Module):
    def __init__(self, 
        cell:nn.Module, # A RNN cell instance.
        c_in:int, # Number of channels in the input tensor.
        seq_len:int, # Number of time steps in the input tensor.
        d_model:int, # The number of expected features in the input.
        nhead:int, # Number of parallel attention heads (d_model will be split across nhead - each head will have dimension d_model // nhead).
        proj_dropout:float=0.1, # Dropout probability after the projection linear layer. Default: 0.1.
        num_encoder_layers:int=1, # Number of transformer layers in the encoder. Default: 1.
        dim_feedforward:int=2048, # The dimension of the feedforward network model. Default: 2048.
        dropout:float=0.1, # Transformer encoder layers dropout. Default: 0.1.
        num_rnn_layers:int=1, # Number of RNN layers in the encoder. Default: 1.
        bidirectional:bool=True, # If True, becomes a bidirectional RNN. Default: True.
        ):
        super().__init__()

        # projection layer
        self.proj_linear = nn.Linear(c_in, d_model)
        self.proj_dropout = nn.Dropout(proj_dropout)

        # transformer encoder layers
        self.num_encoder_layers = num_encoder_layers
        dim_feedforward = dim_feedforward or d_model
        self.enc_layers = nn.ModuleList([TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout, batch_first=True, ) for _ in range(num_encoder_layers)])

        # rnn layers
        self.num_rnn_layers = num_rnn_layers
        self.rnn_layers = nn.ModuleList([cell(d_model * (1 + bidirectional) ** i, d_model * (1 + bidirectional) ** i, bidirectional=bidirectional) for i in range(num_rnn_layers)])
        self.seq_len = seq_len
        self.pos_encoding = nn.Parameter(torch.randn(1, self.seq_len, d_model) * 0.02)

    def forward(self, x): # (batch_size, c_in, seq_len), Example shape (4, 54, 864) 
        
        x = x.swapaxes(1, 2) # (batch_size, seq_len, c_in), Example shape (4, 864, 54)
        batch_size = x.shape[0]

        # projection layer
        x = self.proj_linear(x) # (batch_size, seq_len, d_model), Example shape (4, 864, 320)
        x = x + self.pos_encoding.repeat(batch_size, 1, 1)
        x = self.proj_dropout(x)

        # transformer encoder layers
        for i in range(self.num_encoder_layers): 
            x = self.enc_layers[i](x.transpose(0, 1)).transpose(0, 1) # (batch_size, seq_len, d_model), Example shape (4, 864, 320)
        
        # rnn layers
        for i in range(self.num_rnn_layers): 
            x, _ = self.rnn_layers[i](x) # (batch_size, seq_len, CFG['fog_model_dim']*2), Example shape (4, 864, 640)
        
        x = x.swapaxes(1, 2)
        return x

In [None]:
bs, c_in, seq_len = 4, 5, 50

# 3 bidirectional LSTM layers on top of d_model=128 -> 128 * 2**3 = 1024 output features
encoder = _TransformerRNNEncoder(
    nn.LSTM,
    c_in=c_in,
    seq_len=seq_len,
    d_model=128,
    nhead=4,
    num_encoder_layers=1,
    dim_feedforward=None,
    proj_dropout=0.1,
    dropout=0.1,
    num_rnn_layers=3,
    bidirectional=True,
)
t = torch.randn(bs, c_in, seq_len)
print(encoder(t).shape)

torch.Size([4, 1024, 50])


In [None]:
#|export
class _TransformerRNNPlus(nn.Sequential):
    """Sequential model made of a `_TransformerRNNEncoder` backbone followed by a head.

    The recurrent layer class is read from `self._cell`, which this class does not define
    itself; it is expected to be supplied as a class attribute (e.g. by a subclass).
    """
    def __init__(self, 
        c_in:int, # Number of channels in the input tensor.
        c_out:int, # Number of output channels.
        seq_len:int, # Number of time steps in the input tensor.
        d:tuple=None, # int or tuple with shape of the output tensor
        d_model:int=128, # Total dimension of the model.
        nhead:int=16, # Number of parallel attention heads (d_model will be split across nhead - each head will have dimension d_model // nhead).
        proj_dropout:float=0.1, # Dropout probability after the first linear layer. Default: 0.1.
        num_encoder_layers:int=1, # Number of transformer encoder layers. Default: 1.
        dim_feedforward:int=2048, # The dimension of the feedforward network model. Default: 2048.
        dropout:float=0.1, # Transformer encoder layers dropout. Default: 0.1.
        num_rnn_layers:int=1, # Number of RNN layers in the encoder. Default: 1.
        bidirectional:bool=True, # If True, becomes a bidirectional RNN. Default: True.
        custom_head=None, # Custom head that will be applied to the model. If None, a head with `c_out` outputs will be used. Default: None.
        **kwargs
        ):

        # backbone: every encoder hyperparameter is forwarded unchanged
        encoder_kwargs = dict(
            c_in=c_in, seq_len=seq_len, proj_dropout=proj_dropout,
            d_model=d_model, nhead=nhead, num_encoder_layers=num_encoder_layers,
            dim_feedforward=dim_feedforward, dropout=dropout,
            num_rnn_layers=num_rnn_layers, bidirectional=bidirectional,
        )
        backbone = _TransformerRNNEncoder(cell=self._cell, **encoder_kwargs)

        # number of features the head receives: d_model scaled by (1 + bidirectional) once per RNN layer
        n_directions = 1 + bidirectional
        self.head_nf = d_model * n_directions ** num_rnn_layers

        # head: default linear head, a ready-made module, or a factory called with the head dimensions
        if not custom_head:
            head = lin_nd_head(self.head_nf, c_out, seq_len=seq_len, d=d)
        elif isinstance(custom_head, nn.Module):
            head = custom_head
        else:
            head = custom_head(self.head_nf, c_out, seq_len, d=d, **kwargs)

        named_layers = OrderedDict()
        named_layers['backbone'] = backbone
        named_layers['head'] = head
        super().__init__(named_layers)

class TransformerRNNPlus(_TransformerRNNPlus):
    "`_TransformerRNNPlus` variant that sets `nn.RNN` as the recurrent layer class (`_cell`)."
    _cell = nn.RNN

class TransformerLSTMPlus(_TransformerRNNPlus):
    "`_TransformerRNNPlus` variant that sets `nn.LSTM` as the recurrent layer class (`_cell`)."
    _cell = nn.LSTM

class TransformerGRUPlus(_TransformerRNNPlus):
    "`_TransformerRNNPlus` variant that sets `nn.GRU` as the recurrent layer class (`_cell`)."
    _cell = nn.GRU

In [None]:
bs, c_in, c_out, seq_len = 4, 5, 1, 50
d = None

# the same configuration is checked for the three recurrent variants
for model_cls in (TransformerRNNPlus, TransformerLSTMPlus, TransformerGRUPlus):
    model = model_cls(c_in=c_in, c_out=c_out, seq_len=seq_len, d=d, proj_dropout=0.1, d_model=128, nhead=4,
                      num_encoder_layers=2, dropout=0.1, num_rnn_layers=1, bidirectional=True)
    t = torch.randn(bs, c_in, seq_len)
    assert model(t).shape == torch.Size([4])
    print(model(t).shape)

torch.Size([4])
torch.Size([4])
torch.Size([4])


In [None]:
bs, c_in, c_out, seq_len = 4, 5, 3, 50
d = None

# the same configuration is checked for the three recurrent variants
for model_cls in (TransformerRNNPlus, TransformerLSTMPlus, TransformerGRUPlus):
    model = model_cls(c_in=c_in, c_out=c_out, seq_len=seq_len, d=d, proj_dropout=0.1, d_model=128, nhead=4,
                      num_encoder_layers=2, dropout=0.1, num_rnn_layers=1, bidirectional=True)
    t = torch.randn(bs, c_in, seq_len)
    assert model(t).shape == (bs, c_out)
    print(model(t).shape)

torch.Size([4, 3])
torch.Size([4, 3])
torch.Size([4, 3])


In [None]:
bs, c_in, c_out, seq_len = 4, 5, 3, 50
d = 50

# the same configuration is checked for the three recurrent variants
for model_cls in (TransformerRNNPlus, TransformerLSTMPlus, TransformerGRUPlus):
    model = model_cls(c_in=c_in, c_out=c_out, seq_len=seq_len, d=d, proj_dropout=0.1, d_model=128, nhead=4,
                      num_encoder_layers=2, dropout=0.1, num_rnn_layers=1, bidirectional=True)
    t = torch.randn(bs, c_in, seq_len)
    assert model(t).shape == (bs, d, c_out)
    print(model(t).shape)

torch.Size([4, 50, 3])
torch.Size([4, 50, 3])
torch.Size([4, 50, 3])


## Export -

In [None]:
#|eval: false
#|hide
# Look up this notebook's name, then pass it to tsai's script-creation helper.
from tsai.export import get_nb_name
nb_name = get_nb_name(locals())

from tsai.imports import create_scripts
create_scripts(nb_name)

<IPython.core.display.Javascript object>

/Users/nacho/notebooks/tsai/nbs/078_models.TransformerRNNPlus.ipynb couldn't be saved automatically. You should save it manually 👋
Correct notebook to script conversion! 😃
Saturday 17/06/23 12:20:57 CEST
