In [None]:
import sys
sys.path.append('..')  # Add the parent directory to sys.path

In [None]:
import torch, os
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
from src.utils import train, set_device, beam_search, plot_performance_over_time, compute_accuracy
from src.models import GenerativeRNN, GenerativeLSTM

In [None]:
# Seed PyTorch's global random number generator; the same SEED is re-applied
# right before each model is constructed in the training cell.
SEED = 265
torch.manual_seed(SEED)
# Ask the project helper for a compute device, requesting CUDA.
# NOTE(review): whether set_device falls back to CPU when CUDA is unavailable
# is not visible from this notebook — confirm in src.utils.
DEVICE = set_device("cuda")
print(f"Using device: {DEVICE}")

In [None]:
# Directory the saved artefacts are read from (and caches are written to).
PATH_GENERATED = "../generated_data/"

# NOTE: torch.load unpickles Python objects; only load files you produced yourself.

# Lookups: `vocab[token]` yields an index, `mapping[index]` is used to get a label/token back.
mapping = torch.load(f"{PATH_GENERATED}mapping.pt")
# Embedding matrix handed to the model constructors.
embedding = torch.load(f"{PATH_GENERATED}embedding_matrix.pt")
vocab = torch.load(f"{PATH_GENERATED}vocabulary.pt")

# Token sequences of the three splits.
words_train = torch.load(f"{PATH_GENERATED}words_train.pt")
words_val = torch.load(f"{PATH_GENERATED}words_val.pt")
words_test = torch.load(f"{PATH_GENERATED}words_test.pt")

In [None]:
# Dataset for text generation
def create_dataset(text, vocab, context_size, map_target=None,
                   excluded_targets=("<unk>", ",", ".", "(", ")", "?", "!")):
    """
    Create a pytorch dataset of context / target pairs from a text.

    Every position ``i >= context_size`` yields one sample: the
    ``context_size`` token indices preceding position ``i`` form the context
    and the token index at position ``i`` is the target. Positions whose
    label is listed in ``excluded_targets`` are skipped as targets; they can
    still appear inside the contexts of other positions.

    Args:
        text: Sequence of tokens.
        vocab: Lookup supporting ``vocab[token] -> index``.
        context_size: Number of tokens preceding each target.
        map_target: Optional lookup ``index -> label`` used only for the
            exclusion test. When ``None`` the raw index itself is the label,
            so the default (string) entries of ``excluded_targets`` never
            match and no target is dropped.
        excluded_targets: Labels that must not be used as targets.

    Returns:
        TensorDataset of ``(contexts, targets)`` with contexts of shape
        ``(N_dataset, context_size)`` and targets of shape ``(N_dataset,)``.
        When no position qualifies (text shorter than ``context_size`` or all
        targets excluded) an empty dataset with these shapes is returned.
    """
    txt = [vocab[w] for w in text]

    contexts = []
    targets = []

    for i in range(context_size, len(txt)):
        t = txt[i]
        label = t if map_target is None else map_target[t]
        if label in excluded_targets:
            continue
        # Context before target
        contexts.append(torch.tensor(txt[i - context_size : i]))
        targets.append(t)

    if not targets:
        # torch.stack rejects an empty list, so build the empty tensors
        # explicitly, with the dtype the non-empty path produces for int indices.
        return TensorDataset(
            torch.empty((0, context_size), dtype=torch.long),
            torch.empty(0, dtype=torch.long),
        )

    # contexts of shape (N_dataset, context_size)
    # targets of shape (N_dataset)
    return TensorDataset(torch.stack(contexts), torch.tensor(targets))

In [None]:
# Number of preceding tokens that form the context of each target word
# (passed as `context_size` to create_dataset).
CONTEXT_SIZE = 12

In [None]:
# Cached (train, val, test) datasets are stored next to the other generated artefacts.
dataset_cache_path = PATH_GENERATED + "text_generation_data.pt"

cached_datasets = None
if os.path.isfile(dataset_cache_path):
    cached_datasets = torch.load(dataset_cache_path)
    # The cache file name does not encode CONTEXT_SIZE, so a cache written with
    # a different context length would otherwise be reused silently.
    if any(tuple(ds.tensors[0].shape[1:]) != (CONTEXT_SIZE,) for ds in cached_datasets):
        print("Cached datasets do not match CONTEXT_SIZE, rebuilding...")
        cached_datasets = None

