In [1]:
!pwd
# Load libraries
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np

/content


In [2]:
# Load the whole training corpus from disk into one in-memory string
with open("drive/MyDrive/data/input.txt", mode="r", encoding="utf-8") as corpus_file:
  text = corpus_file.read()

In [3]:
# Report the corpus size, counted in characters of the decoded string
num_chars = len(text)
print("Length of data in characters: ", num_chars)

Length of data in characters:  1115394


In [4]:
# Build the vocabulary: every distinct character of the corpus, in sorted order
chars = sorted(set(text))
print(*chars, sep="")


 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz


In [5]:
# Character-level tokenizer: every distinct character gets its own integer id.
# (In practice tokens are usually parts of words, e.g. via the tiktoken library.)
itos = dict(enumerate(chars))
stoi = {ch: idx for idx, ch in itos.items()}


def encode(s):
  """Encoder: take a string, return the list of integer ids of its characters."""
  return [stoi[ch] for ch in s]


def decode(l):
  """Decoder: take a list of integer ids, return the string they spell."""
  return "".join(itos[idx] for idx in l)


# Round-trip check: encoding then decoding must give back the original string
print(encode("Hello!"))
print(decode(encode("Hello!")))

[20, 43, 50, 50, 53, 2]
Hello!


In [6]:
# Tokenize the entire corpus once and keep the ids as a 1-D int64 tensor
data = torch.tensor(encode(text), dtype=torch.int64)
print(data.size(), data.dtype)
print(data[0:10])

torch.Size([1115394]) torch.int64
tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47])


In [7]:
# Train on the first 90% of the tokens; hold out the final 10% for validation
n = int(0.9 * data.size(0))  # index of the first validation token
train_data, val_data = data[:n], data[n:]

In [8]:
# Time dimension: the model is trained on short chunks rather than on the whole
# corpus at once, which keeps each step cheap. A chunk of block_size + 1 tokens
# packs block_size separate (context -> next token) examples.
block_size = 8
x, y = train_data[:block_size], train_data[1:block_size + 1]  # y is x shifted by one token
for t in range(block_size):
  # The context grows from 1 token up to block_size tokens; the target is the
  # token that immediately follows that context.
  context, target = x[:t + 1], y[t]
  print(f"When the input is {context}, the target is: {target}")

When the input is tensor([18]), the target is: 47
When the input is tensor([18, 47]), the target is: 56
When the input is tensor([18, 47, 56]), the target is: 57
When the input is tensor([18, 47, 56, 57]), the target is: 58
When the input is tensor([18, 47, 56, 57, 58]), the target is: 1
When the input is tensor([18, 47, 56, 57, 58,  1]), the target is: 15
When the input is tensor([18, 47, 56, 57, 58,  1, 15]), the target is: 47
When the input is tensor([18, 47, 56, 57, 58,  1, 15, 47]), the target is: 58


In [9]:
# Batch dimension: several randomly located chunks are stacked together so that
# they can be processed in parallel.
batch_size, block_size = 4, 8  # independent sequences per batch, tokens per sequence
torch.manual_seed(1105)  # fixed seed so the sampled batch is reproducible

def get_batch(split, train_data=train_data, val_data=val_data):
  """Sample a small random batch of inputs x and next-token targets y.

  `split` picks the source tensor: "train" draws from `train_data`, any other
  value draws from `val_data`. The batch dimensions are read from the
  notebook-level `batch_size` and `block_size`. Returns `(x, y)`, each stacked
  to (batch_size, block_size); `y` holds the same windows as `x` moved one
  token forward.
  """
  source = val_data if split != "train" else train_data
  # One random start offset per sequence. The exclusive upper bound
  # len(source) - block_size leaves room for the extra token that the shifted
  # target window needs.
  starts = torch.randint(len(source) - block_size, (batch_size,)).tolist()
  x = torch.stack([source[s:s + block_size] for s in starts])
  y = torch.stack([source[s + 1:s + block_size + 1] for s in starts])
  return x, y
# Draw one training batch and show the shape and contents of both tensors
xb, yb = get_batch("train", train_data=train_data, val_data=val_data)
print("Inputs to the transformer: ", xb.shape, xb, sep="\n")
print("Targets: ", yb.shape, yb, sep="\n")

Inputs to the transformer: 
torch.Size([4, 8])
tensor([[43, 42,  1, 51, 43,  1, 40, 56],
        [ 8,  0, 26, 53, 61,  1, 54, 59],
        [21,  6,  1, 58, 53,  1, 51, 39],
        [58,  1, 40, 47, 58, 58, 43, 56]])
Targets: 
torch.Size([4, 8])
tensor([[42,  1, 51, 43,  1, 40, 56, 53],
        [ 0, 26, 53, 61,  1, 54, 59, 58],
        [ 6,  1, 58, 53,  1, 51, 39, 49],
        [ 1, 40, 47, 58, 58, 43, 56, 50]])


In [10]:
# Spell out every (context -> target) training example contained in the batch
for b in range(batch_size):  # walk the batch dimension
  for t in range(block_size):  # walk the time dimension
    context, target = xb[b, :t + 1], yb[b, t]
    print(f"When the input is {context}, the target is: {target}")

