In [None]:
import os
import urllib.request

import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# run an evaluation step every `eval_interval` training iterations
eval_interval = 20

# total number of training iterations (optimizer steps)
max_iters = 5000

# optimizer's learning rate
learning_rate=3e-4

# minibatch size: how many sequences are packed into one training iteration
batch_size = 128

# block size is the maximum sequence length (context window) used as input.
# E.g. for block_size 4 and text ABCDE, one sequence provides the training samples
# A->B, AB->C, ABC->D, ABCD->E
block_size = 256

# size of the token/position embeddings (the model width)
n_embd = 256

# number of attention heads in each multi-head attention layer;
# every head works on n_embd // n_head dimensions
n_head = 8

# depth of the network as number of stacked decoder blocks
# (the "Nx" next to the layer stack in the Transformer architecture diagram).
# Each block contains normalization, an attention unit and a feed forward unit
n_layer = 10

# dropout rate (probability p of zeroing an element) for all dropout units
dropout = 0.2

In [None]:
class Head(nn.Module):
  """ A single head of causal (masked) scaled dot-product self-attention.

  Input of shape (B, T, n_embd) is projected to keys, queries and values of
  width `head_size`. Each position attends only to itself and to earlier
  positions; the output has shape (B, T, head_size).

  T must not exceed `block_size`, the side of the pre-computed causal mask.
  """

  def __init__(self, head_size):
    # head_size: width of the key/query/value projections of this head
    super().__init__()
    self.key   = nn.Linear(n_embd, head_size, bias=False)
    self.query = nn.Linear(n_embd, head_size, bias=False)
    self.value = nn.Linear(n_embd, head_size, bias=False)
    # lower-triangular mask; a buffer (not a parameter) so it follows .to(device) but is not trained
    self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
    self.dropout = nn.Dropout(dropout) #randomly prevents some tokens from communicating with each other

  def forward(self, x):
    T = x.shape[1]
    k = self.key(x)   #shape (B, T, head_size)
    q = self.query(x) #shape (B, T, head_size)
    v = self.value(x) #shape (B, T, head_size)

    #compute self-attention scores: (B,T,head_size) @ (B,head_size,T) --> (B,T,T)
    #Scale by 1/sqrt(d_k), where d_k is the key width (head_size), as in the paper.
    #The previous code scaled by the embedding width of x instead, so checkpoints
    #trained with that scaling will see different attention weights.
    d_k = k.shape[-1]
    wei = (q @ k.transpose(-2, -1)) * d_k**-0.5
    #hide future positions so that softmax assigns them zero weight
    wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
    wei = F.softmax(wei, dim=-1) # (B, T, T)
    wei = self.dropout(wei)

    #perform weighted aggregation of values
    out = wei @ v # (B, T, T) @ (B, T, head_size) --> (B, T, head_size)
    return out


class MultiHeadAttention(nn.Module):
  """ Multi-head attention as described in the paper.

  Runs `num_heads` independent attention heads on the same input, concatenates
  their outputs along the feature axis and projects the result to n_embd
  features, followed by dropout.
  """

  def __init__(self, num_heads, head_size):
    # num_heads: how many heads run in parallel
    # head_size: output width of every individual head
    super().__init__()
    self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
    # The concatenated head outputs are num_heads * head_size wide. That equals
    # n_embd only when n_embd is divisible by the number of heads, so size the
    # projection from what is actually concatenated.
    self.proj  = nn.Linear(num_heads * head_size, n_embd)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    out = torch.cat([head(x) for head in self.heads], dim=-1) # (B, T, num_heads*head_size)
    out = self.proj(out) # (B, T, n_embd)
    out = self.dropout(out)
    return out

