# Transformer / GPT

## References
https://keras.io/examples/generative/text_generation_with_miniature_gpt/

In [None]:
import string
import numpy as np
import pandas as pd
import tensorflow as tf

from tqdm import tqdm
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

In [None]:
# Load the records from ../data/aclIMDB.json, shuffle the rows and rebuild
# a fresh 0..n-1 index.
df_train = (
    pd.read_json("../data/aclIMDB.json", orient="records")
    .sample(frac=1)
    .reset_index(drop=True)
)
# Coerce every entry of the "text" column to a plain Python string.
df_train["text"] = df_train["text"].apply(str)
df_train.info()
df_train.hist()
df_train.head()

## Transformer implementálása

In [None]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """Build a batched causal mask for self-attention.

    Masks the upper half of the dot-product matrix so that no information
    flows from future tokens to the current token. The mask holds 1 in the
    lower triangle, counted from the lower-right corner.

    Args:
        batch_size: Number of copies of the mask to stack along axis 0.
        n_dest: Number of destination (query) positions.
        n_src: Number of source (key) positions.
        dtype: dtype the mask is cast to.

    Returns:
        A tensor of shape (batch_size, n_dest, n_src) with the given dtype.
    """
    dest_idx = tf.range(n_dest)[:, None]
    src_idx = tf.range(n_src)
    allowed = dest_idx >= src_idx - n_src + n_dest
    single_mask = tf.reshape(tf.cast(allowed, dtype), [1, n_dest, n_src])
    # Tile multiples are [batch_size, 1, 1]: repeat only along the batch axis.
    batch_dim = tf.expand_dims(batch_size, -1)
    multiples = tf.concat([batch_dim, tf.constant([1, 1], dtype=tf.int32)], 0)
    return tf.tile(single_mask, multiples)

In [None]:
class TransformerBlock(layers.Layer):
    """Causal self-attention block followed by a two-layer feed-forward net.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization.

    Args:
        embed_dim: Size passed to the attention layer as its second positional
            argument and used as the output width of the feed-forward net.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden (ReLU) feed-forward layer.
        rate: Dropout rate applied after attention and after the
            feed-forward net.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        feed_forward_layers = [
            layers.Dense(ff_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.ffn = keras.Sequential(feed_forward_layers)
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        """Apply masked self-attention and the feed-forward net to `inputs`.

        The first two axes of `inputs` are read as batch size and sequence
        length when building the causal mask.
        """
        shape = tf.shape(inputs)
        n_batch, n_tokens = shape[0], shape[1]
        mask = causal_attention_mask(n_batch, n_tokens, n_tokens, tf.bool)
        # Self-attention: the same tensor serves as query and value.
        attended = self.dropout1(self.att(inputs, inputs, attention_mask=mask))
        residual = self.layernorm1(inputs + attended)
        projected = self.dropout2(self.ffn(residual))
        return self.layernorm2(residual + projected)

In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: Number of distinct positions the position table can hold.
        vocab_size: Number of distinct token ids the token table can hold.
        embed_dim: Width of both embedding tables.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in `x` and add the embedding of each position."""
        # Positions run 0..L-1 where L is the size of the last axis of x.
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_emb(position_ids)
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

## GPT modell elkészítése

In [None]:
vocab_size = 5000  # Vocabulary size
maxlen = 80  # Maximum length of the input sequence
embed_dim = 32  # Size of the embedding vector
num_heads = 2  # Number of attention heads
feed_forward_dim = 32  # Hidden layer size inside the transformer block

def create_model():
    """Build and compile the miniature GPT model.

    The architecture is: token + position embedding, one transformer block,
    then a dense projection to `vocab_size` logits. Sizes come from the
    notebook-level hyperparameters.

    Returns:
        A compiled `keras.Model` with two outputs: the logits and the
        transformer block's output.
    """
    token_ids = layers.Input(shape=(maxlen,), dtype=tf.int32)
    embedded = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(token_ids)
    hidden = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(embedded)
    logits = layers.Dense(vocab_size)(hidden)
    model = keras.Model(inputs=token_ids, outputs=[logits, hidden])
    # Only the logits output gets a loss; the second output is passed None.
    model.compile(
        "adam",
        loss=[tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), None],
    )
    return model

