# **Step by step implementation of Transformer model from the scratch using Numpy**

### **Import libraries**

libraries for data handling, text preprocessing, and embedding creation

In [52]:
import numpy as np
import pandas as pd
import re
import math
from nltk.corpus import stopwords

# Ensure you have the stopwords corpus
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# **Data loading and Pre-processing**

loading the data from a CSV file and preparing it for training by creating source and target columns

In [53]:
file_path = r'/content/sample_data/en-fr.csv'
df = pd.read_csv(file_path)
df['source'] = df['English words/sentences']
df['target'] = df['French words/sentences'].apply(lambda x: '[start] ' + x + ' [end]')
df = df.drop(['English words/sentences', 'French words/sentences'], axis=1)

print(df.head(5))


  source                    target
0    Hi.      [start] Salut! [end]
1   Run!     [start] Cours ! [end]
2   Run!    [start] Courez ! [end]
3   Who?       [start] Qui ? [end]
4   Wow!  [start] Ça alors ! [end]


shuffle the data and split it into training, validation, and test sets

In [54]:
df = df.sample(frac=1).reset_index(drop=True)

In [55]:
train_size = int(len(df) * 0.7)
val_size = int(len(df) * 0.2)
test_size = int(len(df) * 0.1)

print(f"Train size: {train_size}, Val size: {val_size}, Test size: {test_size}")  # Check split sizes

train_df = df[:train_size]
val_df = df[train_size:train_size + val_size]
test_df = df[train_size + val_size:]

print(f"Train set size: {len(train_df)}, Validation set size: {len(val_df)}, Test set size: {len(test_df)}")  # Verify dataset splits

Train size: 122934, Val size: 35124, Test size: 17562
Train set size: 122934, Validation set size: 35124, Test set size: 17563


Normalize the text by converting to lowercase, removing punctuation, and removing stopwords and very short tokens

In [56]:
# Preprocess sentences to normalize text
stop_words = set(stopwords.words('english'))

def preprocess_sentence(sentence):
    # Convert to lowercase
    sentence = sentence.lower()
    # Remove punctuation (except for tokens like [start] and [end])
    sentence = re.sub(r'[^a-z0-9\s\[\]]', '', sentence)
    # Remove stop words and very short tokens
    sentence = ' '.join([word for word in sentence.split() if word not in stop_words and len(word) > 2])
    return sentence

In [57]:
# Apply preprocessing
train_df['source'] = train_df['source'].apply(preprocess_sentence)
train_df['target'] = train_df['target'].apply(preprocess_sentence)
val_df['source'] = val_df['source'].apply(preprocess_sentence)
val_df['target'] = val_df['target'].apply(preprocess_sentence)
test_df['source'] = test_df['source'].apply(preprocess_sentence)
test_df['target'] = test_df['target'].apply(preprocess_sentence)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df['source'] = train_df['source'].apply(preprocess_sentence)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df['target'] = train_df['target'].apply(preprocess_sentence)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  val_df['source'] = val_df['source'].apply(preprocess_sentence)
A value 

In [58]:
print(train_df.head())  # Verify preprocessing

                                      source  \
0                          tom might traitor   
1                                      agony   
2  would sit hours reading detective stories   
3                working report day tomorrow   
4                   dont always obey parents   

                                              target  
0              [start] tom pourrait tre tratre [end]  
1                        [start] jtais lagonie [end]  
2  [start] sassirait des heures lire des histoire...  
3  [start] travaillerai mon rapport toute journe ...  
4  [start] ils nobissent pas toujours leurs paren...  


### **Create Vocabulary**

create a simple vocabulary from the training data

In [59]:
def create_vocab(sentences):
    vocab = set()
    for sentence in sentences:
        vocab.update(sentence.split())
    special_tokens = ['[start]', '[end]', '[pad]', '[unk]']
    vocab.update(special_tokens)
    vocab = {word: idx for idx, word in enumerate(vocab)}
    return vocab

vocab = create_vocab(train_df['source'].tolist() + train_df['target'].tolist())

# **Input Embedding**

Initialize the embedding matrix with random values and create functions to get word embeddings

In [60]:
# Hyperparameters
embedding_dim = 512

def initialize_embeddings(vocab, embedding_dim):
    vocab_size = len(vocab)
    embedding_matrix = np.random.rand(vocab_size, embedding_dim)
    return embedding_matrix

