# Assignment 3: Text processing with LSTM in PyTorch

*Author:* Thomas Adler

*Copyright statement:* This  material,  no  matter  whether  in  printed  or  electronic  form,  may  be  used  for  personal  and non-commercial educational use only.  Any reproduction of this manuscript, no matter whether as a whole or in parts, no matter whether in printed or in electronic form, requires explicit prior acceptance of the authors.

In this assignment you will a train an LSTM to generate text. To be able to feed text into (recurrent) neural networks we first have to choose a good representation. There are several options to do so ranging from simple character embeddings to more sophisticated approaches like [word embeddings](https://towardsdatascience.com/introduction-to-word-embedding-and-word2vec-652d0c2060fa) or [token embeddings](https://medium.com/@_init_/why-bert-has-3-embedding-layers-and-their-implementation-details-9c261108e28a). We will use a character embedding in this assignment. 

Character embeddings work as follows. First we define an alphabet, a set of characters that we want to be able to represent. To feed a character into our network we use a one-hot vector. The dimension of this vector is equal to the size of our alphabet and the "hot" position indicates the character we want to represent. While this is logically a decent representation (all characters have the same norm, are orthogonal to one another, etc.) it is inefficient in terms of memory because we have to store a lot of zeros. In the first layer of our network we will multiply our one-hot vector with a weight matrix, i.e. we compute the preactivation by a matrix-vector product of the form $We_i$, where $e_i$ is the $i$-th canonical basis vector. This operation corresponds to selecting the $i$-th column of $W$. So an efficient implementation is to perform a simple lookup operation in $W$. This is how embedding layers work also for word or token embeddings. They are learnable lookup tables. 

## Exercise 1: Encoding characters

Write a class `Encoder` that implements the methods `__init__` and `__call__`. The method `__init__` takes a string as argument that serves as alphabet. The method `__call__` takes one argument. If it is a string then it should return a sequence of integers as `torch.Tensor` of shape  representing the input string. Each integer should represents a character of the alphabet. The alphabet consists of the characters matched by the regex `[a-z0-9 .!?]`. If the input text contains characters that are not in the alphabet, then `__call__` should either remove them or map them to a corresponding character that belongs to the alphabet. If the argument is a `torch.Tensor`, then the method should return a string representation of the input, i.e. it should function as decoder. 

In [1]:
import re
import torch
from typing import Union

class Encoder():

    def __init__(self, alphabet:str):
        self.pattern = r'[a-z0-9 .!?]'
        if not re.match(self.pattern, alphabet):
            raise ValueError("Invalid alphabet")
        self.alphabet = alphabet
        self.lookup = {}
        for i, char in enumerate(self.alphabet):
            self.lookup[char] = i

    def __call__(self, x:Union[str, torch.Tensor]) -> Union[torch.Tensor, str]:
        if isinstance(x, str):
            x = x.lower()
            x = ''.join([char if char in self.alphabet else ' ' for char in x])
            return torch.tensor([self.lookup[char] for char in x], device='mps')
        elif isinstance(x, torch.Tensor):
            res = ''
            for i in x:
                i = int(i)
                if i in self.lookup.values():
                    res += self.alphabet[i]
            return res
        else:
            raise ValueError("Invalid input type")

## Exercise 2: Pytorch Dataset

Write a class `TextDataset` that derives from `torch.utlis.data.Dataset`. It should wrap a text file and utilize it for training with pytorch. Implement the methods `__init__`, `__len__`, `__getitem__`. The method `__init__` should take a path to a text file as string and an integer `l` specifying the length of one sample sequence. The method `__len__` takes no arguments and should return the size of the dataset, i.e. the number of sample sequences in the dataset. The method `__getitem__` should take an integer indexing a sample sequence and should return that sequence as a `torch.Tensor`. The input file can be viewed as one long sequence. The first sample sequence consists of the characters at positions `0..l-1` in the input file. The second sequence consists of the characters at positions `l..2*l-1` and so on. That is, the samples of our dataset are non-overlapping sequences. The last incomplete sequence may be dropped. 

In [2]:
import torch
from torch.utils.data import Dataset

########## YOUR SOLUTION HERE ##########
class TextDataset(Dataset):

    def __init__(self, path: str, l:int):
        super().__init__()
        self.path = path
        self.l = l
        self.enc_dec = Encoder('abcdefghijklmnopqrstuvwxyz0123456789 .!?')

        with open(self.path, 'r') as f:
            self.text = f.read()
        
        self.input = [self.text[i:i+self.l] for i in range(0, len(self.text), self.l)]
        self.target = [self.text[i+1:i+self.l+1] for i in range(0, len(self.text), self.l)]

        if len(self.input[-1]) < self.l:
            self.input.pop()
            self.target.pop()

    def __len__(self):
        return len(self.input)

    def __getitem__(self, idx:int):
        enc_input = self.enc_dec(self.input[idx])
        enc_target = self.enc_dec(self.target[idx])
        return enc_input, enc_target

## Exercise 3: The Model

Write a class `NextCharLSTM` that derives from `torch.nn.Module` and takes `alphabet_size`, the `embedding_dim`, and the `hidden_dim` as arguments. It should consist of a `torch.nn.Embedding` layer that maps the alphabet to embeddings, a `torch.nn.LSTM` that takes the embeddings as inputs and maps them to hidden states, and a `torch.nn.Linear` output layer that maps the hidden states of the LSTM back to the alphabet. Implement the methods `__init__` that sets up the module and `forward` that takes an input sequence and returns the logits (i.e. no activation function on the output layer) of the model prediction at every time step. 

In [3]:
import torch.nn as nn
import torch.nn.functional as F

########## YOUR SOLUTION HERE ##########

class NextCharLSTM(nn.Module):
    def __init__(self, alphabet_size:int, embedding_dim: int, hidden_dim: int):
        super().__init__()
        self.alphabet_size = alphabet_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(self.alphabet_size, self.embedding_dim, device='mps')
        self.lstm = nn.LSTM(self.embedding_dim, self.hidden_dim, batch_first=True, device='mps')
        self.linear = nn.Linear(self.hidden_dim, self.alphabet_size, device='mps')


    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm(x)
        return self.linear(x)


## Exercise 4: Training/Validation Epoch

Write a function `epoch` that takes a `torch.utils.data.DataLoader`, a `NextCharLSTM`, and a `torch.optim.Optimizer` as arguments, where the last one might be `None`. If the optimizer is `None`, then the function should validate the model. Otherwise it should train the model for next-character prediction in the many-to-many setting. That is, given a sequence `x` of length `l`, the input sequence is `x[:l-1]` and the corresponding target sequence is `x[1:]`. The function should perform one epoch of training/validation and return the loss values of each mini batch as a numpy array. Use the cross-entropy loss function for both training and validation. 

In [4]:
from torch.utils.data import DataLoader
import numpy as np

########## YOUR SOLUTION HERE ##########

def epoch(dataloader: DataLoader, model: NextCharLSTM, optimizer:Union[torch.optim.Optimizer, None]):
    loss_fn = nn.CrossEntropyLoss()

    if optimizer is None:
        losses_eval = []
        model.eval()
        with torch.no_grad():
            for input, target in dataloader:
                output = model(input)
                l = loss_fn(output.swapaxes(1, 2), target)
                losses_eval.append(l.item())
            return np.array(losses_eval)
        
    else:
        model.train()
        losses_train = []
        for input, target in dataloader:
            optimizer.zero_grad()
            output = model(input)
            l = loss_fn(output.swapaxes(1, 2), target)
            l.backward()
            optimizer.step()
            losses_train.append(l.item())
        
        return np.array(losses_train)
        
    

## Exercise 5: Model Selection

Usually, we would now train and validate our model on a grid of with different hyperparameters to see which setting performs best. However, this is pretty expensive in terms of compute so we will provide you with a setting that should work quite well. Train your model for 30 epochs using `torch.optim.Adam`. Validate your model after every epoch and persist the model that performs best on the validation set using `torch.save`. Visualize and discuss the training and validation progress. 

In [None]:
import matplotlib.pyplot as plt

sequence_length = 100
batch_size = 256
embedding_dim = 8
hidden_dim = 512
learning_rate = 1e-3
num_epochs = 100

########## YOUR SOLUTION HERE ##########

alphabet_size = 40
model = NextCharLSTM(alphabet_size, embedding_dim, hidden_dim)
optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
best_model_path = 'best_model.pt'
shuffle = True

dataset_train = TextDataset('trump/trump_train.txt', sequence_length)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=shuffle)
dataset_valid = TextDataset('trump/trump_val.txt', sequence_length)
dataloader_valid = DataLoader(dataset_valid, batch_size=batch_size, shuffle=shuffle)

