In [1]:
import pandas as pd
import re
import os
import time
import random
import numpy as np

try:
  %tensorflow_version 2.x # enable TF 2.x in Colab
except Exception:
  pass

import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from sklearn.model_selection import train_test_split
from google.colab import drive
import pickle

import spacy

nlp = spacy.load("en_core_web_sm")
from nltk.translate.bleu_score import corpus_bleu

Colab only includes TensorFlow 2.x; %tensorflow_version has no effect.


In [2]:
tf.__version__

'2.12.0'

In [3]:
# Mount Google Drive at /content/drive; the dataset path used below lives under it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Location of the dataset CSV inside the mounted Drive folder.
csv_file_path = '/content/drive/MyDrive/NLP Mandate/MathIQ_Dataset - Sheet1.csv'

# Read the CSV file into a DataFrame
# (the cells below read its 'Question' and 'Equation' columns)
df = pd.read_csv(csv_file_path)

In [5]:
df.shape

(29653, 2)

In [6]:
df.head(20)

Unnamed: 0,Question,Equation
0,the banker ' s gain of a certain sum due 3 yea...,x=((100*((36*100)/(3*10)))/(3*10))
1,average age of students of an adult school is ...,x=(((((32+4)*120)-(120*32))/(40-(32+4)))*4)
2,sophia finished 2 / 3 of a book . she calculat...,x=(90/(1-(2/3)))
3,120 is what percent of 50 ?,x=((120/50)*100)
4,there are 10 girls and 20 boys in a classroom ...,x=(10/20)
5,an empty fuel tank with a capacity of 218 gall...,x=(((218*(16/100))-30)/((16/100)-(12/100)))
6,an article is bought for rs . 823 and sold for...,x=(100-((1000*100)/823))
7,6 workers should finish a job in 8 days . afte...,x=(((6*8)-(3*6))/(6+4))
8,j is 25 % less than p and 20 % less than t . t...,x=((25*25)/100)
9,a student was asked to find 4 / 5 of a number ...,x=((((36*(4/5))*(4/5))/(1-((4/5)*(4/5))))/(4/5))


In [7]:
# Word problems (model inputs) as a plain Python list.
X = list(df['Question'].values)

In [8]:
def spacify(s):
    """Return ``s`` with one space inserted between every pair of adjacent characters.

    Used to split an equation string into single-character tokens,
    e.g. ``'x=(1+2)'`` becomes ``'x = ( 1 + 2 )'``.
    """
    separator = ' '
    return separator.join(s)

In [9]:
# Target equations, split into space-separated single characters.
Y = list(df['Equation'].apply(spacify).values)

In [10]:
X[:10]

