<a href="https://colab.research.google.com/github/nhobbs01/nanoGpt/blob/main/nano_gpt_add_10m.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# hyperparameters
# Default to the CPU and switch to the GPU only when PyTorch can see one.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'
# Dropout probability read by the attention and feed-forward modules below.
dropout = 0.2
# -----------------

# ---------------- MODELS
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention with all heads computed in one batched matmul.

    Args:
        n_head: number of attention heads; the channel dimension is split evenly across them.
        head_size: per-head width, used for the ``head_size ** -0.5`` score scaling.
        block_size: side length of the lower-triangular mask buffer (longest supported sequence).
        n_embed: embedding width of both the input and the output.

    The dropout probability is read from the module-level ``dropout`` setting.
    """

    def __init__(self, n_head, head_size, block_size, n_embed):
        super().__init__()
        # Output projection applied after the heads are merged back together.
        self.proj = nn.Linear(n_embed, n_embed)
        # One full-width projection per role; forward() carves the heads out with view().
        self.key = nn.Linear(n_embed, n_embed, bias=False)
        self.query = nn.Linear(n_embed, n_embed, bias=False)
        self.value = nn.Linear(n_embed, n_embed, bias=False)
        # Lower-triangular ones: position t may only look at positions <= t.
        lower_triangle = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer('tril', lower_triangle)
        self.head_size = head_size
        self.n_head = n_head
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` of shape (B, T, C) and return a tensor of the same shape."""
        B, T, C = x.shape
        per_head = C // self.n_head

        def split_heads(t):
            # (B, T, C) -> (B, n_head, T, per_head)
            return t.view(B, T, self.n_head, per_head).transpose(1, 2)

        k = split_heads(self.key(x))
        q = split_heads(self.query(x))
        v = split_heads(self.value(x))

        # (B, n_head, T, per_head) @ (B, n_head, per_head, T) -> (B, n_head, T, T)
        scores = (q @ k.transpose(-2, -1)) * self.head_size ** -0.5
        # Future positions get -inf so softmax assigns them zero weight.
        is_future = self.tril[:T, :T] == 0
        scores = scores.masked_fill(is_future, float('-inf'))
        weights = F.softmax(scores, dim=-1)

        # Merge the heads back into a single (B, T, C) tensor before projecting.
        merged = (weights @ v).transpose(1, 2).contiguous().view(B, T, C)
        return self.drop(self.proj(merged))

