In [67]:
import torch

In [68]:
# Load the whole training corpus from disk into a single string
with open('input.txt', mode='r', encoding='utf-8') as corpus_file:
    text = corpus_file.read()

In [69]:
# Vocabulary: every distinct character that occurs in the text, in sorted order
char_list = sorted(set(text))

In [70]:
# Character -> integer id lookup (used by the encoder)
in_encode = dict(zip(char_list, range(len(char_list))))
# Integer id -> character lookup (used by the decoder)
out_decode = dict(enumerate(char_list))


def encode(string):
    '''Return the list of integer ids for the characters of `string`.'''
    return [in_encode[ch] for ch in string]


def decode(encoding):
    '''Return the string spelled out by the integer ids in `encoding`.'''
    return ''.join(out_decode[token_id] for token_id in encoding)

In [71]:
# Encode the full corpus once and keep it as a 1-D tensor of int64 token ids
encoded_text = encode(text)
data = torch.tensor(encoded_text, dtype=torch.int64)

In [72]:
# Hold out the last 20% of the tokens for validation (the video uses a 0.9 split)
split_at = int(0.8 * len(data))
train_data, val_data = data[:split_at], data[split_at:]

In [73]:
# --- Reproducibility ---
# Seed torch's global RNG so that Restart & Run All draws the same batches,
# initial weights, dropout masks and samples (on the same device/software).
SEED = 1337
torch.manual_seed(SEED)

# --- Data / batching ---
# Number of consecutive tokens in one training example (context length)
BLOCK_SIZE = 128
# Number of random blocks drawn per batch
BATCH_SIZE = 32
# Number of distinct characters the model can read or emit
VOCAB_SIZE = len(char_list)

# --- Model shape ---
# Width of the token / position embeddings
N_EMBED = 120
# Attention heads per block; each head gets N_EMBED // N_HEAD channels
N_HEAD = 6
# Number of stacked transformer blocks
N_LAYER = 6
# Dropout probability used throughout the model
DROPOUT = 0.2

# --- Optimisation ---
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# Learning rate handed to the optimizer
LR = 3e-4
# Upper bound of the training loop's iteration counter
MAX_ITER = 5000
# Record the training loss every EVAL_ITER iterations
EVAL_ITER = 100

In [74]:
def get_batch(split):
    '''Sample a random batch of (input, target) blocks from one data split.

    `split == 'train'` reads from `train_data`; any other value reads from
    `val_data`. Returns two (BATCH_SIZE, BLOCK_SIZE) tensors moved to DEVICE;
    each target is the token that follows the corresponding input position.
    '''
    source = val_data if split != 'train' else train_data
    # The exclusive upper bound leaves room for BLOCK_SIZE inputs plus the one-step-shifted targets
    starts = torch.randint(len(source) - BLOCK_SIZE, (BATCH_SIZE,))
    inputs, targets = [], []
    for start in starts:
        stop = start + BLOCK_SIZE
        inputs.append(source[start:stop])
        targets.append(source[start + 1:stop + 1])
    return torch.stack(inputs).to(DEVICE), torch.stack(targets).to(DEVICE)

In [75]:
# Draw one training batch up front; it is fed through the freshly built model further down
x_batch, y_batch = get_batch(split='train')

In [76]:
import torch.nn as nn
from torch.nn import functional as F

class Head(nn.Module):
    '''Implements a single head of causal (masked) self-attention.

    Args:
        head_size: output width of the key, query and value projections.

    The projections read N_EMBED-wide inputs, and the causal mask buffer is
    BLOCK_SIZE x BLOCK_SIZE, so inputs may be at most BLOCK_SIZE tokens long.
    '''

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(N_EMBED, head_size, bias=False)
        self.query = nn.Linear(N_EMBED, head_size, bias=False)
        self.value = nn.Linear(N_EMBED, head_size, bias=False)
        # Registered as a buffer so it follows the module across devices without being a trainable parameter
        self.register_buffer('lower_tri', torch.tril(torch.ones(BLOCK_SIZE, BLOCK_SIZE)))
        self.dropout = nn.Dropout(DROPOUT)

    def forward(self, x):
        '''Attend over `x` of shape (B, T, C) and return a (B, T, head_size) tensor.

        B = number of sequences processed in parallel (batch size)
        T = length of each sequence ("seq" in the paper), at most BLOCK_SIZE
        C = embedding vector size (d_model in the paper)
        '''
        _, T, _ = x.shape
        q = self.query(x)  # Query: (B x T x head_size)
        k = self.key(x)    # Key:   (B x T x head_size)
        v = self.value(x)  # Value: (B x T x head_size)

        # Dot product of every query with every key gives one similarity score per token pair.
        # The scores are divided by sqrt(head_size) -- the width of q and k, NOT the embedding
        # width C -- to bring them back to roughly unit variance; otherwise softmax saturates
        # and produces nearly one-hot rows.
        head_size = k.shape[-1]
        weights = q @ k.transpose(-2, -1) * head_size ** -0.5  # (B x T x hs) @ (B x hs x T) --> (B x T x T)
        # Set the upper-triangular entries to -infinity so a token cannot attend to tokens that
        # come after it. An encoder block would drop this line.
        weights = weights.masked_fill(self.lower_tri[:T, :T] == 0, float('-inf'))  # (B x T x T)
        # Softmax turns each row into the share of attention a token pays to each visible token
        weights = F.softmax(weights, dim=-1)
        weights = self.dropout(weights)

        # Weighted sum of the value vectors gives the head's output for every position
        out = weights @ v  # (B x T x T) @ (B x T x hs) --> (B x T x hs)
        return out

