### Transformer Model

In [1]:
import tensorflow as tf
import tensorflow_addons as tfa
print(tf.__version__)
from sklearn.model_selection import train_test_split
import os
import io
import numpy as np
import re
import unicodedata
import urllib3
import shutil
import zipfile
import itertools
from tensorflow import keras

2.1.0


### Create Dataset

In [None]:
# Relative path of the text file that create_dataset reads; each line is split on tabs into fields.
path_to_file = "./data/bwd_sample5000.txt"

In [None]:
def unicode_to_ascii(s):
    """Strip accents from `s`: decompose it (NFD) and drop every combining mark (category 'Mn')."""
    decomposed = unicodedata.normalize('NFD', s)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(kept)

In [None]:
def preprocess_sentence(w):
    """Lower-case, trim and accent-strip `w`, then wrap it in '<start>' / '<end>' markers."""
    normalized = unicode_to_ascii(w.lower().strip())
    # the start and end tokens delimit the sentence
    return '<start> {} <end>'.format(normalized)

In [None]:
def create_dataset(path, num_examples=None):
    """Read tab-separated sentence pairs from `path` and preprocess every field.

    Args:
        path: UTF-8 text file with one example per line, fields separated by tabs.
        num_examples: optional cap on the number of lines used; None uses all lines.

    Returns:
        A ``zip`` iterator yielding one tuple per column; the columns are
        documented as [equation, integral].
    """
    # Use a context manager so the file handle is closed even if reading fails.
    with io.open(path, encoding='UTF-8') as f:
        lines = f.read().strip().split('\n')

    word_pairs = [[preprocess_sentence(w) for w in l.split('\t')] for l in lines[:num_examples]]

    # Transpose the list of rows into per-column tuples.
    return zip(*word_pairs)

In [None]:
# Unpack the two columns returned by create_dataset (documented there as equation, integral).
eq, intgr = create_dataset(path_to_file)

### Preprocess dataset

In [None]:
def tokenize(inp, sequence_length):
    """Fit a Keras tokenizer on `inp` and return (padded index sequences, tokenizer).

    Sequences are post-padded and post-truncated to `sequence_length`;
    no characters are filtered out (filters='').
    """
    text_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    text_tokenizer.fit_on_texts(inp)
    padded = tf.keras.preprocessing.sequence.pad_sequences(
        text_tokenizer.texts_to_sequences(inp),
        maxlen=sequence_length,
        padding='post',
        truncating='post',
    )
    return padded, text_tokenizer

### Train test split

In [None]:
sequence_length = 512

# Tokenize each word into index and return the tokenized list and tokenizer.
# `tokenize` requires the target length as its second argument; omitting it
# raises a TypeError.
X, X_tokenizer = tokenize(eq, sequence_length)
Y, Y_tokenizer = tokenize(intgr, sequence_length)

# Fixed random_state so the split is identical on every "Restart & Run All".
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)


In [None]:
# tokenize by frequency
X_tokenizer.word_index['<start>']   

In [None]:
# vocabulary size # add 1 for 0 padding 
# vocabulary sizes; index 0 is used for padding, hence the +1
input_vocab_size, output_vocab_size = (
    len(tokenizer.word_index) + 1 for tokenizer in (X_tokenizer, Y_tokenizer)
)

print("input_vocab_size : ", input_vocab_size)
print("output_vocab_size : ", output_vocab_size)

### Build transformer 
- Work in progress: the model is assembled cell by cell below (encoder first, then decoder).

In [2]:
from tensorflow.keras import models, layers
from tensorflow.keras import backend as K

In [3]:
### only for model test
sequence_length = 512
input_vocabulary_size = 1000
output_vocabulary_size = 1000
###
d_model = 512
embedding_size = 512
num_layers = 6
num_heads = 8
depth = d_model // num_heads
dff = 2048
rate = 0.1
training = True

#### encoder

input and embedding

In [4]:
# Encoder entry point: one row of `sequence_length` integer token ids per example.
encoder_input = layers.Input(shape=(sequence_length, ), name="encoder_input")
# Map every token id to a trainable vector of size `embedding_size`.
x_en = layers.Embedding(input_dim=input_vocabulary_size, output_dim=embedding_size)(encoder_input)

positional encoding