class FeedForward(nn.Module):
    """Position-wise MLP: widen to 4x ``n_embed``, ReLU, project back, then dropout.

    The dropout probability is read from the module-level ``dropout`` setting.
    """

    def __init__(self, n_embed):
        super().__init__()
        hidden = 4 * n_embed
        layers = [
            nn.Linear(n_embed, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_embed),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to ``x``; the last dimension stays ``n_embed`` wide."""
        return self.net(x)

class Block(nn.Module):
    """Transformer block: self-attention then feed-forward, each pre-normalised and residual.

    Args:
        n_embed: embedding width (the dimension LayerNorm normalises over).
        n_head: number of attention heads; each head gets ``n_embed // n_head`` channels.
        block_size: maximum sequence length, forwarded to the attention mask.
    """

    def __init__(self, n_embed, n_head, block_size):
        super().__init__()
        self.sa = MultiHeadAttention(
            n_head,
            n_embed // n_head,
            block_size,
            n_embed=n_embed,
        )
        self.ln1 = nn.LayerNorm(n_embed)  # normalises the last dim (C), i.e. n_embed
        self.ffwd = FeedForward(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        # LayerNorm is applied to the branch input only; the skip path carries x unchanged.
        after_attention = x + self.sa(self.ln1(x))
        return after_attention + self.ffwd(self.ln2(after_attention))

class TransformerModel(nn.Module):
    """Decoder-only transformer language model over a small token vocabulary.

    Args:
        vocab_size: number of distinct token ids.
        block_size: maximum context length (number of learned position embeddings).
        n_embed: embedding width.
        n_head: attention heads per block.
        n_layer: number of stacked ``Block`` modules.
    """

    def __init__(self, vocab_size, block_size, n_embed, n_head, n_layer):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.position_embedding_table = nn.Embedding(block_size, n_embed)
        self.blocks = nn.Sequential(*[Block(n_embed, n_head, block_size) for _ in range(n_layer)])
        self.lm_head = nn.Linear(n_embed, vocab_size)
        self.block_size = block_size

    def forward(self, idx, targets=None):
        """Compute next-token logits and, when ``targets`` is given, the cross-entropy loss.

        Args:
            idx: (B, T) tensor of token ids, with T no larger than ``block_size``.
            targets: optional (B, T) tensor of the token ids to predict.

        Returns:
            ``(logits, loss)``. Without targets, logits are (B, T, vocab_size) and
            loss is ``None``. With targets, logits are flattened to
            (B*T, vocab_size) and loss is a scalar tensor.
        """
        B, T = idx.shape

        token_emb = self.token_embedding_table(idx)  # B, T, C  (C is n_embed)
        # Build the positions on the input's own device rather than the global
        # `device`, so the model also works when it was loaded onto a different
        # device than the one the notebook selected.
        positions = torch.arange(T, device=idx.device)
        position_emb = self.position_embedding_table(positions)  # T, C (one embedding per time step)
        x = token_emb + position_emb  # (B, T, C)
        x = self.blocks(x)
        logits = self.lm_head(x)  # B, T, vocab_size

        if targets is None:
            loss = None
        else:
            # cross_entropy expects (N, classes) logits and (N,) targets.
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_tokens, stop_token=10):
        """Sample up to ``max_tokens`` new tokens and append them to ``idx``.

        Sampling runs without gradient tracking and with the module temporarily
        in eval mode, so dropout does not perturb the predictions; the previous
        train/eval mode is restored before returning.

        Args:
            idx: (B, T) tensor holding the current context.
            max_tokens: upper bound on the number of tokens to append.
            stop_token: token id that ends generation. The default, 10, is the
                id the notebook's vocabulary assigns to ``'$'``. The stop token
                itself is not appended when generation ends on it.

        Returns:
            (B, T + k) tensor with ``0 <= k <= max_tokens``.

        With more than one row, generation only stops early once every row
        samples the stop token in the same step; a row that samples it earlier
        keeps it and continues.
        """
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_tokens):
                # Crop to the last block_size tokens: there are no position
                # embeddings beyond that length.
                logits, _ = self(idx[:, -self.block_size:])
                # Only the prediction for the final position is needed.
                probs = F.softmax(logits[:, -1, :], dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                if bool((idx_next == stop_token).all()):
                    break
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)
        return idx

In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from datetime import datetime

# hyperparameters
batch_size = 64  # number of independent sequences processed in parallel
block_size = 256  # context size
max_iters = 5000
eval_interval = 500
learning_rate = 1e-3
# Default to the CPU and switch to the GPU only when PyTorch can see one.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'
eval_iters = 200

n_embed = 384
n_head = 6
n_layer = 6
# -----------------

# All the chars needed for addition; a character's position is its token id.
chars = list('0123456789$+=')
vocab_size = len(chars)
# tokenize chars
stoi = {ch: token_id for token_id, ch in enumerate(chars)}
itos = dict(enumerate(chars))


def encode(s):
    """Map each character of ``s`` to its token id (KeyError for unknown characters)."""
    return [stoi[c] for c in s]


def decode(xs):
    """Turn an iterable of token ids back into the string they spell."""
    return "".join(itos[x] for x in xs)

# Different formats for the training data

def getPlainFormat(data):
    """Render each row ``[a, b]`` of ``data`` as ``a+b=sum``, one problem per line.

    ``data`` must provide ``tolist()`` and ``sum(1)`` (e.g. an (N, 2) tensor).
    """
    operands = data.tolist()
    totals = data.sum(1).tolist()
    problems = []
    for (a, b), total in zip(operands, totals):
        problems.append(f'{a}+{b}={total}')
    return "\n".join(problems)


def getReverseFormat(data):
    """Render each row ``[a, b]`` of ``data`` as ``$a+b=<sum digits reversed>$``.

    The pieces are concatenated with no separator, so consecutive problems
    meet as ``$$``. ``data`` must provide ``tolist()`` and ``sum(1)``.
    """
    operands = data.tolist()
    totals = data.sum(1).tolist()
    pieces = []
    for (a, b), total in zip(operands, totals):
        reversed_sum = str(total)[::-1]
        pieces.append(f'${a}+{b}={reversed_sum}$')
    return "".join(pieces)

#----------------------------------------

# Generate batches on the fly
def getRandomData(n=2000):
    """Generate ``n`` random addition problems and return them as one token stream.

    20% of the operand pairs are drawn below 10, 20% below 100 and 60% below
    1000. The pairs are rendered with ``getReverseFormat`` and encoded into a
    1-D ``torch.long`` tensor.
    """
    one_digit = torch.randint(10, (int(n*0.2), 2))
    two_digit = torch.randint(100, (int(n*0.2), 2))
    three_digit = torch.randint(1000, (int(n*0.6), 2))
    pairs = torch.cat([one_digit, two_digit, three_digit])
    text = getReverseFormat(pairs)
    return torch.tensor(encode(text), dtype=torch.long)

def getBatch():
    """Return one ``(x, y)`` training batch cut from freshly generated problems.

    ``x`` holds ``batch_size`` windows of ``block_size`` tokens; ``y`` holds the
    same windows shifted one token to the right (the next-token targets).
    Both tensors are moved to ``device``.
    """
    stream = getRandomData()
    # The upper bound keeps every window, and its shifted target, inside the stream.
    starts = torch.randint(len(stream) - block_size, (batch_size,)).tolist()
    inputs = torch.stack([stream[s:s + block_size] for s in starts])
    labels = torch.stack([stream[s + 1:s + block_size + 1] for s in starts])
    return inputs.to(device), labels.to(device)

@torch.no_grad()
def estimate_loss():
    """Average the model's loss over ``eval_iters`` batches for each reported split.

    Returns:
        dict with keys ``'train'`` and ``'val'``, each a 0-dim tensor holding
        the mean loss.

    Both entries are computed from ``getBatch()``, which generates new random
    problems on every call, so there is no held-out set: the two numbers are
    independent estimates over the same data distribution.

    The global ``model`` is put in eval mode for the measurement and switched
    back to train mode afterwards, even if evaluation raises.
    """
    out = {}
    model.eval()
    try:
        for split in ['train', 'val']:
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                x, y = getBatch()
                _, loss = model(x, y)
                losses[k] = loss.item()
            out[split] = losses.mean()  # Average the losses to make loss less noisy
    finally:
        model.train()
    return out

def printSampleFromModel(context, max_tokens):
    """Print the decoded first row of ``model.generate`` run on ``context``."""
    generated = model.generate(idx=context, max_tokens=max_tokens)
    first_row = generated[0].tolist()
    print(decode(first_row))

# Build the network from the hyperparameters of this cell, then move it to `device`.
m = TransformerModel(
    vocab_size=vocab_size,
    block_size=block_size,
    n_embed=n_embed,
    n_head=n_head,
    n_layer=n_layer,
)
model = m.to(device)

NameError: name 'TransformerModel' is not defined

In [125]:
# Total number of scalar values across all of the model's parameter tensors.
param_sizes = [p.numel() for p in model.parameters()]
total_params = sum(param_sizes)
print(total_params)

10748173


In [10]:
# Colab-only cell: `google.colab` is importable only inside a Google Colab runtime.
from google.colab import drive
# Mount Google Drive
# The training cell saves its checkpoint under this mount point
# ('/content/drive/My Drive/ml/models/').
drive.mount('/content/drive')
# `!` is an IPython shell escape (not plain Python): list the ml folder on the drive.
!ls "/content/drive/My Drive/ml"

Mounted at /content/drive
data  models


In [None]:
# Create a pytorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for steps in range(max_iters):

    # Report the averaged losses every `eval_interval` steps, including step 0.
    if steps % eval_interval == 0:
        losses = estimate_loss()
        print(f'step {steps}, train loss: {losses["train"]:.4f}, val loss: {losses["val"]:.4f}')

    # sample data
    xb, yb = getBatch()

    # Forward pass: the loss for this batch
    logits, loss = model(xb, yb)

    # Drop the previous step's gradients, backpropagate, then update the weights
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# Timestamp the file name so successive runs do not overwrite each other.
now = datetime.today().strftime('%Y-%m-%d-%H-%M')
checkpoint_path = f'/content/drive/My Drive/ml/models/model_add_10m_{now}.pth'
# Saves the whole module object (not just a state_dict).
torch.save(model, checkpoint_path)


step 0, train loss: 2.9011, val loss: 2.9019


In [61]:
# Load the pickled model onto `device`, the same device the prompt tensor is
# created on below, so the weights and the input never end up on different devices.
# NOTE: torch.load unpickles arbitrary Python objects; only load checkpoints you trust.
# NOTE(review): newer PyTorch releases default torch.load to weights_only=True,
# which rejects whole-module pickles like this one; confirm against the installed version.
model = torch.load('./model_add_10m_2024-07-23-22-06.pth', map_location=device)
# The module's train/eval flag is stored with the pickle; switch to eval mode so
# dropout is disabled while sampling the answer.
model.eval()
# NOTE(review): training samples are formatted as `$a+b=c$` and getAccuracy
# prompts with a leading `$`; this prompt omits it. Confirm that is intended.
problem = '111+321='
context = torch.tensor(encode(problem), dtype=torch.long, device=device).view(1,-1)
print(problem)
# Drop the prompt tokens, then reverse the generated digits: the training data
# writes the sum least-significant digit first.
print(decode(model.generate(idx=context, max_tokens=3)[0][len(context[0]): ].tolist()[::-1]))

111+321=
442


In [113]:
# Raw token ids of a new sample (up to 3 tokens) for `context`, with the prompt
# stripped and the order reversed. generate() samples with torch.multinomial,
# so the ids can differ from one run to the next.
model.generate(idx=context, max_tokens=3)[0][len(context[0]):].tolist()[::-1]

[4, 2, 6]

In [77]:
def getAccuracy(n=100, debug=0, max_int=100):
    """Estimate exact-match accuracy of the global ``model`` on random additions.

    Args:
        n: number of problems to generate.
        debug: when 1, print each prompt, the expected string and the model output.
        max_int: exclusive upper bound for each operand.

    Returns:
        Fraction of problems whose generated digits equal the reversed sum, or
        0.0 when ``n`` produces no problems.

    The model is evaluated in eval mode (dropout off); its previous train/eval
    mode is restored before returning.
    """
    data = torch.randint(max_int, (int(n), 2))
    prompts = [f'${a}+{b}=' for a, b in data.tolist()]
    # Targets are the sums written least-significant digit first, as in training.
    targets = [str(c)[::-1] for c in data.sum(1).tolist()]
    if not targets:
        # Nothing to score; avoid dividing by zero.
        return 0.0

    # The largest possible sum is 2 * (max_int - 1). Allow enough tokens to
    # write all of its digits, but never fewer than the original 3-token budget,
    # so 4-digit sums (e.g. max_int=1000) are no longer cut short.
    max_tokens = max(3, len(str(2 * (max_int - 1))))

    was_training = model.training
    model.eval()
    correct = 0
    try:
        for problem, t in zip(prompts, targets):
            context = torch.tensor(encode(problem), dtype=torch.long, device=device).view(1, -1)
            generated = model.generate(idx=context, max_tokens=max_tokens)[0]
            # Keep only the tokens produced after the prompt.
            out = decode(generated[len(context[0]):].tolist())
            if debug == 1:
                print(problem, out)
                print(t)
                print(out)
            if t == out:
                correct += 1
    finally:
        model.train(was_training)
    return correct / len(targets)

In [80]:
# Score 100 random problems for each operand range: below 10, below 100, below 1000.
for max_int in (10, 100, 1000):
    print(getAccuracy(n=100, max_int=max_int))

## Accuracy is good for 1- and 2-digit operands (answers of 1, 2, or 3 digits).
## Accuracy falls off when the operands have up to 3 digits.

0.99
