# Self-Attention Calculation

In [None]:
import numpy as np
import math

## Step-1 : Basic Embeddings

In [None]:
# Toy 3-dimensional word embeddings for Example 1 (riverbank sense of "bank").
# Source sentence:
#   The farmer planted a row of seeds near the bank of the river,
#   hoping they would bear fruit by the summer.
embeddings_example1 = {
    word: np.array(vector)
    for word, vector in (
        ("Bank", [0.1, 0.2, 0.3]),
        ("Bear", [0.4, 0.1, 0.6]),
        ("Row", [0.3, 0.5, 0.2]),
        ("River", [0.2, 0.3, 0.5]),
        ("Seeds", [0.3, 0.1, 0.4]),
        ("Summer", [0.6, 0.2, 0.1]),
    )
}

# Toy 3-dimensional word embeddings for Example 2 (financial-institution sense).
# Source sentence:
#   The row between the bank and investors grew heated when they argued
#   about whether the bear market would affect the company's financial stability.
# "Bank", "Bear" and "Row" deliberately carry the same vectors as in Example 1;
# only the surrounding words differ between the two examples.
embeddings_example2 = {
    word: np.array(vector)
    for word, vector in (
        ("Bank", [0.1, 0.2, 0.3]),
        ("Bear", [0.4, 0.1, 0.6]),
        ("Row", [0.3, 0.5, 0.2]),
        ("Investors", [0.7, 0.3, 0.4]),
        ("Market", [0.2, 0.8, 0.5]),
        ("Dispute", [0.4, 0.6, 0.7]),
    )
}

## Step-2: Attention Function

In [None]:
# Step 2: Define the Scaled Dot-Product Attention Function
def scaled_dot_product_attention(Q, K, V):
    """
    Compute scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V.

    Q: Query matrix, one row per query, d_k columns
    K: Key matrix, one row per key, d_k columns
    V: Value matrix, one row per key

    Returns a tuple (attention_weights, output):
      attention_weights: one row per query, one column per key; each row sums to 1
      output: for every query, the attention-weighted sum of the rows of V
    """
    # Accept array-likes (e.g. nested lists) as well as ndarrays.
    Q, K, V = np.asarray(Q), np.asarray(K), np.asarray(V)

    # Step 2.1: Compute dot products between Query and Key
    scores = np.dot(Q, K.T)  # Shape: (n_queries, n_keys)

    # Step 2.2: Scale the scores by sqrt(d_k) where d_k is the embedding size
    d_k = K.shape[1]
    scaled_scores = scores / math.sqrt(d_k)

    # Step 2.3: Apply softmax to get attention weights.
    # Subtracting each row's maximum leaves the softmax value unchanged but keeps
    # np.exp from overflowing to inf (which would turn the weights into NaN).
    shifted_scores = scaled_scores - np.max(scaled_scores, axis=-1, keepdims=True)
    exp_scores = np.exp(shifted_scores)
    attention_weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)

    # Step 2.4: Compute the weighted sum of values
    output = np.dot(attention_weights, V)

    return attention_weights, output

def calculate_attention(embeddings):
    """
    Run self-attention over a word -> embedding-vector mapping.

    The vectors are stacked row-wise in the mapping's iteration order, and that
    single matrix serves as query, key and value at once (self-attention).

    Returns a tuple (words, attention_weights, attention_output), where `words`
    lists the mapping's keys in the same order as the matrix rows.
    """
    words = [*embeddings]
    embedding_matrix = np.array([*embeddings.values()])

    # Q, K and V are all the same matrix in this self-attention demo.
    weights, attended = scaled_dot_product_attention(
        embedding_matrix, embedding_matrix, embedding_matrix
    )
    return words, weights, attended

## Step-3 : Calculate Attention

In [None]:
def _print_attention_values(words, outputs):
    """Print one line per word showing its attention output vector."""
    for word, vector in zip(words, outputs):
        print(f"Attention values for {word}: {vector}")


# Example 1: self-attention over the riverbank sentence's embeddings.
print("Example 1: Riverbank, Produce, Arrangement")
words1, attention_weights1, output1 = calculate_attention(embeddings_example1)
_print_attention_values(words1, output1)

