In [None]:
# Imports

import tensorflow as tf
from tensorflow.keras import datasets, models, layers, backend
from sklearn.model_selection import train_test_split
import wandb
from wandb.keras import WandbCallback
from matplotlib import pyplot
import os, time
import numpy as np
import pandas as pd
from IPython.display import HTML as html_print, display, clear_output

In [None]:
# Setting up input flows

# Change the dataset path to the directory where you have stored the three train, test and validation files

dataset_path = "../../dakshina_dataset_v1.0/ta/lexicons"
train_path = dataset_path + "/ta.translit.sampled.train.tsv"
test_path = dataset_path + "/ta.translit.sampled.test.tsv"
val_path = dataset_path + "/ta.translit.sampled.dev.tsv"


def _read_tokens(path):
    """Return every whitespace-separated token of the UTF-8 file at `path`.

    The file is opened inside a context manager so the handle is closed as
    soon as the content has been read (the previous inline `open()` calls
    left all three handles open for the lifetime of the kernel).
    """
    with open(path, 'rb') as handle:
        return handle.read().decode('utf-8').split()


# Tokens are consumed in groups of three: the first token of each group is
# used as the decoder target (y), the second as the encoder input (X) and the
# third is parsed as an integer (z).
train_data = _read_tokens(train_path)
y_train, X_train, z_train = train_data[::3], train_data[1::3], [int(each) for each in train_data[2::3]]

test_data = _read_tokens(test_path)
y_test, X_test, z_test = test_data[::3], test_data[1::3], [int(each) for each in test_data[2::3]]

val_data = _read_tokens(val_path)
y_val, X_val, z_val = val_data[::3], val_data[1::3], [int(each) for each in val_data[2::3]]

In [None]:
# Setting up the vocabulary

# Both vocabularies always contain the blank used for padding; the output side
# additionally holds the tab / newline markers wrapped around every target word.
input_vocab = {" "}
output_vocab = {" ", "\t", "\n"}

# The vocabulary and the maximum lengths are taken over all three splits.
all_inputs = X_train + X_test + X_val
all_outputs = y_train + y_test + y_val

for word in all_inputs:
    input_vocab.update(word)
for word in all_outputs:
    output_vocab.update(word)

max_input_len = max((len(word) for word in all_inputs), default=0)
# Two extra positions for the leading "\t" and the trailing "\n".
max_output_len = max((len(word) for word in all_outputs), default=0) + 2

input_vocab = sorted(input_vocab)
output_vocab = sorted(output_vocab)
input_v_len = len(input_vocab)
output_v_len = len(output_vocab)

# character -> index
input_inv = {char: idx for idx, char in enumerate(input_vocab)}
output_inv = {char: idx for idx, char in enumerate(output_vocab)}

# index -> character
reverse_inp = {idx: char for char, idx in input_inv.items()}
reverse_out = {idx: char for char, idx in output_inv.items()}

In [None]:
# Converting the data to a one hot representation

def onehot(X, y):
    """One-hot encode source words `X` and target words `y` for teacher forcing.

    Returns a tuple of three float32 arrays:
      * encoder_input_data  -- (len(X), max_input_len, input_v_len)
      * decoder_input_data  -- (len(X), max_output_len, output_v_len),
        the target wrapped as "\\t" + word + "\\n"
      * decoder_target_data -- same shape as decoder_input_data, holding the
        decoder input shifted one step to the left
    Positions after the end of a word are filled with the blank (" ") class.
    """
    pad_in = input_inv[" "]
    pad_out = output_inv[" "]

    encoder_input_data = np.zeros((len(X), max_input_len, input_v_len), dtype="float32")
    decoder_input_data = np.zeros((len(X), max_output_len, output_v_len), dtype="float32")
    decoder_target_data = np.zeros((len(X), max_output_len, output_v_len), dtype="float32")

    for i, (a, b) in enumerate(zip(X, y)):
        for t, char in enumerate(a):
            encoder_input_data[i, t, input_inv[char]] = 1.0
        # Pad from the explicit word length. Relying on the loop variable `t`
        # breaks for an empty word: `t` would be undefined or left over from
        # the previous sample, so part of the row would stay unpadded.
        encoder_input_data[i, len(a):, pad_in] = 1.0

        wrapped = "\t" + b + "\n"
        for t, char in enumerate(wrapped):
            decoder_input_data[i, t, output_inv[char]] = 1.0
            if t > 0:
                # The target at step t-1 is the decoder input at step t.
                decoder_target_data[i, t - 1, output_inv[char]] = 1.0
        decoder_input_data[i, len(wrapped):, pad_out] = 1.0
        # The target is one step shorter than the input, so padding starts earlier.
        decoder_target_data[i, len(wrapped) - 1:, pad_out] = 1.0

    return encoder_input_data, decoder_input_data, decoder_target_data

In [None]:
# Preprocessing the data for RNNs with embedding layer

