<a href="https://colab.research.google.com/github/suvasish114/Deep-Learning/blob/main/English%20to%20Bengali%20Language%20Translation%20using%20Transformer/Translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# English to Bengali translation using sequence to sequence transformer

Status: `IN DEVELOPMENT`

In [1]:
# !pip install keras --upgrade

In [2]:
# !pip install tensorflow --upgrade

In [3]:
# IMPORT
import numpy as np
import pandas as pd
import tensorflow as tf
import keras

In [4]:
print(tf.__version__)
print(keras.__version__)

2.15.0
3.0.1


## Parsing

In [5]:
# Open dataframe
PATH = "drive/MyDrive/Datasets/eng_to_ben.txt"
with open(PATH, "r", encoding="utf-8") as f:
    lines = f.read().split("\n")

# Total lines
print(f"total number of lines: {len(lines)}")

total number of lines: 5937


In [6]:
# Sample line
print(lines[0])

Go.	যাও।	CC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #5545004 (tanay)


In [7]:
# Creating <Eng, Ben> pair
text_pairs = list()
for line in lines[:len(lines)-1]:
    eng_text, ben_text = line.split("\t")[:2]
    text_pairs.append((eng_text, "[start] " + ben_text + " [end]"))

# Sample line
text_pairs[-1]

('January, February, March, April, May, June, July, August, September, October, November and December are the twelve months of the year.',
 '[start] বছরের বারোটা মাস হলো জানুয়ারি, ফেব্রুয়ারি, মার্চ, এপ্রিল, মে, জুন জুলাই, আগস্ট, সেপ্টেম্বর, অক্টোবর, নভেম্বর আর ডিসেম্বর। [end]')

In [8]:
# Suffle dataset
import random
random.shuffle(text_pairs)

In [9]:
# Split start train datasets
from sklearn.model_selection import train_test_split
train, test = train_test_split(text_pairs, test_size = 0.1, random_state = 12)

# Status
print(f"train paris: {len(train)}")
print(f"train paris: {len(test)}")

train paris: 5342
train paris: 594


## Vectorizing Text Data

In [10]:
# Remove punchuation from the input string
import re
import string
strip_chars = string.punctuation
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
vocab_size = 15000
sequence_length = 20
batch_size = 64

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

In [11]:
# Sample string
custom_standardization(" বছরের বারোটা মাস হলো জানুয়ারি, ফেব্রুয়ারি, মার্চ, এপ্রিল, মে, জুন জুলাই, আগস্ট, সেপ্টেম্বর, অক্টোবর, নভেম্বর আর ডিসেম্বর। ").numpy().decode('UTF-8')

' বছরের বারোটা মাস হলো জানুয়ারি ফেব্রুয়ারি মার্চ এপ্রিল মে জুন জুলাই আগস্ট সেপ্টেম্বর অক্টোবর নভেম্বর আর ডিসেম্বর। '

In [12]:
# English vectorization
eng_vectorization = tf.keras.layers.TextVectorization(
    max_tokens = vocab_size,
    output_sequence_length = sequence_length
)

In [13]:
# Bengali vectorization
ben_vectorization = tf.keras.layers.TextVectorization(
    max_tokens = vocab_size,
    output_sequence_length = sequence_length + 1
)

In [14]:
# Vectorize data
train_eng_texts = [pair[0] for pair in train]
train_ben_texts = [pair[1] for pair in train]
eng_vectorization.adapt(train_eng_texts)
ben_vectorization.adapt(train_ben_texts)

In [15]:
# Sample string
# eng_vectorization("January, February, March, April, May, June, July, August, September, October, November and December are the twelve months of the year.")

In [16]:
def format_dataset(eng, ben):
    eng = eng_vectorization(eng)
    ben = ben_vectorization(ben)
    return ({"encoder_inputs": eng,
             "decoder_inputs": ben[:, :-1],},ben[:, 1:],)

In [17]:
# Make datasets
def make_dataset(pairs):
    eng_texts, ben_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    ben_texts = list(ben_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, ben_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.cache().shuffle(2048).prefetch(16)

train_ds = make_dataset(train)

In [18]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


## Building Model

```
[encoder] => [POS_embedding] => [decoder]
```


In [19]:
import keras
from keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(num_heads = num_heads,
                                                   key_dim = embed_dim)
        self.dense_proj = keras.Sequential([layers.Dense(dense_dim, activation = "relu"),
                                            layers.Dense(embed_dim),])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask = None):
        if mask is not None:
            padding_mask = keras.ops.cast(mask[:, None, :], dtype="int32")
        else:
            padding_mask = None

        attention_output = self.attention(query = inputs,
                                          value = inputs,
                                          key = inputs,
                                          attention_mask = padding_mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim,
                       "dense_dim": self.dense_dim,
                       "num_heads": self.num_heads,})
        return config

