# Poetry Notebook

In this notebook we implement a GPT-style language model and train it on the works of Edgar Allan Poe so that it can generate similar text.

In [54]:
# Installing dependencies
!pip install torch

# Downloading dataset from the GitHub
!wget https://raw.githubusercontent.com/kocenko/Poetry-Synthesis/dev/data/poe_data.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
--2023-05-16 17:51:23--  https://raw.githubusercontent.com/kocenko/Poetry-Synthesis/dev/data/poe_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1930488 (1.8M) [text/plain]
Saving to: ‘poe_data.txt.1’


2023-05-16 17:51:23 (249 MB/s) - ‘poe_data.txt.1’ saved [1930488/1930488]



In [55]:
# Essential imports
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

In [56]:
# Pick the GPU when one is visible to torch, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

In [57]:
# Dataset class definition
### (Option) We can use different data to train it on
### (Option) What if the context is used to predict not the next token
###          but the one after it? (bigger offset)

class PoeDataset(Dataset):
    ''' Dataset of (context, target) token windows cut from a tokenized text '''

    valid_split_params = ["train", "valid"]

    def __init__(self, text: str, split: str, split_ratio: float, context_length: int, tokenizer, offset: int = 1):
        ''' Poe Dataset constructor

        Args:
            str:
                text: Raw text the dataset is built from
                split: Which part of the data this dataset holds,
                       one of `valid_split_params`
            float:
                split_ratio: Value from (0, 1]; tokens before this fraction
                             form the training split, the rest the validation split
            int:
                context_length: Length of the context
                offset: By how many tokens the target window is shifted
                        relative to the context window
            tokenizer: Object whose `encode(text)` returns the token ids of `text`
        '''

        assert split in PoeDataset.valid_split_params, f"{split} is the wrong split type"
        assert split_ratio <= 1 and split_ratio > 0, "Split ratio value should be from range (0, 1]"
        assert len(text) > 0, "Dataset file should not be empty"
        assert context_length < len(text), f"Context length should not be more than {len(text) - 1}"

        self.text = text
        self.offset = offset
        self.context_length = context_length
        self.tokenizer = tokenizer
        # `device` is the notebook-level variable set in the device-selection cell
        self.data = torch.tensor(self.tokenizer.encode(self.text), dtype=torch.int32, device=device)

        # Everything before split_idx is training data, everything after is validation data
        split_idx = int(len(self.data) * split_ratio)
        if split == "train":
            self.data = self.data[:split_idx]
        else:
            self.data = self.data[split_idx:]

    def __len__(self):
        ''' Returns the size of the dataset

        Returns:
            Number of start positions for which both the context window and
            the target window (shifted by `offset`) fit entirely in the data
        '''
        # The last valid start index i satisfies i + context_length + offset == len(data).
        # Clamp at 0 so a split that is too short reports an empty dataset
        # instead of a negative length.
        return max(0, len(self.data) - self.context_length - self.offset + 1)

    def __getitem__(self, index):
        ''' Returns an item of given index

        Params:
            index: Which item should be returned

        Returns:
            Tuple (x, y): the context window starting at `index` and the
            target window of the same length starting `offset` tokens later
        '''

        x = self.data[index: index + self.context_length]
        y = self.data[index + self.offset: index + self.context_length + self.offset]

        return x, y



In [58]:
# Defined tokenizer class
import torch
from typing import List


class Tokenizer:
    ''' Character-level tokenizer: every distinct character of the text is one token '''

    def __init__(self, text: str):
        assert len(text) > 0, "Text used for creating tokenizer cannot be empty"

        self.text = text
        # Vocabulary is the sorted set of characters, so ids are stable for a given text
        self.symbols = sorted(set(text))
        self.vocab_size = len(self.symbols)
        self.stoi = dict(zip(self.symbols, range(self.vocab_size)))
        self.itos = dict(enumerate(self.symbols))

    def encode(self, text: str) -> List:
        ''' Maps each character of the string to its integer id '''

        char_to_id = self.stoi
        token_ids = []
        for symbol in text:
            token_ids.append(char_to_id[symbol])
        return token_ids

    def decode(self, tokens: List) -> str:
        ''' Maps a list of integer ids back to the string they encode '''

        id_to_char = self.itos
        return ''.join(id_to_char[token_id] for token_id in tokens)