def onehot_embed(X, y):
    """Encode source / target words for models that start with an Embedding layer.

    Returns a tuple of three float32 arrays:
      * encoder_input_data  -- (len(X), max_input_len) of input character indices
      * decoder_input_data  -- (len(X), max_output_len) of output character
        indices for "\\t" + word + "\\n"
      * decoder_target_data -- (len(X), max_output_len, output_v_len) one-hot
        targets, i.e. the decoder input shifted one step to the left
    Positions after the end of a word hold the blank (" ") index / class.
    """
    pad_in = input_inv[" "]
    pad_out = output_inv[" "]

    encoder_input_data = np.zeros((len(X), max_input_len), dtype="float32")
    decoder_input_data = np.zeros((len(X), max_output_len), dtype="float32")
    decoder_target_data = np.zeros((len(X), max_output_len, output_v_len), dtype="float32")

    for i, (a, b) in enumerate(zip(X, y)):
        for t, char in enumerate(a):
            encoder_input_data[i, t] = input_inv[char]
        # Pad from the explicit word length. Relying on the loop variable `t`
        # breaks for an empty word: `t` would be undefined or left over from
        # the previous sample, so part of the row would stay unpadded.
        encoder_input_data[i, len(a):] = pad_in

        wrapped = "\t" + b + "\n"
        for t, char in enumerate(wrapped):
            decoder_input_data[i, t] = output_inv[char]
            if t > 0:
                # The target at step t-1 is the decoder input at step t.
                decoder_target_data[i, t - 1, output_inv[char]] = 1.0
        decoder_input_data[i, len(wrapped):] = pad_out
        # The target is one step shorter than the input, so padding starts earlier.
        decoder_target_data[i, len(wrapped) - 1:, pad_out] = 1.0

    return encoder_input_data, decoder_input_data, decoder_target_data

In [None]:
# Building the RNN model

def myRNN(latent_dim, num_encoders = 1, num_decoders = 1, embed_dim = None, dropout = 0.0, cell_type = 'GRU'):
    """Build the (attention-free) encoder-decoder training model.

    latent_dim   -- number of units of every recurrent layer
    num_encoders -- number of stacked recurrent layers in the encoder
    num_decoders -- number of stacked recurrent layers in the decoder
    embed_dim    -- None for one-hot inputs, otherwise the output size of the
                    Embedding layers placed in front of encoder and decoder
    dropout      -- `dropout` argument given to every recurrent layer
    cell_type    -- 'LSTM', 'RNN' (SimpleRNN); anything else selects GRU

    Returns a tf.keras.Model mapping [encoder_inputs, decoder_inputs] to a
    softmax over the output vocabulary at every decoder step.
    """

    def new_recurrent_layer():
        # Every encoder / decoder layer is configured identically.
        if cell_type == 'LSTM':
            layer_cls = layers.LSTM
        elif cell_type == 'RNN':
            layer_cls = layers.SimpleRNN
        else:
            layer_cls = layers.GRU
        return layer_cls(latent_dim, return_state=True, return_sequences=True, dropout=dropout)

    one_hot_inputs = embed_dim is None

    # ---- encoder ----
    if one_hot_inputs:
        encoder_inputs = tf.keras.Input(shape=(None, input_v_len))
        encoder_seq = encoder_inputs
    else:
        encoder_inputs = tf.keras.Input(shape=(None,))
        encoder_seq = layers.Embedding(input_dim=input_v_len, output_dim=embed_dim)(encoder_inputs)

    for _ in range(num_encoders):
        # Only the states of the last encoder layer survive the loop.
        encoder_seq, *encoder_states = new_recurrent_layer()(encoder_seq)

    # ---- decoder ----
    if one_hot_inputs:
        decoder_inputs = tf.keras.Input(shape=(None, output_v_len))
        decoder_seq = decoder_inputs
    else:
        decoder_inputs = tf.keras.Input(shape=(None,))
        decoder_seq = layers.Embedding(input_dim=output_v_len, output_dim=embed_dim)(decoder_inputs)

    for _ in range(num_decoders):
        # Each decoder layer starts from the final state(s) of the last encoder layer.
        decoder_seq, *_ = new_recurrent_layer()(decoder_seq, initial_state=encoder_states)

    output_layer = layers.Dense(output_v_len, activation="softmax")
    decoder_outputs = output_layer(decoder_seq)

    return tf.keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
# Getting the encoder and decoder models

def decompose(model, cell_type, latent_dim, num_encoders, num_decoders, embed_dim):
    """Split a trained myRNN model into separate inference encoder / decoder models.

    Layers are picked out of `model.layers` by position, so the arguments must
    be the ones `model` was built with.
    NOTE(review): the index arithmetic assumes the layer ordering Keras gives
    to models produced by myRNN -- confirm with model.summary() when changing
    TensorFlow versions.

    Returns (encoder_model, decoder_model):
      * encoder_model maps the encoder input to the state(s) of the layer at
        index num_encoders + 1 + offset (used as the last encoder layer)
      * decoder_model maps [decoder_input, *per-layer states] to
        [softmax output, *updated per-layer states]
    """
    uses_embedding = embed_dim is not None
    offset = 2 if uses_embedding else 0

    # ---- encoder ----
    last_encoder = model.layers[num_encoders + 1 + offset]
    _, *encoder_states = last_encoder.output
    encoder_model = tf.keras.Model(model.input[0], encoder_states)

    # ---- decoder ----
    decoder_inputs = model.input[1]  # input_2
    decoder_outputs = decoder_inputs
    if uses_embedding:
        decoder_outputs = model.layers[num_encoders + offset](decoder_inputs)

    # An LSTM layer carries two state tensors, the other cell types one.
    if cell_type == 'LSTM':
        state_name_prefixes = ['decoder_0_', 'decoder_1_']
    else:
        state_name_prefixes = ['decoder_']

    decoder_state_inputs = []
    decoder_state_outputs = []
    first_decoder = num_encoders + 2 + offset

    for layer_no in range(num_decoders):
        layer_state_inputs = [
            tf.keras.Input(shape=(latent_dim,), name=prefix + str(layer_no))
            for prefix in state_name_prefixes
        ]
        decoder_state_inputs.extend(layer_state_inputs)

        recurrent_layer = model.layers[first_decoder + layer_no]
        decoder_outputs, *layer_state_outputs = recurrent_layer(decoder_outputs, initial_state=layer_state_inputs)
        decoder_state_outputs.extend(layer_state_outputs)

    output_layer = model.layers[first_decoder + num_decoders]
    decoder_outputs = output_layer(decoder_outputs)

    decoder_model = tf.keras.Model([decoder_inputs] + decoder_state_inputs, [decoder_outputs] + decoder_state_outputs)

    return encoder_model, decoder_model

