# CSCE 636 Project 2
## Rahaan Gandhi - 434007427

Please see the Instructions section at the bottom of this notebook for how to load and use a saved model.

In [2]:
# Import all important libraries required for the transformer model
# Note for TA/ Professor: run this cell
import warnings
import pickle
import random
import numpy as np
import string
import re
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
warnings.filterwarnings('ignore')

In [15]:
# Paths to the pickled training input and training output files used to train the translator.
# Please replace the paths below with the locations of the same files on your device.
# Note for TA/ Professor: run this cell

input_texts_path = "/content/drive/MyDrive/Colab Notebooks/636 stuff/project2/Train_input"
output_texts_path = "/content/drive/MyDrive/Colab Notebooks/636 stuff/project2/Train_output"

# NOTE: pickle.load can execute arbitrary code, so only load files from a trusted source.
# The context managers guarantee both file handles are closed once the data is read.
with open(input_texts_path, 'rb') as input_file:
    train_inputs = pickle.load(input_file)
with open(output_texts_path, 'rb') as output_file:
    train_outputs = pickle.load(output_file)

In [None]:
# Hold out roughly 1000 strings as an independent test set, used later to measure
# the model's actual accuracy.
# Note for TA/ Professor: this cell is optional; it only divides the data into train/test.

from sklearn.model_selection import train_test_split

(train_inputs, test_inputs,
 train_outputs, test_outputs) = train_test_split(
    train_inputs, train_outputs, test_size=0.009)

In [16]:
# Collect the vocabulary (distinct whitespace-separated tokens) of the input
# language and of the output language, and print the size of each.
# Note for TA/ Professor: run this cell

input_vocab = {token for sentence in train_inputs for token in sentence.split()}
print(len(input_vocab))

output_vocab = {token for sentence in train_outputs for token in sentence.split()}
print(len(output_vocab))

8
18


In [17]:
# Pair every input sentence with its output sentence wrapped in "[start] ... [end]",
# shuffle the pairs, then slice off the training and validation subsets.
# Note for TA/ Professor: run this cell

# Built with zip() instead of an index loop so that the built-in `input` is no
# longer rebound to a training sentence for the rest of the notebook session.
sentence_pair = [
    (source_text, "[start] " + target_text + " [end]")
    for source_text, target_text in zip(train_inputs, train_outputs)
]

random.shuffle(sentence_pair)
num_val = int(0.15 * len(sentence_pair))
# NOTE(review): 2 * num_val is subtracted here, so the last ~15% of the shuffled
# pairs ends up in neither subset - confirm that this hold-out is intentional.
num_train = len(sentence_pair) - 2 * num_val
train_sentences = sentence_pair[:num_train]
validation_sentences = sentence_pair[num_train:num_train + num_val]

In [18]:
# Pre-processing configuration: these values control how the text is vectorized
# before it is fed to the model.
# Note for TA/ Professor: run this cell

vocab_size = 35  # since total vocabulary size is 34
sequence_length = 100  # since maximum length currently for output is 95

# Characters deleted during standardization: every ASCII punctuation mark plus "¿",
# except the square brackets (the target sentences carry "[start]" / "[end]" markers).
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in "[]"
)

def sentence_vectorizer(input_string):
    """Standardize text: lowercase it, then delete every character in ``strip_chars``.

    Passed as the ``standardize`` callable of the target ``TextVectorization`` layer.
    """
    # Regex character class matching any single character that must be removed.
    strip_pattern = "[" + re.escape(strip_chars) + "]"
    lowered = tf.strings.lower(input_string)
    cleaned = tf.strings.regex_replace(lowered, strip_pattern, "")
    return cleaned

# Options common to both vectorizers: vocabulary cap and integer token ids.
_vectorizer_options = dict(max_tokens=vocab_size, output_mode="int")

# Source sentences rely on the layer's default standardization.
source_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length,
    **_vectorizer_options,
)
# Target sentences use the custom standardizer and are padded one position longer,
# because format_dataset slices them into two views that are each one position shorter.
target_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length + 1,
    standardize=sentence_vectorizer,
    **_vectorizer_options,
)

