In [1]:
# pip install transformers

In [2]:
# pip install tensorflow

In [3]:
    # pip install --upgrade pip

In [4]:
# pip install --upgrade tensorflow

In [5]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization, LayerNormalization
from tensorflow.keras.layers import Bidirectional,GRU,LSTM,Embedding,Dropout,Layer,MultiHeadAttention,Dense
from tensorflow.keras import Sequential,Input
from tensorflow.keras.callbacks import ModelCheckpoint

In [6]:
text_file = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"

In [7]:
with open(text_file, 'r', encoding='utf-8', errors='ignore') as f:
    lines = f.read().split("\n")[:-1]
lng_pairs = []
for line in lines:
    eng, spa = line.split("\t")
    spa = "[start] " + spa + " [end]"
    lng_pairs.append((eng, spa))

In [8]:
for _ in range(5):
    print(random.choice(lng_pairs))

('Tom is dating Mary.', '[start] Tom está saliendo con Mary. [end]')
('I like to play soccer.', '[start] Me gusta jugar al futbol. [end]')
('Tom told Mary not to be late.', '[start] Tom le dijo a Mary que no llegara tarde. [end]')
('You are mad to go out in the snow without a coat.', '[start] Tienes que estar loco para ir a la nieve sin un abrigo. [end]')
('They finished their meal.', '[start] Ellos terminaron su comida. [end]')


In [9]:
import random

subset_size = 5000

# Randomly select a subset of pairs
text_pairs = random.sample(lng_pairs, subset_size)


In [10]:
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

5000 total pairs
3500 training pairs
750 validation pairs
750 test pairs


In [11]:
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

In [12]:
vocab_size = 5000
sequence_length = 10
batch_size = 34

In [13]:
def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")


eng_vectorization = TextVectorization(
    max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length,
)
spa_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
eng_train_texts = [pair[0] for pair in train_pairs]
spa_train_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(eng_train_texts)
spa_vectorization.adapt(spa_train_texts)

In [14]:
def vector_dataset(eng, spa):
    eng = eng_vectorization(eng)
    spa = spa_vectorization(spa)
    return ({"encoder_inputs": eng, "decoder_inputs": spa[:, :-1],}, spa[:, 1:])


