In [110]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import torch
from torch import nn
import lightning as L
from torch.utils.data import DataLoader, Dataset
from lightning import LightningModule, Trainer
from lightning.pytorch.callbacks import ModelCheckpoint
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from torchmetrics.functional import accuracy
import random
import re


%load_ext autoreload
%autoreload 2
%load_ext rich

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
The rich extension is already loaded. To reload it, use:
  %reload_ext rich


# Prepare Data

## Preprocess data

### Character-level model

In [112]:
class ShakespeareDataset(Dataset):
    """Character-level next-character prediction dataset.

    Each sample pairs a window of ``seq_length`` consecutive character ids
    with the id of the character that immediately follows the window.
    A window starts at every position of ``text`` (stride 1).

    Attributes:
        chars: sorted list of the distinct characters found in ``text``.
        char_to_int: mapping from character to integer id.
        int_to_char: mapping from integer id back to character.
        data_size: number of characters in ``text``.
        vocab_size: number of distinct characters.
        seq_length: length of every input window.
        x: int64 array of shape ``(num_samples, seq_length)`` with the inputs.
        y: int64 array of shape ``(num_samples,)`` with the targets.
    """

    def __init__(self, text, seq_length=100):
        """Build the vocabulary and the (window, next character) pairs.

        Args:
            text: the full corpus as a single string.
            seq_length: number of characters in each input window.
        """
        self.chars = sorted(set(text))
        self.char_to_int = {ch: i for i, ch in enumerate(self.chars)}
        self.int_to_char = {i: ch for i, ch in enumerate(self.chars)}
        self.data_size, self.vocab_size = len(text), len(self.chars)
        self.seq_length = seq_length

        # Encode the corpus once instead of re-encoding every character
        # seq_length times while building the windows.
        encoded = np.fromiter(
            (self.char_to_int[ch] for ch in text), dtype=np.int64, count=self.data_size
        )

        # One sample per position that still has a following character.
        num_samples = max(self.data_size - seq_length, 0)
        if num_samples:
            windows = np.lib.stride_tricks.sliding_window_view(encoded, seq_length)
            # The final window has no target character, so it is dropped.
            # copy() turns the read-only strided view into an ordinary array.
            self.x = windows[:num_samples].copy()
        else:
            # Text too short for a single sample: keep a consistent 2-D shape.
            self.x = np.empty((0, seq_length), dtype=np.int64)
        self.y = encoded[seq_length:].copy()

    def __len__(self):
        """Return the number of (window, target) samples."""
        return len(self.x)

    def __getitem__(self, index):
        """Return sample ``index`` as a pair of ``torch.long`` tensors."""
        return torch.tensor(self.x[index], dtype=torch.long), torch.tensor(self.y[index], dtype=torch.long)

### Word-level model

In [86]:
class ShakespeareDataset(Dataset):
    """Word-level next-word prediction dataset.

    The text is split on whitespace. Each sample pairs a window of
    ``seq_length`` consecutive word ids with the id of the word that follows.

    NOTE: this class has the same name as the character-level dataset defined
    earlier in the notebook, so whichever definition is executed last is the
    one in effect.

    Attributes:
        words: sorted distinct words followed by the ``'<UNK>'`` token.
        word_to_int: mapping from word to integer id.
        int_to_word: mapping from integer id back to word.
        data_size: number of words in the text.
        vocab_size: ``len(words)``, i.e. distinct words plus ``'<UNK>'``.
        seq_length: length of every input window, in words.
        x: int64 array of shape ``(num_samples, seq_length)`` with the inputs.
        y: int64 array of shape ``(num_samples,)`` with the targets.
    """

    def __init__(self, text, seq_length=20):
        """Build the vocabulary and the (window, next word) pairs.

        Args:
            text: the full corpus as a single string.
            seq_length: number of words in each input window.
        """
        words = text.split()
        self.data_size = len(words)

        # Keep every real word and append '<UNK>' as an extra last entry.
        # (Previously the last word in sort order was dropped to make room
        # for '<UNK>' and was therefore always encoded as unknown.)
        self.words = sorted(set(words) - {'<UNK>'}) + ['<UNK>']
        self.vocab_size = len(self.words)
        self.word_to_int = {w: i for i, w in enumerate(self.words)}
        self.int_to_word = {i: w for i, w in enumerate(self.words)}
        self.seq_length = seq_length

        # '<UNK>' always occupies the last id.
        unk_id = self.vocab_size - 1
        # Encode the corpus once; windows are then plain slices of this array.
        encoded = np.fromiter(
            (self.word_to_int.get(w, unk_id) for w in words), dtype=np.int64, count=self.data_size
        )

        # One sample per position that still has a following word.
        num_samples = max(self.data_size - seq_length, 0)
        if num_samples:
            windows = np.lib.stride_tricks.sliding_window_view(encoded, seq_length)
            # The final window has no target word, so it is dropped.
            # copy() turns the read-only strided view into an ordinary array.
            self.x = windows[:num_samples].copy()
        else:
            # Text too short for a single sample: keep a consistent 2-D shape.
            self.x = np.empty((0, seq_length), dtype=np.int64)
        self.y = encoded[seq_length:].copy()

    def __len__(self):
        """Return the number of (window, target) samples."""
        return len(self.x)

    def __getitem__(self, index):
        """Return sample ``index`` as a pair of ``torch.long`` tensors."""
        return torch.tensor(self.x[index], dtype=torch.long), torch.tensor(self.y[index], dtype=torch.long)

