# Download Data

In [None]:
# Upload the Kaggle API credentials file (kaggle.json) into the runtime's
# working directory; the setup cell below copies it from there.
from google.colab import files
files.upload() #upload kaggle.json

In [None]:
!pip install -q kaggle
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!ls ~/.kaggle
!chmod 600 /root/.kaggle/kaggle.json

In [None]:
# Download the backalla/words-mnist dataset archive with the Kaggle CLI.
!kaggle datasets download -d backalla/words-mnist

In [None]:
!unzip words-mnist.zip

In [None]:
# List the extracted image directory that the loading code reads from.
!ls dataset/v011_words_small

In [None]:
!cat v011_labels_small.json

# Imports

In [None]:
import tensorflow as tf
import json
import pandas as pd
import numpy as np
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPool2D
from tensorflow.keras import Model
import time
from PIL import Image
import matplotlib.pyplot as plt
import os

# Load Data

In [None]:
# Parse the label file; its keys are image file names and its values the label text.
with open("v011_labels_small.json") as labels_file:
  data = json.load(labels_file)

In [None]:
# One row per sample: [path to the image file, label text].
list_data = np.array(
  [["dataset/v011_words_small/" + file_name, word] for file_name, word in data.items()]
)

In [None]:
# Second column of list_data: the label text of every sample.
text_data = list_data[:, 1]

In [None]:
# Display the label texts for a quick visual check.
text_data

In [None]:
# Start with an empty set; the following cells add the special tokens and
# every character that occurs in the labels.
vocab = set()

In [None]:
# Sequence boundary markers used when encoding and decoding labels.
vocab.update(["<start>", "<end>"])

In [None]:
# Add every distinct character that appears in any label.
vocab.update("".join(text_data))

In [None]:
# Freeze the vocabulary into a sorted list so token ids are deterministic.
# NOTE: after this cell `vocab` is a list, so the `vocab.update(...)` cells
# above cannot be re-run without re-creating the set first.
vocab = sorted(vocab)

In [None]:
# Display the sorted vocabulary.
vocab

In [None]:
# Token ids start at 1; id 0 is left free for the padding added to the label sequences.
char2idx = {token: index for index, token in enumerate(vocab, start=1)}
idx2char = dict(enumerate(vocab, start=1))

# Encode every label as <start>, one id per character, <end>.
text_as_int = [
  [char2idx["<start>"], *(char2idx[ch] for ch in text), char2idx["<end>"]]
  for text in text_data
]

In [None]:
# Display the token -> id mapping.
char2idx

In [None]:
# Display the id -> token mapping.
idx2char

In [None]:
# Pad the encoded labels at the end so they all share one length.
tensor = tf.keras.preprocessing.sequence.pad_sequences(text_as_int, padding='post')


In [None]:
# Display the first padded label sequence.
tensor[0]

In [None]:
# Pair each image path (first column of list_data) with its padded label sequence.
dataset = tf.data.Dataset.from_tensor_slices((list_data[:,0], tensor))

In [None]:
# Size every image is resized to before it is fed to the encoder.
IMAGE_HEIGHT = 32
IMAGE_WIDTH  = 112
# Three channels, matching the channels=3 used when decoding the images.
IMAGE_SHAPE = (IMAGE_HEIGHT, IMAGE_WIDTH, 3)
# Batch size and shuffle buffer size of the input pipeline.
BATCH_SIZE = 48
BUFFER_SIZE = 10000


# Smaller settings; not referenced anywhere else in the visible notebook.
BATCH_SIZE_SMALL = 2
BUFFER_SIZE_SMALL = 10

# Width of the encoder's Dense projection and of the decoder's token embedding.
embedding_dim = 256
# Number of GRU units in both the encoder and the decoder.
units = 512
# +1 because token ids start at 1 and id 0 is used for padding.
vocab_size = len(vocab) + 1
# Number of full batches per epoch.
num_steps = len(text_data) // BATCH_SIZE

In [None]:
def decode_img(img):
  """Turn encoded image bytes into a float32 tensor of the model's input size.

  Args:
    img: Scalar string tensor holding the encoded image; it is decoded with
      the JPEG decoder into 3 channels.

  Returns:
    The image converted to tf.float32 and resized to
    [IMAGE_HEIGHT, IMAGE_WIDTH].
  """
  pixels = tf.image.decode_jpeg(img, channels=3)
  pixels = tf.image.convert_image_dtype(pixels, tf.float32)
  target_size = [IMAGE_HEIGHT, IMAGE_WIDTH]
  return tf.image.resize(pixels, target_size)

