In [None]:
import re
import string
import unicodedata
import html
import os
import contractions
from collections import Counter
from bs4 import BeautifulSoup
import pandas as pd
from matplotlib import pyplot as plt
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.util import ngrams
import unittest
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from wordcloud import WordCloud
from tqdm import tqdm
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
import keras
from keras import ops
from keras import layers
import gradio as gr
import random
from tokenizers import ByteLevelBPETokenizer
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import ByteLevel
from tokenizers.processors import BertProcessing
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import (
    Embedding, LSTM, GRU, Dense, Dropout,
    Bidirectional, Conv1D, MaxPooling1D, Flatten,
    Input, Lambda, Layer
)
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
tf.keras.mixed_precision.set_global_policy("mixed_float16")

In [None]:
nltk.download("stopwords", quiet=True)
nltk.download("wordnet", quiet=True)
nltk.download('punkt_tab')
nltk.download('punkt', quiet=True)

In [None]:
df = pd.read_parquet("hf://datasets/elricwan/HarryPotter/data/train-00000-of-00001.parquet")

In [None]:
hp_text = df['content']
hp_text

In [None]:
hp_text = ' '.join(hp_text.astype(str).tolist())

In [None]:
hp_text

In [None]:
chars = sorted(list(set(hp_text)))
print(f"Unique characters: {len(chars)}")

In [None]:
char_counts = Counter(hp_text)
top_chars = char_counts.most_common(20)

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

chars_top = [c[0] for c in top_chars]
counts_top = [c[1] for c in top_chars]
axes[0, 0].bar(chars_top, counts_top, color='steelblue')
axes[0, 0].set_title('Top 20 Character Frequencies', fontsize=14)
axes[0, 0].set_xlabel('Character')
axes[0, 0].set_ylabel('Count')
axes[0, 0].tick_params(axis='x', rotation=45)

words = re.findall(r'\b\w+\b', hp_text[:100000])
word_lengths = [len(w) for w in words]
axes[0, 1].hist(word_lengths, bins=20, color='coral', edgecolor='black')
axes[0, 1].set_title('Word Length Distribution', fontsize=14)
axes[0, 1].set_xlabel('Word Length')
axes[0, 1].set_ylabel('Frequency')

axes[1, 0].axis('off')
axes[1, 1].axis('off')
wordcloud = WordCloud(width=800, height=400, background_color='white',
                      max_words=200, contour_width=3, contour_color='steelblue')
wordcloud.generate(' '.join(words))
axes[1, 1].imshow(wordcloud, interpolation='bilinear')
axes[1, 1].set_title('Harry Potter Word Cloud', fontsize=14)

In [None]:
print("N-gram Analysis:")
bigrams = list(ngrams(words[:1000], 2))
trigrams = list(ngrams(words[:1000], 3))

bigram_counts = Counter(bigrams)
trigram_counts = Counter(trigrams)

print(f"Most common bigrams: {bigram_counts.most_common(5)}")
print(f"Most common trigrams: {trigram_counts.most_common(5)}")

In [None]:
def to_lowercase(text: str) -> str:
    return text.lower()

In [None]:
def normalize_whitespace(text: str) -> str:
    return re.sub(r"\s+", " ", text).strip()

In [None]:
def remove_html_tags(text: str) -> str:
    return BeautifulSoup(text, "html.parser").get_text()

In [None]:
def expand_contractions(text: str) -> str:
    return contractions.fix(text)

In [None]:
def clean_text(text):
    text = expand_contractions(text)
    text = to_lowercase(text)
    text = remove_html_tags(text)
    text = normalize_whitespace(text)
    return text.strip()

In [None]:
class TestTextCleaner(unittest.TestCase):

    def test_to_lowercase(self):
        self.assertEqual(to_lowercase("HELLO"), "hello")

    def test_normalize_whitespace(self):
        self.assertEqual(normalize_whitespace(" a   b  "), "a b")

    def test_remove_html_tags(self):
        self.assertEqual(remove_html_tags("<p>Hello</p>"), "Hello")

    def test_expand_contractions(self):
        self.assertEqual(expand_contractions("can't"), "cannot")

