## Generacja tekstu z użyciem prostego transformera

In [None]:
! pip install keras_nlp

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import keras_nlp
import numpy as np
import random
from tensorflow.keras.layers import TextVectorization

Wczytanie danych do zmiennych

In [None]:
# Project Gutenberg plain-text editions of the three novels used as the corpus.
crime_and_punishment_url = 'https://www.gutenberg.org/files/2554/2554-0.txt'
brothers_of_karamazov_url = 'https://www.gutenberg.org/files/28054/28054-0.txt'
the_idiot_url = 'https://www.gutenberg.org/files/2638/2638-0.txt'

paths = [crime_and_punishment_url, brothers_of_karamazov_url, the_idiot_url]
names = ['Crime and Punishment', 'Brothers of Karamazov', 'The Idiot']

# Download (or reuse the cached copy of) each book and keep everything after
# its first 10k characters, which approximately removes the Gutenberg intro
# and preface at the top of every file.
book_bodies = []
for name, path in zip(names, paths):
    filepath = keras.utils.get_file(f'{name}.txt', origin=path)
    with open(filepath, encoding='utf-8') as f:
        text = f.read()
    book_bodies.append(text[10000:])

# One long string holding all three books back to back.
texts = ''.join(book_bodies)

In [None]:
# Preview a sample of 500 characters from the concatenated corpus.
texts[25000:25500]

In [None]:
# Approximate sentence segmentation: split on full stops and drop fragments
# that contain only whitespace (e.g. the pieces produced by "..."). Such
# fragments would vectorize to all-padding rows and carry no training signal.
text_list = [fragment for fragment in texts.split('.') if fragment.strip()]
len(text_list)  # rough estimate of the number of sentences

Podział danych na treningowe, testowe i walidacyjne

In [None]:
# Shuffle a *copy* with a fixed seed: the split is then reproducible, and
# re-running this cell yields the same split instead of reshuffling a list
# that was already shuffled in place.
SPLIT_SEED = 42
shuffled_sentences = random.Random(SPLIT_SEED).sample(text_list, k=len(text_list))

length = len(shuffled_sentences)
train_end = int(0.7 * length)
test_end = int(0.85 * length)

# 70% train / 15% test / 15% validation
text_train = shuffled_sentences[:train_end]
text_test = shuffled_sentences[train_end:test_end]
text_valid = shuffled_sentences[test_end:]

Definicja tworzenia sekwencji i vectorizera

In [None]:
# Word-count percentiles of the sentences: median, 90th, 95th and maximum.
# (`max(text_list)` is not used here: on strings it returns the
# lexicographically greatest sentence, not the longest one.)
np.percentile([len(sentence.split()) for sentence in text_list], [50, 90, 95, 100])

In [None]:
def custom_standardization(input_string):
    """Lowercase the text and replace every newline with a single space.

    Punctuation is left untouched. The result is returned as a string tensor
    produced by ``tf.strings`` ops.
    """
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, "\n", " ")

# Sequence length (in words) used by the vectorizer and the model.
# Deliberately not `max(text_list)`: on strings that is the lexicographically
# greatest sentence, so its word count is arbitrary. A high percentile of the
# per-sentence word counts covers most sentences while keeping a few very long
# outliers from inflating every padded batch.
MAXLEN_PERCENTILE = 95
maxlen = max(1, int(np.percentile([len(sentence.split()) for sentence in text_list],
                                  MAXLEN_PERCENTILE)))

# Integer-encoding vectorizer. Each sentence becomes maxlen + 1 token ids so
# that inputs and one-step-shifted targets of length maxlen can be cut from it.
vectorize_layer = TextVectorization(
    output_mode="int",
    output_sequence_length=maxlen + 1,
    standardize=custom_standardization,
)

# Learn the vocabulary from the sentence list, then read it back as a list
# whose positions are the token ids.
vectorize_layer.adapt(text_list)
vocab = vectorize_layer.get_vocabulary()

Ustalenie wielkości słownika

In [None]:
# Vocabulary size = number of entries returned by the vectorizer's get_vocabulary().
vocab_size = len(vocab)
vocab_size # 40677

In [None]:
# Token id -> token string; ids are the positions in the vocabulary list.
index_lookup = dict(enumerate(vocab))
index_lookup[5] # of

Przykładowy wektor reprezentujący zdanie

In [None]:
# Vectorize one example sentence and print its token ids together with the
# length of the resulting (padded) id sequence.
v = vectorize_layer(['The final goal of life is...'])
print(v, '\n', len(v[0]))

Stworzenie sekwencji wejściowych z podziałem na zbiór treningowy, walidacyjny i testowy

In [None]:
batch_size = 64


def make_batched_dataset(sentences, shuffle=False, shuffle_buffer=256):
    """Build a batched tf.data pipeline over raw sentence strings.

    Args:
        sentences: list of sentence strings.
        shuffle: when True, shuffle the elements before batching.
        shuffle_buffer: buffer size handed to ``Dataset.shuffle``.

    Returns:
        A ``tf.data.Dataset`` yielding batches of up to ``batch_size`` strings.
    """
    dataset = tf.data.Dataset.from_tensor_slices(sentences)
    if shuffle:
        dataset = dataset.shuffle(buffer_size=shuffle_buffer)
    return dataset.batch(batch_size)


