In [1]:
#Algorithm
## Step 1:
## -> We need to compute the relevancy score of each and every word vector with
##    every other word in the sentence. This will be our attention scores. The
##    scores are obtained by the dot product of the word vectors. Then the scores
##    will go through a scaling function and a softmax function.

## Step 2:
# -> We compute the sum of all word vectors in the sentence weighted by our relevancy
#    scores. The resulting vector is our new representation for a specific word

import numpy as np
from tensorflow.keras.activations import softmax
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

def self_attention(input_sequence):
    output = np.zeros(shape=input_sequence.shape)
    for i, pivot_vector in enumerate(input_sequence):
        scores = np.zeros(shape=(len(input_sequence),))
        for j, vector in enumerate(input_sequence):
            scores[j] = np.dot(pivot_vector, vector.T)
        scores /= np.sqrt(input_sequence.shape[1])
        scores = softmax(scores)
        new_pivot_vector = np.zeros(shape=(pivot_vector.shape,))
        for j, vector in enumerate(input_sequence):
            new_pivot_vector += vector * scores[j]
        output[i] = new_pivot_vector
    return output

In [2]:
import pathlib

base_dir = pathlib.Path('../../n-grams_with_tf-idf/resources/aclImdb')

batch_size = 32
train_ds = tf.keras.utils.text_dataset_from_directory(base_dir/'train',
                                                      batch_size=batch_size)
val_ds = tf.keras.utils.text_dataset_from_directory(base_dir/'val',
                                                    batch_size=batch_size)
test_ds = tf.keras.utils.text_dataset_from_directory(base_dir/'test',
                                                     batch_size=batch_size)


Found 20000 files belonging to 2 classes.
Found 5000 files belonging to 2 classes.
Found 25000 files belonging to 2 classes.


In [3]:
from keras.layers import TextVectorization

max_tokens = 20000
max_length = 600
text_vectorization = TextVectorization(max_tokens=max_tokens, output_mode='int',
                                       output_sequence_length=max_length)
text_only_train_ds = train_ds.map(lambda x, y: x)
text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(lambda x, y : (text_vectorization(x), y),
                            num_parallel_calls=4)
int_val_ds = val_ds.map(lambda x, y : (text_vectorization(x), y),
                        num_parallel_calls=4)
int_test_ds = test_ds.map(lambda x, y : (text_vectorization(x), y),
                          num_parallel_calls=4)

In [4]:
# Implementing a transformer encoder

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation='relu'),
             layers.Dense(embed_dim, )]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            'embed_dim' : self.embed_dim,
            'dense_dim' : self.dense_dim,
            'num_heads' : self.num_heads
        })
        return config


In [None]:
vocab_size = 20000
embed_dim = 256
num_heads = 4
dense_dim = 32

inputs = keras.Input(shape=(None, ), dtype='int64')
x = layers.Embedding(vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPool1D()(x)
x = layers.Dropout(0.5)(x)
output = layers.Dense(1, activation='sigmoid')(x)
model = keras.Model(inputs, output)
model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

callback_list = [
    keras.callbacks.ModelCheckpoint(
        filepath='transformer_encoder.keras',
        save_best_only=True,
    ),
]

model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callback_list)

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, None)]            0         
                                                                 
 embedding_2 (Embedding)     (None, None, 256)         5120000   
                                                                 
 transformer_encoder_2 (Tran  (None, None, 256)        1069600   
 sformerEncoder)                                                 
                                                                 
 global_max_pooling1d (Globa  (None, 256)              0         
 lMaxPooling1D)                                                  
                                                                 
 dropout (Dropout)           (None, 256)               0         
                                                                 
 dense_6 (Dense)             (None, 1)                 257   

In [5]:
model = keras.models.load_model('transformer_encoder.keras',
                                custom_objects={'TransformerEncoder':TransformerEncoder})
print(f'Test accuracy is: {model.evaluate(int_test_ds)[1]:.3%}')

Test accuracy is: 88.064%


In [11]:
# Positional embedding
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super(PositionalEmbedding, self).__init__(**kwargs)
        self.token_embeddings = layers.Embedding(input_dim=input_dim, output_dim=output_dim)
        self.positions_embedding = layers.Embedding(input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs, *args, **kwargs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.positions_embedding(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            'output_dim' : self.output_dim,
            'sequence_length' : self.sequence_length,
            'input_dim' : self.input_dim
        })
        return config

In [13]:
sequence_length = 600
vocab_size = 20000
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None, ), dtype='int64')
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation='sigmoid')(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

callback_list = [
    keras.callbacks.ModelCheckpoint(
        filepath='complete_transformer_encoder.keras',
        save_best_only=True,
    ),
]

model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callback_list)

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, None)]            0         
                                                                 
 positional_embedding_1 (Pos  (None, None, 256)        5273600   
 itionalEmbedding)                                               
                                                                 
 transformer_encoder_1 (Tran  (None, None, 256)        543776    
 sformerEncoder)                                                 
                                                                 
 global_max_pooling1d_1 (Glo  (None, 256)              0         
 balMaxPooling1D)                                                
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                           

<keras.callbacks.History at 0x24f071b6ac0>

In [14]:
model = keras.models.load_model('complete_transformer_encoder.keras',
                                custom_objects={'TransformerEncoder':TransformerEncoder,
                                                'PositionalEmbedding':PositionalEmbedding})
print(f'Test accuracy is: {model.evaluate(int_test_ds)[1]:.3%}')

Test accuracy is: 87.292%