When the input is tensor([43]), the target is: 42
When the input is tensor([43, 42]), the target is: 1
When the input is tensor([43, 42,  1]), the target is: 51
When the input is tensor([43, 42,  1, 51]), the target is: 43
When the input is tensor([43, 42,  1, 51, 43]), the target is: 1
When the input is tensor([43, 42,  1, 51, 43,  1]), the target is: 40
When the input is tensor([43, 42,  1, 51, 43,  1, 40]), the target is: 56
When the input is tensor([43, 42,  1, 51, 43,  1, 40, 56]), the target is: 53
When the input is tensor([8]), the target is: 0
When the input is tensor([8, 0]), the target is: 26
When the input is tensor([ 8,  0, 26]), the target is: 53
When the input is tensor([ 8,  0, 26, 53]), the target is: 61
When the input is tensor([ 8,  0, 26, 53, 61]), the target is: 1
When the input is tensor([ 8,  0, 26, 53, 61,  1]), the target is: 54
When the input is tensor([ 8,  0, 26, 53, 61,  1, 54]), the target is: 59
When the input is tensor([ 8,  0, 26, 53, 61,  1, 54, 59]), t

In [11]:
# Setup for the language model
vocab_size = len(chars)  # one class per distinct character in the corpus
torch.manual_seed(1105)  # reseed before any model weights are initialised

class BigramLanguageModel(nn.Module):
  """Bigram language model: the next token is predicted from the current token alone.

  The only parameter is a (vocab_size, vocab_size) embedding table; row `i`
  holds the logits for the next token given that the current token is `i`.
  """

  def __init__(self, vocab_size):
    super().__init__()
    # Each token reads off the logits for the next token from a lookup table
    self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

  def forward(self, idx, targets=None):
    """Compute next-token logits and, when targets are given, the loss.

    Args:
      idx: (B, T) integer tensor of token ids.
      targets: optional (B, T) integer tensor of the expected next-token ids.

    Returns:
      A `(logits, loss)` tuple. With targets, logits are flattened to
      (B*T, C) and loss is the mean cross-entropy (negative log likelihood).
      Without targets, logits keep their (B, T, C) shape and loss is None.
    """
    # batch by time by channel (channel is vocab_size)
    logits = self.token_embedding_table(idx)  # (B, T, C)
    if targets is None:
      # Inference path: nothing to score against, keep the 3-D logits so the
      # caller can pick out the last time step.
      return logits, None
    # F.cross_entropy expects (N, C) logits and (N,) class indices, so fold
    # the batch and time dimensions together.
    B, T, C = logits.shape
    logits = logits.view(B * T, C)
    targets = targets.reshape(B * T)  # reshape also accepts non-contiguous targets
    # The logit of the correct next token should be high, all others low
    loss = F.cross_entropy(logits, targets)
    return logits, loss

  def generate(self, max_new_tokens, idx=None):
    """Sample `max_new_tokens` tokens one at a time and return the full sequence.

    Args:
      max_new_tokens: number of tokens to append to the context.
      idx: optional (B, T) integer tensor of token ids used as the starting
        context. When omitted, generation starts from a single sequence that
        contains only token id 0.

    Returns:
      (B, T + max_new_tokens) integer tensor: the context followed by the
      sampled tokens.
    """
    if idx is None:
      device = self.token_embedding_table.weight.device
      idx = torch.zeros((1, 1), dtype=torch.long, device=device)
    # Sampling needs no gradients, so skip building the autograd graph
    with torch.no_grad():
      for _ in range(max_new_tokens):
        # Get the predictions; without targets the logits stay (B, T, C)
        logits, _ = self(idx)
        # Focus only on the last time step
        logits = logits[:, -1, :]  # becomes (B, C)
        # Apply softmax to get probabilities
        probs = F.softmax(logits, dim=-1)  # (B, C)
        # Sample from the distribution
        idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
        # Append the sampled index to the running sequence
        idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
    return idx

# Instantiate the model and run one forward pass on the sample batch
m = BigramLanguageModel(vocab_size)
logits, loss = m(xb, yb)
print(logits.shape, loss, sep="\n")

torch.Size([32, 65])
tensor(4.6513, grad_fn=<NllLossBackward0>)


In [12]:
# Baseline to compare the loss against: a model that gives every token the
# same probability 1/vocab_size has cross-entropy -ln(1/vocab_size)
-np.log(1/vocab_size)

4.174387269895637

In [12]:
# Self-attention
torch.manual_seed(1105)
B,T,C = 4,8,32 # batch, time, channels
x = torch.randn(B,T,C)

# A single head performing self-attention
head_size = 16
key = nn.Linear(C, head_size, bias=False)
query = nn.Linear(C, head_size, bias=False)
value = nn.Linear(C, head_size, bias=False)
k = key(x) # (B, T, 16)
q = query(x) # (B, T, 16)
# Affinity of every query position with every key position.
# NOTE: scaled dot-product attention would also multiply these scores by
# head_size**-0.5; they are left unscaled here.
wei = q @ k.transpose(-2, -1) # (B, T, 16) @ (B, 16, T) -> (B, T, T)

# Causal mask: position t may only attend to positions <= t
tril = torch.tril(torch.ones(T, T)) # lower triangle
wei = wei.masked_fill(tril == 0, float("-inf"))
# Normalise over the key axis (last dim) so each query's weights sum to 1
wei = F.softmax(wei, dim=-1)

# Each output position is the attention-weighted average of the values
v = value(x) # (B, T, 16)
out = wei @ v # (B, T, T) @ (B, T, 16) -> (B, T, 16)
out.shape