# Import

In [None]:
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
from collections import Counter
import warnings
import string
import nltk
import matplotlib.pyplot as plt
import seaborn as sns
import kagglehub
import itertools
from copy import deepcopy
import collections
from sklearn.model_selection import train_test_split
import json
import random
import re
import torch.optim as optim
from collections import defaultdict
import kagglehub

# Dataset

## Loading of the dataset

In [None]:
# Download latest version of the shakespeare dataset and save the path
path = kagglehub.dataset_download("kewagbln/shakespeareonline")
print("Path to dataset files:", path)
DATA_PATH = os.path.join(path, "t8.shakespeare.txt")
OUTPUT_DIR = "processed_data/"

Path to dataset files: /root/.cache/kagglehub/datasets/kewagbln/shakespeareonline/versions/1


## Paramters

In [None]:
TRAIN_FRACTION = 0.8  # percentage of the training data
SEQ_LEN = 128  # length of the sequence for the model
BATCH_SIZE = 32
N_VOCAB = 128  # Numero di caratteri nel vocabolario (ASCII)
EPOCHS = 200
LEARNING_RATE = 0.01

# Dataset Parsing and Preprocessing
This section includes regular expressions and functions to parse Shakespeare's text into plays, characters, and their respective dialogues. The parsing handles special cases like "The Comedy of Errors" and prepares training and test datasets.

In [None]:
CHARACTER_RE = re.compile(r'^  ([a-zA-Z][a-zA-Z ]*)\. (.*)')  # Matches character lines
CONT_RE = re.compile(r'^    (.*)')  # Matches continuation lines
COE_CHARACTER_RE = re.compile(r'^([a-zA-Z][a-zA-Z ]*)\. (.*)')  # Special regex for Comedy of Errors
COE_CONT_RE = re.compile(r'^(.*)')  # Continuation for Comedy of Errors

def parse_shakespeare_file(filepath):
    """
    Reads and splits Shakespeare's text into plays, characters, and their dialogues.
    Returns training and test datasets based on the specified fraction.
    """
    with open(filepath, "r") as f:
        content = f.read()
    plays, _ = _split_into_plays(content)  # Split the text into plays
    _, train_examples, test_examples = _get_train_test_by_character(
        plays, test_fraction=1 - TRAIN_FRACTION
    )
    return train_examples, test_examples

def _split_into_plays(shakespeare_full):
    """
    Splits the full Shakespeare text into individual plays and characters' dialogues.
    Handles special parsing for "The Comedy of Errors".
    """
    plays = []
    slines = shakespeare_full.splitlines(True)[1:]  # Skip the first line (title/header)
    current_character = None
    comedy_of_errors = False

    for i, line in enumerate(slines):
        # Detect play titles and initialize character dictionary
        if "by William Shakespeare" in line:
            current_character = None
            characters = defaultdict(list)
            title = slines[i - 2].strip() if slines[i - 2].strip() else slines[i - 3].strip()
            comedy_of_errors = title == "THE COMEDY OF ERRORS"
            plays.append((title, characters))
            continue

        # Match character lines or continuation lines
        match = _match_character_regex(line, comedy_of_errors)
        if match:
            character, snippet = match.group(1).upper(), match.group(2)
            if not (comedy_of_errors and character.startswith("ACT ")):
                characters[character].append(snippet)
                current_character = character
        elif current_character:
            match = _match_continuation_regex(line, comedy_of_errors)
            if match:
                characters[current_character].append(match.group(1))

    # Filter out plays with insufficient dialogue data
    return [play for play in plays if len(play[1]) > 1], []

def _match_character_regex(line, comedy_of_errors=False):
    """Matches character dialogues, with special handling for 'The Comedy of Errors'."""
    return COE_CHARACTER_RE.match(line) if comedy_of_errors else CHARACTER_RE.match(line)

def _match_continuation_regex(line, comedy_of_errors=False):
    """Matches continuation lines of dialogues."""
    return COE_CONT_RE.match(line) if comedy_of_errors else CONT_RE.match(line)

