In [None]:
# default_exp models.RNNPlus

# RNNPlus

> These are RNN, LSTM and GRU PyTorch implementations created by Ignacio Oguiza - timeseriesAI@gmail.com based on:

In [None]:
#export
from tsai.imports import *
from tsai.utils import *
from tsai.data.core import *
from tsai.models.layers import *

In [None]:
#hide
# example
# rnn = nn.LSTM(10, 20, 2)
# input = torch.randn(5, 3, 10)
# h0 = torch.randn(2, 3, 20)
# c0 = torch.randn(2, 3, 20)
# output, (hn, cn) = rnn(input, (h0, c0))

In [None]:
# #hide
# class _RNNPlus_Base(Module):
#     def __init__(self, c_in, c_out, seq_len=None, hidden_size=100, n_layers=1, bias=True, rnn_dropout=0, bidirectional=False, fc_dropout=0., 
#                  last_step=True, bn=False, custom_head=None, y_range=None, **kwargs):
#         if not last_step: assert seq_len, 'you need to enter a seq_len to use flatten=True'
#         self.rnn = self._cell(c_in, hidden_size, num_layers=n_layers, bias=bias, batch_first=True, dropout=rnn_dropout, bidirectional=bidirectional)
#         self.transpose = Transpose(-1, -2, contiguous=True)
        
#         # Head
#         self.head_nf = hidden_size * (1 + bidirectional)
#         if custom_head: self.head = custom_head(self.head_nf, c_out, seq_len) # custom head must have all required kwargs
#         else: self.head = self.create_head(self.head_nf, c_out, seq_len, last_step=last_step, fc_dropout=fc_dropout, bn=bn, y_range=y_range)

#     def forward(self, x): 
#         x = x.transpose(2,1)                               # [batch_size x n_vars x seq_len] --> [batch_size x seq_len x n_vars]
#         output, _ = self.rnn(x)                            # [batch_size x seq_len x hidden_size * (1 + bidirectional)]
#         output = self.transpose(output)                    # [batch_size x hidden_size * (1 + bidirectional) x seq_len]
#         return self.head(output)
    
#     def create_head(self, nf, c_out, seq_len, last_step=True, fc_dropout=0., bn=False, y_range=None):
#         if last_step: 
#             layers = [LastStep()]
#         else: 
#             layers = [Flatten()]
#             nf *= seq_len
#         layers += [LinBnDrop(nf, c_out, bn=bn, p=fc_dropout)]
#         if y_range: layers += [SigmoidRange(*y_range)]
#         return nn.Sequential(*layers)

    
# class RNNPlus(_RNNPlus_Base):
#     _cell = nn.RNN
    
# class LSTMPlus(_RNNPlus_Base):
#     _cell = nn.LSTM
    
# class GRUPlus(_RNNPlus_Base):
#     _cell = nn.GRU

In [None]:
#export
class _RNNPlus_Base(Module):
    def __init__(self, c_in, c_out, seq_len=None, hidden_size=100, n_layers=1, bias=True, rnn_dropout=0, bidirectional=False, fc_dropout=0., 
                 last_step=True, bn=False, custom_head=None, y_range=None, **kwargs):
        
        if not last_step: assert seq_len, 'you need to enter a seq_len to use flatten=True'
        
        # Backbone
        self.backbone = _RNN_Backbone(self._cell, c_in, c_out, seq_len=seq_len, hidden_size=hidden_size, n_layers=n_layers, bias=bias, 
                                      rnn_dropout=rnn_dropout,  bidirectional=bidirectional)
        
        # Head
        self.head_nf = hidden_size * (1 + bidirectional)
        if custom_head: self.head = custom_head(self.head_nf, c_out, seq_len) # custom head must have all required kwargs
        else: self.head = self.create_head(self.head_nf, c_out, seq_len, last_step=last_step, fc_dropout=fc_dropout, bn=bn, y_range=y_range)

    def forward(self, x): 
        output = self.backbone(x)        # [batch_size x n_vars x seq_len] --> [batch_size x hidden_size * (1 + bidirectional) x seq_len]
        return self.head(output)
    
    def create_head(self, nf, c_out, seq_len, last_step=True, fc_dropout=0., bn=False, y_range=None):
        if last_step: 
            layers = [LastStep()]
        else: 
            layers = [Flatten()]
            nf *= seq_len
        layers += [LinBnDrop(nf, c_out, bn=bn, p=fc_dropout)]
        if y_range: layers += [SigmoidRange(*y_range)]
        return nn.Sequential(*layers)

    