In [None]:
# Decode the sequence

def decode_sequence(input_seq, encoder_model, decoder_model, num_decoders, embed_dim):
    """Greedily decode a single encoded input word.

    input_seq     -- encoder input holding exactly one sample
    encoder_model -- inference encoder returned by decompose()
    decoder_model -- inference decoder returned by decompose()
    num_decoders  -- number of stacked decoder layers; the encoder state(s)
                     are repeated once per layer
    embed_dim     -- None when the decoder consumes one-hot vectors, anything
                     else when it consumes character indices

    Returns the decoded string without the terminating newline. Decoding also
    stops once more than max_output_len characters have been produced.
    """
    states = encoder_model.predict(input_seq)
    # A single-state encoder yields one array, a multi-state one (LSTM) yields
    # a list; normalise to a flat list so the decoder receives exactly one
    # array per state input instead of nested lists.
    if not isinstance(states, (list, tuple)):
        states = [states]
    enc_states = list(states) * num_decoders

    def as_decoder_input(char_index):
        # One decoding step for one sample, in the format the decoder expects.
        if embed_dim is None:
            step = np.zeros((1, 1, output_v_len))
            step[0, 0, char_index] = 1.0
        else:
            step = np.zeros((1, 1))
            step[0, 0] = char_index
        return step

    target_seq = as_decoder_input(output_inv["\t"])

    stop_condition = False
    final_ans = ""
    while not stop_condition:
        output_chars, *enc_states = decoder_model.predict([target_seq] + enc_states)

        sampled_char_index = np.argmax(output_chars[0, -1, :])
        sampled_char = reverse_out[sampled_char_index]
        final_ans += sampled_char

        if sampled_char == "\n" or len(final_ans) > max_output_len:
            stop_condition = True

        # Feed the prediction back in as the next decoder input.
        target_seq = as_decoder_input(sampled_char_index)

    # Only strip the end marker when it is really there: on the length
    # cut-off the last character is a genuine prediction and must be kept.
    if final_ans.endswith("\n"):
        return final_ans[:-1]
    return final_ans

def get_acc(input_seq, test_seq, encoder_model, decoder_model, num_decoders, embed_dim):
    """Greedily decode a whole batch and compute exact-match word accuracy.

    input_seq     -- encoder input for n samples
    test_seq      -- the n reference output strings
    encoder_model -- inference encoder returned by decompose()
    decoder_model -- inference decoder returned by decompose()
    num_decoders  -- number of stacked decoder layers; the encoder state(s)
                     are repeated once per layer
    embed_dim     -- None when the decoder consumes one-hot vectors, anything
                     else when it consumes character indices

    Returns (predictions, accuracy): each prediction is the decoded text up to
    the first newline, accuracy is the fraction of predictions equal to their
    reference. An empty batch yields ([], 0.0).
    """
    n = len(input_seq)
    if n == 0:
        # Nothing to decode; also avoids the division by zero below.
        return [], 0.0

    states = encoder_model.predict(input_seq)
    # A single-state encoder yields one array, a multi-state one (LSTM) yields
    # a list; normalise to a flat list so the decoder receives exactly one
    # array per state input instead of nested lists.
    if not isinstance(states, (list, tuple)):
        states = [states]
    enc_states = list(states) * num_decoders

    if embed_dim is None:
        target_seq = np.zeros((n, 1, output_v_len))
        target_seq[:, 0, output_inv["\t"]] = 1.0
    else:
        target_seq = np.zeros((n, 1))
        target_seq[:, 0] = output_inv["\t"]

    final_ans = ["" for _ in range(n)]
    finished = [False] * n
    for _ in range(max_output_len):
        output_chars, *enc_states = decoder_model.predict([target_seq] + enc_states)

        sampled_char_index = np.argmax(output_chars[:, -1, :], axis=1)
        sampled_char = [reverse_out[idx] for idx in sampled_char_index]
        final_ans = [x + y for x, y in zip(final_ans, sampled_char)]

        # Everything after the first newline is discarded below, so decoding
        # can stop as soon as every sample has produced one.
        finished = [done or char == "\n" for done, char in zip(finished, sampled_char)]
        if all(finished):
            break

        if embed_dim is None:
            target_seq = np.zeros((n, 1, output_v_len))
            target_seq[range(n), 0, sampled_char_index] = 1.0
        else:
            target_seq = np.zeros((n, 1))
            target_seq[:, 0] = sampled_char_index

    final_ans = [x.split('\n')[0] for x in final_ans]

    final_acc = sum([x == y for x, y in zip(final_ans, test_seq)]) / n
    return final_ans, final_acc

