# Assignment 4, task 3

In this task, you are going to implement a character model based on the Transformer architecture, starting from the provided skeleton. It is useful to first do exercise 2 before starting on this task.

The model you are going to implement here will have a context of 32 characters, i.e. it will consider the preceding 32 characters when estimating the probabilities of the possible next characters. Thanks to the clever Transformer architecture, the model will have fewer than 50,000 trainable parameters. For comparison, the simpler model in exercise 2 had a context of only 8 characters but more than 300,000 trainable parameters.

In [None]:
# First run this cell
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from datetime import datetime
import math

The __self-attention__ computation is at the core of the Transformer architecture. It is important to make this computation efficient (i.e. vectorized), since it involves many matrix operations that would be very slow if implemented with Python loops.

The input to the self-attention computation is a tensor containing a vector for each input token, and the output is a tensor of the same dimensions, containing the contextualized versions of the input tokens (see Lecture 9 and the textbook, chapters 10.1 and 10.2).

Your task is to fill in the missing pieces below. Look for "REPLACE WITH YOUR CODE" and "YOUR CODE HERE".

In [None]:
class SelfAttention(nn.Module):
    """
    Computes self-attention according to Vaswani et al, 2017.
    First version: a single attention head.

    The input is projected to queries, keys and values of size `att_dim`,
    scaled dot-product attention is computed over the sequence, and the
    result is projected back to `vector_dim`.
    """

    def __init__(self, vector_dim, att_dim):
        """
        vector_dim = the dimension of the input and output vectors
        att_dim = the (usually) smaller dimension used in the attention
                  computation
        """
        super().__init__()
        self.vector_dim = vector_dim
        self.att_size = att_dim
        # None of the projections has a bias, so an all-zero input vector
        # is mapped to all-zero query, key and value vectors.
        self.wq = nn.Linear(vector_dim, att_dim, bias=False)
        self.wk = nn.Linear(vector_dim, att_dim, bias=False)
        self.wv = nn.Linear(vector_dim, att_dim, bias=False)
        self.wo = nn.Linear(att_dim, vector_dim, bias=False)
        self.method = "single-head attention"

    def compute_attention(self, q, k, v):
        """
        Vectorized scaled dot-product attention: softmax(q k^T / sqrt(d)) v.

        q, k, v = tensors of shape (..., seq_length, d), where d is the
                  size of the last dimension.

        Returns a tensor with the same shape as v.

        A position whose key (query) vector is entirely zero is treated as
        padding: it is never attended to, and its own output is all zeros.
        NOTE(review): this padding convention is inferred from the test
        cell, which expects zero output for an all-zero input token; the
        reference values in that cell could not be checked from here.
        """
        d = q.size(-1)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d)

        key_is_pad = (k == 0).all(dim=-1).unsqueeze(-2)    # (..., 1, seq)
        query_is_pad = (q == 0).all(dim=-1).unsqueeze(-1)  # (..., seq, 1)

        # Use the most negative finite value instead of -inf, so that a row
        # in which every key is padding yields a finite softmax, not NaN.
        lowest = torch.finfo(scores.dtype).min
        weights = torch.softmax(scores.masked_fill(key_is_pad, lowest), dim=-1)

        # Remove every weight that involves a padding position.
        weights = weights.masked_fill(key_is_pad | query_is_pad, 0.0)
        return torch.matmul(weights, v)

    def forward(self, x):
        """
        x = tensor whose last dimension has size vector_dim; the test cell
            uses the shape (batch_size, seq_length, vector_dim).

        Returns the contextualized vectors, with the same shape as x.
        """
        q = self.wq(x)
        k = self.wk(x)
        v = self.wv(x)
        values = self.compute_attention(q, k, v)
        out = self.wo(values)
        return out


In [None]:
# Test the code
# Seed every random number generator so that the sampled values below
# are reproducible.
seed = 4224
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)  # For multi-GPU setups
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

batch_size, seq_len = 1, 20
vector_dim, att_dim = 64, 32
attention = SelfAttention(vector_dim, att_dim)
data = torch.rand(batch_size, seq_len, vector_dim)
# The last token of the sequence is replaced by an all-zero vector
data[0, -1] = torch.zeros(vector_dim)
result = attention(data)
print("Sample some results using", attention.method)
# Each entry: (token index, vector component, expected value to 4 decimals)
for token, component, expected in (
    (7, 7, '-0.3068'),
    (8, 8, '0.1224'),
    (9, 9, '-0.3258'),
    (-1, 9, '0.0000'),
):
    print(f'{result[0, token, component].detach().item():.4f}' == expected)

