In [None]:
!pip install keras_nlp

In [None]:
import pickle

import tensorflow as tf
import keras_nlp
import numpy as np
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split
import pandas as pd

# Seed NumPy's global RNG.
# NOTE(review): this call only touches NumPy; confirm whether TensorFlow's
# RNG (used for weight initialisation) should be seeded as well.
np.random.seed(2)
# Load the pretrained 't5-base' tokenizer, passing "<start>" as its BOS token.
# The training and inference code in this file prepends tokenizer.bos_token_id
# to every decoder input sequence.
tokenizer = AutoTokenizer.from_pretrained('t5-base', bos_token="<start>")


class TransformerEmbedding(tf.keras.layers.Layer):
    """Token embedding with an additive sine position encoding.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each embedding vector.
        max_seq_len: Accepted for interface compatibility; this layer does
            not read it.
    """

    def __init__(self, vocab_size, embedding_dim, max_seq_len):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = keras_nlp.layers.SinePositionEncoding()

    def call(self, sequences):
        """Embed ``sequences`` and add the position encoding derived from the embeddings."""
        token_vectors = self.embedding(sequences)
        return token_vectors + self.positional_encoding(token_vectors)

    def get_weights(self):
        """Return the weights of the token-embedding sub-layer only."""
        return self.embedding.get_weights()

    def set_weights(self, weights):
        """Load ``weights`` into the token-embedding sub-layer."""
        self.embedding.set_weights(weights)


class FeedForward(tf.keras.layers.Layer):
    """Two stacked Dense layers: ``dModel * 4`` units with ReLU, then ``dModel`` units."""

    def __init__(self, dModel):
        super().__init__()
        self.l1 = tf.keras.layers.Dense(dModel * 4, activation='relu')
        self.l2 = tf.keras.layers.Dense(dModel)

    def call(self, x, *args, **kwargs):
        """Apply ``l1`` then ``l2`` to ``x``; extra arguments are ignored."""
        return self.l2(self.l1(x))

    def get_weights(self):
        """Return the weights of ``l1`` followed by the weights of ``l2``."""
        return [*self.l1.get_weights(), *self.l2.get_weights()]

    def set_weights(self, weights):
        """Restore weights in the order produced by ``get_weights``."""
        # The first two entries belong to l1, everything after them to l2.
        head, tail = weights[:2], weights[2:]
        self.l1.set_weights(head)
        self.l2.set_weights(tail)