In [20]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(input_dim = vocab_size,
                                                 output_dim = embed_dim)
        self.position_embeddings = layers.Embedding(input_dim = sequence_length,
                                                    output_dim = embed_dim)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = keras.ops.shape(inputs)[-1]
        positions = keras.ops.arange(0, length, 1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask = None):
        if mask is None:
            return None
        else:
            return keras.ops.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update({"sequence_length": self.sequence_length,
                       "vocab_size": self.vocab_size,
                       "embed_dim": self.embed_dim,})
        return config

In [21]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(num_heads = num_heads,
                                                     key_dim = embed_dim)
        self.attention_2 = layers.MultiHeadAttention(num_heads = num_heads,
                                                     key_dim = embed_dim)
        self.dense_proj = keras.Sequential([layers.Dense(latent_dim, activation="relu"),
                                            layers.Dense(embed_dim),])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask = None):
        causal_mask = self.get_casual_attention_mask(inputs)
        if mask is not None:
            padding_mask = keras.ops.cast(mask[:, None, :], dtype = "int32")
            padding_mask = keras.ops.minimum(padding_mask, causal_mask)
        else:
            padding_mask = None

        attention_output_1 = self.attention_1(query = inputs,
                                              value = inputs,
                                              key = inputs,
                                              attention_mask = causal_mask)
        out_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(query = out_1,
                                              value = encoder_outputs,
                                              key = encoder_outputs,
                                              attention_mask = padding_mask,)
        out_2 = self.layernorm_2(out_1 + attention_output_2)
        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_casual_attention_mask(self, inputs):
        input_shape = keras.ops.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = keras.ops.arange(sequence_length)[:, None]
        j = keras.ops.arange(sequence_length)
        mask = keras.ops.cast(i >= j, dtype = "int32")
        mask = keras.ops.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = keras.ops.concatenate([keras.ops.expand_dims(batch_size, -1),
                                      keras.ops.convert_to_tensor([1, 1])], axis = 0,)
        return keras.ops.tile(mask, mult)

    def get_config(self):
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim,
                       "latent_dim": self.latent_dim,
                       "num_heads": self.num_heads,})
        return config

In [22]:
# Binding model layers
embed_dim = 256
latent_dim = 2048
num_heads = 8

# Encoder
encoder_inputs = keras.Input(shape=(None,), dtype = "int64", name = "encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder
decoder_inputs = keras.Input(shape = (None,), dtype = "int64", name = "decoder_inputs")
encoded_seq_inputs = keras.Input(shape = (None, embed_dim), name = "decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation = "softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs, name = "transformer")

## Training Model

In [24]:
epochs = 10  # This should be at least 30 for convergence

transformer.summary()
transformer.compile("rmsprop",
                    loss = "sparse_categorical_crossentropy",
                    metrics = ["accuracy"])
transformer.fit(train_ds, epochs = epochs)

Epoch 1/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m335s[0m 4s/step - accuracy: 0.7857 - loss: 1.6133
Epoch 2/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m326s[0m 4s/step - accuracy: 0.7918 - loss: 1.4886
Epoch 3/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m375s[0m 4s/step - accuracy: 0.7975 - loss: 1.3834
Epoch 4/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m312s[0m 4s/step - accuracy: 0.8052 - loss: 1.2788
Epoch 5/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m326s[0m 4s/step - accuracy: 0.8101 - loss: 1.2054
Epoch 6/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m325s[0m 4s/step - accuracy: 0.8077 - loss: 1.2149
Epoch 7/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m329s[0m 4s/step - accuracy: 0.8114 - loss: 1.1708
Epoch 8/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m370s[0m 4s/step - accuracy: 0.8238 - loss: 1.0599
Epoch 9/10
[1m84/84[0m [32m━━━━━━━━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x7cf1cc4a2200>

## Decode Text Sentences

In [25]:
ben_vocab = ben_vectorization.get_vocabulary()
ben_index_lookup = dict(zip(range(len(ben_vocab)), ben_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = ben_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        # ops.argmax(predictions[0, i, :]) is not a concrete value for jax here
        sampled_token_index = keras.ops.convert_to_numpy(keras.ops.argmax(predictions[0, i, :])).item(0)
        sampled_token = ben_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence

In [27]:
test_eng_texts = [pair[0] for pair in test]
for _ in range(5):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(input_sentence)
    print(translated)

Tom is not happy to be here.
[start] আমি একটা নতুন গাড়ি বিক্রি করতে চাই। end            
Who are those guys?
[start] আমি একটা নতুন গাড়ি বিক্রি করতে চাই। end            
They are doctors.
[start] আমি একটা নতুন গাড়ি বিক্রি করতে চাই। end            
Tom lives here.
[start] আমি একটা নতুন গাড়ি বিক্রি করতে চাই। end            
Do you understand what I want to say?
[start] আমি একটা নতুন গাড়ি বিক্রি করতে চাই। end            


```
by Suvasish Das
```