# Modelo de transformer para un mini GPT
Realizamos cambios al código existente de un mini GPT, [cuya referencia se encuentra en el siguiente enlace](https://keras.io/examples/generative/text_generation_with_miniature_gpt/).
Los comentarios se mantendrán en inglés para mantener la cohesión con el resto del código base.

## Importaciones

In [None]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization
import numpy as np
import os
import string
import random
import tensorflow
import tensorflow.data as tf_data
import tensorflow.strings as tf_strings

## Transformer block layer

In [2]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Build a batched causal mask for self attention.

    Entry (i, j) is set when `i >= j - n_src + n_dest`, i.e. the lower
    triangle counted from the lower right corner, so that a destination
    position cannot receive information from later source positions.

    Arguments:
        batch_size: Scalar batch dimension the mask is tiled over.
        n_dest: Number of destination positions (rows of the mask).
        n_src: Number of source positions (columns of the mask).
        dtype: Dtype the boolean comparison result is cast to.

    Returns:
        The mask reshaped to (1, n_dest, n_src) and tiled `batch_size`
        times along the first axis.
    """
    dest_positions = ops.arange(n_dest)[:, None]
    src_positions = ops.arange(n_src)
    visible = dest_positions >= src_positions - n_src + n_dest
    single_mask = ops.reshape(ops.cast(visible, dtype), [1, n_dest, n_src])
    # Tile multiples: [batch_size, 1, 1]
    repeats = ops.concatenate(
        [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])], 0
    )
    return ops.tile(single_mask, repeats)


class TransformerBlock(layers.Layer):
    """Causal self-attention followed by a two-layer feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual
    connection and layer normalization.

    Arguments:
        embed_dim: Passed as the second positional argument of
            `MultiHeadAttention` and used as the output size of the
            feed-forward network.
        num_heads: Number of attention heads.
        ff_dim: Size of the hidden (ReLU) layer of the feed-forward network.
        rate: Dropout rate applied after each sub-layer.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        feed_forward_layers = [
            layers.Dense(ff_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.ffn = keras.Sequential(feed_forward_layers)
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        """Apply masked self-attention and the feed-forward network to `inputs`."""
        dims = ops.shape(inputs)
        # Square mask: the sequence attends to itself (query and value are `inputs`)
        mask = causal_attention_mask(dims[0], dims[1], dims[1], "bool")
        attended = self.dropout1(self.att(inputs, inputs, attention_mask=mask))
        normed_attention = self.layernorm1(inputs + attended)
        projected = self.dropout2(self.ffn(normed_attention))
        return self.layernorm2(normed_attention + projected)

## Embedding layer

In [3]:
from tensorflow.keras import layers
import tensorflow as tf

class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Arguments:
        maxlen: Number of distinct positions in the position embedding table.
        vocab_size: Number of entries in the token embedding table.
        embed_dim: Output size of both embeddings.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in `x` and add the embedding of each position."""
        # Use backend-agnostic keras.ops, like the transformer block in this
        # file, instead of raw TensorFlow ops. The local is named `seq_len`
        # so it does not shadow the module-level `maxlen` hyperparameter.
        seq_len = ops.shape(x)[-1]
        positions = ops.arange(0, seq_len, 1)
        position_embeddings = self.pos_emb(positions)
        token_embeddings = self.token_emb(x)
        return token_embeddings + position_embeddings


## Modelo Mini GPT

In [None]:
vocab_size = 30000  # Vocabulary size: 30k words
maxlen = 70  # Maximum sequence length
embed_dim = 256  # Embedding size for each token
feed_forward_dim = 256  # Hidden layer size
num_heads = 2  # Number of heads passed to the transformer block

def create_model():
    """Build and compile the miniature GPT model.

    The model maps `maxlen` int32 token ids to two outputs: the
    per-position logits over the vocabulary and the transformer block
    output. Only the logits are trained (sparse categorical cross-entropy
    from logits); the second output has no loss.

    Returns:
        The compiled `keras.Model`, using the "adam" optimizer.
    """
    token_ids = layers.Input(shape=(maxlen,), dtype="int32")
    embedded = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(token_ids)
    hidden = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(embedded)
    logits = layers.Dense(vocab_size)(hidden)

    gpt = keras.Model(inputs=token_ids, outputs=[logits, hidden])
    gpt.compile(
        "adam",
        # No loss on the second output (the transformer block features)
        loss=[keras.losses.SparseCategoricalCrossentropy(from_logits=True), None],
    )
    return gpt

## Prepara los datos para la ingestión del modelo

Creamos un dataset a partir de los titulares. Puesto que el dataset es muy pequeño (unas 1 000 líneas), lo hemos duplicado, además de ajustar ciertos parámetros:
* Un vocabulario más amplio
* `maxlen` más grande para parecerse más a los títulos del dataset
* Un buffer más grande en el shuffle, puesto que el dataset es pequeño y nos lo podemos permitir.
* `text_ds.repeat()` para asegurarnos que durante el entrenamiento el modelo no se quede sin datos.


In [None]:
import os
import random
import tensorflow as tf
from tensorflow.data import AUTOTUNE
from tensorflow.keras.layers import TextVectorization
import string

batch_size = 128
file_path = './clean_titulares.txt'

# Build the dataset: one element per line of the file, shuffled with a
# 1024-element buffer (affordable for a small dataset), then batched.
text_ds = (
    tf.data.TextLineDataset(file_path)
    .shuffle(buffer_size=1024)
    .batch(batch_size)
)

def custom_standardization(input_string):
    """Lowercase the text, remove html line-break tags and split off punctuation.

    Arguments:
        input_string: String tensor to standardize.

    Returns:
        The lowercased tensor with "<br />" replaced by a space and a space
        inserted before every punctuation character.
    """
    import re  # local import so the block does not depend on a new file-level import

    lowercased = tf.strings.lower(input_string)
    stripped_html = tf.strings.regex_replace(lowercased, "<br />", " ")
    # Escape the punctuation before placing it inside a character class.
    # Unescaped, the "\]" inside string.punctuation is read as an escaped
    # "]", so the backslash itself was never matched.
    punctuation_pattern = f"([{re.escape(string.punctuation)}])"
    return tf.strings.regex_replace(stripped_html, punctuation_pattern, r" \1")

# Create the tokenizer and adapt it to the dataset
vectorize_layer = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode='int',
    output_sequence_length=maxlen + 1  # one extra token: inputs and targets are this sequence shifted by one
)
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()  # Used to recover the words from token indices

def prepare_lm_inputs_labels(text):
    """
    Tokenize a batch of text and split it into language-model inputs and targets.

    The targets are the inputs shifted by one position, so the target for
    position (i) is the word at position (i+1): the model sees all words up
    to position (i) and predicts the next one.

    Returns:
        A tuple `(x, y)`: all tokens except the last, and all tokens
        except the first.
    """
    tokens = vectorize_layer(tf.expand_dims(text, -1))
    return tokens[:, :-1], tokens[:, 1:]

# Tokenize every batch into (inputs, targets) pairs
text_ds = text_ds.map(prepare_lm_inputs_labels, num_parallel_calls=tf.data.AUTOTUNE)
# Repeat indefinitely so we never run out of data during training
text_ds = text_ds.repeat()
# Keep prefetch as the last transformation so preparing upcoming batches
# overlaps with training across repetitions as well
text_ds = text_ds.prefetch(tf.data.AUTOTUNE)


## Callback para generar el texto

Aquí hemos modificado los tokens iniciales, puesto que lo que buscamos es generar títulos, no reviews de películas

In [None]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    The model input length is read from the module-level `maxlen`.

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        # Initialize the base callback state before adding our own
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        """Sample a token index among the `top_k` largest logits.

        The sampling probabilities are the softmax of those `top_k` logits.
        """
        logits, indices = ops.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(ops.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the word stored at index `number` of `index_to_word`."""
        return self.index_to_word[number]

    def on_epoch_end(self, epoch, logs=None):
        """Generate and print `max_tokens` tokens every `print_every` epochs."""
        if (epoch + 1) % self.print_every != 0:
            return
        # Work on a copy so the stored prompt is never mutated
        context = list(self.start_tokens)
        tokens_generated = []
        # Strict comparison: generate exactly `max_tokens` tokens
        while len(tokens_generated) < self.max_tokens:
            # Keep the most recent `maxlen` tokens so the context keeps
            # sliding once it outgrows the model input
            window = context[-maxlen:]
            sample_index = len(window) - 1
            # Right-pad with 0 up to the fixed model input length
            x = np.array([window + [0] * (maxlen - len(window))])
            y, _ = self.model.predict(x, verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            context.append(sample_token)
        # `context` now holds the prompt followed by the generated tokens
        txt = " ".join(self.detokenize(token) for token in context)
        print(f"Texto generado:\n{txt}\n")


# Tokenize an initial prompt
word_to_index = {word: index for index, word in enumerate(vocab)}

start_prompt = "el"
# Words missing from the vocabulary fall back to index 1
start_tokens = [word_to_index.get(token, 1) for token in start_prompt.split()]
num_tokens_generated = 40
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

## Train

Hemos ajustado los steps por época para garantizar el uso del dataset completo de forma uniforme

In [10]:
# Compute the number of steps needed per epoch.
# Count lines in binary mode inside a context manager: the file is closed
# deterministically and no text decoding is needed just to count lines.
with open(file_path, "rb") as titles_file:
    total_samples = sum(1 for _ in titles_file)
# Guarantee at least one step when the file has fewer lines than one batch
steps_per_epoch = max(1, total_samples // batch_size)

model = create_model()

model.fit(text_ds, steps_per_epoch=steps_per_epoch, verbose=2, epochs=25, callbacks=[text_gen_callback])

Epoch 1/25
Texto generado:
el la el        de     la        del  y  para con  la para  en     para   

16/16 - 26s - 2s/step - loss: 7.2123
Epoch 2/25
Texto generado:
el que la era                                      

16/16 - 23s - 1s/step - loss: 1.5435
Epoch 3/25
Texto generado:
el y el futuro y la moda en un mundo                                

16/16 - 23s - 1s/step - loss: 0.5459
Epoch 4/25
Texto generado:
el futuro de enfermedades                                      

16/16 - 23s - 1s/step - loss: 0.4288
Epoch 5/25
Texto generado:
el cine para un planeta en la ia                                  

16/16 - 23s - 1s/step - loss: 0.3492
Epoch 6/25
Texto generado:
el impacto de la salud mental en la pantalla                                 

16/16 - 23s - 1s/step - loss: 0.3017
Epoch 7/25
Texto generado:
el impacto de la gratitud y desafios                                   

16/16 - 23s - 1s/step - loss: 0.2666
Epoch 8/25
Texto generado:
el arte de enfermedades con estilo y el b

<keras.src.callbacks.history.History at 0x225fe2db850>

Guardamos el modelo

In [11]:
# Save the trained model to a `.keras` file next to the notebook
model_save_path = './GPT_Transformer.keras'
model.save(model_save_path)


## Generador de títulos a partir de nuestro modelo entrenado

Para ello, partimos de una lista de prompts iniciales. Es importante usar vocabulario que se encuentre en el dataset a la hora de elegir los prompts iniciales

In [None]:
import numpy as np
import tensorflow as tf

# Default starting prompt.
# NOTE(review): the code visible below passes its own prompts to
# generate_text, so this value looks unused here - confirm before removing.
start_prompt = "el"

def generate_text(model, start_prompt, num_tokens_to_generate, vocab, word_to_index, top_k=10):
    """
    Generate text by repeatedly sampling the next token from the model.

    Args:
    model: Model whose `predict` output at index 0 holds the per-position logits.
    start_prompt (str): Whitespace-separated prompt words.
    num_tokens_to_generate (int): Number of tokens appended after the prompt.
    vocab (list): Index-to-word list used to detokenize.
    word_to_index (dict): Word-to-index mapping used to tokenize the prompt.
    top_k (int): Number of top predictions to sample from.

    Returns:
    str: Prompt and generated words joined by single spaces. Tokens outside
    the vocabulary range are rendered as "[UNK]".
    """
    # Unknown prompt words map to the "[UNK]" index (1 when it is not listed)
    unknown_index = word_to_index.get("[UNK]", 1)
    token_ids = [word_to_index.get(word, unknown_index) for word in start_prompt.split()]

    for _step in range(num_tokens_to_generate):
        # Build the (1, maxlen) input: most recent tokens, right-padded with 0
        window = token_ids[-maxlen:]
        padded = window + [0] * (maxlen - len(window))
        batch = np.array([padded], dtype=np.int32)

        # Logits at the last real (non-padding) position predict the next token
        logits = model.predict(batch, verbose=0)[0]
        next_token_logits = logits[0, len(window) - 1, :]

        token_ids.append(sample_from(next_token_logits, top_k))

    # Detokenize
    return ' '.join(
        vocab[token] if 0 <= token < len(vocab) else "[UNK]"
        for token in token_ids
    )


def sample_from(logits, top_k=10):
    """
    Sample one index among the `top_k` largest logits.

    The candidates are drawn with probability given by the softmax of
    their logits (computed over the `top_k` candidates only).

    Args:
    logits (np.array): Array of logits from model predictions.
    top_k (int): Top k logits to consider for sampling.

    Returns:
    Sampled index corresponding to predicted word, as drawn by `np.random.choice`.
    """
    top_values, top_indices = tf.math.top_k(logits, k=top_k)
    weights = tf.nn.softmax(top_values).numpy()
    candidates = top_indices.numpy()
    return np.random.choice(candidates, p=weights)


# Number of tokens generated after each prompt. This was assigned 45 but
# never used (the call below hard-coded 40); the effective value 40 is kept
# and the variable is now actually passed to generate_text.
num_tokens_generated = 40
vocab = vectorize_layer.get_vocabulary()
word_to_index = {word: idx for idx, word in enumerate(vocab)}

# Generate one text per initial prompt
initial_prompts = [
    "hacia un mundo",
    "la historia",
    "diagnosticos y tecnicas con",
    "cuando el",
    "el futuro a traves",
    "las ciudades",
]

for prompt in initial_prompts:
    generated_text = generate_text(model, prompt, num_tokens_generated, vocab, word_to_index)
    print(generated_text)



hacia un mundo mas verde                                      
la historia de la realidad aumentada integracion en la inclusion                                
diagnosticos y tecnicas con la medicina de precision                                    
cuando el auge de la medicina de los coches electricos                                
el futuro a traves de la forma en la justicia social                                 
las ciudades de precision culinarias en el mundo                                  