In [None]:
def train(model, batch_size=64, epochs=25, embed_dim=None, wandb_cb = True):
    """Compile `model` and fit it on the training split, validating on the dev split.

    model      -- Keras model taking [encoder_input, decoder_input]
    batch_size -- forwarded to model.fit
    epochs     -- forwarded to model.fit
    embed_dim  -- None selects the one-hot encoding (onehot), anything else
                  the index encoding for embedding models (onehot_embed)
    wandb_cb   -- when truthy a WandbCallback is attached to the fit call

    The model is trained in place; nothing is returned.
    """
    model.compile(optimizer="rmsprop", loss="categorical_crossentropy", metrics=["accuracy"])

    # Both splits must use the same input representation.
    encode = onehot if embed_dim is None else onehot_embed
    train_enc, train_dec, train_target = encode(X_train, y_train)
    val_enc, val_dec, val_target = encode(X_val, y_val)

    fit_options = {
        "batch_size": batch_size,
        "epochs": epochs,
        "validation_data": ([val_enc, val_dec], val_target),
    }
    if wandb_cb:
        fit_options["callbacks"] = [WandbCallback()]

    model.fit([train_enc, train_dec], train_target, **fit_options)

In [None]:
# Search space for the W&B hyperparameter sweep. Every key under 'parameters'
# is read back from wandb.config inside run().
sweep_config = {
    'method': 'bayes',
    'metric': {
        # run() logs this value once per trial with wandb.log.
        'name': 'word_acc',
        'goal': 'maximize'
    },
    'parameters': {
        # Passed to myRNN as latent_dim.
        'hidden_layer_size': {
            'values': [ 16, 32, 64, 256],
        },
        # Number of stacked recurrent layers in the encoder.
        'num_encoders': {
            'values': [1, 2, 3]
        },
        # Number of stacked recurrent layers in the decoder.
        'num_decoders': {
            'values': [1, 2, 3]
        },
        # Passed to every recurrent layer as its `dropout` argument.
        'dropout': {
            'values': [0.0, 0.2, 0.3]
        },
        # 'RNN' selects SimpleRNN in myRNN.
        'cell_type': {
            'values': ['RNN', 'LSTM', 'GRU']
        },
        # Passed to train() as `epochs`.
        'num_epochs': {
            'values': [5, 10, 15]
        },
        # None makes myRNN / train() use one-hot inputs instead of Embedding layers.
        'embed_dim' : {
            'values': [None, 64, 256, 512]
        }
    }
}

In [None]:
# Register the sweep described by sweep_config; the returned id is handed to wandb.agent below.
sweep_id = wandb.sweep(sweep_config, entity = '0x2e4', project = 'cs6910-a3')

In [None]:

def run():
    """Train and evaluate one sweep trial, logging its word accuracy to W&B.

    Hyperparameters come from wandb.config (falling back to default_config).
    The model is built with myRNN, trained with train(), split into inference
    models with decompose() and scored with get_acc(); the resulting accuracy
    is logged as 'word_acc', the metric the sweep maximises.
    """
    default_config = {'hidden_layer_size': 16, 'num_encoders': 1, 'num_decoders': 1, 'dropout': 0.0, 'cell_type': 'RNN', 'num_epochs': 30, 'embed_dim' : None}

    # The returned run handle is not needed (and binding it to `run` would
    # shadow this function inside its own body).
    wandb.init(project='cs6910-a3', config=default_config)
    config = wandb.config

    model = myRNN(latent_dim=config.hidden_layer_size, num_encoders = config.num_encoders, num_decoders = config.num_decoders, dropout = config.dropout, cell_type = config.cell_type, embed_dim = config.embed_dim)

    # train() compiles the model itself (rmsprop + categorical cross-entropy
    # on softmax outputs), so no optimizer or loss object is created here.
    train(model, epochs=config.num_epochs, embed_dim = config.embed_dim)

    enc_model, dec_model = decompose(model, config.cell_type, config.hidden_layer_size, config.num_encoders, config.num_decoders, config.embed_dim)

    # NOTE(review): the sweep metric is computed on the *test* split, so
    # hyperparameters are selected on test data -- consider scoring on
    # X_val / y_val here and keeping the test split for the final report.
    if config.embed_dim is None:
        test_in, _, _ = onehot(X_test, y_test)
    else:
        test_in, _, _ = onehot_embed(X_test, y_test)
    _, word_acc = get_acc(test_in, y_test, enc_model, dec_model, config.num_decoders, config.embed_dim)

    wandb.log({ 'word_acc' : word_acc})

In [None]:
# Start a sweep agent for sweep_id that uses run() as its trial function.
wandb.agent(sweep_id, run)

In [None]:
# Training the best model

# Hyperparameters of the configuration labelled "best"; kept in one place so
# that model construction, training and decomposition cannot drift apart.
best_hparams = dict(cell_type = 'GRU', latent_dim = 256, dropout = 0.2, num_encoders = 2, num_decoders = 3, embed_dim = 256)

best_model = myRNN(**best_hparams)

train(best_model, epochs = 5, embed_dim = best_hparams['embed_dim'], wandb_cb=False)

encoder_model, decoder_model = decompose(
    best_model,
    best_hparams['cell_type'],
    best_hparams['latent_dim'],
    best_hparams['num_encoders'],
    best_hparams['num_decoders'],
    best_hparams['embed_dim'],
)

In [None]:
# Evaluating the model