# Learn both vocabularies from the training pairs only.
train_input_texts = [source_text for source_text, _target_text in train_sentences]
train_output_texts = [target_text for _source_text, target_text in train_sentences]
source_vectorization.adapt(train_input_texts)
target_vectorization.adapt(train_output_texts)

batch_size = 32

def format_dataset(inp, out):
    """Vectorize a batch of sentence pairs into model features and labels.

    Returns:
        ``(features, labels)`` where ``features`` maps "input" to the vectorized
        source batch and "output" to the vectorized target batch without its last
        position, and ``labels`` is the vectorized target batch without its first
        position.
    """
    source_ids = source_vectorization(inp)
    target_ids = target_vectorization(out)
    decoder_input = target_ids[:, :-1]
    labels = target_ids[:, 1:]
    features = {"input": source_ids, "output": decoder_input}
    return features, labels

def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data`` pipeline from sentence pairs.

    Args:
        pairs: non-empty sequence of ``(input_text, output_text)`` string tuples.

    Returns:
        A ``tf.data.Dataset`` yielding ``format_dataset`` results for batches of
        ``batch_size`` pairs; batches are cached, shuffled and prefetched.
    """
    input_texts, output_texts = zip(*pairs)
    input_texts = list(input_texts)
    output_texts = list(output_texts)
    dataset = tf.data.Dataset.from_tensor_slices((input_texts, output_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    # Order matters: cache first so the vectorized batches are computed once, then
    # shuffle so each epoch draws a fresh batch order (caching *after* shuffle
    # would replay the first epoch's order on every later epoch), and prefetch
    # last so the input pipeline overlaps with training.
    return dataset.cache().shuffle(2048).prefetch(16)

# Build the training and validation pipelines from their respective sentence pairs.
Train, validate = (
    make_dataset(subset) for subset in (train_sentences, validation_sentences)
)

In [7]:
# Definition of transformer's encoder layer function
# Note for TA/ Professor: run this cell

class TransformerEncoder(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward stack.

    Each of the two sub-blocks is followed by a residual addition and layer
    normalization.

    Args:
        embed_dim: width of the final feed-forward Dense layer; also passed as
            ``key_dim`` to the attention layer.
        dense_dim: units of the first two feed-forward Dense layers.
        num_heads: number of attention heads.
        hidden_layer: units of the third feed-forward Dense layer.
        dropout_prob: dropout rate used inside the feed-forward stack.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self, embed_dim, dense_dim, num_heads, hidden_layer, dropout_prob, **kwargs):
        super().__init__(**kwargs)
        # Constructor arguments are stored so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.hidden_layer = hidden_layer
        self.dropout_prob = dropout_prob
        self.att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        # Feed-forward stack; the last Dense projects back to embed_dim so the
        # result can be added to the stack's input in call().
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(hidden_layer, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.normalization_layer_1 = layers.LayerNormalization()
        self.normalization_layer_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward stack to ``inputs``.

        Args:
            inputs: tensor to encode (presumably (batch, seq, embed_dim) - TODO confirm).
            mask: optional mask; when given, an axis is inserted at position 1
                before it is passed to the attention layer as ``attention_mask``.

        Returns:
            The layer-normalized sum of the feed-forward input and output.
        """
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        # Self-attention: inputs are both query and value.
        att_out = self.att(
            inputs, inputs, attention_mask=mask)
        proj_input = self.normalization_layer_1(inputs + att_out)
        proj_output = self.dense_proj(proj_input)
        return self.normalization_layer_2(proj_input + proj_output)

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
            "hidden_layer": self.hidden_layer,
            "dropout_prob": self.dropout_prob,
        })
        return config

In [6]:
# Definition of transformer's decoder layer function
# Note for TA/ Professor: run this cell