["the banker ' s gain of a certain sum due 3 years hence at 10 % per annum is rs . 36 . what is the present worth ?",
 'average age of students of an adult school is 40 years . 120 new students whose average age is 32 years joined the school . as a result the average age is decreased by 4 years . find the number of students of the school after joining of the new students .',
 'sophia finished 2 / 3 of a book . she calculated that she finished 90 more pages than she has yet to read . how long is her book ?',
 '120 is what percent of 50 ?',
 'there are 10 girls and 20 boys in a classroom . what is the ratio of girls to boys ?',
 'an empty fuel tank with a capacity of 218 gallons was filled partially with fuel a and then to capacity with fuel b . fuel a contains 12 % ethanol by volume and fuel b contains 16 % ethanol by volume . if the full fuel tank contains 30 gallons of ethanol , how many gallons of fuel a were added ?',
 'an article is bought for rs . 823 and sold for rs . 1000 , find

In [11]:
Y[:10]

['x = ( ( 1 0 0 * ( ( 3 6 * 1 0 0 ) / ( 3 * 1 0 ) ) ) / ( 3 * 1 0 ) )',
 'x = ( ( ( ( ( 3 2 + 4 ) * 1 2 0 ) - ( 1 2 0 * 3 2 ) ) / ( 4 0 - ( 3 2 + 4 ) ) ) * 4 )',
 'x = ( 9 0 / ( 1 - ( 2 / 3 ) ) )',
 'x = ( ( 1 2 0 / 5 0 ) * 1 0 0 )',
 'x = ( 1 0 / 2 0 )',
 'x = ( ( ( 2 1 8 * ( 1 6 / 1 0 0 ) ) - 3 0 ) / ( ( 1 6 / 1 0 0 ) - ( 1 2 / 1 0 0 ) ) )',
 'x = ( 1 0 0 - ( ( 1 0 0 0 * 1 0 0 ) / 8 2 3 ) )',
 'x = ( ( ( 6 * 8 ) - ( 3 * 6 ) ) / ( 6 + 4 ) )',
 'x = ( ( 2 5 * 2 5 ) / 1 0 0 )',
 'x = ( ( ( ( 3 6 * ( 4 / 5 ) ) * ( 4 / 5 ) ) / ( 1 - ( ( 4 / 5 ) * ( 4 / 5 ) ) ) ) / ( 4 / 5 ) )']

In [12]:
def preprocess_X(s):
    """Normalise a word problem before tokenisation.

    The text is lower-cased and trimmed, then three substitutions run in order:
    punctuation (``? . ! , ’``) is padded with spaces, every digit is padded
    with spaces (so numbers become sequences of single-digit tokens), and runs
    of spaces and double quotes are collapsed into a single space.

    Args:
        s: raw problem text.

    Returns:
        The normalised string with no leading or trailing whitespace.
    """
    text = s.lower().strip()
    substitutions = (
        (r"([?.!,’])", r" \1 "),
        (r"([0-9])", r" \1 "),
        (r'[" "]+', " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

def preprocess_Y(e):
    """Normalise a target equation: lower-case it and trim surrounding whitespace."""
    return e.lower().strip()

In [13]:
# Apply the text normalisation to every question and every equation.
X_pp = [preprocess_X(question) for question in X]
Y_pp = [preprocess_Y(equation) for equation in Y]

In [14]:
X_pp[:10]

["the banker ' s gain of a certain sum due 3 years hence at 1 0 % per annum is rs . 3 6 . what is the present worth ?",
 'average age of students of an adult school is 4 0 years . 1 2 0 new students whose average age is 3 2 years joined the school . as a result the average age is decreased by 4 years . find the number of students of the school after joining of the new students .',
 'sophia finished 2 / 3 of a book . she calculated that she finished 9 0 more pages than she has yet to read . how long is her book ?',
 '1 2 0 is what percent of 5 0 ?',
 'there are 1 0 girls and 2 0 boys in a classroom . what is the ratio of girls to boys ?',
 'an empty fuel tank with a capacity of 2 1 8 gallons was filled partially with fuel a and then to capacity with fuel b . fuel a contains 1 2 % ethanol by volume and fuel b contains 1 6 % ethanol by volume . if the full fuel tank contains 3 0 gallons of ethanol , how many gallons of fuel a were added ?',
 'an article is bought for rs . 8 2 3 and sold f

In [15]:
Y_pp[:10]

['x = ( ( 1 0 0 * ( ( 3 6 * 1 0 0 ) / ( 3 * 1 0 ) ) ) / ( 3 * 1 0 ) )',
 'x = ( ( ( ( ( 3 2 + 4 ) * 1 2 0 ) - ( 1 2 0 * 3 2 ) ) / ( 4 0 - ( 3 2 + 4 ) ) ) * 4 )',
 'x = ( 9 0 / ( 1 - ( 2 / 3 ) ) )',
 'x = ( ( 1 2 0 / 5 0 ) * 1 0 0 )',
 'x = ( 1 0 / 2 0 )',
 'x = ( ( ( 2 1 8 * ( 1 6 / 1 0 0 ) ) - 3 0 ) / ( ( 1 6 / 1 0 0 ) - ( 1 2 / 1 0 0 ) ) )',
 'x = ( 1 0 0 - ( ( 1 0 0 0 * 1 0 0 ) / 8 2 3 ) )',
 'x = ( ( ( 6 * 8 ) - ( 3 * 6 ) ) / ( 6 + 4 ) )',
 'x = ( ( 2 5 * 2 5 ) / 1 0 0 )',
 'x = ( ( ( ( 3 6 * ( 4 / 5 ) ) * ( 4 / 5 ) ) / ( 1 - ( ( 4 / 5 ) * ( 4 / 5 ) ) ) ) / ( 4 / 5 ) )']

In [16]:
def tokenize(lang):
    """Fit a Keras ``Tokenizer`` on ``lang`` and encode each text as a list of integer ids.

    ``filters=''`` disables the tokenizer's default character filtering, so
    punctuation and operator characters are kept as tokens.

    Args:
        lang: list of already pre-processed, space-separated strings.

    Returns:
        ``(sequences, tokenizer)``: the encoded texts (one list of ids per
        text) and the fitted tokenizer.
    """
    fitted_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    fitted_tokenizer.fit_on_texts(lang)
    return fitted_tokenizer.texts_to_sequences(lang), fitted_tokenizer

In [17]:
# Encode the questions; the displayed value is the size of the input vocabulary.
X_tensor, X_lang_tokenizer = tokenize(X_pp)
len(X_lang_tokenizer.word_index)

7692

In [18]:
# Encode the equations; the displayed value is the size of the target vocabulary.
Y_tensor, Y_lang_tokenizer = tokenize(Y_pp)
len(Y_lang_tokenizer.word_index)

19

In [19]:
# Target vocabulary size as fitted on the data, captured before any extra words
# are added to word_index. previous_length + 1 / + 2 are used below as the
# target <start> / <end> token ids.
previous_length = len(Y_lang_tokenizer.word_index)

Add integers for < start > and < end > tokens for input problems and target math expressions.

In [20]:
def append_head_tail(x, last_int):
    """Wrap a token-id sequence with start and end markers.

    Args:
        x: iterable of token ids.
        last_int: largest id already in use; ``last_int + 1`` becomes the
            start marker and ``last_int + 2`` the end marker.

    Returns:
        A new list ``[last_int + 1, *x, last_int + 2]``.
    """
    return [last_int + 1, *x, last_int + 2]

In [21]:
# Surround every encoded sequence with <start>/<end> ids, which are
# vocabulary size + 1 and + 2 of the respective tokenizer.
X_tensor_list = [append_head_tail(i, len(X_lang_tokenizer.word_index)) for i in X_tensor]
Y_tensor_list = [append_head_tail(i, len(Y_lang_tokenizer.word_index)) for i in Y_tensor]

Padding the sequences with 0's to make them equal in length.

In [22]:
# Right-pad ('post') every sequence with 0 up to the longest sequence of its set.
# Note: X_tensor / Y_tensor are rebound here from lists of lists to 2-D arrays.
X_tensor = tf.keras.preprocessing.sequence.pad_sequences(X_tensor_list, padding='post')
Y_tensor = tf.keras.preprocessing.sequence.pad_sequences(Y_tensor_list, padding='post')

In [23]:
X_tensor

array([[7693,    1, 1403, ...,    0,    0,    0],
       [7693,   39,  110, ...,    0,    0,    0],
       [7693, 5179,  859, ...,    0,    0,    0],
       ...,
       [7693,   23,   32, ...,    0,    0,    0],
       [7693,   19,    1, ...,    0,    0,    0],
       [7693,    1,   71, ...,    0,    0,    0]], dtype=int32)

In [24]:
Y_tensor

array([[20,  9, 10, ...,  0,  0,  0],
       [20,  9, 10, ...,  0,  0,  0],
       [20,  9, 10, ...,  0,  0,  0],
       ...,
       [20,  9, 10, ...,  0,  0,  0],
       [20,  9, 10, ...,  0,  0,  0],
       [20,  9, 10, ...,  0,  0,  0]], dtype=int32)

Increasing the vocabulary size of the target by including some fodder words which won't be used. This is done to avoid problems later which manifest due to short vocabulary size.

In [25]:
# Placeholder ("fodder") words '10' .. '50' that never occur in the targets;
# they only enlarge the target vocabulary.
keys = [str(number) for number in range(10, 51)]

# Ids 0 (padding), 1..previous_length (real tokens), previous_length + 1 (<start>)
# and previous_length + 2 (<end>) are taken, so the first free id is
# previous_length + 3. The base is computed once, outside the loop: reading
# len(word_index) inside the loop while the dict grows made the ids advance by
# two per key and run past the vocabulary size. Anchoring on previous_length
# also makes the cell safe to re-run.
first_free_id = previous_length + 3
for idx, key in enumerate(keys):
    Y_lang_tokenizer.word_index[key] = first_free_id + idx

In [26]:
len(Y_lang_tokenizer.word_index)

60

**Train-Test Split → 95 : 5**

In [27]:
# Hold out 5% of the padded pairs for testing; random_state fixes the split.
X_tensor_train, X_tensor_test, Y_tensor_train, Y_tensor_test = train_test_split(X_tensor, Y_tensor, test_size=0.05, random_state=42)

In [28]:
print(len(X_tensor_train), len(X_tensor_test), len(Y_tensor_train), len(Y_tensor_test))

28170 1483 28170 1483


**Hyperparameters**

In [29]:
# --- Input pipeline -------------------------------------------------------
TRAINING_SET_SIZE = len(X_tensor_train)
BATCH_SIZE = 64
# Number of full batches per epoch (np.floor returns a float).
steps_per_epoch = np.floor(TRAINING_SET_SIZE/BATCH_SIZE)

# Shuffle with a buffer covering the whole training set, then form fixed-size
# batches; the final partial batch is dropped.
data = tf.data.Dataset.from_tensor_slices((X_tensor_train, Y_tensor_train)).shuffle(TRAINING_SET_SIZE)
data = data.batch(BATCH_SIZE, drop_remainder=True)

# --- Model size -----------------------------------------------------------
num_layers = 4
d_model = 128       # Embedding dimension
dff = 512           # Dimensionality of inner-layer of FNN
num_heads = 8       # Number of parallel attention layers (heads)
dropout_rate = 0    # dropout disabled

# +3 accounts for the padding id 0 and the <start>/<end> ids appended after
# the tokenizer vocabulary.
X_vocabulary_size = len(X_lang_tokenizer.word_index) + 3
Y_vocabulary_size = len(Y_lang_tokenizer.word_index) + 3

In [30]:
# Let tf.data prepare upcoming batches while the current one is being consumed.
# Note: this rebinds `data`, so re-running the cell stacks another prefetch.
data = data.prefetch(tf.data.experimental.AUTOTUNE)

In [31]:
data

<_PrefetchDataset element_spec=(TensorSpec(shape=(64, 382), dtype=tf.int32, name=None), TensorSpec(shape=(64, 2543), dtype=tf.int32, name=None))>

In [32]:
# Pull one batch to sanity-check the shapes and contents of the pipeline.
X_batch_example, Y_batch_example = next(iter(data))

print(X_batch_example, Y_batch_example)

tf.Tensor(
[[7693   47  319 ...    0    0    0]
 [7693    1  248 ...    0    0    0]
 [7693 4358 1795 ...    0    0    0]
 ...
 [7693   23   32 ...    0    0    0]
 [7693   13    5 ...    0    0    0]
 [7693   20    5 ...    0    0    0]], shape=(64, 382), dtype=int32) tf.Tensor(
[[20  9 10 ...  0  0  0]
 [20  9 10 ...  0  0  0]
 [20  9 10 ...  0  0  0]
 ...
 [20  9 10 ...  0  0  0]
 [20  9 10 ...  0  0  0]
 [20  9 10 ...  0  0  0]], shape=(64, 2543), dtype=int32)


**Positional Encoding**

In [33]:
def get_angles(pos, i, d_model):
    """Compute the positional-encoding angles ``pos / 10000^(2*(i//2)/d_model)``.

    Args:
        pos: position index or array of positions.
        i: embedding-dimension index or array of indices; dimensions ``2k``
            and ``2k+1`` share the same rate because of ``i // 2``.
        d_model: embedding dimension.

    Returns:
        ``pos`` multiplied by the per-dimension angle rate (NumPy broadcasting
        applies).
    """
    exponent = (2 * (i//2)) / np.float32(d_model)
    return pos * (1 / np.power(10000, exponent))

In [34]:
def positional_encoding(position, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: number of positions (rows) to generate.
        d_model: embedding dimension (columns).

    Returns:
        A ``tf.float32`` tensor of shape ``(1, position, d_model)`` holding
        ``sin`` of the angle at even column indices and ``cos`` at odd ones.
    """
    positions = np.arange(position)[:, np.newaxis]
    dims = np.arange(d_model)[np.newaxis, :]
    angles = get_angles(positions, dims, d_model)

    # even columns (2i) take the sine, odd columns (2i+1) the cosine
    table = np.where(dims % 2 == 0, np.sin(angles), np.cos(angles))

    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

**Masking**

We mask all padding elements so that the model does not treat them as input. The padding mask is 1 at the positions of pad tokens and 0 everywhere else. The look-ahead mask hides the subsequent (future) tokens of a sequence: it marks the entries that attention must not use.

In [35]:
def create_padding_mask(seq):
    """Return a float mask that is 1.0 where ``seq`` equals 0 (padding) and 0.0 elsewhere.

    Two singleton axes are inserted after the first axis so the mask can be
    broadcast against the attention logits; for a ``(batch_size, seq_len)``
    input the result is ``(batch_size, 1, 1, seq_len)``.
    """
    is_padding = tf.math.equal(seq, 0)
    mask = tf.cast(is_padding, tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]

In [36]:
def create_look_ahead_mask(size):
    """Return a ``(size, size)`` mask with 1 strictly above the diagonal and 0 elsewhere.

    Row ``q`` therefore marks every position after ``q`` as masked.
    """
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle

**Scaled Dot-product Attention**

A “soft” dictionary lookup which returns a weighted sum of the values in the corpus. This weight represents the usefulness of a particular token in embedding the query token. Then, it is scaled by dividing with sqrt(dk) inside softmax.


In [37]:
def scaled_dot_product_attention(q, k, v, mask):
    """Compute ``softmax(q k^T / sqrt(d_k) + mask * -1e9) v``.

    Args:
        q: queries, shape ``(..., seq_len_q, depth)``.
        k: keys, shape ``(..., seq_len_k, depth)``.
        v: values, shape ``(..., seq_len_k, depth_v)``.
        mask: tensor broadcastable to ``(..., seq_len_q, seq_len_k)`` with 1 at
            positions to suppress, or ``None`` for no masking.

    Returns:
        ``(output, attention_weights)`` with shapes ``(..., seq_len_q, depth_v)``
        and ``(..., seq_len_q, seq_len_k)``.
    """
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)

    if mask is not None:
        # a large negative logit drives the softmax weight of masked keys to ~0
        logits = logits + (mask * -1e9)

    # normalise over the key axis so each query's weights sum to 1
    attention_weights = tf.nn.softmax(logits, axis=-1)

    return tf.matmul(attention_weights, v), attention_weights

**Multi-Head Attention**

It is a combination of 'h' self-attention heads, where each head is sandwiched between two linear layers.

In [38]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Queries, keys and values are linearly projected to ``d_model``, split into
    ``num_heads`` heads of size ``d_model // num_heads``, attended per head,
    concatenated again and passed through a final dense layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers; ``d_model`` must be divisible by ``num_heads``."""
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        # per-head feature size
        self.depth = d_model // self.num_heads

        # input projections for queries, keys and values
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        # output projection applied to the concatenated heads
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape ``(batch_size, seq_len, d_model)`` to ``(batch_size, num_heads, seq_len, depth)``."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Attend from ``q`` over ``k``/``v``.

        Note the argument order: values, keys, queries, mask.

        Returns:
            ``(output, attention_weights)`` with shapes
            ``(batch_size, seq_len_q, d_model)`` and
            ``(batch_size, num_heads, seq_len_q, seq_len_k)``.
        """
        batch_size = tf.shape(q)[0]

        # project, then split each projection into heads
        q, k, v = self.wq(q), self.wk(k), self.wv(v)
        q, k, v = (self.split_heads(t, batch_size) for t in (q, k, v))

        per_head_output, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)

        # (batch, heads, seq_len_q, depth) -> (batch, seq_len_q, heads, depth) -> (batch, seq_len_q, d_model)
        per_head_output = tf.transpose(per_head_output, perm=[0, 2, 1, 3])
        merged = tf.reshape(per_head_output, (batch_size, -1, self.d_model))

        return self.dense(merged), attention_weights

**Point-wise Feed Forward Neural Network**

Every encoder and decoder layer contains a fully connected feed-forward network. It is composed of two fully-connected layers with a ReLU activation function in between them.


In [39]:
def point_wise_feed_forward_network(d_model, dff):
    """Return a two-layer feed-forward block: ``Dense(dff, relu)`` followed by ``Dense(d_model)``."""
    hidden = tf.keras.layers.Dense(dff, activation='relu')  # (batch_size, seq_len, dff)
    projection = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([hidden, projection])

**Encoder**

The encoder consists of N layers and each layer has 2 sub-layers. The first sub-layer is a multi-head self attention mechanism layer and the second sub-layer is a position-wise fully connected feed-forward neural network. Each of these sub-layers is preceded by a skip connection or residual connection and succeeded by a layer normalization block. They result in outputs of dimension dmodel to facilitate the skip connections. The residual connections are important as they assist in overcoming the vanishing gradient problem in deep network architectures.

In [40]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and a feed-forward network.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Build the sub-layers; ``rate`` is the dropout rate."""
        super().__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        # layer normalisation after each residual sum
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        """Apply the block to ``x``; ``mask`` is handed to the self-attention."""
        # self-attention sub-layer (values, keys and queries are all x)
        attended, _ = self.mha(x, x, x, mask)
        attended = self.dropout1(attended, training=training)
        after_attention = self.layernorm1(x + attended)

        # feed-forward sub-layer
        transformed = self.ffn(after_attention)
        transformed = self.dropout2(transformed, training=training)
        return self.layernorm2(after_attention + transformed)

In [41]:
class Encoder(tf.keras.layers.Layer):
    """Token embedding plus positional encoding, followed by ``num_layers`` encoder blocks."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                maximum_position_encoding, rate=0.1):
        """Create the embedding, the positional table and the stack of ``EncoderLayer``s.

        ``maximum_position_encoding`` is the number of positions in the
        positional table, so it bounds the usable input length.
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                self.d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                        for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        """Encode token ids ``x``; the result has ``d_model`` features per position."""
        seq_len = tf.shape(x)[1]

        # scale the embeddings by sqrt(d_model) before adding the positional table
        embedded = self.embedding(x)
        embedded *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        embedded += self.pos_encoding[:, :seq_len, :]

        hidden = self.dropout(embedded, training=training)

        for layer_index in range(self.num_layers):
            hidden = self.enc_layers[layer_index](hidden, training, mask)

        return hidden

**Decoder**

The decoder consists of N layers and each layer has 3 sub-layers. Two of the sub-layers are the same as those of the encoder. The third sub-layer utilizes the outputs of the encoder stack and performs multi-head attention over them. Decoder feeds on two types of inputs which are, the outputs of the encoder and positionally encoded target output embeddings. And just like the encoder, decoder also has subsequent layer normalization blocks and incorporation of skip connections after and before each sub-layer respectively. The principal task of decoder is to predict token at position n by looking at all the preceding n-1 tokens using the look-ahead mask. The predicted sequence is then passed through a fully-connected neural network layer to generate the final math expression.

In [42]:
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: masked self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Build the sub-layers; ``rate`` is the dropout rate."""
        super().__init__()

        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.rate = rate

        self.mha1 = MultiHeadAttention(d_model, num_heads)  # self-attention
        self.mha2 = MultiHeadAttention(d_model, num_heads)  # attention over the encoder output

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Run the block.

        Returns:
            ``(output, self_attention_weights, encoder_attention_weights)``.
        """
        # 1) self-attention over the decoder input, restricted by look_ahead_mask
        self_attended, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        self_attended = self.dropout1(self_attended, training=training)
        after_self = self.layernorm1(self_attended + x)

        # 2) attention over the encoder output: values/keys come from the
        #    encoder, queries from the decoder
        cross_attended, attn_weights_block2 = self.mha2(enc_output, enc_output, after_self, padding_mask)
        cross_attended = self.dropout2(cross_attended, training=training)
        after_cross = self.layernorm2(cross_attended + after_self)

        # 3) position-wise feed-forward network
        transformed = self.ffn(after_cross)
        transformed = self.dropout3(transformed, training=training)
        block_output = self.layernorm3(transformed + after_cross)

        return block_output, attn_weights_block1, attn_weights_block2


In [43]:
class Decoder(tf.keras.layers.Layer):
    """Token embedding plus positional encoding, followed by ``num_layers`` decoder blocks."""

    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
                maximum_position_encoding, rate=0.1):
        """Create the embedding, the positional table and the stack of ``DecoderLayer``s.

        ``maximum_position_encoding`` is the number of positions in the
        positional table; it must be at least as large as the longest target
        sequence passed to ``call``.
        """
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)

        # Create decoder layers (count: num_layers)
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                        for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
            look_ahead_mask, padding_mask):
        """Decode target token ids ``x`` against ``enc_output``.

        Returns:
            ``(output, attention_weights)`` where ``attention_weights`` maps
            ``'decoder_layer{n}_block1'`` / ``'decoder_layer{n}_block2'``
            (``n`` counted from 1) to the self-attention and encoder-decoder
            attention weights of every decoder layer.
        """
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)

        # scale the embeddings by sqrt(d_model) before adding the positional table
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))

        x += self.pos_encoding[:,:seq_len,:]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                look_ahead_mask, padding_mask)

            # Store the attention weights of *every* layer (previously done after
            # the loop, which kept only the last layer); they can be used for
            # visualisation while translating.
            attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
            attention_weights['decoder_layer{}_block2'.format(i+1)] = block2

        return x, attention_weights