class FeedForward(nn.Module):
  """ Feed forward network (FFN) of the paper: Linear -> ReLU -> Linear -> Dropout """

  def __init__(self, n_embd=n_embd):
    super().__init__()
    # In the paper (section 3.3) d_{model}=512 and d_{ff}=2048, i.e. the hidden
    # layer is four times as wide as the embedding.
    hidden_dim = n_embd*4
    layers = [
        nn.Linear(n_embd, hidden_dim),  # expand
        nn.ReLU(),
        nn.Linear(hidden_dim, n_embd),  # project back to the embedding width
        nn.Dropout(dropout),
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    out = self.net(x)
    return out


class Block(nn.Module):
  """ Transformer block: communication (self-attention) followed by computation (FFN).

  Both sub-layers are pre-normalized with LayerNorm and wrapped in a residual connection.
  """

  def __init__(self, n_embd, n_head):
    # n_embd: embedding dimension
    # n_head: the number of attention heads; each one gets n_embd // n_head features
    super().__init__()
    self.sa = MultiHeadAttention(n_head, n_embd // n_head)
    self.ffwd = FeedForward(n_embd)
    self.ln1 = nn.LayerNorm(n_embd)
    self.ln2 = nn.LayerNorm(n_embd)

  def forward(self, x):
    # residual connection around the normalized attention sub-layer
    attended = self.sa(self.ln1(x))
    x = x + attended
    # residual connection around the normalized feed-forward sub-layer
    transformed = self.ffwd(self.ln2(x))
    return x + transformed



class GPTlite(nn.Module):
  """ A small decoder-only GPT language model.

  Token ids are embedded, summed with learned position embeddings, passed
  through `n_layer` transformer blocks and a final LayerNorm, and projected to
  one logit per vocabulary entry.
  """

  def __init__(self, vocab_size):
    # vocab_size: number of distinct tokens the model can read and emit
    super().__init__()

    # vocabulary embedding and positional embedding
    self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
    self.position_embedding_table = nn.Embedding(block_size, n_embd)

    #sequence of attention heads and feed forward layers
    self.blocks = nn.Sequential( *[Block(n_embd, n_head) for _ in range(n_layer)])

    #one layer normalization layer after transformer blocks and before linear layer that outputs the vocabulary
    self.ln = nn.LayerNorm(n_embd)
    self.lm_head = nn.Linear(n_embd, vocab_size, bias=False)


  def forward(self, idx):
    """ compute next-token logits for every position of idx.

    idx is a (B,T) tensor of token ids, with T no larger than the number of
    learned positions. Returns logits of shape (B, vocab_size, T): the class
    axis comes second so the result can be fed to F.cross_entropy together
    with (B,T) targets.
    """
    _, T = idx.shape
    tok_emb = self.token_embedding_table(idx) #shape (B,T,C)
    pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device)) #shape (T,C)
    x = tok_emb + pos_emb #shape (B,T,C), pos_emb is broadcast over the batch
    x = self.blocks(x)
    x = self.ln(x)
    logits = self.lm_head(x) #shape (B,T,vocab_size)
    logits = torch.swapaxes(logits, 1, 2) #shape (B,vocab_size,T) to comply with CrossEntropyLoss
    return logits


  def _sample_next(self, idx):
    """ sample one next token per sequence of idx. Returns a (B,1) tensor of token ids. """
    # we can never feed more tokens than there are learned positions
    max_context = self.position_embedding_table.num_embeddings
    # sampling needs no gradients; skip building the autograd graph
    with torch.no_grad():
      logits = self(idx[:, -max_context:])
      logits = logits[:, :, -1] # logits of the last position. shape (B, vocab_size)
      probs = F.softmax(logits, dim=-1) # shape (B, vocab_size)
      #randomly sample the next tokens, 1 for each of the previous probability distributions
      #(one could take instead the argmax, but that would be deterministic and boring)
      return torch.multinomial(probs, num_samples=1) # shape (B, 1)


  def generate_sequence(self, idx, max_new_tokens):
    """ given a context idx of shape (B,T), return idx extended by max_new_tokens sampled tokens.

    This is the tensor-returning variant that used to be defined under the name
    `generate` and was shadowed by the streaming definition below.
    """
    for _ in range(max_new_tokens):
      idx = torch.cat([idx, self._sample_next(idx)], dim=-1) # shape (B, T+1)
    return idx


  def generate(self, idx, max_new_tokens):
    """ given a context idx, sample max_new_tokens tokens, yielding each one as soon as it is generated.

    This is a generator of Python ints. Because every step yields a single
    scalar, idx must hold exactly one sequence (shape (1,T)).
    Dropout layers stay active unless the model was put in eval mode first.
    """
    for _ in range(max_new_tokens):
      idx_next = self._sample_next(idx) # shape (1, 1)
      idx = torch.cat([idx, idx_next], dim=-1) # append the sampled token, shape (1, T+1)
      yield idx_next.item() # hand the new token to the caller

  

In [None]:
# seed PyTorch's random number generator, for reproducibility
torch.manual_seed(42)
# device where computation is executed: the GPU when CUDA is available, the CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# use real data (tiny shakespeare); download it if the file is not there yet.
# The check is on the file itself, not on the directory: an existing but empty
# "data" folder (e.g. after an interrupted run) must still trigger the download.
data_path = os.path.join("data", "input.txt")
if not os.path.exists(data_path):
    os.makedirs("data", exist_ok=True)
    # urlretrieve raises on network/HTTP errors instead of failing silently,
    # and does not depend on a `wget` binary being installed
    urllib.request.urlretrieve(
        "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt",
        data_path,
    )
