<a href="https://colab.research.google.com/github/luiscunhacsc/udemy-ai-en/blob/main/Transformer2017KerasNLP_MostAdvaanced.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Step 1: Uninstall existing versions if necessary
#!pip uninstall -y tensorflow keras keras_nlp


# Step 2: Install compatible versions of TensorFlow and KerasNLP
#!pip install tensorflow==2.15 keras_nlp==0.3.0

In [9]:

import os
import tensorflow as tf
import keras_nlp
import numpy as np
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras import layers, Model
from tensorflow.keras.callbacks import Callback

# Step 3: Download the dataset
path_to_zip = tf.keras.utils.get_file(
    'spa-eng.zip',
    origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True)

path_to_file = os.path.join(os.path.dirname(path_to_zip), 'spa-eng', 'spa.txt')

# Step 4: Load and prepare the dataset
with open(path_to_file, 'r', encoding='utf-8') as f:
    lines = f.read().split('\n')

# Split the lines into English and Spanish pairs
text_pairs = []
for line in lines:
    if '\t' in line:
        english, spanish = line.split('\t')
        text_pairs.append((english, spanish))

# Convert to numpy arrays for easier handling
english_texts, spanish_texts = zip(*text_pairs)
english_texts = np.array(english_texts)
spanish_texts = np.array(spanish_texts)

# Step 5: Tokenize and Vectorize the data
vocab_size = 15000
sequence_length = 20

# Vectorize English text
english_vectorizer = TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length)

english_vectorizer.adapt(english_texts)

# Vectorize Spanish text
spanish_vectorizer = TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length + 1)

spanish_vectorizer.adapt(spanish_texts)

def vectorize_text(eng, spa):
    eng = english_vectorizer(eng)
    spa = spanish_vectorizer(spa)
    return ({"encoder_inputs": eng, "decoder_inputs": spa[:, :-1]}, spa[:, 1:])

dataset = tf.data.Dataset.from_tensor_slices((english_texts, spanish_texts))
dataset = dataset.batch(64)
dataset = dataset.map(vectorize_text)

# Split the dataset into training and validation sets
train_size = int(0.8 * len(list(dataset)))
train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size)

# Step 6: Build the Transformer Model

def build_encoder(num_layers, d_model, num_heads, dff, input_vocab_size, dropout_rate):
    inputs = tf.keras.Input(shape=(None,), dtype=tf.int64, name='encoder_inputs')
    x = keras_nlp.layers.TokenAndPositionEmbedding(
        vocabulary_size=input_vocab_size,
        sequence_length=sequence_length,
        embedding_dim=d_model
    )(inputs)

    for _ in range(num_layers):
        x = keras_nlp.layers.TransformerEncoder(
            intermediate_dim=dff,
            num_heads=num_heads,
            dropout=dropout_rate
        )(x)

    return tf.keras.Model(inputs, x, name='encoder')

def build_decoder(num_layers, d_model, num_heads, dff, target_vocab_size, dropout_rate):
    inputs = tf.keras.Input(shape=(None,), dtype=tf.int64, name='decoder_inputs')
    enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')

    x = keras_nlp.layers.TokenAndPositionEmbedding(
        vocabulary_size=target_vocab_size,
        sequence_length=sequence_length,
        embedding_dim=d_model
    )(inputs)

    for _ in range(num_layers):
        x = keras_nlp.layers.TransformerDecoder(
            intermediate_dim=dff,
            num_heads=num_heads,
            dropout=dropout_rate
        )(decoder_sequence=x, encoder_sequence=enc_outputs)

    outputs = layers.Dense(target_vocab_size, activation='softmax')(x)
    return tf.keras.Model([inputs, enc_outputs], outputs, name='decoder')

num_layers = 4
d_model = 128
num_heads = 8
dff = 512
dropout_rate = 0.1

encoder = build_encoder(num_layers, d_model, num_heads, dff, vocab_size, dropout_rate)
decoder = build_decoder(num_layers, d_model, num_heads, dff, vocab_size, dropout_rate)

# Define the input layers
encoder_inputs = encoder.input
decoder_inputs = decoder.input[0]
decoder_outputs = decoder([decoder_inputs, encoder.output])

# Define the model
model = Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Custom callback to display translations after each epoch
class TranslationCallback(Callback):
    def __init__(self, model, val_data, num_examples=5):
        self.model = model
        self.val_data = val_data.take(num_examples)
        self.english_texts = [ex[0]['encoder_inputs'].numpy() for ex in self.val_data]
        self.spanish_texts = [ex[1].numpy() for ex in self.val_data]
        self.index_to_word = {i: word for i, word in enumerate(spanish_vectorizer.get_vocabulary())}

    def on_epoch_end(self, epoch, logs=None):
        print(f"\nEpoch {epoch+1} translations:")
        for i in range(len(self.english_texts)):
            english_sentence = self.english_texts[i]
            spanish_sentence = self.spanish_texts[i]
            translated_sentence = self.translate_sentence(english_sentence)
            print(f"English: {' '.join([english_vectorizer.get_vocabulary()[index] for index in english_sentence if index > 0])}")
            print(f"Target Spanish: {' '.join([self.index_to_word[index] for index in spanish_sentence if index > 0])}")
            print(f"Predicted Spanish: {' '.join(translated_sentence)}\n")

    def translate_sentence(self, sentence):
        encoder_input = tf.expand_dims(sentence, axis=0)
        decoder_input = tf.expand_dims([spanish_vectorizer('<start>')], 0)
        output_sentence = []

        for i in range(sequence_length):
            predictions = self.model([encoder_input, decoder_input], training=False)
            predicted_id = tf.argmax(predictions[0, -1, :]).numpy()
            output_sentence.append(predicted_id)

            # Stop prediction if end token is predicted
            if predicted_id == spanish_vectorizer('<end>'):
                break

            decoder_input = tf.concat([decoder_input, tf.expand_dims([predicted_id], 0)], axis=-1)

        return [self.index_to_word[id] for id in output_sentence if id > 0 and id != spanish_vectorizer('<end>')]

# Step 7: Train the model with the callback
translation_callback = TranslationCallback(model, val_dataset, num_examples=5)

model.fit(train_dataset, epochs=20, validation_data=val_dataset, callbacks=[translation_callback])

# Summary of the model
model.summary()

Epoch 1/20
Epoch 1 translations:


InvalidArgumentError: Exception encountered when calling layer 'query' (type EinsumDense).

{{function_node __wrapped__Einsum_N_2_device_/job:localhost/replica:0/task:0/device:GPU:0}} Expected input 0 to have rank 3 but got: 4 [Op:Einsum] name: 

Call arguments received by layer 'query' (type EinsumDense):
  • inputs=tf.Tensor(shape=(1, 64, 20, 128), dtype=float32)