In [1]:
# Import the core PyTorch library.
import torch

# Import the torch.nn module which contains all the building blocks for neural networks in PyTorch.
import torch.nn as nn

# Import the functional API from torch.nn, which provides a wide range of functions that can be used 
# in the forward pass of neural networks (e.g., activation functions, loss functions, etc.). 
# Instead of using layers as objects, this allows for more control with lower-level operations.
from torch.nn import functional as F

In [2]:
# Check if CUDA (GPU support) is available in the system.
# torch.cuda.is_available() returns True if a CUDA-capable GPU is present, otherwise False.

# If CUDA is available, 'cuda' is assigned to the variable device (indicating the use of GPU),
# Otherwise, 'cpu' is assigned (indicating the use of the CPU for computation).

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Output the value of the 'device' variable.
# This line simply displays which device (GPU or CPU) will be used for computations.
device

'cpu'

In [3]:
# Define the size of each input block or sequence chunk used in training.
# This typically refers to the number of tokens or elements processed together in one forward pass.
block_size = 8

# Define the number of samples processed together in a single iteration.
# This is known as the batch size and is crucial for controlling memory usage and computation speed.
batch_size = 4

# Set the maximum number of training iterations or epochs to run.
# This determines how many times the model will be trained over the entire dataset.
max_iters = 1000

# Specify the learning rate for the optimizer.
# This controls how much the model weights are updated during training. A smaller value means smaller updates.
learning_rate = 3e-3

# Define how frequently the model's performance is evaluated during training.
# This number represents the iteration at which evaluation occurs to monitor training progress.
eval_iters = 250

# Set the dimensionality of the embedding vectors used in the model.
# This specifies the size of the vector that represents each token or input feature.
n_embd = 384

# Define the number of attention heads in the multi-head attention mechanism.
# Multiple heads allow the model to focus on different parts of the input sequence simultaneously.
n_head = 4

# Set the number of layers in the neural network, such as in a Transformer model.
# More layers can increase the model’s capacity but may also lead to longer training times.
n_layer = 4

# Specify the dropout rate used during training.
# Dropout is a regularization technique that randomly drops units from the neural network during training to prevent overfitting.
dropout = 0.2

In [4]:
# Open the file 'wizard_of_oz.txt' in read mode ('r').
# Specify 'utf-8' encoding to correctly handle characters in various languages.
with open('the_book_of_wonder.txt', 'r', encoding='utf-8') as f:
    # Read the entire content of the file into the variable 'text'.
    # The read() method reads all lines from the file and stores them as a single string.
    text = f.read()

# Print the length of the text read from the file.
# The len() function calculates the number of characters in the string stored in 'text'.
print(len(text))

124026


In [5]:
# Create a set of unique characters from the text.
# The set() function removes duplicate characters, leaving only unique ones.
chars_set = set(text)

# Sort the unique characters alphabetically.
# The sorted() function returns a list of characters in ascending order.
chars = sorted(chars_set)

# Print the sorted list of unique characters.
# This will show all the distinct characters present in the text file, ordered from smallest to largest.
print(chars)

['\n', ' ', '!', '"', "'", '(', ')', ',', '-', '.', '1', '2', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'ë']


In [6]:
# Calculate the size of the vocabulary by determining the number of unique characters.
# The length of the 'chars' list represents the total count of distinct characters.
vocab_size = len(chars)

# Output the size of the vocabulary.
# This will show the number of unique characters present in the text file.
vocab_size

69

In [7]:
# Create a dictionary that maps each character to a unique integer index.
# The dictionary comprehension iterates over 'chars' with both index (i) and character (ch).
# This creates a mapping from each character to its corresponding index.
string_to_int = { ch: i for i, ch in enumerate(chars) }

# Create a dictionary that maps each integer index back to the corresponding character.
# This dictionary comprehension also iterates over 'chars' with both index (i) and character (ch).
# This creates a mapping from each index to its corresponding character.
int_to_string = { i: ch for i, ch in enumerate(chars) }

# Define a lambda function to encode a string into a list of integers.
# This function converts each character in the string 's' to its corresponding integer index 
# using the 'string_to_int' dictionary.
encode = lambda s: [string_to_int[c] for c in s]

# Define a lambda function to decode a list of integers back into a string.
# This function converts each integer index in the list 'l' to its corresponding character 
# using the 'int_to_string' dictionary and joins them to form the resulting string.
decode = lambda l: ''.join([int_to_string[i] for i in l])

In [8]:
# Convert the text into a list of integer indices using the 'encode' function.
# Each character in the text is mapped to its corresponding integer index.
encoded_text = encode(text)

# Convert the list of integer indices into a PyTorch tensor.
# The tensor will have the data type 'long' which is appropriate for storing integer values.
# The tensor represents the text data in a format suitable for use in PyTorch operations.
data = torch.tensor(encoded_text, dtype=torch.long)

In [9]:
# Calculate the number of data points to use for training.
# This is 80% of the total length of the data.
# The result is converted to an integer to ensure it represents a valid index.
n = int(0.8 * len(data))