class _RNN_Backbone(Module):
    def __init__(self, cell, c_in, c_out, seq_len=None, hidden_size=100, n_layers=1, bias=True, rnn_dropout=0, bidirectional=False):
        self.rnn = cell(c_in, hidden_size, num_layers=n_layers, bias=bias, batch_first=True, dropout=rnn_dropout, bidirectional=bidirectional)
        self.transpose = Transpose(-1, -2, contiguous=True)
        
    def forward(self, x): 
        x = x.transpose(2,1)                               # [batch_size x n_vars x seq_len] --> [batch_size x seq_len x n_vars]
        output, _ = self.rnn(x)                            # [batch_size x seq_len x hidden_size * (1 + bidirectional)]
        output = self.transpose(output)                    # [batch_size x hidden_size * (1 + bidirectional) x seq_len]
        return output

class RNNPlus(_RNNPlus_Base):
    _cell = nn.RNN
    
class LSTMPlus(_RNNPlus_Base):
    _cell = nn.LSTM
    
class GRUPlus(_RNNPlus_Base):
    _cell = nn.GRU

In [None]:
bs = 16
c_in = 3
seq_len = 12
c_out = 2
xb = torch.rand(bs, c_in, seq_len)
test_eq(RNNPlus(c_in, c_out)(xb).shape, [bs, c_out])
test_eq(RNNPlus(c_in, c_out, hidden_size=100, n_layers=2, bias=True, rnn_dropout=0.2, bidirectional=True, fc_dropout=0.5)(xb).shape, [bs, c_out])
test_eq(LSTMPlus(c_in, c_out, hidden_size=100, n_layers=2, bias=True, rnn_dropout=0.2, bidirectional=True, fc_dropout=0.5)(xb).shape, [bs, c_out])
test_eq(GRUPlus(c_in, c_out, hidden_size=100, n_layers=2, bias=True, rnn_dropout=0.2, bidirectional=True, fc_dropout=0.5)(xb).shape, [bs, c_out])
test_eq(RNNPlus(c_in, c_out, seq_len, flatten=True)(xb).shape, [bs, c_out])
test_eq(RNNPlus(c_in, c_out, seq_len, flatten=True)(xb).shape, [bs, c_out])
test_eq(RNNPlus(c_in, c_out, seq_len, hidden_size=100, n_layers=2, bias=True, rnn_dropout=0.2, bidirectional=True, fc_dropout=0.5, flatten=True)(xb).shape, 
        [bs, c_out])
test_eq(LSTMPlus(c_in, c_out, seq_len, flatten=True)(xb).shape, [bs, c_out])
test_eq(GRUPlus(c_in, c_out, seq_len, last_step=False, flatten=True)(xb).shape, [bs, c_out])

In [None]:
bs = 16
c_in = 3
seq_len = 12
c_out = 2
xb = torch.rand(bs, c_in, seq_len)
custom_head = partial(create_mlp_head, fc_dropout=0.5)
test_eq(LSTMPlus(c_in, c_out, seq_len, flatten=True, custom_head=custom_head)(xb).shape, [bs, c_out])
custom_head = partial(create_pool_head, concat_pool=True, fc_dropout=0.5)
test_eq(LSTMPlus(c_in, c_out, seq_len, last_step=False, flatten=False, custom_head=custom_head)(xb).shape, [bs, c_out])
custom_head = partial(create_pool_plus_head, fc_dropout=0.5)
test_eq(LSTMPlus(c_in, c_out, seq_len, last_step=False, flatten=False, custom_head=custom_head)(xb).shape, [bs, c_out])
custom_head = partial(create_conv_head)
test_eq(LSTMPlus(c_in, c_out, seq_len, last_step=False, flatten=False, custom_head=custom_head)(xb).shape, [bs, c_out])