embedding_matrix = initialize_embeddings(vocab, embedding_dim)


In [61]:
def get_embedding(word, vocab, embedding_matrix):
    idx = vocab.get(word, vocab['[unk]'])  # Use [unk] token for unknown words
    return embedding_matrix[idx]

# **Positional Encoding**



In [62]:
# Function to create positional encoding
def get_positional_encoding(max_len, embedding_dim):
    position = np.arange(max_len)[:, np.newaxis]
    div_term = np.exp(np.arange(0, embedding_dim, 2) * -(math.log(10000.0) / embedding_dim))
    pos_encoding = np.zeros((max_len, embedding_dim))
    pos_encoding[:, 0::2] = np.sin(position * div_term)
    pos_encoding[:, 1::2] = np.cos(position * div_term)
    return pos_encoding

### **Combine Embeddings and Positional Encodings**

combine word embeddings and positional encodings for a given sentence

In [63]:
sentence = train_df['source'].iloc[0].split()
sentence_len = len(sentence)
print(f"Sentence: {sentence}, Length: {sentence_len}")  # Verify sentence and length

sentence_embeddings = np.array([get_embedding(word, vocab, embedding_matrix) for word in sentence])
print(f"Sentence Embeddings shape: {sentence_embeddings.shape}")  # Verify embeddings shape

positional_encodings = get_positional_encoding(sentence_len, embedding_dim)
print(f"Positional Encodings shape: {positional_encodings.shape}")  # Verify positional encodings shape

# Add input embeddings and positional encodings
input_embedding_with_position = sentence_embeddings + positional_encodings[:sentence_len, :]
print(f"Combined Embedding and Positional Encoding shape: {input_embedding_with_position.shape}")
print(f"Combined Embedding and Positional Encoding:\n{input_embedding_with_position}")

Sentence: ['tom', 'might', 'traitor'], Length: 3
Sentence Embeddings shape: (3, 512)
Positional Encodings shape: (3, 512)
Combined Embedding and Positional Encoding shape: (3, 512)
Combined Embedding and Positional Encoding:
[[ 0.91440777  1.97063465  0.40761718 ...  1.06115982  0.1081431
   1.46614788]
 [ 1.20990732  0.62919128  1.42647248 ...  1.87951286  0.77585246
   1.75267445]
 [ 1.23112107 -0.23051104  1.82407744 ...  1.52251075  0.37818941
   1.05917547]]


# **Encoder**

Layer normalization layer

In [64]:
def layer_normalization(x, epsilon=1e-6):
    mean = np.mean(x, axis=-1, keepdims=True)
    variance = np.var(x, axis=-1, keepdims=True)
    normalized_x = (x - mean) / np.sqrt(variance + epsilon)
    return normalized_x

Multi-head Self Attention

In [65]:
def multi_head_attention(query, key, value, num_heads):
    d_k = query.shape[-1]  # Dimensionality of the key/query/value
    assert d_k % num_heads == 0
    depth = d_k // num_heads  # Depth of each head

    # Split the embedding into multiple heads
    def split_heads(x):
        seq_len, d_model = x.shape
        new_shape = (seq_len, num_heads, depth)
        x = x.reshape(new_shape)
        x = x.transpose(1, 0, 2)  # (num_heads, seq_len, depth)
        return x

    query = split_heads(query)
    key = split_heads(key)
    value = split_heads(value)

    # Scaled Dot-Product Attention
    def scaled_dot_product_attention(q, k, v):
        matmul_qk = np.matmul(q, k.transpose((0, 2, 1)))  # (num_heads, seq_len, seq_len)
        dk = q.shape[-1]
        scaled_attention_logits = matmul_qk / np.sqrt(dk)
        attention_weights = np.exp(scaled_attention_logits - np.max(scaled_attention_logits, axis=-1, keepdims=True))
        attention_weights /= np.sum(attention_weights, axis=-1, keepdims=True)
        output = np.matmul(attention_weights, v)
        return output

    attention_output = scaled_dot_product_attention(query, key, value)

    # Concatenate heads
    def concatenate_heads(x):
        num_heads, seq_len, depth = x.shape
        new_shape = (seq_len, num_heads * depth)
        x = x.transpose(1, 0, 2).reshape(new_shape)
        return x

    attention_output = concatenate_heads(attention_output)
    return attention_output