def new_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(vector_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()


train_data = new_dataset(train_pairs)
val_data = new_dataset(val_pairs)

In [15]:
for inputs, targets in train_data.take(1):
    print(f'inputs encoder shape: {inputs["encoder_inputs"].shape}')
    print(f'decoder shape: {inputs["decoder_inputs"].shape}')
    print(f"targets shape: {targets.shape}")

inputs encoder shape: (34, 10)
decoder shape: (34, 10)
targets shape: (34, 10)


In [16]:
# class TransformerEncoder(Layer):
#     def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
#         super(TransformerEncoder, self).__init__(**kwargs)
#         self.embed_dim = embed_dim
#         self.dense_dim = dense_dim
#         self.num_heads = num_heads
#         self.attention = MultiHeadAttention(
#             num_heads=num_heads, key_dim=embed_dim
#         )
#         self.dense_proj = Sequential(
#             [Dense(dense_dim, activation="relu"), Dense(embed_dim),]
#         )
#         self.layernorm_1 = LayerNormalization()
#         self.layernorm_2 = LayerNormalization()
#         self.supports_masking = True

#     def call(self, inputs, mask=None):
#         if mask is not None:
#             padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
#         attention_output = self.attention(
#             query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
#         )
        
#         proj_input = self.layernorm_1(inputs + attention_output)
#         proj_output = self.dense_proj(proj_input)
#         return self.layernorm_2(proj_input + proj_output)
#     def get_config(self):
#         config = super().get_config()
#         config.update({
#             "embed_dim": self.embed_dim,
#             "dense_dim": self.dense_dim,
#             "num_heads": self.num_heads,
#         })
#         return config


In [17]:
# class TransformerDecoder(Layer):
#     def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
#         super().__init__(**kwargs)
#         self.embed_dim = embed_dim
#         self.dense_dim = dense_dim
#         self.num_heads = num_heads
#         self.attention_1 = MultiHeadAttention(
#             num_heads=num_heads, key_dim=embed_dim)
#         self.attention_2 = MultiHeadAttention(
#             num_heads=num_heads, key_dim=embed_dim)
#         self.dense_proj = keras.Sequential(
#             [Dense(dense_dim, activation="relu"),
#              Dense(embed_dim),]
#         )
#         self.layernorm_1 = LayerNormalization()
#         self.layernorm_2 = LayerNormalization()
#         self.layernorm_3 = LayerNormalization()
#         self.supports_masking = True

#     def get_config(self):
#         config = super().get_config()
#         config.update({
#             "embed_dim": self.embed_dim,
#             "num_heads": self.num_heads,
#             "dense_dim": self.dense_dim,
#         })
#         return config

#     def get_causal_attention_mask(self, inputs):
#         input_shape = tf.shape(inputs)
#         batch_size, sequence_length = input_shape[0], input_shape[1]
#         i = tf.range(sequence_length)[:, tf.newaxis]
#         j = tf.range(sequence_length)
#         mask = tf.cast(i >= j, dtype="int32")
#         mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
#         mult = tf.concat(
#             [tf.expand_dims(batch_size, -1),
#              tf.constant([1, 1], dtype=tf.int32)], axis=0)
#         return tf.tile(mask, mult)

#     def call(self, inputs, encoder_outputs, mask=None):
#         causal_mask = self.get_causal_attention_mask(inputs)
#         if mask is not None:
#             padding_mask = tf.cast(
#                 mask[:, tf.newaxis, :], dtype="int32")
#             padding_mask = tf.minimum(padding_mask, causal_mask)
#         attention_output_1 = self.attention_1(
#             query=inputs,
#             value=inputs,
#             key=inputs,
#             attention_mask=causal_mask)
#         attention_output_1 = self.layernorm_1(inputs + attention_output_1)
#         attention_output_2 = self.attention_2(
#             query=attention_output_1,
#             value=encoder_outputs,
#             key=encoder_outputs,
#             attention_mask=padding_mask,
#         )
#         attention_output_2 = self.layernorm_2(
#             attention_output_1 + attention_output_2)
#         proj_output = self.dense_proj(attention_output_2)
#         return self.layernorm_3(attention_output_2 + proj_output)

In [18]:
# class PositionalEmbedding(Layer):
#     def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
#         super().__init__(**kwargs)
#         self.token_embeddings = Embedding(
#             input_dim=input_dim, output_dim=output_dim)
#         print(input_dim,output_dim)
#         #intermediate = self.getPositionEncoding(seq_len=input_dim,d=vocab_size,n=output_dim)
#         self.position_embeddings = Embedding(input_dim=input_dim, output_dim=output_dim)
#         self.sequence_length = sequence_length
#         self.input_dim = input_dim
#         self.output_dim = output_dim

#     def getPositionEncoding(self,seq_len, d, n = sequence_length):
#         P = np.zeros((seq_len, d))
#         for k in range(seq_len):
#             for i in np.arange(int(d/2)):
#                 denominator = np.power(n, 2*i/d)
#                 P[k, 2*i] = np.sin(k/denominator)
#                 P[k, 2*i+1] = np.cos(k/denominator)
#         tensor = tf.convert_to_tensor(P)
#         print(tensor.shape)
#         return tensor
    
#     def call(self, inputs):
#         length = tf.shape(inputs)[-1]
#         positions = tf.range(start=0, limit=length, delta=1)
#         embedded_tokens = self.token_embeddings(inputs)
#         embedded_positions = self.position_embeddings(positions)
#         return embedded_tokens + embedded_positions

#     def compute_mask(self, inputs, mask=None):
#         return tf.math.not_equal(inputs, 0)

#     def get_config(self):
#         config = super(PositionalEmbedding, self).get_config()
#         config.update({
#             "output_dim": self.output_dim,
#             "sequence_length": self.sequence_length,
#             "input_dim": self.input_dim,
#         })
#         return config

In [19]:
# embed_dim = 256
# dense_dim = 2048
# num_heads = 8

# encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
# x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
# encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

# decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
# x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
# x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
# x = Dropout(0.5)(x)
# decoder_outputs = Dense(vocab_size, activation="softmax")(x)
# transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [20]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)



class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = keras.layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)
keras.utils.get_custom_objects()['PositionalEmbedding'] = PositionalEmbedding


class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)
        proj_output = self.dense_proj(out_1)
        return self.layernorm_2(out_1 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

In [21]:
embed_dim = 62
latent_dim = 512
num_heads = 2

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

decoder_outputs = decoder([decoder_inputs, encoder_outputs])


In [22]:
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [23]:
epochs = 5  # This should be at least 30 for convergence

transformer.summary()
transformer.compile(
    "rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_data, epochs=epochs, validation_data=val_data)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_inputs (InputLayer  [(None, None)]               0         []                            
 )                                                                                                
                                                                                                  


 positional_embedding (Posi  (None, None, 62)             310620    ['encoder_inputs[0][0]']      
 tionalEmbedding)                                                                                 
                                                                                                  
 decoder_inputs (InputLayer  [(None, None)]               0         []                            
 )                                                                                                
                                                                                                  
 transformer_encoder (Trans  (None, None, 62)             95496     ['positional_embedding[0][0]']
 formerEncoder)                                                                                   
                                                                                                  
 model_1 (Functional)        (None, None, 5000)           721116    ['decoder_inputs[0][0]',      
          

<keras.src.callbacks.History at 0x2210e5b8e50>

In [24]:
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 10

def decode_sequence(input_sentence):
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(10):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
print(input_sentence)
print(translated)

What time do you want me to be here?
[start] tom se está en la habitación [end]


In [25]:
transformer.save('F:/Projects/Language Translator/model.h5')

  saving_api.save_model(


In [26]:
# import pickle
# pickle.dump(transformer,open('F:/Projects/Language Translator/model.pkl','wb'))

In [27]:
# pip install flask