In [None]:
class SelfAttention(nn.Module):
    """
    Computes self-attention according to Vaswani et al, 2017.
    Second version: with multiple heads

    Each token vector is split into `no_of_heads` parts of size `att_dim`.
    The same query/key/value projections are applied to every part, the
    attention is computed per head, and the heads are concatenated again
    before the output projection.
    """

    def __init__(self, vector_dim, att_dim):
        """
        vector_dim = the dimension of the input and output vectors
        att_dim = the (usually) smaller dimension used in the attention
                  computation.

        NOTE that vector_dim must be a multiple of att_dim.
        """
        super().__init__()
        assert vector_dim % att_dim == 0, "vector_dim must be a multiple of att_dim"
        self.no_of_heads = vector_dim // att_dim
        self.vector_dim = vector_dim
        self.att_dim = att_dim
        # None of the projections has a bias, so an all-zero input vector
        # is mapped to all-zero query, key and value vectors.
        self.wq = nn.Linear(att_dim, att_dim, bias=False)
        self.wk = nn.Linear(att_dim, att_dim, bias=False)
        self.wv = nn.Linear(att_dim, att_dim, bias=False)
        self.wo = nn.Linear(vector_dim, vector_dim, bias=False)
        self.method = "multi-head attention"

    def compute_attention(self, q, k, v):
        """
        Vectorized scaled dot-product attention: softmax(q k^T / sqrt(d)) v.

        q, k, v = tensors of shape (..., seq_length, d); in forward() they
                  have shape (batch_size, no_of_heads, seq_length, att_dim).

        Returns a tensor with the same shape as v.

        A position whose key (query) vector is entirely zero is treated as
        padding: it is never attended to, and its own output is all zeros.
        NOTE(review): this padding convention is inferred from the test
        cell, which expects zero output for an all-zero input token; the
        reference values in that cell could not be checked from here.
        """
        d = q.size(-1)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d)

        key_is_pad = (k == 0).all(dim=-1).unsqueeze(-2)    # (..., 1, seq)
        query_is_pad = (q == 0).all(dim=-1).unsqueeze(-1)  # (..., seq, 1)

        # Use the most negative finite value instead of -inf, so that a row
        # in which every key is padding yields a finite softmax, not NaN.
        lowest = torch.finfo(scores.dtype).min
        weights = torch.softmax(scores.masked_fill(key_is_pad, lowest), dim=-1)

        # Remove every weight that involves a padding position.
        weights = weights.masked_fill(key_is_pad | query_is_pad, 0.0)
        return torch.matmul(weights, v)

    def reshape_for_multihead_attention(self, x):
        """
        x has the shape (batch_size, seq_length, vector_dim)

        We want to split the representation of each token into 'no_of_heads'
        parts and treat each part separately. Thus, we need the returned tensor
        to have shape (batch_size, no_of_heads, seq_length, att_dim)
        """
        batch_size, seq_length, _ = x.shape
        # Component h*att_dim + j of a token ends up as component j of head h.
        per_head = x.reshape(batch_size, seq_length, self.no_of_heads, self.att_dim)
        return per_head.transpose(1, 2)

    def reshape_after_multihead_attention(self, x):
        """
        x has the shape (batch_size, no_of_heads, seq_length, att_dim)

        For each token, we now want to bring together the representation coming
        from each head. The returned tensor should have the shape:
        (batch_size, seq_length, vector_dim)
        """
        batch_size, _, seq_length, _ = x.shape
        # reshape() copies if needed: the transposed tensor is not contiguous.
        return x.transpose(1, 2).reshape(batch_size, seq_length, self.vector_dim)

    def forward(self, x):
        """
        x has the shape (batch_size, seq_length, vector_dim)

        Returns the contextualized vectors, with the same shape as x.
        """
        x = self.reshape_for_multihead_attention(x)
        q = self.wq(x)
        k = self.wk(x)
        v = self.wv(x)
        values = self.compute_attention(q, k, v)
        values = self.reshape_after_multihead_attention(values)
        out = self.wo(values)
        return out

In [None]:

# Test the SelfAttention class. Make sure that you first run the cell above!

