<a href="https://colab.research.google.com/github/niyeldeii/Transformers/blob/main/Transformers_from_Scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

First, we import the required dependencies, which in this case are TensorFlow and NumPy.

In [2]:
import tensorflow as tf
import numpy as np

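Next, we implement positional encoding. Unlike RNNs, which read tokens one after another, a Transformer processes the entire sequence at once, so it has no built-in notion of token order; the positional encoding adds that order information to the embeddings.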

In [23]:
class PositionalEncoding(tf.keras.layers.Layer):
    """Add a fixed sinusoidal position table to its input.

    The table is computed once in the constructor with ``position`` rows and
    ``d_model`` columns and stored as a float32 tensor with a leading axis of
    size 1, so it broadcasts across the first axis of the input.

    Args:
        position: Number of positions (rows) to precompute.
        d_model: Width (columns) of each position vector.
    """

    def __init__(self, position, d_model):
        super().__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)

    def _get_angles(self, pos, i, d_model):
        """Return ``pos / 10000 ** (2 * (i // 2) / d_model)``."""
        exponents = (2 * (i // 2)) / np.float32(d_model)
        return pos * (1 / np.power(10000, exponents))

    def positional_encoding(self, position, d_model):
        """Build the ``(1, position, d_model)`` float32 encoding table."""
        rows = np.arange(position)[:, np.newaxis]
        cols = np.arange(d_model)[np.newaxis, :]
        angles = self._get_angles(rows, cols, d_model)

        # Even columns carry the sine of the angle, odd columns the cosine.
        table = np.where(cols % 2 == 0, np.sin(angles), np.cos(angles))
        return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

    def call(self, x):
        """Return ``x`` plus the first ``tf.shape(x)[1]`` rows of the table."""
        # assumes x is (batch, seq_len, d_model) -- TODO confirm against callers
        seq_len = tf.shape(x)[1]
        return x + self.pos_encoding[:, :seq_len, :]


The core of the attention mechanism in the Transformer is scaled dot-product attention: $\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(QK^{\top} / \sqrt{d_k}\right) V$.

In [24]:
def scaled_dot_product_attention(query, key, value):
    """Compute ``softmax(query @ key^T / sqrt(d_k)) @ value``.

    ``d_k`` is the size of the last axis of ``key``. No mask is applied.

    Args:
        query: Query tensor.
        key: Key tensor; its last axis must match the last axis of ``query``.
        value: Value tensor.

    Returns:
        The attention-weighted combination of ``value``.
    """
    scores = tf.matmul(query, key, transpose_b=True)

    # Divide by sqrt(d_k) before the softmax.
    depth = tf.cast(tf.shape(key)[-1], tf.float32)

    # The softmax runs over the last axis (the key positions).
    weights = tf.nn.softmax(scores / tf.math.sqrt(depth), axis=-1)

    return tf.matmul(weights, value)


The Multi-Head Attention layer runs several attention heads in parallel, each on its own slice of the projected representation, and then concatenates their outputs and passes them through a final dense layer.

In [25]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Queries, keys and values are each projected by a ``Dense(d_model)`` layer,
    split into ``num_heads`` heads of width ``d_model // num_heads``, attended
    per head, merged back to width ``d_model`` and passed through a final
    ``Dense(d_model)`` layer.

    Args:
        d_model: Width of the projections; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        # Width of a single head.
        self.depth = d_model // self.num_heads

        # Input projections for queries, keys and values.
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        # Output projection applied to the merged heads.
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape to ``(batch, -1, heads, depth)`` and swap axes 1 and 2."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q):
        """Attend ``q`` over ``k``/``v`` and return the projected result."""
        batch_size = tf.shape(q)[0]

        query_heads = self.split_heads(self.wq(q), batch_size)
        key_heads = self.split_heads(self.wk(k), batch_size)
        value_heads = self.split_heads(self.wv(v), batch_size)

        attended = scaled_dot_product_attention(query_heads, key_heads, value_heads)

        # Undo the head split: swap the head axis back, then flatten the
        # last two axes into d_model.
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (batch_size, -1, self.d_model))

        return self.dense(merged)


Now let's combine Multi-Head Attention with a feed-forward network, layer normalization, and dropout to build the Transformer block.

In [26]:
class TransformerBlock(tf.keras.layers.Layer):
    """One encoder block: self-attention, then a feed-forward network.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input, and the sum is layer-normalized.

    Args:
        d_model: Width of the input and output features.
        num_heads: Number of attention heads.
        dff: Width of the hidden layer of the feed-forward network.
        rate: Dropout rate used after both sub-layers.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model)
        ])

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training=False):
        """Apply the block to ``x``.

        Args:
            x: Input tensor whose last axis has size ``d_model``.
            training: Whether dropout is active. Defaults to ``False`` so the
                block can be called on its own for inference, matching
                ``TextClassifier.call``.

        Returns:
            A tensor with the same shape as ``x``.
        """
        # Self attention: the same tensor is used as value, key and query.
        attn_output = self.mha(x, x, x)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        # Feed-forward network
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2


Now for the full Transformer model,
integrating all of the previous components.

In [27]:
class TextClassifier(tf.keras.Model):
    """Transformer-encoder classifier that emits two logits per sequence.

    Token ids are embedded, scaled by ``sqrt(d_model)``, given positional
    encodings, passed through ``num_layers`` ``TransformerBlock`` layers,
    average-pooled and projected to two logits.

    Args:
        vocab_size: Size of the embedding table.
        max_length: Number of positions in the positional-encoding table.
        d_model: Embedding / feature width.
        num_heads: Attention heads per block.
        dff: Hidden width of each block's feed-forward network.
        num_layers: Number of stacked Transformer blocks.
    """

    def __init__(self, vocab_size, max_length, d_model, num_heads, dff, num_layers=2):
        super().__init__()

        self.d_model = d_model
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(max_length, d_model)

        self.transformer_blocks = [
            TransformerBlock(d_model, num_heads, dff) for _ in range(num_layers)
        ]

        self.pool = tf.keras.layers.GlobalAveragePooling1D()
        # The same dropout layer is applied before the blocks and after pooling.
        self.dropout = tf.keras.layers.Dropout(0.1)
        self.final_layer = tf.keras.layers.Dense(2)  # 2 classes

    def call(self, x, training=False):
        """Return unnormalized class scores (logits) for a batch of token ids."""
        scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))

        # Embed, scale, then add position information.
        hidden = self.embedding(x) * scale
        hidden = self.pos_encoding(hidden)
        hidden = self.dropout(hidden, training=training)

        # Encoder stack; the training flag is forwarded explicitly.
        for block in self.transformer_blocks:
            hidden = block(hidden, training=training)

        # Average over the sequence axis, then classify.
        pooled = self.pool(hidden)
        pooled = self.dropout(pooled, training=training)
        return self.final_layer(pooled)


Now let's do a simple text classification task for album reviews

In [28]:
def prepare_data():
    """Build the toy album-review dataset as zero-padded id sequences.

    Reviews are split on whitespace only (case and punctuation are kept), the
    sorted set of tokens is numbered from 1, and every review is right-padded
    with 0 to the length of the longest review.

    Returns:
        A tuple ``(sequences, labels, vocab_size, max_length)`` where
        ``sequences`` is an integer array of shape ``(num_reviews, max_length)``,
        ``labels`` is an array of 0/1 labels, ``vocab_size`` is the number of
        distinct tokens plus one (for the padding id 0) and ``max_length`` is
        the token count of the longest review.
    """
    reviews = [
        'Solid Album Kendrick is Amazing',
        'I can not believe he dropped this trash in 2024',
        'Great Album, Album of the Year',
        'worst album ever',
        'his career is washed without Drake',
        'He should have kept this one',
        'This is going to be a classic',
        'He really is the greatest',
    ]
    sentiments = [1, 0, 1, 0, 0, 0, 1, 1]

    tokenized = [review.split() for review in reviews]

    # Ids start at 1 because 0 is used for padding.
    vocab = sorted({token for tokens in tokenized for token in tokens})
    word_index = {word: idx for idx, word in enumerate(vocab, start=1)}

    max_length = max(len(tokens) for tokens in tokenized)

    padded = [
        [word_index[token] for token in tokens] + [0] * (max_length - len(tokens))
        for tokens in tokenized
    ]

    return np.array(padded), np.array(sentiments), len(vocab) + 1, max_length

def train_model():
    """Train a small ``TextClassifier`` on the data from ``prepare_data``.

    Prints the data shapes and vocabulary size, builds the model with
    ``d_model=32``, ``num_heads=2`` and ``dff=64``, compiles it with Adam and
    sparse categorical cross-entropy on logits, and fits it for 20 epochs.

    Returns:
        A tuple ``(model, history)``: the fitted model and the object
        returned by ``model.fit``.
    """
    sequences, labels, vocab_size, max_length = prepare_data()
    print(f"Data shapes: {sequences.shape}, {labels.shape}")
    print(f"Vocabulary size: {vocab_size}")

    d_model = 32
    num_heads = 2
    dff = 64

    model = TextClassifier(
        vocab_size=vocab_size,
        max_length=max_length,
        d_model=d_model,
        num_heads=num_heads,
        dff=dff,
    )

    # from_logits=True because the model's final Dense layer has no softmax.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )

    # validation_split holds out a fraction of the samples for validation.
    history = model.fit(
        sequences,
        labels,
        epochs=20,
        batch_size=2,
        validation_split=0.2,
        verbose=1,
    )

    return model, history

# Start training
# Runs as soon as this cell executes; the fitted model and the fit history
# are bound to module-level names so that later cells can use them.
print("Starting training...")
model, history = train_model()

# Prediction
def predict(text, model, word_index=None, max_length=None):
    """Return the model's raw output for ``text``.

    Args:
        text: Input string; tokens are separated by whitespace.
        model: Callable that takes an integer array of shape ``(1, seq_len)``.
        word_index: Optional mapping from token to integer id. Tokens missing
            from the mapping are encoded as 0. When omitted, ``text`` cannot
            be encoded and a fixed placeholder batch ``[[1, 2, 0, 0]]`` is
            used instead.
        max_length: Optional length to truncate / zero-pad the encoded text
            to. Ignored when ``word_index`` is omitted.

    Returns:
        Whatever ``model`` returns for the single-row batch.
    """
    if word_index is None:
        # No vocabulary available: fall back to the placeholder batch.
        sequence = np.array([[1, 2, 0, 0]])
    else:
        token_ids = [word_index.get(word, 0) for word in text.split()]
        if max_length is not None:
            token_ids = token_ids[:max_length]
            token_ids += [0] * (max_length - len(token_ids))
        sequence = np.array([token_ids])

    prediction = model(sequence)
    return prediction



Starting training...
Data shapes: (8, 10), (8,)
Vocabulary size: 42
Epoch 1/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 222ms/step - accuracy: 0.3542 - loss: 1.4156 - val_accuracy: 0.0000e+00 - val_loss: 0.8274
Epoch 2/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step - accuracy: 0.5000 - loss: 0.7338 - val_accuracy: 0.0000e+00 - val_loss: 1.8172
Epoch 3/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.7083 - loss: 0.8598 - val_accuracy: 0.0000e+00 - val_loss: 1.9652
Epoch 4/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.5833 - loss: 0.7226 - val_accuracy: 0.0000e+00 - val_loss: 1.6843
Epoch 5/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step - accuracy: 0.3750 - loss: 0.8860 - val_accuracy: 0.0000e+00 - val_loss: 1.4708
Epoch 6/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.8333 - loss: 0.4509

In [47]:
def preprocess_text(text, word_index, max_length):
    """Encode ``text`` as a single-row batch of exactly ``max_length`` ids.

    Args:
        text: Input string; tokens are separated by whitespace.
        word_index: Mapping from token to integer id. Tokens missing from the
            mapping are encoded as 0, the same id used for padding.
        max_length: Length of the returned sequence.

    Returns:
        An integer array of shape ``(1, max_length)``.
    """
    token_ids = [word_index.get(word, 0) for word in text.split()]

    # Truncate over-long input: the positional-encoding table only has
    # max_length rows, so a longer sequence cannot be added to it.
    token_ids = token_ids[:max_length]

    # Right-pad short input with zeros.
    token_ids += [0] * (max_length - len(token_ids))

    return np.array([token_ids])  # Return as a batch

def predict_sentiment(text, model, word_index, max_length):
    """Classify ``text`` and report the winning class with its probability.

    Args:
        text: Input string.
        model: Model returning logits for a batch of id sequences.
        word_index: Mapping from token to integer id.
        max_length: Length the encoded text is padded to.

    Returns:
        A tuple ``(predicted_class, confidence)``: the index of the largest
        softmax probability for the single input row, and that probability.
    """
    batch = preprocess_text(text, word_index, max_length)

    # Inference mode, then turn the logits into probabilities.
    class_probs = tf.nn.softmax(model(batch, training=False)).numpy()

    best_class = np.argmax(class_probs, axis=-1)[0]
    return best_class, class_probs[0, best_class]

# Prepare the vocabulary and maximum length
# Only max_length is used below; the other three names are rebound as a side effect.
sequences, labels, vocab_size, max_length = prepare_data()

# prepare_data() builds a word index internally but does not return it, so the
# same corpus and the same construction are repeated here.
# NOTE(review): this list must stay identical to the one in prepare_data(),
# otherwise the ids produced here will not match the ids the model was trained on.
text = [
    'Solid Album Kendrick is Amazing',
    'I can not believe he dropped this trash in 2024',
    'Great Album, Album of the Year',
    'worst album ever',
    'his career is washed without Drake',
    'He should have kept this one',
    'This is going to be a classic',
    'He really is the greatest'
]
# Collect the distinct whitespace-separated tokens (case and punctuation kept).
words = set()
for line in text:
    words.update(line.split())
vocab = sorted(words)
# Ids start at 1; 0 is the padding / unknown-token id.
word_index = {word: index + 1 for index, word in enumerate(vocab)}

# Example text to predict
# NOTE(review): "masterpiece" does not occur in the corpus above, so it is
# encoded as 0 and the model receives a sequence made up entirely of zeros.
new_text = " masterpiece"
predicted_class, confidence = predict_sentiment(new_text, model, word_index, max_length)

# Class 1 is reported as Positive, anything else as Negative.
print(f"Predicted Class: {'Positive' if predicted_class == 1 else 'Negative'}, Confidence: {confidence:.2f}")


Predicted Class: Negative, Confidence: 0.66