## Load dataset

In [113]:
def load_data(file_path, seq_length=100, train_split=0.7, valid_split=0.15, seed=None):
    """Read a text file and split the resulting dataset into train/valid/test.

    Args:
        file_path: path of the UTF-8 text file to read.
        seq_length: window length forwarded to ``ShakespeareDataset``.
        train_split: fraction of the samples used for training.
        valid_split: fraction of the samples used for validation; whatever
            remains after train and validation becomes the test split.
        seed: optional integer. When given, the random split is drawn from a
            dedicated generator seeded with it, so repeated calls produce the
            same split. ``None`` keeps the global-RNG behaviour.

    Returns:
        ``(dataset, train_dataset, valid_dataset, test_dataset)`` where the
        last three are random subsets of ``dataset``.

    Raises:
        ValueError: if a fraction is negative or the two fractions sum to
            more than 1 (which would give the test split a negative size).

    NOTE(review): with a stride-1 windowed dataset, a random split places
    heavily overlapping windows in different subsets - confirm this is
    acceptable for the validation/test metrics.
    """
    # Validate before touching the file so bad arguments fail fast.
    if train_split < 0 or valid_split < 0 or train_split + valid_split > 1 + 1e-9:
        raise ValueError(
            "train_split and valid_split must be non-negative and sum to at most 1, "
            f"got train_split={train_split}, valid_split={valid_split}"
        )

    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    dataset = ShakespeareDataset(text, seq_length)

    total = len(dataset)
    train_size = int(total * train_split)
    valid_size = int(total * valid_split)
    # The test split absorbs the rounding remainder so the sizes sum to total.
    test_size = total - train_size - valid_size

    split_kwargs = {}
    if seed is not None:
        split_kwargs['generator'] = torch.Generator().manual_seed(seed)
    train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [train_size, valid_size, test_size], **split_kwargs
    )
    return dataset, train_dataset, valid_dataset, test_dataset

# Model Definition