def _get_train_test_by_character(plays, test_fraction=0.2):
    """
    Splits dialogues by characters into training and testing datasets.
    Ensures each character has at least one example in the training set.
    """
    all_train_examples = defaultdict(list)
    all_test_examples = defaultdict(list)

    def add_examples(example_dict, example_tuple_list):
        """Adds examples to the respective dataset dictionary."""
        for play, character, sound_bite in example_tuple_list:
            example_dict[f"{play}_{character}".replace(" ", "_")].append(sound_bite)

    for play, characters in plays:
        for character, sound_bites in characters.items():
            examples = [(play, character, sound_bite) for sound_bite in sound_bites]
            if len(examples) <= 2:
                continue

            # Calculate the number of test samples
            num_test = max(1, int(len(examples) * test_fraction))
            num_test = min(num_test, len(examples) - 1)  # Ensure at least one training example

            # Split into train and test sets
            train_examples = examples[:-num_test]
            test_examples = examples[-num_test:]

            add_examples(all_train_examples, train_examples)
            add_examples(all_test_examples, test_examples)

    return {}, all_train_examples, all_test_examples




# Text Processing
Functions to convert characters and words into numerical representations
for use in neural networks. Includes padding logic for batch processing.

In [None]:
def letter_to_vec(c, n_vocab=128):
    """Converts a single character to a vector index based on the vocabulary size."""
    return ord(c) % n_vocab

def word_to_indices(word, n_vocab=128):
    """
    Converts a word or list of words into a list of indices.
    Each character is mapped to an index based on the vocabulary size.
    """
    if isinstance(word, list):  # If input is a list of words
        res = []
        for stringa in word:
            res.extend([ord(c) % n_vocab for c in stringa])  # Convert each word to indices
        return res
    else:  # If input is a single word
        return [ord(c) % n_vocab for c in word]

def process_x(raw_x_batch, seq_len, n_vocab):
    """
    Processes raw input data into padded sequences of indices.
    Ensures all sequences are of uniform length.
    """
    x_batch = [word_to_indices(word, n_vocab) for word in raw_x_batch]
    x_batch = [x[:seq_len] + [0] * (seq_len - len(x)) for x in x_batch]
    return torch.tensor(x_batch, dtype=torch.long)

def process_y(raw_y_batch, seq_len, n_vocab):
    """
    Processes raw target data into padded sequences of indices.
    Similar to process_x but for target labels.
    """
    y_batch = [word_to_indices(word, n_vocab) for word in raw_y_batch]
    y_batch = [y[:seq_len] + [0] * (seq_len - len(y)) for y in y_batch]
    return torch.tensor(y_batch, dtype=torch.long)

def create_batches(data, batch_size, seq_len, n_vocab):
    """
    Creates batches of input and target data from dialogues.
    Each batch contains sequences of uniform length.
    """
    x_batches = []
    y_batches = []
    dialogues = list(data.values())
    random.shuffle(dialogues)  # Shuffle to ensure randomness in batches

    batch = []
    for dialogue in dialogues:
        batch.append(dialogue)
        if len(batch) == batch_size:
            x_batch = process_x(batch, seq_len, n_vocab)
            y_batch = process_y(batch, seq_len, n_vocab)
            x_batches.append(x_batch)
            y_batches.append(y_batch)
            batch = []

    # Add the last batch if it's not full
    if batch:
        x_batch = process_x(batch, seq_len, n_vocab)
        y_batch = process_y(batch, seq_len, n_vocab)
        x_batches.append(x_batch)
        y_batches.append(y_batch)

    return x_batches, y_batches



# Data Loader
This code defines a custom PyTorch Dataset class for processing Shakespeare dialogues.
The dataset converts raw text data into sequences of indices for training a character-level model.


In [None]:
from torch.utils.data import Dataset, DataLoader

class ShakespeareDataset(Dataset):
    """
    Custom PyTorch Dataset for processing Shakespeare dialogues.
    Converts input data into sequences of indices for input and target processing.
    """
    def __init__(self, data, seq_len, n_vocab):
        """
        Initializes the ShakespeareDataset instance.

        Args:
            data: Dictionary containing dialogues (e.g., train_data or test_data).
            seq_len: Length of sequences to generate for the model.
            n_vocab: Size of the vocabulary for mapping characters to indices.
        """
        self.data = list(data.values())  # Convert the dictionary values to a list
        self.seq_len = seq_len  # Sequence length for the model
        self.n_vocab = n_vocab  # Vocabulary size

    def __len__(self):
        """
        Returns the total number of samples in the dataset.

        Returns:
            int: Number of dialogues in the dataset.
        """
        return len(self.data)

    def __getitem__(self, idx):
        """
        Retrieves a single sample (input and target) from the dataset.

        Args:
            idx: Index of the sample to retrieve.

        Returns:
            Tuple: Processed input (x) and target (y) tensors for the model.
        """
        dialogue = self.data[idx]  # Get the dialogue at the specified index
        x = process_x([dialogue], self.seq_len, self.n_vocab)[0]  # Prepare input tensor
        y = process_y([dialogue], self.seq_len, self.n_vocab)[0]  # Prepare target tensor
        return x, y