train_losses = []
valid_losses = []
best_valid_loss = np.inf

for current_epoch in range(num_epochs):
    train_loss = epoch(dataloader_train, model, optim)
    valid_loss = epoch(dataloader_valid, model, None)
    v_loss = valid_loss.mean()
    t_loss = train_loss.mean()
    train_losses.append(t_loss)
    valid_losses.append(v_loss)
    if v_loss < best_valid_loss:
        torch.save(model, best_model_path)
        best_valid_loss = v_loss
    
    print(f'Epoch {current_epoch+1}/{num_epochs} - Train loss: {t_loss}, Valid loss: {v_loss}')

plt.plot(train_losses, label='Train loss')
plt.plot(valid_losses, label='Valid loss')
plt.legend()
plt.show()


Epoch 1/100 - Train loss: 3.02092102595738, Valid loss: 2.8625807762145996
Epoch 2/100 - Train loss: 2.7928356851850236, Valid loss: 2.737060546875
Epoch 3/100 - Train loss: 2.6425550052097866, Valid loss: 2.601284980773926
Epoch 4/100 - Train loss: 2.4856954642704556, Valid loss: 2.4776763916015625
Epoch 5/100 - Train loss: 2.3524480751582555, Valid loss: 2.3809051513671875


## Exercise 6: Top-$k$ Accuracy

