In [None]:
import pickle
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
import numpy as np
import warnings
warnings.filterwarnings('ignore')

In [None]:
# load training data

# NOTE(review): `path` is used below as a %-format template (`path % name`), so it
# needs a "%s" placeholder. The literal "dataset" has none, which makes the `%`
# expressions raise TypeError — confirm the intended template before running.
path = "dataset"

# pickle.load can execute arbitrary code: only load files from a trusted source.
# Context managers make sure both file handles are closed once loading is done.
with open(path % 'train_input', 'rb') as input_file:
  train_input = pickle.load(input_file)
with open(path % 'train_output', 'rb') as output_file:
  train_output = pickle.load(output_file)

In [None]:
# collect every whitespace-separated token from both corpora and report the vocabulary

vocab = set()
for corpus in (train_input, train_output):
  for sentence in corpus:
    # grow the one set in place instead of rebuilding it for every sentence
    vocab.update(sentence.split())

print(f"Vocab: {vocab}\nSize: {len(vocab)}")

Vocab: {'ed', 'm', 'l', 'i', 'b', 'd', 'f', 'k', 'e', 'ef', 'a', 'eg', 'g', 'eh', 'c', 'h', 'ee', 'j'}
Size: 18


In [None]:
def prepend_and_append_token(texts):
  """
  prepend "[start]" and append "[end]" token to each sentence

  Each sentence is stripped of surrounding whitespace before being wrapped.
  A new list is built and returned; the sequence passed in is not modified,
  so callers holding another reference to it keep the untouched sentences.
  @texts: list of sentences
  returns: list of sentences
  """
  return ["[start] " + text.strip() + " [end]" for text in texts]

def remove_tokens(texts):
  """
  drop every "[start]" and "[end]" marker from each sentence, then trim whitespace
  @texts: list of sentences
  returns: new list of cleaned sentences
  """
  cleaned = []
  for sentence in texts:
    for marker in ("[start]", "[end]"):
      sentence = sentence.replace(marker, "")
    cleaned.append(sentence.strip())
  return cleaned

In [None]:
# wrap every sentence of both corpora in "[start]" ... "[end]" markers

train_input, train_output = map(
    prepend_and_append_token, (train_input, train_output)
)

In [None]:
# split data b/w training, validation and testing
# 15% is held out for testing, then 15% of the remainder for validation.

# Fixed seed so the same examples land in each split on every run; without it
# the reported validation/test accuracy changes from run to run.
SPLIT_SEED = 42

input_train, input_test, output_train, output_test = train_test_split(
    train_input, train_output, shuffle=True, test_size=0.15, random_state=SPLIT_SEED)
input_train, input_val, output_train, output_val = train_test_split(
    input_train, output_train, shuffle=True, test_size=0.15, random_state=SPLIT_SEED)

In [None]:
# test sentences are kept without "[start]"/"[end]" markers

input_test, output_test = map(remove_tokens, (input_test, output_test))

In [None]:
# build the integer vectorizers for the input and output languages

vocab_size = 200
sequence_length = 256

# settings shared by both vectorizers
shared_vectorizer_args = dict(max_tokens=vocab_size, output_mode="int")

source_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length,
    **shared_vectorizer_args,
)
# The target is vectorized one token longer so that format_dataset can slice it
# into a decoder input (all but the last token) and a label (all but the first).
# tf.identity is passed as the standardizer, so target text is used as-is.
target_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length + 1,
    standardize=tf.identity,
    **shared_vectorizer_args,
)

# NOTE(review): the vocabularies are fitted on the full corpora (before the
# train/val/test split), not only on the training portion.
for vectorizer, corpus in (
    (source_vectorization, train_input),
    (target_vectorization, train_output),
):
    vectorizer.adapt(corpus)

In [None]:
# create batch dataset

batch_size = 64

def format_dataset(inpL, outL):
    """
    Vectorize a batch of sentence pairs into model features and labels.

    @inpL: batch of input-language sentences
    @outL: batch of output-language sentences
    returns: ({"input": source ids, "output": target ids without the last
             position}, target ids without the first position)
    """
    source_ids = source_vectorization(inpL)
    target_ids = target_vectorization(outL)
    # decoder input and label are the same sequence offset by one position
    features = {
        "input": source_ids,
        "output": target_ids[:, :-1],
    }
    labels = target_ids[:, 1:]
    return (features, labels)