In [None]:
from tsai.data.all import *
from tsai.models.utils import *
dsid = 'NATOPS' 
bs = 16
X, y, splits = get_UCR_data(dsid, return_split=False)
tfms  = [None, [Categorize()]]
dls = get_ts_dls(X, y, tfms=tfms, splits=splits, bs=bs)

In [None]:
model = build_ts_model(LSTMPlus, dls=dls)
print(model[-1])
learn = Learner(dls, model,  metrics=accuracy)
learn.fit_one_cycle(1, 3e-3)

Sequential(
  (0): LastStep()
  (1): LinBnDrop(
    (0): Linear(in_features=100, out_features=6, bias=True)
  )
)


epoch,train_loss,valid_loss,accuracy,time
0,1.816546,1.795204,0.166667,00:01


In [None]:
model = LSTMPlus(dls.vars, dls.c, dls.len, last_step=False, flatten=True)
learn = Learner(dls, model,  metrics=accuracy)
learn.fit_one_cycle(1, 3e-3)

epoch,train_loss,valid_loss,accuracy,time
0,1.313294,0.804326,0.722222,00:01


In [None]:
custom_head = partial(create_pool_head, concat_pool=True)
model = LSTMPlus(dls.vars, dls.c, dls.len, last_step=False, flatten=False, custom_head=custom_head)
learn = Learner(dls, model,  metrics=accuracy)
learn.fit_one_cycle(1, 3e-3)

epoch,train_loss,valid_loss,accuracy,time
0,1.727251,1.642483,0.527778,00:01


In [None]:
custom_head = partial(create_pool_plus_head, concat_pool=True)
model = LSTMPlus(dls.vars, dls.c, dls.len, last_step=False, flatten=False, custom_head=custom_head)
learn = Learner(dls, model,  metrics=accuracy)
learn.fit_one_cycle(1, 3e-3)

epoch,train_loss,valid_loss,accuracy,time
0,1.230085,1.586665,0.588889,00:02


In [None]:
m = RNNPlus(c_in, c_out, seq_len, hidden_size=100,n_layers=2,bidirectional=True,rnn_dropout=.5,fc_dropout=.5, flatten=True)
print(m)
print(total_params(m))
m(xb).shape

RNNPlus(
  (backbone): _RNN_Backbone(
    (rnn): RNN(3, 100, num_layers=2, batch_first=True, dropout=0.5, bidirectional=True)
    (transpose): Transpose(dims=-1, -2).contiguous()
  )
  (head): Sequential(
    (0): LastStep()
    (1): LinBnDrop(
      (0): Dropout(p=0.5, inplace=False)
      (1): Linear(in_features=200, out_features=2, bias=True)
    )
  )
)
(81802, True)


torch.Size([16, 2])

In [None]:
m = LSTMPlus(c_in, c_out, seq_len, hidden_size=100,n_layers=2,bidirectional=True,rnn_dropout=.5,fc_dropout=.5, flatten=True)
print(m)
print(total_params(m))
m(xb).shape

LSTMPlus(
  (backbone): _RNN_Backbone(
    (rnn): LSTM(3, 100, num_layers=2, batch_first=True, dropout=0.5, bidirectional=True)
    (transpose): Transpose(dims=-1, -2).contiguous()
  )
  (head): Sequential(
    (0): LastStep()
    (1): LinBnDrop(
      (0): Dropout(p=0.5, inplace=False)
      (1): Linear(in_features=200, out_features=2, bias=True)
    )
  )
)
(326002, True)


torch.Size([16, 2])

In [None]:
m = GRUPlus(c_in, c_out, seq_len, hidden_size=100,n_layers=2,bidirectional=True,rnn_dropout=.5,fc_dropout=.5, flatten=True)
print(m)
print(total_params(m))
m(xb).shape

GRUPlus(
  (backbone): _RNN_Backbone(
    (rnn): GRU(3, 100, num_layers=2, batch_first=True, dropout=0.5, bidirectional=True)
    (transpose): Transpose(dims=-1, -2).contiguous()
  )
  (head): Sequential(
    (0): LastStep()
    (1): LinBnDrop(
      (0): Dropout(p=0.5, inplace=False)
      (1): Linear(in_features=200, out_features=2, bias=True)
    )
  )
)
(244602, True)


torch.Size([16, 2])

In [None]:
#hide
out = create_scripts()
beep(out)