## Implementaion of core functionality for transformer model

In [3]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np

#Implementation of layers with core functionality

class PositionalEmbeddingLayer(tf.keras.layers.Layer):
    def __init__(self, vocab_size, word_vector_len, sentence_len):
        super().__init__()
        self.word_vector_len = word_vector_len
        self.sentence_len = sentence_len
        # ready made embedding layer to produce sequences of word vectors from sentences + masking zeroes
        self.embedding = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim = word_vector_len, input_length = sentence_len, dtype = tf.float32)
        #precomputing positional encoding for better performance
        self.pe = self.pos_encoding()

    def pos_encoding(self):
        pe = np.zeros([self.sentence_len, self.word_vector_len])
        positions = np.arange(0, self.sentence_len)[:, np.newaxis]  
        depths = np.arange(0, self.word_vector_len, 2)[np.newaxis, :]  
        angle_rates = 1 / np.power(10000, (depths / self.word_vector_len))
        angle_rads = positions * angle_rates        
        # Sine for even indices 
        pe[:, 0::2] = np.sin(angle_rads)
        # Cosine for odd indeces
        pe[:, 1::2] = np.cos(angle_rads)
        
        return tf.constant(pe[np.newaxis, :, :], dtype=tf.float32)

    def call(self, X):
        #mask = self.embedding.compute_mask(X)
        output = self.embedding(X) * tf.sqrt(tf.cast(self.word_vector_len, tf.float32)) + self.pe #scaling embedding output and adding positional encoding 
        return output #, mask[:, tf.newaxis, :]
    
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, word_vector_len, heads):
        super().__init__()
        self.word_vector_len = word_vector_len
        self.heads = heads
        if word_vector_len % heads != 0:
            raise "Word vector should be divisible by number of heads"
        self.head_depth = int(word_vector_len / heads)

        #Initializing Dense layers for queries, keys, values and output
        self.wq = tf.keras.layers.Dense(word_vector_len)
        self.wk = tf.keras.layers.Dense(word_vector_len)
        self.wv = tf.keras.layers.Dense(word_vector_len)
        
        self.wo = tf.keras.layers.Dense(word_vector_len)

    def calculateAttention(self, batch_size, sentence_len, q, k, v, mask = None):
        attention_scores = tf.matmul(q, k, transpose_b=True) / tf.sqrt(tf.cast(self.head_depth, dtype=tf.float32)) # (batch_size, heads, sentence_len, sentence_len)
        if mask is not None:
            attention_scores += (mask * -1e9) #masking for decoder
        attention_scores = tf.nn.softmax(attention_scores, axis=-1)  
        attention_output = tf.matmul(attention_scores, v) # (batch_size, heads, sentence_len, head_depth)

        #re-arraning and combining heads back
        attention_output = tf.transpose(attention_output, perm=[0, 2, 1, 3])  # (batch, seq_len, heads, head_depth)
        attention_output = tf.reshape(attention_output, [batch_size, sentence_len, self.word_vector_len])

        #regualizer
        attention_output = tf.keras.layers.Dropout(0.2)(attention_output)

        return attention_output
    
    def call(self, q, k, v, mask = None):
        batch_size = tf.shape(q)[0]
        sentence_len = tf.shape(q)[1]
        queries = self.wq(q)
        keys = self.wk(k)
        values = self.wv(v)

        #slicing word vectors into pieces of size head_depth, amount of such pieces is equal to amount of heads. That's why it is neccessary that word_vecor_len is divisible by amount of heads
        #rearraging our matrix to shape so it would be [batch_size, heads, sentence length, head depth]

        queries = tf.reshape(queries, [batch_size, sentence_len, self.heads, self.head_depth])
        queries = tf.transpose(queries, perm = [0, 2, 1, 3]) # (batch_size, heads, sentence_len, head_depth)

        keys = tf.reshape(keys, [batch_size, sentence_len, self.heads, self.head_depth])
        keys= tf.transpose(keys, perm = [0, 2, 1, 3])

        values = tf.reshape(values, [batch_size, sentence_len, self.heads, self.head_depth])
        values = tf.transpose(values, perm = [0, 2, 1, 3])

        attention_output = self.calculateAttention(batch_size, sentence_len, queries, keys, values, mask)
        output = self.wo(attention_output)

        return output