In [None]:
unittest.main(argv=[''], exit=False)

In [None]:
cleaned_text = clean_text(hp_text)

In [None]:
hp_word_count = len(hp_text.split())
cleaned_word_count = len(cleaned_text.split())

In [None]:
word_counts_data = {
    'Original Text': hp_word_count,
    'Cleaned Text': cleaned_word_count
}

word_counts_df = pd.DataFrame(word_counts_data.items(), columns=['Text Type', 'Word Count'])
print(word_counts_df)

In [None]:
plt.figure(figsize=(8, 6))
sns.barplot(x='Text Type', y='Word Count', data=word_counts_df, palette='viridis', hue='Text Type', legend=False)
plt.title('Word Count Comparison: Original vs. Cleaned Text')
plt.xlabel('Text Type')
plt.ylabel('Word Count')
plt.ylim(2200000, 2400000)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

In [None]:
file_path = "hp_corpus.txt"

with open(file_path, "w", encoding="utf-8") as f:
    f.write(hp_text)

files = [file_path]

In [None]:
tokenizer = ByteLevelBPETokenizer()
tokenizer.train(files, vocab_size=30000, min_frequency=2)

In [None]:
os.makedirs("tokenizer_model", exist_ok=True)

In [None]:
tokenizer.save_model("tokenizer_model")

In [None]:
class BPEPreprocessor:
    def __init__(self, tokenizer_path, seq_length=20):
        self.seq_length = seq_length

        self.tokenizer = ByteLevelBPETokenizer(
            tokenizer_path + "/vocab.json",
            tokenizer_path + "/merges.txt"
        )

        self.vocab_size = self.tokenizer.get_vocab_size()

    def decode(self, token_ids):
        return self.tokenizer.decode(token_ids, skip_special_tokens=False)

    def encode_text(self, text):
        return self.tokenizer.encode(text).ids

    def create_sequences(self, token_ids, step=1):
        sequences = []
        next_tokens = []

        for i in range(0, len(token_ids) - self.seq_length, step):
            sequences.append(token_ids[i:i+self.seq_length])
            next_tokens.append(token_ids[i+self.seq_length])

        return sequences, next_tokens

    def vectorize(self, sequences, next_tokens):
        X = np.zeros((len(sequences), self.seq_length), dtype=np.int32)
        y = np.array(next_tokens, dtype=np.int32)

        for i, seq in enumerate(sequences):
            X[i] = seq

        return X, y

    def preprocess(self, text, validation_split=0.1):
        token_ids = self.encode_text(text)

        sequences, next_tokens = self.create_sequences(token_ids)
        X, y = self.vectorize(sequences, next_tokens)

        split_idx = int(len(X) * (1 - validation_split))
        return (
            X[:split_idx], X[split_idx:],
            y[:split_idx], y[split_idx:]
        )

In [None]:
class TextPreprocessor:

  def __init__(self, text, seq_length=100):
        self.text = clean_text(text)
        self.seq_length = seq_length
        self.chars = sorted(list(set(text)))
        self.char_to_idx = {ch: i for i, ch in enumerate(self.chars)}
        self.idx_to_char = {i: ch for i, ch in enumerate(self.chars)}
        self.vocab_size = len(self.chars)

  def create_sequences(self):
        step = 3

        sentences = []
        next_chars = []

        for i in tqdm(range(0, len(self.text) - self.seq_length, step)):
            sentences.append(self.text[i:i + self.seq_length])
            next_chars.append(self.text[i + self.seq_length])

        return sentences, next_chars

  def vectorize_sequences(self, sentences, next_chars):

        X = np.zeros((len(sentences), self.seq_length, self.vocab_size), dtype=np.bool_)
        y = np.zeros((len(sentences), self.vocab_size), dtype=np.bool_)

        for i, sentence in tqdm(enumerate(sentences)):
            for t, char in enumerate(sentence):
                X[i, t, self.char_to_idx[char]] = 1
            y[i, self.char_to_idx[next_chars[i]]] = 1

        return X, y

  def preprocess_for_rnn(self, validation_split=0.1):
        sentences, next_chars = self.create_sequences()
        X, y = self.vectorize_sequences(sentences, next_chars)

        split_idx = int(len(X) * (1 - validation_split))
        X_train, X_val = X[:split_idx], X[split_idx:]
        y_train, y_val = y[:split_idx], y[split_idx:]

        return X_train, X_val, y_train, y_val, sentences, next_chars