# best_model uses Embedding layers, so the index encoding is required.
test_in, test_out, _ = onehot_embed(X_test, y_test)
# The last two arguments are num_decoders and embed_dim; they must describe
# the model trained above (3 decoder layers, embed_dim 256). get_acc only
# checks embed_dim against None, but passing the real value keeps the call
# consistent with the model definition.
test_pred, test_acc = get_acc(test_in, y_test, encoder_model, decoder_model, 3, 256)

print(test_acc)

# One row per test word: source, model prediction and reference.
test_dict = { 'Input' : X_test, 'Prediction' : test_pred, 'True Output' : y_test}
vanilla = pd.DataFrame(data=test_dict)

vanilla.to_csv('predictions_vanilla.csv', index=False)

# Show ten randomly drawn rows as the cell output.
vanilla_sample = vanilla.sample(10, replace=False)
vanilla_sample

In [None]:
# NOTE(review): this cell looks like a leftover from a different (image
# classification) notebook. It reads `test_generator`, `max_shape` and
# `y_pred`, none of which are defined in the visible part of this notebook,
# and it calls os.listdir() on `test_path`, which the data-loading cell sets
# to a .tsv file rather than a directory. As written it is expected to fail on
# a fresh "Restart & Run All" -- confirm whether it should be removed.
pyplot.figure(figsize=[50, 100])
all_classes = list(test_generator.class_indices.keys())

# Header row of the 11 x 3 grid: three text-only cells.
ax = pyplot.subplot(11, 3, 1)
ax1 = pyplot.subplot(11, 3, 2)
ax2 = pyplot.subplot(11, 3, 3)
ax.axis('off')
ax.text(0.3, 0.5, "Sample Image", fontsize=70)
ax1.axis('off')
ax1.text(0.3, 0.5, "Prediction", fontsize=70)
ax2.axis('off')
ax2.text(0.3, 0.5, "True Class", fontsize=70)

# One grid row per entry of test_path: first file of the entry, the predicted
# class name and the entry name.
for some in os.listdir(test_path):
    idx = test_generator.class_indices[some]
    new_path = test_path + "/" + some
    img_path = new_path + "/" + os.listdir(new_path)[0]
    img = tf.keras.preprocessing.image.load_img(img_path,
                                                target_size=(max_shape[0],
                                                             max_shape[1]))
    img_np = np.asarray(img)
    ax = pyplot.subplot(11, 3, 3 * idx + 4)
    ax1 = pyplot.subplot(11, 3, 3 * idx + 5)
    ax2 = pyplot.subplot(11, 3, 3 * idx + 6)
    ax.imshow(img_np)
    ax1.axis('off')
    # NOTE(review): the stride of 200 presumably assumes 200 predictions per
    # class in y_pred -- cannot be confirmed from this notebook.
    ax1.text(0.3, 0.5, all_classes[y_pred[200 * idx]], fontsize=70)
    ax2.axis('off')
    ax2.text(0.3, 0.5, some, fontsize=70)

# Logs to project 'cs6910-a2', whereas the sweep cells above use 'cs6910-a3'.
wandb.init(project='cs6910-a2')
wandb.log({'Sample Predictions': pyplot})

In [None]:
# The attention layer