# Seed every random number generator so that the sampled values below
# are reproducible.
seed = 4224
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)  # For multi-GPU setups
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

batch_size, seq_len = 1, 20
vector_dim, att_dim = 64, 32
attention = SelfAttention(vector_dim, att_dim)
data = torch.rand(batch_size, seq_len, vector_dim)
# The last token of the sequence is replaced by an all-zero vector
data[0, -1] = torch.zeros(vector_dim)
print("Check that reshaping works:")
try:
    desired_shape = torch.Size((batch_size, attention.no_of_heads, seq_len, att_dim))
    reshaped_data = attention.reshape_for_multihead_attention(data)
    print(reshaped_data.shape == desired_shape)
    # Component 33 of token 9 must show up as component 1 of head 1
    print((data[0, 9, 33] == reshaped_data[0, 1, 9, 1]).detach().item())
    # Reshaping back must restore the original tensor exactly
    rereshaped_data = attention.reshape_after_multihead_attention(reshaped_data)
    print(data.shape == rereshaped_data.shape)
    print(torch.all(data == rereshaped_data).detach().item())
except AttributeError:
    print("WARNING: Something went wrong. Perhaps you forgot to run the cell above this one?")
result = attention(data)
print("Sample some results using", attention.method)
# Each entry: (token index, vector component, expected value to 4 decimals)
for token, component, expected in (
    (7, 7, '0.0013'),
    (8, 8, '0.0265'),
    (9, 9, '-0.0596'),
    (-1, 9, '0.0000'),
):
    print(f'{result[0, token, component].detach().item():.4f}' == expected)

We need to map every type of input item (every character, in our case) to a unique ID number. Since we are not sure which characters will appear in our training text, we are going to create new IDs as we encounter new kinds of characters we haven't seen before.

In [None]:
# The padding symbol is registered first, so it always receives ID 0.
PADDING_SYMBOL = '<PAD>'
id_to_char = [PADDING_SYMBOL]      # Characters, listed in ID order
char_to_id = {PADDING_SYMBOL: 0}   # Maps each character to its ID

We now define a class 'CharDataset' that extends the predefined 'Dataset' class. Compared to exercise 2, we will create data points in a slightly different way. 

The init function reads a training text and splits it up into chunks $n$ characters long. From each chunk, $n$ data points with a corresponding label will be created, as in the following example:

Suppose $n=8$. From a chunk $[4,5,9,11,7,7,2,12]$ with 14 being the next character ID, the following data points and labels will be formed (0 is the padding symbol):

| Data point | Label |
|-----------:|------:|
|[4,0,0,0,0,0,0,0] | 5 |
|[4,5,0,0,0,0,0,0] | 9 |
|[4,5,9,0,0,0,0,0] | 11 |
|[4,5,9,11,0,0,0,0] | 7 |
|[4,5,9,11,7,0,0,0] | 7 |
|[4,5,9,11,7,7,0,0] | 2 |
|[4,5,9,11,7,7,2,0] | 12 |
|[4,5,9,11,7,7,2,12] | 14 |

This way, the model will learn to infer the next character even if the context is shorter than $n$. This is a very useful feature, particularly in 'real' language models, where the known context often is shorter than the maximal context length.
 

In [None]:
class CharDataset(Dataset) :
    """
    Next-character prediction dataset built from a text file.

    The text is cut into consecutive chunks of n characters. Every chunk
    gives n data points: the first i characters of the chunk (i = 1..n),
    right-padded with 0 up to length n, labelled with the ID of the
    character that follows those i characters.

    Characters not seen before are added to the module-level `char_to_id`
    and `id_to_char` tables while the file is read.
    """

    def __init__(self, file_path, n) :
        """
        file_path = path of a UTF-8 encoded text file
        n = length of each data point (the context size); must be >= 1

        Raises ValueError if n < 1. If the file cannot be read, a message
        is printed and the dataset is left empty.
        """
        if n < 1:
            # A non-positive chunk length would never advance through the text.
            raise ValueError(f"n must be a positive integer, got {n}")
        self.datapoints = []
        self.labels = []
        try :
            with open(file_path,'r',encoding='utf-8') as f :
                contents = f.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            return
        except (OSError, UnicodeDecodeError) as e:
            print(f"An error occurred: {e}")
            return

        # Map every character to its ID, creating new IDs on the fly
        chars = []
        for char in contents:
            if char not in char_to_id:
                char_to_id[char] = len(id_to_char)
                id_to_char.append(char)
            chars.append( char_to_id[char] )

        # Only chunks that are followed by at least one more character are
        # used, so that the last data point of a chunk also has a label.
        for k in range(0, len(chars)-n, n):
            for i in range(1, n+1):
                # 0 is the ID used for padding
                self.datapoints.append(chars[k:k+i] + [0]*(n-i))
                self.labels.append(chars[k+i])

    def __len__(self) :
        """Returns the number of data points."""
        return len(self.datapoints)

    def __getitem__(self, idx) :
        """
        Returns the pair (data point, label) as tensors. Indices wrap
        around modulo the dataset size.

        Raises IndexError if the dataset is empty.
        """
        if not self.datapoints:
            raise IndexError("CharDataset is empty")
        idx = idx % len(self.datapoints)
        return torch.tensor(self.datapoints[idx]), torch.tensor(self.labels[idx], dtype=torch.long)