# Split the data into training and validation sets.
# 'train_data' contains the first 80% of the data.
# 'val_data' contains the remaining 20% of the data.
train_data = data[:n]  # Training data (first 80% of the dataset)
val_data = data[n:]    # Validation data (remaining 20% of the dataset)

In [10]:
def get_batch(split):
    # Determine which dataset to use based on the 'split' argument.
    # If 'split' is 'train', use the training data; otherwise, use the validation data.
    data = train_data if split == 'train' else val_data

    # Randomly sample 'batch_size' starting indices from the data.
    # These indices are chosen such that there's enough room to extract a full block of data.
    ix = torch.randint(len(data) - block_size, (batch_size,))

    # Create input sequences (x) and target sequences (y) for the batch.
    # Each input sequence is a block of length 'block_size'.
    # The target sequence is the input sequence shifted by one position.
    x = torch.stack([data[i:i + block_size] for i in ix])
    y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])

    # Move the tensors to the specified device (CPU or GPU).
    x, y = x.to(device), y.to(device)

    # Return the input sequences and target sequences for the batch.
    return x, y

In [11]:
@torch.no_grad()  # Decorator that disables gradient calculation, saving memory and computations during inference.
def estimate_loss():
    out = {}  # Dictionary to store the average loss for each data split ('train' and 'val').
    
    model.eval()  # Set the model to evaluation mode, which changes the behavior of certain layers like dropout and batch norm.
    
    for split in ['train', 'val']:
        # Create a tensor to hold loss values for each evaluation iteration.
        losses = torch.zeros(eval_iters)
        
        for k in range(eval_iters):
            # Retrieve a batch of data for the current split ('train' or 'val').
            X, Y = get_batch(split)
            
            # Perform a forward pass through the model and compute the loss.
            # 'logits' are the model's predictions, and 'loss' is the calculated loss for this batch.
            logits, loss = model(X, Y)
            
            # Store the loss value for this iteration.
            losses[k] = loss.item()
        
        # Compute and store the average loss for the current split.
        out[split] = losses.mean()
    
    model.train()  # Set the model back to training mode, restoring the behavior of layers altered by eval().
    
    return out  # Return the dictionary containing average losses for both the training and validation sets.

In [12]:
# Define a class for one head of self-attention
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        # Define the linear layers for the key, query, and value projections.
        # These layers transform the input embedding into a smaller dimensionality 'head_size'.
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        
        # Create a lower triangular matrix to mask future tokens in the attention mechanism.
        # This ensures that each token can only attend to itself and earlier tokens.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        # Define dropout for regularizing the attention weights.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Input tensor 'x' has shape (batch, time-step, channels)
        B, T, C = x.shape  # Unpack the dimensions
        
        # Compute key, query, and value projections
        k = self.key(x)   # Shape: (B, T, head_size)
        q = self.query(x) # Shape: (B, T, head_size)
        
        # Compute attention scores (affinities) by performing a scaled dot-product
        wei = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5 # Shape: (B, T, T)
        
        # Apply the lower triangular mask to prevent attending to future tokens
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))  # Shape: (B, T, T)
        
        # Normalize attention scores using softmax to obtain attention weights
        wei = F.softmax(wei, dim=-1)  # Shape: (B, T, T)
        
        # Apply dropout to the attention weights
        wei = self.dropout(wei)
        
        # Perform the weighted aggregation of values
        v = self.value(x) # Shape: (B, T, head_size)
        out = wei @ v # Shape: (B, T, head_size)
        
        # Return the output of the attention mechanism
        return out

In [13]:
# Define a class for multi-head self-attention
class MultiHeadAttention(nn.Module):
    def __init__(self, n_head, head_size):
        super().__init__()
        # Initialize multiple heads of self-attention
        # 'n_head' is the number of attention heads, and 'head_size' is the dimensionality of each head
        self.heads = nn.ModuleList([Head(head_size) for _ in range(n_head)])
        
        # Define a linear projection layer to combine the outputs of all attention heads
        # This projects the concatenated output of all heads back to the original embedding dimension
        self.proj = nn.Linear(head_size * n_head, n_embd)
        
        # Define dropout for regularizing the combined attention output
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Apply each attention head to the input
        # 'x' has shape (batch, time-step, channels)
        # Each head returns an output of shape (batch, time-step, head_size)
        # We concatenate the outputs of all heads along the feature dimension (last dimension)
        out = torch.cat([h(x) for h in self.heads], dim=-1)  # Shape: (batch, time-step, head_size * n_head)
        
        # Apply the linear projection to combine the outputs of all heads
        # This projects the concatenated features to the original embedding dimension
        out = self.dropout(self.proj(out))
        
        # Return the final output
        return out

In [14]:
# Define a class for a feed-forward neural network layer
class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        # Initialize the feed-forward network as a sequential model
        # This consists of two linear layers with a ReLU activation function in between
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),  # First linear layer expands the input from 'n_embd' to '4 * n_embd'
            nn.ReLU(),                      # ReLU activation introduces non-linearity
            nn.Linear(4 * n_embd, n_embd),  # Second linear layer reduces the dimensionality back to 'n_embd'
            nn.Dropout(dropout),            # Dropout layer for regularization
        )

    def forward(self, x):
        # Pass the input through the feed-forward network
        return self.net(x)