def process_path(image_path, label):
  """Load the image stored at `image_path` and pair it with its label.

  Args:
    image_path: Path of the image file to read.
    label: Label belonging to that image; passed through unchanged.

  Returns:
    Tuple `(image, label)` where `image` is the output of `decode_img`.
  """
  raw_bytes = tf.io.read_file(image_path)
  return decode_img(raw_bytes), label

In [None]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Decode images in parallel, shuffle, then form fixed-size batches;
# the final incomplete batch is dropped.
labeled_ds = (
  dataset
  .map(process_path, num_parallel_calls=AUTOTUNE)
  .shuffle(BUFFER_SIZE)
  .batch(BATCH_SIZE, drop_remainder=True)
)

In [None]:
# Pull one batch to sanity-check the pipeline: print the image batch shape,
# show the first image and print the first label sequence.
# `ex` remains defined in the notebook namespace after the loop.
for ex in labeled_ds.take(1):
  print(ex[0].shape)
  plt.imshow(ex[0][0])
  print(ex[1][0])

In [None]:
# Helper Function to give words from the tensor
def tensor_to_word(tensor):
  """Decode a sequence of token ids into the word it spells.

  Args:
    tensor: Iterable of token ids (plain ints or integer scalars that
      `int()` accepts). Id 0 is treated as padding.

  Returns:
    The concatenated characters up to, but not including, the first
    "<end>" token. "<start>" tokens and padding ids are skipped.

  Raises:
    KeyError: If a non-zero id is not present in `idx2char`.
  """
  words = ""
  for ten in tensor:
    token_id = int(ten)
    if token_id == 0:
      # Padding has no entry in idx2char (ids start at 1), so skip it
      # instead of failing when a sequence carries no "<end>" token.
      continue
    token = idx2char[token_id]
    if token == "<end>":
      break
    if token != "<start>":
      words += token
  return words

print(tensor_to_word([30, 85, 73, 70, 29, 0, 0, 0, 0]))

In [None]:
# Helper Function to give tensor from the word
def word_to_tensor(string):
  """Encode a word as a list of token ids wrapped in start/end markers.

  Args:
    string: The word to encode; every character must be a key of `char2idx`.

  Returns:
    A list: the "<start>" id, one id per character, then the "<end>" id.

  Raises:
    KeyError: If a character is not in `char2idx`.
  """
  return [char2idx["<start>"], *(char2idx[ch] for ch in string), char2idx["<end>"]]

print(word_to_tensor("tensor"))

# Model

In [None]:
class CNNEncoder(tf.keras.Model):
  """Image encoder: truncated pretrained VGG16, Dense projection, then a GRU.

  The convolutional feature map is projected to `embedding_dim` channels,
  flattened into a sequence of spatial positions and run through a GRU.

  Args:
    vocab_size: Accepted for signature compatibility; not used by this class.
    embedding_dim: Output width of the Dense projection applied to the features.
    enc_units: Number of GRU units.
    batch_sz: Batch size used when building the zero initial state.
  """

  def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
    super().__init__()

    self.batch_sz = batch_sz
    self.enc_units = enc_units

    # ImageNet-pretrained VGG16 without its classifier head, cut at an
    # intermediate layer (sixth from the end) rather than the final one.
    vgg = tf.keras.applications.VGG16(input_shape=IMAGE_SHAPE, include_top=False, weights='imagenet')
    self.image_features_extract_model = tf.keras.Model(vgg.input, vgg.layers[-6].output)

    # Applied to the channel axis of the feature map, giving embedding_dim channels.
    self.fc = tf.keras.layers.Dense(embedding_dim)

    # return_sequences gives the output at every position, return_state the final state.
    self.gru = tf.keras.layers.GRU(
        self.enc_units,
        return_sequences=True,
        return_state=True,
        recurrent_initializer='glorot_uniform',
    )

  def call(self, x, hidden):
    """Encode a batch of images.

    Args:
      x: Batch of images fed to the VGG16 sub-model.
      hidden: Initial GRU state.

    Returns:
      Tuple `(output, state)`: the GRU output for every spatial position
      and the final GRU state.
    """
    features = tf.nn.relu(self.fc(self.image_features_extract_model(x)))
    # Collapse the two spatial axes into one sequence axis:
    # (batch, positions, embedding_dim).
    batch, channels = features.shape[0], features.shape[3]
    sequence = tf.reshape(features, (batch, -1, channels))
    output, state = self.gru(sequence, initial_state=hidden)
    return output, state

  def hidden_state_initializer(self):
    """Return an all-zero GRU state of shape (batch_sz, enc_units)."""
    return tf.zeros((self.batch_sz, self.enc_units))

