<a href="https://colab.research.google.com/github/nicolai5965/Transformer_scratch_tensorflow/blob/main/Load_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Import required libraries
import numpy as np
import pandas as pd
import string
import re

# Import Natural Language Toolkit and download required corpora
# NOTE(review): no cell visible in this notebook calls nltk after these downloads;
# confirm the 'words' and 'punkt' corpora are still needed before keeping them.
import nltk
nltk.download('words')
nltk.download('punkt')

# Import Tensorflow and Keras related libraries
import tensorflow as tf
from tensorflow.keras.layers import Layer
# NOTE(review): Tokenizer is imported from the standalone `keras` package while Layer
# comes from `tensorflow.keras` — verify both resolve to the same Keras installation.
from keras.preprocessing.text import Tokenizer

# Import Google Colab library for Google Drive integration
from google.colab import drive
# Mount Google Drive: the saved model and tokenizer paths used further down
# are located under /content/drive
drive.mount('/content/drive')


[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Package words is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
class TextTokenizer:
    """Wrapper around a Keras ``Tokenizer`` that registers ``<SOS>``/``<EOS>`` ids.

    The wrapped tokenizer is exposed through ``word_index`` / ``index_word`` and
    ``texts_to_sequences``; ``tokenize`` is an independent regex-based splitter that
    keeps punctuation marks as separate tokens.
    """

    # Words (runs of \w between word boundaries) or a single punctuation character.
    # The punctuation set is escaped: unescaped, the ``\]`` inside string.punctuation
    # is read as an escaped bracket and a literal backslash would never be matched.
    _TOKEN_PATTERN = re.compile(r'\b\w+\b|[' + re.escape(string.punctuation) + ']')

    # Initialize the tokenizer
    def __init__(self):
        self.tokenizer = Tokenizer()

    # Fit the tokenizer to the texts and add Start and End of Sentence tokens
    def fit(self, texts):
        """Fit the wrapped tokenizer on ``texts`` and append the special tokens.

        ``<SOS>`` and ``<EOS>`` receive the next two free ids after the fitted
        vocabulary. Both lookup tables are updated so that ``word_index`` and
        ``index_word`` stay inverse mappings of each other.
        """
        self.tokenizer.fit_on_texts(texts)
        for special_token in ('<SOS>', '<EOS>'):
            token_id = len(self.tokenizer.word_index) + 1
            self.tokenizer.word_index[special_token] = token_id
            self.tokenizer.index_word[token_id] = special_token

    # Tokenize the texts
    def tokenize(self, texts):
        """Return one list of word/punctuation tokens per input text."""
        return [self._custom_tokenize_text(t) for t in texts]

    # Convert texts to sequences
    def texts_to_sequences(self, texts):
        """Delegate to the wrapped tokenizer's ``texts_to_sequences``."""
        return self.tokenizer.texts_to_sequences(texts)

    # Custom tokenize text method that also retains punctuation
    def _custom_tokenize_text(self, text):
        """Split ``text`` into words and individual punctuation characters."""
        return self._TOKEN_PATTERN.findall(text)

    # Getter method for word index
    @property
    def word_index(self):
        """Mapping token -> integer id of the wrapped tokenizer."""
        return self.tokenizer.word_index

    # Getter method for index word
    @property
    def index_word(self):
        """Mapping integer id -> token of the wrapped tokenizer."""
        return self.tokenizer.index_word

In [3]:
class MultiHeadAttention(Layer):
    """Multi-head attention layer: per-input Dense projections, head split,
    scaled dot-product attention, head merge and a final Dense projection.

    Args:
        d_model: Width of the query/key/value/output projections.
        num_attention_heads: Number of heads; must divide ``d_model`` evenly.
        name: Layer name, also used as prefix for the Dense sub-layer names.
        **kwargs: Forwarded to the Keras ``Layer`` constructor.

    Raises:
        ValueError: If ``num_attention_heads`` is not positive or does not
            divide ``d_model``.

    NOTE(review): ``ScaledDotProductAttention`` is not defined in the visible part
    of this notebook; it must be in scope before this layer is instantiated.
    """

    def __init__(self, d_model, num_attention_heads, name="multi_head_attention", **kwargs):
        super(MultiHeadAttention, self).__init__(name=name, **kwargs)

        # Without this check a non-divisible pair silently floors `depth`, and the
        # `-1` reshape in split_heads can then regroup features into the wrong heads.
        if num_attention_heads <= 0 or d_model % num_attention_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by "
                f"num_attention_heads ({num_attention_heads})"
            )

        self.num_attention_heads = num_attention_heads
        self.d_model = d_model
        self.scaled_dot_product_attention = ScaledDotProductAttention()

        self.depth = d_model // self.num_attention_heads  # Calculate depth for each head

        # Initializing linear transformation layers
        self.query_lin = tf.keras.layers.Dense(d_model, name=name+"_query_lin")  # Linear transformation layer for queries
        self.key_lin = tf.keras.layers.Dense(d_model, name=name+"_key_lin")  # Linear transformation layer for keys
        self.value_lin = tf.keras.layers.Dense(d_model, name=name+"_value_lin")  # Linear transformation layer for values

        self.final_lin = tf.keras.layers.Dense(d_model, name=name+"_final_lin")  # Linear transformation layer for final output

    # Function to split input into multiple heads
    def split_heads(self, x, batch_size):
        """Reshape ``x`` to (batch_size, -1, heads, depth) and move the head axis
        to position 1, giving (batch_size, heads, -1, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_attention_heads, self.depth))  # Reshape input tensor to split the last dimension
        return tf.transpose(x, perm=[0, 2, 1, 3])  # Transpose the tensor dimensions

    # Function for forward propagation
    def call(self, v, k, q, mask=None):
        """Run attention over values ``v``, keys ``k`` and queries ``q``.

        Note the argument order (v, k, q). ``mask`` is passed through unchanged
        to the scaled dot-product attention sub-layer.

        Returns:
            Tuple ``(output, attention_weights)``: the merged heads after the
            final Dense projection, and the weights returned by the attention
            sub-layer.
        """
        batch_size = tf.shape(q)[0]  # Get the batch size

        # Apply linear transformations
        W_q = self.query_lin(q)  # Linear transformation for queries
        W_k = self.key_lin(k)  # Linear transformation for keys
        W_v = self.value_lin(v)  # Linear transformation for values

        # Apply scaled dot product attention
        q_split = self.split_heads(W_q, batch_size)  # Split queries into multiple heads
        k_split = self.split_heads(W_k, batch_size)  # Split keys into multiple heads
        v_split = self.split_heads(W_v, batch_size)  # Split values into multiple heads

        scaled_attention, attention_weights = self.scaled_dot_product_attention(
            q_split, k_split, v_split, mask)  # Call to scaled dot product attention layer

        # Undo the head/sequence transpose done in split_heads before merging heads
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # Transpose attention output

        # Reshape and apply final linear transformation
        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))
        output = self.final_lin(concat_attention)

        return output, attention_weights

    # Function to get layer's configuration
    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super(MultiHeadAttention, self).get_config()
        config.update({
            'd_model': self.d_model,
            'num_attention_heads': self.num_attention_heads,
            'name': self.name,
        })
        return config

    # Classmethod to create layer from its config
    @classmethod
    def from_config(cls, config):
        """Rebuild the layer by passing the config dict as keyword arguments."""
        return cls(**config)

In [4]:
class LossFunction:
    """Sparse categorical cross-entropy on logits that ignores padding positions."""

    def __init__(self):
        # Initialize the loss function with specific parameters:
        # reduction='none' keeps the per-element losses so they can be masked below.
        self.loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True,
            reduction='none'
        )

    def compute(self, real, pred):
        """Return the mean loss over the non-padding positions of ``real``.

        Args:
            real: Target ids; positions equal to 0 are treated as padding.
            pred: Predictions passed to the loss object as logits.

        Returns:
            Sum of the masked per-element losses divided by the number of
            non-padding positions, or 0 when every position is padding.
        """
        # Exclude padding for the computation of the loss
        mask = tf.math.logical_not(tf.math.equal(real, 0))
        loss_ = self.loss_object(real, pred)

        # Convert mask to the same type as loss
        mask = tf.cast(mask, dtype=loss_.dtype)
        # Apply the mask to the loss
        loss_ *= mask

        # divide_no_nan returns 0 instead of NaN when the batch contains only
        # padding (mask sum == 0), so one degenerate batch cannot poison training.
        return tf.math.divide_no_nan(tf.reduce_sum(loss_), tf.reduce_sum(mask))

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule: rsqrt(d_model) * min(rsqrt(step), step * warmup_steps**-1.5)."""

    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()

        # Both hyper-parameters are kept as float32 tensors for the arithmetic in __call__
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = tf.cast(warmup_steps, tf.float32)

    def __call__(self, step):
        """Return the learning rate for the given optimizer step."""
        step_f32 = tf.cast(step, tf.float32)

        # The two candidate terms of the schedule; the smaller one is used
        inverse_sqrt_term = tf.math.rsqrt(step_f32)
        warmup_term = step_f32 * (self.warmup_steps ** -1.5)
        smaller_term = tf.math.minimum(inverse_sqrt_term, warmup_term)

        return tf.math.rsqrt(self.d_model) * smaller_term

    def get_config(self):
        """Serialize the constructor arguments as plain Python floats."""
        d_model_value = float(self.d_model.numpy())
        warmup_value = float(self.warmup_steps.numpy())
        return {"d_model": d_model_value, "warmup_steps": warmup_value}

    @classmethod
    def from_config(cls, config):
        """Recreate the schedule from the dict produced by get_config."""
        return cls(**config)