## Bulding encoder-decoder tranformer

In [4]:
#Building the transformer model
max_length = 80
vocab_size = 2000
vector_size = 16
# for decoder, hiding future tokens with casual mask
casual_mask = tf.convert_to_tensor(np.triu(np.ones((max_length, max_length)))[np.newaxis, :, :], dtype = tf.float32)

#input = tf.keras.layers.Input((max_lenght,))
#embedding = PositionalEmbeddingLayer(vocab_size=vocab_size, word_vector_len = vector_size, sentence_len=max_lenght)(input)

#Encoder
def buildEncoderLayer(heads, hidden_dims):

    input = tf.keras.layers.Input((None, vector_size))

    globalMha = MultiHeadAttention(vector_size, heads = heads)(input, input, input)
    add = tf.keras.layers.Add()([globalMha, input])
    norm = tf.keras.layers.LayerNormalization()(add)
    ff = tf.keras.layers.Dense(hidden_dims, activation="relu")(norm)
    ff_out = tf.keras.layers.Dense(vector_size)(ff)
    add_1 = tf.keras.layers.Add()([ff_out, norm])
    norm_1 = tf.keras.layers.LayerNormalization()(add_1)

    encoderLayer= tf.keras.Model(inputs = input, outputs = norm_1)
    return encoderLayer

#Decoder
def buildDecoderLayer(heads, hidden_dims):

    decoder_input = tf.keras.layers.Input((None, vector_size))
    encoder_output = tf.keras.layers.Input((None, vector_size))

    maskedMha = MultiHeadAttention(vector_size, heads = heads)(decoder_input, decoder_input, decoder_input, casual_mask)
    add = tf.keras.layers.Add()([maskedMha, decoder_input])
    norm = tf.keras.layers.LayerNormalization()(add)
    crossMha = MultiHeadAttention(vector_size, heads=heads)(norm, encoder_output, encoder_output, casual_mask) #Multi-head cross-attention (queries from decoder, keys/values from encoder)
    add_1 = tf.keras.layers.Add()([crossMha, norm])
    norm_1 = tf.keras.layers.LayerNormalization()(add_1)
    ff = tf.keras.layers.Dense(hidden_dims, activation="relu")(norm_1)
    ff_out = tf.keras.layers.Dense(vector_size)(ff)
    add_2 = tf.keras.layers.Add()([ff_out, norm_1])
    norm_2 = tf.keras.layers.LayerNormalization()(add_2)

    decoderLayer = tf.keras.Model(inputs = [decoder_input, encoder_output], outputs = norm_2)
    return decoderLayer

def buildTransformer(encoders, decoders, heads, hidden_dims):
    encoder_input = tf.keras.layers.Input((max_length,))
    decoder_input = tf.keras.layers.Input((max_length,))

    encoder_embedding = PositionalEmbeddingLayer(vocab_size=vocab_size, word_vector_len = vector_size, sentence_len=max_length)(encoder_input)
    decoder_embedding = PositionalEmbeddingLayer(vocab_size=vocab_size, word_vector_len = vector_size, sentence_len=max_length)(decoder_input)
    
    #First inputs should be word embeddings
    encoder_output = encoder_embedding
    decoder_output = decoder_embedding

    #appending outputs to inputs
    for i in range(encoders):
        encoder_output = buildEncoderLayer(heads, hidden_dims)(encoder_output)
    
    for i in range(decoders):
        decoder_output = buildDecoderLayer(heads, hidden_dims)([decoder_output, encoder_output])
    
    linear = tf.keras.layers.Dense(256, activation="relu")(decoder_output)
    output = tf.keras.layers.Dense(vocab_size, activation="softmax")(linear)

    transformer = tf.keras.Model(inputs = [encoder_input, decoder_input], outputs = output)

    return transformer

