Step 1: Import Libraries

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np

Step 2: Implement Positional Encoding

In [None]:
def get_positional_encoding(seq_len, d_model):
    """
    Build the fixed sinusoidal position table that is added to token embeddings.

    Self-attention has no built-in notion of token order, so every position is
    tagged with a distinct pattern of sines and cosines.

    Args:
        seq_len (int): Number of positions to encode.
        d_model (int): Width of each encoding vector (the embedding dimension).

    Returns:
        Tensor: float32 tensor of shape (1, seq_len, d_model). The leading axis
        of size 1 lets the table broadcast over a batch.
    """
    token_pos = np.arange(seq_len)[:, None]  # (seq_len, 1)
    channel = np.arange(d_model)[None, :]    # (1, d_model)

    # Channels 2k and 2k+1 share one frequency: 1 / 10000^(2k / d_model).
    exponent = (2 * (channel // 2)) / np.float32(d_model)
    angles = token_pos / np.power(10000, exponent)  # (seq_len, d_model)

    # Even channels carry the sine of the angle, odd channels the cosine.
    is_even_channel = channel % 2 == 0
    table = np.where(is_even_channel, np.sin(angles), np.cos(angles))

    # Prepend the broadcastable batch axis and hand back a float32 tensor.
    return tf.cast(np.expand_dims(table, axis=0), dtype=tf.float32)

Step 3: Build Multi-Head Self-Attention Layer

In [None]:
class MultiHeadSelfAttention(layers.Layer):
    """
    Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values, each projection is
    split into ``num_heads`` slices of width ``embed_dim // num_heads``,
    attention is computed per head, and the heads are concatenated and passed
    through a final dense layer.

    Args:
        embed_dim (int): Width of the input and output feature axis.
        num_heads (int): Number of attention heads; must divide ``embed_dim``.
        **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).

    Raises:
        ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads=8, **kwargs):
        super().__init__(**kwargs)
        if embed_dim % num_heads != 0:
            raise ValueError(f"embed_dim ({embed_dim}) must be divisible by num_heads ({num_heads})")

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.projection_dim = embed_dim // num_heads  # per-head width

        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        """
        Scaled dot-product attention.

        Args:
            query, key, value: Per-head tensors of shape (batch, heads, seq, depth).

        Returns:
            tuple: ``(output, weights)`` where ``output`` is
            (batch, heads, seq, depth) and ``weights`` is the softmax over the
            key axis, shape (batch, heads, seq, seq).
        """
        score = tf.matmul(query, key, transpose_b=True)  # (batch, heads, seq, seq)
        # Cast the scale to the score's dtype rather than a fixed float32 so the
        # division also works when the layer computes in float16/bfloat16.
        dim_key = tf.cast(tf.shape(key)[-1], score.dtype)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)  # (batch, heads, seq, depth)
        return output, weights

    def separate_heads(self, x, batch_size):
        """Reshape (batch, seq, embed_dim) into (batch, heads, seq, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """
        Apply self-attention to ``inputs``.

        Args:
            inputs: Tensor of shape (batch, seq, embed_dim).

        Returns:
            Tensor of shape (batch, seq, embed_dim).
        """
        batch_size = tf.shape(inputs)[0]

        # Linear projections, each (batch, seq, embed_dim).
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # Split the feature axis into heads.
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)

        attention_output, _ = self.attention(query, key, value)

        # Undo the head split: (batch, heads, seq, depth) -> (batch, seq, embed_dim).
        attention_output = tf.transpose(attention_output, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention_output, (batch_size, -1, self.embed_dim))

        # Final linear layer mixes information across heads.
        return self.combine_heads(concat_attention)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created from config."""
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim, "num_heads": self.num_heads})
        return config

Step 4: Build Transformer Encoder Block

In [None]:
class TransformerBlock(layers.Layer):
    """
    One Transformer encoder block: self-attention followed by a two-layer
    feed-forward network, each wrapped in dropout, a residual connection and
    layer normalization (applied after the residual sum).

    Args:
        embed_dim (int): Width of the input and output feature axis.
        num_heads (int): Number of attention heads.
        ff_dim (int): Hidden width of the feed-forward network.
        rate (float): Dropout rate used after attention and after the
            feed-forward network.
        **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can reproduce the constructor call.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = tf.keras.Sequential([
            layers.Dense(ff_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """
        Run the block on ``inputs``.

        Args:
            inputs: Tensor of shape (batch, seq, embed_dim).
            training: Forwarded to both dropout layers. Defaults to ``None``
                so the block can be called without an explicit flag.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        # Self-attention + residual connection
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward + residual connection
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created from config."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

Step 5: Assemble Full Transformer Model

In [None]:
class TransformerModel(Model):
    """
    Transformer-encoder sequence classifier.

    Pipeline: token embedding (scaled by sqrt(embed_dim)) + sinusoidal
    positional encoding -> ``num_blocks`` TransformerBlock layers -> global
    average pooling over the sequence axis -> dropout -> softmax classifier.

    Args:
        vocab_size (int): Size of the token vocabulary for the embedding.
        embed_dim (int): Embedding / model width.
        num_heads (int): Attention heads per block.
        ff_dim (int): Hidden width of each block's feed-forward network.
        max_seq_len (int): Longest sequence the positional table covers.
        num_classes (int): Number of output classes.
        num_blocks (int): Number of stacked Transformer blocks.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, ff_dim, max_seq_len, num_classes, num_blocks=2):
        super().__init__()
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len

        # Embedding + positional encoding (precomputed once for max_seq_len positions)
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_enc = get_positional_encoding(max_seq_len, embed_dim)

        # Transformer blocks
        self.transformer_blocks = [
            TransformerBlock(embed_dim, num_heads, ff_dim) for _ in range(num_blocks)
        ]

        # Classifier head
        self.global_pool = layers.GlobalAveragePooling1D()
        self.dropout = layers.Dropout(0.1)
        self.classifier = layers.Dense(num_classes, activation="softmax")

    def call(self, inputs, training=False):
        """
        Classify a batch of token-id sequences.

        Args:
            inputs: Integer tensor of token ids, shape (batch, seq_len) with
                seq_len <= max_seq_len.
            training (bool): Enables dropout when True.

        Returns:
            Tensor of shape (batch, num_classes) with softmax probabilities.

        Raises:
            ValueError: If the statically known sequence length exceeds
                ``max_seq_len``.
        """
        # Embed tokens
        x = self.token_emb(inputs)  # (batch, seq_len, embed_dim)

        # The positional table only has max_seq_len rows; a longer input would
        # otherwise fail at the addition below with an opaque shape error.
        seq_len = x.shape[1]
        if seq_len is not None and seq_len > self.max_seq_len:
            raise ValueError(
                f"Input sequence length ({seq_len}) exceeds max_seq_len ({self.max_seq_len})"
            )

        # Scale embeddings; the cast keeps the arithmetic in the embedding's dtype.
        x *= tf.math.sqrt(tf.cast(self.embed_dim, x.dtype))

        # Add positional encoding, truncated to the actual sequence length
        x += tf.cast(self.pos_enc[:, :tf.shape(inputs)[1], :], x.dtype)

        # Apply Transformer blocks
        for block in self.transformer_blocks:
            x = block(x, training=training)

        # Pool and classify
        x = self.global_pool(x)
        x = self.dropout(x, training=training)
        return self.classifier(x)

Step 6: Instantiate and Compile Model

In [None]:
# Configuration
VOCAB_SIZE = 10000    # e.g., unique words in your logs/docs
MAX_SEQ_LEN = 100     # Max tokens per sequence
EMBED_DIM = 128       # Embedding size
NUM_HEADS = 8         # Attention heads
FF_DIM = 128          # Feed-forward hidden size
NUM_CLASSES = 2       # e.g., [normal, anomaly]

# Create model
model = TransformerModel(
    vocab_size=VOCAB_SIZE,
    embed_dim=EMBED_DIM,
    num_heads=NUM_HEADS,
    ff_dim=FF_DIM,
    max_seq_len=MAX_SEQ_LEN,
    num_classes=NUM_CLASSES,
    num_blocks=2
)

# Compile
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

# A subclassed model has no weights until it has seen an input, and summary()
# needs a built model. Run one dummy batch of token ids through it first.
model(tf.zeros((1, MAX_SEQ_LEN), dtype=tf.int32))

model.summary()