# Example 2: self-attention over the financial sentence's embeddings.
print("\nExample 2: Financial Institution, Stock Market, Dispute")
words2, attention_weights2, output2 = calculate_attention(embeddings_example2)
_print_attention_values(words2, output2)

# English-to-Spanish translation model with Transformer


## Background
In this example, we'll build a sequence-to-sequence Transformer model, which
we'll train on an English-to-Spanish machine translation task.



## Import the packages

In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import pathlib
import random
import string
import re
import numpy as np
import tensorflow.data as tf_data
import tensorflow.strings as tf_strings
import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization

# The package imported below is a custom Transformer package hosted on my GitHub.

import Transformer
from Transformer import TransformerEncoder, TransformerDecoder, PositionalEmbedding

## Downloading the data

We'll be working with an English-to-Spanish translation dataset
provided by [Anki](https://www.manythings.org/anki/). Let's download it:

In [None]:
# Download the zipped English/Spanish sentence pairs and let Keras extract them.
datafile_location = "http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip"
eng_spa_raw = keras.utils.get_file(
    fname="spa-eng.zip",
    origin=datafile_location,
    extract=True,
)
# Point at the text file expected next to the downloaded archive.
eng_spa_raw = pathlib.Path(eng_spa_raw).parent / "spa-eng" / "spa.txt"

# ### Parsing the data
#
# Each line contains an English sentence and its corresponding Spanish sentence.
# The English sentence is the source sequence and Spanish one is the target sequence.
# We attach the tokens [start] and [end] to the Spanish sentence.

# Read explicitly as UTF-8: the Spanish text contains accented characters, and
# the platform default encoding is not guaranteed to decode them.
with open(eng_spa_raw, encoding="utf-8") as f:
    # The final element after splitting is dropped (trailing newline remainder).
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    eng, spa = line.split("\t")
    spa = "[start] " + spa + " [end]"
    text_pairs.append((eng, spa))

print("Sample Data Points")
# Sample from the actual number of pairs instead of a hard-coded index range.
for i in random.sample(range(len(text_pairs)), min(5, len(text_pairs))):
    print(text_pairs[i])

## Split the Data into Train and Test

Now, let's split the sentence pairs into a training set, a validation set,
and a test set.

In [None]:
# Shuffle the pairs in place so the three splits are drawn at random.
random.shuffle(text_pairs)

# Validation and test each receive 15% of the pairs; training gets the rest.
num_val_samples = int(len(text_pairs) * 0.15)
num_train_samples = len(text_pairs) - num_val_samples * 2
val_end = num_train_samples + num_val_samples

train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:val_end]
test_pairs = text_pairs[val_end:]

for split_label, split_pairs in (
    ("total", text_pairs),
    ("training", train_pairs),
    ("validation", val_pairs),
    ("test", test_pairs),
):
    print(f"{len(split_pairs)} {split_label} pairs")

## Vectorizing the text data

We'll use two instances of the `TextVectorization` layer to vectorize the text
data (one for English and one for Spanish),
that is to say, to turn the original strings into integer sequences
where each integer represents the index of a word in a vocabulary.

The English layer will use the default string standardization (strip punctuation characters)
and splitting scheme (split on whitespace), while
the Spanish layer will use a custom standardization, where we add the character
`"¿"` to the set of punctuation characters to be stripped.

In [None]:
# Characters deleted from the Spanish text: ASCII punctuation plus "¿", minus the
# square brackets so the "[start]" / "[end]" markers keep their brackets.
strip_chars = "".join(ch for ch in string.punctuation + "¿" if ch not in "[]")


def custom_standardization(input_string):
    """Lowercase the input and remove every character listed in `strip_chars`."""
    strip_pattern = f"[{re.escape(strip_chars)}]"
    lowered = tf_strings.lower(input_string)
    return tf_strings.regex_replace(lowered, strip_pattern, "")

vocab_size = 15000       # maximum vocabulary size for each vectorizer
sequence_length = 20     # token length of the English sequences
batch_size = 64          # number of sentence pairs per batch