Write a function `topk_accuracy` that takes a list of integers $k$, a model, and a data loader and returns the top-$k$ accuracy of the model on the given data set for each $k$. A sample is considered to be classified correctly if the true label appears in the top-$k$ classes predicted by the model. Then load the best model from the previous exercise using `torch.load` and plot its top-$k$ accuracy as a function of $k$ for all possible values of $k$. Discuss the results. 

In [None]:
########## YOUR SOLUTION HERE ##########

from typing import List

def topk_accuracy(model: NextCharLSTM, dataloader: DataLoader, ks:List[int]):
    for k in ks:
        correct = 0
        total = 0
        for input, target in dataloader:
            output = model(input).swapaxes(1, 2)
            _, topk = torch.topk(output, k, dim=1)

            for i in range(len(target)):
                correct += torch.sum(topk[i] == target[i]).item()
                total += len(target[i])

        print(f'Top-{k} accuracy: {correct/total}')

model = torch.load('best_model.pt')
k_list = range(1, 10)

topk_accuracy(model, dataloader_valid, k_list)

  model = torch.load('best_model.pt')


Top-1 accuracy: 0.6266949152542373
Top-2 accuracy: 0.7486016949152542
Top-3 accuracy: 0.8109745762711864
Top-4 accuracy: 0.8516949152542372
Top-5 accuracy: 0.878177966101695
Top-6 accuracy: 0.8994067796610169
Top-7 accuracy: 0.9143220338983051
Top-8 accuracy: 0.9264406779661017
Top-9 accuracy: 0.9370762711864407


## Exercise 7: Gumbel-Max Character Sampling