def make_dataset(inp_texts, out_texts):
    """
    Build a batched, vectorized tf.data pipeline from parallel sentence lists.

    @inp_texts: iterable of input-language sentences
    @out_texts: iterable of output-language sentences (same length/order)
    returns: tf.data.Dataset yielding (features, labels) batches as produced
             by format_dataset
    """
    inp_texts = list(inp_texts)
    out_texts = list(out_texts)
    dataset = tf.data.Dataset.from_tensor_slices((inp_texts, out_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    # Cache the vectorized batches first, then shuffle, then prefetch.
    # Caching after shuffle would store the first epoch's order and replay it
    # unchanged on every later epoch; shuffling after the cache reshuffles the
    # batches each epoch, and prefetch last overlaps input with training.
    return dataset.cache().shuffle(2048).prefetch(16)

# one pipeline for training, one for validation
train_ds, val_ds = (
    make_dataset(sources, targets)
    for sources, targets in ((input_train, output_train), (input_val, output_val))
)

In [None]:
# sanity check: print the tensor shapes of a single batch
for inputs, targets in train_ds.take(1):
    for key in ("input", "output"):
        print(f"inputs['{key}'].shape: {inputs[key].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['input'].shape: (64, 256)
inputs['output'].shape: (64, 256)
targets.shape: (64, 256)


In [None]:
class TransformerEncoder(layers.Layer):
  """
  Encoder block: multi-head self-attention followed by a two-layer dense
  projection, each wrapped in a residual connection and layer normalization.

  @embed_dim: width of the block's output (also used as the attention key_dim)
  @dense_dim: width of the hidden (relu) dense layer
  @num_heads: number of attention heads
  """

  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    # kept so get_config can reproduce the constructor arguments
    self.embed_dim, self.dense_dim, self.num_heads = embed_dim, dense_dim, num_heads
    self.attention = layers.MultiHeadAttention(
      num_heads=num_heads, key_dim=embed_dim)
    self.dense_proj = keras.Sequential([
      layers.Dense(dense_dim, activation="relu"),
      layers.Dense(embed_dim),
    ])
    self.layernorm_1 = layers.LayerNormalization()
    self.layernorm_2 = layers.LayerNormalization()

  def call(self, inputs, mask=None):
    """Run self-attention and the dense projection; `mask`, if given, limits attention."""
    # insert an axis so the mask can broadcast over query positions
    # (assumes mask is (batch, seq) — TODO confirm)
    attention_mask = None if mask is None else mask[:, tf.newaxis, :]
    attended = self.attention(inputs, inputs, attention_mask=attention_mask)
    normed = self.layernorm_1(inputs + attended)
    projected = self.dense_proj(normed)
    return self.layernorm_2(normed + projected)

  def get_config(self):
    """Return the base layer config extended with this block's constructor arguments."""
    return {
      **super().get_config(),
      "embed_dim": self.embed_dim,
      "num_heads": self.num_heads,
      "dense_dim": self.dense_dim,
    }

In [None]:
class PositionalEmbedding(layers.Layer):
  """
  Sum of a learned token embedding and a learned position embedding.

  @sequence_length: number of distinct positions the position table holds
  @input_dim: number of distinct token ids the token table holds
  @output_dim: embedding width
  """

  def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
    super().__init__(**kwargs)
    # kept so get_config can reproduce the constructor arguments
    self.sequence_length = sequence_length
    self.input_dim = input_dim
    self.output_dim = output_dim
    self.token_embeddings = layers.Embedding(
      input_dim=input_dim, output_dim=output_dim)
    self.position_embeddings = layers.Embedding(
      input_dim=sequence_length, output_dim=output_dim)

  def call(self, inputs):
    """Embed token ids and add the embedding of each position index."""
    # positions 0..length-1, taken from the last axis of the input
    seq_len = tf.shape(inputs)[-1]
    position_ids = tf.range(start=0, limit=seq_len, delta=1)
    return self.token_embeddings(inputs) + self.position_embeddings(position_ids)

  def compute_mask(self, inputs, mask=None):
    """Mask is True wherever the token id is non-zero (id 0 is treated as padding)."""
    return tf.math.not_equal(inputs, 0)

  def get_config(self):
    """Return the base layer config extended with this layer's constructor arguments."""
    return {
      **super().get_config(),
      "output_dim": self.output_dim,
      "sequence_length": self.sequence_length,
      "input_dim": self.input_dim,
    }

In [None]:
class TransformerDecoder(layers.Layer):
  """
  Decoder block: masked self-attention over the target sequence, attention over
  the encoder outputs, then a two-layer dense projection. Each stage is wrapped
  in a residual connection and layer normalization.

  @embed_dim: width of the block's output (also used as the attention key_dim)
  @dense_dim: width of the hidden (relu) dense layer
  @num_heads: number of attention heads
  """

  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    # kept so get_config can reproduce the constructor arguments
    self.embed_dim = embed_dim
    self.dense_dim = dense_dim
    self.num_heads = num_heads
    self.attention_1 = layers.MultiHeadAttention(
      num_heads=num_heads, key_dim=embed_dim)
    self.attention_2 = layers.MultiHeadAttention(
      num_heads=num_heads, key_dim=embed_dim)
    self.dense_proj = keras.Sequential(
      [layers.Dense(dense_dim, activation="relu"),
        layers.Dense(embed_dim),]
    )
    self.layernorm_1 = layers.LayerNormalization()
    self.layernorm_2 = layers.LayerNormalization()
    self.layernorm_3 = layers.LayerNormalization()
    # pass the incoming mask on to downstream layers
    self.supports_masking = True

  def get_config(self):
    """Return the base layer config extended with this block's constructor arguments."""
    config = super().get_config()
    config.update({
      "embed_dim": self.embed_dim,
      "num_heads": self.num_heads,
      "dense_dim": self.dense_dim,
    })
    return config

  def get_causal_attention_mask(self, inputs):
    """
    Build an int32 mask of shape (batch, seq, seq) whose entry [b, i, j] is 1
    when i >= j, i.e. each position may attend to itself and earlier positions.
    """
    input_shape = tf.shape(inputs)
    batch_size, sequence_length = input_shape[0], input_shape[1]
    i = tf.range(sequence_length)[:, tf.newaxis]
    j = tf.range(sequence_length)
    mask = tf.cast(i >= j, dtype="int32")
    mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
    # repeat the single (1, seq, seq) mask once per batch element
    mult = tf.concat(
      [tf.expand_dims(batch_size, -1),
        tf.constant([1, 1], dtype=tf.int32)], axis=0)
    return tf.tile(mask, mult)

  def call(self, inputs, encoder_outputs, mask=None):
    """
    @inputs: embedded target sequence
    @encoder_outputs: encoder representation attended to by the second attention
    @mask: optional mask for `inputs` (presumably the padding mask propagated
           from the embedding layer — verify against caller)
    returns: tensor from the final layer normalization
    """
    causal_mask = self.get_causal_attention_mask(inputs)
    if mask is not None:
      padding_mask = tf.cast(
        mask[:, tf.newaxis, :], dtype="int32")
      # a target position is visible only if it is both non-padding and not in the future
      self_attention_mask = tf.minimum(padding_mask, causal_mask)
    else:
      self_attention_mask = causal_mask
    # The combined padding+causal mask belongs to the self-attention over the
    # target sequence. It was previously built but handed to the encoder
    # attention instead, leaving self-attention with the causal mask only.
    attention_output_1 = self.attention_1(
      query=inputs,
      value=inputs,
      key=inputs,
      attention_mask=self_attention_mask)
    attention_output_1 = self.layernorm_1(inputs + attention_output_1)
    # Keys/values here are encoder positions, so a mask derived from the target
    # sequence (target padding / causal order) does not describe them and is
    # not applied.
    attention_output_2 = self.attention_2(
      query=attention_output_1,
      value=encoder_outputs,
      key=encoder_outputs,
    )
    attention_output_2 = self.layernorm_2(
      attention_output_1 + attention_output_2)
    proj_output = self.dense_proj(attention_output_2)
    return self.layernorm_3(attention_output_2 + proj_output)

In [None]:
def Transformer():
  """
  Build and compile the encoder-decoder model.

  Two encoder blocks and two decoder blocks (the second of each uses a dense
  layer twice as wide), followed by dropout and a softmax over the vocabulary.
  Model inputs are named "input" and "output".
  returns: compiled keras.Model
  """
  embed_dim, dense_dim, num_heads = 256, 1024, 8

  # encoder stack
  encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="input")
  encoded = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
  encoded = TransformerEncoder(embed_dim, dense_dim, num_heads)(encoded)
  encoded = TransformerEncoder(embed_dim, dense_dim * 2, num_heads)(encoded)

  # decoder stack; both decoder blocks attend to the final encoder output
  decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="output")
  decoded = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
  decoded = TransformerDecoder(embed_dim, dense_dim, num_heads)(decoded, encoded)
  decoded = TransformerDecoder(embed_dim, dense_dim * 2, num_heads)(decoded, encoded)
  decoded = layers.Dropout(0.5)(decoded)
  token_probabilities = layers.Dense(vocab_size, activation="softmax")(decoded)

  model = keras.Model([encoder_inputs, decoder_inputs], token_probabilities)
  model.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
  )
  return model