In [77]:
class MultiHeadAttention(nn.Module):
    '''Runs several attention heads side by side and projects their concatenation to N_EMBED.

    Args:
        n_heads: number of `Head` modules to run on the same input.
        head_size: output width of each head.
    '''

    def __init__(self, n_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(n_heads)])
        # The concatenated head outputs are n_heads * head_size wide. That equals N_EMBED only
        # when N_EMBED is divisible by n_heads, so size the projection from the real width.
        self.proj = nn.Linear(n_heads * head_size, N_EMBED)
        self.dropout = nn.Dropout(DROPOUT)

    def forward(self, x):
        '''Apply every head to `x`, concatenate on the last axis, then project and apply dropout.'''
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [78]:
class FeedForward(nn.Module):
    '''Two-layer MLP: widen to 4 * n_embed, ReLU, narrow back to n_embed, then dropout.'''

    def __init__(self, n_embed):
        super().__init__()
        # The 4x hidden width follows the Attention Is All You Need paper
        hidden_width = 4 * n_embed
        layers = [
            nn.Linear(n_embed, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embed),
            nn.Dropout(DROPOUT),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        '''Run `x` through the MLP; the last dimension stays n_embed wide.'''
        out = self.net(x)
        return out

In [79]:
class Block(nn.Module):
    '''One transformer block: layer-normed self-attention, then a layer-normed feed-forward, each added back residually.'''

    def __init__(self, n_embed, n_head):
        super().__init__()
        # Every head receives an equal integer share of the embedding width
        self.sa = MultiHeadAttention(n_head, n_embed // n_head)
        self.f_forward = FeedForward(n_embed)
        # Layer norm is applied to the input of each sub-layer (before attention / feed-forward)
        self.layer_norm1 = nn.LayerNorm(n_embed)
        self.layer_norm2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        '''Return `x` after the attention and feed-forward residual updates.'''
        # First residual connection: add the attention output to its own input
        attended = self.sa(self.layer_norm1(x))
        x = x + attended
        # Second residual connection: add the feed-forward output to its own input
        transformed = self.f_forward(self.layer_norm2(x))
        return x + transformed

In [80]:
class BigramLanguageModel(nn.Module):
    '''Character-level language model built from token + position embeddings and a stack of `Block`s.

    Despite the name, the forward pass runs N_LAYER transformer blocks rather than a
    plain bigram lookup.

    Args:
        vocab_size: number of distinct tokens; sizes the embedding table and the output layer.
    '''

    def __init__(self, vocab_size):
        super().__init__()
        # One learned vector per token id
        self.token_embedding_table = nn.Embedding(vocab_size, N_EMBED)
        # One learned vector per position inside a block (positions 0 .. BLOCK_SIZE - 1)
        self.position_embedding_table = nn.Embedding(BLOCK_SIZE, N_EMBED)
        # Stack of self-attention + feed-forward blocks
        self.blocks = nn.Sequential(*[Block(N_EMBED, n_head=N_HEAD) for _ in range(N_LAYER)])
        # Final layer norm, applied to the block output before the output projection
        self.layer_norm = nn.LayerNorm(N_EMBED)
        self.lm_head = nn.Linear(N_EMBED, vocab_size)

    def forward(self, ix, targets=None):
        '''Compute next-token logits and, when `targets` is given, the cross-entropy loss.

        Args:
            ix: (B, T) tensor of token ids with T no larger than the position table.
            targets: optional (B, T) tensor of the token ids that follow each position.

        Returns:
            (logits, loss). Without targets, logits is (B, T, vocab) and loss is None.
            With targets, logits is flattened to (B*T, vocab) and loss is a scalar tensor.
        '''
        _, T = ix.shape

        token_embed = self.token_embedding_table(ix)  # (B x T x C)
        # Build the positions on the input's own device so the model works wherever it was moved
        positions = torch.arange(T, device=ix.device)
        pos_embed = self.position_embedding_table(positions)  # (T x C), broadcast over the batch
        embed = self.blocks(token_embed + pos_embed)
        # Apply the final layer norm. This parameter already existed in the state_dict, so
        # checkpoint keys are unchanged; previously it was created but never used.
        embed = self.layer_norm(embed)
        logits = self.lm_head(embed)  # (B x T x vocab)

        # No loss is computed when generating
        if targets is None:
            return logits, None

        B, T, C = logits.shape
        # Collapse B and T into one dimension to fit the cross_entropy signature
        logits = logits.view(B * T, C)
        targets = targets.view(B * T)
        # Low when the correct next token has a high score relative to the other tokens
        loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, ix, max_new_tokens):
        '''Append `max_new_tokens` sampled tokens to the (B, T) context `ix` and return it.

        Sampling runs in eval mode (dropout off) and without gradient tracking; the
        module's previous train/eval mode is restored before returning.
        '''
        # The context can never be longer than the number of learned positions
        block_size = self.position_embedding_table.num_embeddings
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    # Keep only the most recent block_size tokens as context
                    ix_cond = ix[:, -block_size:]
                    logits, _ = self(ix_cond)
                    # Only the last time step predicts the next token
                    logits = logits[:, -1, :]  # (B x vocab)
                    probs = F.softmax(logits, dim=-1)  # (B x vocab)
                    # Sample one token id per sequence from the predicted distribution
                    next_ix = torch.multinomial(probs, num_samples=1)
                    ix = torch.cat((ix, next_ix), dim=1)
        finally:
            self.train(was_training)
        return ix

In [81]:
# Build the model on the target device and push one batch through it as a smoke test
model = BigramLanguageModel(len(char_list)).to(DEVICE)
logits, loss = model(x_batch, y_batch)
n_params = sum(p.numel() for p in model.parameters())
print(f"{round(n_params/1e6, 3)} million parameters")

torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
1.075 million parameters


In [82]:
# AdamW over every model parameter, with the learning rate taken from LR
opt = torch.optim.AdamW(params=model.parameters(), lr=LR)

In [83]:
loss_prog = []
import time

# Iterations between progress messages
PROGRESS_EVERY = 500

# Make sure dropout is active, even when this cell is re-run after a previous generation step
model.train()

t1 = time.time()
# The range ends at MAX_ITER + 1 so that exactly MAX_ITER optimisation steps run and
# the `iter_num == MAX_ITER` check below can actually fire.
for iter_num in range(1, MAX_ITER + 1):

    if iter_num % PROGRESS_EVERY == 0:
        print(f"Iteration {iter_num}: {(time.time() - t1)//60} minutes in.")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)

    # every EVAL_ITER iterations (and on the last one) record the current training-batch loss
    if iter_num % EVAL_ITER == 0 or iter_num == MAX_ITER:
        loss_prog.append(round(loss.item(), 3))

    opt.zero_grad(set_to_none=True)
    loss.backward()
    opt.step()

