In [None]:
from __future__ import annotations

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

from helper import (
    start_time,
    time_since,
    ShakespeareDataset,
    TokenMapping,
    build_model,
    next_token,
    # Character-based helpers
    encode_text,
    # Token-based helpers
    encode_text_from_tokenizer,
    tokenize_text_from_tokenizer,
    tokens_to_id_tensor,
)

In [None]:
# Reproducibility: fix the RNG seed and require deterministic algorithms
torch.manual_seed(0)
torch.use_deterministic_algorithms(True)

# Prefer the first CUDA device when one is visible; otherwise run on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

# Load Data

In [None]:
# Reduced data to make it manageable for smaller systems
DATA_FILE: str = '../data/shakespeare_small.txt'

# Decode explicitly as UTF-8 so the text (and therefore the character
# vocabulary built from it) does not depend on the platform's default locale
# encoding. NOTE(review): assumes the corpus file is UTF-8/ASCII - confirm.
with open(DATA_FILE, 'r', encoding='utf-8') as data_file:
    raw_text = data_file.read()

print(f'Number of characters in text file: {len(raw_text):,}')

# Character-Based Text Generation

The first approach we'll try for text generation is to train the model and
generate text character by character.

This will mean each token will be a single character from the text and the model
will learn to predict the next character (a token).

To generate text, the model will take in a new string,
character-by-character, and then generate a new likely character based on the
past input. Then the model will take into account that new character and
generate the following character, and so on, until the model has
produced a set number of characters.

## Encode Text into Integer Tokens

### Normalization

> - Skip?
> - lowercase?

In [None]:
def normalize_text(text: str) -> str:
    """Normalize raw text before it is tokenized.

    The only normalization applied is lowercasing; punctuation, whitespace
    and every other character are passed through unchanged.

    Args:
        text: Text to normalize.

    Returns:
        The lowercased copy of `text`.
    """
    return text.lower()

In [None]:
# TEST: Is your text normalized the way you expected?
# Only the first 500 characters of the original text
# `normalized_text` is reused by the pretokenization test cell further down
normalized_text = normalize_text(raw_text[:500])
print(normalized_text)

### Pretokenization

In [None]:
def pretokenize_text(text: str) -> str | list[str]:
    """Split (normalized) text into a list of single-character strings.

    Args:
        text: Text to split.

    Returns:
        A list with one entry per character of `text`, in order.
    """
    return list(text)

In [None]:
# TEST: Is your (normalized) text pretokenized the way you expected?
# Only the first 500 characters of the original text
# Reuses `normalized_text` from the normalization test cell, so that cell
# must have been run first
pretokenized_text = pretokenize_text(normalized_text)
print(pretokenized_text)

### Tokenize

In [None]:
# Combine normalization and pretokenization steps
def tokenize_text(text: str) -> str | list[str]:
    """Run the full character-level tokenization pipeline on `text`.

    Combines the normalization and pretokenization steps. No further step is
    needed afterwards: characters are already tokens, so the pretokenized
    text is already the tokenized text.

    Args:
        text: Raw text to tokenize.

    Returns:
        Whatever `pretokenize_text()` returns for the normalized text.
    """
    return pretokenize_text(normalize_text(text))

In [None]:
# TEST: Is your tokenized text the way you expected?
# Runs the combined normalize + pretokenize pipeline on the first 500
# characters of the original text
tokenized_text = tokenize_text(raw_text[:500])
print(tokenized_text)

### Postprocessing

We'll skip postprocessing since we don't have any special tokens we want to
consider for our task here.

### Encode (Tokens → Integer IDs)

We have `encode_text()` from our helper module that can encode our text based on
our tokenization process from our created `tokenize_text()` function.

This will also provide us with `character_mapping`, an object that we can use to
map our tokens back and forth from strings to integer IDs.

In [None]:
# Encode the full text using our own `tokenize_text()` function.
# `encoded_text` is the encoded text and `character_mapping` is the object
# used to map tokens back and forth between strings and integer IDs.
encoded_text, character_mapping = encode_text(raw_text, tokenize_text)

## Prepare Dataset

In [None]:
# Token count reported by the mapping; later passed to `build_model()`
n_tokens = character_mapping.n_tokens
# Length of the encoded text (one entry per character token)
dataset_size = len(encoded_text)
print(f'Size of dataset: {dataset_size:,} characters')

In [None]:
# Defining sequence length that will be taken in at a time by our model
sequence_length = 32 # Number of characters
batch_size = 32 # Number of dataset samples per batch