with open(data_path, "r", encoding="utf-8") as f:
    text = f.read()
if not text:
    raise ValueError(f"{data_path} is empty; delete it and re-run to download the dataset again")

# collect the sorted list of distinct input characters and create
# string-to-int (stoi) and int-to-string (itos) representations:
chars = sorted(set(text))
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}


# encode and decode convert strings to arrays of tokens and vice-versa
def encode(x):
    """Convert a string to a 1-D LongTensor with one token id per character.

    Raises KeyError for a character that does not occur in the training text.
    """
    return torch.tensor([stoi[ch] for ch in x], dtype=torch.long)


def decode(x):
    """Convert an iterable of integer token ids back to a string."""
    return "".join(itos[i] for i in x)


vocab_size = len(stoi)

# alternative tokenizer (disabled): tiktoken's GPT-4 encoding
# enc = tiktoken.encoding_for_model("gpt-4")
# vocab_size = enc.n_vocab
# print(enc.n_vocab)
# print(enc.encode("Hello world"))
# # token_embedding_table = nn.Embedding(vocab_size, n_embd)    # from tokens to embedding
# # position_embedding_table = nn.Embedding(block_size, n_embd) # from position to embedding

data = encode(text)  # use any encoder here

# the first 90% of the tokens are used for training, the remaining 10% for validation
n = int(0.9 * len(data))
train_data, valid_data = data[:n], data[n:]


def get_batch(source, *, n_seqs=None, seq_len=None, target_device=None):
    """Sample a random batch of (input, target) sequences from `source`.

    Args:
        source: 1-D tensor of token ids.
        n_seqs: number of sequences in the batch; defaults to the global `batch_size`.
        seq_len: length of every sequence; defaults to the global `block_size`.
        target_device: device the batch is moved to; defaults to the global `device`.

    Returns:
        A pair (x, y) of tensors of shape (n_seqs, seq_len). y is x shifted by
        one position, i.e. y[b, t] is the token that follows x[b, :t+1].

    Raises:
        ValueError: if `source` is too short to cut a sequence plus its target.
    """
    # the module-level settings are looked up at call time, so they can be changed between calls
    n_seqs = batch_size if n_seqs is None else n_seqs
    seq_len = block_size if seq_len is None else seq_len
    target_device = device if target_device is None else target_device

    if len(source) <= seq_len:
        raise ValueError(
            f"source has {len(source)} tokens, need more than {seq_len} to build a sequence and its target"
        )

    # generate `n_seqs` random offsets on the data
    offsets = torch.randint(len(source) - seq_len, (n_seqs,)).tolist()
    # collect `n_seqs` subsequences of length `seq_len` from source, as data and target
    x = torch.stack([source[i : i + seq_len] for i in offsets])
    # target is just x shifted right (ie the predicted token is the next in the sequence)
    y = torch.stack([source[i + 1 : i + 1 + seq_len] for i in offsets])
    return x.to(target_device), y.to(target_device)

def get_ds(data, block_size=4, stride=None):
    """Yield (input, target) pairs of length `block_size` cut from a token sequence.

    The target is the input shifted by one token. Windows start every `stride`
    tokens; by default `stride` equals `block_size`, so the inputs do not
    overlap. With block_size=4:

      default stride: [1,2,3,4,5,6,7,8] -> ([1,2,3,4], [2,3,4,5])
      stride=1:       [1,2,3,4,5,6,7,8] -> ([1,2,3,4], [2,3,4,5]), ([2,3,4,5], [3,4,5,6]),
                                           ([3,4,5,6], [4,5,6,7]), ([4,5,6,7], [5,6,7,8])

    Args:
        data: sequence of integer token ids (list or tensor).
        block_size: length of every yielded input and target.
        stride: distance between the starts of consecutive windows.

    Yields:
        Pairs (x, y) of 1-D LongTensors of length `block_size`. When `data` is
        already a LongTensor the pairs are views of it, not copies.

    Raises:
        ValueError: if `block_size` or `stride` is smaller than 1 (raised when
        iteration starts, since this is a generator).
    """
    if stride is None:
        stride = block_size
    if block_size < 1 or stride < 1:
        raise ValueError(f"block_size and stride must be >= 1, got {block_size} and {stride}")

    # as_tensor accepts lists and tensors alike; torch.tensor(tensor) would warn and copy
    data = torch.as_tensor(data, dtype=torch.long)
    n = len(data)
    # stop early enough that the target window (shifted by one) still fits in data
    for i in range(0, n - block_size, stride):
        x = data[i : i + block_size]
        y = data[i + 1 : i + 1 + block_size]
        yield x, y