# Read the clock once so the hours, minutes and seconds all describe the same instant
elapsed = time.time() - t1
print(f"Training time: {elapsed // 3600} hours, {(elapsed // 60) % 60} minutes, and {round(elapsed % 60, 3)} seconds.")

# generate from the model: switch dropout off and skip gradient tracking while sampling
model.eval()
context = torch.zeros((1, 1), dtype=torch.long, device=DEVICE)
with torch.no_grad():
    generated = decode(model.generate(context, max_new_tokens=500)[0].tolist())

torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
torch.Size([32, 128, 120])
t


KeyboardInterrupt



In [None]:
import matplotlib.pyplot as plt
import numpy as np

# One point per recorded loss value, numbered from 1
checkpoints = np.arange(1, len(loss_prog) + 1)
plt.plot(checkpoints, loss_prog)
plt.xlabel('Iteration (hundreds)')
plt.ylabel('Loss')
plt.show()

In [None]:
# Show the text that was sampled from the model at the end of the training cell
print(generated)

In [None]:
print("Model's state_dict:")
# List every entry's name and tensor shape, then write the same entries to disk
state = model.state_dict()
for name, tensor in state.items():
    print(name, "\t", tensor.size())

torch.save(state, f"shakespeare_{N_EMBED}_embed.pt")