In [12]:
import tensorflow as tf
import numpy as np

In [13]:
def get_angles(pos, i, d_model):
    """
    Computes the sinusoidal positional-encoding angles
    pos / 10000^(2 * (i // 2) / d_model).

    Parameters:

    pos     - Position index (scalar or array).
    i       - Embedding-dimension index (scalar or array);
              must broadcast against pos.
    d_model - Size of the embedding dimension.

    Returns:

    The angle for every (pos, i) pair, before sin/cos is applied.
    """
    # Dimensions 2k and 2k+1 share one frequency, hence the floor division.
    exponent = (2 * (i // 2)) / np.float32(d_model)
    inverse_wavelength = 1 / np.power(10000, exponent)
    return pos * inverse_wavelength

In [14]:
def positional_encoding(position, d_model):
    """
    Builds the sinusoidal positional-encoding table.

    Parameters:

    position - Number of positions (rows) to encode.
    d_model  - Size of the embedding dimension (columns).

    Returns:

    A tf.float32 tensor of shape (1, position, d_model). Even columns
    hold sin(angle) and odd columns hold cos(angle).
    """
    positions = np.arange(position)[:, np.newaxis]  # (position, 1)
    dims = np.arange(d_model)[np.newaxis, :]        # (1, d_model)
    angles = get_angles(positions, dims, d_model)   # (position, d_model)

    # sin on even columns (2i), cos on odd columns (2i+1)
    is_even_column = dims % 2 == 0
    table = np.where(is_even_column, np.sin(angles), np.cos(angles))

    # leading axis of size 1 so the table broadcasts over a batch axis
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

In [4]:
def normalized_dot_product_attention(Q, K, V, mask=None, scale=1e2):
    """
    Calculates cosine-similarity ("normalized dot product") attention
    and applies the resulting weights to V.

    Q and K are L2-normalized along their last axis, so every logit is
    a cosine similarity in [-1, 1] before it is multiplied by `scale`.

    Parameters:

    Q     - The result of applying Wq to X.
          - Shape (..., seq_len_q, depth)
    K     - The result of applying Wk to X.
          - Shape (..., seq_len_k, depth)
    V     - The result of applying Wv to X.
          - Shape (..., seq_len_k, depth_v)
    mask  - Optional tensor broadcastable to
            (..., seq_len_q, seq_len_k). Entries equal to 1 are
            suppressed (-1e9 is added to their logits); None disables
            masking.
    scale - Factor applied to the cosine similarities before the
            softmax. Defaults to 1e2.

    Returns:

    Z                 - The matrix created by applying the attention
                        weights to the V matrix.
                      - Shape (..., seq_len_q, depth_v)
    attention_weights - The softmax weights over the key axis.
                      - Shape (..., seq_len_q, seq_len_k)

    """
    # Normalizing Q and K. l2_normalize clamps the squared norm at a small
    # epsilon, so an all-zero row stays zero instead of becoming NaN (0 / 0),
    # which would otherwise poison the softmax.
    Q = tf.math.l2_normalize(Q, axis=-1)
    K = tf.math.l2_normalize(K, axis=-1)

    # dot product of unit vectors == cosine similarity, values in [-1, 1]
    attention_logits = tf.matmul(Q, K, transpose_b=True)

    # cosine similarities are too flat for a softmax; sharpen them
    attention_logits *= scale

    if mask is not None:
        # a large negative logit drives the softmax weight to ~0
        attention_logits += (mask * -1e9)

    # apply softmax over the key axis to find the weights
    attention_weights = tf.nn.softmax(attention_logits, axis=-1)

    # multiply the weights by the Value matrix
    Z = tf.matmul(attention_weights, V)

    return Z, attention_weights

In [5]:
class MHAttention(tf.keras.layers.Layer):
    """
    Multi-head attention layer built on normalized_dot_product_attention.

    Parameters:

    num_heads     - Number of attention heads.
    embedding_dim - Size of the model/embedding dimension; must be
                    divisible by num_heads.
    """

    def __init__(self, num_heads, embedding_dim):
        super(MHAttention, self).__init__()
        assert embedding_dim % num_heads == 0, \
            "embedding_dim must be divisible by num_heads"

        self.head_dim = embedding_dim // num_heads
        self.num_heads = num_heads
        self.embedding_dim = embedding_dim

        # learned projections for queries, keys and values
        self.Wq = tf.keras.layers.Dense(self.embedding_dim)
        self.Wk = tf.keras.layers.Dense(self.embedding_dim)
        self.Wv = tf.keras.layers.Dense(self.embedding_dim)

        # output projection applied to the concatenated heads
        self.Wz = tf.keras.layers.Dense(self.embedding_dim)

    def create_heads(self, x, batch_size):
        """
        Splits the last axis of x into heads.

        Parameters:

        x          - Shape (batch_size, seq_len, embedding_dim)
        batch_size - Size of the leading axis of x.

        Returns:

        Tensor of shape (batch_size, num_heads, seq_len, head_dim).
        """
        # Split the feature axis first, then move the head axis in front of
        # the sequence axis. A bare tf.transpose(x) reverses ALL axes
        # (including batch), so reshaping afterwards would mix values from
        # different batch elements and positions.
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, q, k=None, v=None, mask=None):
        """
        Runs multi-head attention.

        Parameters:

        q    - Query input, shape (batch_size, seq_len_q, embedding_dim)
        k    - Key input; defaults to q (self-attention) when None.
        v    - Value input; defaults to k when None.
        mask - Optional mask forwarded to
               normalized_dot_product_attention.

        Returns:

        z                 - Shape (batch_size, seq_len_q, embedding_dim)
        attention_weights - As returned by
                            normalized_dot_product_attention.
        """
        if k is None:
            k = q
        if v is None:
            v = k

        # the static batch size is None for a dynamic batch; fall back to the
        # runtime shape in that case
        batch_size = q.shape[0]
        if batch_size is None:
            batch_size = tf.shape(q)[0]

        q = self.Wq(q)
        k = self.Wk(k)
        v = self.Wv(v)

        q = self.create_heads(q, batch_size)
        k = self.create_heads(k, batch_size)
        v = self.create_heads(v, batch_size)

        z, attention_weights = normalized_dot_product_attention(q, k, v, mask)

        # inverse of create_heads: (batch, heads, seq, head_dim)
        # -> (batch, seq, heads, head_dim) -> (batch, seq, embedding_dim)
        concat_z = tf.transpose(z, perm=[0, 2, 1, 3])
        concat_z = tf.reshape(concat_z, (batch_size, -1, self.embedding_dim))

        z = self.Wz(concat_z)

        return z, attention_weights


In [6]:
def feed_forward(embedding_dim, ff_hidden_dim):
    """
    Builds the position-wise feed-forward sub-network.

    Parameters:

    embedding_dim - Number of units in the output Dense layer.
    ff_hidden_dim - Number of units in the hidden (ReLU) Dense layer.

    Returns:

    A tf.keras.Sequential of Dense(ff_hidden_dim, relu) followed by
    Dense(embedding_dim).
    """
    return tf.keras.Sequential([
        # (batch size, seq len, hidden dim)
        tf.keras.layers.Dense(ff_hidden_dim, activation='relu'),
        # (batch size, seq len, embedding dim)
        tf.keras.layers.Dense(embedding_dim),
    ])

In [7]:
class Encoder(tf.keras.layers.Layer):
    """
    A single encoder layer: self-attention followed by a feed-forward
    network, each wrapped as layernorm(dropout(sublayer(x)) + x).

    Parameters:

    embedding_dim - Size of the model/embedding dimension.
    num_heads     - Number of attention heads.
    ff_hidden_dim - Hidden size of the feed-forward network.
    dropout_rate  - Dropout rate applied to each sublayer output.
    """

    def __init__(self, embedding_dim, num_heads, ff_hidden_dim, dropout_rate=0.1):
        super(Encoder, self).__init__()

        self.mha = MHAttention(num_heads, embedding_dim)

        self.ff = feed_forward(embedding_dim, ff_hidden_dim)

        self.layernorm1 = tf.keras.layers.LayerNormalization()
        self.layernorm2 = tf.keras.layers.LayerNormalization()

        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

    def call(self, input_tensor, training):
        """
        Parameters:

        input_tensor - The layer input; the residual additions require
                       the sublayer outputs to have the same shape.
        training     - Forwarded to the dropout layers.

        Returns:

        The output of the second layer normalization.
        """
        # Sublayer 1: self-attention, so the same tensor is query, key and
        # value. MHAttention.call takes (q, k, v, mask); calling it with the
        # input alone raises a TypeError. No mask is applied here.
        mha_output, _ = self.mha(input_tensor, input_tensor, input_tensor, None)
        mha_output = self.dropout1(mha_output, training=training)
        sublayer_1_output = self.layernorm1(mha_output + input_tensor)

        # Sublayer 2: feed-forward network
        ff_output = self.ff(sublayer_1_output)
        ff_output = self.dropout2(ff_output, training=training)
        return self.layernorm2(ff_output + sublayer_1_output)

In [8]:
class EncoderStack(tf.keras.layers.Layer):
    """
    A stack of Encoder layers applied one after another.

    Parameters:

    num_encoders  - Number of Encoder layers in the stack.
    embedding_dim - Size of the model/embedding dimension.
    num_heads     - Number of attention heads per Encoder.
    ff_hidden_dim - Hidden size of each Encoder's feed-forward network.
    dropout       - Dropout rate used inside every Encoder.
    """

    def __init__(self, num_encoders, embedding_dim, num_heads, ff_hidden_dim, dropout = 0.1):
        super(EncoderStack, self).__init__()

        self.num_encoders = num_encoders

        # forward `dropout` so the argument actually takes effect; otherwise
        # every Encoder silently uses its own default rate
        self.encoders = [
            Encoder(embedding_dim, num_heads, ff_hidden_dim, dropout_rate=dropout)
            for _ in range(self.num_encoders)
        ]

    def call(self, input_tensor, training):
        """
        Feeds input_tensor through every Encoder in order and returns
        the output of the last one.
        """
        output_tensor = input_tensor

        for encoder in self.encoders:
            output_tensor = encoder(output_tensor, training)

        return output_tensor

In [45]:
class TransformerEncoderStack(tf.keras.layers.Layer):
    """
    Token embedding + positional encoding + dropout, followed by an
    EncoderStack.

    Parameters:

    num_encoders  - Number of Encoder layers.
    embedding_dim - Size of the model/embedding dimension.
    num_heads     - Number of attention heads per Encoder.
    ff_hidden_dim - Hidden size of each feed-forward network.
    vocab_size    - Number of rows in the embedding table.
    dropout       - Dropout rate for the embeddings and the encoders.
    max_position  - Number of positions in the positional-encoding
                    table, i.e. the longest supported sequence. Defaults
                    to vocab_size to keep the previous behaviour; pass
                    the real maximum sequence length when it differs.
    """

    def __init__(self, num_encoders, embedding_dim, num_heads, ff_hidden_dim, vocab_size, dropout=0.1,
                 max_position=None):
        super(TransformerEncoderStack, self).__init__()

        if max_position is None:
            max_position = vocab_size

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = positional_encoding(max_position, embedding_dim)

        # forward `dropout` so it is not silently ignored by the encoders
        self.encoders = EncoderStack(num_encoders, embedding_dim, num_heads, ff_hidden_dim, dropout=dropout)

        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, input_tensor, training):
        """
        Parameters:

        input_tensor - Token ids, shape (batch_size, seq_len).
        training     - Forwarded to the dropout layer and the encoders.

        Returns:

        The output of the encoder stack.

        Raises:

        ValueError if seq_len exceeds the positional-encoding table.
        """
        print(f'input shape:{input_tensor.shape}')

        sequence_length = input_tensor.shape[1]

        # fail early with a clear message instead of an opaque broadcast error
        table_length = self.positional_encoding.shape[1]
        if sequence_length is not None and sequence_length > table_length:
            raise ValueError(
                f'sequence length {sequence_length} exceeds the positional '
                f'encoding table length {table_length}'
            )

        embeddings = self.embedding(input_tensor)
        print(f'embedding_shape:{embeddings.shape}')

        # the table has a leading axis of size 1, so it broadcasts over the batch
        embeddings += self.positional_encoding[:, :sequence_length, :]
        print(self.positional_encoding.shape)

        embeddings = self.dropout(embeddings, training=training)

        output = self.encoders(embeddings, training)
        print(embeddings.shape)

        return output

In [47]:
# 5 sentences with a max input length of 30. The Embedding layer looks up
# integer token ids, so sample int32 ids in [0, vocab_size); float samples in
# [0, 1) would all truncate to id 0.
y = tf.random.uniform((5, 30), minval=0, maxval=30, dtype=tf.int32)
tes = TransformerEncoderStack(2, 128, 4, 512, 30)
out = tes(y, False)
out.shape

input shape:(5, 30)
embedding_shape:(5, 30, 128)
(1, 30, 128)
(5, 30, 128)


TensorShape([5, 30, 128])