In [None]:
# create and train the transformer model

transformer = Transformer()

# keep only the epoch with the best validation accuracy on disk
checkpoint_best = keras.callbacks.ModelCheckpoint(
    filepath='Vaibhav_Pundir_734004197_Project2_Model.h5',
    monitor='val_accuracy',
    save_best_only=True,
)
# stop once validation accuracy has not improved for 10 epochs
stop_on_plateau = keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=10,
)

transformer.fit(
    train_ds,
    validation_data=val_ds,
    epochs=100,
    callbacks=[checkpoint_best, stop_on_plateau],
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100


<keras.src.callbacks.History at 0x7e568dee8280>

In [None]:
# load weights from the best model

# The ModelCheckpoint callback writes to this bare filename (relative to the
# working directory), so read it back from that same location. Going through
# `path % ...` pointed at a different place than where the file was saved.
transformer.load_weights('Vaibhav_Pundir_734004197_Project2_Model.h5')

In [None]:
# translate sentences from input to output language in batches

# map each target token id back to its token string
out_vocab = target_vectorization.get_vocabulary()
out_index_lookup = dict(enumerate(out_vocab))
# upper bound on the number of tokens generated per sentence
max_decoded_sentence_length = 100

def decode_sequences(input_sentences, transformer):
    """
    Greedily decode a batch of sentences, one output token per step.

    Every sentence starts as "[start]"; at each step the highest-probability
    token is appended to each unfinished sentence. A sentence is finished once
    "[end]" has been appended. Decoding stops when all sentences are finished
    or after max_decoded_sentence_length steps.
    @input_sentences: list of input-language sentences
    @transformer: model called with [source token ids, target token ids]
    returns: list of decoded sentences (still carrying the "[start]"/"[end]" markers)
    """
    num_sentences = len(input_sentences)
    encoder_tokens = source_vectorization(input_sentences)
    decoded_sentences = ["[start]" for _ in range(num_sentences)]
    finished = [False for _ in range(num_sentences)]

    for step in range(max_decoded_sentence_length):
        if all(finished):
            break
        # re-vectorize what has been generated so far; drop the extra last position
        decoder_tokens = target_vectorization(decoded_sentences)[:, :-1]
        predictions = transformer([encoder_tokens, decoder_tokens])
        # prediction at position `step` is the next token for every sentence
        next_token_ids = np.argmax(predictions[:, step, :], axis=-1)
        for row, token_id in enumerate(next_token_ids):
            if finished[row]:
                continue
            token = out_index_lookup[token_id]
            decoded_sentences[row] = decoded_sentences[row] + " " + token
            if token == "[end]":
                finished[row] = True
    return decoded_sentences

In [None]:
# helper function to call decode_sequences and return translated sentences

def translate(input, transformer):
  """
  Translate sentences in chunks of 128, printing a running progress counter.
  @input: sequence of input-language sentences without "[start]"/"[end]" markers
  @transformer: model passed through to decode_sequences
  returns: list of translated sentences with the markers removed
  """
  BATCH_SIZE = 128
  decoded = []
  start = 0
  while start < len(input):
    chunk = input[start: start + BATCH_SIZE]
    input_texts = prepend_and_append_token(chunk)
    raw_translations = decode_sequences(input_texts, transformer)
    decoded += remove_tokens(raw_translations)
    print(f"\r{start+len(input_texts)}/{len(input)} done...", end="")
    start += BATCH_SIZE

  return decoded

In [None]:
# accuracy on held out test data
# a prediction counts only if it matches the reference exactly (ignoring outer whitespace)

predicted = translate(input_test, transformer)
match = sum(
    1
    for pred, true in zip(predicted, output_test)
    if pred.strip() == true.strip()
)
test_acc = match / len(output_test) * 100
print(f"\nTest Accuracy: {test_acc:.2f}")

16773/16800 done...
Test Accuracy: 99.84
