In [32]:

import tensorflow as tf### models
import numpy as np### math computations
import matplotlib.pyplot as plt### plotting bar chart
import sklearn### machine learning library
import cv2## image processing
from sklearn.metrics import confusion_matrix, roc_curve### metrics
import seaborn as sns### visualizations
import datetime
import pathlib
import io
import os
import re
import string
import time
from numpy import random
import tensorflow_datasets as tfds
import tensorflow_probability as tfp
from keras.models import Model
from keras.layers import Layer
from keras.layers import (Dense,Flatten,SimpleRNN,InputLayer,Conv1D,Bidirectional,GRU,LSTM,BatchNormalization,Dropout,Input, Embedding,TextVectorization)
from keras.losses import BinaryCrossentropy,CategoricalCrossentropy, SparseCategoricalCrossentropy
from keras.metrics import Accuracy,TopKCategoricalAccuracy, CategoricalAccuracy, SparseCategoricalAccuracy
from keras.optimizers import Adam
from keras.layers import MultiHeadAttention, LayerNormalization
from google.colab import drive
from google.colab import files
from tensorboard.plugins import projector

In [2]:
#@ Downloading datasets:
!wget https://www.manythings.org/anki/fra-eng.zip

--2025-01-20 14:00:21--  https://www.manythings.org/anki/fra-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7943074 (7.6M) [application/zip]
Saving to: ‘fra-eng.zip’


2025-01-20 14:00:22 (17.6 MB/s) - ‘fra-eng.zip’ saved [7943074/7943074]



In [3]:
!unzip '/content/fra-eng.zip' -d '/content/dataset' # -d flag specifies directories

Archive:  /content/fra-eng.zip
  inflating: /content/dataset/_about.txt  
  inflating: /content/dataset/fra.txt  


#### Data Preprocessing

In [4]:
text_dataset=tf.data.TextLineDataset('/content/dataset/fra.txt') #each line is treated as separate string

In [5]:
for i in text_dataset.take(3):
  print(i)

tf.Tensor(b'Go.\tVa !\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #1158250 (Wittydev)', shape=(), dtype=string)
tf.Tensor(b'Go.\tMarche.\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8090732 (Micsmithel)', shape=(), dtype=string)
tf.Tensor(b'Go.\tEn route !\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8267435 (felix63)', shape=(), dtype=string)


In [6]:
#@ Setting up the Parameters:
VOCAB_SIZE=20000 #unique tokens from dataset, setting value 20000 for efficiency
ENGLISH_SEQUENCE_LENGTH=32 #max length of i/p sequence[in tokens]
FRENCH_SEQUENCE_LENGTH=32 #max len of o/p sequence[in tokens]
EMBEDDINGS_DIM=512 #size of vectors to represent tokens(as per paper)
BATCH_SIZE=128 #for data size processed during training

In [7]:
#@ for english word:
english_vectorize_layer=TextVectorization(
                      standardize='lower_and_strip_punctuation',
                      max_tokens=VOCAB_SIZE,
                      output_mode='int', #mapping wrt to the integer index
                      output_sequence_length=ENGLISH_SEQUENCE_LENGTH
)

#@ for french word:
french_vectorize_layer=TextVectorization(
                       standardize='lower_and_strip_punctuation',
                       max_tokens=VOCAB_SIZE,
                       output_mode='int',
                       output_sequence_length=FRENCH_SEQUENCE_LENGTH
)

In [8]:
def seperator(input_text):
  split_text=tf.strings.split(input_text, '\t')
  return {
      'input_1':split_text[0:1],
      'input_2':'starttoken' + split_text[1:2]
      }, split_text[1:2]+' endtoken'

In [9]:
text='hello\tprijal'
seperator(text)


({'input_1': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'hello'], dtype=object)>,
  'input_2': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'starttokenprijal'], dtype=object)>},
 <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'prijal endtoken'], dtype=object)>)

In [10]:
#@ Initializing dataset:
init_dataset=text_dataset.map(seperator)

In [11]:
for i in init_dataset.take(3):
  print(i)