In [None]:
class WordLevelPreprocessor:
    def __init__(self, text, seq_length=20, min_word_freq=2):
        self.text = clean_text(text)
        self.seq_length = seq_length
        self.min_word_freq = min_word_freq
        self.words = word_tokenize(self.text)
        self._build_vocabulary()

    def _build_vocabulary(self):
        word_counts = Counter(self.words)

        self.vocab = [word for word, count in word_counts.items()
                     if count >= self.min_word_freq]

        self.vocab = ['<UNK>', '<PAD>', '<START>', '<END>'] + self.vocab

        self.word_to_idx = {word: i for i, word in enumerate(self.vocab)}
        self.idx_to_word = {i: word for i, word in enumerate(self.vocab)}
        self.vocab_size = len(self.vocab)


        self.words_processed = [
            word if word in self.word_to_idx and word not in ['<UNK>', '<PAD>', '<START>', '<END>']
            else '<UNK>'
            for word in self.words
        ]

    def create_sequences(self, step=1):
        sequences = []
        next_words = []

        for i in tqdm(range(0, len(self.words_processed) - self.seq_length, step)):
            sequences.append(self.words_processed[i:i + self.seq_length])
            next_words.append(self.words_processed[i + self.seq_length])

        return sequences, next_words

    def vectorize_sequences(self, sequences, next_words):
        X = np.zeros((len(sequences), self.seq_length), dtype=np.int32)
        y = np.zeros((len(sequences), self.vocab_size), dtype=np.bool_)

        for i, seq in tqdm(enumerate(sequences)):
            for t, word in enumerate(seq):
                X[i, t] = self.word_to_idx.get(word, self.word_to_idx['<UNK>'])
            y[i, self.word_to_idx.get(next_words[i], self.word_to_idx['<UNK>'])] = 1

        return X, y

    def preprocess_for_rnn(self, validation_split=0.1):
        sequences, next_words = self.create_sequences(step=10)
        X, y = self.vectorize_sequences(sequences, next_words)

        split_idx = int(len(X) * (1 - validation_split))
        X_train, X_val = X[:split_idx], X[split_idx:]
        y_train, y_val = y[:split_idx], y[split_idx:]

        return X_train, X_val, y_train, y_val

    def words_to_indices(self, words):
        return [self.word_to_idx.get(word, self.word_to_idx['<UNK>']) for word in words]

    def indices_to_words(self, indices):
        return [self.idx_to_word[idx] for idx in indices]

In [None]:
bpe_preprocessor = BPEPreprocessor("tokenizer_model", seq_length=20)

In [None]:
preprocessor = TextPreprocessor(hp_text[:100000], seq_length=60)
word_preprocessor = WordLevelPreprocessor(
    text=hp_text[:100000],
    seq_length=15,
    min_word_freq=2
)

In [None]:
X_train, X_val, y_train, y_val, sentences, next_chars = preprocessor.preprocess_for_rnn(validation_split=0.1)

In [None]:
def build_lstm_model(vocab_size, seq_length, lstm_units=128):
    model = Sequential([
        LSTM(lstm_units, input_shape=(seq_length, vocab_size), return_sequences=True),
        Dropout(0.2),
        LSTM(lstm_units),
        Dropout(0.2),
        Dense(vocab_size, activation='softmax')
    ])

    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
lstm_model = build_lstm_model(preprocessor.vocab_size, preprocessor.seq_length)
lstm_model.summary()