In [None]:
encoder = CNNEncoder(vocab_size, embedding_dim, units, BATCH_SIZE)
# Initialize the hidden state
sample_hidden = encoder.hidden_state_initializer()
# Draw one batch for the smoke test and use it directly, instead of relying on
# a variable left behind by an earlier cell.
image, label = next(iter(labeled_ds))

sample_output, sample_hidden = encoder(image, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder Hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))

## Add Attention Layer

In [None]:
class Attention(tf.keras.layers.Layer):
  """Additive attention: scores each value against a query and pools the values.

  Args:
    units: Width of the two Dense projections applied to query and values.
  """

  def __init__(self, units):
    super().__init__()
    self.W1 = tf.keras.layers.Dense(units)
    self.W2 = tf.keras.layers.Dense(units)
    self.V = tf.keras.layers.Dense(1)

  def call(self, query, values):
    """Return the attention-weighted sum of `values`.

    Args:
      query: Tensor without a sequence axis; one is inserted at axis 1 so it
        broadcasts against `values`.
      values: Tensor whose axis 1 is the axis being attended over.

    Returns:
      The context vector: `values` weighted by the attention weights and
      summed over axis 1.
    """
    query_with_time_axis = tf.expand_dims(query, 1)

    # One scalar score per position in `values`.
    score = self.V(tf.nn.tanh(self.W1(query_with_time_axis) + self.W2(values)))

    # Normalise the scores across positions (axis 1).
    attention_weights = tf.nn.softmax(score, axis=1)

    weighted_values = attention_weights * values
    return tf.reduce_sum(weighted_values, axis=1)

In [None]:
# Smoke test: run a small attention layer (10 units) on the encoder's sample
# state and outputs and print the shape of the resulting context vector.
attention_layer = Attention(10)
attention_result = attention_layer(sample_hidden, sample_output)

print("Attention result shape: (batch size, units) {}".format(attention_result.shape))
# Disabled: Attention.call returns only the context vector, so there are no weights to print.
# print("Attention weights shape: (batch_size, sequence_length, 1) {}".format(attention_weights.shape))