# English: default standardization, output truncated/padded to sequence_length.
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
# Spanish: one extra position, because the vectorized sentence is later sliced
# into decoder inputs ([:, :-1]) and targets ([:, 1:]), each sequence_length long.
spa_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)

# Build both vocabularies from the training split only.
train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)

# Print a few sentences next to their vectorized form.
# Indices are drawn from the real size of the training split; a fixed upper
# bound could exceed it and raise IndexError.
num_preview = min(3, len(train_eng_texts))
for i in random.sample(range(len(train_eng_texts)), num_preview):
  print(train_eng_texts[i],'\n',eng_vectorization(train_eng_texts[i]))
  print(train_spa_texts[i],'\n',spa_vectorization(train_spa_texts[i]))
  print("=============")

## Data Pre-processing

At each training step, the model will seek to predict target words N+1 (and beyond)
using the source sentence and the target words 0 to N.

As such, the training dataset will yield a tuple `(inputs, targets)`, where:

- `inputs` is a dictionary with the keys `encoder_inputs` and `decoder_inputs`.
`encoder_inputs` is the vectorized source sentence and `decoder_inputs` is the target sentence "so far",
that is to say, the words 0 to N used to predict word N+1 (and beyond) in the target sentence.
- `targets` is the target sentence offset by one step:
it provides the next words in the target sentence -- what the model will try to predict.

In [None]:
def format_dataset(eng, spa):
    """
    Vectorize a batch of sentence pairs into the (inputs, targets) training tuple.

    The Spanish token matrix is split in two: all columns but the last feed the
    decoder, and all columns but the first are the targets, i.e. the same
    sequence shifted one step to the left.
    """
    source_tokens = eng_vectorization(eng)
    target_tokens = spa_vectorization(spa)

    model_inputs = {
        "encoder_inputs": source_tokens,
        "decoder_inputs": target_tokens[:, :-1],
    }
    next_tokens = target_tokens[:, 1:]
    return model_inputs, next_tokens

def make_dataset(pairs):
    """
    Turn a sequence of (english, spanish) string pairs into a dataset pipeline.

    The sentences are batched by `batch_size`, each batch is passed through
    `format_dataset`, and the result is cached, shuffled with a 2048-element
    buffer and prefetched 16 elements ahead.
    """
    eng_texts, spa_texts = map(list, zip(*pairs))
    batched = tf_data.Dataset.from_tensor_slices((eng_texts, spa_texts)).batch(batch_size)
    formatted = batched.map(format_dataset)
    return formatted.cache().shuffle(2048).prefetch(16)


# Build the batched pipelines for the training and validation splits.
train_ds, val_ds = make_dataset(train_pairs), make_dataset(val_pairs)

In [None]:
#shape of train_ds
#dataset=train_ds.take(3)
#list(dataset.as_numpy_iterator())

Let's take a quick look at the sequence shapes
(we have batches of 64 pairs, and all sequences are 20 steps long):

In [None]:
# Pull a single batch and report the shape of every tensor in it.
for inputs, targets in train_ds.take(1):
    for input_key in ("encoder_inputs", "decoder_inputs"):
        print(f'inputs["{input_key}"].shape: {inputs[input_key].shape}')
    print(f"targets.shape: {targets.shape}")

## Model Configuration


In [None]:
# Hyperparameters handed to the layers imported from the external `Transformer`
# package. Their exact meaning is defined in that package, which is not visible
# here — confirm against its source.
embed_dim = 256  # also used below as the feature size of `encoded_seq_inputs`
latent_dim = 2048 # Nodes in the Dense Layer over the multi head attention layer
num_heads = 8

# Encoder: int64 token ids of any length -> positional embedding -> encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder: int64 target token ids -> positional embedding -> decoder block fed
# with the encoder output -> dropout -> softmax over `vocab_size` classes.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
# The decoder block is called on `encoder_outputs` directly, which ties it to
# the encoder graph built above.
x = TransformerDecoder(embed_dim, latent_dim, num_heads)([x, encoder_outputs])
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
# NOTE(review): `encoded_seq_inputs` is declared as an input of this model, but
# nothing above consumes it (the decoder block reads `encoder_outputs`) —
# confirm the standalone `decoder` model is wired as intended.
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model. The input dict keys match the keys that `format_dataset`
# puts in each batch's input dictionary.
transformer = keras.Model(
    {"encoder_inputs": encoder_inputs, "decoder_inputs": decoder_inputs},
    decoder_outputs,
    name="transformer",
)

