In [1]:
import numpy as np
import torch
import torch.nn as nn
import random

In [2]:
# Report whether PyTorch can see a CUDA device in this kernel.
torch.cuda.is_available()

True

In [10]:
# Scratch check of integer floor division (5 // 2 evaluates to 2).
# NOTE(review): presumably a sanity check for the `kernel_size // 2` padding used further down — confirm or delete.
5//2

2

In [11]:
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear read-out of the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden state of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        seq_len_out: Number of output units of the final linear layer.
        device: Stored as ``self.device`` only; this class does not move any
            parameter or input to it.
        dropout: Dropout probability forwarded to ``nn.LSTM``.
    """

    def __init__(self, input_size, hidden_size, num_layers, seq_len_out, device, dropout=0.1):
        super(LSTM, self).__init__()
        # Keep the hyper-parameters around; forward() needs input_size.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.seq_len_out = seq_len_out
        self.device = device
        # Layer 1: LSTM, batch-first input layout.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first = True, dropout=dropout)
        # Layer 2: fully connected (linear) layer, hidden_size -> seq_len_out.
        self.linear = nn.Linear(self.hidden_size, self.seq_len_out)

    def forward(self, input_seq, prints = True):
        """Run the LSTM over ``input_seq`` and project the last time step.

        Args:
            input_seq: Tensor that can be viewed as
                ``[-1, input_seq.shape[1], input_size]``.
            prints: When true, print shape/type diagnostics. When false the
                method writes nothing to stdout.

        Returns:
            Tensor of shape ``[batch, seq_len_out]``.
        """
        # Force the [batch, seq_len, input_size] layout expected by the LSTM.
        input_seq = input_seq.view(-1, input_seq.shape[1], self.input_size)
        if prints: print("input_seq shape:", input_seq.shape, "->[num_batches, seq_len, num_features]")
        # LSTM
        output, (h_state, c_state) = self.lstm(input_seq)
        if prints: print("LSTM: output shape:" , output.shape, "->[num_batches, seq_len, hidden_size]",
                         "\n " "LSTM: h_state shape:", h_state.shape,
                          "->[num_layers*num_directions, num_batches, hidden_size]", "\n"
                          "LSTM: c_state shape:", c_state.shape,
                          "->[num_layers*num_directions, num_batches, hidden_size]")
        # Keep only the last time step of the top layer.
        output = output[:, -1, :]
        if prints: print("LSTM Output reshaped:", output.shape, "->[num_batches, hidden_size]")
        # Fully connected layer
        output = self.linear(output)
        if prints: print("FNN: Final outpu shape:", output.shape, "->[num_batches, num_features]")
        # This diagnostic used to be printed unconditionally, which defeated
        # `prints=False`; it is now controlled by the same switch.
        if prints: print ("type of the LSTM output is: ", type(output))
        return output

In [14]:
class ConvLSTMCell(nn.Module):
    """One convolutional LSTM cell.

    A single 2-D convolution over the channel-wise concatenation of the
    input and the previous hidden state produces the four gate
    pre-activations (input, forget, output, candidate).

    Args:
        input_dim: Number of channels of the input tensor.
        hidden_dim: Number of channels of the hidden and cell states.
        kernel_size: ``(kh, kw)`` of the gate convolution.
        dropout: Dropout probability applied to the concatenated
            input/hidden tensor before the convolution; ``None`` means 0.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, dropout = None):
        super(ConvLSTMCell, self).__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size

        # floor(k / 2) padding; with stride 1 this keeps height and width
        # unchanged for odd kernel sizes.
        self.padding = (kernel_size[0] // 2, kernel_size[1] // 2)

        # stride = 1, bias = True; 4 * hidden_dim output channels, one
        # hidden_dim-sized group per gate.
        self.conv = nn.Conv2d(in_channels = self.input_dim + self.hidden_dim,
                              out_channels = self.hidden_dim * 4,
                              kernel_size = self.kernel_size,
                              padding = self.padding,
                              bias = True)

        # Identity test instead of `== None`.
        self.dropout = nn.Dropout(0 if dropout is None else dropout)

    def forward(self, input_tensor, cur_state):
        """Advance the cell by one time step.

        Args:
            input_tensor (torch.Tensor): input, shape
                (batch_size, input_channels, height, width)
            cur_state (tuple of torch.Tensors): hidden/cell state, each of
                shape (batch_size, hidden_channels, height, width)

        Returns:
            ``(h_next, c_next)``: the next hidden and cell state.
        """
        h_cur, c_cur = cur_state

        # Concatenate x(t) and h(t-1) along channels; dropout is applied
        # before the convolution.
        combined = self.dropout(torch.cat([input_tensor, h_cur], dim = 1))

        # One convolution yields all four gate pre-activations.
        combined_conv = self.conv(combined)
        cc_i, cc_f, cc_o, cc_g = torch.split(combined_conv, self.hidden_dim, dim = 1)

        i = torch.sigmoid(cc_i)
        f = torch.sigmoid(cc_f)
        o = torch.sigmoid(cc_o)
        g = torch.tanh(cc_g)

        c_next = f * c_cur + i * g
        h_next = o * torch.tanh(c_next)

        return h_next, c_next

    def init_hidden(self, batch_size, image_size):
        """Return zero-filled ``(h, c)`` states for a new sequence.

        The tensors are created on the device AND in the dtype of the
        convolution weights, so they stay compatible with the parameters
        after ``.to(...)``, ``.double()`` or ``.half()``.

        Args:
            batch_size: Number of sequences in the batch.
            image_size: ``(height, width)`` of the feature maps.
        """
        height, width = image_size
        weight = self.conv.weight
        state_shape = (batch_size, self.hidden_dim, height, width)
        return (torch.zeros(state_shape, device=weight.device, dtype=weight.dtype),
                torch.zeros(state_shape, device=weight.device, dtype=weight.dtype))

    def linear(self, batch_size, num_chls, image_size):
        """Build a ``nn.Linear(height * num_chls * width, 1)`` layer.

        A new layer is constructed on every call and it is not registered
        as a sub-module of this cell. ``batch_size`` is accepted but unused.
        """
        height, width = image_size
        input_size = height*num_chls*width
        return nn.Linear(input_size, 1)

In [35]:
class ConvLSTMLayer(nn.Module):
    """Stack of ``ConvLSTMCell`` layers unrolled over a sequence.

    Args:
        input_dim: Number of channels of the input frames.
        hidden_dim: Hidden channels per layer; an int is repeated for every
            layer, a list must have ``num_layers`` entries.
        kernel_size: Tuple, or list of tuples with one entry per layer.
        num_layers: Number of stacked cells.
        conv_seq_len_out: Number of trailing time steps kept in the output.
        batch_first: Stored on the instance but not consulted; ``forward``
            always reads the input as ``(batch, seq_len, channels, H, W)``.
        dropout: Forwarded to every ``ConvLSTMCell``.

    Raises:
        ValueError: If ``kernel_size`` has the wrong type, or if the
            per-layer lists do not match ``num_layers``.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers, conv_seq_len_out,
                 batch_first=True, dropout = None):
        super(ConvLSTMLayer, self).__init__()
        self._check_kernel_size_consistency(kernel_size)

        # Broadcast scalar settings to one entry per layer.
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        print (kernel_size)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)

        if not len(kernel_size) == len(hidden_dim) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.conv_seq_len_out = conv_seq_len_out
        self.batch_first = batch_first
        self.dropout = dropout

        # Layer 0 consumes the raw input; every later layer consumes the
        # hidden maps of the layer below it.
        cell_list = []
        for i in range(0, self.num_layers):
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
            cell_list.append(ConvLSTMCell(input_dim = cur_input_dim,
                                         hidden_dim = self.hidden_dim[i],
                                         kernel_size = self.kernel_size[i],
                                         dropout = self.dropout))
        self.cell_list = nn.ModuleList(cell_list)

    def forward(self, input_tensor, hidden_state = None, prints = True):
        """Unroll every layer over the sequence.

        Args:
            input_tensor: Tensor of size ``(batch, seq_len, channels, H, W)``.
            hidden_state: Optional sequence with one ``(h, c)`` pair per
                layer used as the initial state. ``None`` starts from zeros.
                (A supplied state used to be silently discarded.)
            prints: When true (default) print the shape diagnostics.

        Returns:
            ``layer_output[:, -conv_seq_len_out:, :, C, C]`` of the top
            layer, where ``C`` is the number of input channels.

        Raises:
            ValueError: If ``hidden_state`` does not hold ``num_layers`` entries.
        """
        batch_size, seq_len, in_channels, height, width = input_tensor.size()

        if hidden_state is None:
            hidden_state = self._init_hidden(batch_size = batch_size,
                                             image_size = (height, width))
        elif len(hidden_state) != self.num_layers:
            raise ValueError('`hidden_state` must contain one (h, c) pair per layer.')

        cur_layer_input = input_tensor

        for layer_idx in range(0, self.num_layers):
            h, c = hidden_state[layer_idx]
            output_inner = []

            for t in range(0, seq_len):
                h, c = self.cell_list[layer_idx](input_tensor = cur_layer_input[:, t, :, :, :],
                                                cur_state = [h, c])
                output_inner.append(h)
                if prints: print ("shape of inner output: ", h.shape)

            # Re-assemble the time axis: (batch, seq_len, hidden, H, W).
            layer_output = torch.stack(output_inner, dim = 1)
            if prints: print ("shape of LAYER output: ", layer_output.shape)
            cur_layer_input = layer_output

        # NOTE(review): the original code indexed the two spatial axes with
        # `_`, i.e. the input channel count, so a single pixel at
        # (in_channels, in_channels) is returned. That behaviour is kept
        # here unchanged; confirm which spatial reduction is intended.
        last_time_step_output = layer_output[:, -self.conv_seq_len_out:, :, in_channels, in_channels]
        if prints:
            print ("shape ConvLSTM output: ", last_time_step_output.shape, "[batch_size, seq_len, hidden_size]")
            print ("ConvLSTM output type: ", type(last_time_step_output))

        return last_time_step_output

    def _init_hidden(self, batch_size, image_size):
        """Collect the zero ``(h, c)`` state of every cell, bottom layer first."""
        return [self.cell_list[i].init_hidden(batch_size, image_size)
                for i in range(self.num_layers)]

    def _linear(self, batch_size, num_chls, image_size):
        """Delegate to the ``linear`` helper of the last cell."""
        output = self.cell_list[-1].linear(batch_size, num_chls, image_size)
        return output

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Raise ``ValueError`` unless ``kernel_size`` is a tuple or a list of tuples."""
        if not (isinstance(kernel_size, tuple) or
                (isinstance(kernel_size, list) and all(isinstance(elem, tuple) for elem in kernel_size))):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        """Repeat a non-list ``param`` ``num_layers`` times; lists pass through unchanged."""
        if not isinstance(param, list):
            param = [param] * num_layers
        return param

In [39]:
# --- Configuration -----------------------------------------------------------
batch_size = 14                 # how many time series are processed in one iteration
input_size = 9                  # number of features (stations) --> rain, temp, hum, etc.
hidden_size = 15                # number of hidden neurons
num_layers = 3                  # number of LSTM layers
seq_len_in = 5                  # length of the input time series
conv_seq_len_out = seq_len_in   # trailing time steps kept by the ConvLSTM
# `seq_len_out` was read below but never assigned anywhere in this notebook,
# so a fresh kernel failed with NameError.
# NOTE(review): the original value lived in a cell that no longer exists;
# seq_len_in is used as a stand-in — confirm.
seq_len_out = seq_len_in
dropout = 0

shape = (65,97)                 # (height, width) of each input frame
channels = 1
kernel = (3,3)

# Seed every RNG in use so the random inputs are reproducible.
torch.manual_seed(14)
random.seed(14)
np.random.seed(14)

# Only request the GPU when one is actually visible.
device = "cuda" if torch.cuda.is_available() else "cpu"


if __name__ == '__main__':
    # Random stand-in data: (batch, seq_len, channels, H, W) frames for the
    # ConvLSTM and (batch, seq_len, features) series for the LSTM.
    input_conv = torch.randn(batch_size,seq_len_in,channels,shape[0], shape[1])
    input_lstm = torch.randn(batch_size,seq_len_in,input_size)

    convlstm = ConvLSTMLayer(input_dim = channels,
                             hidden_dim = hidden_size,
                             kernel_size = kernel,
                             num_layers = num_layers,
                             conv_seq_len_out= conv_seq_len_out,
                             dropout = dropout)

    lstm = LSTM(input_size = input_size,
                hidden_size = hidden_size,
                num_layers = num_layers,
                seq_len_out = seq_len_out,
                device = device)

    layer_output_list = convlstm(input_conv)

    lstm_output = lstm(input_lstm)
    
    

[(3, 3), (3, 3), (3, 3)]
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of LAYER output:  torch.Size([14, 5, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of LAYER output:  torch.Size([14, 5, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of inner output:  torch.Size([14, 15, 65, 97])
shape of LAYER output:  torch.Size([14, 5, 15, 65, 97])
shape ConvLS