In [5]:
# Name of the saved model directory to load; the trailing comment keeps an
# alternative run name that can be swapped in.
model_name = 'Transformer_SentimentAnalysis_15_epochs_0.08_data_20230718_1053_v1_0.09acc' # Transformer_SentimentAnalysis_50_epochs_0.08_data_20230717_0945_v1_0.35acc

# Define the model path (on the mounted Google Drive)
model_path = f'/content/drive/MyDrive/Colab Notebooks/Machine Learning/TensorFlow/Transformer/Transformer_Weight/{model_name}'

# Instance whose bound `compute` method is registered as a custom object below
loss_function = LossFunction()

# custom_objects maps the names stored in the saved model to the Python objects
# defined in this notebook.
# NOTE(review): presumably the model was compiled with `LossFunction().compute`,
# which is why the key is "compute" — confirm against the training notebook.
model_saved = tf.keras.models.load_model(
    model_path,
    custom_objects={
        "CustomSchedule": CustomSchedule,
        "compute": loss_function.compute,
        "MultiHeadAttention": MultiHeadAttention
    }
)
model_saved.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 decoder_input (InputLayer)     [(None, None)]       0           []                               
                                                                                                  
 encoder_input (InputLayer)     [(None, None)]       0           []                               
                                                                                                  
 decoder_lam_shape (Lambda)     ()                   0           ['decoder_input[0][0]']          
                                                                                                  
 padding_mask_decoder (Lambda)  (None, None)         0           ['decoder_input[0][0]']          
                                                                                              