In [5]:
def get_angles(pos, i, d_model):
    """Return pos / 10000^(2*(i//2)/d_model), the argument of the sinusoidal encoding.

    `pos` and `i` may be scalars or broadcastable numpy arrays.
    """
    # Dimensions 2k and 2k+1 share one exponent, hence the integer division.
    exponent = (2 * (i // 2)) / np.float32(d_model)
    return pos * (1 / np.power(10000, exponent))

In [6]:
def positional_encoding(position, d_model):
    """Build the sinusoidal position table as a float32 tensor of shape (1, position, d_model)."""
    positions = np.arange(position)[:, np.newaxis]
    dims = np.arange(d_model)[np.newaxis, :]
    table = get_angles(positions, dims, d_model)

    # even columns (2i) take the sine, odd columns (2i+1) the cosine
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # prepend an axis of size 1 before converting to a tensor
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

In [7]:
# Scale the embeddings by sqrt(d_model) and add the positional table
# (positional_encoding returns a leading axis of size 1).
# NOTE(review): this cell rebinds x_en from its previous value, so re-running it alone stacks the operation.
x_en = layers.Add()([x_en * tf.math.sqrt(tf.cast(d_model, tf.float32)), positional_encoding(sequence_length, d_model)])

In [8]:
# Dropout on the position-augmented embeddings; `rate` and `training` come from the hyper-parameter cell.
x_en = layers.Dropout(rate=rate)(x_en, training=training)

encoder layer

In [9]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built from per-head Dense projections and `tf.keras.layers.Attention`.

    Every head owns its own query/key/value projection of width `depth`. The
    per-head attention outputs are concatenated on the last axis and mixed by
    a final Dense layer of width `d_model`.

    Args:
        d_model: units of the output projection; must be divisible by `num_heads`.
        num_heads: number of attention heads.
        depth: units of each per-head query/key/value projection.
    """

    def __init__(self, d_model, num_heads, depth):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        # One independent projection per head. Sharing a single Dense across
        # heads would make every head compute exactly the same output.
        self.wq = [tf.keras.layers.Dense(depth) for _ in range(num_heads)]
        self.wk = [tf.keras.layers.Dense(depth) for _ in range(num_heads)]
        self.wv = [tf.keras.layers.Dense(depth) for _ in range(num_heads)]
        # The attention layer (with its learned scale) is shared by all heads.
        self.attention = tf.keras.layers.Attention(use_scale=True,)

        self.dense = tf.keras.layers.Dense(d_model)

    def call(self, inputs, mask=None):
        """Apply attention to `inputs = [query, key, value]`.

        `mask` is accepted for interface compatibility but is not applied.
        """
        query, key, value = inputs[0], inputs[1], inputs[2]

        heads = []
        # Iterate over this layer's own heads rather than a notebook-level global.
        for wq, wk, wv in zip(self.wq, self.wk, self.wv):
            # tf.keras.layers.Attention expects its inputs as [query, value, key].
            heads.append(self.attention([wq(query), wv(value), wk(key)]))

        concat_attention = tf.concat(heads, axis=2)
        return self.dense(concat_attention)

In [24]:
def encoder_layer(d_model, num_heads, depth, input_layer, layer, training):
    """Apply one encoder layer to `input_layer` and return the resulting tensor.

    Self-attention and a two-layer feed-forward network, each followed by
    dropout (rate 0.1), a residual addition and layer normalisation.
    `layer` is the index used in the sub-layer names; the feed-forward
    width is read from the notebook-level `dff`.
    """
    prefix = "layer" + str(layer) + "_"

    # sub-layer 1: multi-head self attention (query = key = value = input)
    attended = MultiHeadAttention(d_model, num_heads, depth)(
        [input_layer, input_layer, input_layer], mask=None)
    attended = layers.Dropout(rate=0.1)(attended, training=training)
    norm1 = layers.LayerNormalization(epsilon=1e-6, name=prefix + "en_layer_norm1")(attended + input_layer)

    # sub-layer 2: position-wise feed forward
    hidden = layers.Dense(dff, activation='relu')(norm1)
    projected = layers.Dense(d_model, name=prefix + "en_feed_forward")(hidden)
    projected = layers.Dropout(rate=0.1)(projected, training=training)

    return layers.LayerNormalization(epsilon=1e-6, name=prefix + "en_layer_norm2")(projected + norm1)

In [25]:
# Stack `num_layers` encoder layers: the output of each layer is the input of the next.
en_input_layer = x_en
for i in range(num_layers):
    # `i` only serves to give the sub-layers of each encoder layer unique names.
    en_output_layer = encoder_layer(d_model, num_heads, depth, en_input_layer, i, training)
    en_input_layer = en_output_layer

In [28]:
# The encoder result is the output of the last stacked encoder layer.
encoder_output = en_output_layer
# Encoder-only model from encoder_input to encoder_output; the plot cell renders it.
en_model = models.Model(encoder_input, encoder_output)
#en_model.summary()

In [27]:
# Render the encoder graph, including tensor shapes, to transformer_encoder_model.png.
encoder_model_graph = keras.utils.plot_model(en_model, show_shapes=True, to_file="transformer_encoder_model.png")

#### decoder

In [29]:
# Decoder entry point: one row of `sequence_length` integer token ids per example.
decoder_input = layers.Input(shape=(sequence_length, ), name="decoder_input")
# Map every output-vocabulary token id to a trainable vector of size `embedding_size`.
x_de = layers.Embedding(input_dim=output_vocabulary_size, output_dim=embedding_size)(decoder_input)

positional encoding

In [30]:
# Scale the decoder embeddings by sqrt(d_model) and add the positional table
# (positional_encoding returns a leading axis of size 1).
x_de = layers.Add()([x_de * tf.math.sqrt(tf.cast(d_model, tf.float32)), positional_encoding(sequence_length, d_model)])

In [31]:
# Dropout on the position-augmented decoder embeddings; `rate` and `training` come from the hyper-parameter cell.
x_de = layers.Dropout(rate=rate)(x_de, training=training)

decoder layer

In [40]:
def decoder_layer(input_layer, encoder_output, num_heads, layer, training):
    """Apply one decoder layer to `input_layer` and return the resulting tensor.

    Three sub-layers, each followed by dropout (rate 0.1), a residual addition
    and layer normalisation:
      1. multi-head self attention over the decoder input,
      2. encoder-decoder attention: queries from sub-layer 1, keys/values from
         `encoder_output`,
      3. a two-layer feed-forward network.

    `layer` is the index used in the sub-layer names. `d_model`, `depth` and
    `dff` are read from the notebook-level variables.

    NOTE(review): the self-attention call passes mask=None and
    MultiHeadAttention.call does not use its mask argument, so nothing here
    stops a position from attending to later positions — confirm whether a
    look-ahead mask is intended.
    """
    #multi-head self attention
    self_attention_de = MultiHeadAttention(d_model, num_heads, depth)([input_layer, input_layer, input_layer], mask=None)
    self_attenton_de_output = layers.Dropout(rate=0.1)(self_attention_de, training=training)

    #layer norm1
    layernorm1 = layers.LayerNormalization(epsilon=1e-6, name="layer" + str(layer) + "_" + "de_layer_norm1")(self_attenton_de_output + input_layer)

    #encoder decoder attention
    # Queries must come from the self-attention sub-layer (layernorm1). Querying
    # with the raw input_layer would bypass sub-layer 1 except for the residual.
    en_de_attention = MultiHeadAttention(d_model, num_heads, depth)([layernorm1, encoder_output, encoder_output], mask=None)
    en_de_attenton_output = layers.Dropout(rate=0.1)(en_de_attention, training=training)

    #layer norm2
    layernorm2 = layers.LayerNormalization(epsilon=1e-6, name="layer" + str(layer) + "_" + "de_layer_norm2")(en_de_attenton_output + layernorm1)

    # feed forward
    ffn1 = tf.keras.layers.Dense(dff, activation='relu')(layernorm2)
    ffn2 = tf.keras.layers.Dense(d_model, name="layer" + str(layer) + "_" + "de_feed_forward")(ffn1)
    ffn2_output = layers.Dropout(rate=0.1)(ffn2, training=training)

    #layer norm3
    layernorm3 = layers.LayerNormalization(epsilon=1e-6, name="layer" + str(layer) + "_" + "de_layer_norm3")(ffn2_output + layernorm2)
    return layernorm3

In [41]:
# Stack `num_layers` decoder layers; every layer receives the same encoder_output.
de_input_layer = x_de
for i in range(num_layers):
    # `i` only serves to give the sub-layers of each decoder layer unique names.
    de_output_layer = decoder_layer(de_input_layer, encoder_output, num_heads, i, training)
    de_input_layer = de_output_layer

In [42]:
decoder_output = de_output_layer
# Project every decoder position onto the output vocabulary. Without this head
# the model would emit raw d_model-wide hidden states, not token predictions.
final_output = layers.Dense(output_vocabulary_size, activation="softmax", name="final_output")(decoder_output)
model = models.Model(inputs=[encoder_input, decoder_input], outputs=final_output)
#model.summary()

In [43]:
# Render the full encoder-decoder graph (without tensor shapes) to transformer_model.png.
model_graph = keras.utils.plot_model(model, show_shapes=False, to_file="transformer_model.png")

### Show structure of model

In [None]:
from tensorflow.keras.utils import plot_model
# One call is enough: plot_model writes to "model.png" by default, so a second call
# without to_file would simply overwrite the file written by the first.
plot_model(model, to_file="model.png", show_shapes=True)

### Run model

In [None]:
# Teacher forcing: the decoder is fed the target sequence and trained to emit the
# token one position ahead. The shifted target is re-padded with a trailing 0 so
# it keeps the fixed length of the decoder input.
decoder_targets = np.pad(Y_train[:, 1:], ((0, 0), (0, 1)), mode='constant')

# Targets are integer token ids, so use the sparse categorical loss.
# NOTE(review): this assumes the model's last axis is a probability distribution
# over the output vocabulary, and that the vocabulary sizes the model was built
# with cover the tokenizers' vocabularies — confirm both before training.
model.compile(optimizer='rmsprop',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy']
              )
# The model has two inputs (encoder_input, decoder_input); the training arrays
# produced by the train/test split are named X_train / Y_train.
history = model.fit(
    [X_train, Y_train], decoder_targets,
    epochs=10,
    batch_size=32,
    validation_split=0.2
)

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# One figure per metric, each showing the training curve next to its validation curve.
for metric in ('loss', 'accuracy'):
    val_metric = 'val_' + metric
    plt.plot(model.history.history[metric], label=metric)
    plt.plot(model.history.history[val_metric], label=val_metric)
    plt.legend()
    plt.show()
    plt.close()