#transformer.summary()
# `ignore_class=0` excludes targets equal to 0 from the loss; presumably 0 is
# the padding id emitted by the vectorization layers — TODO confirm.
transformer.compile(
    "rmsprop",
    loss=keras.losses.SparseCategoricalCrossentropy(ignore_class=0),
    metrics=["accuracy"],
)

## Training our model

We'll use accuracy as a quick way to monitor training progress on the validation data.
Note that machine translation typically uses BLEU scores as well as other metrics, rather than accuracy.

Here we only train for 2 epochs, but to get the model to actually converge
you should train for at least 30 epochs.

In [None]:
# Two epochs keep this demo short; use at least 30 for the model to converge.
epochs = 2
transformer.fit(x=train_ds, validation_data=val_ds, epochs=epochs)

## Save and Load the Model

In [None]:
# Write the current weights to disk, read them straight back, then train for
# one more epoch on the reloaded weights.
weights_path = "eng_spa_2epochs.weights.h5"
transformer.save_weights(weights_path)
transformer.load_weights(weights_path)
transformer.fit(x=train_ds, validation_data=val_ds, epochs=1)

In [None]:
# Download the model weights file.
# The leading "!" makes this an IPython shell command: it only works inside a
# notebook/IPython session and needs the `gdown` tool to be available.
!gdown 'https://drive.google.com/uc?export=download&id=1jMLFnlXPQXlRRVmXfr2sIOuxUO4VGuKo' -O eng_spa_50epochs.weights.h5

# Load the downloaded weights into the model built above, then train for one
# more epoch on top of them.
# NOTE(review): the file name suggests these weights come from a 50-epoch run.
# The vocabularies in this notebook are adapted on a training split produced by
# an unseeded shuffle, so token ids here may not match the ones the downloaded
# weights were trained with — confirm.
transformer.load_weights("eng_spa_50epochs.weights.h5")
transformer.fit(train_ds, epochs=1, validation_data=val_ds)

## Making Predictions

Finally, let's demonstrate how to translate brand new English sentences.
We simply feed into the model the vectorized English sentence
as well as the target token `"[start]"`, then we repeatedly generate the next token until
we hit the token `"[end]"` (or reach the maximum decoded sentence length).

In [None]:
# Lookup table from integer token id to the corresponding Spanish vocabulary
# entry, plus the cap on the number of decoding steps.
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(enumerate(spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    """
    Greedily translate one English sentence with the trained model.

    Decoding starts from "[start]" and appends one token per step: the token
    with the highest predicted score at the current position. It stops as soon
    as "[end]" is produced or after `max_decoded_sentence_length` steps.

    Returns the decoded string, which still contains the "[start]" marker and,
    when it was generated, the "[end]" marker.
    """
    encoder_tokens = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"

    for step in range(max_decoded_sentence_length):
        # Re-vectorize the partial translation and drop the last column, the
        # same slicing applied to the decoder inputs during training.
        decoder_tokens = spa_vectorization([decoded_sentence])[:, :-1]
        model_inputs = {
            "encoder_inputs": encoder_tokens,
            "decoder_inputs": decoder_tokens,
        }
        predictions = transformer(model_inputs)

        # Highest-scoring vocabulary id at the position being generated.
        best_index = ops.convert_to_numpy(
            ops.argmax(predictions[0, step, :])
        ).item(0)
        next_token = spa_index_lookup[best_index]
        decoded_sentence = f"{decoded_sentence} {next_token}"

        if next_token == "[end]":
            return decoded_sentence

    return decoded_sentence

## Prediction Samples

In [None]:
test_eng_texts = [pair[0] for pair in test_pairs]

# Translate ten test sentences, each picked at random (with replacement).
# A plain counted loop replaces the old sampling of indices that were never used.
for _ in range(10):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(input_sentence, " ==> ", translated)