In [1]:
import numpy as np
import tensorflow as tf

In [65]:
def scaled_dot_product(q, k, v, mask = None):
    """Compute scaled dot-product attention.

    Args:
        q: Query tensor. The size of its last axis (`d_k`) sets the scaling
            factor `sqrt(d_k)`.
        k: Key tensor whose last two axes are swapped before being multiplied
            with `q`.
        v: Value tensor, multiplied by the attention weights.
        mask: Optional tensor added to the scaled scores before the softmax.
            It must broadcast against the score tensor.

    Returns:
        A tuple `(attention, values)`: the softmax-normalised attention
        weights and the attention-weighted values.
    """
    d_k = q.shape[-1]
    # `transpose_b=True` swaps only the last two axes of `k`, so the function
    # works for any rank >= 2 instead of being tied to 4-D inputs by a
    # hard-coded permutation.
    scaled = tf.matmul(q, k, transpose_b=True) / np.sqrt(d_k)
    if mask is not None:
        scaled += mask
    # Normalise over the key axis so each query's weights sum to one.
    attention = tf.keras.activations.softmax(scaled, axis=-1)
    values = tf.matmul(attention, v)
    return attention, values

In [102]:
class MultiheadAttention(tf.keras.layers.Layer):
    """Multi-head self-attention over a (batch, sequence, d_model) input.

    A single dense layer produces the concatenated query/key/value
    projections, which are split per head, passed through
    `scaled_dot_product`, merged back together and projected to `d_model`.

    Args:
        d_model: Size of the model (feature) dimension.
        num_heads: Number of attention heads; must divide `d_model`.

    Raises:
        ValueError: If `d_model` is not divisible by `num_heads`.
    """
    def __init__(self, d_model, num_heads):
        super().__init__()
        # Fail early: otherwise the per-head reshape in `call` cannot succeed.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = tf.keras.layers.Dense(3 * d_model)
        self.linear_layer = tf.keras.layers.Dense(d_model)

    def call(self, x, mask = None):
        """Apply self-attention to `x`.

        Args:
            x: Input tensor of shape (batch, sequence, d_model).
            mask: Optional tensor forwarded to `scaled_dot_product`, which
                adds it to the (batch, heads, sequence, sequence) scores
                before the softmax.
                NOTE(review): Keras may also use a `mask` call argument for
                propagated boolean masks; confirm callers pass an additive
                mask here.

        Returns:
            Tensor of shape (batch, sequence, d_model).
        """
        # Use the dynamic shape so the layer also works when the batch or
        # sequence dimension is not statically known.
        input_shape = tf.shape(x)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        qkv = self.qkv_layer(x)
        # Debug trace kept from the original notebook.
        print(qkv.shape)
        print(self.num_heads, self.head_dim)
        # (batch, seq, 3*d_model) -> (batch, seq, heads, 3*head_dim)
        qkv = tf.reshape(qkv, [batch_size, sequence_length, self.num_heads, self.head_dim * 3])
        # -> (batch, heads, seq, 3*head_dim)
        qkv = tf.transpose(qkv, perm=[0,2,1,3])
        q, k, v = tf.split(qkv, num_or_size_splits=3, axis=-1)
        # Forward the mask; previously it was accepted but silently ignored.
        _, values = scaled_dot_product(q, k, v, mask)
        # Move the head axis back next to head_dim before merging. Reshaping
        # the (batch, heads, seq, head_dim) tensor directly would interleave
        # different sequence positions and heads.
        values = tf.transpose(values, perm=[0,2,1,3])
        values = tf.reshape(values, [batch_size, sequence_length, self.num_heads * self.head_dim])
        out = self.linear_layer(values)
        return out