Feed Forward Network

In [66]:
def feed_forward_network(x, d_ff, embedding_dim):
    W1 = np.random.randn(embedding_dim, d_ff)
    b1 = np.random.randn(d_ff)
    W2 = np.random.randn(d_ff, embedding_dim)
    b2 = np.random.randn(embedding_dim)

    def feed_forward(x):
        x = np.maximum(0, np.matmul(x, W1) + b1)  # ReLU activation
        x = np.matmul(x, W2) + b2
        return x

    return feed_forward(x)


Encoder block

In [67]:
def encoder_layer(x, num_heads, d_ff, embedding_dim, epsilon=1e-6):
    # Self-Attention
    attention_output = multi_head_attention(x, x, x, num_heads)
    attention_output += x  # Residual connection
    attention_output = layer_normalization(attention_output, epsilon)  # Add & Normalize

    # Feed-Forward Network
    ff_output = feed_forward_network(attention_output, d_ff, embedding_dim)
    ff_output += attention_output  # Residual connection
    ff_output = layer_normalization(ff_output, epsilon)  # Add & Normalize

    return ff_output


Encoder stack

In [68]:
def encoder_stack(x, num_layers, num_heads, d_ff, embedding_dim, epsilon=1e-6):
    for _ in range(num_layers):
        x = encoder_layer(x, num_heads, d_ff, embedding_dim, epsilon)
    return x


In [69]:
# # test
# num_layers = 6
# num_heads = 8
# d_ff = 2048
# embedding_dim = 512

# batch_size = 1
# sentence_length = 10
# sentence_embeddings = np.random.rand(batch_size, sentence_length, embedding_dim)

# # encoder stack with 6 encoder blocks
# encoded_output = encoder_stack(sentence_embeddings, num_layers, num_heads, d_ff, embedding_dim)
# print(f"Encoded Output after 6 encoder layers:\n{encoded_output}")


# **Decoder**

In [70]:
# Decoder layer
def decoder_layer(x, encoder_output, num_heads, d_ff, embedding_dim, epsilon=1e-6):
    # Masked Self-Attention
    masked_attention_output = multi_head_attention(x, x, x, num_heads)
    masked_attention_output += x  # Residual connection
    masked_attention_output = layer_normalization(masked_attention_output, epsilon)  # Add & Normalize

    # Encoder-Decoder Attention
    attention_output = multi_head_attention(masked_attention_output, encoder_output, encoder_output, num_heads)
    attention_output += masked_attention_output  # Residual connection
    attention_output = layer_normalization(attention_output, epsilon)  # Add & Normalize

    # Feed-Forward Network
    ff_output = feed_forward_network(attention_output, d_ff, embedding_dim)
    ff_output += attention_output  # Residual connection
    ff_output = layer_normalization(ff_output, epsilon)  # Add & Normalize

    return ff_output

def decoder_stack(x, encoder_output, num_layers, num_heads, d_ff, embedding_dim, epsilon=1e-6):
    for _ in range(num_layers):
        x = decoder_layer(x, encoder_output, num_heads, d_ff, embedding_dim, epsilon)
    return x

# **Transformer**

In [71]:
class Transformer:
    def __init__(self, num_encoder_layers, num_decoder_layers, num_heads, d_ff, embedding_dim, vocab_size, max_len, epsilon=1e-6):
        self.num_encoder_layers = num_encoder_layers
        self.num_decoder_layers = num_decoder_layers
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.embedding_dim = embedding_dim
        self.vocab_size = vocab_size
        self.max_len = max_len
        self.epsilon = epsilon

        self.embedding_matrix = np.random.rand(vocab_size, embedding_dim)
        self.positional_encodings = get_positional_encoding(max_len, embedding_dim)

    def encode(self, sentence):
        sentence_len = len(sentence)
        sentence_embeddings = np.array([get_embedding(word, vocab, self.embedding_matrix) for word in sentence])
        input_embedding_with_position = sentence_embeddings + self.positional_encodings[:sentence_len, :]
        encoded_output = encoder_stack(input_embedding_with_position, self.num_encoder_layers, self.num_heads, self.d_ff, self.embedding_dim, self.epsilon)
        return encoded_output

    def decode(self, sentence, encoder_output):
        sentence_len = len(sentence)
        sentence_embeddings = np.array([get_embedding(word, vocab, self.embedding_matrix) for word in sentence])
        input_embedding_with_position = sentence_embeddings + self.positional_encodings[:sentence_len, :]
        decoded_output = decoder_stack(input_embedding_with_position, encoder_output, self.num_decoder_layers, self.num_heads, self.d_ff, self.embedding_dim, self.epsilon)
        return decoded_output

    def forward(self, source_sentence, target_sentence):
        encoder_output = self.encode(source_sentence)
        decoder_output = self.decode(target_sentence, encoder_output)
        return decoder_output