# Model Definition
CharLSTM is a character-based LSTM model designed for text generation.
It includes embedding, LSTM layers, and a fully connected layer for output.

In [None]:
import torch.nn.functional as F

class CharLSTM(nn.Module):
    """
    Character-level LSTM model for text processing tasks.
    Includes embedding, LSTM, and a fully connected output layer.
    """
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, output_size):
        super(CharLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.lstm = nn.LSTM(embedding_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        """
        Forward pass through the model.
        Outputs log probabilities for each character.
        """
        embedded = self.embedding(x)
        output, hidden = self.lstm(embedded, hidden)
        output = self.fc(output)  # Fully connected layer for output
        output = F.log_softmax(output, dim=-1)  # Log probabilities
        return output, hidden

    def init_hidden(self, batch_size):
        """Initializes hidden and cell states for the LSTM."""
        return (torch.zeros(self.num_layers, batch_size, self.hidden_size),
                torch.zeros(self.num_layers, batch_size, self.hidden_size))


# Training Loop
Trains the CharLSTM model using the prepared data and batches.

In [None]:
from torch.utils.data import Dataset, DataLoader

if __name__ == "__main__":
    print("Parsing dataset...")
    train_data, test_data = parse_shakespeare_file(DATA_PATH)

    train_dataset = ShakespeareDataset(train_data, seq_len=SEQ_LEN, n_vocab=N_VOCAB)
    val_dataset = ShakespeareDataset(test_data, seq_len=SEQ_LEN, n_vocab=N_VOCAB)

    # Creazione dei DataLoader
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
    print("Preparing model...")
    model = CharLSTM(input_size=N_VOCAB, embedding_size=8, hidden_size=256, num_layers=2, output_size=N_VOCAB)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.NLLLoss()

    print("Training model...")
    for epoch in range(EPOCHS):
        model.train()
        total_loss = 0

        # Create batches for the current epoch
        x_batches, y_batches = create_batches(train_data, BATCH_SIZE, SEQ_LEN, N_VOCAB)

        for x_batch, y_batch in train_loader:
            state = model.init_hidden(x_batch.size(0))  # Initialize hidden state for the batch
            optimizer.zero_grad()

            logits, _ = model(x_batch, state)  # Forward pass
            loss = criterion(logits, y_batch)  # Calculate loss
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {total_loss / len(x_batches):.4f}")

Parsing dataset...
Preparing model...
Training model...
Epoch 1/200, Loss: 4.8474
Epoch 2/200, Loss: 4.8291
Epoch 3/200, Loss: 4.8229
Epoch 4/200, Loss: 4.8197
Epoch 5/200, Loss: 4.8150
Epoch 6/200, Loss: 4.8101
Epoch 7/200, Loss: 4.8024
Epoch 8/200, Loss: 4.7895
Epoch 9/200, Loss: 4.7711
Epoch 10/200, Loss: 4.7407
Epoch 11/200, Loss: 4.6985
Epoch 12/200, Loss: 4.6426
Epoch 13/200, Loss: 4.5835
Epoch 14/200, Loss: 4.5250
Epoch 15/200, Loss: 4.4620
Epoch 16/200, Loss: 4.4000
Epoch 17/200, Loss: 4.3419
Epoch 18/200, Loss: 4.2886
Epoch 19/200, Loss: 4.2385
Epoch 20/200, Loss: 4.1811
Epoch 21/200, Loss: 4.1306
Epoch 22/200, Loss: 4.0876
Epoch 23/200, Loss: 4.0406
Epoch 24/200, Loss: 4.0028
Epoch 25/200, Loss: 3.9697
Epoch 26/200, Loss: 3.9266
Epoch 27/200, Loss: 3.8821
Epoch 28/200, Loss: 3.8448
Epoch 29/200, Loss: 3.8133
Epoch 30/200, Loss: 3.7763
Epoch 31/200, Loss: 3.7468
Epoch 32/200, Loss: 3.7186
Epoch 33/200, Loss: 3.6944
Epoch 34/200, Loss: 3.6708
Epoch 35/200, Loss: 3.6359
Epoch 36