In [36]:
import tensorflow as tf
import keras
from keras import layers
from keras.utils import register_keras_serializable
from keras.layers import Layer, Embedding, Input, Dense, TextVectorization
from keras.models import Model
from keras.layers import MultiHeadAttention, LayerNormalization, Dropout
from keras.callbacks import ModelCheckpoint

import numpy as np
import os
import string
import re
import random


# Record the library versions this notebook was run against.
print(f'Tensorflow Version : {tf.__version__}')
print(f'Keras Version : {keras.__version__}')

Tensorflow Version : 2.18.0
Keras Version : 3.8.0


In [37]:
# Hyperparameters shared by the tokenizer, the model and the input pipeline.
vocab_size = 5000  # Vocabulary cap (TextVectorization max_tokens) and width of the output softmax
max_len = 20  # Tokens per sequence: vectorizer output length and model input length
d_model = 128  # Embedding width; build_model also passes it as the per-head key_dim of attention
num_heads = 2  # Number of attention heads
ff_dim = 512  # Hidden width of the feed-forward sub-layer
num_blocks = 2  # Number of stacked transformer blocks
dropout = 0.1  # Dropout rate used inside each transformer block

batch_size = 64  # Sentence pairs per batch in make_dataset

In [38]:
# Load the corpus; every line of the file is one training sentence.
filename = './data/clearned_corpus.txt'
# The corpus is accented Portuguese text, so decode it explicitly as UTF-8
# instead of relying on the platform's default encoding.
with open(filename, 'r', encoding='utf-8') as f:
    lines = f.readlines()


# Custom standardization helper that appends the end-of-sentence marker.
# NOTE: the vectorizer in this notebook is created with standardize=None, so
# this helper is currently not wired in.
def custom_standardization(input_string):
    """Append the `[end]` token to a sentence.

    Args:
        input_string: String tensor (or Python string) holding the sentence(s).

    Returns:
        A string tensor with ` [end]` appended to every input element.
    """
    # `tf.strings.join` uses an empty separator by default, which would glue
    # the marker onto the last word ("word[end]"). Join with a space so the
    # marker survives whitespace tokenization as its own token.
    return tf.strings.join([input_string, '[end]'], separator=' ')

# Tokenizer mapping each sentence to `max_len` integer ids; no text
# standardization is applied before tokenizing.
vectorization = TextVectorization(
    standardize=None,
    max_tokens=vocab_size,
    output_sequence_length=max_len,
    output_mode="int",
)
vectorization.adapt(lines)

# Plain-string copy of the learned vocabulary (index == token id).
vocab = [str(token) for token in vectorization.get_vocabulary()]
print('Vocab Size:', len(vocab))

# Next-token pairs: the input is the sentence without its last token, the
# target is the same sentence without its first token (shifted by one).
text_pairs = [
    (' '.join(tokens[:-1]), ' '.join(tokens[1:]))
    for tokens in map(str.split, lines)
]

# NOTE: shuffling is currently disabled, so the split follows corpus order.
#random.shuffle(text_pairs)

# 70% train / 15% validation / remainder test.
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
val_end = num_train_samples + num_val_samples

train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:val_end]
test_pairs = text_pairs[val_end:]

Vocab Size: 5000


In [39]:
import random
# Sanity check: show one random (input, target) pair next to its token ids.
pair = random.choice(text_pairs)
for text in pair:
    print(f'length: {len(text.split())} - text: [{text}] - vector:\n{vectorization(text)}')

length: 20 - text: [ainda em criança d afonso casou com a princesa isabel de aragão filha mais velha dos reis católicos isabel i] - vector:
[ 227    9 4928   59   57    1   16    4 3036 1004    2  266 1718   31
  842   18  615 1923 1004  144]
length: 20 - text: [em criança d afonso casou com a princesa isabel de aragão filha mais velha dos reis católicos isabel i [end]] - vector:
[   9 4928   59   57    1   16    4 3036 1004    2  266 1718   31  842
   18  615 1923 1004  144    3]


In [43]:
def format_dataset(x, y):
    """Convert (input, target) strings into integer token-id tensors.

    Both elements are passed through the shared `vectorization` layer and
    returned in the same (input, target) order.
    """
    return vectorization(x), vectorization(y)


def make_dataset(pairs):
    """Build a batched `tf.data.Dataset` of vectorized (input, target) pairs.

    Args:
        pairs: Non-empty sequence of `(input_text, target_text)` string tuples.

    Returns:
        A dataset yielding `(x, y)` batches of token ids produced by
        `format_dataset`, with up to `batch_size` rows per batch.
    """
    inputs, targets = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(inputs), list(targets)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    # Order matters here: cache the deterministic, vectorized batches first,
    # then shuffle so every epoch sees a fresh batch order, and keep prefetch
    # last so input preparation overlaps with training. Caching *after* the
    # shuffle would freeze the first epoch's order for all later epochs.
    return dataset.cache().shuffle(2048).prefetch(tf.data.AUTOTUNE)