In [104]:
class LayerNormalization(tf.keras.layers.Layer):
    """Layer normalisation with a learnable scale (`gamma`) and shift (`beta`).

    Args:
        parameters_shape: Shape of `gamma` and `beta`. Its length also decides
            how many trailing axes of the input are normalised together.
        eps: Constant added to the variance before the square root.
    """
    def __init__(self, parameters_shape, eps = 1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        # Scale starts at one and shift at zero.
        self.gamma = tf.Variable(tf.ones(parameters_shape))
        self.beta = tf.Variable(tf.zeros(parameters_shape))

    def call(self, x):
        """Normalise `x` over its trailing axes, then scale and shift it."""
        # Trailing axes -1, -2, ..., one per entry of `parameters_shape`.
        trailing_axes = list(range(-1, -len(self.parameters_shape) - 1, -1))
        mean = tf.reduce_mean(x, axis=trailing_axes, keepdims=True)
        centered = x - mean
        # Variance is the mean of squared deviations (no Bessel correction).
        variance = tf.reduce_mean(centered**2, axis=trailing_axes, keepdims=True)
        normalised = centered / tf.sqrt(variance + self.eps)
        return self.gamma * normalised + self.beta

In [105]:
class PositionwiseFeedForward(tf.keras.layers.Layer):
    """Two dense layers with a ReLU and dropout in between.

    Args:
        d_model: Output size of the second dense layer.
        hidden: Output size of the first (ReLU-activated) dense layer.
        drop_prob: Dropout rate applied to the hidden activations.
    """
    def __init__(self, d_model, hidden, drop_prob):
        super().__init__()
        dense = tf.keras.layers.Dense
        self.linear1 = dense(units=hidden, activation='relu')
        self.linear2 = dense(units=d_model)
        self.dropout = tf.keras.layers.Dropout(rate=drop_prob)

    def call(self, x):
        """Expand to `hidden` units, apply dropout, project to `d_model`."""
        expanded = self.linear1(x)
        regularised = self.dropout(expanded)
        return self.linear2(regularised)

In [106]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output passes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalised.

    Args:
        d_model: Size of the model (feature) dimension.
        ffn_hidden: Hidden size of the feed-forward sub-layer.
        num_heads: Number of attention heads.
        drop_prob: Dropout rate used after both sub-layers and inside the
            feed-forward sub-layer.
    """
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.attention = MultiheadAttention(d_model = d_model, num_heads = num_heads)
        self.norm1 = LayerNormalization(parameters_shape = [d_model])
        self.dropout1 = tf.keras.layers.Dropout(drop_prob)
        self.ffn = PositionwiseFeedForward(d_model = d_model, hidden = ffn_hidden, drop_prob = drop_prob)
        self.norm2 = LayerNormalization(parameters_shape = [d_model])
        self.dropout2 = tf.keras.layers.Dropout(drop_prob)

    def call(self, x, mask = None):
        """Run the block on `x`.

        Args:
            x: Input tensor; the output has the same shape.
            mask: Optional attention mask handed to the attention sub-layer.
                Defaults to `None` (no masking), matching the previous
                behaviour.

        Returns:
            The transformed tensor.
        """
        # Sub-layer 1: self-attention -> dropout -> residual add -> norm.
        residual_x = x
        print("--------------ATTENTION 1----------------")
        x = self.attention(x, mask = mask)
        print("--------------DROPOUT 1------------------")
        x = self.dropout1(x)
        print("--------------ADD and LAYER NORM---------")
        x = self.norm1(x + residual_x)
        # Sub-layer 2: feed-forward -> dropout -> residual add -> norm.
        residual_x = x
        # This step is the feed-forward network; the trace label used to say
        # "ATTENTION 2", which was misleading.
        print("--------------FEED FORWARD---------------")
        x = self.ffn(x)
        print("--------------DROPOUT 2------------------")
        x = self.dropout2(x)
        print("--------------ADD and LAYER NORM---------")
        x = self.norm2(x + residual_x)
        return x

In [116]:
class Encoder(tf.keras.Model):
    """A stack of `num_layers` `EncoderLayer` blocks applied in order.

    Args:
        d_model: Size of the model (feature) dimension.
        ffn_hidden: Hidden size of each feed-forward sub-layer.
        num_heads: Number of attention heads per layer.
        drop_prob: Dropout rate used inside every layer.
        num_layers: Number of encoder layers to stack.
    """
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        self.num_layers = num_layers
        # The Sequential model serves only as a container for the layers;
        # `call` walks through them one by one.
        layer_stack = [
            EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.encoder_layers = tf.keras.Sequential(layer_stack)

    def call(self, x):
        """Pass `x` through every encoder layer, printing a banner per layer."""
        layer_stack = self.encoder_layers.layers
        hidden = x
        for i in range(self.num_layers):
            encoder_layer = layer_stack[i]
            print(f"\n--------------Layer {i+1}----------------\n")
            hidden = encoder_layer(hidden)
        return hidden


In [117]:
# Dimensions of the demo input batch.
batch_size = 30
sequence_length = 200

# Model hyper-parameters.
d_model = 512       # feature size; split evenly across the attention heads
num_heads = 8
ffn_hidden = 2048   # hidden width of the feed-forward sub-layer
drop_prob = 0.1
num_layers = 5

encoder = Encoder(
    d_model=d_model,
    ffn_hidden=ffn_hidden,
    num_heads=num_heads,
    drop_prob=drop_prob,
    num_layers=num_layers,
)

In [118]:
# Seed TensorFlow's global generator so the random demo input is the same on
# every Restart & Run All.
tf.random.set_seed(0)
x = tf.random.normal((batch_size, sequence_length, d_model))
out = encoder(x)
# Every encoder layer maps (batch, sequence, d_model) to the same shape, so
# the output shape must equal the input shape.
assert out.shape == x.shape, f"unexpected encoder output shape: {out.shape}"


--------------Layer 1----------------

--------------ATTENTION 1----------------
(30, 200, 1536)
8 64
--------------DROPOUT 1------------------
--------------ADD and LAYER NORM---------
--------------ATTENTION 2----------------
--------------DROPOUT 2------------------
--------------ADD and LAYER NORM---------

--------------Layer 2----------------

--------------ATTENTION 1----------------
(30, 200, 1536)
8 64
--------------DROPOUT 1------------------
--------------ADD and LAYER NORM---------
--------------ATTENTION 2----------------
--------------DROPOUT 2------------------
--------------ADD and LAYER NORM---------

--------------Layer 3----------------

--------------ATTENTION 1----------------
(30, 200, 1536)
8 64
--------------DROPOUT 1------------------
--------------ADD and LAYER NORM---------
--------------ATTENTION 2----------------
--------------DROPOUT 2------------------
--------------ADD and LAYER NORM---------

--------------Layer 4----------------

--------------ATTENTI