class TransformerDecoder(layers.Layer):
    """Transformer decoder block: self-attention, attention over encoder outputs, feed-forward.

    Each of the three sub-blocks is followed by a residual addition and layer
    normalization.

    Args:
        embed_dim: width of the final feed-forward Dense layer; also passed as
            ``key_dim`` to both attention layers.
        dense_dim: units of the first two feed-forward Dense layers.
        num_heads: number of attention heads in each attention layer.
        hidden_layer: units of the third feed-forward Dense layer.
        dropout_prob: dropout rate used inside the feed-forward stack.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self, embed_dim, dense_dim, num_heads, hidden_layer, dropout_prob, **kwargs):
        super().__init__(**kwargs)
        # Constructor arguments are stored so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.hidden_layer = hidden_layer
        self.dropout_prob = dropout_prob
        # att_1 attends over the decoder inputs, att_2 over the encoder outputs (see call()).
        self.att_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.att_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        # Feed-forward stack; the last Dense projects back to embed_dim so the
        # result can be added to the stack's input in call().
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(hidden_layer, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.normalization_layer_1 = layers.LayerNormalization()
        self.normalization_layer_2 = layers.LayerNormalization()
        self.normalization_layer_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
            "hidden_layer": self.hidden_layer,
            "dropout_prob": self.dropout_prob,
        })
        return config

    def get_causal_att_mask(self, inputs):
        """Build a lower-triangular attention mask sized from ``inputs``.

        Returns:
            int32 tensor of shape (batch, seq, seq), where batch and seq are the
            first two dimensions of ``inputs``; entry [b, i, j] is 1 when
            i >= j and 0 otherwise.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Decode ``inputs`` while attending over ``encoder_outputs``.

        Args:
            inputs: decoder-side tensor (presumably (batch, seq, embed_dim) - TODO confirm).
            encoder_outputs: tensor used as key and value of the second attention layer.
            mask: optional mask; when given it is cast to int32, an axis is
                inserted at position 1, and it is combined with the causal
                mask by element-wise minimum.

        Returns:
            The layer-normalized sum of the feed-forward input and output.
        """
        causal_mask = self.get_causal_att_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        # NOTE(review): self-attention below receives only the causal mask, while
        # the combined padding+causal mask is applied to the attention over
        # encoder_outputs - confirm this assignment of masks is intended.
        att_out_1 = self.att_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        att_out_1 = self.normalization_layer_1(inputs + att_out_1)
        att_out_2 = self.att_2(
            query=att_out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        att_out_2 = self.normalization_layer_2(
            att_out_1 + att_out_2)
        proj_output = self.dense_proj(att_out_2)
        return self.normalization_layer_3(att_out_2 + proj_output)

In [4]:
# Defining the Positional Embedder layer function
# Note for TA/ Professor: run this cell

class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: number of rows in the position-embedding table.
        input_dim: number of rows in the token-embedding table.
        output_dim: width of both embeddings.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        # Constructor arguments are stored so get_config() can report them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        """Embed ``inputs`` and add the embedding of each position index.

        Positions run from 0 up to the size of the last axis of ``inputs``
        (assumes inputs holds integer token ids, one row per sequence - TODO confirm).
        """
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is True wherever ``inputs`` is not 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [None]:
# Hyper-parameters shared by the custom transformer layers that are built,
# compiled and fitted in the cells that follow.
# Note for TA/ Professor: run this cell
num_heads = 8        # attention heads in every MultiHeadAttention layer
embed_size = 256     # embedding width (output_dim / embed_dim of the custom layers)
hidden_layer = 512   # units of the third feed-forward Dense layer
dense_size = 2048    # units of the first two feed-forward Dense layers
dropout_prob = 0.5   # dropout rate inside the feed-forward stacks

In [None]:
# Assemble the encoder/decoder model from the custom layers.
# Note for TA/ Professor: run this cell only if you want to build a new model

# Encoder side: token ids -> positional embedding -> encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="input")
encoder_embedded = PositionalEmbedding(sequence_length, vocab_size, embed_size)(encoder_inputs)
encoder_outputs = TransformerEncoder(
    embed_size, dense_size, num_heads, hidden_layer, dropout_prob)(encoder_embedded)

# Decoder side: token ids -> positional embedding -> decoder block (which also
# receives the encoder outputs) -> dropout -> softmax over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="output")
decoder_embedded = PositionalEmbedding(sequence_length, vocab_size, embed_size)(decoder_inputs)
decoder_hidden = TransformerDecoder(
    embed_size, dense_size, num_heads, hidden_layer, dropout_prob)(decoder_embedded, encoder_outputs)
decoder_regularized = layers.Dropout(0.5)(decoder_hidden)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(decoder_regularized)

transformer = keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)

In [None]:
# Note for TA/ Professor: run this cell only if you want to build a new model

# Training setup: RMSprop with its default settings, cross-entropy on integer
# labels, and accuracy as the reported metric.
rmsprop_optimizer = keras.optimizers.RMSprop()
transformer.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=rmsprop_optimizer,
    metrics=["accuracy"],
)

# Train for 20 epochs with validation after each one, then save the model to disk.
transformer.fit(Train, validation_data=validate, epochs=20)
transformer.save("transformer0.h5")

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
# Use the model that was built and trained in this running instance for the
# translation cells below.
# Note for TA/ Professor: run this cell if building a new model using above code

# Imported from tensorflow.keras, like the rest of the notebook, instead of the
# standalone `keras` package: the two can be different Keras installations, and
# custom objects registered in one are not visible to the other.
from tensorflow.keras import models
from tensorflow.keras.utils import custom_object_scope

# Custom layer classes keyed by class name (needed when a saved model is deserialized).
custom_objects = {'PositionalEmbedding': PositionalEmbedding, 'TransformerEncoder': TransformerEncoder, 'TransformerDecoder': TransformerDecoder}

# The in-memory model already holds instances of the custom layers, so nothing is
# deserialized here and no custom_object_scope is required around the assignment.
loaded_model = transformer

In [9]:
# Load a previously saved model from disk.
# Note for TA/ Professor: run this cell if loading a model for testing

# Imported from tensorflow.keras, like the rest of the notebook, instead of the
# standalone `keras` package: the two can be different Keras installations, and a
# scope opened in one does not register the custom layers for the other's
# load_model.
from tensorflow.keras import models
from tensorflow.keras.utils import custom_object_scope

# Custom layer classes keyed by the class names stored in the saved model config.
custom_objects = {'PositionalEmbedding': PositionalEmbedding, 'TransformerEncoder': TransformerEncoder, 'TransformerDecoder': TransformerDecoder}

model_path = "/content/drive/MyDrive/Colab Notebooks/636 stuff/project2/transformer0.h5" # path to model
with custom_object_scope(custom_objects):
    loaded_model = models.load_model(model_path)

In [19]:
# Phase 2: the model emits vectorized (integer) tokens; the tables below are used
# to turn those integers back into readable text.
# Note for TA/ Professor: run this cell


target_vocab = target_vectorization.get_vocabulary()
# Maps each integer token id to its vocabulary string.
target_index_lookup = dict(enumerate(target_vocab))
max_translated_sentence_length = 100

def translate(input_sentence):
    """Greedily decode a translation of ``input_sentence`` with ``loaded_model``.

    Decoding starts from "[start]" and appends the highest-scoring token at each
    step, stopping after "[end]" is produced or after
    ``max_translated_sentence_length`` steps.

    Returns:
        The decoded string, including the leading "[start]" (and the trailing
        "[end]" when one was produced).
    """
    encoder_tokens = source_vectorization([input_sentence])
    decoded = "[start]"
    for step in range(max_translated_sentence_length):
        # Re-vectorize everything decoded so far and drop the final position.
        decoder_tokens = target_vectorization([decoded])[:, :-1]
        predictions = loaded_model([encoder_tokens, decoder_tokens])
        # Pick the most likely token at the current step.
        best_index = np.argmax(predictions[0, step, :])
        next_token = target_index_lookup[best_index]
        decoded = decoded + " " + next_token
        if next_token == "[end]":
            return decoded
    return decoded

In [None]:
def _strip_markers(decoded):
    """Remove a leading "[start]" token and a trailing "[end]" token, if present.

    A fixed slice such as ``decoded[8:-6]`` would cut six real characters from
    any translation that hit the length limit without producing "[end]".
    """
    tokens = decoded.split(" ")
    if tokens and tokens[0] == "[start]":
        tokens = tokens[1:]
    if tokens and tokens[-1] == "[end]":
        tokens = tokens[:-1]
    return " ".join(tokens)


test_path = "test file path here"
# NOTE: pickle.load can execute arbitrary code, so only load files from a trusted source.
# The context manager closes the file handle once the data is read.
with open(test_path, 'rb') as test_file:
    my_test_input = pickle.load(test_file)

# Translate every test sentence and keep only the text between the markers.
output_list = [
    _strip_markers(translate(input_sentence)) for input_sentence in my_test_input
]

output_file_path = "Rahaan_Gandhi_434007427_Project2_Prediction"
with open(output_file_path, "wb") as path:
    pickle.dump(output_list, path)

In [None]:
# Disabled accuracy check, kept as a string literal so that it does not execute.
# It translates every string of an independent test set (about 1000 strings) and
# compares each result with the ideal translation to get the model's actual
# testing accuracy.
# NOTE(review): the comparison walks the characters of `output` only, so a
# prediction that is a strict prefix of the ideal output would be counted as a
# match, a prediction longer than the ideal output would index past its end,
# and an empty prediction leaves `flag` at its previous value - confirm before
# re-enabling this code.
''' acc = 0
realdata = test_outputs # replace with test_outputs
output_list = []
my_test_input = test_inputs # replace with test_inputs
for i in range(len(my_test_input)):
  input_sentence = my_test_input[i]
  output = translate(input_sentence)
  output = output[8:-6]
  output_list.append(output)
  for j in range(len(output)):
    if output[j] != realdata[i][j]:
      print(f"\nfor iteration {i}:")
      print("actual output:", output)
      print("ideal output: ", realdata[i])
      flag = False
      break
    else:
      flag = True

  if flag == True:
    print(f"\nfor iteration {i}: match!")
    acc += 1

acc = (acc/len(my_test_input)) * 100.00
print(f"acc = {acc}%")'''

## Instructions:
Run the cell below to load the saved model for testing.

In [12]:
# Import all important libraries required for the transformer model
# Note for TA/ Professor: run this cell
import warnings
import pickle
import random
import numpy as np
import string
import re
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
warnings.filterwarnings('ignore')

class TransformerEncoder(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward stack.

    Defined in this cell so the saved model can be loaded by running this cell
    alone. Each of the two sub-blocks is followed by a residual addition and
    layer normalization.

    Args:
        embed_dim: width of the final feed-forward Dense layer; also passed as
            ``key_dim`` to the attention layer.
        dense_dim: units of the first two feed-forward Dense layers.
        num_heads: number of attention heads.
        hidden_layer: units of the third feed-forward Dense layer.
        dropout_prob: dropout rate used inside the feed-forward stack.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self, embed_dim, dense_dim, num_heads, hidden_layer, dropout_prob, **kwargs):
        super().__init__(**kwargs)
        # Constructor arguments are stored so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.hidden_layer = hidden_layer
        self.dropout_prob = dropout_prob
        self.att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        # Feed-forward stack; the last Dense projects back to embed_dim so the
        # result can be added to the stack's input in call().
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(hidden_layer, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.normalization_layer_1 = layers.LayerNormalization()
        self.normalization_layer_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward stack to ``inputs``.

        Args:
            inputs: tensor to encode (presumably (batch, seq, embed_dim) - TODO confirm).
            mask: optional mask; when given, an axis is inserted at position 1
                before it is passed to the attention layer as ``attention_mask``.

        Returns:
            The layer-normalized sum of the feed-forward input and output.
        """
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        # Self-attention: inputs are both query and value.
        att_out = self.att(
            inputs, inputs, attention_mask=mask)
        proj_input = self.normalization_layer_1(inputs + att_out)
        proj_output = self.dense_proj(proj_input)
        return self.normalization_layer_2(proj_input + proj_output)

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
            "hidden_layer": self.hidden_layer,
            "dropout_prob": self.dropout_prob,
        })
        return config