In [15]:
# Define a class for a Transformer block
class Block(nn.Module):
    # Initialize the Transformer block
    # n_embd: embedding dimension
    # n_head: number of attention heads
    def __init__(self, n_embd, n_head):
        super().__init__()
        # Compute the size of each attention head
        head_size = n_embd // n_head
        
        # Initialize the multi-head self-attention layer
        self.sa = MultiHeadAttention(n_head, head_size)
        
        # Initialize the feed-forward network
        self.ffwd = FeedForward(n_embd)
        
        # Initialize layer normalization layers
        # Layer normalization is applied before the residual connections
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        # Apply multi-head self-attention to the input
        y = self.sa(x)
        
        # Add the residual connection (x) to the attention output (y)
        # Apply layer normalization to the result
        x = self.ln1(x + y)
        
        # Apply the feed-forward network to the normalized result
        y = self.ffwd(x)
        
        # Add the residual connection (x) to the feed-forward output (y)
        # Apply layer normalization to the result
        x = self.ln2(x + y)
        
        # Return the final output of the Transformer block
        return x

In [16]:
class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        
        # Initialize the token embedding table
        # Maps token indices to dense vectors of size 'n_embd'
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        
        # Initialize the position embedding table
        # Maps position indices to dense vectors of size 'n_embd'
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        
        # Initialize the Transformer blocks
        # A sequential container of 'Block' instances
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])

        # Final layer normalization
        self.ln_f = nn.LayerNorm(n_embd)
        
        # Output linear layer
        # Maps the final hidden state of size 'n_embd' to the vocabulary size
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # Apply weight initialization
        self.apply(self._init_weights)

    def _init_weights(self, module):
        # Custom weight initialization
        if isinstance(module, nn.Linear):
            # Initialize weights with a normal distribution
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.2)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            # Initialize embedding weights with a normal distribution
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, index, targets=None):
        B, T = index.shape
        
        # Get token embeddings
        tok_emb = self.token_embedding_table(index) # (B, T, C)
        
        # Get position embeddings
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T, C)
        
        # Add token and position embeddings
        x = tok_emb + pos_emb # (B, T, C)
        
        # Pass through Transformer blocks
        x = self.blocks(x) # (B, T, C)
        
        # Apply final layer normalization
        x = self.ln_f(x) # (B, T, C)
        
        # Compute logits for each token
        logits = self.lm_head(x) # (B, T, vocab_size)

        # Compute loss if targets are provided
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C) # Reshape logits for cross-entropy loss
            targets = targets.view(B*T) # Reshape targets
            loss = F.cross_entropy(logits, targets) # Compute cross-entropy loss

        return logits, loss

    def generate(self, index, max_new_tokens):
        # Generate new tokens based on the initial input
        for _ in range(max_new_tokens):
            # Forward pass to get logits
            logits, _ = self(index)
            
            # Focus on the last token's logits
            logits = logits[:, -1, :]
            
            # Convert logits to probabilities
            probs = F.softmax(logits, dim=-1)
            
            # Sample from the probability distribution to get the next token
            index_next = torch.multinomial(probs, num_samples=1)
            
            # Concatenate the new token to the input sequence
            index = torch.cat((index, index_next), dim=1)
        
        return index

In [17]:
# Instantiate the GPTLanguageModel with the vocabulary size
model = GPTLanguageModel(vocab_size)

# Move the model to the specified device
# This could be either 'cuda' (GPU) or 'cpu', depending on availability
m = model.to(device)

In [18]:
# Create an AdamW optimizer for the model parameters
# AdamW is an optimization algorithm that includes weight decay (regularization) and is commonly used for training transformers
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Training loop
for iter in range(max_iters):
    # Periodically evaluate the model on the training and validation sets
    if iter % eval_iters == 0:
        losses = estimate_loss()
        # Print the current training and validation losses
        print(f'step: {iter}, train loss: {losses["train"]:.3f}, val loss: {losses["val"]:.3f}')

    # Get a batch of training data
    xb, yb = get_batch('train')

    # Forward pass: compute the model output and loss
    logits, loss = model.forward(xb, yb)
    
    # Zero the gradients of the optimizer
    # Using 'set_to_none=True' can be more memory efficient, setting gradients to `None` instead of zeroing
    optimizer.zero_grad(set_to_none=True)
    
    # Backward pass: compute the gradient of the loss with respect to model parameters
    loss.backward()
    
    # Update the model parameters using the optimizer
    optimizer.step()

# Print the final loss after completing the training loop
print(loss.item())

step: 0, train loss: 10.294, val loss: 10.273
step: 250, train loss: 3.148, val loss: 3.157
step: 500, train loss: 3.119, val loss: 3.080
step: 750, train loss: 3.093, val loss: 3.122
3.026149272918701