In [116]:
class TextGenerator(L.LightningModule):
    """Embedding -> LSTM -> linear model that predicts the next token id.

    ``forward`` takes a batch of id sequences and returns one row of logits
    over the vocabulary per sequence, computed from the LSTM output at the
    last time step.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, layers, dropout=0.2):
        """Create the layers.

        Args:
            vocab_size: number of distinct token ids (embedding rows and
                output logits).
            embedding_dim: size of each token embedding.
            hidden_dim: LSTM hidden size.
            layers: number of stacked LSTM layers.
            dropout: dropout probability between stacked LSTM layers.
        """
        super().__init__()
        self.save_hyperparameters()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # nn.LSTM applies dropout only between stacked layers; with a single
        # layer a non-zero value has no effect and only triggers a warning.
        lstm_dropout = dropout if layers > 1 else 0.0
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=layers, batch_first=True, dropout=lstm_dropout)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        """Return next-token logits for each sequence in the batch ``x``."""
        x = self.embedding(x)
        x, _ = self.lstm(x)
        # Only the output at the last time step feeds the classifier.
        x = self.fc(x[:, -1, :])
        return x

    def _loss_and_log(self, batch, metric_name):
        """Compute the cross-entropy loss for ``batch`` and log it per epoch."""
        x, y = batch
        loss = nn.functional.cross_entropy(self(x), y)
        self.log(metric_name, loss, logger=True, on_step=False, on_epoch=True, prog_bar=True)
        return loss

    def training_step(self, batch):
        """Return the training loss for one batch (logged as ``train_loss``)."""
        return self._loss_and_log(batch, 'train_loss')

    def validation_step(self, batch):
        """Return the validation loss for one batch (logged as ``val_loss``)."""
        return self._loss_and_log(batch, 'val_loss')

    def configure_optimizers(self):
        """Use Adam with a fixed learning rate of 0.001."""
        return torch.optim.Adam(self.parameters(), lr=0.001)

In [117]:
# Read the corpus, split it, and wrap each split in a DataLoader.
# Only the training loader shuffles; validation and test keep a fixed order.
dataset, train_dataset, valid_dataset, test_dataset = load_data(file_path='input.txt', seq_length=100)
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=128)
valid_loader = DataLoader(dataset=valid_dataset, shuffle=False, batch_size=128)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=128)

In [118]:
# Create the model
# Size the embedding and output layers from the dataset's own vocab_size:
# both the character-level and the word-level dataset expose it, whereas
# `.chars` exists only on the character-level one.
model = TextGenerator(dataset.vocab_size, embedding_dim=128, hidden_dim=256, layers=2)

# Setup trainer and fit the model using Lightning's Trainer
# Both callbacks watch the validation loss logged by the model as "val_loss".
trainer = L.Trainer(max_epochs=10, callbacks=[ModelCheckpoint(monitor="val_loss"), EarlyStopping(monitor="val_loss")])
trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=valid_loader)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name      | Type      | Params
----------------------------------------
0 | embedding | Embedding | 8.3 K 
1 | lstm      | LSTM      | 921 K 
2 | fc        | Linear    | 16.7 K
----------------------------------------
946 K     Trainable params
0         Non-trainable params
946 K     Total params
3.786     Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/Users/yyy/Library/Python/3.9/lib/python/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.
/Users/yyy/Library/Python/3.9/lib/python/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

/Users/yyy/Library/Python/3.9/lib/python/site-packages/lightning/pytorch/trainer/call.py:54: Detected KeyboardInterrupt, attempting graceful shutdown...


In [74]:
# Serialises the whole model object (not only its state_dict) into the
# current working directory.
torch.save(obj=model, f='model11_10_epoch_1_layer_word.pth')

In [104]:
def generate_text(model, start_str, gen_length=100, temperature=1.0):
    """Sample ``gen_length`` characters from ``model`` as a continuation of ``start_str``.

    Uses the module-level ``dataset`` for the character <-> id tables
    (``char_to_int``, ``int_to_char``, ``chars``) and for ``seq_length``.
    The model is switched to eval mode and left in it.

    Args:
        model: object exposing ``embedding``, ``lstm``, ``fc``, ``device`` and
            ``eval()``.
        start_str: prompt; at most its last ``dataset.seq_length`` characters
            are fed to the model. Every character must be in the vocabulary.
        gen_length: number of characters to sample.
        temperature: softmax temperature; values below 1 sharpen the
            distribution, values above 1 flatten it.

    Returns:
        ``start_str`` followed by the sampled characters.

    Raises:
        ValueError: if ``start_str`` is empty or ``temperature`` is not positive.
        KeyError: if ``start_str`` contains a character missing from the vocabulary.
    """
    if not start_str:
        raise ValueError("start_str must contain at least one character")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    model.eval()
    prompt_ids = [dataset.char_to_int[c] for c in start_str[-dataset.seq_length:]]
    input_seq = torch.tensor(prompt_ids, dtype=torch.long, device=model.device).unsqueeze(0)

    generated = []
    hidden = None

    # Inference only: without no_grad the autograd graph would keep growing
    # through `hidden` across every generated character.
    with torch.no_grad():
        for _ in range(gen_length):
            output, hidden = model.lstm(model.embedding(input_seq), hidden)
            logits = model.fc(output[:, -1, :]).squeeze(0)

            # Sample in float64 and renormalise: np.random.choice rejects
            # probabilities whose sum drifts from 1 by float32 rounding.
            p = torch.softmax(logits / temperature, dim=-1).cpu().numpy().astype(np.float64)
            p /= p.sum()
            char_ind = int(np.random.choice(len(dataset.chars), p=p))
            generated.append(dataset.int_to_char[char_ind])

            # The recurrent state already summarises everything fed so far,
            # so only the newly sampled character is fed next. Re-feeding the
            # whole window on top of the carried state would make the LSTM
            # consume the same characters repeatedly.
            input_seq = torch.tensor([[char_ind]], dtype=torch.long, device=model.device)

    return start_str + ''.join(generated)

    
# print() rather than a bare expression so any newlines in the sample are
# rendered instead of being shown escaped in a string repr.
print(generate_text(model=model, start_str='ROMEO:', temperature=1.0, gen_length=1000))

ROMEO: now bless me stand to be to any of you but we are undone to hate it is a very man Aumerle the very name of his hands and all the Duke of Norfolk to the people and give his last to be a best than my heart Are he hath found us out of your death And look upon him and if the world be the duke will have some men to be a world As she is a kind of late And weakling I am no more of my fathers lord and a noble friend To Lord of my head to the king I will not be ruled with thee And that I did not call thee more than the king and fearful of a means And in the devil of the world I know twas to the king And make thee say I do not say I take thee for thy hand I am too long till thou art a guest of this intent That is your name of Edwards age Now hath I neer heard me to the king and I will not say by my life and my joy of this land And if I be known with you sir I pray you and to the world of it I am the king of mine I will go along and I will take my leave and I shall have no more than you ha