({'input_1': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Go.'], dtype=object)>, 'input_2': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'starttokenVa !'], dtype=object)>}, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Va ! endtoken'], dtype=object)>)
({'input_1': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Go.'], dtype=object)>, 'input_2': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'starttokenMarche.'], dtype=object)>}, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Marche. endtoken'], dtype=object)>)
({'input_1': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Go.'], dtype=object)>, 'input_2': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'starttokenEn route !'], dtype=object)>}, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'En route ! endtoken'], dtype=object)>)


### Vocab Creation

In [12]:
english_training_data=init_dataset.map(lambda x, y:x['input_1'])
english_vectorize_layer.adapt(english_training_data)

french_training_data=init_dataset.map(lambda x, y:y)
french_vectorize_layer.adapt(french_training_data)


In [13]:
#@ Grouping and  Vectorization for training:
def vectorizer(inputs, output):
  return {'input_1':english_vectorize_layer(inputs['input_1']),
          'input_2':french_vectorize_layer(inputs['input_2'])}, french_vectorize_layer(output)

In [14]:
init_dataset

<_MapDataset element_spec=({'input_1': TensorSpec(shape=(None,), dtype=tf.string, name=None), 'input_2': TensorSpec(shape=(None,), dtype=tf.string, name=None)}, TensorSpec(shape=(None,), dtype=tf.string, name=None))>

In [15]:
dataset=init_dataset.map(vectorizer)

In [16]:
for i in init_dataset.take(3):
  print(i)

({'input_1': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Go.'], dtype=object)>, 'input_2': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'starttokenVa !'], dtype=object)>}, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Va ! endtoken'], dtype=object)>)
({'input_1': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Go.'], dtype=object)>, 'input_2': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'starttokenMarche.'], dtype=object)>}, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Marche. endtoken'], dtype=object)>)
({'input_1': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Go.'], dtype=object)>, 'input_2': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'starttokenEn route !'], dtype=object)>}, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'En route ! endtoken'], dtype=object)>)


In [17]:
for i in dataset.take(1):
  print(i)

({'input_1': <tf.Tensor: shape=(1, 32), dtype=int64, numpy=
array([[45,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0]])>, 'input_2': <tf.Tensor: shape=(1, 32), dtype=int64, numpy=
array([[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])>}, <tf.Tensor: shape=(1, 32), dtype=int64, numpy=
array([[103,   2,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0]])>)


In [18]:
dataset=dataset.shuffle(2048).unbatch().batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [19]:
NUM_BATCHES=int(200000/BATCH_SIZE)

In [20]:
#@ Training and testing split
train_dataset=dataset.take(int(0.9*NUM_BATCHES))
val_dataset=dataset.skip(int(0.9*NUM_BATCHES))

### Model Architecture

In [21]:
#@ Positional Encoding:
def PositionalEncoding(d_model, SEQUENCE_LENGTH):
  output=[]
  for pos in range(SEQUENCE_LENGTH):
    PE=np.zeros(d_model)
    for i in range(d_model):
      if i % 2 == 0: #even position, sine formula is used
        PE[i]=np.sin(pos/(10000**(2*i/d_model)))
      else:
        PE[i]=np.cos(pos/(10000**(2*i/d_model)))
    output.append(tf.expand_dims(PE, axis=0))
  out=tf.concat(output, axis=0)
  out=tf.expand_dims(out, axis=0)
  return tf.cast(out, dtype=tf.float32)


In [22]:
print(PositionalEncoding(512, 32))

tf.Tensor(
[[[ 0.0000000e+00  1.0000000e+00  0.0000000e+00 ...  1.0000000e+00
    0.0000000e+00  1.0000000e+00]
  [ 8.4147096e-01  5.6969500e-01  8.0196178e-01 ...  1.0000000e+00
    1.0746079e-08  1.0000000e+00]
  [ 9.0929741e-01 -3.5089520e-01  9.5814437e-01 ...  1.0000000e+00
    2.1492157e-08  1.0000000e+00]
  ...
  [-6.6363388e-01 -9.5558822e-01  9.6020764e-01 ...  1.0000000e+00
    3.1163626e-07  1.0000000e+00]
  [-9.8803163e-01 -7.8659910e-01  3.4962672e-01 ...  1.0000000e+00
    3.2238236e-07  1.0000000e+00]
  [-4.0403765e-01  5.9345119e-02 -5.4249090e-01 ...  1.0000000e+00
    3.3312844e-07  1.0000000e+00]]], shape=(1, 32, 512), dtype=float32)


### Input Embeddings

In [40]:
from keras.layers import Layer, Dense
class Embeddings(Layer):
  def __init__(self, sequence_length, vocab_size, embedding_dim):
    super(Embeddings, self).__init__()
    self.token_embeddings=Embedding(input_dim=vocab_size, output_dim=embedding_dim)
    self.sequence_length=sequence_length
    self.vocab_size=vocab_size
    self.embedding_dim=embedding_dim

  def call(self, inputs):
    embedded_tokens=self.token_embeddings(inputs)
    embedded_positions=PositionalEncoding(self.embedding_dim, self.sequence_length)
    mask=tf.math.not_equal(inputs, 0)
    return embedded_tokens + embedded_positions, mask

  # def compute_mask(self, inputs, mask=None):
  #   return tf.math.not_equal(inputs, 0)


### Custome Attention Layer

- Self attention layer

In [24]:
class CustomSelfAttention(Layer):
  def __init__(self, model_size):
    super(CustomSelfAttention, self).__init__()
    self.model_size=model_size

  def call(self, query, key, value, masking):
    score=tf.matmul(query, key, transpose_b=True)

    score/= tf.math.sqrt(tf.cast(self.model_size, dtype=tf.float32))

    masking=tf.cast(masking, dtype=tf.float32)
    score -= (1.0-masking)* 1e10

    attention_weights=tf.nn.softmax(score, axis=1) * masking

    head_output=tf.matmul(attention_weights, value)

    return head_output

### Multi-headed Attention
- Multihead Attention allows model to focus on different part of input sequence simultaneously and combine these prespective into comprehensive representation.

- For example: "Harry saw a man with binoculars'. This sentence can have two meanings, they are either it can be harry saw a man using binoculars or it can be harry saw a man who has binoculars. These both can be correct. So transformer has to understand both these meanings which self-attention fails to recognize that's why multi-head attention is used.

In [52]:
class MultiHeadAttention(Layer):
    def __init__(self, n_heads, key_dim):
        super(MultiHeadAttention, self).__init__()

        self.n_heads = n_heads
        self.dense_q = [Dense(key_dim // n_heads) for _ in range(n_heads)]
        self.dense_k = [Dense(key_dim // n_heads) for _ in range(n_heads)]
        self.dense_v = [Dense(key_dim // n_heads) for _ in range(n_heads)]
        self.dense_o = Dense(key_dim)
        self.attention = CustomSelfAttention(key_dim)

    def call(self, query, key, value, attention_mask=None):
        heads = []

        for i in range(self.n_heads):
            q = self.dense_q[i](query)
            k = self.dense_k[i](key)
            v = self.dense_v[i](value)

            # Pass mask as None if not provided
            head = self.attention(q, k, v, masking=attention_mask if attention_mask is not None else tf.zeros_like(query))
            heads.append(head)

        # Concatenate all heads and apply output transformation
        heads = tf.concat(heads, axis=2)
        heads = self.dense_o(heads)
        return heads



## Encoder

In [51]:
from tensorflow.keras.layers import Layer, Dense, LayerNormalization
import tensorflow as tf

class Encoder(Layer):
    def __init__(self, embeddings_dim, dense_dim, n_heads):
        super(Encoder, self).__init__()
        self.embeddings_dim = embeddings_dim
        self.dense_dim = dense_dim
        self.n_heads = n_heads

        # Initialize the multi-head attention layer
        self.attention = MultiHeadAttention(n_heads=n_heads, key_dim=embeddings_dim)

        # Dense projection layers
        self.dense_projection = tf.keras.Sequential([
            Dense(self.dense_dim, activation='relu'),
            Dense(self.embeddings_dim)
        ])

        # Layer normalization layers
        self.layernorm1 = LayerNormalization()
        self.layernorm2 = LayerNormalization()

        # Masking support
        self.supports_masking = True

    def call(self, inputs, mask=None):
        # If mask is provided, handle it; otherwise, skip masking logic
        if mask is not None:
            mask = tf.cast(mask[:, tf.newaxis, :], dtype='int32')
            T = tf.shape(mask)[2]
            padding_mask = tf.repeat(mask, T, axis=1)
        else:
            padding_mask = None

        # Apply the multi-head attention layer
        attention_output = self.attention(query=inputs, value=inputs, key=inputs, attention_mask=padding_mask)

        # Apply residual connection and layer normalization after attention
        projection_input = self.layernorm1(inputs + attention_output)

        # Dense projection and residual connection
        projection_output = self.dense_projection(projection_input)
        return self.layernorm2(projection_input + projection_output)



### Decoder

In [36]:
class Decoder(Layer):
  def __init__(self, embeddings_dim, n_heads, latent_dim):
    super(Decoder, self).__init__()
    self.embeddings_dim=embeddings_dim
    self.n_heads=n_heads
    self.latent_dim=latent_dim

    self.attention_1=MultiHeadAttention(n_heads=n_heads, key_dim=embeddings_dim) # self-attention
    self.attention_2=MultiHeadAttention(n_heads=n_heads, key_dim=embeddings_dim) #cross attention


    self.dense_projection=tf.keras.Sequential(
        [Dense(latent_dim, activation='relu'), Dense(embedding_dim)]
    ) #feedforward layer

    self.layernorm_1=LayerNormalization()
    self.layernorm_2=LayerNormalization()
    self.layernorm_3=LayerNormalization()

  def call(self, inputs, encoder_output, encoder_mask, mask=None):
    if mask is not None:
      causal_mask = tf.linalg.band_part(
          tf.ones([tf.shape(inputs)[0],
                   tf.shape(inputs)[1],
                   tf.shape(inputs)[1]], dtype=tf.int32), -1, 0)
      # the role of causal mask is to prevent peeking into the future tokens for the decoder to predict better
      # the band_part method makes it really easier to do this

      mask = tf.cast(
          mask[:, tf.newaxis, :], dtype='int32'
      )
      enc_mask = tf.cast(
          enc_mask[:, tf.newaxis, :], dtype='int32'
      )

      T = tf.shape(mask)[2] # T is the number of queries from the decoder
      padding_mask = tf.repeat(mask, T, axis=1)
      cross_attn_mask = tf.repeat(enc_mask, T, axis=1)
      combined_mask = tf.minimum(padding_mask, causal_mask)

    attention_output_1=self.attention_1(query=inputs, key=inputs, value=inputs, attention_mask=combined_mask)
    output_1=self.layernorm_1(inputs + attention_output_1)

    attention_output_2=self.attention_2(query=output_1, key=encoder_output, value=encoder_output, attention_mask=cross_attn_mask, return_attention_scores=True)
    output_2=self.layernorm_2(output_1 + attention_output_2)

    projection_output=self.dense_projection(output_2)

    return self.layernorm_3(output_2 + projection_output)



### Full transformer Model

In [37]:
EMBEDDING_DIMS=512
LATENT_DIMS=2048
NUM_HEADS=8
NUM_LAYER=1
NUM_EPOCH=10
attention_scores={}

In [53]:

encoder_inputs = Input(shape=(None,), dtype='int64', name='input_1')
embeddings = Embeddings(ENGLISH_SEQUENCE_LENGTH, VOCAB_SIZE, EMBEDDING_DIMS)
x, enc_mask = embeddings(encoder_inputs)


for _ in range(NUM_LAYER): # there can be N number of layers as mentioned by paper
  x = Encoder(EMBEDDING_DIMS, LATENT_DIMS, NUM_HEADS)(x)
encoder_outputs = x

decoder_inputs = Input(shape=(None,), dtype='int64', name='input_2')
x = Embeddings(FRENCH_SEQUENCE_LENGTH, VOCAB_SIZE, EMBEDDING_DIMS)(decoder_inputs)

for i in range(NUM_LAYER):
  x, scores = Decoder(EMBEDDING_DIMS, LATENT_DIMS, NUM_HEADS)(x, encoder_outputs, enc_mask)
  attention_scores[f'decoder_layer{i+1}_block2'] = scores

x = tf.keras.layers.Dropout(0.5)(x)
decoder_outputs = Dense(VOCAB_SIZE, activation='softmax')(x)

attention_score_model = tf.keras.Model(
    [encoder_inputs, decoder_inputs],
    attention_scores, name='attention_score_model'
)


transformer = tf.keras.Model(
    [encoder_inputs, decoder_inputs],
    decoder_outputs, name='transformer'
)

transformer.summary()

1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''Exception encountered when calling CustomSelfAttention.call().

[1mDimensions must be equal, but are 32 and 512 for '{{node multi_head_attention_5_1/custom_self_attention_5_1/sub_1}} = Sub[T=DT_FLOAT](multi_head_attention_5_1/custom_self_attention_5_1/truediv, multi_head_attention_5_1/custom_self_attention_5_1/mul)' with input shapes: [?,32,32], [?,32,512].[0m

Arguments received by CustomSelfAttention.call():
  • query=tf.Tensor(shape=(None, 32, 64), dtype=float32)
  • key=tf.Tensor(shape=(None, 32, 64), dtype=float32)
  • value=tf.Tensor(sha

ValueError: Exception encountered when calling Encoder.call().

[1mCould not automatically infer the output shape / dtype of 'encoder_6' (of type Encoder). Either the `Encoder.call()` method is incorrect, or you need to implement the `Encoder.compute_output_spec() / compute_output_shape()` method. Error encountered:

Exception encountered when calling CustomSelfAttention.call().

[1mDimensions must be equal, but are 32 and 512 for '{{node multi_head_attention_5_1/custom_self_attention_5_1/sub_1}} = Sub[T=DT_FLOAT](multi_head_attention_5_1/custom_self_attention_5_1/truediv, multi_head_attention_5_1/custom_self_attention_5_1/mul)' with input shapes: [?,32,32], [?,32,512].[0m

Arguments received by CustomSelfAttention.call():
  • query=tf.Tensor(shape=(None, 32, 64), dtype=float32)
  • key=tf.Tensor(shape=(None, 32, 64), dtype=float32)
  • value=tf.Tensor(shape=(None, 32, 64), dtype=float32)
  • masking=tf.Tensor(shape=(None, 32, 512), dtype=float32)[0m

Arguments received by Encoder.call():
  • args=('<KerasTensor shape=(None, 32, 512), dtype=float32, sparse=False, name=keras_tensor_34>',)
  • kwargs={'mask': 'None'}