In [6]:
# Access the LAST layer of the model (index -1). The variable is named
# `first_layer`, but per the printed config this is the 'Transformer_Block' layer.
first_layer = model_saved.layers[-1]

# Print out the configuration of that layer
print(first_layer.get_config())


{'name': 'Transformer_Block', 'trainable': True, 'dtype': 'float32', 'enc_num_layers': 6, 'dec_num_layers': 6, 'd_model': 510, 'enc_num_attention_heads': 6, 'dec_num_attention_heads': 6, 'enc_dim_ff': 800, 'dec_dim_ff': 800, 'input_vocab_size': 25198, 'target_vocab_size': 25198, 'pe_input': 60, 'pe_target': 60, 'enc_rate': 0.14, 'dec_rate': 0.16, 'verbose': False, 'encoder': {'class_name': 'Encoder', 'config': {'name': 'Encoder', 'trainable': True, 'dtype': 'float32', 'num_layers': 6, 'd_model': 510, 'num_attention_heads': 6, 'dim_ff': 800, 'input_vocab_size': 25198, 'max_position': 60, 'rate': 0.14, 'embedding_layer': {'class_name': 'MyEmbeddingLayer', 'config': {'name': 'encoder_embedding', 'trainable': True, 'dtype': 'float32', 'vocab_size': 25198, 'd_model': 510, 'training': True, 'verbose': False}}, 'pos_encoding_layer': {'class_name': 'PositionalEncoding', 'config': {'name': 'encoder_encoding', 'trainable': True, 'dtype': 'float32', 'position': 60, 'd_model': 510}}, 'encoder_laye

In [7]:
import pprint

# Access a layer by name
layer = model_saved.get_layer('Transformer_Block')

# Print out the configuration of the layer
# (left disabled; un-comment to pretty-print the full nested config dict)
#pprint.pprint(layer.get_config())



In [8]:
# Fetch the transformer block by name and read its serialized configuration
layer = model_saved.get_layer('Transformer_Block')
config = layer.get_config()

# The encoder and decoder each store their own settings under a nested 'config' key
encoder_config, decoder_config = (
    config[part]['config'] for part in ('encoder', 'decoder')
)

# One-column tables: config key as row label, config value in the 'Value' column
encoder_config_df = pd.DataFrame.from_dict(encoder_config, orient='index', columns=['Value'])
decoder_config_df = pd.DataFrame.from_dict(decoder_config, orient='index', columns=['Value'])

# Show each table under its heading
print("Encoder Configuration:", encoder_config_df, sep="\n")
print("\nDecoder Configuration:", decoder_config_df, sep="\n")


Encoder Configuration:
                                                                 Value
name                                                           Encoder
trainable                                                         True
dtype                                                          float32
num_layers                                                           6
d_model                                                            510
num_attention_heads                                                  6
dim_ff                                                             800
input_vocab_size                                                 25198
max_position                                                        60
rate                                                              0.14
embedding_layer      {'class_name': 'MyEmbeddingLayer', 'config': {...
pos_encoding_layer   {'class_name': 'PositionalEncoding', 'config':...
encoder_layers       [{'class_name': 'EncoderLayer', '

In [9]:
# Serialized settings of the encoder, taken from the block config read earlier
encoder_config = config['encoder']['config']

# List with one serialized entry per encoder layer
encoder_layers_config = encoder_config['encoder_layers']

# Print a one-column table (key -> 'Value') for every encoder layer, numbered from 1
for i, layer_config in enumerate(encoder_layers_config):
    layer_config_df = pd.DataFrame.from_dict(layer_config['config'], orient='index', columns=['Value'])
    print(f"Encoder Layer {i + 1} Configuration:", layer_config_df, "\n", sep="\n")

Encoder Layer 1 Configuration:
                                                                 Value
name                                                   encoder_layer_1
trainable                                                         True
dtype                                                          float32
d_model                                                            510
num_attention_heads                                                  6
dim_ff                                                             800
rate                                                              0.14
mha                  {'class_name': 'MultiHeadAttention', 'config':...
ffn                  {'class_name': 'PointWiseFeedForwardNetwork', ...
norm_and_add1        {'class_name': 'NormAndAdd', 'config': {'name'...
norm_and_add2        {'class_name': 'NormAndAdd', 'config': {'name'...


Encoder Layer 2 Configuration:
                                                                 Value
name         

# Loading tokenizer

In [10]:
import pickle
# The tokenizer pickle sits in the same Drive folder as the model and is named
# after it: "<model_name>_tokenizer.pkl" (model_name is set in the model-loading cell).
tokenizer_path = f'/content/drive/MyDrive/Colab Notebooks/Machine Learning/TensorFlow/Transformer/Transformer_Weight/{model_name}_tokenizer.pkl'
def load_tokenizer(path):
    """Read the pickled tokenizer stored at ``path`` and return it.

    SECURITY: unpickling can execute arbitrary code, so only pass paths to
    files you created yourself or otherwise trust.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)

# pickle looks classes up by name when loading, so any custom class stored in the
# file has to be defined in this session before this call.
# NOTE(review): presumably the pickle holds a TextTokenizer instance — confirm.
tokenizer = load_tokenizer(tokenizer_path)

# Text generator

In [11]:

class StochasticBeamSearch:
    """Text generation with a beam that is re-sampled stochastically at every step.

    At each step every sequence in the beam is extended with its ``beam_size`` most
    likely next tokens; the next beam is then drawn at random (with replacement)
    from all extensions, weighted by their penalised sequence probability.

    Args:
        model: Object with ``predict([encoder_ids, decoder_ids], verbose=0)``
            returning an array indexable as ``[0, -1, :]`` (scores over the
            vocabulary for the last position).
        tokenizer: Object with ``texts_to_sequences`` and ``word_index``.
        beam_size: Number of expansions per sequence and number of sequences kept.

    NOTE(review): the model outputs are treated as logits and normalised with a
    log-softmax, matching the ``from_logits=True`` loss defined in this notebook.
    If the model already ends in a softmax, replace the log-softmax by a plain log.
    """

    def __init__(self, model, tokenizer, beam_size=3):
        self.model = model
        self.tokenizer = tokenizer
        self.beam_size = beam_size

    def decode_sequence(self, sequence):
        """Map token ids back to words, joined by spaces.

        Unknown ids are rendered as '?'; the '<EOS>' token is left out.
        """
        index_to_word = {i: word for word, i in self.tokenizer.word_index.items()}
        words = (index_to_word.get(token, '?') for token in sequence)
        return ' '.join(word for word in words if word != '<EOS>')

    @staticmethod
    def _log_softmax(logits):
        """Numerically stable log-softmax of a 1-D array of logits."""
        shifted = logits - np.max(logits)
        return shifted - np.log(np.sum(np.exp(shifted)))

    def _sample_beam(self, candidates):
        """Draw ``beam_size`` candidates with probability proportional to exp(score)."""
        scores = np.array([score for _, score in candidates], dtype=np.float64)
        # Subtracting the max leaves the normalised probabilities unchanged but
        # prevents exp() from underflowing to an all-zero vector for long sequences.
        weights = np.exp(scores - np.max(scores))
        total = weights.sum()
        if not np.isfinite(total) or total <= 0:
            print("NaN values detected in beam_probs. Replacing with uniform probabilities.")
            weights = np.ones_like(weights)
            total = weights.sum()
        weights = weights / total
        picks = np.random.choice(len(candidates), size=self.beam_size, p=weights)
        return [candidates[i] for i in picks]

    def predict(self, start_sentence, max_length=10, temperature=1.0, repetition_penalty=0.5):
        """Generate a continuation of ``start_sentence`` and return it as text.

        Args:
            start_sentence: Prompt; it is encoded with the tokenizer.
            max_length: Maximum number of generation steps.
            temperature: Positive divisor applied to the logits before normalising.
            repetition_penalty: Amount subtracted from a candidate's log-probability
                for every distinct token id that occurs more than once in it.

        Returns:
            The decoded best sequence of the last beam (prompt included).

        Raises:
            ValueError: If ``temperature`` is not positive or the prompt contains
                no token known to the tokenizer.
        """
        if temperature <= 0:
            raise ValueError("temperature must be positive")

        encoded = self.tokenizer.texts_to_sequences([start_sentence])
        start_tokens = np.asarray(encoded[0], dtype=np.int64)
        if start_tokens.size == 0:
            raise ValueError("start_sentence contains no tokens known to the tokenizer")

        eos_id = self.tokenizer.word_index.get('<EOS>')

        # Initialize beam with the start tokens
        beam = [(start_tokens, 0.0)]  # each element in the beam is (token_sequence, log_probability)
        # Defined up front so that max_length == 0 returns the decoded prompt
        best_tokens = start_tokens

        for _ in range(max_length):
            all_candidates = []

            for tokens, log_prob in beam:
                # The same id sequence is fed as both model inputs
                model_input = np.expand_dims(tokens, axis=0)
                predictions = self.model.predict([model_input, model_input], verbose=0)

                # Scores of the last position, temperature-scaled and normalised
                logits = np.asarray(predictions, dtype=np.float64)[0, -1, :]
                log_probs = self._log_softmax(logits / temperature)

                # Top-k token ids, best first (stable sort: lower id wins ties)
                k = min(self.beam_size, log_probs.shape[0])
                top_k_tokens = np.argsort(-log_probs, kind='stable')[:k]

                # Form next candidates by adding new tokens to current sequences
                for token in top_k_tokens:
                    # Never repeat the immediately preceding token
                    if token == tokens[-1]:
                        continue
                    updated_tokens = np.append(tokens, token)

                    # Subtract repetition penalty for each repeated token
                    token_counts = np.bincount(updated_tokens)
                    repeated_token_count = int(np.count_nonzero(token_counts > 1))
                    updated_log_prob = (
                        log_prob
                        + log_probs[token]
                        - repetition_penalty * repeated_token_count
                    )

                    all_candidates.append((updated_tokens, updated_log_prob))

            # Possible when every expansion was skipped (e.g. beam_size == 1)
            if not all_candidates:
                break

            # Select new beam probabilistically
            beam = self._sample_beam(all_candidates)

            # Select the sequence with the highest score from the current beam
            best_tokens, _ = max(beam, key=lambda item: item[1])

            # Check for EOS token and stop if found
            if eos_id is not None and eos_id in best_tokens:
                break

        # Decode tokens into text and return
        return self.decode_sequence(best_tokens)


In [12]:
# StochasticBeamSearch samples its beam with np.random.choice, so the global NumPy
# RNG is seeded here to make the generated text reproducible on Restart & Run All.
GENERATION_SEED = 42
np.random.seed(GENERATION_SEED)

beam_search = StochasticBeamSearch(model_saved, tokenizer, beam_size=6)
text = beam_search.predict(
    "the sun dipped below the horizon casting a ",
    max_length=50,
    temperature=1.3,
    repetition_penalty=0.3,
)
print(text)




the sun below the horizon casting a
