# Assignment 2: Bigram Language Model and Generative Pretrained Transformer (GPT)

Due date: Feb 4th, 2024, 11:59pm

The objective of this assignment is to train a simplified transformer model. The primary differences between this implementation and the LLMs used in practice are:
* tokenizer (we use a character-level encoder for simplicity and because of compute constraints)
* size (we are using a single consumer-grade GPU hosted on Colab and a small dataset; in practice, the models are much larger and are trained on much more data)
* efficiency


Most modern LLMs have multiple training stages, so we won't get a model that is capable of replying to you yet. However, this is the first step towards a model like ChatGPT or Llama.




In [None]:
%matplotlib inline
import torch
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass
from torch import nn
import torch.optim as optim
from torch.nn import functional as F
import torch.nn as nn

## Part 1: Bigram MLP for TinyShakespeare (35 points)

1a) (1 point). Create a list `chars` that contains all unique characters in `text`

1b) (2 points). Implement `encode(s: str) -> list[int]`

1c) (2 points). Implement `decode(ids: list[int]) -> str`

1d) (5 points). Create two tensors, `inputs_one_hot` and `outputs_one_hot`, using one-hot encoding. Make sure to get every consecutive pair of characters. For example, for the word 'hello', we should create the following input-output pairs:
```
he
el
ll
lo
```
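
The pairing above can be sketched on the word 'hello' alone. This is only an illustration: the names `toy_text`, `toy_chars`, and `toy_stoi` are made up for this example, the vocabulary is built from the single word rather than from `text`, and `torch.nn.functional.one_hot` is used instead of filling a zero tensor by hand.

```python
import torch
import torch.nn.functional as F

toy_text = "hello"
toy_chars = sorted(set(toy_text))                  # toy vocabulary: ['e', 'h', 'l', 'o']
toy_stoi = {c: i for i, c in enumerate(toy_chars)}

# Every consecutive pair of characters: (h,e), (e,l), (l,l), (l,o)
pairs = list(zip(toy_text[:-1], toy_text[1:]))

toy_input_ids = torch.tensor([toy_stoi[a] for a, _ in pairs])
toy_target_ids = torch.tensor([toy_stoi[b] for _, b in pairs])

# one_hot returns int64, so cast to float before feeding a linear layer
toy_inputs_one_hot = F.one_hot(toy_input_ids, num_classes=len(toy_chars)).float()
toy_outputs_one_hot = F.one_hot(toy_target_ids, num_classes=len(toy_chars)).float()

print(pairs)
print(toy_inputs_one_hot.shape)  # one row per pair, one column per vocabulary entry
```

Each row has exactly one non-zero entry, so the number of rows equals the number of pairs (`len(text) - 1` in the full dataset).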

1e) (10 points). Implement `BigramOneHotMLP`, a 2-layer MLP that predicts the next token. Specifically, implement the constructor, `forward`, and `generate`. The output dimension of the first layer should be 8. Use `torch.optim`. The activation function for the first layer should be `nn.LeakyReLU()`.

Note: Use the `torch.nn.functional.cross_entropy` loss. Read the [docs](https://pytorch.org/docs/stable/generated/torch.nn.functional.cross_entropy.html) about how this loss function works. The logits are the output of a network WITHOUT an activation function applied to the last layer. Activation functions are applied to every layer except the last.
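
To make the note about logits concrete, here is a small sketch with random data (the shapes 5 and 65 are arbitrary choices for illustration). It shows that `F.cross_entropy` expects raw logits plus integer class indices, and that it matches a log-softmax followed by the negative log-likelihood loss.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.randn(5, 65)            # raw scores for 5 examples and 65 classes, no softmax applied
targets = torch.randint(0, 65, (5,))   # integer class indices, not one-hot vectors

loss = F.cross_entropy(logits, targets)

# Equivalent two-step computation: log-softmax, then negative log-likelihood
manual = F.nll_loss(F.log_softmax(logits, dim=-1), targets)

print(torch.allclose(loss, manual))
```

Because the softmax is folded into the loss, applying another softmax (or any other activation) to the last layer before calling it would distort the result.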

1f) (5 points). Train the BigramOneHotMLP for 1000 steps.

1g) (5 points). Create two tensors, `input_ids` and `outputs_one_hot`. These `input_ids` will be used for the embedding layer.
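
As a rough illustration of why integer ids are enough for an embedding layer: looking up rows in `nn.Embedding` gives the same result as multiplying one-hot vectors by the embedding weight matrix. The vocabulary size 65 and dimension 8 below are assumptions chosen to mirror this assignment.

```python
import torch
from torch import nn
import torch.nn.functional as F

torch.manual_seed(0)
vocab_size, emb_dim = 65, 8
embedding = nn.Embedding(vocab_size, emb_dim)

ids = torch.tensor([3, 10, 64])
one_hot = F.one_hot(ids, num_classes=vocab_size).float()

looked_up = embedding(ids)                # (3, 8): direct row lookup
multiplied = one_hot @ embedding.weight   # (3, 8): the same rows selected by a matmul

print(torch.allclose(looked_up, multiplied))
```

The lookup avoids materializing the one-hot matrix, which matters once the dataset or vocabulary grows.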

1h) (5 points). Implement and train `BigramEmbeddingMLP`, a 2-layer MLP that predicts the next token. Specifically, implement the constructor, `forward`, and `generate` functions. The output dimension of the first layer should be 8. Use `torch.optim`.



Note: the output will look like gibberish


In [None]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2024-02-05 19:15:25--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2024-02-05 19:15:26 (28.1 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [None]:

# For the bigram model, let's use the first 1000 characters for the data

# part 1a
with open('input.txt', 'r') as f:
    text = f.read()
text = text
chars = sorted(set(text))

# part 1b
def encode(s: str) -> list[int]:
    return [chars.index(c) for c in s]

# part 1c
def decode(ids: list[int]) -> str:
    return ''.join(chars[i] for i in ids)

print(chars)

['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


In [None]:
# part 1d
def create_one_hot_inputs_and_outputs() -> tuple[torch.Tensor, torch.Tensor]:
    input_seqs = [text[i:i+2] for i in range(len(text)-1)]
    inputs_one_hot = torch.zeros(len(input_seqs), len(chars))
    outputs_one_hot = torch.zeros(len(input_seqs), len(chars))

    for i, seq in enumerate(input_seqs):
        inputs_one_hot[i, encode(seq[0])] = 1
        outputs_one_hot[i, encode(seq[1])] = 1

    return inputs_one_hot, outputs_one_hot

inputs_one_hot, outputs_one_hot = create_one_hot_inputs_and_outputs()

In [None]:
# part 1e
class BigramOneHotMLP(nn.Module):
    def __init__(self):
        super(BigramOneHotMLP, self).__init__()
        self.fc1 = nn.Linear(len(chars), 8)
        self.activation = nn.LeakyReLU()
        self.fc2 = nn.Linear(8, len(chars))

    def forward(self, x):
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

    def generate(self, start='a', max_new_tokens=100) -> str:
        generated_text = start
        current_char = start

        for _ in range(max_new_tokens):
            input_tensor = torch.zeros(1, len(chars))
            input_tensor[0, encode(current_char)] = 1
            output_tensor = self.forward(input_tensor)
            _, next_char_idx = torch.max(output_tensor, 1)
            next_char = decode(next_char_idx.tolist())
            generated_text += next_char
            current_char = next_char

        return generated_text

bigram_one_hot_mlp = BigramOneHotMLP()

generated_word = bigram_one_hot_mlp.generate()
print(f'Generated word: {generated_word}')

Generated word: aYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY


In [None]:
# part 1f
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(bigram_one_hot_mlp.parameters(), lr=0.01)

for _ in range(1000):
    optimizer.zero_grad()
    outputs = bigram_one_hot_mlp(inputs_one_hot)
    loss = criterion(outputs, torch.argmax(outputs_one_hot, dim=1))
    loss.backward()
    optimizer.step()

print(bigram_one_hot_mlp.generate())

a                                                                                                    


In [None]:
# part 1g
def create_embedding_inputs_and_outputs() -> tuple[torch.Tensor, torch.Tensor]:
    input_seqs = [text[i:i+2] for i in range(len(text)-1)]
    input_ids = torch.tensor([encode(seq[0]) for seq in input_seqs])
    outputs_one_hot = torch.zeros(len(input_seqs), len(chars))

    for i, seq in enumerate(input_seqs):
        outputs_one_hot[i, encode(seq[1])] = 1

    return input_ids, outputs_one_hot

input_ids, outputs_one_hot = create_embedding_inputs_and_outputs()

In [None]:
# part 1h
class BigramEmbeddingMLP(nn.Module):
    def __init__(self):
        super(BigramEmbeddingMLP, self).__init__()
        self.embedding = nn.Embedding(len(chars), 8)
        self.fc1 = nn.Linear(8, 8)
        self.activation = nn.LeakyReLU()
        self.fc2 = nn.Linear(8, len(chars))

    def forward(self, x):
        x = self.embedding(x)
        x = torch.sum(x, dim=1)
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

    def generate(self, start='a', max_new_tokens=100) -> str:
        generated_text = start
        current_char = start

        for _ in range(max_new_tokens):
            input_tensor = torch.tensor([encode(current_char)])
            output_tensor = self.forward(input_tensor)
            _, next_char_idx = torch.max(output_tensor, 1)
            next_char = decode(next_char_idx.tolist())
            generated_text += next_char
            current_char = next_char

        return generated_text

bigram_embedding_mlp = BigramEmbeddingMLP()

optimizer_embed = optim.SGD(bigram_embedding_mlp.parameters(), lr=0.01)

for _ in range(1000):
    optimizer_embed.zero_grad()
    outputs_embed = bigram_embedding_mlp(input_ids)
    loss_embed = criterion(outputs_embed, torch.argmax(outputs_one_hot, dim=1))
    loss_embed.backward()
    optimizer_embed.step()

# Generating text using the trained models
print(bigram_one_hot_mlp.generate())
print(bigram_embedding_mlp.generate())

a                                                                                                    
an                                                                                                   


## Part 2: Generative Pretrained Transformer (65 points)

For this part, it is best to use a GPU. In the menu at the top, go to Runtime -> Change Runtime Type and select T4 GPU.

In [None]:
# run nvidia-smi to check gpu usage
!nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


In [None]:
# For the gpt model, let's use the full text

with open('input.txt', 'r') as f:
    text = f.read()

Implement a character level tokenization function.

1. Create a list of unique characters in the string. (1 point)
2. Implement a function `encode(s: str) -> list[int]` that takes a string and returns a list of ids (1 point)
3. Implement a function `decode(ids: list[int]) -> str` that takes a list of ids (ints) and returns a string (1 point)
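
One possible way to implement these three steps is with lookup dictionaries, so that each character is mapped in constant time instead of searching a list. This is only a sketch: `sample_text`, `vocab`, `stoi`, `itos`, `encode_fast`, and `decode_fast` are illustrative names, not part of the assignment.

```python
sample_text = "First Citizen:\nBefore we proceed"

# Step 1: sorted list of unique characters
vocab = sorted(set(sample_text))
stoi = {ch: i for i, ch in enumerate(vocab)}   # character -> id
itos = {i: ch for ch, i in stoi.items()}       # id -> character

# Step 2: string -> list of ids
def encode_fast(s: str) -> list[int]:
    return [stoi[c] for c in s]

# Step 3: list of ids -> string
def decode_fast(ids: list[int]) -> str:
    return "".join(itos[i] for i in ids)

ids = encode_fast("proceed")
print(ids)
print(decode_fast(ids))
```

A quick sanity check for any tokenizer of this kind is that `decode(encode(s)) == s` for every string `s` made of known characters.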


In [None]:
# Implement character-level tokenization function
def tokenize(text):
    return sorted(set(text))

# part 2.1
chars = tokenize(text)

# part 2.2

# Implement encode function
def encode(s: str) -> list[int]:
    return [chars.index(c) for c in s]

# Implement decode function
def decode(ids: list[int]) -> str:
    return ''.join(chars[i] for i in ids)

In [None]:
# Fall back to the CPU when no GPU is available instead of calling .cuda() unconditionally
device = 'cuda' if torch.cuda.is_available() else 'cpu'
data = torch.tensor(encode(text), dtype=torch.long).to(device)

In [None]:
block_size = 16
data[:block_size+1]

To train a transformer, we feed the model `n` tokens (context) and try to predict the `n+1`th token (target) in the sequence.



In [None]:
x = data[:block_size]
y = data[1:block_size+1]
for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(f"when input is {context} the target: {target}")

In [None]:
batch_size = 64
device = 'cuda' if torch.cuda.is_available() else 'cpu'
def get_batch():
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

# Draw one batch and inspect it
xb, yb = get_batch()
print(xb)
print(yb)

### Single Self Attention Head (5 points)
![](https://i.ibb.co/GWR1XG0/head.png)

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
class SelfAttentionHead(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.q_proj = nn.Linear(64, head_size, bias=False).to(device)
        self.k_proj = nn.Linear(64, head_size, bias=False).to(device)
        self.v_proj = nn.Linear(64, head_size, bias=False).to(device)
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        x = x.to(device)
        B, T, C = x.shape
        k = self.k_proj(x)      # (B, T, head_size)
        q = self.q_proj(x)      # (B, T, head_size)
        v = self.v_proj(x)      # (B, T, head_size)

        # Scaled dot-product scores; scale by the key dimension (head_size), not the embedding dimension C
        attention = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        # Causal mask: position t may only attend to positions <= t
        mask = torch.tril(torch.ones(T, T, dtype=torch.bool, device=x.device))
        masked_attention = attention.masked_fill(~mask, float('-inf'))
        masked_attention = F.softmax(masked_attention, dim=-1) # (B, T, T)
        masked_attention = self.dropout(masked_attention)
        out = masked_attention @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out


x = torch.randn((8,32, 64)).to(device)
attn = SelfAttentionHead(16).to(device)
print(attn(x).shape)

### Multihead Self Attention (5 points)

`constructor`

- Create 4 `SelfAttentionHead` instances. Consider using `nn.ModuleList`
- Create a linear layer with n_embd input dim and n_embd output dim

`forward`

In the forward implementation, pass `x` through each head, then concatenate all the outputs along the feature dimension, then pass the concatenated output through the linear layer

![](https://i.ibb.co/y5SwyZZ/multihead.png)

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.num_heads = num_heads
        self.head_size = head_size
        self.dropout = nn.Dropout(p=0.5)

        # Create attention heads using nn.ModuleList
        self.attention_heads = nn.ModuleList([
            SelfAttentionHead(head_size) for _ in range(num_heads)
        ])

        # Linear layer for output projection
        self.output_projection = nn.Linear(num_heads * head_size, head_size * num_heads)

    def forward(self, x):
        # Iterate through heads, applying attention and concatenating outputs
        head_outputs = [head(x) for head in self.attention_heads]
        concatenated_output = torch.cat(head_outputs, dim=2)  # Concatenate along feature dimension

        # Pass concatenated output through linear layer
        output = self.output_projection(concatenated_output)

        return output

x = torch.randn((8,32, 64)).to(device)
attn = MultiHeadAttention(num_heads = 4, head_size=16).to(device)
print(attn(x).shape)

### MLP (2 points)
Implement a 2-layer MLP.


![](https://i.ibb.co/C0DtrF5/ff.png)

In [None]:
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = nn.Linear(64, 256)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(256, 64)
        self.dropout = nn.Dropout()

    def forward(self, x: torch.tensor) -> torch.tensor:

        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.dropout(x)
        return x

model = MLP()
input_tensor = torch.randn(8, 32, 64)
output_tensor = model(input_tensor)
#print(output_tensor)  # Print the entire output tensor
print(output_tensor.size())


### Transformer block (20 points)

Layer normalization helps training stability by normalizing the activations of a layer across all features for each individual data point, rather than across the batch for each feature (as batch normalization does).
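
The "per data point, across features" behaviour can be checked by hand. The sketch below assumes the same `(8, 32, 64)` input shape used in the test cells of this notebook and a freshly constructed `nn.LayerNorm`, whose learnable scale and shift start at 1 and 0.

```python
import torch
from torch import nn

torch.manual_seed(0)
x = torch.randn(8, 32, 64)        # (batch, time, features)
layer_norm = nn.LayerNorm(64)     # default init: weight = 1, bias = 0

# Manual version: statistics are taken over the last (feature) dimension only,
# so every (batch, time) position is normalized independently
mean = x.mean(dim=-1, keepdim=True)
var = x.var(dim=-1, keepdim=True, unbiased=False)
manual = (x - mean) / torch.sqrt(var + layer_norm.eps)

print(torch.allclose(layer_norm(x), manual, atol=1e-5))
```

Since no statistic depends on other examples in the batch, the result for one sequence does not change with the batch size.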

Dropout is a form of regularization to prevent overfitting.
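
A short sketch of how `nn.Dropout` behaves in the two module modes (the vector of ones and `p=0.5` are arbitrary choices for illustration). In training mode entries are zeroed at random and the survivors are scaled by `1 / (1 - p)`; in evaluation mode the layer is the identity.

```python
import torch
from torch import nn

torch.manual_seed(0)
dropout = nn.Dropout(p=0.5)
x = torch.ones(1000)

dropout.train()
train_out = dropout(x)   # entries are either zeroed or scaled by 1 / (1 - 0.5) = 2

dropout.eval()
eval_out = dropout(x)    # identity at evaluation time

print(train_out.unique())
print(torch.equal(eval_out, x))
```

This is why a model should be switched with `model.eval()` before generating text and back with `model.train()` before further training steps.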

This is the diagram of a transformer block:

![](https://i.ibb.co/X85C473/block.png)

In [None]:
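class Block(nn.Module):
    def __init__(self, n_embd: int, n_head: int):
        super().__init__()
        head_size = n_embd // n_head
        # Separate layer norms for the attention and MLP sub-layers
        self.norm1 = nn.LayerNorm(n_embd)
        self.heads = MultiHeadAttention(n_head, head_size)
        self.norm2 = nn.LayerNorm(n_embd)
        # The MLP defined above is fixed at 64 -> 256 -> 64, so n_embd must be 64
        self.mlp = MLP()

    def forward(self, x):
        # Pre-norm residual connections around attention and the MLP
        x = x + self.heads(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        return x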

model = Block(n_embd=64, n_head=4).to(device)
input_tensor = torch.randn(8, 32, 64).to(device)
output_tensor = model(input_tensor)
print(output_tensor.size())

### GPT

`constructor` (5 points)

1. create the token embedding table and the position embedding table
2. create variable `self.blocks` that is a series of `Block`s. The data will pass through each block sequentially. Consider using `nn.Sequential`
3. create a layer norm layer
4. create a linear layer for predicting the next token

`forward(self, idx, targets=None)`. (5 points)

`forward` takes a batch of context ids as input of size (B, T) and returns the logits and the loss, if targets is not None. If targets is None, return the logits and None.
1. get the token embeddings by using the token embedding table created in the constructor
2. create the position embeddings
3. sum the token and position embeddings to get the model input
4. pass the model input through the blocks, the layernorm layer, and the final linear layer
5. compute the loss
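
Steps 1 to 3 can be sketched in isolation to make the shapes explicit. The table sizes and the names `token_table` and `position_table` below are assumptions for illustration only.

```python
import torch
from torch import nn

vocab_size, max_positions, n_embd = 65, 32, 64
token_table = nn.Embedding(vocab_size, n_embd)
position_table = nn.Embedding(max_positions, n_embd)

idx = torch.randint(0, vocab_size, (4, 16))   # (B, T) batch of context ids
B, T = idx.shape

tok = token_table(idx)                        # (B, T, n_embd)
pos = position_table(torch.arange(T))         # (T, n_embd)
model_input = tok + pos                       # pos is broadcast over the batch dimension

print(model_input.shape)
```

Note that `T` must not exceed the number of rows in the position table, otherwise the lookup raises an index error.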

`generate(start_char, max_new_tokens, top_p, top_k, temperature) -> str` (5 points)
1. implement top_p, top_k, and temperature for sampling
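
One possible way to combine the three sampling controls, written purely in PyTorch. This is a sketch under stated assumptions, not the required solution: `sample_next` is a hypothetical helper that works on a single 1-D logits vector, and ties at the top-k threshold are all kept.

```python
import torch
import torch.nn.functional as F

def sample_next(logits, temperature=1.0, top_k=None, top_p=None):
    """Sample one token id from a 1-D logits vector (hypothetical helper)."""
    logits = logits / temperature          # < 1 sharpens the distribution, > 1 flattens it
    if top_k is not None:
        # Mask everything below the k-th largest logit
        kth_value = torch.topk(logits, top_k).values[-1]
        logits = logits.masked_fill(logits < kth_value, float("-inf"))
    probs = F.softmax(logits, dim=-1)
    if top_p is not None:
        sorted_probs, sorted_ids = torch.sort(probs, descending=True)
        cumulative = torch.cumsum(sorted_probs, dim=-1)
        # Drop a token if the probability mass before it already reaches top_p
        remove = (cumulative - sorted_probs) >= top_p
        sorted_probs = sorted_probs.masked_fill(remove, 0.0)
        probs = torch.zeros_like(probs).scatter(0, sorted_ids, sorted_probs)
        probs = probs / probs.sum()
    return torch.multinomial(probs, num_samples=1).item()

torch.manual_seed(0)
example_logits = torch.tensor([2.0, 1.0, 0.5, -1.0, -3.0])
print(sample_next(example_logits, temperature=0.8, top_k=2))
```

With `top_k=2` only the two highest-scoring ids can be drawn; with a small `top_p` the nucleus can shrink to the single most likely token.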



![](https://i.ibb.co/n8sbQ0V/Screenshot-2024-01-23-at-8-59-08-PM.png)

In [None]:
import os
%env CUDA_LAUNCH_BLOCKING=1

In [None]:
import torch
torch.backends.cuda.matmul.allow_tf32 = False
import numpy as np
class GPT(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        # create the token embedding table
        self.token_embedding = nn.Embedding(len(chars),n_embd)

        # Create the position embedding table
        self.positional_embedding = nn.Embedding(32, n_embd)
        # Create dropout layer
        self.dropout = nn.Dropout(p=0.5)

        # Create blocks using nn.Sequential to go through series of 4 blocks
        self.blocks = nn.Sequential(*[Block(n_embd, n_head) for _ in range(4)])

        # Create a layer norm layer
        self.layer_norm = nn.LayerNorm(n_embd)

        # Create a linear layer for predicting the next token
        self.linear = nn.Linear(n_embd, len(chars))

    def forward(self, idx, targets=None):
            idx = idx.view(-1, idx.size(-1))
            B,T=idx.shape
            idx = idx.to(self.token_embedding.weight.device)
            token_embeddings = self.token_embedding(idx).to(device)
            positional_embedding = self.positional_embedding(torch.arange(T, device=device))
            x = token_embeddings + positional_embedding
            x = self.dropout(x)
            x= self.blocks(x)
            x = self.layer_norm(x)
            logits = self.linear(x)
            if targets is None:
                loss = None
            else:
                B, T, C = logits.shape
                logits = logits.view(B*T, C)
                targets = targets.view(B*T)
                loss = F.cross_entropy(logits, targets)
            return logits,loss


    def generate(self, start_char, max_new_tokens, top_p, top_k, temperature):
        was_training = self.training
        self.eval()
        with torch.no_grad():
            current_seq = torch.tensor([encode(start_char)], dtype=torch.long, device=device)  # (1, T)
            for _ in range(max_new_tokens):
                # Condition on the most recent block_size tokens, not only the last one
                logits, _ = self(current_seq[:, -block_size:])
                logits = logits[:, -1, :]
                scaled_logits = logits / temperature
                probabilities = F.softmax(scaled_logits, dim=-1)
                if top_k is not None:
                    sampled_index = top_k_sampling(probabilities, k=top_k)
                elif top_p is not None:
                    sampled_index = top_p_sampling(probabilities, p=top_p)
                else:
                    # No filtering requested: sample from the full distribution
                    sampled_index = torch.multinomial(probabilities, num_samples=1).item()
                sampled_token = torch.tensor([[int(sampled_index)]], dtype=torch.long, device=device)
                current_seq = torch.cat([current_seq, sampled_token], dim=1)
        # Restore the previous mode so dropout stays active during training
        self.train(was_training)
        return decode(current_seq[0].tolist())

def top_k_sampling(probabilities, k=5):
    probabilities = probabilities.cpu().numpy().flatten()
    # Keep the k most likely token ids and renormalize their probabilities
    top_k_indices = np.argsort(probabilities)[-k:]
    top_k_probabilities = probabilities[top_k_indices]
    top_k_probabilities /= top_k_probabilities.sum()
    chosen_index = np.random.choice(top_k_indices, p=top_k_probabilities)

    return chosen_index

def top_p_sampling(probabilities, p=0.9):
    probabilities = probabilities.cpu().numpy().flatten()
    # Sort token ids from most to least likely
    sorted_indices = np.argsort(probabilities)[::-1]
    sorted_probabilities = probabilities[sorted_indices]
    cumulative_probabilities = np.cumsum(sorted_probabilities)
    # Keep the smallest prefix whose cumulative probability exceeds p;
    # if rounding keeps the total at or below p, keep the whole vocabulary
    over_threshold = np.where(cumulative_probabilities > p)[0]
    cutoff_index = over_threshold[0] if len(over_threshold) > 0 else len(sorted_indices) - 1
    filtered_indices = sorted_indices[:cutoff_index + 1]

    filtered_probabilities = sorted_probabilities[:cutoff_index + 1]
    filtered_probabilities /= filtered_probabilities.sum()
    chosen_index = np.random.choice(filtered_indices, p=filtered_probabilities)

    return chosen_index

device = 'cuda' if torch.cuda.is_available() else 'cpu'
gpt_model = GPT(64, 4).to(device)
generated_text = gpt_model.generate(start_char='a', max_new_tokens=100,top_k=None,top_p=0.9, temperature=1.0)
print(generated_text)


### Training loop (15 points)

Implement the training loop.

In [None]:
gpt_model = GPT(64, 4).to(device)
# make sure you are running this on the GPU
max_iters = 10000
learning_rate = 0.01
optimizer = optim.SGD(gpt_model.parameters(), lr=learning_rate)
for step in range(max_iters):
  optimizer.zero_grad()
  xb, yb = get_batch()
  logits, loss = gpt_model(xb, yb)
  loss.backward()
  optimizer.step()
  if step % 1000 == 0:
      print(f'Iteration {step+1}, Loss: {loss.item()}')
      print(f'Generated text:{gpt_model.generate(start_char="a", max_new_tokens=100,top_k=5,top_p=None, temperature=1.0)}')
      gpt_model.train()  # generate() calls eval(); make sure dropout is active again


### Generate text


Print some text that your model generates.

In [None]:
# Generate text
print(gpt_model.generate(start_char="a", max_new_tokens=100,top_k=5,top_p=None, temperature=1.0))