In [2]:
import matplotlib.pyplot as plt
import numpy as np
import platform
import time
import pathlib
import os

import torch

# Let float32 matmuls use faster, slightly lower-precision kernels where the hardware supports them.
torch.set_float32_matmul_precision('high')

# see if CUDA is available
if torch.cuda.is_available():
    print("CUDA is available")

# see if cuDNN is available
# NOTE: `torch.backends.cudnn.enabled` is only a configuration flag (True by default, even on
# builds without cuDNN); `is_available()` is the call that reports actual availability.
if torch.backends.cudnn.is_available():
    print("cuDNN is available")

# set the device: prefer the GPU, fall back to the CPU when CUDA is not usable
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device is set to:", device)

CUDA is available
cuDNN is available
Device is set to: cuda


In [3]:
import urllib.request

# Remote origin of the corpus and the local path it is cached under
dataset_file_name = 'shakespeare.txt'
dataset_file_origin = 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt'
dataset_dir = './data'
dataset_file_path = os.path.join(dataset_dir, dataset_file_name)

# Create the cache directory on first use (no-op when it is already there)
os.makedirs(dataset_dir, exist_ok=True)

# Reuse a previously downloaded copy; otherwise fetch it from the origin
if os.path.isfile(dataset_file_path):
    print(f"Dataset already exists at: {dataset_file_path}")
else:
    urllib.request.urlretrieve(dataset_file_origin, dataset_file_path)
    print(f"Downloaded: {dataset_file_path}")

Dataset already exists at: ./data\shakespeare.txt


In [4]:
# Read the whole corpus into memory. The context manager closes the file handle
# deterministically, and the explicit encoding keeps the result independent of the
# platform's default locale encoding.
with open(dataset_file_path, mode='r', encoding='utf-8') as corpus_file:
    text = corpus_file.read()

print('Length of text: {} characters'.format(len(text)))

# Preview the beginning of the corpus
print(text[:250])

Length of text: 1115394 characters
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.



## Processing

In [5]:
# Distinct characters of the corpus, sorted so every character gets a stable position
vocab = list(set(text))
vocab.sort()

print(f'{len(vocab)} unique characters')
print('vocab:', vocab)