In [None]:
class Decoder(tf.keras.Model):
  """Single-step GRU decoder with attention over the encoder outputs.

  Args:
    vocab_size: Number of token ids; size of the embedding table and of the
      output logits.
    embedding_dim: Width of the token embedding.
    dec_units: Number of GRU units. Must equal the width of the `hidden`
      state passed to `call`, because that state initialises the GRU.
    batch_sz: Batch size; stored on the instance.
  """

  def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
    super(Decoder, self).__init__()
    self.batch_sz = batch_sz
    self.dec_units = dec_units
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(self.dec_units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')
    self.fc = tf.keras.layers.Dense(vocab_size)

    self.attention = Attention(self.dec_units)

  def call(self, x, hidden, enc_output):
    """Run one decoding step.

    Args:
      x: Token ids for the current step, with a sequence axis of length 1.
      hidden: Previous decoder state (the encoder's final state on the
        first step).
      enc_output: Encoder outputs to attend over.

    Returns:
      Tuple `(logits, state)`: per-example vocabulary logits and the new
      GRU state.
    """
    context_vector = self.attention(hidden, enc_output)
    x = self.embedding(x)
    # Prepend the context vector to the token embedding along the feature axis.
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

    # Carry the previous state into the GRU. Without initial_state the GRU
    # starts from zeros on every step and `hidden` only reaches the attention
    # layer, so the decoder has no recurrent memory.
    output, state = self.gru(x, initial_state=hidden)
    # Drop the length-1 sequence axis: (batch * 1, dec_units).
    output = tf.reshape(output, (-1, output.shape[2]))
    x = self.fc(output)

    return x, state

In [None]:
decoder = Decoder(vocab_size, embedding_dim, units, BATCH_SIZE)

# Smoke test: one decoding step on random input, using the encoder's sample state and outputs.
sample_decoder_output, _ = decoder(tf.random.uniform((BATCH_SIZE, 1)), sample_hidden, sample_output)

print ('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))

## Add Optimizer and Loss Function

In [None]:
optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per example so padding can be masked out afterwards.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
  """Cross-entropy for one decoding step with padded targets zeroed out.

  Args:
    real: Target token ids; id 0 marks padding.
    pred: Logits for the same step.

  Returns:
    Mean over all examples of the per-example loss, where padded examples
    contribute 0 to the sum but still count in the mean.
  """
  per_example = loss_object(real, pred)
  not_padding = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
  return tf.reduce_mean(per_example * not_padding)

## Add Checkpoint

In [None]:
# Checkpoints track the optimizer together with both models.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(
  optimizer=optimizer,
  encoder=encoder,
  decoder=decoder,
)

In [None]:
@tf.function
def train_step(inp, targ, enc_hidden):
  """Run one optimisation step on a batch using teacher forcing.

  Args:
    inp: Batch of images for the encoder.
    targ: Batch of padded target token id sequences; column 0 holds the
      "<start>" token.
    enc_hidden: Initial encoder state.

  Returns:
    The summed per-step loss divided by the target sequence length.
  """
  loss = 0
  start_tokens = [char2idx['<start>']] * BATCH_SIZE

  with tf.GradientTape() as tape:
    enc_output, enc_hidden = encoder(inp, enc_hidden)

    # The decoder starts from the encoder's final state and the <start> token.
    dec_hidden = enc_hidden
    dec_input = tf.expand_dims(start_tokens, 1)

    for t in range(1, targ.shape[1]):
      expected = targ[:, t]
      predictions, dec_hidden = decoder(dec_input, dec_hidden, enc_output)
      loss += loss_function(expected, predictions)

      # Teacher forcing: the ground-truth token becomes the next input.
      dec_input = tf.expand_dims(expected, 1)

  variables = encoder.trainable_variables + decoder.trainable_variables
  # Gradients are taken on the summed loss, not the length-normalised value.
  optimizer.apply_gradients(zip(tape.gradient(loss, variables), variables))

  return loss / int(targ.shape[1])

In [None]:
EPOCHS = 50

for epoch in range(EPOCHS):
  epoch_number = epoch + 1
  start = time.time()

  # Every epoch starts from a fresh zero encoder state and loss accumulator.
  enc_hidden = encoder.hidden_state_initializer()
  total_loss = 0

  for batch, (inp, targ) in enumerate(labeled_ds.take(num_steps)):
    batch_loss = train_step(inp, targ, enc_hidden)
    total_loss += batch_loss

    # Report progress every 100 batches.
    if batch % 100 == 0:
      print('Epoch {} Batch {} Loss {:.4f}'.format(epoch_number, batch, batch_loss.numpy()))

  # Save a checkpoint after every second epoch.
  if epoch_number % 2 == 0:
    checkpoint.save(file_prefix=checkpoint_prefix)

  print('Epoch {} Loss {:.4f}'.format(epoch_number, total_loss / num_steps))
  print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

In [None]:
def evaluate(image, max_length=30):
  """Greedily decode the text shown in a single image.

  Args:
    image: One preprocessed image without a batch axis; a batch axis of
      size 1 is added before encoding.
    max_length: Maximum number of decoding steps. Defaults to 30.

  Returns:
    The predicted tokens, each followed by a single space. The "<end>"
    token is included when the model produces it.
  """
  result = ''
  image_expand_dim = tf.expand_dims(image, axis=0)
  hidden = [tf.zeros((1, units))]
  enc_out, enc_hidden = encoder(image_expand_dim, hidden)

  dec_hidden = enc_hidden
  dec_input = tf.expand_dims([char2idx['<start>']], 0)

  for _ in range(max_length):
    # The decoder returns (logits, state); it does not expose attention weights.
    predictions, dec_hidden = decoder(dec_input, dec_hidden, enc_out)

    predicted_id = int(tf.argmax(predictions[0]).numpy())

    if predicted_id == 0:
      # Id 0 is padding and has no entry in idx2char; treat it as the end
      # of the sequence instead of raising KeyError.
      break

    result += idx2char[predicted_id] + ' '

    if idx2char[predicted_id] == '<end>':
      return result

    # the predicted ID is fed back into the model
    dec_input = tf.expand_dims([predicted_id], 0)

  return result