class AttentionLayer(layers.Layer):
    """
    Additive (Bahdanau-style) attention, see https://arxiv.org/pdf/1409.0473.pdf.

    The layer is called on [encoder_output_sequence, decoder_output_sequence]
    and returns [context_vectors, attention_weights]. Three trainable weights
    are introduced in build(): W_a, U_a and V_a.
    """

    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the weights from the [encoder, decoder] input shapes."""
        assert isinstance(input_shape, list)
        # Create a trainable weight variable for this layer.

        # W_a: encoder hidden -> encoder hidden
        self.W_a = self.add_weight(name='W_a',
                                   shape=tf.TensorShape((input_shape[0][2], input_shape[0][2])),
                                   initializer='uniform',
                                   trainable=True)
        # U_a: decoder hidden -> encoder hidden
        self.U_a = self.add_weight(name='U_a',
                                   shape=tf.TensorShape((input_shape[1][2], input_shape[0][2])),
                                   initializer='uniform',
                                   trainable=True)
        # V_a: encoder hidden -> scalar energy
        self.V_a = self.add_weight(name='V_a',
                                   shape=tf.TensorShape((input_shape[0][2], 1)),
                                   initializer='uniform',
                                   trainable=True)

        super(AttentionLayer, self).build(input_shape)  # Be sure to call this at the end

    def call(self, inputs, verbose=False):
        """
        inputs: [encoder_output_sequence, decoder_output_sequence]
        verbose: when True, intermediate tensor shapes are printed

        Returns (c_outputs, e_outputs):
          c_outputs -- context vectors, (batch_size, de_seq_len, en_hidden)
          e_outputs -- attention weights, (batch_size, de_seq_len, en_seq_len)
        """
        assert isinstance(inputs, list)
        encoder_out_seq, decoder_out_seq = inputs
        if verbose:
            print('encoder_out_seq>', encoder_out_seq.shape)
            print('decoder_out_seq>', decoder_out_seq.shape)

        def energy_step(inputs, states):
            """ Step function computing the attention weights for one decoder step.
            inputs: decoder output at the current time step
            states: unused, only required by the backend.rnn interface
            """

            assert_msg = "States must be an iterable. Got {} of type {}".format(states, type(states))
            assert isinstance(states, list) or isinstance(states, tuple), assert_msg

            """ Computing S.Wa where S=[s0, s1, ..., si]"""
            # <= batch_size, en_seq_len, en_hidden
            W_a_dot_s = backend.dot(encoder_out_seq, self.W_a)

            """ Computing hj.Ua """
            U_a_dot_h = backend.expand_dims(backend.dot(inputs, self.U_a), 1)  # <= batch_size, 1, en_hidden
            if verbose:
                print('Ua.h>', U_a_dot_h.shape)

            """ tanh(S.Wa + hj.Ua) """
            # <= batch_size, en_seq_len, en_hidden (U_a_dot_h broadcasts over en_seq_len)
            Ws_plus_Uh = backend.tanh(W_a_dot_s + U_a_dot_h)
            if verbose:
                print('Ws+Uh>', Ws_plus_Uh.shape)

            """ softmax(va.tanh(S.Wa + hj.Ua)) """
            # <= batch_size, en_seq_len
            e_i = backend.squeeze(backend.dot(Ws_plus_Uh, self.V_a), axis=-1)
            # <= batch_size, en_seq_len
            e_i = backend.softmax(e_i)

            if verbose:
                print('ei>', e_i.shape)

            return e_i, [e_i]

        def context_step(inputs, states):
            """ Step function computing the context vector ci from the weights ei.
            inputs: attention weights of one decoder step, (batch_size, en_seq_len)
            states: unused, only required by the backend.rnn interface
            """

            assert_msg = "States must be an iterable. Got {} of type {}".format(states, type(states))
            assert isinstance(states, list) or isinstance(states, tuple), assert_msg

            # <= batch_size, en_hidden
            c_i = backend.sum(encoder_out_seq * backend.expand_dims(inputs, -1), axis=1)
            if verbose:
                print('ci>', c_i.shape)
            return c_i, [c_i]

        # Dummy initial states handed to backend.rnn; the step functions above
        # never read them, they only fix the shape of the state each step returns.
        fake_state_c = backend.sum(encoder_out_seq, axis=1)  # <= (batch_size, en_hidden)
        fake_state_e = backend.sum(encoder_out_seq, axis=2)  # <= (batch_size, en_seq_len)

        """ Computing energy outputs """
        # e_outputs => (batch_size, de_seq_len, en_seq_len)
        _, e_outputs, _ = backend.rnn(
            energy_step, decoder_out_seq, [fake_state_e],
        )

        """ Computing context vectors """
        # c_outputs => (batch_size, de_seq_len, en_hidden)
        _, c_outputs, _ = backend.rnn(
            context_step, e_outputs, [fake_state_c],
        )

        return c_outputs, e_outputs

    def compute_output_shape(self, input_shape):
        """ Shapes of the (context vectors, attention weights) returned by call() """
        return [
            # Context vectors are weighted sums of *encoder* outputs, so their
            # last dimension is the encoder hidden size (input_shape[0][2]),
            # not the decoder one.
            tf.TensorShape((input_shape[1][0], input_shape[1][1], input_shape[0][2])),
            tf.TensorShape((input_shape[1][0], input_shape[1][1], input_shape[0][1]))
        ]

In [None]:
# Building the RNN model with attention

def myRNN_attn(latent_dim, embed_dim = None, dropout = 0.0, cell_type = 'GRU'):
    """Build the single-layer encoder-decoder training model with attention.

    latent_dim -- number of units of the encoder and decoder recurrent layers
    embed_dim  -- None for one-hot inputs, otherwise the output size of the
                  Embedding layers placed in front of encoder and decoder
    dropout    -- `dropout` argument given to both recurrent layers
    cell_type  -- 'LSTM', 'RNN' (SimpleRNN); anything else selects GRU

    Returns a tf.keras.Model mapping [encoder_inputs, decoder_inputs] to a
    softmax over the output vocabulary at every decoder step; the softmax is
    computed from the decoder output concatenated with the attention context.
    """
    layer_cls = {'LSTM': layers.LSTM, 'RNN': layers.SimpleRNN}.get(cell_type, layers.GRU)
    one_hot_inputs = embed_dim is None

    # ---- encoder ----
    if one_hot_inputs:
        encoder_inputs = tf.keras.Input(shape=(None, input_v_len))
        encoder_seq = encoder_inputs
    else:
        encoder_inputs = tf.keras.Input(shape=(None,))
        encoder_seq = layers.Embedding(input_dim=input_v_len, output_dim=embed_dim)(encoder_inputs)

    encoder = layer_cls(latent_dim, return_state=True, return_sequences=True, dropout=dropout)
    # encoder_seq becomes the full encoder output sequence used by attention.
    encoder_seq, *encoder_states = encoder(encoder_seq)

    # ---- decoder ----
    if one_hot_inputs:
        decoder_inputs = tf.keras.Input(shape=(None, output_v_len))
        decoder_seq = decoder_inputs
    else:
        decoder_inputs = tf.keras.Input(shape=(None,))
        decoder_seq = layers.Embedding(input_dim=output_v_len, output_dim=embed_dim)(decoder_inputs)

    decoder = layer_cls(latent_dim, return_sequences=True, return_state=True, dropout=dropout)
    decoder_seq, *_ = decoder(decoder_seq, initial_state=encoder_states)

    # Attention layer
    attention = AttentionLayer(name='attention_layer')
    context, _ = attention([encoder_seq, decoder_seq])
    decoder_with_context = layers.Concatenate(axis=-1, name='concat_layer')([decoder_seq, context])

    output_layer = layers.Dense(output_v_len, activation="softmax")
    decoder_preds = output_layer(decoder_with_context)

    return tf.keras.Model([encoder_inputs, decoder_inputs], decoder_preds)

In [None]:
# Training the best attention model

# Hyperparameters of the attention model, kept together so that construction
# and training use the same embed_dim.
attn_hparams = dict(cell_type = 'GRU', latent_dim = 256, dropout = 0.2, embed_dim = 256)

best_attn_model = myRNN_attn(**attn_hparams)

train(best_attn_model, epochs = 5, embed_dim = attn_hparams['embed_dim'], wandb_cb=False)

In [None]:
# Getting the encoder and decoder models

# Hyperparameters best_attn_model was built with. decode_sequence_attn and
# get_acc_attn read `embed_dim` as a global.
cell_type = 'GRU'
latent_dim = 256
embed_dim = 256

# Index shift applied to every best_attn_model.layers lookup below when
# Embedding layers are present.
# NOTE(review): the hard-coded indices assume a specific layer ordering inside
# best_attn_model -- confirm with best_attn_model.summary().
offset = 0 if embed_dim == None else 2

# Inference encoder: source sequence -> encoder output sequence + final state(s).
encoder_inputs = best_attn_model.input[0]
encoder_out, *encoder_states = best_attn_model.layers[2 + offset].output
encoder_attn_model = tf.keras.Model(encoder_inputs, [encoder_out] + encoder_states)

decoder_inputs = best_attn_model.input[1]  # input_2
if embed_dim == None:
    decoder_out = decoder_inputs
else:
    # Reuse the layer applied to the decoder input in the training model.
    decoder_out = best_attn_model.layers[1 + offset](decoder_inputs)

decoder_state_inputs = []
decoder_state_outputs = []

# The decoder state is fed in explicitly at inference time: two tensors for
# an LSTM, one for the other cell types.
if cell_type == 'LSTM':
    temp_inputs = [tf.keras.Input(shape=(latent_dim,), name = 'decoder_0_0'), tf.keras.Input(shape=(latent_dim,), name = 'decoder_1_0')]
else:
    temp_inputs = [tf.keras.Input(shape=(latent_dim,), name = 'decoder_0')]
decoder_state_inputs += temp_inputs

decoder = best_attn_model.layers[3 + offset]
decoder_outputs, *temp_states = decoder(decoder_out, initial_state=temp_inputs)
decoder_state_outputs += temp_states

# The encoder output sequence becomes an explicit input of the inference
# decoder so the trained attention layer can be applied to it.
attn_layer = best_attn_model.layers[4 + offset]
attn_inp = tf.keras.Input(shape=(encoder_out.shape[1],encoder_out.shape[2]))
attn_out, attn_states = attn_layer([attn_inp, decoder_outputs])
# A new Concatenate layer is created here (it has no weights to reuse).
decoder_concat_input = layers.Concatenate(axis=-1, name='concat_layer')([decoder_outputs, attn_out])

decoder_dense = best_attn_model.layers[6 + offset]
decoder_preds = decoder_dense(decoder_concat_input)

# Inputs:  [decoder input, *decoder states, encoder output sequence]
# Outputs: [softmax output, *updated decoder states, attention weights]
decoder_attn_model = tf.keras.Model([decoder_inputs] + decoder_state_inputs + [attn_inp], [decoder_preds] + decoder_state_outputs + [attn_states])

In [None]:
# Decode the sequence

def decode_sequence_attn(input_seq):
    """Greedily decode a single input word with the attention model.

    Uses the module-level encoder_attn_model / decoder_attn_model and the
    module-level embed_dim (None means the decoder consumes one-hot vectors,
    anything else character indices).

    input_seq -- encoder input holding exactly one sample

    Returns (text, attention): the decoded string without the terminating
    newline and, for every character of it, the attention weights produced by
    the decoder at that step. Decoding also stops once more than
    max_output_len characters have been produced.
    """
    encoder_out, *enc_states = encoder_attn_model.predict(input_seq)

    def as_decoder_input(char_index):
        # One decoding step for one sample, in the format the decoder expects.
        if embed_dim is None:
            step = np.zeros((1, 1, output_v_len))
            step[0, 0, char_index] = 1.0
        else:
            step = np.zeros((1, 1))
            step[0, 0] = char_index
        return step

    target_seq = as_decoder_input(output_inv["\t"])

    stop_condition = False
    final_ans = ""
    all_attn = []
    while not stop_condition:
        output_chars, *enc_states, attn_wts = decoder_attn_model.predict([target_seq] + enc_states + [encoder_out])
        all_attn += [attn_wts]

        sampled_char_index = np.argmax(output_chars[0, -1, :])
        sampled_char = reverse_out[sampled_char_index]
        final_ans += sampled_char

        if sampled_char == "\n" or len(final_ans) > max_output_len:
            stop_condition = True

        # Feed the prediction back in as the next decoder input.
        target_seq = as_decoder_input(sampled_char_index)

    # Only strip the end marker (and its attention step) when it is really
    # there: on the length cut-off the last character is a genuine prediction.
    if final_ans.endswith("\n"):
        return final_ans[:-1], all_attn[:-1]
    return final_ans, all_attn

def get_acc_attn(input_seq, test_seq):
    """Greedily decode a whole batch with the attention model and score it.

    Uses the module-level encoder_attn_model / decoder_attn_model and the
    module-level embed_dim (None means the decoder consumes one-hot vectors,
    anything else character indices).

    input_seq -- encoder input for n samples
    test_seq  -- the n reference output strings

    Returns (predictions, accuracy): each prediction is the decoded text up to
    the first newline, accuracy is the fraction of predictions equal to their
    reference. An empty batch yields ([], 0.0).
    """
    n = len(input_seq)
    if n == 0:
        # Nothing to decode; also avoids the division by zero below.
        return [], 0.0

    encoder_out, *enc_states = encoder_attn_model.predict(input_seq)

    if embed_dim is None:
        target_seq = np.zeros((n, 1, output_v_len))
        target_seq[:, 0, output_inv["\t"]] = 1.0
    else:
        target_seq = np.zeros((n, 1))
        target_seq[:, 0] = output_inv["\t"]

    final_ans = ["" for _ in range(n)]
    finished = [False] * n
    for _ in range(max_output_len):
        # The attention weights (last output) are not needed for scoring.
        output_chars, *enc_states, _ = decoder_attn_model.predict([target_seq] + enc_states + [encoder_out])

        sampled_char_index = np.argmax(output_chars[:, -1, :], axis=1)
        sampled_char = [reverse_out[idx] for idx in sampled_char_index]
        final_ans = [x + y for x, y in zip(final_ans, sampled_char)]

        # Everything after the first newline is discarded below, so decoding
        # can stop as soon as every sample has produced one.
        finished = [done or char == "\n" for done, char in zip(finished, sampled_char)]
        if all(finished):
            break

        if embed_dim is None:
            target_seq = np.zeros((n, 1, output_v_len))
            target_seq[range(n), 0, sampled_char_index] = 1.0
        else:
            target_seq = np.zeros((n, 1))
            target_seq[:, 0] = sampled_char_index

    final_ans = [x.split('\n')[0] for x in final_ans]

    final_acc = sum([x == y for x, y in zip(final_ans, test_seq)]) / n
    return final_ans, final_acc

In [None]:
# Evaluating the model

# The attention model uses Embedding layers, hence the index encoding.
test_in, test_out, _ = onehot_embed(X_test, y_test)
test_pred, test_acc = get_acc_attn(test_in, y_test)

print(test_acc)

# One row per test word: source, model prediction and reference.
test_dict = {
    'Input': X_test,
    'Prediction': test_pred,
    'True Output': y_test,
}
attention = pd.DataFrame(test_dict)

attention.to_csv('predictions_attention.csv', index=False)

# Ten randomly drawn rows, shown as the cell output.
attention_sample = attention.sample(n=10, replace=False)
attention_sample

In [None]:
# Connectivity Visualization


# get html element
def cstr(s, color='black'):
    """Return an HTML <text> element showing `s` on a `color` background."""
    template = "<text style=color:#000;background-color:{}>{} </text>"
    return template.format(color, s)

# print html
def print_color(t):
    """Display the (text, colour) pairs in `t` as one centred line of coloured HTML."""
    pieces = []
    for text, colour in t:
        pieces.append(cstr(text, color=colour))
    markup = "<p style=text-align:center;font-size:30px>" + ''.join(pieces) + "</p>"
    display(html_print(markup))

# get appropriate color for value
def get_clr(value):
    """Map a weight to one of 20 background colours (blue for low, red for high).

    `value` is bucketed in steps of 0.05. The bucket is clamped to the valid
    index range, so a weight of exactly 1.0 (or anything above) uses the last
    colour instead of raising IndexError, and negative values use the first
    colour instead of wrapping around to the end of the list.
    """
    colors = ['#85c2e1', '#89c4e2', '#95cae5', '#99cce6', '#a1d0e8',
            '#b2d9ec', '#baddee', '#c2e1f0', '#eff7fb', '#f9e8e8',
            '#f9e8e8', '#f9d4d4', '#f9bdbd', '#f8a8a8', '#f68f8f',
            '#f47676', '#f45f5f', '#f34343', '#f33b3b', '#f42e2e']
    bucket = int((value * 100) / 5)
    bucket = max(0, min(bucket, len(colors) - 1))
    return colors[bucket]

In [None]:
# Connectivity Visualization

# Index of the test word that is visualised.
ii = 642

res, res1 = decode_sequence_attn(test_in[ii: ii + 1])
inp = X_test[ii]
m = len(inp)
n = len(res)
# Per output character: the attention weights over the first m input positions.
res1 = [res1[step][0, 0, :m] for step in range(n)]

# Output positions for which the highlight stays on the previous character.
# NOTE(review): these look hand-picked for this particular sample -- confirm
# before changing `ii`.
gap = [2, 5, 8, 10, 12]

def fn():
    """Animate, one output character per second, which input characters it attended to."""
    for pos in range(n):
        highlighted = pos - 1 if pos in gap else pos

        # Top row: the prediction with the current character highlighted.
        output_row = [
            (res[kk], get_clr(0.99 if kk == highlighted else 0))
            for kk in range(n)
        ]
        # Bottom row: the input coloured by the attention weights of this step.
        input_row = [
            (inp[kk], get_clr(res1[pos][kk]))
            for kk in range(m)
        ]

        clear_output()
        print_color(output_row)
        print_color(input_row)
        time.sleep(1)
    clear_output()

fn()