train_dataset = ShakespeareDataset(encoded_text, sequence_length)
train_loader = DataLoader(
    train_dataset,
    shuffle=False, # Ensure deterministic training
    batch_size=batch_size,
)

## Define Model

We'll provide a defined model today, but this could be a step that you would
modify and experiment in other NLP projects you'll do.

In [None]:
# Defining the model to be trained and generate text with
model = build_model(n_tokens)
# Move the model onto the device selected earlier (GPU when available)
model.to(device)
# Cross-entropy loss between the model output and the target token IDs
criterion = nn.CrossEntropyLoss()
# Adam with its default settings (no hyperparameters are passed)
optimizer = optim.Adam(model.parameters())

## Define Text Generation

The `generate_text_by_char()` function will use your tokenizer and NLP model to
generate new text token-by-token (character-by-character in this case) by taking
in the input text and token sampling parameters.

We can use temperature and top-k sampling to adjust the "creativeness" of the
generated text.

We also pass in the `num_chars` parameter to tell the function how many tokens
(characters in this case) to generate.

In [None]:
def generate_text_by_char(
    input_str: str,
    model,
    token_mapping: TokenMapping = character_mapping,
    num_chars: int = 100,
    temperature: float = 1.0,
    topk: int | None = None,
) -> str:
    """Extend `input_str` with `num_chars` character tokens from `model`.

    The prompt is passed through `tokenize_text()`, so the returned string
    starts with the tokenized (normalized) prompt rather than the raw
    `input_str`.

    Args:
        input_str: Prompt text that seeds the generation.
        model: Model handed to `next_token()` at every step.
        token_mapping: Mapping handed to `next_token()`; defaults to the
            notebook-level `character_mapping`.
        num_chars: How many tokens to generate after the prompt.
        temperature: Sampling temperature forwarded to `next_token()`.
        topk: Top-k sampling setting forwarded to `next_token()`.

    Returns:
        The prompt tokens followed by the generated tokens, joined into a
        single string.
    """
    prompt_tokens: list[str] = tokenize_text(input_str)
    # These arguments are identical for every sampling step
    sampling_options = {
        'model': model,
        'token_mapping': token_mapping,
        'temperature': temperature,
        'topk': topk,
    }
    sampled_tokens = []
    for _ in range(num_chars):
        # Each step sees the prompt plus everything generated so far
        sampled_tokens.append(
            next_token(
                tokenized_text=prompt_tokens + sampled_tokens,
                **sampling_options,
            )
        )
    return ''.join(prompt_tokens + sampled_tokens)

## Train Model

At this point, the model has not been trained so the code below will train the
NLP model that will be used to generate new text.

The model will take in the text data (broken by tokens by our character-based
tokenizer) and attempt to predict the next token. Over time, the model should
hopefully get better in predicting the next token (given the previous text).

To help us visualize how the model is training, at the end of every epoch, we
generate text using the `TEST_PHRASE` with the improving model.

In [None]:
# Prompt used to sample text from the model at the end of every epoch
TEST_PHRASE = 'To be or not to be'
epochs = 5

start = start_time()
for epoch in range(epochs):
    # Set model into "training mode"
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch.to(device))
        # Swap the last two axes of the output before computing the loss;
        # assumes output is (batch, sequence, n_tokens) - TODO confirm
        loss = criterion(output.transpose(1, 2), y_batch.to(device))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # `epoch` is 0-based: report completed epochs so the progress line agrees
    # with the "Epoch N/M" line and reaches 100% after the final epoch
    epochs_done = epoch + 1
    progress_pct = 100 * epochs_done // epochs
    print(f'Epoch {epochs_done}/{epochs}, Loss: {total_loss / len(train_loader)}')
    # The last value on this line is the loss of the final batch of the epoch
    print(f'[{time_since(start)} ({epochs_done} {progress_pct}%) {loss.item():.4f}]')
    print('-'*72)
    # NOTE(review): the model is still in training mode here unless
    # next_token() switches it to evaluation mode - confirm
    gen_output = generate_text_by_char(
        input_str=TEST_PHRASE,
        model=model,
        num_chars=100,
    )
    print(gen_output)

## Generate Text

Now that the model has been trained, go ahead and observe how it performs!

Try adjusting the different sampling methods using the `temperature` and `topk`
parameters on the same input string to see the differences.