In this exercise we utilize the trained network to generate novel text. To do this, take some string of seed text, which you can choose freely, and feed it to the network. For each subsequent character, the model outputs logits $z = (z_1, \dots, z_K)^\top$, where $K$ is the alphabet size. 

Use the Gumbel-Max trick to sample from the categorical distribution parameterized by 
$$
\pi_k = \frac{e^{z_k / \tau}}{\sum_{j=1}^K e^{z_j / \tau}} \quad \text{where} \quad \tau > 0 
$$
is the temperature. For $\tau \to 0$ we approach the one-hot distribution, whereas for $\tau \to \infty$ we approach the uniform distribution. The Gumbel-Max trick says that the random variable 
$$
Y = \arg \max_{k \in 1, \dots, K} (z_k / \tau + \xi_k) 
$$
follows a categorical distribution parameterized by $\pi_1, \dots, \pi_K$, where $\xi_k$ is drawn independently from the standard Gumbel distribution.

Implement next-character sampling using the Gumbel-Max trick. Try out different values of $\tau$ and see which work best. 

In [None]:
def sample(model: NextCharLSTM, seed:str, length:int, tau:float, encodings:Encoder):
    G = torch.distributions.Gumbel(0, 1)
    with torch.no_grad():
        model.eval()
        seed = encodings(seed)
        seed = seed.unsqueeze(0)
        last = seed
        res = seed
        for l in range(length):
            output = model(last).squeeze(0)
            last_distribution = output[-1]/tau + G.sample(output[-1].shape).to('mps')
            next_char = torch.argmax(last_distribution)
            res = torch.cat((res.squeeze(), next_char.unsqueeze(0)), dim=0)
            last = torch.cat((last, next_char.unsqueeze(0).unsqueeze(0)), dim=1)[:,1:]
    return encodings(res.squeeze(0))

results = []
taus = [10e-4, 0.2, 0.3, 0.4, 0.7, 5]
for tau in taus:
    result = sample(model, 'Make America Great Again!', 1000, tau, Encoder('abcdefghijklmnopqrstuvwxyz0123456789 .!?'))
    results.append(result)

In [None]:
for tau in taus:
    print(f'Tau: {tau}')
    print('\n ------------------- \n')
    print(results.pop(0))
    print('\n')

Tau: 0.001

 ------------------- 

make america great again! and we re going to be so stupid people. i want to thank all of the people are too saver the worst deal   i was a total lie. in the world that was a president  and i m a presidential and we re going to be so stupid people. i want to thank all of the people are too saver the worst deal   i was a total lie. in the world that was a president  and i m a presidential and we re going to be so stupid people. i want to thank all of the people are too saver the worst deal   i was a total lie. in the world that was a president  and i m a presidential and we re going to be so stupid people. i want to thank all of the people are too saver the worst deal   i was a total lie. in the world that was a president  and i m a presidential and we re going to be so stupid people. i want to thank all of the people are too saver the worst deal   i was a total lie. in the world that was a president  and i m a presidential and we re going to be so stup

## Exercise 8: Huffman Coding using LSTM

*Thanks to Philipp Renz who had this idea.*

The Huffman code is an algorithm to compress data. It encodes symbols with different lengths depending on their frequencies. It assigns a short code to frequent symbols and a longer code to rare symbols to minimize the average code length. We provide you with an implementation that given a list of frequencies `freqs` returns a list of their respective binary codes as strings in the same order. In fact, `freqs` may contain any real numbers. 

With a model that predicts the next symbol we can achieve even shorter codes. At every time step we can use the predicted probabilities as frequencies for the Huffman code. That is, we use a new code at every time step. This code is governed by the model's belief what the next symbol will be. If the model predictions are good, we will mostly use very short codes.

First, determine the average code length per symbol on the validation set using frequencies determined on the training set. 
Then, use the prediction probabilities of your trained LSTM and determine the average code length per symbol on the validation set using an adaptable code. Add a temperature to the softmax and tune it. How many bits per symbol can you save by using the LSTM and what is the optimal temperature? 