if cached_datasets is not None:
    data_train, data_val, data_test = cached_datasets
else:
    data_train = create_dataset(words_train, vocab, CONTEXT_SIZE, mapping)
    data_val = create_dataset(words_val, vocab, CONTEXT_SIZE, mapping)
    data_test = create_dataset(words_test, vocab, CONTEXT_SIZE, mapping)
    torch.save((data_train, data_val, data_test), dataset_cache_path)

# Training

In [None]:
# Settings shared by every training run.
batch_size = 64
n_epochs = 2
loss_fn = nn.CrossEntropyLoss()

print("-- Global Parameters --")
for _label, _value in (
    ("batch_size", batch_size),
    ("n_epochs", n_epochs),
    ("CONTEXT_SIZE", CONTEXT_SIZE),
):
    print(f"{_label}={_value!r}")

# Architectures to train. model_parameters[i] lists the constructor settings
# tried for model_architectures[i].
model_architectures = [GenerativeRNN, GenerativeLSTM]
model_parameters = [
    # Settings for GenerativeRNN
    [
        dict(num_hiddens=8, num_layers=4, dropout=0),
        # dict(num_hiddens=16, num_layers=8, dropout=0.1),
    ],
    # Settings for GenerativeLSTM
    [
        dict(num_hiddens=8, num_layers=4, dropout=0),
        # dict(num_hiddens=16, num_layers=8, dropout=0.1),
    ],
]

# Optimizer settings; every entry is combined with every architecture setting.
parameter_search = [
    dict(lr=0.008),
    # dict(lr=0.001),
    # dict(lr=0.01),
    # dict(lr=0.0005),
]

In [None]:
# data_train is built by sliding a window over the corpus, so neighbouring
# samples share almost all of their context. Shuffle the training samples each
# epoch so a mini-batch is not made of near-duplicate, consecutive windows.
# The shuffle order is drawn from PyTorch's global RNG, which this notebook seeds.
train_loader = DataLoader(data_train, batch_size=batch_size, shuffle=True)
# Used for evaluation only; kept in dataset order.
val_loader = DataLoader(data_val, batch_size=batch_size)

In [None]:
# Histories of every training run, appended in execution order so that the
# same position in each list refers to the same run.
train_losses = []
val_losses = []
train_accs = []
val_accs = []
val_perfs = []  # final validation accuracy of each run, used for model selection
models = []

if os.path.isfile(PATH_GENERATED + "text_generation_model.pt"):
    # A previously selected model is on disk; the lists above stay empty and
    # the saved histories are loaded together with the model.
    print("Skipping training, loading existing model...")
else:
    # zip() would silently drop the surplus entries of the longer list.
    if len(model_architectures) != len(model_parameters):
        raise ValueError(
            "model_architectures and model_parameters must have the same length"
        )

    # Loop-invariant: move the embedding matrix to the target device once.
    # NOTE(review): every model receives this same tensor; if the model classes
    # update it in place during training, later runs start from altered
    # embeddings — confirm in src.models.
    embedding = embedding.to(DEVICE)

    for architecture, m_params in zip(model_architectures, model_parameters):
        for params in parameter_search:
            print("\n-- Training with following parameters --:")
            print("\nModel architecture: ", architecture)
            for name, val in params.items():
                print(f"{name}: {val}")
            for m_param in m_params:
                print(m_param)

                # Re-seed before building the model so every run starts from
                # the same RNG state.
                torch.manual_seed(SEED)
                model = architecture(embedding, **m_param)
                model.to(DEVICE)
                optimizer = Adam(model.parameters(), lr=params["lr"])

                # train(...) returns four sequences; their last elements are
                # reported below as the final metrics of the run.
                train_loss, val_loss, train_acc, val_acc = train(
                    n_epochs, model, optimizer, loss_fn, train_loader, val_loader, DEVICE
                )

                train_losses.append(train_loss)
                val_losses.append(val_loss)
                train_accs.append(train_acc)
                val_accs.append(val_acc)
                val_perfs.append(val_acc[-1])
                models.append(model)
                print(f"Train accuracy: {train_acc[-1]*100:.3f}%")
                print(f"Validation accuracy: {val_acc[-1]*100:.3f}%\n")
            

