In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, Softmax, RNN, Dense, Embedding, LSTM, Flatten
import tensorflow.keras.backend as K

In [2]:
class EncoderAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super(EncoderAttention, self).__init__(**kwargs)
    
    def build(self, input_shape):
        self.W_e = self.add_weight(name = "attention_weight1", shape = (input_shape[1], input_shape[1]), initializer = "normal")
        self.V_e = self.add_weight(name = "attention_weight2", shape = (1, input_shape[1]), initializer = "normal")
        super(EncoderAttention, self).build(input_shape)
    
    def call(self, x):
        
        output_1 = tf.tanh(tf.matmul(self.W_e, x))
        output_2 = tf.matmul(self.V_e, output_1)
        output_3 = tf.squeeze(output_2)
        attention_weights = tf.nn.softmax(output_3, axis = 1)
        return attention_weights

In [3]:
attention_input = tf.random.uniform(shape = [16, 74, 5])
attention_1 = EncoderAttention()
attention_weights = attention_1(attention_input)

In [4]:
class Encoder(tf.keras.Model):
    
    def __init__(self, input_size, enc_lstm_hid_size, time_steps, return_encoded = False, **kwargs):
        
        super(Encoder, self).__init__(**kwargs)
        
        self.input_size = input_size
        self.hid_units = enc_lstm_hid_size
        self.time_steps = time_steps
        self.return_encoded = return_encoded
        
        self.lstm = LSTM(self.hid_units, return_state = True, return_sequences = True, name = "encoder_lstm")
        self.att_layer = EncoderAttention(name = "encoder_attention")
    
    def call(self, input_data):
        
        input_weighted = tf.TensorArray(tf.float32, size = input_data.shape[1], name = "weighted_input")
        input_encoded = tf.TensorArray(tf.float32, size = input_data.shape[1], name = "input_encoded")
        
        batch_size = input_data.shape[0]
        hidden_state = tf.zeros(shape = [batch_size, self.hid_units])
        cell_state = tf.zeros(shape = [batch_size, self.hid_units])
        
        for t in range(self.time_steps):
            x = tf.concat(values = [tf.repeat(tf.expand_dims(hidden_state, axis = 2), repeats = self.input_size, axis = 2),
                          tf.repeat(tf.expand_dims(cell_state, axis = 2), repeats = self.input_size, axis = 2),
                          input_data], axis = 1)
            
            x = self.att_layer(x)
            att_weights = K.softmax(x, axis = 1)
            weighted_input = tf.math.multiply(att_weights, input_data[:, t, :])
            lstm_input = tf.expand_dims(weighted_input, axis = 1)
            
            #https://stackoverflow.com/questions/42415909/initializing-lstm-hidden-state-tensorflow-keras
            lstm_output, hidden_state, cell_state = self.lstm(lstm_input, initial_state = [hidden_state, cell_state])
            input_weighted = input_weighted.write(t, weighted_input)
            input_encoded = input_encoded.write(t, hidden_state)
        
        
        input_weighted = tf.transpose(input_weighted.stack(), perm = [1, 0, 2])
        input_encoded = tf.transpose(input_encoded.stack(), perm = [1, 0, 2])
        
        if self.return_encoded:
            return input_weighted, input_encoded
        else:
            return input_weighted
        

In [9]:
input_size = 5
time_steps = 10
hidden_size = 32
batch_size = 16
input_data = tf.random.uniform(shape = [batch_size, time_steps, input_size])
encoder_1 = Encoder(input_size, hidden_size, time_steps, return_encoded = True)
input_weighted, input_encoded = encoder_1(input_data)

In [12]:
class DecoderAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super(DecoderAttention, self).__init__(**kwargs)
    
    def build(self, input_shape):
        self.W_e = self.add_weight(name = "decoder_attention_weight1", shape = (input_shape[1], input_shape[1]), initializer = "normal")
        self.V_e = self.add_weight(name = "decoder_attention_weight2", shape = (1, input_shape[1]), initializer = "normal")
        super(DecoderAttention, self).build(input_shape)
    
    def call(self, x):
        
        output_1 = tf.tanh(tf.matmul(self.W_e, x))
        output_2 = tf.matmul(self.V_e, output_1)
        output_3 = tf.squeeze(output_2)
        attention_weights = tf.nn.softmax(output_3, axis = 1)
        return attention_weights

In [111]:
class Decoder(tf.keras.Model):
    
    def __init__(self, encoder_hidden_size, decoder_hidden_size, time_steps, **Kwargs):
        
        super(Decoder, self).__init__(**Kwargs)
        self.time_steps = time_steps
        self.encoder_hidden_size = encoder_hidden_size
        self.decoder_hidden_size = decoder_hidden_size
        
        self.lstm = LSTM(self.decoder_hidden_size, return_state = True, return_sequences = True, name = "decoder_lstm")
        self.att_layer = DecoderAttention(name = "decoder_attention")
        self.dense_1 = Dense(1, name = "decoder_dense_1")
        self.dense_2 = Dense(1, name = "decoder_dense_2")
    
    def call(self, input_encoded, y_history):
        
        batch_size = input_encoded.shape[0]
        input_encoded_size = input_encoded.shape[2]
        hidden_state = tf.zeros(shape = [batch_size, self.decoder_hidden_size])
        cell_state = tf.zeros(shape = [batch_size, self.decoder_hidden_size])
        
        
        for time in range(self.time_steps):
            
            x = tf.concat(values = [tf.repeat(tf.expand_dims(hidden_state, axis = 2), repeats = self.time_steps, axis = 2),
                          tf.repeat(tf.expand_dims(cell_state, axis = 2), repeats = self.time_steps, axis = 2),
                          tf.transpose(input_encoded, perm = [0, 2, 1])], axis = 1)
            #print(x.shape)
            
            x = self.att_layer(x)
            att_weights = K.softmax(x, axis = 1)
            #print(att_weights.shape)
            #print(np.sum(att_weights[1, :]))
            
            att_weights = tf.expand_dims(att_weights, axis = 2)
            weighted_encoded = tf.math.multiply(att_weights, input_encoded)
            #print(weighted_encoded.shape)
            
            context_vector = K.sum(weighted_encoded, axis = 1)
            #print(context_vector.shape)
            
            if(time < self.time_steps - 1):
                y_tilde = self.dense_1(tf.concat(values = [tf.expand_dims(tf.expand_dims(y_history[:, time], axis = 1), axis = 1),
                                               tf.expand_dims(context_vector, axis = 1)], axis = 2))
                #print(y_tilde.shape)
                lstm_output, hidden_state, cell_state = self.lstm(y_tilde, initial_state = [hidden_state, cell_state])
                #print(lstm_output.shape)
                #print(hidden_state.shape)
                #print(cell_state.shape)
            
        concat_final_input = tf.concat(values = [hidden_state, context_vector], axis = 1)
        #print(tf.expand_dims(concat_final_input, axis = 1).shape)
        y_pred = self.dense_2(concat_final_input)
        print(y_pred.shape)
            
        

In [89]:
print(y_history[:, 0].shape)

(16,)


In [112]:
#print(input_encoded.shape)
time_steps = 10

encoder_hidden_size = 32
decoder_hidden_size = 16

batch_size = 16
y_history = tf.random.uniform(shape = [batch_size, time_steps])
decoder_1 = Decoder(encoder_hidden_size, decoder_hidden_size, time_steps)
decoder_1(input_encoded, y_history)

(16, 1)