In [None]:
def build_bidirectional_lstm(vocab_size, seq_length, lstm_units=128):
    model = Sequential([
        Bidirectional(LSTM(lstm_units, return_sequences=True),
                     input_shape=(seq_length, vocab_size)),
        Dropout(0.3),
        Bidirectional(LSTM(lstm_units)),
        Dropout(0.3),
        Dense(lstm_units // 2, activation='relu'),
        Dropout(0.2),
        Dense(vocab_size, activation='softmax')
    ])

    model.compile(
        optimizer=Adam(learning_rate=0.0005),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
bi_lstm_model = build_bidirectional_lstm(preprocessor.vocab_size, preprocessor.seq_length)
bi_lstm_model.summary()

In [None]:
callbacks = [
    EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
]

In [None]:
bi_lstm_history = bi_lstm_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    batch_size=256,
    epochs=10,
    callbacks=callbacks,
    verbose=1
)

In [None]:
lstm_history = lstm_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    batch_size=256,
    epochs=10,
    callbacks=callbacks,
    verbose=1
)

In [None]:
def build_gru_model(vocab_size, seq_length, gru_units=128):
    model = Sequential([
        GRU(gru_units, return_sequences=True, input_shape=(seq_length, vocab_size)),
        Dropout(0.2),
        GRU(gru_units),
        Dropout(0.2),
        Dense(gru_units // 2, activation='relu'),
        Dense(vocab_size, activation='softmax')
    ])

    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
gru_model = build_gru_model(preprocessor.vocab_size, preprocessor.seq_length)
gru_model.summary()

In [None]:
gru_history = gru_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    batch_size=256,
    epochs=50,
    callbacks=callbacks,
    verbose=1
)

In [None]:
X_train, X_val, y_train, y_val = word_preprocessor.preprocess_for_rnn()