# test get_batch(): draw one training batch and show, for the first few
# sequences, which target token belongs to every growing context
xb, yb = get_batch(train_data)
print("input:\n", xb)
print("target:\n", yb)

# only the first 4 sequences and the first 9 positions are shown, to keep the output short
for b in range(min(batch_size, 4)):  # for every sequence of the batch
    print(f"\n=== batch {b}:")
    for t in range(min(block_size, 9)):  # for every position of the sequence
        context = xb[b, : t + 1]
        target = yb[b, t]
        # context and target used to be computed and silently discarded
        print(f"when input is {context.tolist()} the target is {target.item()}")

In [None]:
# build the model for the dataset's vocabulary size and move it to the selected device
m  = GPTlite(vocab_size).to(device)

In [None]:
# check if data/weights.pt is present, if so load it
if os.path.exists('data/weights.pt'):
    # map_location makes a checkpoint saved on a GPU loadable on a CPU-only machine (and vice versa)
    m.load_state_dict(torch.load('data/weights.pt', map_location=device))
else:
    # eval loop: no backprop on this data, to avoid storing all intermediate variables.
    # Defined once, before the training loop, instead of being re-created at every step.
    @torch.no_grad()
    def eval_loss():
        """Return the loss of the model on one random batch of validation data."""
        was_training = m.training
        m.eval()  # switch dropout off while evaluating
        try:
            idx, targets = get_batch(valid_data)   #get a batch of validation data
            logits = m(idx)   #forward pass
            return F.cross_entropy(logits, targets)
        finally:
            m.train(was_training)  # restore the previous mode, also if the forward pass fails

    # train the model
    optimizer = torch.optim.Adam(m.parameters(), lr=learning_rate)
    m.train()
    for steps in range(max_iters):
        idx, targets = get_batch(train_data)   #get a batch of training data
        logits = m(idx)   #forward pass
        loss = F.cross_entropy(logits, targets)
        loss.backward()   #backward pass
        optimizer.step()   #update parameters
        optimizer.zero_grad(set_to_none=True)  #sets to None instead of 0, to save memory

        #print progress
        if steps % 100 == 0:
            print(f"step {steps}, loss {loss.item():.2f}")

        if steps % eval_interval == 0:
            print(f"step {steps}, eval loss {eval_loss().item():.2f}")

    # save the model
    os.makedirs('data', exist_ok=True)
    torch.save(m.state_dict(), 'data/weights.pt')

In [None]:
def generate_text(start_text, max_new_tokens=100, stream=True):
    """Continue `start_text` with `max_new_tokens` tokens sampled from the model `m`.

    This replaces two same-named definitions, of which only the second
    (streaming) one was ever in effect. Both used an undefined `enc` object;
    the module's own `encode`/`decode` functions are used instead.

    Args:
        start_text: non-empty prompt; every character must be known to `encode`.
        max_new_tokens: number of tokens to generate.
        stream: if True (default) print the prompt and every token as soon as it
            is generated and return None; if False print nothing and return the
            prompt followed by the generated text.

    Raises:
        ValueError: if `start_text` is empty (the model needs at least one token of context).
    """
    if not start_text:
        raise ValueError("start_text must contain at least one character")

    idx = encode(start_text)[None, :].to(device)  # add the batch axis: shape (1, T)
    pieces = []
    was_training = m.training
    m.eval()  # dropout must be off while sampling
    try:
        # the generator body runs inside this context, so no autograd graph is built
        with torch.no_grad():
            if stream:
                print(start_text, end='', flush=True)  # Print the starting text
            for token in m.generate(idx, max_new_tokens=max_new_tokens):
                next_char = decode([token])
                if stream:
                    print(next_char, end='', flush=True)
                else:
                    pieces.append(next_char)
    finally:
        m.train(was_training)  # leave the model in the mode it was found in

    if stream:
        return None
    return start_text + "".join(pieces)

In [None]:
# generate starting from ROMEO:
# generate_text prints the text itself while it is generated and returns None,
# so wrapping the call in print() only appended a stray "None" to the output
generate_text("ROMEO: ", max_new_tokens=1000)
print()  # terminate the streamed line

In [None]:
# Turn a short JULIET prompt into token indices and add a leading batch axis,
# so that the tensor has the (1, T) layout the model takes as input
idx = encode("JULIET: \nWhat's in a name? That which we call a rose\nBy any other name would smell as sweet.\n").unsqueeze(0).to(device)
# round trip: decode the indices of the single sequence back to characters
print(decode(idx.squeeze(0).tolist()))
idx.shape