In [59]:
# Simple Decoder Class definition
### (Option) Different split, test data?
from typing import Tuple
import torch.nn.functional as F


class OnlyDecoder(nn.Module):
    ''' Minimal decoder: a single embedding table maps every token id
        directly to a row of logits over the vocabulary
    '''

    def __init__(self, config: dict):
        ''' Builds the model

        Args:
            config: Dictionary with the key "vocab_size" (number of distinct tokens)
        '''
        super().__init__()

        self.vocab_size = config["vocab_size"]
        # `device` is the notebook-level variable set in the device-selection cell
        self.embedding_table = nn.Embedding(self.vocab_size, self.vocab_size, device=device)

    def forward(self, token_idx: torch.Tensor, targets=None):
        ''' Computes logits and, when targets are given, the cross-entropy loss

        Args:
            token_idx: Integer tensor of token ids, shape (B, T)
            targets: Optional integer tensor of target ids with B*T elements

        Returns:
            Tuple (logits, loss). Without targets, logits has shape (B, T, C)
            and loss is None. With targets, logits is flattened to (B*T, C)
            and loss is the scalar cross-entropy.
        '''
        logits = self.embedding_table(token_idx)

        if targets is None:
            return logits, None

        B, T, C = logits.shape
        logits = logits.view(B * T, C)
        # cross_entropy needs int64 class indices on the same device as the logits.
        # Casting in place on that device avoids the round-trip through host memory
        # that `.type(torch.LongTensor)` causes for GPU tensors.
        targets = targets.reshape(B * T).to(device=logits.device, dtype=torch.long)

        loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()  # sampling needs no gradients; avoids building an autograd graph
    def generate_new_text(self, idx, sym_limit: int) -> torch.Tensor:
        ''' Extends the given token sequences by sampling new tokens

        Args:
            idx: Integer tensor of starting token ids, shape (B, T)
            sym_limit: Number of tokens to append

        Returns:
            Tensor of shape (B, T + sym_limit) with the sampled continuation
        '''
        for _ in range(sym_limit):
            logits, _ = self(idx)
            logits = logits[:, -1, :]  # only the last position predicts the next token
            probabilities = F.softmax(logits, dim=-1)
            # Draw the next token from the predicted distribution (sampling, not argmax)
            idx_next = torch.multinomial(probabilities, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx


In [66]:
import torch


@torch.no_grad()
def calc_loss(model, iterations, batch_size, train_set, val_set):
    ''' Used to evaluate the model by averaging the loss over many random batches

    Args:
        model: Evaluated model; called as model(x, y) and expected to return (_, loss)
        iterations: Number of batches to average over, per split
        batch_size: Batch size
        train_set: Training dataset
        val_set: Validation dataset

    Returns:
        Dictionary with the averaged losses under the keys 'train' and 'valid'
        (each a 0-dim tensor)

    Raises:
        ValueError: If a dataset is too small to produce even one full batch
    '''

    outcome_losses = {}
    # Remember the caller's mode so evaluation does not silently flip it
    was_training = model.training
    model.eval()
    try:
        for split_name, dataset in (("train", train_set), ("valid", val_set)):
            loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=True)
            if iterations > 0 and len(loader) == 0:
                raise ValueError(
                    f"'{split_name}' dataset yields no full batch of size {batch_size}"
                )

            batches = iter(loader)
            losses = torch.zeros(iterations)
            for i in range(iterations):
                try:
                    x, y = next(batches)
                except StopIteration:
                    # Fewer batches than requested iterations: start another shuffled pass
                    batches = iter(loader)
                    x, y = next(batches)
                _, loss = model(x, y)
                losses[i] = loss.item()
            outcome_losses[split_name] = losses.mean()
    finally:
        model.train(was_training)
    return outcome_losses