class TransformerDecoder(layers.Layer):
    """Transformer decoder block: self-attention, attention over encoder outputs, feed-forward.

    Defined in this cell so the saved model can be loaded by running this cell
    alone. Each of the three sub-blocks is followed by a residual addition and
    layer normalization.

    Args:
        embed_dim: width of the final feed-forward Dense layer; also passed as
            ``key_dim`` to both attention layers.
        dense_dim: units of the first two feed-forward Dense layers.
        num_heads: number of attention heads in each attention layer.
        hidden_layer: units of the third feed-forward Dense layer.
        dropout_prob: dropout rate used inside the feed-forward stack.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self, embed_dim, dense_dim, num_heads, hidden_layer, dropout_prob, **kwargs):
        super().__init__(**kwargs)
        # Constructor arguments are stored so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.hidden_layer = hidden_layer
        self.dropout_prob = dropout_prob
        # att_1 attends over the decoder inputs, att_2 over the encoder outputs (see call()).
        self.att_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.att_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        # Feed-forward stack; the last Dense projects back to embed_dim so the
        # result can be added to the stack's input in call().
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(dense_dim, activation="relu"),
             layers.Dropout(dropout_prob),
             layers.Dense(hidden_layer, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.normalization_layer_1 = layers.LayerNormalization()
        self.normalization_layer_2 = layers.LayerNormalization()
        self.normalization_layer_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
            "hidden_layer": self.hidden_layer,
            "dropout_prob": self.dropout_prob,
        })
        return config

    def get_causal_att_mask(self, inputs):
        """Build a lower-triangular attention mask sized from ``inputs``.

        Returns:
            int32 tensor of shape (batch, seq, seq), where batch and seq are the
            first two dimensions of ``inputs``; entry [b, i, j] is 1 when
            i >= j and 0 otherwise.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Decode ``inputs`` while attending over ``encoder_outputs``.

        Args:
            inputs: decoder-side tensor (presumably (batch, seq, embed_dim) - TODO confirm).
            encoder_outputs: tensor used as key and value of the second attention layer.
            mask: optional mask; when given it is cast to int32, an axis is
                inserted at position 1, and it is combined with the causal
                mask by element-wise minimum.

        Returns:
            The layer-normalized sum of the feed-forward input and output.
        """
        causal_mask = self.get_causal_att_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        # NOTE(review): self-attention below receives only the causal mask, while
        # the combined padding+causal mask is applied to the attention over
        # encoder_outputs - confirm this assignment of masks is intended.
        att_out_1 = self.att_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        att_out_1 = self.normalization_layer_1(inputs + att_out_1)
        att_out_2 = self.att_2(
            query=att_out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        att_out_2 = self.normalization_layer_2(
            att_out_1 + att_out_2)
        proj_output = self.dense_proj(att_out_2)
        return self.normalization_layer_3(att_out_2 + proj_output)

class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Defined in this cell so the saved model can be loaded by running this cell
    alone.

    Args:
        sequence_length: number of rows in the position-embedding table.
        input_dim: number of rows in the token-embedding table.
        output_dim: width of both embeddings.
        **kwargs: forwarded to ``layers.Layer``.
    """
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        # Constructor arguments are stored so get_config() can report them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        """Embed ``inputs`` and add the embedding of each position index.

        Positions run from 0 up to the size of the last axis of ``inputs``
        (assumes inputs holds integer token ids, one row per sequence - TODO confirm).
        """
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is True wherever ``inputs`` is not 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

# Load the saved model from disk.
# Note for TA/ Professor: run this cell if loading a model for testing
# Make sure to set model_path below to the correct location of the model file


# Imported from tensorflow.keras, like the rest of the notebook, instead of the
# standalone `keras` package: the two can be different Keras installations, and a
# scope opened in one does not register the custom layers for the other's
# load_model.
from tensorflow.keras import models
from tensorflow.keras.utils import custom_object_scope

# Custom layer classes keyed by the class names stored in the saved model config.
custom_objects = {'PositionalEmbedding': PositionalEmbedding, 'TransformerEncoder': TransformerEncoder, 'TransformerDecoder': TransformerDecoder}

model_path = "/content/drive/MyDrive/Colab Notebooks/636 stuff/project2/Rahaan_Gandhi_434007427_Project2_Model.h5" # path to model
with custom_object_scope(custom_objects):
    loaded_model = models.load_model(model_path)

# After running this cell, the model will be loaded into loaded_model and will be ready for use!