You might also try different phrases as well as how many tokens (`num_chars`) to
generate and observe how it does.

In [None]:
# Generate 100 characters after the prompt with the trained character model.
# temperature=1.0 and topk=None are the function's own defaults, spelled out
# here so they are easy to tweak.
output = generate_text_by_char(
    input_str='To be or not to be',
    model=model,
    num_chars=100,
    temperature=1.0,
    topk=None,
)
print(output)

# Token-Based Text Generation

The next model we'll try uses subwords as tokens instead of characters to train
a model and ultimately generate text token by token.

Although this could be done by creating/training our own tokenizer, we'll use
a pretrained tokenizer from Hugging Face to tokenize our data.

After training the model with the subword tokens, we can generate text by
again providing a text input but this time using the tokenizer to create subword
tokens. The model will then take this sequence of subwords to generate a new
token (subword), add this token as part of the sequence, produce a new token,
and so on until a set number of tokens have been generated. We can then take
this list of subword tokens and decode back to a string of text!

## Encode Text into Integer Tokens

### Tokenize (Choose)

In [None]:
# Choose a pretrained tokenizer to use
# Candidate model names; only the uncased BERT name is loaded below, the
# other two are kept as alternatives to swap in
xlmr_model_name = 'xlm-roberta-base'
bert_model_name = 'bert-base-cased'
bert_model_name_uncased = 'bert-base-uncased'

# Load the pretrained tokenizer for the chosen model name
my_tokenizer = AutoTokenizer.from_pretrained(
    bert_model_name_uncased,
)

### Encode (Tokens → Integer IDs)

In [None]:
# Encode the full text with the pretrained tokenizer; returns the encoded
# text plus the mapping object used by the token-based cells below
encoded, token_mapping = encode_text_from_tokenizer(
    text=raw_text,
    tokenizer=my_tokenizer,
)

## Prepare Dataset

In [None]:
# Rebinds `n_tokens` and `dataset_size`, replacing the values computed for
# the character-based model earlier in the notebook
n_tokens = token_mapping.n_tokens
dataset_size = len(encoded)

In [None]:
length = 16  # Number of tokens per sequence
batch_size = 32  # Number of dataset samples per batch

# Rebinds `train_dataset` / `train_loader` to the token-based data
train_dataset = ShakespeareDataset(encoded, length)
train_loader = DataLoader(
    train_dataset,
    shuffle=False, # Ensure deterministic training
    batch_size=batch_size,
)

## Define Model

In [None]:
# Build a fresh model for the token-based vocabulary; this rebinds `model`,
# `criterion` and `optimizer`, replacing the character-based ones
model = build_model(n_tokens)
model.to(device)
criterion = nn.CrossEntropyLoss()
# Adam with its default settings (no hyperparameters are passed)
optimizer = optim.Adam(model.parameters())

## Define Text Generation

In [None]:
def generate_text_by_token(
    input_str: str,
    model,
    token_mapping: TokenMapping = token_mapping,
    tokenizer = my_tokenizer,
    num_tokens: int = 100,
    temperature: float = 1.0,
    topk: int | None = None,
) -> str:
    """Extend `input_str` with `num_tokens` tokenizer tokens from `model`.

    Args:
        input_str: Prompt text that seeds the generation.
        model: Model handed to `next_token()` at every step.
        token_mapping: Mapping handed to `next_token()`; defaults to the
            notebook-level `token_mapping`.
        tokenizer: Tokenizer used to split the prompt and to decode the
            result; defaults to the notebook-level `my_tokenizer`.
        num_tokens: How many tokens to generate after the prompt.
        temperature: Sampling temperature forwarded to `next_token()`.
        topk: Top-k sampling setting forwarded to `next_token()`.

    Returns:
        The text decoded by `tokenizer` from the prompt tokens followed by
        the generated tokens.
    """
    prompt_tokens = tokenize_text_from_tokenizer(
        tokenizer=tokenizer,
        text=input_str,
    )
    # These arguments are identical for every sampling step
    sampling_options = {
        'model': model,
        'token_mapping': token_mapping,
        'temperature': temperature,
        'topk': topk,
    }
    sampled_tokens = []
    for _ in range(num_tokens):
        # Each step sees the prompt plus everything generated so far
        sampled_tokens.append(
            next_token(
                tokenized_text=prompt_tokens + sampled_tokens,
                **sampling_options,
            )
        )
    # Map prompt + generated tokens to tokenizer IDs, then decode to a string
    all_ids = tokenizer.convert_tokens_to_ids(prompt_tokens + sampled_tokens)
    return tokenizer.decode(all_ids)

