# Demo 3: Beatles lyrics

In this demo we will train a decoder-only transformer to generate lyrics in the style of The Beatles.
We will do this by training it on a corpus consisting of Beatles lyrics.

In [2]:
import os
import sys
import re
import torch
from torch.utils.data import Dataset, DataLoader

# set to true if working in google colab
colab = False

if colab:
    from google.colab import drive
    drive.mount('/content/drive')
    root_dir = '/content/drive/MyDrive/Projects/Transformer'
    if root_dir not in sys.path:
        sys.path.append(root_dir)

from transformer import layers

if torch.cuda.is_available():
    device = torch.device('cuda')
    print('CUDA is available. Using GPU.')
else:
    device = torch.device('cpu')
    print('CUDA not available. Using CPU.')

CUDA not available. Using CPU.


Let's start by loading the data and tokenizing it. 
We will tokenize by splitting the corpus at word boundaries. 

First we create a `Vocab` class which initializes a vocabulary using the specified corpus, and which contains three methods:
- `tokens`: maps a string to a list of integer tokens
- `words`: maps a list of integer tokens to the corresponding string
- `tokenize_from_file`: tokenizes a text file

In [3]:
filepath = os.path.join('data/beatles.txt')
if colab:
    filepath = os.path.join(root_dir, filepath)

# special tokens
sos = -1   # start of sequence
eos = -2   # end of sequence
unk = -3   # unknown subword
pad = -100 # padding

class Vocab():
    def __init__(self, filepath, pattern=r'(?<=\w)(?=\W)|(?<=\W)(?=\w)'):
        self.pattern = pattern
        self.words_to_tokens = {}
        self.tokens_to_words = {}
        with open(filepath, mode='r') as file:
            token = 0 # next token to be assigned
            for line in file:
                words = re.split(pattern, line, flags=re.IGNORECASE)
                for word in words:
                    if word not in self.words_to_tokens:
                        self.words_to_tokens[word] = token
                        self.tokens_to_words[token] = word
                        token += 1

    def __len__(self):
        return len(self.words_to_tokens)

    def tokens(self, str: str) -> list[int]:
        word_lst = re.split(self.pattern, str, flags=re.IGNORECASE)
        tok_lst = []
        for word in word_lst:
            if word in self.words_to_tokens:
                tok_lst.append(self.words_to_tokens[word])
            else:
                tok_lst.append(unk)

        return tok_lst

    def words(self, tokens: list[int]) -> str:
        word_lst = []
        for token in tokens:
            if token in self.tokens_to_words:
                word_lst.append(self.tokens_to_words[token])
            else:
                raise ValueError(f'{token} not a valid token')

        return ''.join(word_lst)

    def tokenize_from_file(self, filepath: str) -> list[int]:
        tokens = []
        with open(filepath, mode='r') as file:
            for line in file:
                tokens.extend(self.tokens(line))

        return tokens

vocab = Vocab(filepath)

We can now use the vocab to tokenize the corpus:

In [4]:
corpus = torch.tensor(vocab.tokenize_from_file(filepath),
                      dtype=torch.long, device=device)

We define a class `TokenizedDataset` which sections off a corpus into fixed size training examples. 
Each training example is a pair (input, target) where input is the same token sequence as target, but right-shifted by one position to accomodate the start-of-sequence token. 

In [5]:
class TokenizedDataset(Dataset):
    def __init__(self, corpus: torch.LongTensor, context_size: int):
        assert context_size <= len(corpus)
        super().__init__()

        self.corpus = corpus
        self.context_size = context_size

    def __len__(self):
        return len(self.corpus) - self.context_size + 1

    def __getitem__(self, idx):
        assert idx >= 0 and idx < len(self), f'index {idx} out of range'

        context = self.corpus[idx:idx + self.context_size]
        input = torch.cat([torch.tensor([sos], device=device), context])
        target = torch.cat([context, torch.tensor([pad], device=device)])

        return (input, target)

Let's split our corpus so that 90% goes into the training set and 10% into the test set.

In [6]:
context_size = 32
batch_size = 32

train_set = TokenizedDataset(corpus[:round(0.9 * len(corpus))], context_size)
test_set = TokenizedDataset(corpus[round(0.9 * len(corpus)):], context_size)

train_loader = DataLoader(train_set, batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size)

Now that we have the data, let's construct the decoder-only transformer. It will consist of the following layers:
- embedding and positional encoding
- decoder stack
- linear (de-embedding) layer
- softmax

Each decoder in the decoder stack consists of a masked multihead attention layer followed by a position-wise fully connected two-layer feed forward neural network.

In [7]:
model = layers.Transformer(
    vocab=len(vocab),
    d_model=128,
    num_heads=4,
    num_stacks=4,
    d_ff=256,
    dropout=0.0
).to(device)

Now we define the testing and training loops. We will use a cross entropy loss function and an Adam optimizer.

In [8]:
cost_fn = layers.CrossEntropyLoss(ignore_index=pad, swap_dims=True)
optim = torch.optim.Adam(model.parameters(), lr=0.001)

def test(model):
    model.eval()
    with torch.no_grad():
        total_loss = 0
        for input, target in test_loader:
            output = model(input)
            total_loss += cost_fn(output, target)

    model.train()

    num_tokens = len(test_loader.dataset.corpus)

    # return average loss per token
    return total_loss / num_tokens

def train(model):
    print_period = len(train_loader) // 100

    for batch, (input, target) in enumerate(train_loader):
        
        # output
        output = model(input)

        # loss
        loss = cost_fn(output, target)

        # backprop
        optim.zero_grad()
        loss.backward()

        # step
        optim.step()

        # print progress
        if batch % print_period == 0 or batch == len(train_loader) - 1:
            test_loss = test(model)
            print(f'Batch {batch:3d}. Test loss = {test_loss:5.3}')

train(model)

Batch   0. Test loss = 0.232


KeyboardInterrupt: 