In [None]:
batch_size = 128
# Dataset over the raw review strings: shuffled with a 32-element buffer,
# then grouped into batches.
text_ds = (
    tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(df_train["text"].values))
    .shuffle(buffer_size=32)
    .batch(batch_size)
)

In [None]:
def custom_standardization(input_string):
    """Lowercase the text, remove HTML line-break tags and split off punctuation.

    Every punctuation character gets a space inserted in front of it, so that
    a whitespace tokenizer treats it as a separate token.

    Args:
        input_string: String tensor to standardize.

    Returns:
        The standardized string tensor.
    """
    import re  # local import: only needed to build the punctuation pattern

    lowercased = tf.strings.lower(input_string)
    stripped_html = tf.strings.regex_replace(lowercased, "<br />", " ")
    # Escape the punctuation before putting it inside a character class.
    # Unescaped, the "\" in string.punctuation merely escapes the "]" that
    # follows it, so a literal backslash in the text was never matched.
    punctuation_class = f"[{re.escape(string.punctuation)}]"
    return tf.strings.regex_replace(stripped_html, f"({punctuation_class})", r" \1")

# Create a vectorization layer and adapt it to the text.
vectorize_layer = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode="int",
    # One token longer than the model input, so inputs and labels can be
    # taken as two slices offset by one position.
    output_sequence_length=maxlen + 1,
)
vectorize_layer.adapt(df_train["text"].values)
# Used to map token indices back to words.
vocab = vectorize_layer.get_vocabulary()

In [None]:
def prepare_lm_inputs_labels(text):
    """Tokenize `text` and return (inputs, labels) shifted by one position.

    The label for position i is the token at position i + 1, so the model
    uses every token up to position i to predict the next one.

    Args:
        text: String (or string tensor) to tokenize; a trailing axis is added
            before it is passed to the vectorization layer.

    Returns:
        A tuple `(x, y)` where `x` drops the last token of each sequence and
        `y` drops the first.
    """
    tokens = vectorize_layer(tf.expand_dims(text, -1))
    return tokens[:, :-1], tokens[:, 1:]

# Tokenize each review on its own, with a progress bar over the reviews.
prepared_train = list(map(prepare_lm_inputs_labels, tqdm(df_train["text"].values)))

In [None]:
# Each prepared item is an (inputs, labels) pair with a leading axis of
# size 1; take row 0 of each and stack the rows into 2-D arrays.
text_ds_X = np.array([inputs[0] for inputs, _ in prepared_train])
text_ds_y = np.array([labels[0] for _, labels in prepared_train])
text_ds_X.shape, text_ds_y.shape

