In [2]:
import numpy as np
import tensorflow as tf
import random
from math import inf

In [4]:
def getPositionalEncoding(model_dimension, indexes):
    """Compute sinusoidal encodings for every value in `indexes`.

    For each value ``p`` in `indexes` and each ``k`` in
    ``range(model_dimension // 2)``::

        out[..., 2*k]     = sin(p / 10000 ** (2*k / model_dimension))
        out[..., 2*k + 1] = cos(p / 10000 ** (2*k / model_dimension))

    Args:
        model_dimension: Size of the trailing encoding axis. When it is odd,
            the last channel has no sin/cos pair and is left at 0.
        indexes: Array-like of numeric values to encode (any shape; the
            notebook passes a 2-D ``(batch, seq_len)`` array).

    Returns:
        A float64 ``np.ndarray`` of shape ``indexes.shape + (model_dimension,)``.
    """
    values = np.asarray(indexes, dtype=np.float64)
    positional_encodings = np.zeros(values.shape + (model_dimension,))

    n_pairs = model_dimension // 2
    if n_pairs == 0:
        # No complete (sin, cos) pair fits; nothing to fill in.
        return positional_encodings

    # One denominator per (sin, cos) pair, broadcast against every value.
    denominators = 10000.0 ** (2 * np.arange(n_pairs) / model_dimension)
    angles = values[..., np.newaxis] / denominators

    # Even channels take the sine, odd channels the cosine.
    positional_encodings[..., 0:2 * n_pairs:2] = np.sin(angles)
    positional_encodings[..., 1:2 * n_pairs:2] = np.cos(angles)

    return positional_encodings