# Training and validation pipelines, both built by the same helper.
train_ds, val_ds = (make_dataset(pairs) for pairs in (train_pairs, val_pairs))

In [65]:
# Disabled debug snippet: switch the guard to True to print one batch.
if False:
    for batch in train_ds.take(1).cache():
        print(batch)

(<tf.Tensor: shape=(1, 20), dtype=int64, numpy=
array([[  15,   20,   24,  155, 3079,    5,  351,   28,    4,    1,    5,
         334, 2470,    1,    4,    1, 4656,    1,    8,    4]])>, <tf.Tensor: shape=(1, 20), dtype=int64, numpy=
array([[  20,   24,  155, 3079,    5,  351,   28,    4,    1,    5,  334,
        2470,    1,    4,    1, 4656,    1,    8,    4,    3]])>)


2025-01-30 12:23:34.412133: W tensorflow/core/kernels/data/cache_dataset_ops.cc:914] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


In [10]:
@register_keras_serializable()
class PositionalEmbedding(Layer):
    """Sum of a token embedding and a learned embedding of each position.

    Args:
        vocab_size: Number of rows in the token embedding table.
        d_model: Width of both embeddings.
        max_len: Number of rows in the position embedding table.
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, vocab_size, d_model, max_len, **kwargs):
        super().__init__(**kwargs)
        # Stored so that `get_config` can serialize the constructor arguments.
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.max_len = max_len
        self.token_emb = Embedding(input_dim=vocab_size, output_dim=d_model)
        self.pos_emb = Embedding(input_dim=max_len, output_dim=d_model)

    def call(self, x):
        """Embed token ids `x` and add the embedding of each position index."""
        # Positions are 0 .. (size of the last axis of x) - 1.
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_emb(position_ids)
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        return {
            **base_config,
            'vocab_size': self.vocab_size,
            'd_model': self.d_model,
            'max_len': self.max_len,
        }

In [11]:
def transformer_block(inputs, head_size, num_heads, ff_dim, dropout=0, causal=True):
    """Apply one self-attention + feed-forward block to `inputs`.

    Args:
        inputs: Tensor of token representations; its last dimension is the
            model width and is preserved by the block.
        head_size: `key_dim` of each attention head.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward sub-layer.
        dropout: Dropout rate applied after attention and inside the
            feed-forward sub-layer.
        causal: When True (default), each position may only attend to itself
            and earlier positions. This is required for next-token training:
            the target is the input shifted by one, so without the mask
            position i could simply read token i+1, i.e. its own label.

    Returns:
        Tensor with the same shape as `inputs`.
    """
    # Self-attention sub-layer: query and value are both `inputs`.
    x = MultiHeadAttention(key_dim=head_size, num_heads=num_heads)(
        inputs, inputs, use_causal_mask=causal
    )
    x = Dropout(dropout)(x)
    # The normalization is applied to the sub-layer output before the
    # residual connection is added.
    x = LayerNormalization(epsilon=1e-6)(x)
    res = x + inputs

    # Position-wise feed-forward sub-layer, projected back to the input width.
    x = Dense(ff_dim, activation="relu")(res)
    x = Dropout(dropout)(x)
    x = Dense(inputs.shape[-1])(x)
    x = LayerNormalization(epsilon=1e-6)(x)
    return x + res

def build_model(vocab_size, max_len, d_model, num_heads, ff_dim, num_blocks, dropout=0):
    """Assemble the token-level language model as a functional Keras `Model`.

    The network embeds `max_len` token ids with `PositionalEmbedding`, runs
    them through `num_blocks` transformer blocks (each called with `d_model`
    as its head size) and ends in a softmax over `vocab_size` entries for
    every position.

    Returns:
        An uncompiled `Model` mapping token ids to per-position probabilities.
    """
    token_ids = Input(shape=(max_len,))
    hidden = PositionalEmbedding(vocab_size, d_model, max_len)(token_ids)
    for _ in range(num_blocks):
        hidden = transformer_block(hidden, d_model, num_heads, ff_dim, dropout)
    token_probs = Dense(vocab_size, activation="softmax")(hidden)
    return Model(token_ids, token_probs)

In [None]:
# Instantiate the network from the notebook-level hyperparameters and
# configure it for training on integer targets.
model = build_model(
    vocab_size=vocab_size,
    max_len=max_len,
    d_model=d_model,
    num_heads=num_heads,
    ff_dim=ff_dim,
    num_blocks=num_blocks,
    dropout=dropout,
)
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)

# Print the layer-by-layer overview.
model.summary()

In [13]:
class TextGeneration(tf.keras.callbacks.Callback):
    """Callback that samples a continuation of a prompt after every epoch.

    Args:
        tokenizer: Vectorization layer used to encode the prompt. Its
            `get_vocabulary()` maps ids back to words; id 0 is treated as
            padding.
        start_string: Prompt the generated text starts from.
        max_length: Maximum number of tokens to generate.
        temperature: Sampling temperature; values below 1 sharpen the
            distribution, values above 1 flatten it.
        model: Optional model to sample from. When None, the model that Keras
            attaches to the callback during `fit` is used.
    """

    def __init__(self, tokenizer, start_string, max_length=100, temperature=1.0, model = None):
        super().__init__()
        self.tokenizer = tokenizer
        self.start_string = start_string
        self.max_length = max_length
        self.temperature = temperature
        self.vocab = self.tokenizer.get_vocabulary()
        # Token id -> word lookup used to decode sampled ids.
        self.index_word = dict(zip(range(len(self.vocab)), self.vocab))
        # Stored as `m` because `model` is managed by the Callback base class.
        self.m = model

    def on_epoch_end(self, epoch, logs=None):
        """Print a freshly generated sample at the end of each epoch."""
        print(f'\nGenerating text after epoch: {epoch + 1}')
        print(self.generate_text())

    def generate_text(self):
        """Sample up to `max_length` tokens that continue `start_string`.

        The model is assumed to return per-position probabilities (softmax
        output) of shape (batch, sequence, vocabulary). Generation stops early
        when the `[end]` token is sampled.

        Returns:
            The prompt followed by a space and the generated words.
        """
        model = self.model if self.m is None else self.m

        prompt_ids = self.tokenizer(self.start_string).numpy().tolist()
        # The tokenizer's output length is used as the model's input length.
        window = len(prompt_ids)
        # Keep only real tokens; id 0 is the padding added by the tokenizer.
        context = [token_id for token_id in prompt_ids if token_id != 0]

        text_generated = []
        if context:
            for _ in range(self.max_length):
                # The model has no recurrent state, so the whole recent context
                # must be fed on every step (not just the last sampled token).
                recent = context[-window:]
                padded = recent + [0] * (window - len(recent))
                probs = model(tf.constant([padded], dtype=tf.int64))

                # Read the prediction at the last *real* token, not at a
                # padded position.
                next_token_probs = probs[0, len(recent) - 1]

                # `tf.random.categorical` expects logits; convert the
                # probabilities to log space before applying the temperature.
                logits = tf.math.log(next_token_probs + 1e-9) / self.temperature
                sampled = tf.random.categorical(tf.expand_dims(logits, 0), num_samples=1)
                predicted_id = int(sampled[0, 0])

                word_predicted = self.index_word[predicted_id]
                if word_predicted == '[end]':
                    break
                context.append(predicted_id)
                text_generated.append(word_predicted)

        return self.start_string + ' ' + ' '.join(text_generated)

In [None]:
model_filename = './nano_gpt_by_marcelo'
# Full path of the saved model, including the `.keras` extension.
model_path = model_filename + '.keras'

# Resume from a previous checkpoint when one exists. Load the same path whose
# existence was checked; the bare `model_filename` has no extension and is
# not the file on disk.
if os.path.exists(model_path):
    model = tf.keras.models.load_model(model_path)
    print(f'Model loaded from {model_path}')
    print(model)

In [None]:
# Save the full model (not just the weights) at the end of every epoch.
checkpoint_callback = ModelCheckpoint(
        filepath='./nano_gpt_by_marcelo.keras',
        save_weights_only=False,
        save_freq='epoch',
        monitor='val_loss')

# Create the TextGeneration callback
text_gen_callback = TextGeneration(vectorization, start_string="o brasil é um país da américa do sul", max_length=max_len, temperature=0.5)

# Train on the training split built by make_dataset. The validation split is
# passed as well so that `val_loss`, the metric the checkpoint monitors, is
# actually computed each epoch.
model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=50,
    callbacks=[checkpoint_callback, text_gen_callback],
)

In [None]:
# Sample from the current `model` directly, outside of training, by handing
# it to the callback explicitly.
text_gen_callback = TextGeneration(
    vectorization,
    start_string="a antropologia do grego anthropos",
    max_length=max_len,
    temperature=0.5,
    model=model,
)
generated = text_gen_callback.generate_text()
print(f'Tokens {len(generated.split())} - Length: {len(generated)}\n{generated}')