In [44]:
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer with a final dense projection onto the target vocabulary."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                target_vocab_size, pe_input, pe_target, rate=0.1):
        """Build encoder, decoder and output layer.

        ``pe_input`` / ``pe_target`` are the sizes of the encoder / decoder
        positional-encoding tables.
        """
        super().__init__()

        shared_config = (num_layers, d_model, num_heads, dff)

        self.encoder = Encoder(*shared_config, input_vocab_size, pe_input, rate)
        self.decoder = Decoder(*shared_config, target_vocab_size, pe_target, rate)

        # maps decoder features to one logit per target-vocabulary id
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask,
            look_ahead_mask, dec_padding_mask):
        """Run the full model.

        Returns:
            ``(logits, attention_weights)``: the output of the final dense
            layer and the attention-weight dict produced by the decoder.
        """
        encoded = self.encoder(inp, training, enc_padding_mask)

        decoded, attention_weights = self.decoder(
            tar, encoded, training, look_ahead_mask, dec_padding_mask)

        return self.final_layer(decoded), attention_weights

In [45]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Warm-up then inverse-square-root learning-rate schedule.

    ``lr(step) = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)``
    """

    def __init__(self, d_model, warmup_steps=4000):
        """Store ``d_model`` and ``warmup_steps`` as float32 tensors."""
        super().__init__()

        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = tf.cast(warmup_steps, tf.float32)

    def __call__(self, step):
        """Return the learning rate for optimizer step ``step``."""
        step = tf.cast(step, tf.float32)

        decay_term = tf.math.rsqrt(step)                       # step^-0.5
        warmup_term = step * (self.warmup_steps ** -1.5)       # linear in step

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(decay_term, warmup_term)

**Optimizer**

We used Adam with a custom learning rate scheduler as our optimizer. The learning rate was set according to,

$$\text{lrate} = d_{\text{model}}^{-0.5} \cdot \min\left(n^{-0.5},\; n \cdot w^{-1.5}\right)$$

where dmodel is the embedding dimension, n is the step number and w is the number of warm-up steps. The learning rate therefore rises linearly during the first w training steps and afterwards decays proportionally to the inverse square root of the step number.

In [46]:
# Learning-rate schedule driven by the model width (default warm-up of 4000 steps).
learning_rate = CustomSchedule(d_model)

# Adam optimizer with a custom learning rate
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

In [47]:
# Per-token cross-entropy on raw logits; reduction='none' keeps one loss value
# per position so that padding can be masked out before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

**Performance Metric**

As output sequences are padded, it is important to apply a padding mask when calculating the loss. We used sparse categorical cross-entropy as the loss and the mean per-token accuracy over non-padding positions as the accuracy metric. In the test phase we also calculated the BLEU (BiLingual Evaluation Understudy) score of our model to assess its translation quality.

In [48]:
def loss_function(real, pred):
    """Padding-masked mean cross-entropy.

    Args:
        real: target token ids; id 0 marks padding.
        pred: logits, one vector per target position.

    Returns:
        Scalar tensor: the summed per-token loss of the non-padding positions
        divided by the number of non-padding positions (0 if there are none).
    """
    # Apply a mask to paddings (0)
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    # Average over real tokens only. A plain reduce_mean would also divide by
    # the padded positions, shrinking the loss by the (batch-dependent) padding
    # ratio. divide_no_nan returns 0 for an all-padding batch.
    return tf.math.divide_no_nan(tf.reduce_sum(loss_), tf.reduce_sum(mask))

def accuracy_function(real, pred):
    """Fraction of non-padding target positions whose arg-max prediction is correct.

    Args:
        real: target token ids; id 0 marks padding.
        pred: logits with the vocabulary on axis 2.
    """
    target_ids = tf.cast(real, dtype=tf.int32)
    predicted_ids = tf.cast(tf.argmax(pred, axis=2), dtype=tf.int32)

    not_padding = tf.math.logical_not(tf.math.equal(target_ids, 0))
    correct = tf.math.logical_and(not_padding, tf.equal(target_ids, predicted_ids))

    n_correct = tf.reduce_sum(tf.cast(correct, dtype=tf.float32))
    n_tokens = tf.reduce_sum(tf.cast(not_padding, dtype=tf.float32))
    return n_correct/n_tokens

In [49]:
# Running metrics, reset at the start of every epoch by the training loop.
train_loss = tf.keras.metrics.Mean(name='train_loss')
# Updated in train_step without a padding mask, so padded positions count too.
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
# Running mean of accuracy_function, which ignores padded positions.
train_accuracy_mean = tf.keras.metrics.Mean(name='train_accuracy_mean')

In [50]:
# The positional-encoding tables must have at least as many positions as the
# longest sequence fed to the encoder / decoder. Sizing them by the vocabulary
# (as before) left the decoder table shorter than the padded target sequences,
# so adding it to the embeddings could not broadcast. The vocabulary size is
# kept as a lower bound so the tables never shrink compared to the previous
# configuration (e.g. for long inputs at inference time).
transformer = Transformer(num_layers, d_model, num_heads, dff,
                          X_vocabulary_size, Y_vocabulary_size,
                          pe_input=max(X_vocabulary_size, X_tensor.shape[1]),
                          pe_target=max(Y_vocabulary_size, Y_tensor.shape[1]),
                          rate=dropout_rate)

In [51]:
def create_masks(inp, tar):
    """Build the three attention masks used by the Transformer.

    Args:
        inp: encoder input token ids.
        tar: decoder input token ids.

    Returns:
        ``(enc_padding_mask, look_ahead_mask, dec_padding_mask)``:
        * ``enc_padding_mask`` hides padding in ``inp`` for the encoder.
        * ``look_ahead_mask`` is the element-wise maximum of the causal mask
          and the padding mask of ``tar``; used by the decoder self-attention.
        * ``dec_padding_mask`` hides padding in ``inp`` for the decoder's
          attention over the encoder output.
    """
    # Both masks refer to padding of the *encoder* input.
    enc_padding_mask = create_padding_mask(inp)
    dec_padding_mask = create_padding_mask(inp)

    # A target position is hidden if it lies in the future or is padding.
    causal_mask = create_look_ahead_mask(tf.shape(tar)[1])
    target_padding_mask = create_padding_mask(tar)
    look_ahead_mask = tf.maximum(target_padding_mask, causal_mask)

    return enc_padding_mask, look_ahead_mask, dec_padding_mask

**Training**

In [52]:
# Number of passes over the training set.
EPOCHS = 17

In [53]:
def train_step(inp, tar):
    """Run one optimisation step on a batch and update the running metrics.

    The target is shifted for teacher forcing: the decoder receives
    ``tar[:, :-1]`` and is trained to predict ``tar[:, 1:]``.

    Args:
        inp: batch of encoder input token ids.
        tar: batch of target token ids (including start and end markers).
    """
    decoder_input, decoder_target = tar[:, :-1], tar[:, 1:]

    masks = create_masks(inp, decoder_input)

    with tf.GradientTape() as tape:
        # third positional argument is `training`
        predictions, _ = transformer(inp, decoder_input, True, *masks)
        loss = loss_function(decoder_target, predictions)

    variables = transformer.trainable_variables
    optimizer.apply_gradients(zip(tape.gradient(loss, variables), variables))

    train_loss(loss)
    train_accuracy(decoder_target, predictions)
    train_accuracy_mean(accuracy_function(decoder_target, predictions))

In [54]:
# Training loop: one pass over `data` per epoch, with progress printed every
# 50 batches and a summary (including wall-clock time) at the end of each epoch.
for epoch in range(EPOCHS):
    start = time.time()

    # metrics are per-epoch running values
    train_loss.reset_states()
    train_accuracy.reset_states()
    train_accuracy_mean.reset_states()

    # inp -> math problem, tar -> expression
    for (batch, (inp, tar)) in enumerate(data):
        train_step(inp, tar)

        if batch % 50 == 0:
            print (f'Epoch {epoch + 1}, Batch {batch}, Loss {train_loss.result():.5f},\
             SC Accuracy {train_accuracy.result():.5f}, Mean Accuracy {train_accuracy_mean.result():.5f}')

    print (f'Epoch {epoch + 1}, Loss {train_loss.result():.5f},\
     SC Accuracy {train_accuracy.result():.5f}, Mean Accuracy {train_accuracy_mean.result():.5f}')

    print (f'Training time for this epoch: {(time.time() - start):.5f} seconds\n')

In [55]:
# Maximum number of decoding steps (generated tokens) per problem at inference time.
MAX_LENGTH = 40

In [56]:
def evaluate(input_problem):
    """Greedily decode the equation for one raw word problem.

    The text is normalised with ``preprocess_X`` and encoded with the fitted
    input tokenizer, i.e. through the same path as the training data. Words
    that are not in the vocabulary (and empty tokens) are dropped by the
    tokenizer instead of raising ``KeyError``.

    Args:
        input_problem: the word problem as a string.

    Returns:
        ``(output, attention_weights)``: the 1-D tensor of generated target
        ids, beginning with the target start id and without the end id, and
        the attention weights of the last decoding step.
    """
    start_token = [len(X_lang_tokenizer.word_index)+1]
    end_token = [len(X_lang_tokenizer.word_index)+2]

    # input_problem is the word problem, hence adding the start and end token
    token_ids = X_lang_tokenizer.texts_to_sequences([preprocess_X(input_problem)])[0]
    input_problem = start_token + token_ids + end_token
    encoder_input = tf.expand_dims(input_problem, 0)

    # start with expression's start token
    decoder_input = [previous_length+1]
    output = tf.expand_dims(decoder_input, 0)

    for i in range(MAX_LENGTH):
        enc_padding_mask, look_ahead_mask, dec_padding_mask = create_masks(encoder_input, output)

        predictions, attention_weights = transformer(encoder_input,
                                                    output,
                                                    False,
                                                    enc_padding_mask,
                                                    look_ahead_mask,
                                                    dec_padding_mask)

        # select the last word from the seq_len dimension
        predictions = predictions[: ,-1:, :]
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), dtype=tf.int32)

        # return the result if the predicted_id is equal to the end token
        if predicted_id == previous_length + 2:
            return tf.squeeze(output, axis=0), attention_weights

        # concatenate the predicted_id to the output which is given to the decoder
        # as its input.
        output = tf.concat([output, predicted_id], axis=-1)
    return tf.squeeze(output, axis=0), attention_weights

In [57]:
def plot_attention_weights(attention, problem, result, layer):
    """Plot one heat-map per attention head for a decoded problem.

    Args:
        attention: dict of attention weights as returned by ``evaluate``.
        problem: the raw word problem (used for the x-axis labels).
        result: 1-D tensor of generated target ids as returned by ``evaluate``.
        layer: key into ``attention`` selecting the weights to plot, e.g.
            ``'decoder_layer4_block2'``.
    """
    fig = plt.figure(figsize=(8, 16))

    sentence = preprocess_X(problem)
    x_labels = ['<start>']+sentence.split(' ')+['<end>']

    # Generated tokens without padding / start / end ids. Membership in
    # index_word is checked so that ids without a word cannot raise KeyError.
    special_ids = [0,previous_length+1,previous_length+2]
    y_labels = [Y_lang_tokenizer.index_word[i] for i in list(result.numpy())
                if i in Y_lang_tokenizer.index_word and i not in special_ids]

    attention = tf.squeeze(attention[layer], axis=0)

    fontdict = {'fontsize': 11}

    # Two heads per row; at least the original 4 rows so the 8-head layout is unchanged.
    n_heads = attention.shape[0]
    n_rows = max(4, -(-n_heads // 2))

    for head in range(n_heads):
        ax = fig.add_subplot(n_rows, 2, head+1)

        # plot the attention weights
        ax.matshow(attention[head][:-1, :], cmap='viridis')

        # The number of tick positions must equal the number of tick labels,
        # otherwise Matplotlib raises a ValueError in set_*ticklabels.
        ax.set_xticks(range(len(x_labels)))
        ax.set_yticks(range(len(y_labels)))

        ax.set_ylim(len(y_labels), -0.5)

        ax.set_xticklabels(x_labels, fontdict=fontdict, rotation=90)
        ax.set_yticklabels(y_labels, fontdict=fontdict)

        ax.set_xlabel(f'Head {head+1}')

    plt.tight_layout()
    plt.show()

In [58]:
def solve(problem, plot='', plot_Attention_Weights=False):
    """Translate a word problem into an equation and print the result.

    Args:
        problem: the word problem as a string.
        plot: key of the attention weights to plot (passed on as ``layer`` to
            ``plot_attention_weights``); only used when
            ``plot_Attention_Weights`` is true.
        plot_Attention_Weights: whether to plot the attention heat-maps.
    """
    prediction, attention_weights = evaluate(problem)

    # Padding / start / end ids are derived from previous_length, exactly as in
    # evaluate(); the previously hard-coded ids 46 and 47 did not match them,
    # so the start id was looked up in index_word and raised KeyError. The
    # membership test also skips any other id that has no word attached.
    special_ids = [0, previous_length + 1, previous_length + 2]
    predicted_expression = [Y_lang_tokenizer.index_word[i]
                            for i in list(prediction.numpy())
                            if i in Y_lang_tokenizer.index_word and i not in special_ids]
    print(f'Input: {problem}')
    print('Predicted translation: {}'.format(' '.join(predicted_expression)))

    if plot_Attention_Weights:
        plot_attention_weights(attention_weights, problem, prediction, plot)

In [59]:
def evaluate_testset(input_problem):
    """Greedily decode the equation for one already-encoded test problem.

    Args:
        input_problem: tensor of shape ``(1, seq_len)`` holding the token ids
            of one problem, as produced by the test dataset. Such rows already
            contain the start / end ids and trailing zero padding.

    Returns:
        ``(output, attention_weights)``: the 1-D tensor of generated target
        ids, beginning with the target start id and without the end id, and
        the attention weights of the last decoding step.
    """
    start_id = len(X_lang_tokenizer.word_index)+1
    end_id = len(X_lang_tokenizer.word_index)+2

    # Drop the zero padding and add the start / end ids only when missing.
    # Adding them unconditionally duplicated the start id and placed the end
    # id after the padding, which the model never saw during training.
    token_ids = [int(t) for t in input_problem.numpy()[0] if t != 0]
    if not token_ids or token_ids[0] != start_id:
        token_ids.insert(0, start_id)
    if token_ids[-1] != end_id:
        token_ids.append(end_id)
    encoder_input = tf.expand_dims(token_ids, 0)

    # start with expression's start token
    decoder_input = [previous_length+1]
    output = tf.expand_dims(decoder_input, 0)

    for i in range(MAX_LENGTH):
        enc_padding_mask, look_ahead_mask, dec_padding_mask = create_masks(encoder_input, output)

        predictions, attention_weights = transformer(encoder_input,
                                                    output,
                                                    False,
                                                    enc_padding_mask,
                                                    look_ahead_mask,
                                                    dec_padding_mask)

        # select the last word from the seq_len dimension
        predictions = predictions[: ,-1:, :]
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), dtype=tf.int32)

        # return the result if the predicted_id is equal to the end token
        if predicted_id == previous_length + 2:
            return tf.squeeze(output, axis=0), attention_weights

        # concatenate the predicted_id to the output which is given to the decoder
        # as its input.
        output = tf.concat([output, predicted_id], axis=-1)
    return tf.squeeze(output, axis=0), attention_weights

In [62]:
# Test pipeline: shuffled, one (problem, equation) pair per batch.
# No seed is passed to shuffle, so the evaluation order differs between runs.
data_test = tf.data.Dataset.from_tensor_slices((X_tensor_test, Y_tensor_test)).shuffle(len(X_tensor_test))
data_test = data_test.batch(1, drop_remainder=True)

In [63]:
# Evaluate on the held-out set: decode every test problem and count exact matches.
Y_true = []       # one [reference_tokens] list per sample (presumably the reference format for corpus_bleu -- confirm)
Y_pred = []       # predicted token list per sample
correctCount = 0  # predictions identical to the ground truth, token for token

idx = 0
for(X_test_batch, Y_test_batch) in iter(data_test):
    idx += 1
    # progress report; printed before the current sample has been evaluated
    if idx % 10 == 0:
        print(f'Samples tested: {idx}, Correctly solved: {correctCount}')
    # rebuild the reference expression, skipping padding / start / end ids
    ground_truth_expression = ''
    for i in Y_test_batch.numpy()[0]:
        if i not in [0, previous_length + 1, previous_length + 2]:
            ground_truth_expression += (Y_lang_tokenizer.index_word[i] + ' ')

    # [:-1] drops the empty string produced by the trailing space
    Y_true.append([ground_truth_expression.split(' ')[:-1]])

    prediction, attention_weights = evaluate_testset(X_test_batch)
    # NOTE(review): index_word is indexed for any id below len(word_index);
    # confirm every such id actually has an entry in index_word.
    predicted_expression = [Y_lang_tokenizer.index_word[i] \
                            for i in list(prediction.numpy()) \
                            if (i < len(Y_lang_tokenizer.word_index) and i not in [0, previous_length + 1, previous_length + 2])]
    Y_pred.append(predicted_expression)
    if ground_truth_expression.split(' ')[:-1] == predicted_expression:
        correctCount += 1