In [None]:
from heapq import heapify, heappop, heappush

def huffman_code(freqs):
    """This function turns a list of frequencies into a Huffman code. """
    heap = list(zip(freqs, [(i,) for i in range(len(freqs))]))
    heapify(heap)
    code = [''] * len(freqs)
    
    while len(heap) > 1:
        freq0, idx0 = heappop(heap)
        freq1, idx1 = heappop(heap)
        heappush(heap, (freq0 + freq1, idx0 + idx1))
        
        for i in idx0:
            code[i] = '0' + code[i]
        
        for i in idx1:
            code[i] = '1' + code[i]
    
    return code

########## YOUR SOLUTION HERE ##########

In [None]:
# determine the average code length on the validation set using frequencies determined on the training set
def determine_frequencies(path: str):
    freqs = {}
    with open(path, 'r') as f:
        text = f.read()
        for char in text:
            char = char if char in 'abcdefghijklmnopqrstuvwxyz0123456789 .!?' else ' '
            if char in freqs:
                freqs[char] += 1
            else:
                freqs[char] = 1
    return freqs

In [None]:
train_freqs = determine_frequencies('trump/trump_train.txt')

In [None]:
def huffman(path: str, freqs: dict):
    code = huffman_code(list(freqs.values()))
    avg_len = 0
    with open(path, 'r') as f:
        text = f.read()
        for char in text:
            char = char if char in 'abcdefghijklmnopqrstuvwxyz0123456789 .!?' else ' '
            avg_len += len(code[list(freqs.keys()).index(char)])
    avg_len /= len(text)
    return avg_len

freqs = huffman('trump/trump_val.txt', train_freqs)
freqs

4.100730358424452

In [None]:
def freqs_with_model(model: NextCharLSTM, dataloader: DataLoader, temperature: float):
    freqs = {}
    with torch.no_grad():
        for input, target in dataloader:
            output = model(input).squeeze(0)[-1]
            predicted_distribution = torch.softmax(output/temperature, dim=0).item()
            code = huffman_code(predicted_distribution)


    return freqs

dataloader_train_new = DataLoader(dataset_train, batch_size=1, shuffle=False)
model_freqs = freqs_with_model(model, dataloader_train_new)

IndexError: string index out of range

In [None]:
model_freqs

{tensor(4, device='mps:0'): 1,
 tensor(18, device='mps:0'): 1,
 tensor(36, device='mps:0'): 1,
 tensor(19, device='mps:0'): 1,
 tensor(7, device='mps:0'): 1,
 tensor(0, device='mps:0'): 1,
 tensor(19, device='mps:0'): 1,
 tensor(36, device='mps:0'): 1,
 tensor(7, device='mps:0'): 1,
 tensor(14, device='mps:0'): 1,
 tensor(11, device='mps:0'): 1,
 tensor(3, device='mps:0'): 1,
 tensor(36, device='mps:0'): 1,
 tensor(19, device='mps:0'): 1,
 tensor(7, device='mps:0'): 1,
 tensor(4, device='mps:0'): 1,
 tensor(36, device='mps:0'): 1,
 tensor(3, device='mps:0'): 1,
 tensor(4, device='mps:0'): 1,
 tensor(1, device='mps:0'): 1,
 tensor(19, device='mps:0'): 1,
 tensor(37, device='mps:0'): 1,
 tensor(36, device='mps:0'): 1,
 tensor(36, device='mps:0'): 1,
 tensor(0, device='mps:0'): 1,
 tensor(13, device='mps:0'): 1,
 tensor(3, device='mps:0'): 1,
 tensor(36, device='mps:0'): 1,
 tensor(19, device='mps:0'): 1,
 tensor(7, device='mps:0'): 1,
 tensor(4, device='mps:0'): 1,
 tensor(13, device='mp