class EncoderBlock(tf.keras.layers.Layer):
    """Encoder layer: self-attention then feed-forward.

    Each sub-layer's output is added to its input and layer-normalised.
    """

    def __init__(self, dModel, num_heads):
        super().__init__()
        self.dModel = dModel
        self.num_heads = num_heads
        self.MhA = tf.keras.layers.MultiHeadAttention(num_heads, dModel // num_heads)
        self.LayerNorm1 = tf.keras.layers.LayerNormalization()
        self.LayerNorm2 = tf.keras.layers.LayerNormalization()
        self.Add = tf.keras.layers.Add()
        self.FeedForward = FeedForward(dModel)

    def call(self, x, *args, **kwargs):
        """Run ``x`` through attention and feed-forward; extra arguments are ignored."""
        attended = self.MhA(key=x, query=x, value=x)
        normed = self.LayerNorm1(self.Add([attended, x]))
        transformed = self.FeedForward(normed)
        return self.LayerNorm2(self.Add([transformed, normed]))

    def get_weights(self):
        """Return weights ordered as: attention, LayerNorm1, LayerNorm2, feed-forward."""
        ordered = (self.MhA, self.LayerNorm1, self.LayerNorm2, self.FeedForward)
        flat = []
        for sub_layer in ordered:
            flat.extend(sub_layer.get_weights())
        return flat

    def set_weights(self, weights):
        """Restore weights laid out in the order produced by ``get_weights``."""
        # Fixed layout: 8 attention entries, 2 per LayerNorm, remainder feed-forward.
        layout = (
            (self.MhA, slice(None, 8)),
            (self.LayerNorm1, slice(8, 10)),
            (self.LayerNorm2, slice(10, 12)),
            (self.FeedForward, slice(12, None)),
        )
        chunks = [(sub_layer, weights[span]) for sub_layer, span in layout]
        for sub_layer, chunk in chunks:
            sub_layer.set_weights(chunk)


class DecoderBlock(tf.keras.layers.Layer):
    """Decoder layer: causal self-attention, attention over the encoder output, feed-forward.

    Each sub-layer's output is added to its input and layer-normalised.
    """

    def __init__(self, dModel, num_heads):
        super().__init__()
        self.dModel = dModel
        self.num_heads = num_heads
        # MhA attends over the encoder output; MMhA is the causally masked self-attention.
        self.MhA = tf.keras.layers.MultiHeadAttention(num_heads, dModel // num_heads)
        self.MMhA = tf.keras.layers.MultiHeadAttention(num_heads, dModel // num_heads)
        self.LayerNorm1 = tf.keras.layers.LayerNormalization()
        self.LayerNorm2 = tf.keras.layers.LayerNormalization()
        self.LayerNorm3 = tf.keras.layers.LayerNormalization()
        self.Add = tf.keras.layers.Add()
        self.FeedForward = FeedForward(dModel)

    def call(self, x, encoder_out, *args, **kwargs):
        """Transform decoder input ``x`` given ``encoder_out``; extra arguments are ignored."""
        self_attended = self.MMhA(query=x, value=x, key=x, use_causal_mask=True)
        after_self = self.LayerNorm1(self.Add([self_attended, x]))

        cross_attended = self.MhA(query=after_self, key=encoder_out, value=encoder_out)
        after_cross = self.LayerNorm2(self.Add([cross_attended, after_self]))

        fed_forward = self.FeedForward(after_cross)
        return self.LayerNorm3(self.Add([fed_forward, after_cross]))

    def get_weights(self):
        """Return weights ordered as: MMhA, MhA, LayerNorm1-3, feed-forward."""
        ordered = (
            self.MMhA,
            self.MhA,
            self.LayerNorm1,
            self.LayerNorm2,
            self.LayerNorm3,
            self.FeedForward,
        )
        flat = []
        for sub_layer in ordered:
            flat.extend(sub_layer.get_weights())
        return flat

    def set_weights(self, weights):
        """Restore weights laid out in the order produced by ``get_weights``."""
        # Fixed layout: 8 entries per attention layer, 2 per LayerNorm, remainder feed-forward.
        layout = (
            (self.MMhA, slice(None, 8)),
            (self.MhA, slice(8, 16)),
            (self.LayerNorm1, slice(16, 18)),
            (self.LayerNorm2, slice(18, 20)),
            (self.LayerNorm3, slice(20, 22)),
            (self.FeedForward, slice(22, None)),
        )
        chunks = [(sub_layer, weights[span]) for sub_layer, span in layout]
        for sub_layer, chunk in chunks:
            sub_layer.set_weights(chunk)


class Decoder(tf.keras.layers.Layer):
    """Stack of ``DecoderBlock`` layers followed by a projection to vocabulary logits.

    Args:
        num_blocks: Number of ``DecoderBlock`` layers to stack.
        vocab_size: Width of the output projection (one logit per token id).
        dModel: Model width passed to every block.
        num_heads: Attention head count passed to every block.
    """

    def __init__(self, num_blocks, vocab_size, dModel, num_heads):
        super().__init__()
        self.num_blocks = num_blocks
        self.blocks = [DecoderBlock(dModel, num_heads) for _ in range(num_blocks)]
        # No activation here: the Transformer models in this file compute their
        # loss with from_logits=True and take argmax over "last_token_logits".
        # Applying softmax in this layer as well would normalise twice and pin
        # the loss near ln(vocab_size).
        self.linear = tf.keras.layers.Dense(vocab_size)

    def call(self, x, encoder_out, *args, **kwargs):
        """Return unnormalised vocabulary logits for decoder input ``x``.

        ``encoder_out`` is handed to every block; extra arguments are ignored.
        """
        for block in self.blocks:
            x = block(x, encoder_out)
        return self.linear(x)

    def get_weights(self):
        """Return the weights of every block in order, then the output projection."""
        weights = []
        for block in self.blocks:
            weights.extend(block.get_weights())
        # Include the projection so a saved checkpoint restores the full decoder.
        weights.extend(self.linear.get_weights())
        return weights

    def set_weights(self, weights):
        """Restore weights laid out in the order produced by ``get_weights``.

        Lists that contain only block weights (written before the projection
        was included) are still accepted; the projection is then left as is.
        """
        remaining = list(weights)
        for block in self.blocks:
            count = len(block.get_weights())
            block.set_weights(remaining[:count])
            remaining = remaining[count:]

        # Only touch the projection when the leftover entries match what it
        # currently holds; this keeps old checkpoints and not-yet-built layers
        # from raising.
        projection_count = len(self.linear.get_weights())
        if projection_count and len(remaining) == projection_count:
            self.linear.set_weights(remaining)


class Encoder(tf.keras.layers.Layer):
    """Sequential stack of ``EncoderBlock`` layers."""

    def __init__(self, num_blocks, dModel, num_heads):
        super().__init__()
        self.num_blocks = num_blocks
        self.blocks = [EncoderBlock(dModel, num_heads) for _ in range(num_blocks)]

    def call(self, x, *args, **kwargs):
        """Feed ``x`` through every block in order; extra arguments are ignored."""
        hidden = x
        for layer in self.blocks:
            hidden = layer(hidden)
        return hidden

    def get_weights(self):
        """Return the concatenated weights of all blocks, in block order."""
        return [w for layer in self.blocks for w in layer.get_weights()]

    def set_weights(self, weights):
        """Hand each block its consecutive slice of ``weights``."""
        offset = 0
        for layer in self.blocks:
            # Each block consumes as many entries as it currently reports.
            count = len(layer.get_weights())
            layer.set_weights(weights[offset:offset + count])
            offset += count


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

In [None]:

class Transformer(tf.keras.Model):
    """Encoder-decoder translation model with a hand-written training loop.

    The model reads the "English" column as encoder input and the "French"
    column as decoder input/target. ``fit``, ``evaluate``, ``train_step``,
    ``compute_loss``, ``save``, ``get_weights`` and ``set_weights`` replace
    the Keras ``Model`` methods of the same name with custom signatures.

    Args:
        embedding_layer: Layer mapping token ids to vectors; shared by the
            encoder and decoder inputs.
        encoder: Layer applied to the embedded source sequence.
        decoder: Layer called as ``decoder(embedded_target, encoder_out)``;
            its output is interpreted as unnormalised logits.
        optimizer: Optimizer used by ``train_step``.
    """

    def __init__(self, embedding_layer, encoder, decoder, optimizer):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.embedding_layer = embedding_layer
        self.optimizer = optimizer

    def save(self, filepath):
        """Pickle the tuple returned by ``get_weights`` to ``filepath``."""
        with open(filepath, 'wb') as f:
            pickle.dump(self.get_weights(), f)

    def load(self, filepath):
        """Restore weights previously written by ``save``.

        WARNING: ``pickle.load`` can execute arbitrary code; only load files
        you created yourself.
        """
        with open(filepath, 'rb') as f:
            weights = pickle.load(f)
        self.set_weights(weights)

    def get_weights(self):
        """Return ``(encoder_weights, decoder_weights, embedding_weights)``."""
        encoder_weights = self.encoder.get_weights()
        decoder_weights = self.decoder.get_weights()
        embedding_weights = self.embedding_layer.get_weights()
        return encoder_weights, decoder_weights, embedding_weights

    def set_weights(self, weights):
        """Restore a ``(encoder, decoder, embedding)`` weight tuple."""
        encoder_weights, decoder_weights, embedding_weights = weights
        self.encoder.set_weights(encoder_weights)
        self.decoder.set_weights(decoder_weights)
        self.embedding_layer.set_weights(embedding_weights)

    def compute_loss(self, targets, predictions):
        """Return the mean cross-entropy between token ids and logits.

        Args:
            targets: Integer token ids; flattened to one id per position.
            predictions: Logits whose last axis is the vocabulary; flattened
                to one row per position.
        """
        targets_flat = tf.reshape(targets, [-1])
        predictions_flat = tf.reshape(predictions, [-1, tf.shape(predictions)[-1]])
        # The sparse form gives the same value as one-hot + categorical
        # cross-entropy without materialising a (positions x vocab) one-hot.
        loss = tf.keras.losses.sparse_categorical_crossentropy(
            targets_flat, predictions_flat, from_logits=True)
        return tf.reduce_mean(loss)

    def _all_trainable_variables(self):
        """Return the trainable variables of embedding, encoder and decoder."""
        return (self.embedding_layer.trainable_variables
                + self.encoder.trainable_variables
                + self.decoder.trainable_variables)

    def _tokenize_pair(self, source_sentences, target_sentences):
        """Tokenize one batch and prepend the BOS id to every target row."""
        source_tokens = np.array(tokenizer(list(source_sentences), padding=True)['input_ids'])
        target_tokens = np.array(tokenizer(list(target_sentences), padding=True)['input_ids'])
        target_tokens = np.insert(target_tokens, 0, tokenizer.bos_token_id, axis=1)
        return source_tokens, target_tokens

    def _forward(self, source_tokens, target_tokens):
        """Embed both sequences and return the decoder output."""
        encoder_out = self.encoder(self.embedding_layer(source_tokens))
        return self.decoder(self.embedding_layer(target_tokens), encoder_out)

    def train_step(self, eng_tokens, fr_tokens):
        """Run one optimisation step on a tokenized batch and return its loss."""
        with tf.GradientTape() as tape:
            # The embedding lookups must happen inside the tape, otherwise the
            # embedding table receives no gradient and is never trained.
            decoder_out = self._forward(eng_tokens, fr_tokens)
            # Position t of the decoder output predicts target token t + 1,
            # so drop the leading <BOS> from the targets and the last output.
            loss = self.compute_loss(fr_tokens[:, 1:], decoder_out[:, :-1])

        variables = self._all_trainable_variables()
        gradients = tape.gradient(loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))

        return loss

    def fit(self, train_df, test_df, num_epochs, batch_size):
        """Train for ``num_epochs``; save weights and report validation loss after each.

        Only full batches are used; a trailing partial batch is dropped.
        """
        train_english_sentences = train_df["English"].values
        train_french_sentences = train_df["French"].values
        num_batches = len(train_english_sentences) // batch_size

        for epoch in range(num_epochs):
            epoch_loss = 0
            # Start at batch 0 so the first batch_size rows are not skipped.
            for i in range(num_batches):
                start, stop = batch_size * i, batch_size * (i + 1)
                eng_token, fr_token = self._tokenize_pair(
                    train_english_sentences[start:stop],
                    train_french_sentences[start:stop])

                loss = self.train_step(eng_token, fr_token)
                epoch_loss += loss

                print(f"Batch {i + 1}/{num_batches} Loss: {loss:.4f} Epoch {epoch + 1}")

            # Save this instance rather than whatever a global `model` refers to.
            self.save(f'weights_run2_{epoch}.pkl')
            print(f"Epoch {epoch + 1}, Loss: {epoch_loss / max(num_batches, 1):.4f}")

            test_loss = self.evaluate(test_df, batch_size)
            print(f"Validation Loss: {test_loss:.4f}")

    def evaluate(self, test_df, batch_size):
        """Return the mean loss over all full batches of ``test_df``.

        Returns 0.0 when ``test_df`` holds fewer than ``batch_size`` rows.
        """
        test_english_sentences = test_df["English"].values
        test_french_sentences = test_df["French"].values
        num_batches = len(test_english_sentences) // batch_size

        total_loss = 0
        for i in range(num_batches):
            start, stop = batch_size * i, batch_size * (i + 1)
            eng_token, fr_token = self._tokenize_pair(
                test_english_sentences[start:stop],
                test_french_sentences[start:stop])

            decoder_out = self._forward(eng_token, fr_token)
            total_loss += self.compute_loss(fr_token[:, 1:], decoder_out[:, :-1])

        return total_loss / max(num_batches, 1)

    def inference(self, input_text, max_length=50):
        """Greedily decode a translation of ``input_text``.

        Decoding stops after ``max_length`` generated tokens or when the
        tokenizer's EOS id is produced. The returned text includes the
        decoded BOS token.
        """
        input_tokens = np.array(tokenizer([input_text], padding=True)['input_ids'])
        # The source never changes during decoding, so encode it once.
        encoder_out = self.encoder(self.embedding_layer(input_tokens))

        output_tokens = np.array([[tokenizer.bos_token_id]])

        for _ in range(max_length):
            decoder_out = self.decoder(self.embedding_layer(output_tokens), encoder_out)

            last_token_logits = decoder_out[:, -1, :]
            next_token_id = tf.argmax(last_token_logits, axis=-1)

            output_tokens = np.concatenate([output_tokens, next_token_id[:, tf.newaxis]], axis=-1)

            if next_token_id[0] == tokenizer.eos_token_id:
                print('GOT EOS TOKEN')
                break

        return tokenizer.decode(output_tokens[0])

In [None]:
# Load the parallel corpus: a header-less two-column CSV (English, French).
df = pd.read_csv("eng_fr.csv", header=None, names=["English", "French"])
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# vocab_size + 1 leaves room for one id beyond the tokenizer's base vocabulary
# (presumably the added "<start>" BOS token -- verify against the tokenizer).
embedding_layer = TransformerEmbedding(tokenizer.vocab_size + 1, 128, 60)
encoder = Encoder(dModel=128, num_blocks=6, num_heads=4)
decoder = Decoder(vocab_size=tokenizer.vocab_size + 1, dModel=128, num_heads=4, num_blocks=3)
model = Transformer(embedding_layer, encoder, decoder,
                    optimizer=tf.keras.optimizers.Adam(learning_rate=0.005))

# Smoke test with untrained weights; this first forward pass also creates the
# layers' variables.
response = model.inference("Hello! How are you?")
print(response)

# fit() requires batch_size; omitting it raises TypeError.
# NOTE(review): 64 is an assumed value (the recorded run's flattened shapes
# are all multiples of 64) -- adjust to the available memory.
model.fit(train_df, test_df, 3, batch_size=64)



<start> patches patches patches acid tablespoon Development Development Development Developmentmustermuster Immobilien Immobilien Immobilien Immobilien Immobilien Williams Immobilien Immobilien electronically electronically Immobilien Immobilien governor governor rebelfaţafaţafaţafaţa Granite GranitefrequentPSTicketgutgutgut unabhängig unabhängig Mark nie nie Reform Reform Reform Reform Reform Reform Reform
prediction shape before loss (2112, 32101) , target shape before loss (2112, 32101)
Batch 1/2195 Loss: 10.3767 Epoch 1
prediction shape before loss (1408, 32101) , target shape before loss (1408, 32101)
Batch 2/2195 Loss: 10.3766 Epoch 1
prediction shape before loss (2048, 32101) , target shape before loss (2048, 32101)
Batch 3/2195 Loss: 10.3767 Epoch 1
prediction shape before loss (2176, 32101) , target shape before loss (2176, 32101)
Batch 4/2195 Loss: 10.3767 Epoch 1
prediction shape before loss (1792, 32101) , target shape before loss (1792, 32101)
Batch 5/2195 Loss: 10.3767 Ep

KeyboardInterrupt: 

In [None]:
#Alternative Transformer class for question answering tasks

class Transformer(tf.keras.Model):
    """Encoder-decoder model for question answering with a hand-written training loop.

    Conditioning used consistently by ``train_step``, ``evaluate`` and
    ``inference``: the encoder reads the question token ids followed by the
    context token ids, and the decoder is fed the answer sequence (prefixed
    with BOS) and predicts the next answer token at every position.

    ``fit``, ``evaluate``, ``train_step``, ``compute_loss``, ``save``,
    ``get_weights`` and ``set_weights`` replace the Keras ``Model`` methods of
    the same name with custom signatures.

    Args:
        embedding_layer: Layer mapping token ids to vectors; shared by the
            encoder and decoder inputs.
        encoder: Layer applied to the embedded question + context sequence.
        decoder: Layer called as ``decoder(embedded_answer, encoder_out)``;
            its output is interpreted as unnormalised logits.
        optimizer: Optimizer used by ``train_step``.
    """

    def __init__(self, embedding_layer, encoder, decoder, optimizer):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.embedding_layer = embedding_layer
        self.optimizer = optimizer

    def save(self, filepath):
        """Pickle the tuple returned by ``get_weights`` to ``filepath``."""
        with open(filepath, 'wb') as f:
            pickle.dump(self.get_weights(), f)

    def load(self, filepath):
        """Restore weights previously written by ``save``.

        WARNING: ``pickle.load`` can execute arbitrary code; only load files
        you created yourself.
        """
        with open(filepath, 'rb') as f:
            weights = pickle.load(f)
        self.set_weights(weights)

    def get_weights(self):
        """Return ``(encoder_weights, decoder_weights, embedding_weights)``."""
        encoder_weights = self.encoder.get_weights()
        decoder_weights = self.decoder.get_weights()
        embedding_weights = self.embedding_layer.get_weights()
        return encoder_weights, decoder_weights, embedding_weights

    def set_weights(self, weights):
        """Restore a ``(encoder, decoder, embedding)`` weight tuple."""
        encoder_weights, decoder_weights, embedding_weights = weights
        self.encoder.set_weights(encoder_weights)
        self.decoder.set_weights(decoder_weights)
        self.embedding_layer.set_weights(embedding_weights)

    def compute_loss(self, targets, predictions):
        """Return the mean cross-entropy between token ids and logits.

        If the two inputs differ in sequence length, the shorter one is
        zero-padded on the right to the longer length before comparison
        (targets with id 0, predictions with all-zero logit rows). The
        methods of this class always pass equal lengths.
        """
        target_len = tf.shape(targets)[1]
        prediction_len = tf.shape(predictions)[1]
        max_length = tf.maximum(target_len, prediction_len)
        pad_targets = tf.pad(targets, [[0, 0], [0, max_length - target_len]])
        pad_predictions = tf.pad(predictions, [[0, 0], [0, max_length - prediction_len], [0, 0]])

        targets_flat = tf.reshape(pad_targets, [-1])
        predictions_flat = tf.reshape(pad_predictions, [-1, tf.shape(pad_predictions)[-1]])
        # The sparse form gives the same value as one-hot + categorical
        # cross-entropy without materialising a (positions x vocab) one-hot.
        loss = tf.keras.losses.sparse_categorical_crossentropy(
            targets_flat, predictions_flat, from_logits=True)
        return tf.reduce_mean(loss)

    @staticmethod
    def _tokenize(texts):
        """Tokenize ``texts`` into a padded 2-D array of token ids."""
        return np.array(tokenizer(list(texts), padding=True)['input_ids'])

    def _tokenize_batch(self, data, start, stop):
        """Return (context, question, answer) token arrays for rows ``start:stop``.

        Columns are selected first and then sliced, which works for both a
        DataFrame and a plain dict of column sequences. The answer rows get
        the BOS id prepended.
        """
        context_tokens = self._tokenize(data['context'][start:stop])
        question_tokens = self._tokenize(data['question'][start:stop])
        answer_tokens = self._tokenize(data['answer'][start:stop])
        answer_tokens = np.insert(answer_tokens, 0, tokenizer.bos_token_id, axis=1)
        return context_tokens, question_tokens, answer_tokens

    def _encode(self, context_tokens, question_tokens):
        """Encode the question ids followed by the context ids."""
        encoder_tokens = np.concatenate([question_tokens, context_tokens], axis=1)
        return self.encoder(self.embedding_layer(encoder_tokens))

    def _all_trainable_variables(self):
        """Return the trainable variables of embedding, encoder and decoder."""
        return (self.embedding_layer.trainable_variables
                + self.encoder.trainable_variables
                + self.decoder.trainable_variables)

    def train_step(self, context_tokens, question_tokens, answer_tokens):
        """Run one optimisation step on a tokenized batch and return its loss.

        ``answer_tokens`` must already start with the BOS id.
        """
        with tf.GradientTape() as tape:
            # The embedding lookups must happen inside the tape, otherwise the
            # embedding table receives no gradient and is never trained.
            encoder_out = self._encode(context_tokens, question_tokens)
            # Teacher forcing on the answer, matching evaluate() and
            # inference(); feeding the question to the decoder would train a
            # mapping that inference never exercises.
            decoder_out = self.decoder(self.embedding_layer(answer_tokens), encoder_out)

            # Position t predicts answer token t + 1: drop <BOS> from the
            # targets and the final decoder position from the predictions.
            loss = self.compute_loss(answer_tokens[:, 1:], decoder_out[:, :-1])

        variables = self._all_trainable_variables()
        gradients = tape.gradient(loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))

        return loss

    def fit(self, train_data, test_data, num_epochs, batch_size):
        """Train for ``num_epochs``; save weights and report validation loss after each.

        ``train_data`` and ``test_data`` must provide 'context', 'question'
        and 'answer' columns. Only full batches are used.
        """
        num_batches = len(train_data['context']) // batch_size
        for epoch in range(num_epochs):
            epoch_loss = 0
            for i in range(num_batches):
                context_tokens, question_tokens, answer_tokens = self._tokenize_batch(
                    train_data, i * batch_size, (i + 1) * batch_size)

                loss = self.train_step(context_tokens, question_tokens, answer_tokens)
                epoch_loss += loss

                print(f"Batch {i + 1}/{num_batches} Loss: {loss:.4f} Epoch {epoch + 1}")

            avg_epoch_loss = epoch_loss / num_batches if num_batches > 0 else epoch_loss
            self.save(f'weights_run2_{epoch}.pkl')
            print(f"Epoch {epoch + 1}, Loss: {avg_epoch_loss:.4f}")

            test_loss = self.evaluate(test_data, batch_size)
            print(f"Validation Loss: {test_loss:.4f}")

    def evaluate(self, test_data, batch_size):
        """Return the mean loss over all full batches of ``test_data``.

        Returns 0.0 when ``test_data`` holds fewer than ``batch_size`` rows.
        """
        # Count rows the same way fit() does; len(test_data) would be the
        # number of columns for a dict of column sequences.
        num_batches = len(test_data['context']) // batch_size
        total_loss = 0
        for i in range(num_batches):
            context_tokens, question_tokens, answer_tokens = self._tokenize_batch(
                test_data, i * batch_size, (i + 1) * batch_size)

            encoder_out = self._encode(context_tokens, question_tokens)
            decoder_out = self.decoder(self.embedding_layer(answer_tokens), encoder_out)
            total_loss += self.compute_loss(answer_tokens[:, 1:], decoder_out[:, :-1])

        # Average over the batches actually evaluated (guarding the empty case).
        return total_loss / max(num_batches, 1)

    def inference(self, context_text, question_text, max_length=50):
        """Greedily decode an answer to ``question_text`` given ``context_text``.

        Decoding stops after ``max_length`` generated tokens or when the
        tokenizer's EOS id is produced. The returned text includes the
        decoded BOS token.
        """
        context_tokens = self._tokenize([context_text])
        question_tokens = self._tokenize([question_text])
        # The encoder input never changes during decoding, so encode it once.
        encoder_out = self._encode(context_tokens, question_tokens)

        output_tokens = np.array([[tokenizer.bos_token_id]])

        for _ in range(max_length):
            decoder_out = self.decoder(self.embedding_layer(output_tokens), encoder_out)

            last_token_logits = decoder_out[:, -1, :]
            next_token_id = tf.argmax(last_token_logits, axis=-1)

            output_tokens = np.concatenate([output_tokens, next_token_id[:, tf.newaxis]], axis=-1)

            if next_token_id[0] == tokenizer.eos_token_id:
                print('GOT EOS TOKEN')
                break

        return tokenizer.decode(output_tokens[0])