In [72]:
# Define hyperparameters
num_encoder_layers = 6
num_decoder_layers = 6
num_heads = 8
d_ff = 2048
embedding_dim = 512
vocab_size = len(vocab)
max_len = 100  # Assuming max length of the sentence

# Create transformer model instance
transformer = Transformer(num_encoder_layers, num_decoder_layers, num_heads, d_ff, embedding_dim, vocab_size, max_len)

# Test with a sample sentence
source_sentence = train_df['source'].iloc[0].split()
target_sentence = train_df['target'].iloc[0].split()

# Forward pass through the transformer model
output = transformer.forward(source_sentence, target_sentence)
print(f"Transformer model output:\n{output}")

Transformer model output:
[[ 1.80158113  0.11364658  0.56420411 ... -2.19337955  2.18923926
  -0.93490039]
 [ 1.80388736  0.1131103   0.56379054 ... -2.19225459  2.18726508
  -0.93634658]
 [ 1.80623934  0.11188299  0.56006097 ... -2.18851046  2.19022638
  -0.93444424]
 [ 1.8064679   0.11245047  0.55633796 ... -2.19061637  2.18666444
  -0.9327703 ]
 [ 1.80898632  0.10891185  0.55694253 ... -2.19065296  2.18481266
  -0.93418914]
 [ 1.81067434  0.1085136   0.55440942 ... -2.18875003  2.1846308
  -0.934971  ]]


In [73]:
def translate(transformer, source_sentence, vocab, max_len=100):
    # Prepare the source sentence
    source_tokens = source_sentence.split()

    # Encode the source sentence
    encoder_output = transformer.encode(source_tokens)

    # Initialize target sentence with the start token
    target_sentence = ['[start]']

    for _ in range(max_len):
        # Prepare the target input for the decoder
        target_tokens = target_sentence[-(max_len - 1):]  # Ensure max length is respected
        target_tokens_padded = target_tokens + ['[pad]'] * (max_len - len(target_tokens) - 1)

        # Decode to get the next token probabilities
        decoder_output = transformer.decode(target_tokens_padded, encoder_output)

        # Get the predicted token
        next_token_probs = decoder_output[-1]  # Last timestep output
        predicted_token_idx = np.argmax(next_token_probs, axis=-1)

        # Convert index to token
        inverse_vocab = {idx: word for word, idx in vocab.items()}
        predicted_token = inverse_vocab.get(predicted_token_idx, '[unk]')

        if predicted_token == '[end]':
            break

        target_sentence.append(predicted_token)

    return ' '.join(target_sentence[1:])  # Exclude the [start] token

# Example source sentence
source_sentence = "tom hiding something"  # Example source sentence
translated_sentence = translate(transformer, source_sentence, vocab)
print(f"Source Sentence: {source_sentence}")
print(f"Translated Sentence: {translated_sentence}")


Source Sentence: tom hiding something
Translated Sentence: repainted 112 cra flip rendezvous contains bienaime nextdoor stress physicians origine prcurseur finale werewolves phare cramique carelessly cderont years stay ouvrant captur consigne helmets rases pal compagnie nextdoor modernes touille niece nchapperas verbes financirement baromtre survivrai contributing avare compltes words coutume crit ouvrant fabriqus lastrologie chanteraistu imply lid cruaut ttre paroles chanteraistu cra laborer obirez oversleep transferred rver fisc counter vehemently payerons reprogramm contains ntre plaisantezvous drawing regardiezvous dimaginer devoid fins twinkle drapeaux assigned shifting faille violences racle renards battage particles get veng ncessite nul ceo finale compltes leve dextinction entretenues chienne ground plage lenclos leve entretenues oreillons taitelle 112