In [None]:
class PositionwiseFFN(nn.Module):
    """
    Two-layer feed-forward network that is applied after the
    self-attention computation: linear -> ReLU -> dropout -> linear.
    """

    def __init__(self, hidden_size, dropout_prob) :
        super().__init__()
        # Both layers map hidden_size -> hidden_size and have a bias term
        # (the nn.Linear default).
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        hidden = self.fc1(x).relu()
        hidden = self.dropout(hidden)
        return self.fc2(hidden)

class EncoderBlock(nn.Module):
    """
    Transformer encoder block with two residual sub-layers:
    self-attention followed by a position-wise FFN.

    Unlike the original formulation in [Vaswani et al. NeurIPS 2017], the
    LayerNorm is applied *before* the self-attention and *before* the FFN,
    which has proved to be beneficial (see [Nguyen and Salazar 2019]).
    """

    def __init__(self, vector_dim, att_dim, dropout_prob):
        super().__init__()
        self.attn = SelfAttention(vector_dim, att_dim)
        self.ffn = PositionwiseFFN(vector_dim, dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        self.ln1, self.ln2 = nn.LayerNorm(vector_dim), nn.LayerNorm(vector_dim)
        # Expose which attention variant is in use
        self.attention_method = self.attn.method

    def forward(self, x):
        # Sub-layer 1: LayerNorm -> self-attention -> dropout, plus residual
        attended = x + self.dropout(self.attn(self.ln1(x)))
        # Sub-layer 2: LayerNorm -> FFN -> dropout, plus residual
        return attended + self.dropout(self.ffn(self.ln2(attended)))

In [None]:
class PositionwiseFFN(nn.Module):
    """
    Two-layer feed-forward network that is applied after the
    self-attention computation: linear -> ReLU -> dropout -> linear.
    """

    def __init__(self, hidden_size, dropout_prob) :
        super().__init__()
        # Both layers map hidden_size -> hidden_size and have a bias term
        # (the nn.Linear default).
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(dropout_prob)
        # Alternative initialisation, currently disabled:
        # for module in (self.fc1, self.fc2):
        #     nn.init.kaiming_normal_(module.weight)
        #     nn.init.constant_(module.bias, 0.)

    def forward(self, x):
        activated = self.fc1(x).relu()
        regularised = self.dropout(activated)
        return self.fc2(regularised)

class EncoderBlock(nn.Module):
    """
    Transformer encoder block with two residual sub-layers:
    self-attention followed by a position-wise FFN.

    Unlike the original formulation in [Vaswani et al. NeurIPS 2017], the
    LayerNorm is applied *before* the self-attention and *before* the FFN,
    which has proved to be beneficial (see [Nguyen and Salazar 2019]).
    """

    def __init__(self, vector_dim, att_dim, dropout_prob):
        super().__init__()
        self.attn = SelfAttention(vector_dim, att_dim)
        self.ffn = PositionwiseFFN(vector_dim, dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        self.ln1, self.ln2 = nn.LayerNorm(vector_dim), nn.LayerNorm(vector_dim)
        # Expose which attention variant is in use
        self.attention_method = self.attn.method

    def forward(self, x):
        # Residual connection around (LayerNorm -> self-attention -> dropout)
        after_attention = x + self.dropout(self.attn(self.ln1(x)))
        # Residual connection around (LayerNorm -> FFN -> dropout)
        after_ffn = after_attention + self.dropout(self.ffn(self.ln2(after_attention)))
        return after_ffn

In [None]:

# ======================= Training ======================= #

# Use the GPU when one is available, otherwise fall back to the CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print( "Running on", device )

config = Config()
training_dataset = CharDataset('HP_book_1.txt', MAXLEN)
print( "There are", len(training_dataset), "datapoints and", len(id_to_char), "unique characters in the dataset" ) 
training_loader = DataLoader(training_dataset, batch_size=config.batch_size)

charlm = CharLM( config, len(id_to_char)).to(device)
criterion = nn.CrossEntropyLoss()
charlm_optimizer = optim.Adam( charlm.parameters(), lr=config.learning_rate )

print( "Using", charlm.attention_method )
print( datetime.now().strftime("%X"), "Training starts" )
for epoch in range(config.no_of_epochs) :
    charlm.train()
    iteration = 0
    loss_sum = 0
    for input_tensor, label in training_loader :
        input_tensor, label = input_tensor.to(device), label.to(device)
        charlm_optimizer.zero_grad()
        logits = charlm(input_tensor).to(device)
        # NOTE(review): squeeze(1) presumably removes a singleton dimension
        # from the model output; CharLM is not defined in this part of the
        # file, so confirm its output shape.
        loss = criterion(logits.squeeze(1), label)
        loss.backward()
        charlm_optimizer.step()
        iteration += 1
        loss_sum += loss.detach().item()

    # The loader yields no batches when the dataset is empty (for example if
    # the training file could not be read); avoid dividing by zero then.
    average_loss = loss_sum/iteration if iteration else float('nan')
    print( datetime.now().strftime("%X"), "End of epoch", epoch+1, ", average loss=", average_loss)
    charlm.eval()
    # Generate some characters starting from the input text
    try :
        char_list = list("he looked around and"[-MAXLEN:])
        # No gradients are needed for generation
        with torch.no_grad() :
            for i in range(300) :
                # Right-pad the current context to MAXLEN with the padding ID
                input_tensor = torch.tensor( [char_to_id[c] for c in char_list] + [char_to_id[PADDING_SYMBOL]]*(MAXLEN-len(char_list))).unsqueeze(0).to(device)
                logits = charlm(input_tensor).squeeze().to(device)
                # Greedy decoding: take the highest-scoring character
                _, new_character_tensor = logits.topk(1)
                new_character = id_to_char[new_character_tensor.detach().item()]
                print( new_character, end='' )
                # Keep at most MAXLEN characters of context
                if len(char_list) == MAXLEN :
                    char_list.pop(0)
                char_list.append( new_character )
        print()
    except KeyError:
        # A character of the prompt has no ID (it does not occur in the
        # training text): skip the generation for this epoch.
        continue

In [None]:
# ==================== User interaction ==================== #

# Make sure dropout is disabled while generating, even if no training
# epoch was run before this cell.
charlm.eval()

while True:
    try :
        text = input("> ").strip()
    except (EOFError, KeyboardInterrupt) :
        # End of input or an interrupt ends the session cleanly
        print()
        break
    if text == "" :
        continue
    # Only the last MAXLEN characters fit in the context
    char_list = list(text[-MAXLEN:])
    # Generate 50 characters starting from the input text
    try :
        # No gradients are needed for generation
        with torch.no_grad() :
            for i in range(50) :
                # Right-pad the current context to MAXLEN with the padding ID
                input_tensor = torch.tensor( [char_to_id[c] for c in char_list] + [char_to_id[PADDING_SYMBOL]]*(MAXLEN-len(char_list))).unsqueeze(0).to(device)
                logits = charlm(input_tensor).squeeze().to(device)
                #dist = torch.distributions.categorical.Categorical(logits=logits)
                # Greedy decoding: take the highest-scoring character
                _, new_character_tensor = logits.topk(1)
                new_character = id_to_char[new_character_tensor.detach().item()]
                #new_character = id_to_char[dist.sample().detach().item()]
                print( new_character, end='' )
                # Keep at most MAXLEN characters of context
                if len(char_list) == MAXLEN :
                    char_list.pop(0)
                char_list.append( new_character )
        print()
    except KeyError as e :
        # The input contains a character that has no ID
        print(f"Cannot continue the text: character {e} was not seen in the training data")
        continue