In [1]:
import tensorflow as tf
from tensorflow.keras import layers , models
import math
import numpy as np

In [80]:
class selfAttention(tf.keras.layers.Layer):
    """A single scaled dot-product attention head.

    Three trainable matrices project the query, key and value inputs down to
    ``dimentions`` features. The scaled query/key scores go through a softmax
    and the resulting weights are used to mix the value vectors.
    """

    def __init__(self , dimentions):
        """Remember the width of the projected query/key/value vectors."""
        super().__init__()
        self.d = dimentions

    def build(self, input_shape):
        """Create the Wq, Wk and Wv matrices, each of shape (input_shape[-1], d)."""
        # All three projections share the same specification.
        weight_spec = dict(
            shape=(input_shape[-1], self.d),
            initializer='glorot_uniform',
            trainable=True,
            dtype='float32',
        )
        self.Wq = self.add_weight(**weight_spec)
        self.Wk = self.add_weight(**weight_spec)
        self.Wv = self.add_weight(**weight_spec)

    def call(self, q_x, k_x, v_x, mask=None):
        """Attend over ``v_x`` using scores computed from ``q_x`` and ``k_x``.

        Args:
            q_x: input projected with Wq to form the queries.
            k_x: input projected with Wk to form the keys.
            v_x: input projected with Wv to form the values.
            mask: optional tensor added to the scores after scaling by -1e9,
                so positions where it equals 1 receive a near-zero weight.

        Returns:
            A pair ``(h, p)``: the attended output and the attention weights.
        """
        # Project the inputs; each result is [None, t, d]
        queries = tf.matmul(q_x, self.Wq)
        keys = tf.matmul(k_x, self.Wk)
        values = tf.matmul(v_x, self.Wv)

        # Scaled dot-product scores, [None, t, t]
        scores = tf.matmul(queries, keys, transpose_b=True) / math.sqrt(self.d)

        # Masked positions get a large negative score before the softmax
        if mask is not None:
            scores += mask * -1e9
        weights = tf.nn.softmax(scores)

        # [None, t, t] . [None, t, d] => [None, t, d]
        attended = tf.matmul(weights, values)
        return attended, weights
# Smoke test: run one attention head on a random batch and check the output shape.
n_seq = 7
x = tf.constant(np.random.normal(size=(1,n_seq,512)), dtype='float32')
layer = selfAttention(512)
# band_part(..., -1, 0) keeps the lower triangle (diagonal included), so the
# mask is 1 strictly above the diagonal. It is sized from n_seq so it always
# matches the sequence length of x.
mask = 1 - tf.linalg.band_part(tf.ones((n_seq, n_seq)), -1, 0)
h, p = layer(x, x, x, mask)
print(h.shape)

(1, 7, 512)


In [81]:
class fclayer(tf.keras.layers.Layer):
    """Two stacked Dense layers: ReLU with ``d1`` units, then linear with ``d2`` units."""

    def __init__(self , d1 , d2):
        """Create the hidden (ReLU) layer and the linear output layer."""
        super().__init__()
        self.layer1 = tf.keras.layers.Dense(units=d1, activation='relu')
        self.layer2 = tf.keras.layers.Dense(units=d2)

    def call(self , x):
        """Apply the hidden layer and then the output layer to ``x``."""
        hidden = self.layer1(x)
        return self.layer2(hidden)

In [82]:
class EncoderLayer(tf.keras.layers.Layer):
    """Multi-head self-attention followed by a fully connected block.

    The input is fed to ``n_heads`` independent ``selfAttention`` heads, the
    head outputs are concatenated on the feature axis, and the result goes
    through ``fclayer(2048, d)``.
    """

    def __init__(self , d , n_heads):
        """Build the attention heads and the fully connected block.

        Args:
            d: feature dimensionality; also the output width of the layer.
            n_heads: number of attention heads; each head has int(d / n_heads) features.
        """
        super().__init__()
        self.d = d
        self.n_heads = n_heads
        # Each head works on an equal share of the feature dimension
        self.d_head = int(d / n_heads)
        self.attn_heads = [selfAttention(self.d_head) for _ in range(n_heads)]
        self.fclayer = fclayer(2048, self.d)

    def call(self, x):
        """Self-attend over ``x`` with every head, concatenate, then apply the FC block."""
        head_outputs = []
        for head in self.attn_heads:
            # Each head returns (output, attention weights); only the output is used
            attended, _ = head(x, x, x)
            head_outputs.append(attended)

        h1 = tf.concat(head_outputs, axis=-1)
        return self.fclayer(h1)
    

In [83]:
class DecoderLayer(layers.Layer):
    """Decoder layer: masked self-attention, encoder-decoder attention, FC block.

    The decoder input first attends to itself (optionally masked). The result
    then queries the encoder output, and the attended encoder features are
    passed through ``fclayer(2048, d)``.
    """
    def __init__(self, d, n_heads):
        """Build both sets of attention heads and the fully connected block.

        Args:
            d: feature dimensionality; also the output width of the layer.
            n_heads: number of heads in each attention stage; each head has
                int(d / n_heads) features.
        """
        super(DecoderLayer, self).__init__()
        # Feature dimensionality
        self.d = d
        # Dimensionality of a single head
        self.d_head = int(d/n_heads)
        # Self-attention heads (decoder inputs)
        self.dec_attn_heads = [selfAttention(self.d_head) for i in range(n_heads)]
        # Encoder-decoder attention heads (decoder queries, encoder keys/values)
        self.attn_heads = [selfAttention(self.d_head) for i in range(n_heads)]
        # Fully connected layer
        self.fc_layer = fclayer(2048, self.d)

    def call(self, de_x, en_x, mask=None):
        """Run the decoder layer.

        Args:
            de_x: decoder-side input.
            en_x: encoder output to attend over.
            mask: optional mask forwarded to the decoder self-attention heads only.

        Returns:
            The output of the fully connected block.
        """

        def compute_multihead_output(attn_heads, q_x, kv_x, mask=None):
            """Concatenate the outputs of all heads on the feature axis.

            Queries are computed from ``q_x``; keys and values from ``kv_x``.
            The previous argument order took queries and keys from the encoder
            and values from the decoder, which is the reverse of
            encoder-decoder attention and only ran when both sequences had the
            same length.
            """
            outputs = [head(q_x, kv_x, kv_x, mask)[0] for head in attn_heads]
            return tf.concat(outputs, axis=-1)

        # Masked multi-head self-attention over the decoder inputs
        h1 = compute_multihead_output(self.dec_attn_heads, de_x, de_x, mask)
        # Multi-head attention: decoder states query the encoder outputs
        h2 = compute_multihead_output(self.attn_heads, h1, en_x)
        y = self.fc_layer(h2)
        return y

In [84]:
# Hyperparameters shared by the model-building cells below
n_steps = 25   # length of the encoder and decoder input sequences
env = 300      # input_dim of the encoder Embedding
dv = 400       # input_dim of the decoder Embedding and width of the softmax output
n_head = 8     # attention heads per encoder/decoder layer
d = 512        # feature dimensionality used throughout the model
# Mask with 1 strictly above the diagonal and 0 on/below it
mask = 1 - tf.linalg.band_part(
    tf.ones(shape=(n_steps, n_steps)), num_lower=-1, num_upper=0
)

In [85]:
# Encoder: token ids -> embeddings -> two stacked encoder layers
en_inp = layers.Input(shape = (n_steps,))
en_emb = layers.Embedding(env , d, input_length = n_steps)(en_inp)
eno1 = EncoderLayer(d , n_head)(en_emb)
en02 = EncoderLayer(d , n_head)(eno1)

# Decoder: token ids -> embeddings -> two stacked decoder layers, each of
# which also receives the final encoder output and the mask.
de_inp = layers.Input(shape=(n_steps,))
# Embed to d (not a hard-coded 512) so the decoder follows the configured width
de_emb = layers.Embedding(dv, d, input_length=n_steps)(de_inp)
de_out1 = DecoderLayer(d, n_head)(de_emb, en02, mask)
de_out2 = DecoderLayer(d, n_head)(de_out1, en02, mask)
# Softmax over dv classes at every position
de_pred = layers.Dense(dv, activation='softmax')(de_out2) 



In [86]:
# Tie the encoder and decoder inputs to the prediction head in a single model
transformer = models.Model(
    inputs=[en_inp, de_inp],
    outputs=de_pred,
    name='MinTransformer',
)
# Adam optimiser, categorical cross-entropy loss, accuracy reported as 'acc'
transformer.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['acc'],
)
transformer.summary()

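Rough Cells (scratch experiments; these were executed before the cells above, so their saved outputs may be stale)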


In [48]:
# Random test batch and mask for trying out a single attention head
nseq = 7
# float32 to match the dtype of the attention layer's weights
x = tf.constant(np.random.normal(size = (1 , nseq , 512)), dtype = 'float32')
# 1 strictly above the diagonal; sized from nseq so it always matches x
mask = 1 - tf.linalg.band_part(tf.ones((nseq , nseq)) , -1 , 0)
x.shape

TensorShape([1, 7, 512])

In [49]:
layer = selfAttention(512)
# selfAttention.call returns (output, attention weights) in that order
h , p = layer(x, x, x, mask)
# Inspect the attention weights
print(p.numpy())

[[0.         0.         0.         0.         0.         1.
  0.        ]
 [0.         0.         0.         0.         0.         0.
  1.        ]
 [0.         0.         0.         1.         0.         0.
  0.        ]
 [0.         0.         0.         0.         1.         0.
  0.        ]
 [0.         0.         0.         0.         0.         0.
  1.        ]
 [0.         0.         0.         0.         0.         0.
  1.        ]
 [0.14285715 0.14285715 0.14285715 0.14285715 0.14285715 0.14285715
  0.14285715]]


In [50]:
# Push `h` through a freshly built fully connected block and check the shape
ff = fclayer(d1=2048, d2=512)(h)
print(ff.shape)

(1, 7, 512)