65 unique characters
vocab: ['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


In [6]:
# Lookup table: character -> its position in the sorted vocabulary.
char2index = dict(zip(vocab, range(len(vocab))))

# Show the first 20 entries of the mapping as a dict-like preview
print('{')
for char in list(char2index)[:20]:
    print(f'  {repr(char):4s}: {char2index[char]:3d},')
print('  ...\n}')

{
  '\n':   0,
  ' ' :   1,
  '!' :   2,
  '$' :   3,
  '&' :   4,
  "'" :   5,
  ',' :   6,
  '-' :   7,
  '.' :   8,
  '3' :   9,
  ':' :  10,
  ';' :  11,
  '?' :  12,
  'A' :  13,
  'B' :  14,
  'C' :  15,
  'D' :  16,
  'E' :  17,
  'F' :  18,
  'G' :  19,
  ...
}


In [7]:
# Reverse lookup: position in the vocabulary -> character (array indexed by character id).
index2char = np.asarray(vocab)
print(index2char)

['\n' ' ' '!' '$' '&' "'" ',' '-' '.' '3' ':' ';' '?' 'A' 'B' 'C' 'D' 'E'
 'F' 'G' 'H' 'I' 'J' 'K' 'L' 'M' 'N' 'O' 'P' 'Q' 'R' 'S' 'T' 'U' 'V' 'W'
 'X' 'Y' 'Z' 'a' 'b' 'c' 'd' 'e' 'f' 'g' 'h' 'i' 'j' 'k' 'l' 'm' 'n' 'o'
 'p' 'q' 'r' 's' 't' 'u' 'v' 'w' 'x' 'y' 'z']


In [8]:
# Encode the corpus: replace every character by its vocabulary index.
text_as_int = np.array(list(map(char2index.__getitem__, text)))

print(f'text_as_int length: {len(text_as_int)}')
# Side-by-side view of the first characters and their encoded form
print(f'{text[:15]!r} --> {text_as_int[:15]!r}')

text_as_int length: 1115394
'First Citizen:\n' --> array([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0])


## Creating sequences

In [9]:
from torch.utils.data import Dataset, DataLoader
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
import tensorboard
import pytorch_lightning as pl
from torch import nn

# Create the Dataset
class ShakespeareDataset(Dataset):
    def __init__(self, text, sequence_length):
        self.text = text
        self.sequence_length = sequence_length

    def __len__(self):
        return len(self.text) - self.sequence_length

    def __getitem__(self, index):
        input_seq = torch.tensor(self.text[index:index+self.sequence_length], device=device, dtype=torch.long)
        target_seq = torch.tensor(self.text[index+1:index+self.sequence_length+1], device=device, dtype=torch.long)
        return (input_seq, target_seq)

# Window size (in characters) of every training example
sequence_length = 100
dataset = ShakespeareDataset(text=text_as_int, sequence_length=sequence_length)

# DataLoader for handling batching
class DataModule(pl.LightningDataModule):
    def __init__(self, dataset, batch_size=64):
        super().__init__()
        self.dataset = dataset
        self.batch_size = batch_size

    def train_dataloader(self):
        return DataLoader(self.dataset, batch_size=self.batch_size, shuffle=True, drop_last=True, num_workers=0)

# Build the data module and grab its training loader
data_module = DataModule(dataset)
train_loader = data_module.train_dataloader()

# Sanity check: pull a single batch and show its inputs and shifted targets
batch = next(iter(train_loader), None)
if batch is not None:
    input_text, target_text = batch
    print('Input:', input_text)
    print('Target:', target_text)


Input: tensor([[43, 56, 10,  ...,  6,  1, 41],
        [49,  6,  1,  ..., 58,  1, 58],
        [ 1, 46, 43,  ..., 43, 10,  0],
        ...,
        [ 0,  5, 32,  ..., 23, 17,  1],
        [ 1, 39, 52,  ..., 35, 46, 39],
        [57, 57, 43,  ..., 52, 42, 50]], device='cuda:0')
Target: tensor([[56, 10,  1,  ...,  1, 41, 53],
        [ 6,  1, 47,  ...,  1, 58, 46],
        [46, 43, 56,  ..., 10,  0, 32],
        ...,
        [ 5, 32, 47,  ..., 17,  1, 27],
        [39, 52, 42,  ..., 46, 39, 58],
        [57, 43, 42,  ..., 42, 50, 43]], device='cuda:0')


## Model Building

In [10]:
class TextGenerationLSTM(pl.LightningModule):
    """Character-level language model: embedding -> single LSTM -> linear projection.

    The module keeps the LSTM state in ``self.hidden`` between ``forward`` calls so
    that generation can feed one character at a time. Call ``reset_hidden_state()``
    whenever a new, unrelated sequence (or a batch of a different size) begins.

    Args:
        vocab_size: Number of distinct characters (embedding rows and output logits).
        embedding_dim: Size of each character embedding.
        rnn_units: Hidden size of the LSTM.
        batch_size: Stored on the instance; not used by the module's own methods.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units, batch_size):
        super(TextGenerationLSTM, self).__init__()
        self.batch_size = batch_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=rnn_units, batch_first=True)
        self.fc = nn.Linear(rnn_units, vocab_size)
        # LSTM state carried across forward calls; None means "start from a zero state".
        self.hidden = None

    def forward(self, x):
        """Map a batch-first tensor of character ids to per-position vocabulary logits.

        Continues from ``self.hidden`` and stores the updated state back on the module.
        """
        x = self.embedding(x)
        x, self.hidden = self.lstm(x, self.hidden)
        x = self.fc(x)
        return x

    def reset_hidden_state(self):
        """Forget the carried LSTM state so the next forward call starts fresh."""
        self.hidden = None

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy loss for one batch of (input, target) ids."""
        x, y = batch
        # Start every batch from a clean state. State left over from an earlier call
        # (e.g. text generation, which runs with batch size 1) would otherwise be fed
        # into the LSTM and fail with a batch-size mismatch.
        self.reset_hidden_state()
        y_hat = self(x)
        # Batches are independent random windows: drop the state again so it neither
        # leaks into the next batch nor keeps this batch's autograd graph referenced.
        self.reset_hidden_state()
        # cross_entropy expects the class dimension second: move vocab from last to dim 1.
        loss = nn.functional.cross_entropy(y_hat.transpose(1, 2), y)
        self.log('train_loss', loss)
        return {'loss': loss}

    def configure_optimizers(self):
        """Use Adam over all parameters with a fixed learning rate of 1e-4."""
        return torch.optim.Adam(self.parameters(), lr=0.0001)

## Model Training

In [11]:
# Instantiate the model
model = TextGenerationLSTM(vocab_size=len(vocab), embedding_dim=256, rnn_units=1024, batch_size=64)

# Define checkpoint callback: keep the three checkpoints with the lowest logged train_loss
checkpoint_callback = ModelCheckpoint(
    monitor='train_loss',
    dirpath='checkpoints/',
    filename='model-{epoch:02d}-{train_loss:.2f}',
    save_top_k=3,
    mode='min',
)

logger = TensorBoardLogger("tb_logs", name="text_generation_lstm")

# Instantiate the PyTorch Lightning trainer
trainer = pl.Trainer(
    max_epochs=20,
    devices=1,
    # 'auto' selects the GPU when one is present and otherwise falls back to the CPU,
    # matching the device fallback used elsewhere in this notebook.
    accelerator='auto',
    # Pass the TensorBoard logger explicitly; without this the logger created above is
    # never used and metrics go to the Trainer's default logging directory instead.
    logger=logger,
    callbacks=[checkpoint_callback],
)

trainer.fit(model, data_module)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
c:\Users\Owner\Documents\MMA\Assignment 2 RNN\.venv\Lib\site-packages\pytorch_lightning\callbacks\model_checkpoint.py:653: Checkpoint directory C:\Users\Owner\Documents\MMA\Assignment 2 RNN\checkpoints exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type      | Params
----------------------------------------
0 | embedding | Embedding | 16.6 K
1 | lstm      | LSTM      | 5.3 M 
2 | fc        | Linear    | 66.6 K
----------------------------------------
5.3 M     Trainable params
0         Non-trainable params
5.3 M     Total params
21.337    Total estimated model params size (MB)
c:\Users\Owner\Documents\MMA\Assignment 2 RNN\.venv\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing

Epoch 19: 100%|██████████| 17426/17426 [06:33<00:00, 44.28it/s, v_num=19]

`Trainer.fit` stopped: `max_epochs=20` reached.


Epoch 19: 100%|██████████| 17426/17426 [06:34<00:00, 44.15it/s, v_num=19]


## 

## Text Generation

In [12]:
# Path of the best checkpoint recorded by the checkpoint callback during training
best_model_path = checkpoint_callback.best_model_path
print("Best model path:", best_model_path)

# Rebuild the network with the same constructor arguments used for training and
# load the best checkpoint's weights into it
restore_kwargs = dict(
    vocab_size=len(vocab),
    embedding_dim=256,
    rnn_units=1024,
    batch_size=64,
)
model = TextGenerationLSTM.load_from_checkpoint(checkpoint_path=best_model_path, **restore_kwargs)

# Display the module structure as the cell output
model

Best model path: C:\Users\Owner\Documents\MMA\Assignment 2 RNN\checkpoints\model-epoch=17-train_loss=0.15.ckpt


TextGenerationLSTM(
  (embedding): Embedding(65, 256)
  (lstm): LSTM(256, 1024, batch_first=True)
  (fc): Linear(in_features=1024, out_features=65, bias=True)
)

In [21]:
def generate_text(model, start_string, num_generate=1000, temperature=1.0):
    """Sample ``num_generate`` characters from ``model``, continuing ``start_string``.

    The prompt is fed through the model once to prime its recurrent state; after that
    each sampled character is fed back in as the next single-step input.

    Args:
        model: Network exposing ``device``, ``reset_hidden_state()`` and a forward pass
            that returns logits with the vocabulary on the last dimension.
        start_string: Non-empty prompt; every character must be a key of ``char2index``
            (an unknown character raises ``KeyError``).
        num_generate: Number of characters to sample.
        temperature: Positive divisor applied to the logits before the softmax. Values
            below 1 sharpen the distribution, values above 1 flatten it.

    Returns:
        ``start_string`` followed by the generated characters.

    Raises:
        ValueError: If ``start_string`` is empty or ``temperature`` is not positive.
    """
    if not start_string:
        raise ValueError("start_string must contain at least one character")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # Shape (1, len(start_string)): a batch holding just the prompt.
    input_indices = torch.tensor(
        [[char2index[s] for s in start_string]], dtype=torch.long, device=model.device
    )
    text_generated = []

    was_training = model.training
    model.eval()
    model.reset_hidden_state()
    try:
        with torch.no_grad():
            for _ in range(num_generate):
                # Only the logits of the last position predict the next character.
                predictions = model(input_indices)[:, -1, :] / temperature
                probabilities = nn.functional.softmax(predictions, dim=-1).squeeze(0).cpu().numpy()
                predicted_id = int(np.random.choice(len(probabilities), p=probabilities))

                # The model carries its own state, so only the new character is fed next.
                input_indices = torch.tensor([[predicted_id]], dtype=torch.long, device=model.device)
                text_generated.append(index2char[predicted_id])
    finally:
        # Leave the model as we found it: drop the batch-of-one recurrent state (it would
        # not match the batch size of a later training step) and restore the train/eval mode.
        model.reset_hidden_state()
        model.train(was_training)

    return start_string + ''.join(text_generated)

# Sample once with the default temperature, then once with a higher temperature
for sampling_kwargs in (
    {"start_string": "ROMEO: "},
    {"start_string": "JULIET: ", "temperature": 1.5},
):
    print(generate_text(model, **sampling_kwargs))

ROMEO: alas!

CLAUDIO:
Sweet said, and here and stands avoidings: you were contrary

POMPEY:
I beseech you, sir, let him go with gentle-skeeping part;
And here I take thee affection, rise and stop our further long,
Thou shouldst bear momentime that we have almost
Most great with wrongs in being known to give.

GLOUCESTER:
Look, how this ring encompasseth finger.
Even so thy breast encloseth my poor children.
If thou wilt outward forth from our service done:
My brother Angelo hath lost a honour'd none,
And yet no further than a wanton's bird;
Who lets it hop a little from her hand,
Like a poor prisoner in his twisted gyves,
And with a silk thread plucks it back again,
So loving-jealous of his liberty.

ROMEO:
I would I were thy bird.

JULIET:
Sweet, so would I:
Yet I should kill thee with much cherishing.
Good night, good night! parting is such
sweet land amiss to come hither: Thou
Must, both my adventure in your own pricy.

BAPTISTA:
You are welcome all.

PETRUCHIO:
She hath prevented 

In [24]:
# Further examples: a bare speaker prefix, then a prompt sampled at temperature 2
for sampling_kwargs in (
    {"start_string": "KING"},
    {"start_string": "What ", "temperature": 2},
):
    print(generate_text(model, **sampling_kwargs))

KING EDWARD IV:
Now here a period of tumultuous broils.
Away with Oxford to Hames Castle straight:
For Somerset, off with his guilty hand:
Madam 'ay 'em, and much better blood I begin: I pare
Before thy bond shall be such severe past
cure of the thing you wot of, unless they kept very
good diet, as I told you,--

FROTH:
All this is true.

POMPEY:
Why, very well, then,--

ESCALUS:
Come, you are a tedious fool: to the purpose. What
was done to Elbow's wife, that he hath cause to
complain of? Come me to what was done to her.

POMPEY:
Sir, your honour cannot come to that yet.

ESCALUS:
No, sir, nor I mean it not.

POMPEY:
Sir, but you shall come again to Mantua.
And this shall free thee from this present shame;
If no inconstant toy, nor woman's flesh.

BAPTISTA:
It was your find this man in holy wedlock bands.

QUEEN MARGARET:
Yes, I agree, and thank he's ever.

LUCENTIO:
Faith, sir, if you had told as many lies in his
behalf as you have uttered words in your own, you
should not pass here;