def train_model(model, train_set, valid_set, hyper_params: dict):
    ''' Runs the optimization loop and periodically reports averaged losses

    Args:
        model: Model to train; called as model(x, y) and expected to return (logits, loss)
        train_set: Training dataset
        valid_set: Validation dataset
        hyper_params: dict with the keys "lr", "epochs", "batch_size",
                      "eval_each", "eval_iterations" and "break_iter"
    '''

    required_keys = ("lr", "epochs", "batch_size", "eval_each", "eval_iterations", "break_iter")
    lr, epochs, batch_size, eval_each, eval_iterations, break_iter = (
        hyper_params[key] for key in required_keys
    )

    batches = DataLoader(train_set, batch_size=batch_size, shuffle=True, drop_last=True)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    for epoch in range(epochs):
        for step, (inputs, targets) in enumerate(batches):
            # Cap the number of optimization steps taken per epoch
            if step == break_iter:
                break

            # Report averaged train/validation loss every `eval_each` steps
            if step % eval_each == 0:
                report = calc_loss(model, eval_iterations, batch_size, train_set, valid_set)
                print(f"Epoch: {epoch} Step: {step}, train loss: {report['train']:.4f}, val loss: {report['valid']:.4f}")

            _, batch_loss = model(inputs, targets)
            optimizer.zero_grad(set_to_none=True)
            batch_loss.backward()
            optimizer.step()


In [67]:
# Setting up the dataset parameters
### (Option 1) We can use different tokenizer, like SentencePiece
### (Option 2) We can build our own tokenizer, using huggingface library

file_path = "poe_data.txt"

# Reading file, preparing tokenizer
with open(file_path, 'r', encoding="utf-8") as f:
    text = f.read()

# Setting up dataset
split_ratio = 0.85  # tokens before this fraction are training data, the rest validation
context_length = 8
offset = 1  # I am wondering what would be the results for 2, for example

tokenizer = Tokenizer(text)

# Setting up model
net_config = {
    "vocab_size": tokenizer.vocab_size
}
# Number of characters to sample after training. This was defined but unused
# while the generation call hard-coded 20; it now drives that call, with the
# value set to the length that was actually being generated.
symbols_limit = 20
model = OnlyDecoder(net_config)

# Training parameters
hypers = {
    # NOTE(review): .3e-4 equals 3e-5. The logged loss only moves from ~5.15 to
    # ~5.11 over 1000 steps — confirm this is not meant to be 3e-4 or larger.
    "lr": .3e-4,
    "epochs": 1,
    "batch_size": 32,
    "eval_each": 200,        # report losses every 200 steps
    "eval_iterations": 200,  # batches averaged per loss report
    "break_iter": 1000       # stop each epoch after this many steps
}

# Training
train_set = PoeDataset(text, 'train', split_ratio, context_length, tokenizer, offset=offset)
val_set = PoeDataset(text, 'valid', split_ratio, context_length, tokenizer, offset=offset)
train_model(model, train_set, val_set, hypers)

# Test it: start from the token with id 0 and sample `symbols_limit` more characters
starter = torch.zeros((1,1), dtype=torch.long, device=device)
print(tokenizer.decode(model.generate_new_text(starter, symbols_limit)[0].tolist()))

Epoch: 0 Step: 0, train loss: 5.1475, val loss: 5.1490
Epoch: 0 Step: 200, train loss: 5.1323, val loss: 5.1439
Epoch: 0 Step: 400, train loss: 5.1243, val loss: 5.1281
Epoch: 0 Step: 600, train loss: 5.1216, val loss: 5.1188
Epoch: 0 Step: 800, train loss: 5.1122, val loss: 5.1111

“;äzLç[?5g‘—twfhv2 t