In [None]:
def generate_text(
    tokenizer,
    model,
    input_str: str,
    num_tokens: int = 100,
    temperature: float = 1.0,
) -> str:
    """Generate `num_tokens` tokens after `input_str` by sampling `model`.

    Unlike `generate_text_by_token()`, this runs the model directly: at every
    step the scores at the last position are divided by `temperature`, turned
    into probabilities with a softmax, and one token ID is drawn from them.
    The drawn ID is appended to the model input for the next step.

    Relies on the notebook-level `token_mapping` and `device`.

    Args:
        tokenizer: Tokenizer used to split the prompt and decode the result.
        model: Model to sample from; it is switched to evaluation mode.
        input_str: Prompt text that seeds the generation.
        num_tokens: How many tokens to generate.
        temperature: Divisor applied to the scores before the softmax.

    Returns:
        `input_str`, a space, and the decoded generated tokens.

    Raises:
        ValueError: If `temperature` is not strictly positive.
    """
    # A zero temperature divides by zero and a negative one silently inverts
    # the distribution, so reject both up front
    if temperature <= 0:
        raise ValueError(f'temperature must be > 0, got {temperature}')

    # Set model into "evaluation mode" (deactivates things like Dropout layers)
    model.eval()
    tokenized_text = tokenize_text_from_tokenizer(
        tokenizer=tokenizer,
        text=input_str,
    )
    # Move the prompt to the device once; the sequence then stays there
    # instead of being re-transferred (ever longer) on every step
    input_tensor = tokens_to_id_tensor(
        tokens=tokenized_text,
        token_id_mapping=token_mapping.token2id,
    ).to(device)

    generated_text = []
    with torch.no_grad():
        for _ in range(num_tokens):
            output = model(input_tensor)
            # output[0, -1]: scores at the last position of the first entry
            probabilities = nn.functional.softmax(
                output[0, -1] / temperature,
                dim=0,
            )
            next_token_idx = torch.multinomial(probabilities, 1).item()
            # NOTE(review): `id2token` is called here while `token2id` is
            # passed as a mapping above - confirm both match TokenMapping
            generated_text.append(token_mapping.id2token(next_token_idx))
            # Create the new ID on the same device as the running sequence so
            # the concatenation never mixes devices
            new_id = torch.tensor(
                [[next_token_idx]],
                dtype=torch.long,
                device=input_tensor.device,
            )
            input_tensor = torch.cat([input_tensor, new_id], 1)
    # Convert to text again
    output_ids = tokenizer.convert_tokens_to_ids(generated_text)
    output_str = input_str + ' ' + tokenizer.decode(output_ids)
    return output_str

## Train Model

In [None]:
epochs = 5

start = start_time()
for epoch in range(epochs):
    # Set model into "training mode"
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch.to(device))
        # Swap the last two axes of the output before computing the loss;
        # assumes output is (batch, sequence, n_tokens) - TODO confirm
        loss = criterion(output.transpose(1, 2), y_batch.to(device))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # `epoch` is 0-based: report completed epochs so the progress line agrees
    # with the "Epoch N/M" line and reaches 100% after the final epoch
    epochs_done = epoch + 1
    progress_pct = 100 * epochs_done // epochs
    print(f'Epoch {epochs_done}/{epochs}, Loss: {total_loss / len(train_loader)}')
    # The last value on this line is the loss of the final batch of the epoch
    print(f'[{time_since(start)} ({epochs_done} {progress_pct}%) {loss.item():.4f}]')
    print('-'*72)
    # Sample a short continuation to see how the model is progressing.
    # NOTE(review): the model is still in training mode here unless
    # next_token() switches it to evaluation mode - confirm
    output = generate_text_by_token(
        input_str='To be or not to be',
        model=model,
        token_mapping=token_mapping,
        tokenizer=my_tokenizer,
        num_tokens=30,
        temperature=1.0,
    )
    print(output)

## Generate Text

In [None]:
# Generate 30 tokens after the prompt with the trained token-based model,
# using temperature 1.0 and top-k sampling set to 10
output = generate_text_by_token(
    input_str='To be or not to be',
    model=model,
    token_mapping=token_mapping,
    tokenizer=my_tokenizer,
    num_tokens=30,
    temperature=1.0,
    topk=10,
)
print(output)

# Comparison Between Generation