In [None]:
# Training pipeline over the tokenized (inputs, labels) pairs:
# shuffle -> batch -> prefetch.
text_ds = (
    tf.data.Dataset.from_tensor_slices((text_ds_X, text_ds_y))
    .shuffle(buffer_size=32)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
class TextGenerator(tf.keras.callbacks.Callback):
    """Callback that generates text from the model being trained.

    1. Feed a starting prompt to the model.
    2. Predict the scores of the next token.
    3. Sample the next token and append it to the next input.

    Args:
        max_tokens: Integer, the number of tokens to generate after the prompt.
        start_tokens: List of integers, the token indices of the starting prompt.
        index_to_word: List of strings obtained from the text vectorization layer.
        top_k: Integer, sample from the `top_k` highest-scoring tokens.
        print_every: Integer, print generated text every this many epochs.
    """

    def __init__(self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1):
        # Let the base Callback initialise its own state.
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        """Sample one token index from the `top_k` highest logits.

        The selected logits are turned into probabilities with a softmax and
        one of the corresponding indices is drawn at random.
        """
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = tf.keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the word stored at index `number` of `index_to_word`."""
        return self.index_to_word[number]

    def on_epoch_end(self, epoch, logs=None):
        """Generate and print `max_tokens` tokens every `print_every` epochs."""
        if (epoch + 1) % self.print_every != 0:
            return
        # Work on a copy so the stored prompt is not extended between epochs.
        context = list(self.start_tokens)
        tokens_generated = []
        # Strict "<": generate exactly max_tokens tokens, as documented.
        while len(tokens_generated) < self.max_tokens:
            if len(context) >= maxlen:
                # Keep the most recent maxlen tokens so newly sampled tokens
                # stay in the model input; predict from the last position.
                window = context[-maxlen:]
                sample_index = maxlen - 1
            else:
                # Right-pad with 0 up to the fixed model input length and
                # predict from the last real token.
                sample_index = len(context) - 1
                window = context + [0] * (maxlen - len(context))
            # verbose=0: no progress bar for every single generated token.
            y, _ = self.model.predict(np.array([window]), verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            context.append(sample_token)
        txt = " ".join(
            self.detokenize(token) for token in self.start_tokens + tokens_generated
        )
        print(f"generated text:\n{txt}\n")

In [None]:
# Tokenized entry point: map every vocabulary word to its index.
word_to_index = {word: index for index, word in enumerate(vocab)}

start_prompt = "this movie is"
# Words that are not in the vocabulary fall back to index 1.
start_tokens = [word_to_index.get(word, 1) for word in start_prompt.split()]
num_tokens_generated = 40
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

In [None]:
# Build a fresh model and print its layer summary.
model = create_model()
model.summary()

# Train for 25 epochs with the text-generation callback attached, then
# write the trained weights to disk.
model.fit(text_ds, epochs=25, callbacks=[text_gen_callback])
model.save_weights("../data/transformer.h5")

In [None]:
# Rebuild the model and load the weights saved at ../data/transformer.h5,
# replacing the `model` variable.
model = create_model()
model.load_weights("../data/transformer.h5")

In [None]:
# Generate a continuation of a prompt with the loaded model, on the CPU.
with tf.device('/CPU:0'):
    max_tokens = 40  # number of tokens to generate after the prompt
    start_prompt = "The story was"
    # The text is lowercased during standardization, so lowercase the prompt
    # before the vocabulary lookup. Unknown words fall back to index 1.
    prompt_tokens = [word_to_index.get(word, 1) for word in start_prompt.lower().split()]
    # Running context fed to the model: the prompt plus everything sampled so far.
    start_tokens = list(prompt_tokens)
    tokens_generated = []

    def sample_from(logits):
        """Sample one token index from the 10 highest logits."""
        logits, indices = tf.math.top_k(logits, k=10, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = tf.keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(number):
        """Return the vocabulary word stored at index `number`."""
        return vocab[number]

    # Strict "<": generate exactly max_tokens tokens.
    while len(tokens_generated) < max_tokens:
        if len(start_tokens) >= maxlen:
            # Keep the most recent maxlen tokens so newly sampled tokens stay
            # in the model input; predict from the last position.
            x = start_tokens[-maxlen:]
            sample_index = maxlen - 1
        else:
            # Right-pad with 0 up to the fixed model input length and predict
            # from the last real token.
            sample_index = len(start_tokens) - 1
            x = start_tokens + [0] * (maxlen - len(start_tokens))
        x = np.array([x])
        y, _ = model.predict(x, verbose=0)
        sample_token = sample_from(y[0][sample_index])
        tokens_generated.append(sample_token)
        start_tokens.append(sample_token)
    num_tokens_generated = len(tokens_generated)

    # Detokenize only the newly generated tokens: the prompt is printed
    # separately, and start_tokens already contains the generated tokens.
    txt = " ".join(detokenize(token) for token in tokens_generated).replace("[UNK]", "")
    print(start_prompt, txt)