In [78]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each projected by a ``Dense(d_model)`` layer,
    split into ``n_heads`` heads of size ``d_model // n_heads``, attended per
    head, merged back to ``d_model`` channels and passed through a final
    Dense layer.

    Mask convention used by every helper in this class: a mask holds 1.0 at
    positions that must NOT be attended to and 0.0 everywhere else. `call`
    turns those markers into a large negative bias on the attention logits.
    """

    def __init__(self, d_model, n_heads, **kwargs):
        """
        Args:
            d_model: Channel size of the projections and of the output.
            n_heads: Number of attention heads; must divide `d_model`.

        Raises:
            ValueError: If `d_model` is not divisible by `n_heads`.
        """
        super(MultiHeadAttention, self).__init__(**kwargs)
        if d_model % n_heads != 0:
            # split_heads reshapes the channel axis into (n_heads, depth), which
            # only works when the division is exact.
            raise ValueError("d_model must be divisible by n_heads.")

        self.d_model = d_model
        self.n_heads = n_heads
        self.depth = d_model // n_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        # NOTE(review): the output projection uses a ReLU; the reference
        # Transformer uses a plain linear projection here - confirm intent.
        self.final_dense = tf.keras.layers.Dense(d_model, activation='relu')

    def split_heads(self, x, batch_size):
        """
        input shape: batch_size x seq_length x d_model
        intermediate shape: batch_size x seq_length x num_heads x depth
        output shape: batch_size x num_heads x seq_length x depth
        """
        x = tf.reshape(x, [batch_size, -1, self.n_heads, self.depth])
        x = tf.transpose(x, perm=[0, 2, 1, 3])
        return x

    def compare_tensors(self, q, k):
        """Return True when `q` and `k` have the same shape and equal values.

        Eager-only utility (it calls ``.numpy()``). It is no longer used by
        `call`, which takes its masking instructions from explicit arguments.
        """
        if q.shape != k.shape:
            return False
        return bool(tf.reduce_all(tf.equal(q, k)).numpy())

    def create_padding_mask(self, seq):
        """Mark padding positions: 1.0 where `seq` equals 0, else 0.0.

        The result has the same shape as `seq`. For ``(batch, seq_len)`` token
        ids, index it with ``[:, tf.newaxis, tf.newaxis, :]`` before passing
        it to `call` so it broadcasts over heads and query positions.
        """
        # Identify positions where the token ID is 0 (padding)
        return tf.cast(tf.math.equal(seq, 0), tf.float32)

    def create_look_ahead_mask(self, seq):
        """Mark future positions over the two innermost axes of `seq`.

        Returns a float32 tensor shaped like `seq` holding 1.0 strictly above
        the diagonal of the last two axes (keys that lie after the query) and
        0.0 on and below it. `seq` must have rank >= 2.
        """
        # ones_like keeps this usable when the static shape is not fully known.
        allowed = tf.linalg.band_part(tf.ones_like(seq, dtype=tf.float32), -1, 0)
        return 1 - allowed  # (..., q_len, k_len)

    def combine_masks(self, padding_mask, look_ahead_mask):
        """Union of two masks: a position is blocked if either mask blocks it.

        The two masks must be broadcast-compatible.
        """
        return tf.maximum(padding_mask, look_ahead_mask)

    def call(self, q, k, v, block_mask=None, use_causal_mask=False):
        """Apply multi-head attention.

        Args:
            q: Queries, ``(batch, q_len, channels)``.
            k: Keys, ``(batch, k_len, channels)``.
            v: Values, ``(batch, k_len, channels)``.
            block_mask: Optional tensor broadcastable to
                ``(batch, n_heads, q_len, k_len)`` with 1 at positions to hide
                (e.g. a reshaped `create_padding_mask` result).
            use_causal_mask: When True, each query is additionally prevented
                from attending to later key positions. A decoder's masked
                self-attention needs this set to True.

        Returns:
            Tensor of shape ``(batch, q_len, d_model)``.
        """
        #linear combination with inputs
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        #splitting heads for multihead attention
        # Dynamic batch size: the static q.shape[0] is None when tracing.
        batch_size = tf.shape(q)[0]
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        #calculating attention
        # Scale by sqrt of the per-head size (d_k), not of the full d_model.
        scale = tf.math.sqrt(tf.cast(self.depth, q.dtype))
        qk_product = tf.matmul(q, k, transpose_b=True) / scale

        blocked = block_mask
        if use_causal_mask:
            look_ahead_mask = self.create_look_ahead_mask(qk_product)
            if blocked is None:
                blocked = look_ahead_mask
            else:
                blocked = self.combine_masks(
                    tf.cast(blocked, look_ahead_mask.dtype), look_ahead_mask)

        if blocked is not None:
            # A large negative logit drives the softmax weight to ~0.
            qk_product += tf.cast(blocked, qk_product.dtype) * -1e9

        qk_product = tf.nn.softmax(qk_product)

        attention_result = tf.matmul(qk_product, v)

        #return the values for the initial shape before splitting
        pre_output = tf.transpose(attention_result, perm=[0, 2, 1, 3])
        pre_output = tf.reshape(pre_output, [batch_size, -1, self.d_model])

        #linear combination with non linear activation
        output = self.final_dense(pre_output)

        return output

In [80]:
class Encoder(tf.keras.layers.Layer):
    """One encoder block: self-attention, then a feed-forward stack.

    Both sub-blocks are wrapped in a residual connection followed by a
    LayerNormalization.
    """

    def __init__(self, d_model, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.norm_1 = tf.keras.layers.LayerNormalization()
        self.norm_2 = tf.keras.layers.LayerNormalization()

        # Two identical (Dense + Dropout) stages, created in order.
        feed_forward_layers = []
        for _ in range(2):
            feed_forward_layers.append(tf.keras.layers.Dense(d_model, activation='relu'))
            feed_forward_layers.append(tf.keras.layers.Dropout(0.25))
        self.ffn = tf.keras.Sequential(feed_forward_layers)

    def call(self, x):
        # Self-attention: the input serves as query, key and value.
        attended = self.norm_1(self.mha(x, x, x) + x)

        # Feed-forward sub-block with its own residual connection.
        transformed = self.ffn(attended)
        return self.norm_2(attended + transformed)

In [82]:
class Decoder(tf.keras.layers.Layer):
    """One decoder block: self-attention, cross-attention, feed-forward.

    Each of the three sub-blocks is followed by a residual connection and a
    LayerNormalization.
    """

    def __init__(self, d_model, num_heads, **kwargs):
        super(Decoder, self).__init__(**kwargs)
        self.masked_mha = MultiHeadAttention(d_model, num_heads)
        self.norm_1 = tf.keras.layers.LayerNormalization()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.norm_2 = tf.keras.layers.LayerNormalization()

        self.ffn = tf.keras.Sequential([tf.keras.layers.Dense(d_model, activation='relu'),
                                        tf.keras.layers.Dropout(0.25),
                                        tf.keras.layers.Dense(d_model, activation='relu'),
                                        tf.keras.layers.Dropout(0.25)])

        self.norm_3 = tf.keras.layers.LayerNormalization()

    def call(self, x, encoder_result):
        """
        Args:
            x: Decoder input; used as query, key and value of the first
                attention layer.
            encoder_result: Encoder output; used as key and value of the
                second (cross) attention layer.

        Returns:
            The normalized output of the feed-forward sub-block.
        """
        # NOTE(review): this call passes no mask arguments, so whether future
        # positions are hidden depends entirely on MultiHeadAttention.call's
        # defaults - confirm the look-ahead mask is actually applied.
        auto_attention_result = self.masked_mha(x, x, x)
        auto_attention_result = self.norm_1(auto_attention_result + x)

        # Queries come from the decoder, keys/values from the encoder.
        cross_attention_result = self.mha(auto_attention_result, encoder_result, encoder_result)
        cross_attention_result = self.norm_2(cross_attention_result + auto_attention_result)

        # The residual must add the FFN output to the FFN input; previously the
        # FFN output was computed and then thrown away.
        ffn_result = self.ffn(cross_attention_result)
        ffn_result = self.norm_3(ffn_result + cross_attention_result)

        return ffn_result

In [107]:
class Transformer(tf.keras.Model):
    """Encoder-decoder model with a softmax head over the vocabulary.

    A single Encoder block and a single Decoder block are followed by a
    feed-forward head whose last layer projects to `vocab_size` probabilities.
    """

    def __init__(self, d_model, n_heads, vocab_size, **kwargs):
        """
        Args:
            d_model: Channel size used by every sub-layer.
            n_heads: Number of attention heads; must divide `d_model`.
            vocab_size: Size of the output probability distribution.

        Raises:
            ValueError: If `d_model` is not divisible by `n_heads`.
        """
        super(Transformer, self).__init__(**kwargs)
        if d_model % n_heads != 0:
            raise ValueError("Number of heads is not a divisor of the dimension model.")

        self.d_model = d_model
        self.n_heads = n_heads
        self.depth = d_model//n_heads
        self.vocab_size = vocab_size

        self.encoder = Encoder(d_model, n_heads)
        self.decoder = Decoder(d_model, n_heads)

        # The vocabulary projection has to live INSIDE the layer list; it was
        # previously passed as a second positional argument to Sequential and
        # therefore never became part of the network.
        self.ffn = tf.keras.Sequential([tf.keras.layers.Dense(d_model, activation='relu'),
                                        tf.keras.layers.Dropout(0.25),
                                        tf.keras.layers.Dense(d_model, activation='relu'),
                                        tf.keras.layers.Dropout(0.25),
                                        tf.keras.layers.Dense(vocab_size, activation='softmax')])

    def call(self, x):
        """
        Args:
            x: A pair ``(x_enc, x_dec)`` holding the encoder input and the
                decoder input.

        Returns:
            Softmax probabilities over the vocabulary on the last axis.
        """
        x_enc, x_dec = x

        encoder_result = self.encoder(x_enc)
        decoder_result = self.decoder(x_dec, encoder_result)

        probabilities = self.ffn(decoder_result)

        return probabilities

In [None]:
# Demo configuration.
D_MODEL = 512
N_HEADS = 16
VOCAB_SIZE = 100
TEXT_SIZE = 20
BATCH_SIZE = 4
SEED = 42

# Seed the generator so the random demo batch is reproducible on re-run.
random.seed(SEED)

# Random token ids in [0, VOCAB_SIZE); randint(0, VOCAB_SIZE) would also emit
# the out-of-vocabulary id VOCAB_SIZE. The ids are kept under their own names
# so this cell can be re-run and the padding mask can be derived from them.
x_enc_ids = np.array([[random.randrange(VOCAB_SIZE) for _ in range(TEXT_SIZE)] for _ in range(BATCH_SIZE)])
x_dec_ids = np.array([[random.randrange(VOCAB_SIZE) for _ in range(TEXT_SIZE)] for _ in range(BATCH_SIZE)])

# NOTE(review): the encoding is computed from the token ids themselves, so the
# model input reflects token values rather than sequence positions, and there
# is no embedding layer - confirm this is intended.
x_enc = getPositionalEncoding(D_MODEL, x_enc_ids)
x_dec = getPositionalEncoding(D_MODEL, x_dec_ids)

# 1.0 where the encoder token id is 0 (padding), 0.0 elsewhere;
# shape (BATCH_SIZE, TEXT_SIZE). Computed from the ids, not from the encodings.
padding_mask = (x_enc_ids == 0).astype(np.float32)