In [None]:
if os.path.isfile(PATH_GENERATED + "text_generation_model.pt"):
    # The model was saved after being moved to the training device. Remap its
    # tensors to the current DEVICE so the checkpoint also loads on a machine
    # without the original device.
    # NOTE(review): recent PyTorch releases default torch.load to
    # weights_only=True, which rejects fully pickled modules — confirm the
    # installed version loads this file.
    chosen_model = torch.load(
        PATH_GENERATED + "text_generation_model.pt", map_location=DEVICE
    )
    chosen_index, train_losses, val_losses, train_accs, val_accs = torch.load(
        PATH_GENERATED + "text_generation_plots.pt", map_location=DEVICE
    )
else:
    # Pick the run with the highest final validation accuracy (first one on ties).
    chosen_index = val_perfs.index(max(val_perfs))
    chosen_model = models[chosen_index]
    # Persist the model together with the histories needed for the plots so
    # that training can be skipped on the next run.
    torch.save(chosen_model, PATH_GENERATED + "text_generation_model.pt")
    torch.save((chosen_index, train_losses, val_losses, train_accs, val_accs), PATH_GENERATED + "text_generation_plots.pt")
print(chosen_model)

In [None]:
# One figure per metric for the selected run: training curve vs. validation curve.
for _metric, _train_history, _val_history in (
    ("loss", train_losses, val_losses),
    ("accuracy", train_accs, val_accs),
):
    plot_performance_over_time(
        _train_history[chosen_index],
        _val_history[chosen_index],
        f"Training and Validation {_metric} of chosen model",
        _metric,
        f_name=f"../images/text_generation_{_metric}.png",
        save=True,
    )

In [None]:
# Batches of the test split, served in dataset order (no shuffling requested).
test_loader = DataLoader(data_test, batch_size=batch_size)

In [None]:
# Score the selected model on the test split and report it as a percentage.
# NOTE(review): assumes compute_accuracy returns a fraction in [0, 1] — confirm in src.utils.
test_acc = compute_accuracy(chosen_model, test_loader, device=DEVICE)
print(f"Test accuracy: {test_acc*100:.3f}%")

### Beam Search

In [None]:
import sys
sys.path.append('..')  # Add the parent directory to sys.path

In [None]:
import torch, os
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
from src.utils import train, set_device, beam_search, plot_performance_over_time, compute_accuracy
from src.models import GenerativeRNN, GenerativeLSTM

In [None]:
# NOTE(review): this repeats the seed/device setup from the top of the notebook
# (as do the two import cells just above it); it looks intended to let the
# beam-search section run from a fresh kernel — confirm before de-duplicating.
# Re-seeding here also resets PyTorch's global RNG before generation.
SEED = 265
torch.manual_seed(SEED)
DEVICE = set_device("cuda")
print(f"Using device: {DEVICE}")

In [None]:
# Load a model
# map_location remaps the stored tensors to the current DEVICE, so a checkpoint
# saved from a GPU run also loads on a machine without that device.
model = torch.load("../generated_data/text_generation_model.pt", map_location=DEVICE)
# Inference only: switch to eval mode so layers such as dropout are disabled.
# eval() returns the module itself; assigning it avoids echoing the model's
# repr as cell output.
model = model.eval()

In [None]:
# Create a test sequence
vocab = torch.load("../generated_data/vocabulary.pt")

# Alternative prompts (uncomment exactly one assignment to try it):
# test_seq = ["the", "cat", "jumped", "over"]
# test_seq = ["what", "is", "the", "meaning", "of"]
# test_seq = ["i", "have", "never"]
# test_seq = ["the", "woman", "was", "sitting"]
# test_seq = ["as", "i", "opened", "the"]
# test_seq = ["to", "be", "or", "not", "to", "be", "?"]
test_seq = "a king and queen once upon a time".split()  # Exists in the training data

# Convert the prompt tokens to vocabulary indices for the model.
test_seq_indeces = [vocab[token] for token in test_seq]
print(test_seq_indeces)

In [None]:
# Extend the prompt with beam search; the search tree is printed while it runs.
gen_seq = beam_search(
    model,
    test_seq_indeces,
    beam_width=10,
    max_len=10,
    print_search_tree=True,
)

# Lookup used to turn generated indices back into text.
mapping = torch.load("../generated_data/mapping.pt")

# NOTE(review): gen_seq[0] is presumably the highest-ranked candidate — confirm in src.utils.beam_search.
gen_seq_to_text = []
for token_i in gen_seq[0]:
    gen_seq_to_text.append(mapping[token_i])

print("\n\nGenerated sequence: ", gen_seq_to_text)