# Assignment 3: Text processing with LSTM in PyTorch

*Author:* Thomas Adler

*Copyright statement:* This material, no matter whether in printed or electronic form, may be used for personal and non-commercial educational use only. Any reproduction of this manuscript, no matter whether as a whole or in parts, no matter whether in printed or in electronic form, requires explicit prior acceptance of the authors.

In this assignment you will train an LSTM to generate text. To feed text into (recurrent) neural networks, we first have to choose a good representation. There are several options, ranging from simple character embeddings to more sophisticated approaches like [word embeddings](https://towardsdatascience.com/introduction-to-word-embedding-and-word2vec-652d0c2060fa) or [token embeddings](https://medium.com/@_init_/why-bert-has-3-embedding-layers-and-their-implementation-details-9c261108e28a). We will use a character embedding in this assignment.

Character embeddings work as follows. First we define an alphabet, a set of characters that we want to be able to represent. To feed a character into our network we use a one-hot vector. The dimension of this vector is equal to the size of our alphabet and the "hot" position indicates the character we want to represent. While this is logically a decent representation (all one-hot vectors have the same norm, are orthogonal to one another, etc.), it is inefficient in terms of memory because we have to store a lot of zeros. In the first layer of our network we multiply the one-hot vector with a weight matrix, i.e. we compute the preactivation as a matrix-vector product of the form $We_i$, where $e_i$ is the $i$-th canonical basis vector. This operation corresponds to selecting the $i$-th column of $W$, so an efficient implementation performs a simple lookup in $W$ instead of a multiplication. Embedding layers for words or tokens work the same way: they are learnable lookup tables.
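The equivalence between the product $We_i$ and a column lookup can be checked on a toy example. The sketch below uses plain Python lists; the matrix `W`, its values, and the helper names are made up for illustration and are not part of the assignment.

```python
# Hypothetical 2x3 weight matrix for a 3-character alphabet (values are made up).
W = [
    [0.1, 0.2, 0.3],
    [0.4, 0.5, 0.6],
]

def one_hot(i, size):
    """Return the i-th canonical basis vector as a list."""
    return [1.0 if j == i else 0.0 for j in range(size)]

def matvec(matrix, vector):
    """Plain matrix-vector product."""
    return [sum(w * v for w, v in zip(row, vector)) for row in matrix]

def column(matrix, i):
    """Look up the i-th column without any multiplication."""
    return [row[i] for row in matrix]

i = 2
product = matvec(W, one_hot(i, 3))   # W e_i, computed the expensive way
lookup = column(W, i)                # the same vector, obtained by indexing
print(product, lookup)
```

Note that `torch.nn.Embedding` stores its table with one row per symbol, so there the lookup selects a row rather than a column; the idea is the same.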

## Exercise 1: Encoding characters

Write a class `Encoder` that implements the methods `__init__` and `__call__`. The method `__init__` takes a string as argument that serves as the alphabet. The method `__call__` takes one argument. If it is a string, it should return a sequence of integers as a `torch.Tensor` representing the input string. Each integer should represent a character of the alphabet. The alphabet consists of the characters matched by the regex `[a-z0-9 .!?]`. If the input text contains characters that are not in the alphabet, then `__call__` should either remove them or map them to a corresponding character that belongs to the alphabet. If the argument is a `torch.Tensor`, then the method should return a string representation of the input, i.e. it should function as a decoder.

In [123]:
import re
import torch

########## YOUR SOLUTION HERE ##########

class Encoder:
    def __init__(self, alphabet="abcdefghijklmnopqrstuvwxyz0123456789 .!?", device=None):
        self.alphabet = alphabet
        self.indexing = {char: idx for idx, char in enumerate(self.alphabet)}
        self.characterising = {idx: char for idx, char in enumerate(self.alphabet)}
        self.matching = re.compile(f"[^{re.escape(self.alphabet)}]")  
        self.device = device if device else torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def __call__(self, input):   
        if isinstance(input, str):
            input_new = self.matching.sub("", input.lower())
            input_encoded = [self.indexing[char] for char in input_new]
            input_encoded_tensor = torch.tensor(input_encoded, dtype=torch.int64, device=self.device)
            return input_encoded_tensor
        
        elif isinstance(input, torch.Tensor):
            input = input.to(self.device)
            input_decoded = "".join(self.characterising[idx.item()] for idx in input)
            return input_decoded
        
        else:
            raise ValueError("Input must be either a string or a torch.Tensor.")



## Exercise 2: PyTorch Dataset

Write a class `TextDataset` that derives from `torch.utils.data.Dataset`. It should wrap a text file and make it usable for training with PyTorch. Implement the methods `__init__`, `__len__`, and `__getitem__`. The method `__init__` should take a path to a text file as a string and an integer `l` specifying the length of one sample sequence. The method `__len__` takes no arguments and should return the size of the dataset, i.e. the number of sample sequences in the dataset. The method `__getitem__` should take an integer indexing a sample sequence and should return that sequence as a `torch.Tensor`. The input file can be viewed as one long sequence. The first sample sequence consists of the characters at positions `0..l-1` in the input file. The second sequence consists of the characters at positions `l..2*l-1` and so on. That is, the samples of our dataset are non-overlapping sequences. The last incomplete sequence may be dropped.
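The indexing scheme described above can be sketched without PyTorch. The helper `split_into_sequences` is a hypothetical name used only for this illustration; it works on a plain string instead of a file.

```python
def split_into_sequences(text, l):
    """Cut `text` into non-overlapping pieces of length `l`, dropping an incomplete tail."""
    n_samples = len(text) // l          # number of complete sequences
    return [text[i * l:(i + 1) * l] for i in range(n_samples)]

# 12 characters with l = 5 give two complete samples; the last 2 characters are dropped.
samples = split_into_sequences("hello world!", 5)
print(samples)  # ['hello', ' worl']
```

Sample `i` covers positions `i*l .. (i+1)*l - 1`, which is exactly the slice a `__getitem__` implementation would return for index `i`.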

In [126]:
import torch
from torch.utils.data import Dataset

########## YOUR SOLUTION HERE ##########

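class TextDataset(Dataset):
    """Wraps a text file as a dataset of non-overlapping sequences of length `l`."""

    def __init__(self, file_path, l):
        with open(file_path, 'r', encoding='utf-8') as f:
            self.text = f.read()
        self.seq_length = l

    def __len__(self):
        # Number of complete sequences; an incomplete tail is dropped.
        return len(self.text) // self.seq_length

    def __getitem__(self, index):
        if not 0 <= index < len(self):
            raise IndexError(f"index {index} is out of range for {len(self)} sequences")
        start = index * self.seq_length
        end = start + self.seq_length
        sequence = self.text[start:end]
        # Note: this returns the raw Unicode code point of every character,
        # not an index into the alphabet of the `Encoder` from Exercise 1.
        seq_tensor = torch.tensor([ord(char) for char in sequence], dtype=torch.int64)
        return seq_tensor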

########################################
file_path = "trump/trump_train.txt"  
sequence_length = 10

dataset = TextDataset(file_path, sequence_length)

print("Dataset size:", len(dataset))
print("First sequence:", dataset[0])
print("Second sequence:", dataset[1])



Dataset size: 87258
First sequence: tensor([83, 80, 69, 69, 67, 72, 32, 49, 10, 10])
Second sequence: tensor([ 10,  46,  46,  46,  84, 104,  97, 110, 107,  32])


## Exercise 3: The Model

Write a class `NextCharLSTM` that derives from `torch.nn.Module` and takes `alphabet_size`, the `embedding_dim`, and the `hidden_dim` as arguments. It should consist of a `torch.nn.Embedding` layer that maps the alphabet to embeddings, a `torch.nn.LSTM` that takes the embeddings as inputs and maps them to hidden states, and a `torch.nn.Linear` output layer that maps the hidden states of the LSTM back to the alphabet. Implement the methods `__init__` that sets up the module and `forward` that takes an input sequence and returns the logits (i.e. no activation function on the output layer) of the model prediction at every time step. 

In [6]:
import torch
import torch.nn as nn

########## YOUR SOLUTION HERE ##########

class NextCharLSTM(nn.Module):
    def __init__(self, alphabet_size, embedding_dim, hidden_dim):
        super().__init__()
        # Keep the alphabet size on the module so that `forward` does not
        # depend on a global variable of the same name.
        self.alphabet_size = alphabet_size
        self.embed_layer = nn.Embedding(alphabet_size, embedding_dim)
        self.lstm_layer = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.linear_layer = nn.Linear(hidden_dim, alphabet_size)

    def forward(self, x):
        # x has shape (batch, time); clamp to the range of valid embedding indices.
        x = torch.clamp(x, 0, self.alphabet_size - 1)
        embed = self.embed_layer(x)             # (batch, time, embedding_dim)
        hid_states, _ = self.lstm_layer(embed)  # (batch, time, hidden_dim)
        logits = self.linear_layer(hid_states)  # (batch, time, alphabet_size)
        return logits

############################################
alphabet_size = 50  
embedding_dim = 64
hidden_dim = 128

model = NextCharLSTM(alphabet_size, embedding_dim, hidden_dim)
example_input = torch.randint(0, alphabet_size, (4, 10))
logits = model(example_input)  
print("Logits shape:", logits.shape)


Logits shape: torch.Size([4, 10, 50])


## Exercise 4: Training/Validation Epoch

Write a function `epoch` that takes a `torch.utils.data.DataLoader`, a `NextCharLSTM`, and a `torch.optim.Optimizer` as arguments, where the last one might be `None`. If the optimizer is `None`, then the function should validate the model. Otherwise it should train the model for next-character prediction in the many-to-many setting. That is, given a sequence `x` of length `l`, the input sequence is `x[:l-1]` and the corresponding target sequence is `x[1:]`. The function should perform one epoch of training/validation and return the loss values of each mini batch as a numpy array. Use the cross-entropy loss function for both training and validation. 
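To make the many-to-many setup concrete, the following sketch builds the input and target sequence from one toy sample and evaluates the mean cross-entropy by hand. The encoded sequence and the logits are made-up values, and `cross_entropy` is a hypothetical helper written in plain Python rather than the PyTorch loss.

```python
import math

def cross_entropy(logits, target):
    """Negative log-probability of `target` under softmax(logits)."""
    m = max(logits)  # subtract the maximum for numerical stability
    log_norm = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_norm - logits[target]

x = [3, 0, 2, 1]                  # encoded sample of length l = 4 (made-up indices)
inputs, targets = x[:-1], x[1:]   # the target is the input shifted by one step

# Made-up logits: one row per input position over a 4-symbol alphabet.
logits = [
    [2.0, 0.0, 0.0, 0.0],
    [0.0, 0.0, 2.0, 0.0],
    [0.0, 2.0, 0.0, 0.0],
]
losses = [cross_entropy(z, t) for z, t in zip(logits, targets)]
mean_loss = sum(losses) / len(losses)
print(inputs, targets, mean_loss)
```

Averaging over all time steps is also what `torch.nn.CrossEntropyLoss` does with its default `reduction="mean"` once logits and targets are flattened to one row per time step.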

In [8]:
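from torch.utils.data import DataLoader
import numpy as np
import torch

########## YOUR SOLUTION HERE ##########

def epoch(data_loader, model, optimizer=None):
    """Run one epoch: train if an optimizer is given, otherwise validate."""
    loss_function = torch.nn.CrossEntropyLoss()
    training = optimizer is not None
    model.train(training)  # train mode when training, eval mode otherwise

    device = next(model.parameters()).device
    losses = []

    # Gradients are only tracked while training.
    with torch.set_grad_enabled(training):
        for batch in data_loader:
            batch = batch.to(device)
            x = batch[:, :-1]  # inputs: all but the last character
            y = batch[:, 1:]   # targets: the sequence shifted by one step
            logits = model(x)
            logits_flat = logits.reshape(-1, logits.size(-1))
            # The dataset yields raw character codes, so fold the targets into the
            # range of valid class indices (the size of the output layer).
            y_flat = y.reshape(-1) % logits.size(-1)
            loss = loss_function(logits_flat, y_flat)

            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            losses.append(loss.item())

    return np.array(losses)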


## Exercise 5: Model Selection

Usually, we would now train and validate our model on a grid of different hyperparameters to see which setting performs best. However, this is pretty expensive in terms of compute, so we provide you with a setting that should work quite well. Train your model for 30 epochs using `torch.optim.Adam`. Validate your model after every epoch and persist the model that performs best on the validation set using `torch.save`. Visualize and discuss the training and validation progress.

In [278]:
import os
import matplotlib.pyplot as plt

sequence_length = 100
batch_size = 128
embedding_dim = 4
hidden_dim = 256
learning_rate = 1e-3
num_epochs = 30

########## YOUR SOLUTION HERE ##########

train_data = TextDataset("trump/trump_train.txt", sequence_length)
val_data = TextDataset("trump/trump_val.txt", sequence_length)

train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val__dataloader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

alphabet_size = 50
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = NextCharLSTM(alphabet_size, embedding_dim, hidden_dim).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

best_val_loss = float("inf")
best_model_path = "best_model.pth"

train_loss_epoch = []
val_loss_epoch = []

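for epoch_n in range(1, num_epochs + 1):
    print(f"Epoch {epoch_n}/{num_epochs}")

    # `epoch` switches the model between train and eval mode itself.
    train_loss = epoch(train_dataloader, model, optimizer)
    train_avgloss = train_loss.mean()
    train_loss_epoch.append(train_avgloss)

    # No gradients are needed for validation.
    with torch.no_grad():
        val_loss = epoch(val__dataloader, model, optimizer=None)
    val_avgloss = val_loss.mean()
    val_loss_epoch.append(val_avgloss)

    print(f"Train Loss: {train_avgloss:.4f}, Val Loss: {val_avgloss:.4f}")

    # Persist the weights whenever the validation loss improves.
    if val_avgloss < best_val_loss:
        best_val_loss = val_avgloss
        torch.save(model.state_dict(), best_model_path)

print("Training complete. Best model saved at:", best_model_path)

# Visualize the training and validation progress.
epochs_axis = range(1, len(train_loss_epoch) + 1)
plt.plot(epochs_axis, train_loss_epoch, label="Training loss")
plt.plot(epochs_axis, val_loss_epoch, label="Validation loss")
plt.xlabel("Epoch")
plt.ylabel("Cross-entropy loss")
plt.legend()
plt.show()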

Epoch 1/30
Train Loss: 3.2016, Val Loss: 3.0248
Epoch 2/30
Train Loss: 3.0000, Val Loss: 2.8603
Epoch 3/30
Train Loss: 2.8581, Val Loss: 2.7902
Epoch 4/30
Train Loss: 2.8238, Val Loss: 2.7756
Epoch 5/30
Train Loss: 2.8003, Val Loss: 2.7608
Epoch 6/30
Train Loss: 2.7818, Val Loss: 2.7479
Epoch 7/30
Train Loss: 2.7718, Val Loss: 2.7346
Epoch 8/30
Train Loss: 2.7650, Val Loss: 2.7413
Epoch 9/30
Train Loss: 2.7568, Val Loss: 2.7353
Epoch 10/30
Train Loss: 2.7497, Val Loss: 2.7159
Epoch 11/30
Train Loss: 2.7449, Val Loss: 2.7288
Epoch 12/30
Train Loss: 2.7375, Val Loss: 2.7145
Epoch 13/30
Train Loss: 2.7347, Val Loss: 2.7101
Epoch 14/30
Train Loss: 2.7304, Val Loss: 2.7099
Epoch 15/30
Train Loss: 2.7266, Val Loss: 2.7014
Epoch 16/30
Train Loss: 2.7229, Val Loss: 2.6992
Epoch 17/30
Train Loss: 2.7176, Val Loss: 2.6989
Epoch 18/30
Train Loss: 2.7179, Val Loss: 2.7203
Epoch 19/30
Train Loss: 2.7134, Val Loss: 2.6963
Epoch 20/30
Train Loss: 2.7107, Val Loss: 2.7033
Epoch 21/30
Train Loss: 2.707

## Exercise 6: Top-$k$ Accuracy

Write a function `topk_accuracy` that takes a list of integers $k$, a model, and a data loader and returns the top-$k$ accuracy of the model on the given data set for each $k$. A sample is considered to be classified correctly if the true label appears in the top-$k$ classes predicted by the model. Then load the best model from the previous exercise using `torch.load` and plot its top-$k$ accuracy as a function of $k$ for all possible values of $k$. Discuss the results. 
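The definition of top-$k$ accuracy can be illustrated on a handful of made-up score vectors. The function `topk_accuracy_toy` below is a hypothetical pure-Python helper, not the function requested by the exercise.

```python
def topk_accuracy_toy(scores, labels, k):
    """Fraction of samples whose label is among the k highest-scoring classes."""
    hits = 0
    for row, label in zip(scores, labels):
        # Class indices sorted from highest to lowest score.
        ranked = sorted(range(len(row)), key=lambda c: row[c], reverse=True)
        hits += label in ranked[:k]
    return hits / len(labels)

scores = [
    [0.1, 0.7, 0.2],   # ranking of classes: 1, 2, 0
    [0.5, 0.3, 0.2],   # ranking of classes: 0, 1, 2
    [0.2, 0.3, 0.5],   # ranking of classes: 2, 1, 0
]
labels = [1, 1, 0]
accuracies = [topk_accuracy_toy(scores, labels, k) for k in (1, 2, 3)]
print(accuracies)
```

The accuracy can only grow with $k$, and it reaches 1 once $k$ equals the number of classes, because every label is then contained in the top-$k$ set.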

In [90]:
########## YOUR SOLUTION HERE ##########
import torch
import matplotlib.pyplot as plt

def topk_accuracy(k_values, model, data_loader, device=None):
    """Return the top-k accuracy of `model` on `data_loader` for every k in `k_values`."""
    if device is None:
        device = next(model.parameters()).device
    model.eval()
    total_correct = {k: 0 for k in k_values}
    total_samples = 0

    with torch.no_grad():
        for batch in data_loader:
            batch = batch.to(device)
            inputs, targets = batch[:, :-1], batch[:, 1:]

            logits = model(inputs)
            logits = logits.reshape(-1, logits.size(-1))
            # Map the targets into the class range exactly as `epoch` does during training.
            targets = targets.reshape(-1) % logits.size(-1)

            # Softmax is monotone, so ranking the logits equals ranking the probabilities.
            # `torch.topk` returns (values, indices); only the indices are needed.
            topk_indices = torch.topk(logits, max(k_values), dim=-1).indices
            # hits[i, j] is True if the target of sample i sits at rank j.
            hits = topk_indices == targets.unsqueeze(1)
            for k in k_values:
                total_correct[k] += hits[:, :k].any(dim=1).sum().item()
            total_samples += targets.size(0)

    return [total_correct[k] / total_samples for k in k_values]


# Load the best model from the previous exercise and evaluate all possible values of k.
model.load_state_dict(torch.load("best_model.pth", map_location=device))

k_values = list(range(1, alphabet_size + 1))
accuracies = topk_accuracy(k_values, model, val__dataloader)

plt.plot(k_values, accuracies, marker="o")
plt.xlabel("k")
plt.ylabel("Top-k accuracy")
plt.title("Top-k accuracy on the validation set")
plt.show()

## Exercise 7: Gumbel-Max Character Sampling

In this exercise we utilize the trained network to generate novel text. To do this, take some string of seed text, which you can choose freely, and feed it to the network. For each subsequent character, the model outputs logits $z = (z_1, \dots, z_K)^\top$, where $K$ is the alphabet size. 

Use the Gumbel-Max trick to sample from the categorical distribution parameterized by 
$$
\pi_k = \frac{e^{z_k / \tau}}{\sum_{j=1}^K e^{z_j / \tau}} \quad \text{where} \quad \tau > 0 
$$
is the temperature. For $\tau \to 0$ we approach the one-hot distribution, whereas for $\tau \to \infty$ we approach the uniform distribution. The Gumbel-Max trick says that the random variable 
$$
Y = \arg \max_{k \in \{1, \dots, K\}} (z_k / \tau + \xi_k)
$$
follows a categorical distribution parameterized by $\pi_1, \dots, \pi_K$, where $\xi_k$ is drawn independently from the standard Gumbel distribution.

Implement next-character sampling using the Gumbel-Max trick. Try out different values of $\tau$ and see which work best. 
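Before applying the trick to the LSTM, it can be checked empirically on a small example: the frequencies of Gumbel-Max samples should approach the softmax probabilities $\pi_k$. The sketch below uses only the Python standard library; the logits, the temperature, the sample count, and the helper names are arbitrary choices for illustration. A standard Gumbel sample is obtained as $-\log(-\log u)$ with $u$ uniform on $(0, 1)$.

```python
import math
import random

def softmax(logits, tau):
    """Probabilities pi_k proportional to exp(z_k / tau)."""
    scaled = [z / tau for z in logits]
    m = max(scaled)  # subtract the maximum for numerical stability
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def gumbel_max_sample(logits, tau, rng):
    """Return argmax_k (z_k / tau + xi_k) with independent standard Gumbel noise xi_k."""
    values = []
    for z in logits:
        u = max(rng.random(), 1e-300)   # keep u strictly inside (0, 1)
        xi = -math.log(-math.log(u))    # standard Gumbel sample
        values.append(z / tau + xi)
    return max(range(len(values)), key=values.__getitem__)

rng = random.Random(0)                  # fixed seed for reproducibility
logits, tau, n = [1.0, 0.0, -1.0], 0.5, 20000
counts = [0] * len(logits)
for _ in range(n):
    counts[gumbel_max_sample(logits, tau, rng)] += 1

empirical = [c / n for c in counts]
expected = softmax(logits, tau)
print("empirical:", empirical)
print("softmax:  ", expected)
```

Rerunning the comparison with a larger `tau` should move both distributions towards uniform, and a smaller `tau` should concentrate them on the largest logit.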

In [167]:
########## YOUR SOLUTION HERE ##########
import torch

def gumbel_sample(logits, tau):
    """Sample indices from softmax(logits / tau) using the Gumbel-Max trick."""
    # Clamp the uniform noise away from 0 and 1 so that both logarithms stay finite.
    eps = torch.finfo(logits.dtype).eps
    u = torch.rand_like(logits).clamp(eps, 1.0 - eps)
    noise = -torch.log(-torch.log(u))  # standard Gumbel noise
    return torch.argmax(logits / tau + noise, dim=-1)

def generate_text_gumbel_max(model, seed_text, encoder, max_length, tau, device='cpu'):
    device = torch.device(device if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    generation = seed_text
    # The encoder already returns a tensor; add a batch dimension of size 1.
    input_ids = encoder(seed_text).to(device).unsqueeze(0)

    for _ in range(max_length):
        with torch.no_grad():
            logs = model(input_ids)[:, -1, :]  # logits for the next character
            print("Logits:", logs)

        new_token_id = gumbel_sample(logs, tau)
        # The model may have more outputs than the encoder has characters, so
        # clamp the sample to the largest index the encoder can decode.
        new_token_id = torch.clamp(new_token_id, min=0, max=len(encoder.alphabet) - 1)
        print("Next Token ID:", new_token_id.item())
        print("Decoded Token:", encoder(new_token_id))
        input_ids = torch.cat([input_ids, new_token_id.unsqueeze(0)], dim=1)
        generation += encoder(new_token_id)

    return generation

# Example Usage
seed = "I am hungry let's go to "
encoder = Encoder()  # picks CUDA if available, otherwise the CPU

generated_text = generate_text_gumbel_max(model, seed, encoder, max_length=40, tau=1.0)
print(generated_text)


Logits: tensor([[  2.6900,   3.0495,   1.7236,   3.2959,   1.2224,   3.9268,  -2.0932,
           0.7299,   4.6833,   2.0324,  -1.0814,   2.3723,   1.3375,  -3.4559,
           2.9997,   1.5663,   2.9952,   0.3062,   0.3635,  -0.9123,  -5.3452,
           0.0732,  -5.7360,  -3.4856, -10.7349,  -7.3111,  -3.7751,  -6.3141,
          -4.9008,  -4.7922,  -6.5360,  -9.7939,   2.0466,  -4.5470,  -4.4015,
          -7.0910,  -3.7868,  -6.0727, -10.5392,  -6.8643, -10.6174,  -9.6924,
         -13.5322, -10.1968,  -1.4629,  -8.3526,  -3.1504,   1.4437,   2.2958,
           3.5557]])
Next Token ID: 8
Decoded Token: i
Logits: tensor([[  1.9567,   1.1846,   1.2857,   3.5780,   1.0426,   2.5107,   1.3454,
           1.2015,   3.3775,   2.6072,  -0.9102,   1.5003,   1.2989,  -0.9840,
           2.6912,   1.3582,   1.9927,  -0.3648,   0.5007,  -0.8794,  -4.1554,
          -0.2100,  -4.5850,  -2.2662,  -7.1255,  -5.0780,  -2.6553,  -3.5558,
          -4.6990,  -3.6923,  -4.3881,  -7.0325,   1.4451,  



## Exercise 8: Huffman Coding using LSTM

*Thanks to Philipp Renz who had this idea.*

Huffman coding is an algorithm to compress data. It encodes symbols with codes of different lengths depending on their frequencies: frequent symbols get a short code and rare symbols a longer one, which minimizes the average code length. We provide you with an implementation that, given a list of frequencies `freqs`, returns a list of their respective binary codes as strings in the same order. In fact, `freqs` may contain any real numbers.

With a model that predicts the next symbol we can achieve even shorter codes. At every time step we can use the predicted probabilities as frequencies for the Huffman code. That is, we use a new code at every time step. This code is governed by the model's belief about what the next symbol will be. If the model predictions are good, we will mostly use very short codes.

First, determine the average code length per symbol on the validation set using frequencies determined on the training set. 
Then, use the prediction probabilities of your trained LSTM and determine the average code length per symbol on the validation set using an adaptable code. Add a temperature to the softmax and tune it. How many bits per symbol can you save by using the LSTM and what is the optimal temperature? 
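The difference between a fixed and an adaptive code can be seen on a toy sequence. The sketch below is an illustration under made-up assumptions: `huffman_lengths` is a hypothetical helper that mirrors the merging procedure of Huffman coding but only tracks code lengths, and `predict` is a hand-written stand-in for a model that puts probability 0.85 on the correct next symbol.

```python
from heapq import heapify, heappop, heappush

def huffman_lengths(freqs):
    """Code length of every symbol in a Huffman code for `freqs` (hypothetical helper)."""
    heap = [(f, (i,)) for i, f in enumerate(freqs)]
    heapify(heap)
    lengths = [0] * len(freqs)
    while len(heap) > 1:
        f0, idx0 = heappop(heap)
        f1, idx1 = heappop(heap)
        # Every symbol below the merged node gets one more bit.
        for i in idx0 + idx1:
            lengths[i] += 1
        heappush(heap, (f0 + f1, idx0 + idx1))
    return lengths

K = 4
sequence = [0, 1, 2, 3, 0, 1, 2, 3]  # made-up, perfectly periodic sequence

# Fixed code: one Huffman code built from the overall symbol counts.
fixed_lengths = huffman_lengths([sequence.count(k) for k in range(K)])
fixed_bits = sum(fixed_lengths[s] for s in sequence) / len(sequence)

def predict(previous):
    """Stand-in for a model: puts probability 0.85 on the successor of `previous`."""
    probs = [0.05] * K
    probs[(previous + 1) % K] = 0.85
    return probs

# Adaptive code: a new Huffman code at every step, built from the prediction.
pairs = list(zip(sequence, sequence[1:]))
adaptive_bits = sum(huffman_lengths(predict(prev))[nxt] for prev, nxt in pairs) / len(pairs)

print(fixed_bits, adaptive_bits)
```

With uniform overall counts the fixed code needs 2 bits per symbol, while the confident predictor always assigns the true next symbol a 1-bit code. The adaptive average is taken over the seven predicted symbols, since the first symbol has no preceding context.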

In [276]:
from heapq import heapify, heappop, heappush

def huffman_code(freqs):
    """This function turns a list of frequencies into a Huffman code. """
    heap = list(zip(freqs, [(i,) for i in range(len(freqs))]))
    heapify(heap)
    code = [''] * len(freqs)
    
    while len(heap) > 1:
        freq0, idx0 = heappop(heap)
        freq1, idx1 = heappop(heap)
        heappush(heap, (freq0 + freq1, idx0 + idx1))
        
        for i in idx0:
            code[i] = '0' + code[i]
        
        for i in idx1:
            code[i] = '1' + code[i]
    
    return code

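########## YOUR SOLUTION HERE ##########
import torch

def shifted_targets(batch, num_classes):
    """Targets as built in `epoch`: shifted by one step and folded into the class range."""
    return (batch[:, 1:] % num_classes).reshape(-1)

def fixed_code_length(train_loader, val_loader, num_classes):
    """Average bits per symbol on the validation set for a single Huffman code
    built from the symbol frequencies of the training set."""
    counts = torch.zeros(num_classes, dtype=torch.int64)
    for batch in train_loader:
        counts += torch.bincount(shifted_targets(batch, num_classes), minlength=num_classes)
    code_lengths = torch.tensor([len(code) for code in huffman_code(counts.tolist())])

    total_bits, total_symbols = 0, 0
    for batch in val_loader:
        targets = shifted_targets(batch, num_classes)
        total_bits += code_lengths[targets].sum().item()
        total_symbols += targets.numel()
    return total_bits / total_symbols

def adaptive_code_length(model, val_loader, temperature):
    """Average bits per symbol when a new Huffman code is built at every time step
    from the model's temperature-scaled softmax prediction."""
    device = next(model.parameters()).device
    model.eval()
    total_bits, total_symbols = 0, 0
    with torch.no_grad():
        for batch in val_loader:
            batch = batch.to(device)
            logits = model(batch[:, :-1])
            probs = torch.softmax(logits / temperature, dim=-1)
            probs = probs.reshape(-1, probs.size(-1)).cpu().tolist()
            targets = shifted_targets(batch, logits.size(-1)).cpu().tolist()
            # One Huffman code per time step; count the bits of the true next symbol.
            for step_probs, target in zip(probs, targets):
                total_bits += len(huffman_code(step_probs)[target])
            total_symbols += len(targets)
    return total_bits / total_symbols

# `model` is the trained LSTM and the data loaders are the ones from Exercise 5.
fixed_avglength = fixed_code_length(train_dataloader, val__dataloader, alphabet_size)
print(f"Fixed Average Length: {fixed_avglength:.4f}")

# Candidate temperatures; the grid itself is an arbitrary choice.
best_temperature, best_avglength = None, float("inf")
for temperature in [0.5, 0.75, 1.0, 1.25, 1.5, 2.0]:
    adaptable_avglength = adaptive_code_length(model, val__dataloader, temperature)
    print(f"Temperature {temperature}: Adaptable Average Length: {adaptable_avglength:.4f}")
    if adaptable_avglength < best_avglength:
        best_temperature, best_avglength = temperature, adaptable_avglength

print(f"Best Temperature: {best_temperature}")
print(f"Bits Saved per Symbol: {fixed_avglength - best_avglength:.4f}")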