# Only the training stream is shuffled. Evaluation metrics are aggregated over
# the whole split, so the order of test/validation batches does not matter.
train_dataset = make_batched_dataset(text_train, shuffle=True)
test_dataset = make_batched_dataset(text_test)
valid_dataset = make_batched_dataset(text_valid)

Przygotowanie danych do treningu wraz z optymalizacją przetwarzania kolejnych batchy przez równoległe pobieranie z wyprzedzeniem (prefetching) w TensorFlow (https://www.tensorflow.org/guide/data_performance?hl=pl#prefetching)

In [None]:
def preprocess_text(text):
    """Turn a batch of sentences into next-token (input, target) id pairs.

    The batch gets a trailing axis added and is run through ``vectorize_layer``.
    The inputs are every token id except the last one; the targets are the
    same ids shifted one position to the left.

    Returns:
        Tuple ``(x, y)`` of token-id tensors sliced from the vectorized batch.
    """
    token_ids = vectorize_layer(tf.expand_dims(text, -1))
    return token_ids[:, :-1], token_ids[:, 1:]


# Vectorize every split into (input, target) pairs and prefetch upcoming
# batches; the buffer size is left to tf.data.AUTOTUNE.
train_dataset, test_dataset, valid_dataset = [
    split.map(preprocess_text).prefetch(tf.data.AUTOTUNE)
    for split in (train_dataset, test_dataset, valid_dataset)
]

Przykład przetworzonego tekstu

In [None]:
# Print one preprocessed element of the training dataset: an (inputs, targets)
# pair as returned by preprocess_text.
for entry in train_dataset.take(1):
    print(entry)

Definicja Transformera z keras nlp

In [None]:
# Embedding width; create_model also passes it as the decoder's intermediate_dim.
embed_dim = 128
# Number of attention heads in the Transformer decoder block.
num_heads = 4

def create_model():
    """Build and compile a one-block, decoder-only Transformer language model.

    Reads the module-level ``maxlen``, ``vocab_size``, ``embed_dim`` and
    ``num_heads``.

    Returns:
        A compiled ``keras.Model`` that maps ``(batch, maxlen)`` int32 token ids
        to ``(batch, maxlen, vocab_size)`` softmax probabilities.
    """
    inputs = keras.layers.Input(shape=(maxlen,), dtype=tf.int32)

    # mask_zero=True marks token id 0 (the padding id of the vectorizer) as
    # masked, so padded positions stop dominating the loss and the metrics.
    embedded = keras_nlp.layers.TokenAndPositionEmbedding(
        vocabulary_size=vocab_size,
        sequence_length=maxlen,
        embedding_dim=embed_dim,
        mask_zero=True,
    )(inputs)

    decoded = keras_nlp.layers.TransformerDecoder(
        intermediate_dim=embed_dim,
        num_heads=num_heads,
        dropout=0.5,
    )(embedded)

    # The head applies softmax, so the model emits probabilities (not logits);
    # the loss and Perplexity below therefore keep from_logits at its default.
    outputs = keras.layers.Dense(vocab_size, activation='softmax')(decoded)

    model = keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer="adam",
        loss='sparse_categorical_crossentropy',
        # mask_token_id=0 keeps padding targets out of the perplexity as well.
        metrics=[keras_nlp.metrics.Perplexity(mask_token_id=0), 'accuracy'],
    )
    return model

# Build an instance to inspect the architecture; the training cell calls
# create_model() again before fitting.
model = create_model()
model.summary()