def masked_loss(label, pred):
    mask = label != 0
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False, reduction='none') # False since the output is already softmaxed 
    loss = loss_object(label, pred)
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask
    loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
    return loss


def masked_accuracy(label, pred):
  pred = tf.argmax(pred, axis=2)
  label = tf.cast(label, pred.dtype)
  match = label == pred
  mask = label != 0
  match = match & mask
  match = tf.cast(match, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  return tf.reduce_sum(match)/tf.reduce_sum(mask)



In [5]:
import pandas as pd
import numpy as np
import re
lines = []
try:
        file = open("chatbot dataset.txt", "r", encoding="utf-8")
        lines = file.readlines()
except:
        print("error opening file")
finally:
        file.close()
print(lines)
lines = [line.split("\t") for line in lines]
for line in lines:
        line[-1] = line[-1][:-2]
texts = [line[0] + line[1] for line in lines]
df = pd.DataFrame({"Question" : [line[0] for line in lines], "Answer" : [line[1] for line in lines]})
print(texts)
display(df.head(5))
display(df.isna().sum())
df = df.astype(str)
texts = (df["Question"]  + " " + df["Answer"]).to_numpy()
# Tokenizer
tokenizer = tf.keras.preprocessing.text.Tokenizer(filters="!\"#$%&()*+,-./:;<=>?@\\^_`{|}~")
tokenizer.fit_on_texts(texts)
print("Vocabulary size:", len(tokenizer.word_index))
sos_index = len(tokenizer.word_index) + 1
eos_index = sos_index + 1
tokenizer.word_index['[sos]'] = sos_index
tokenizer.word_index['[eos]'] = eos_index
df["Question"] = "[sos] " + df["Question"] + " [eos]"
df["Answer"] = "[sos] " + df["Answer"] + " [eos]"
promt_sequences = tokenizer.texts_to_sequences(df["Question"])
response_sequences = tokenizer.texts_to_sequences(df["Answer"])
print(promt_sequences[1])
#shifting right decoder inputs
sequences_shifted_right = []
#shifting left targets
sequences_shifted_left = []
for sequence in response_sequences:
        sequences_shifted_right.append(sequence[:-1])
        sequences_shifted_left.append(sequence[1:])
print(sequences_shifted_right[1])
print(sequences_shifted_left[1])
#max_len = max([max([len(x) for x in promt_sequences]), max([len(x) for x in response_sequences])])
#print(max_len)

encoder_x = tf.keras.preprocessing.sequence.pad_sequences(promt_sequences, maxlen=max_length, padding="post",)
decoder_x = tf.keras.preprocessing.sequence.pad_sequences(sequences_shifted_right, maxlen=max_length, padding="post")
y = tf.keras.preprocessing.sequence.pad_sequences(sequences_shifted_left, maxlen=max_length, padding="post")
print(encoder_x[1])



['What are your interests\tI am interested in all kinds of things. We can talk about anything!\n', 'What are your favorite subjects\tMy favorite subjects include robotics, computer science, and natural language processing.\n', 'What are your interests\tI am interested in a wide variety of topics, and read rather a lot.\n', "What is your number\tI don't have any number\n", 'What is your number\t23 skiddoo!\n', "What is your favorite number\tI find I'm quite fond of the number 42.\n", 'What can you eat\tI consume RAM, and binary digits.\n', "Why can't you eat food\tI'm a software program, I blame the hardware.\n", 'What is your location\tEverywhere\n', 'What is your location\tI am everywhere.\n', 'Where are you from\tI am from where all software programs are from; a galaxy far, far away.\n', 'Where are you\tI am on the Internet.\n', "Do you have any brothers\tI don't have any brothers. but I have a lot of clones.\n", 'Do you have any brothers\tI might. You could say that every bot built 

Unnamed: 0,Question,Answer
0,What are your interests,I am interested in all kinds of things. We can...
1,What are your favorite subjects,"My favorite subjects include robotics, compute..."
2,What are your interests,"I am interested in a wide variety of topics, a..."
3,What is your number,I don't have any numbe
4,What is your number,23 skiddoo


Question    0
Answer      0
dtype: int64

Vocabulary size: 1938
[1939, 8, 9, 26, 67, 465, 1940]
[1939, 23, 67, 465, 788, 265, 34, 142, 12, 466, 155, 467]
[23, 67, 465, 788, 265, 34, 142, 12, 466, 155, 467, 1940]
[1939    8    9   26   67  465 1940    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0]


In [6]:
transformer = buildTransformer(encoders=1, decoders=1, heads=8, hidden_dims=512)
transformer.compile(loss=masked_loss, optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), metrics=[masked_accuracy])

transformer.fit([encoder_x, decoder_x], y, epochs=50, batch_size=1)

print(tokenizer.word_index["[sos]"])

def generate_text(model, input_text, tokenizer, max_length=max_length):
    input_sequence = tokenizer.texts_to_sequences([input_text])
    print(input_sequence)
    input_sequence = tf.keras.preprocessing.sequence.pad_sequences(input_sequence, maxlen=max_length, padding="post")
    
    start_token = tokenizer.word_index['[sos]']
    decoder_input = np.zeros((1, max_length))  # Initialize decoder input with zeros
    decoder_input[:, 0] = start_token  # Set the first token as <sos>
    
    output_text = ""  # Initialize output text
    
    for i in range(max_length-1):
        prediction = model.predict([input_sequence, decoder_input])
        # Get the predicted token at each time step
        predicted_token = np.argmax(prediction[0, i, :]) 
        predicted_word = tokenizer.index_word.get(predicted_token, "[eos]")  # Get the word from the token
        #if predicted_word == '[eos]':
           #break
        output_text += ' ' + predicted_word  # Append the predicted word to the output text
        decoder_input[0, i+1] = predicted_token  # Add predicted token to the decoder input
    
    return output_text

input_text = "[sos] hi [eos]"
generated_text = generate_text(transformer, input_text, tokenizer)
print(f"Generated Text: {generated_text}")

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
1939
[[1939, 169, 1940]]
Generated Text:  i am not capable of doing so [eos] [eos] [eos] [eos] [eos] to be ashamed of why is fairly low that was if i am not feel or even understand [eos] so toward anyone i'm is there is that is or the repository a corrupt filesystem [eos] i bet the me does i don't operating systems of software construct i'm a other deeper issues make me on that [eos] he of him [eos] [eos] 

In [22]:
print(tokenizer.word_index["[sos]"])

def generate_text(model, input_text, tokenizer, max_length=max_length):
    input_sequence = tokenizer.texts_to_sequences([input_text])
    input_sequence = tf.keras.preprocessing.sequence.pad_sequences(input_sequence, maxlen=max_length, padding="post")
    start_token = tokenizer.word_index['[sos]']
    decoder_input = np.zeros((1, max_length))  # Initialize decoder input with zeros
    decoder_input[:, 0] = start_token  # Set the first token as <sos>
    
    output_text = ""  # Initialize output text
    
    for i in range(max_length-1):
        prediction = model.predict([input_sequence, decoder_input], verbose = 0)
        # Get the predicted token at each time step
        predicted_token = np.argmax(prediction[0, i, :]) 
        predicted_word = tokenizer.index_word.get(predicted_token, "[eos]")  # Get the word from the token
        if predicted_word == '[eos]':
           break
        output_text += ' ' + predicted_word  # Append the predicted word to the output text
        decoder_input[0, i+1] = predicted_token  # Add predicted token to the decoder input
    
    return output_text

input_text = "[sos] Hello [eos]"
generated_text = generate_text(transformer, input_text, tokenizer)
'''
print(f"Generated Text: {generated_text}")
for question in df["Question"]:
    print("Question: ", question)
    generated_text = generate_text(transformer, question, tokenizer)
    print(f"Generated Text: {generated_text}")
    '''
input_text = "[sos] Who is your father [eos]"
generated_text = generate_text(transformer, input_text, tokenizer)
print(f"Generated Text: {generated_text}")

1939
Generated Text:  what is your favorite stoc