In [None]:
def build_word_level_model(vocab_size, seq_length, embedding_dim=256, lstm_units=256):
    model = Sequential([
        Embedding(input_dim=vocab_size,
                 output_dim=embedding_dim,
                 input_length=seq_length,
                 mask_zero=True),
        LSTM(lstm_units, return_sequences=True, use_cudnn=False),
        Dropout(0.3),
        LSTM(lstm_units // 2, use_cudnn=False),
        Dropout(0.3),
        Dense(lstm_units // 2, activation='relu'),
        Dropout(0.2),
        Dense(vocab_size, activation='softmax')
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
word_level_model = build_word_level_model(
    vocab_size=word_preprocessor.vocab_size,
    seq_length=word_preprocessor.seq_length,
)
word_level_model.summary()

In [None]:
word_level_model_history = word_level_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    batch_size=128,
    epochs=50,
    callbacks=callbacks,
    verbose=1
)

In [None]:
def generate_bpe_text_stable(model, preprocessor, seed_text, length=50, temperature=0.7, top_k=0, top_p=0.9, repetition_penalty=1.2):
    seed_ids = preprocessor.encode_text(seed_text)

    if len(seed_ids) > preprocessor.seq_length:
        seed_ids = seed_ids[-preprocessor.seq_length:]
    elif len(seed_ids) < preprocessor.seq_length:
        pad_len = preprocessor.seq_length - len(seed_ids)
        seed_ids = [0] * pad_len + seed_ids

    generated_ids = seed_ids.copy()

    for _ in range(length):
        x_pred = np.array([seed_ids], dtype=np.int32)

        preds = model.predict(x_pred, verbose=0)[0]

        log_probs = np.log(np.maximum(preds, 1e-7))

        seen_ids = [id for id in seed_ids if id != 0]

        for token_id in seen_ids:
            logit_i = log_probs[token_id]

            if logit_i >= 0:
                log_probs[token_id] = logit_i / repetition_penalty
            else:
                log_probs[token_id] = logit_i * repetition_penalty

        log_probs_max = np.max(log_probs)
        exp_preds = np.exp(log_probs - log_probs_max)
        preds = exp_preds / np.sum(exp_preds)

        preds /= np.sum(preds)

        if top_p > 0 and top_p < 1:
            sorted_indices = np.argsort(preds)[::-1]
            sorted_preds = preds[sorted_indices]

            cumulative_probs = np.cumsum(sorted_preds)

            nucleus_indices = np.where(cumulative_probs >= top_p)[0]

            if nucleus_indices.size == 0:
                top_indices = sorted_indices[:1]
            else:
                nucleus_index = nucleus_indices[0] + 1
                top_indices = sorted_indices[:nucleus_index]

            top_probs = preds[top_indices]

        elif top_k > 0:
            top_indices = np.argsort(preds)[-top_k:]
            top_probs = preds[top_indices]

        else:
            top_indices = np.arange(len(preds))
            top_probs = preds

        top_probs /= np.sum(top_probs)

        next_id = np.random.choice(top_indices, p=top_probs)

        generated_ids.append(next_id)
        seed_ids = seed_ids[1:] + [next_id]

    final_ids = [id for id in generated_ids if id != 0]

    return preprocessor.decode(final_ids)

In [None]:
def sample(preds, temperature=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

In [None]:
def generate_text(model, seed_text, num_chars=300, temperature=1.0):
    generated = seed_text

    for i in range(num_chars):
        x_pred = np.zeros((1, preprocessor.seq_length, preprocessor.vocab_size))
        for t, char in enumerate(seed_text):
            if char in preprocessor.char_to_idx:
                x_pred[0, t, preprocessor.char_to_idx[char]] = 1.

        preds = model.predict(x_pred, verbose=0)[0]
        next_index = sample(preds, temperature)
        next_char = preprocessor.idx_to_char[next_index]

        generated += next_char
        seed_text = seed_text[1:] + next_char

    return generated

In [None]:
def compare_models_generation(seed_text, num_chars=200, temperature=0.7):
    print(f"COMPARING MODELS WITH SEED: '{seed_text}'")
    print(f"Temperature: {temperature}")
    print("="*60)

    models = {
        "LSTM": lstm_model,
        "Bidirectional LSTM": bi_lstm_model,
        "GRU": gru_model
    }

    results = {}
    for name, model in models.items():
        print(f"\n{name}:")
        generated = generate_text(model, seed_text, num_chars, temperature)
        print(generated)
        results[name] = generated
        print("-" * 40)

    return results

In [None]:
test_seeds = [
    "harry potter was a very unusual boy",
    "the dark lord shall rise again",
    "hermione opened the ancient book and read",
    "in the great hall of hogwarts, dumbledore"
]

for seed in test_seeds[:2]:
    compare_models_generation(seed, num_chars=150, temperature=0.8)


In [None]:
@keras.saving.register_keras_serializable()
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim, mask_zero=True)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = ops.shape(x)[-1]
        positions = ops.arange(start=0, stop=maxlen, step=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [None]:
@keras.saving.register_keras_serializable()
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.mha1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.mha2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="gelu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.dropout3 = layers.Dropout(rate)

    def call(self, x, encoder_output=None):
        batch_size, seq_length = ops.shape(x)[0], ops.shape(x)[1]
        causal_mask = ops.triu(
            ops.ones((batch_size, 1, seq_length, seq_length)) * -np.inf,
            k=1
        )

        attn1 = self.mha1(x, x, attention_mask=causal_mask, use_causal_mask=True)
        attn1 = self.dropout1(attn1)
        out1 = self.layernorm1(x + attn1)

        if encoder_output is not None:
            attn2 = self.mha2(out1, encoder_output, encoder_output)
            attn2 = self.dropout2(attn2)
            out2 = self.layernorm2(out1 + attn2)
        else:
            out2 = self.layernorm2(out1)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output)
        return self.layernorm3(out2 + ffn_output)

In [None]:
def build_transformer_model(vocab_size, seq_length, num_layers, embed_dim=128, num_heads=8, ff_dim=512):
    inputs = tf.keras.layers.Input(shape=(seq_length,))

    embedding_layer = TokenAndPositionEmbedding(seq_length, vocab_size, embed_dim)
    x = embedding_layer(inputs)

    for _ in range(num_layers):
      transformer_block = TransformerDecoder(embed_dim, num_heads, ff_dim)
      x = transformer_block(x)

    x = tf.keras.layers.Lambda(lambda x: x[:, -1, :])(x)

    x = tf.keras.layers.Dense(embed_dim, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.3)(x)

    outputs = tf.keras.layers.Dense(vocab_size, activation='softmax')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
bpe_preprocessor = BPEPreprocessor("tokenizer_model", seq_length=100)
X_train_bpe, X_val_bpe, y_train_bpe, y_val_bpe = bpe_preprocessor.preprocess(
    hp_text
)

In [None]:
transformer_model = build_transformer_model(
    vocab_size=bpe_preprocessor.vocab_size,
    seq_length=bpe_preprocessor.seq_length,
    embed_dim=300,
    num_heads=10,
    ff_dim=800,
    num_layers=2
)
transformer_model.summary()

In [None]:
callbacks = [
    EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True, min_delta=0.02),
    ]

transformer_history = transformer_model.fit(
    X_train_bpe, y_train_bpe,
    validation_data=(X_val_bpe, y_val_bpe),
    batch_size=1024,
    epochs=5,
    callbacks=callbacks,
    verbose=1
)

In [None]:
generated_bpe = generate_bpe_text_stable(
      model=transformer_model,
      preprocessor=bpe_preprocessor,
      seed_text=clean_text("""Harry Potter, Hermione Granger and Ron Weasley stood silently in the Great Hall,
      listening to Professor Dumbledore's words about the return of the Dark Lord.
      """),
      length=150,
      temperature=.75,
      top_p=.8)


generated_bpe

In [None]:
seed_texts = [
    "harry potter was a very unusual boy",
    "the dark lord shall rise again",
    "hermione opened the ancient book and read",
    "in the great hall of hogwarts, dumbledore",
    "draco malfoy smirked, his pale face reflecting the light",
    "voldemort's name was rarely spoken aloud, for fear",
    "the forbidden forest loomed dark and mysterious"
]

In [None]:
def generate_text_gradio(seed_text, length, temperature, top_k, top_p):
    if not seed_text or seed_text.strip() == "":
        seed_text = random.choice(seed_texts)

    cleaned_seed_text = clean_text(seed_text)

    generated_text = generate_bpe_text_stable(
        model=transformer_model,
        preprocessor=bpe_preprocessor,
        seed_text=cleaned_seed_text,
        length=length,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p
    )
    return generated_text

In [None]:
iface = gr.Interface(
    fn=generate_text_gradio,
    inputs=[
        gr.Textbox(label="Seed Text (leave blank for random)", lines=2),
        gr.Slider(minimum=10, maximum=500, value=150, step=10, label="Generation Length"),
        gr.Slider(minimum=0.1, maximum=2.0, value=0.75, step=0.05, label="Temperature"),
        gr.Slider(minimum=0, maximum=100, value=0, step=1, label="Top K (0 for disabled)"),
        gr.Slider(minimum=0.0, maximum=1.0, value=0.8, step=0.05, label="Top P (0 for disabled)")
    ],
    outputs=gr.Textbox(label="Generated Text", lines=10),
    title="Harry Potter Text Generator (Transformer)",
    description="Generate Harry Potter-style text using a fine-tuned Transformer model. Leave seed text blank for a random starting phrase."
)

print("Gradio interface created.")

In [None]:
iface.launch(debug=True)

In [None]:
def calculate_metrics(reference_text, generated_text):
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    rouge_scores = scorer.score(reference_text, generated_text)

    reference_tokens = nltk.word_tokenize(reference_text)
    generated_tokens = nltk.word_tokenize(generated_text)

    bleu_score = sentence_bleu([reference_tokens], generated_tokens)

    meteor = meteor_score([reference_tokens], generated_tokens)

    return {
        "rouge": rouge_scores,
        "bleu": bleu_score,
        "meteor": meteor
    }

In [None]:
reference_text = "Harry Potter, Hermione Granger and Ron Weasley stood silently in the Great Hall, listening to Professor Dumbledore's words about the return of the Dark Lord. The atmosphere was heavy with dread, and the young wizards braced themselves for the coming battle."
seed_text_for_eval = "Harry Potter, Hermione Granger and Ron Weasley stood silently in the Great Hall, listening to Professor Dumbledore's words about the return of the Dark Lord."

generated_text_for_eval = generate_bpe_text_stable(
    model=transformer_model,
    preprocessor=bpe_preprocessor,
    seed_text=clean_text(seed_text_for_eval),
    length=100,
    temperature=0.7,
    top_p=0.8
)

evaluation_results = calculate_metrics(reference_text, generated_text_for_eval)

print("\n--- Evaluation Results ---")
print(f"Reference: {reference_text}")
print(f"Generated: {generated_text_for_eval}")
print(f"ROUGE Scores: {evaluation_results['rouge']}")
print(f"BLEU Score: {evaluation_results['bleu']}")
print(f"METEOR Score: {evaluation_results['meteor']}")

In [None]:
rouge1 = evaluation_results['rouge']['rouge1']
rouge2 = evaluation_results['rouge']['rouge2']
rougeL = evaluation_results['rouge']['rougeL']

bleu_score = evaluation_results['bleu']
meteor_score = evaluation_results['meteor']

rouge_labels = ['R1-P', 'R1-R', 'R1-F1', 'R2-P', 'R2-R', 'R2-F1', 'RL-P', 'RL-R', 'RL-F1']
rouge_values = [
    rouge1.precision, rouge1.recall, rouge1.fmeasure,
    rouge2.precision, rouge2.recall, rouge2.fmeasure,
    rougeL.precision, rougeL.recall, rougeL.fmeasure
]

other_metrics_labels = ['BLEU', 'METEOR']
other_metrics_values = [bleu_score, meteor_score]

fig, axes = plt.subplots(1, 2, figsize=(15, 6))

axes[0].bar(rouge_labels, rouge_values, color=['skyblue', 'salmon', 'lightgreen']*3)
axes[0].set_title('ROUGE Scores (Precision, Recall, F1)')
axes[0].set_ylabel('Score')
axes[0].set_ylim(0, 1)
axes[0].tick_params(axis='x', rotation=45)

axes[1].bar(other_metrics_labels, other_metrics_values, color=['purple', 'orange'])
axes[1].set_title('BLEU and METEOR Scores')
axes[1].set_ylabel('Score')
axes[1].set_ylim(0, 1)

plt.tight_layout()
plt.show()

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(18, 12))
fig.suptitle('Training and Validation Loss for All Models', fontsize=16)

axes[0, 0].plot(bi_lstm_history.history['loss'], label='Train Loss')
axes[0, 0].plot(bi_lstm_history.history['val_loss'], label='Validation Loss')
axes[0, 0].set_title('Bidirectional LSTM Learning Curve')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('Loss')
axes[0, 0].legend()
axes[0, 0].grid(True)

axes[0, 1].plot(lstm_history.history['loss'], label='Train Loss')
axes[0, 1].plot(lstm_history.history['val_loss'], label='Validation Loss')
axes[0, 1].set_title('LSTM Learning Curve')
axes[0, 1].set_xlabel('Epoch')
axes[0, 1].set_ylabel('Loss')
axes[0, 1].legend()
axes[0, 1].grid(True)

axes[1, 0].plot(gru_history.history['loss'], label='Train Loss')
axes[1, 0].plot(gru_history.history['val_loss'], label='Validation Loss')
axes[1, 0].set_title('GRU Learning Curve')
axes[1, 0].set_xlabel('Epoch')
axes[1, 0].set_ylabel('Loss')
axes[1, 0].legend()
axes[1, 0].grid(True)

axes[1, 1].plot(transformer_history.history['loss'], label='Train Loss')
axes[1, 1].plot(transformer_history.history['val_loss'], label='Validation Loss')
axes[1, 1].set_title('Transformer Learning Curve')
axes[1, 1].set_xlabel('Epoch')
axes[1, 1].set_ylabel('Loss')
axes[1, 1].legend()
axes[1, 1].grid(True)

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

### Konklúzió

A karakteralapú RNN-modellek (LSTM, BiLSTM, GRU) képesek alapvető mintákat megtanulni, de korlátozott kontextuskezelésük miatt a generált szöveg gyakran ismétlődő vagy kevésbé koherens. A GRU stabilan tanult, a BiLSTM pedig javított a kontextusérzékelésen, de továbbra is karakterszinten maradtak. Ezzel szemben a Transformer modell sokkal természetesebb, hosszabb távú összefüggéseket is megtartó és stílusosabb szöveget állított elő. A Transformer bizonyult a legjobban teljesítő modellnek emberi olvasatra, amely nagy részben a BPE tokenizációnak köszönhető. Összességében ez a modell adta a legalkalmazhatóbb és legélethűbb eredményeket a generatív feladatban.