In [None]:
class TextSampler(keras.callbacks.Callback):
    """Callback that prints a text sample generated by the model after each epoch.

    Uses the module-level ``vectorize_layer`` and ``index_lookup``.

    Args:
        start_prompt: seed text the generation starts from.
        max_tokens: generation runs for ``max_tokens - 1`` sampling steps.
    """

    def __init__(self, start_prompt, max_tokens):
        # Let the Keras base class set up its own attributes first.
        super().__init__()
        self.start_prompt = start_prompt
        self.max_tokens = max_tokens

    def sample_token(self, logits, k=5):
        """Sample a token id among the ``k`` most probable ones.

        Despite the parameter name, the values received here are the model's
        softmax output, i.e. probabilities. They are therefore renormalised
        over the top ``k`` entries instead of being pushed through a second
        softmax, which would flatten them to an almost uniform distribution.

        Args:
            logits: 1-D array of non-negative scores for one sequence position.
            k: number of highest-scoring candidates to sample from.

        Returns:
            The sampled token id as a Python ``int``.
        """
        # float64 keeps the normalised weights within the sum-to-one tolerance
        # that np.random.choice enforces on `p`.
        probs = np.asarray(logits, dtype="float64")
        k = min(k, probs.shape[-1])
        top_ids = np.argsort(probs)[-k:][::-1]
        top_probs = probs[top_ids]
        total = top_probs.sum()
        if total > 0:
            weights = top_probs / total
        else:
            # Degenerate all-zero scores: fall back to a uniform draw.
            weights = np.full(k, 1.0 / k)
        return int(np.random.choice(top_ids, p=weights))

    def on_epoch_end(self, epoch, logs=None):
        """Generate text from ``start_prompt`` and print it."""
        # The vectorizer emits one id more than the model consumes.
        context_len = int(vectorize_layer([self.start_prompt]).shape[1]) - 1
        decoded_sample = self.start_prompt

        for _ in range(self.max_tokens - 1):
            # Feed at most `context_len` trailing words so the position that is
            # read below always exists, however long the sample has grown.
            context = decoded_sample.strip().split()[-context_len:]
            tokenized_prompt = vectorize_layer([" ".join(context)])[:, :-1]
            predictions = self.model.predict(tokenized_prompt, verbose=0)
            # The output at the position of the last context word is the
            # distribution over the next word; clamp to 0 for an empty prompt.
            sample_index = max(len(context) - 1, 0)

            sampled_token = self.sample_token(predictions[0][sample_index])
            decoded_sample += " " + index_lookup[sampled_token]

        print(f"\nSample text:\n{decoded_sample}...\n")

# Seed the sampler with the first SEED_WORDS words of a random validation
# sentence. str.split() without an argument collapses runs of whitespace, so
# leading spaces/newlines do not use up word slots, and whitespace-only
# sentences are skipped so the prompt is never empty.
SEED_WORDS = 4
seed_candidates = [sentence for sentence in text_valid if sentence.strip()]
random_sentence = ' '.join(random.choice(seed_candidates).split()[:SEED_WORDS])
sampler = TextSampler(random_sentence, maxlen)
# NOTE(review): with patience=10 this callback only acts after 10 epochs
# without val_loss improvement, so it never triggers in a 5-epoch run.
reducelr = keras.callbacks.ReduceLROnPlateau(patience=10, monitor='val_loss')

Trening modelu

In [None]:
# Re-create the model so training starts from freshly initialised weights,
# then fit for 5 epochs with the text sampler and the LR-reduction callback.
model = create_model()
history = model.fit(train_dataset,
                    validation_data=valid_dataset,
                    epochs=5,
                    callbacks=[sampler, reducelr])

Funkcja pomocnicza do generacji tekstu

In [None]:
def sample_token(logits, k=5):
    """Sample a token id among the ``k`` highest-scoring entries.

    Despite the parameter name, the caller passes the model's softmax output,
    i.e. probabilities. They are renormalised over the top ``k`` entries
    instead of being pushed through a second softmax, which would flatten them
    to an almost uniform distribution.

    Args:
        logits: 1-D array-like of non-negative scores for one position.
        k: number of highest-scoring candidates to sample from (capped at the
            number of entries).

    Returns:
        The sampled token id as a Python ``int``.

    Raises:
        ValueError: propagated from ``np.random.choice`` when the input is
            empty or contains negative scores.
    """
    # float64 keeps the normalised weights within the sum-to-one tolerance
    # that np.random.choice enforces on `p`.
    probs = np.asarray(logits, dtype="float64")
    k = min(k, probs.shape[-1])
    top_ids = np.argsort(probs)[-k:][::-1]
    top_probs = probs[top_ids]
    total = top_probs.sum()
    if total > 0:
        weights = top_probs / total
    else:
        # Degenerate all-zero scores: fall back to a uniform draw.
        weights = np.full(k, 1.0 / k)
    return int(np.random.choice(top_ids, p=weights))

def generate_text(prompt, response_length=20):
    """Extend ``prompt`` by sampling words from the trained model.

    Uses the module-level ``model``, ``vectorize_layer``, ``index_lookup`` and
    ``sample_token``.

    Args:
        prompt: seed text to continue.
        response_length: generation runs for ``response_length - 1`` sampling
            steps.

    Returns:
        The prompt followed by the sampled words, separated by spaces.
    """
    # The vectorizer emits one id more than the model consumes.
    context_len = int(vectorize_layer([prompt]).shape[1]) - 1
    decoded_sample = prompt

    for _ in range(response_length - 1):
        # Feed at most `context_len` trailing words so the position read below
        # always exists, even when prompt + response outgrow the model input.
        context = decoded_sample.strip().split()[-context_len:]
        tokenized_prompt = vectorize_layer([" ".join(context)])[:, :-1]
        predictions = model.predict(tokenized_prompt, verbose=0)
        # The output at the position of the last context word is the
        # distribution over the next word; clamp to 0 for an empty prompt.
        sample_index = max(len(context) - 1, 0)

        sampled_token = sample_token(predictions[0][sample_index])
        decoded_sample += " " + index_lookup[sampled_token]
    return decoded_sample

Przykład generacji tekstu z użyciem transformera i sekwencji początkowej

In [None]:
# Continue a short seed phrase using the default response_length of 20.
generate_text("Last summer, ")
