In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
torch.manual_seed(1337)

<torch._C.Generator at 0x7876c871da10>

In [3]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2024-03-25 12:54:52--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2024-03-25 12:54:52 (108 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [4]:
with open('./input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

print(f"The dataset has {len(text)} tokens in total.")

# all the unique characters in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
print(f"The total possible tokens are {vocab_size}: {chars}")

The dataset has 1115394 tokens in total.
The total possible tokens are 65: ['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


In [5]:
# Creating the tokenizer
char_to_number = {ch:i for i,ch in enumerate(chars)}
number_to_char = {i:ch for i,ch in enumerate(chars)}

# encoder: take a string, output a list of integers
encode = lambda s: [char_to_number[c] for c in s]

# decoder: take a list of integers, output a string
decode = lambda l: ''.join([number_to_char[i] for i in l])

In [6]:
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

In [7]:
class BigramLM(nn.Module):
    """
    The Final BigramLM model which had and does the following:

    # Has:
    1. Token embedding layer
    2. Position embedding layer
    3. Nx Blocks which has multihead attentions and feed-forward
    4. Finally the LM-head
    5. The shapes written in comments

    # Does:
    1. Takes the input which will be in the B, T format
    2. Converts them into B, T, C (starting with the Token embedding layer)

    """

    def __init__(self):
        super().__init__()
        self.embedding_table = nn.Embedding(vocab_size, n_embd)
        self.positions_embeddings = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[Block(n_embd, n_head=n_head) for _ in range(n_layers)]
        )
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)


    def forward(self, idx, targets=None):
        B, T = idx.shape
        tok_emb = self.embedding_table(idx)
        positions_emb = self.positions_embeddings(torch.arange(T, device=device))
        x = tok_emb + positions_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            loss=None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss


    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]
            logits, loss = self(idx_cond)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            next_idx = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_idx), dim=1)

        return idx

## Decoder body

In [8]:
class Block(nn.Module):
    """
    The block basically is the collection of self attention layers (multi) and
    the feed forward layers with residual connections and the layer norm layers.

    All we want to do is to isolate them so that we can make as many as we want
    and get better results!
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa_heads = MultiHeadAttention(n_head, head_size)
        self.add_norm_1 = nn.LayerNorm(n_embd)
        self.ffwd = FeedForward(n_embd)
        self.add_norm_2 = nn.LayerNorm(n_embd)


    def forward(self, x):
        x = x + self.sa_heads(self.add_norm_1(x))  # B, T, head_size
        x = x + self.ffwd(self.add_norm_2(x))
        return x

In [9]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [11]:
class Head(nn.Module):
    """
    This class will create the Q, K, V vectors
    and the register_buffer to create the mask.

    Then on the `forward` it will pass the vectors in the
    Q, K, V and give the `out`.
    """

    def __init__(self, head_size):
        super().__init__()
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size, device=device)))

        self.dropout = nn.Dropout(dropout) ###

    def forward(self, x):
        '''
        `x` input which will be the positions.
        The shape will be B, T, C meaning:
        "For each batch, there will be T tokens which will have positions encoded in C
        space"

        '''
        B, T, C = x.shape
        q = self.query(x)
        k = self.key(x)
        v = self.value(x)

        wei = q @ k.transpose(-2, -1) * C**-0.5 # the C**-0.5 is used to control the variance
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float("-inf"))
        wei = F.softmax(wei, dim=-1)

        wei = self.dropout(wei)
        out = wei @ v
        return out

In [12]:
class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4*n_embd),
            nn.ReLU(),
            nn.Linear(4*n_embd, n_embd),
            nn.Dropout(dropout) ###
        )

    def forward(self, x):
        return self.net(x)

In [13]:
def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

In [15]:
@torch.no_grad()
def estimate_loss():
    '''
    This function takes the random samples from the dataset (based on the batch size)
    for `eval_iter` times. Records loss and takes the mean loss. And reports back.

    Which means, if we have the `eval_iter = 10` and `batch_size=32` then it will take
    32 random samples from training data and then validation data for 10 times and takes
    the means of these 10 losses.
    '''
    out = {}

    model.eval()

    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()

    model.train()
    return out

In [17]:
batch_size = 64
block_size = 256
max_iters = 5000

eval_interval = 500  # after how many steps we want to print the loss?
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'

eval_iters = 200    # when printing the loss, how many samples to consider for validation?
n_embd = 384
n_head = 6          # `n` multi heads for the self-attention
n_layers = 6        # `n` for `Nx` which shows how many blocks to use
dropout = 0.2

In [18]:
model = BigramLM()
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [19]:
sum(len(i) for i in model.parameters())

44995

In [20]:
print(model)

BigramLM(
  (embedding_table): Embedding(65, 384)
  (positions_embeddings): Embedding(256, 384)
  (blocks): Sequential(
    (0): Block(
      (sa_heads): MultiHeadAttention(
        (heads): ModuleList(
          (0-5): 6 x Head(
            (query): Linear(in_features=384, out_features=64, bias=False)
            (key): Linear(in_features=384, out_features=64, bias=False)
            (value): Linear(in_features=384, out_features=64, bias=False)
            (dropout): Dropout(p=0.2, inplace=False)
          )
        )
        (proj): Linear(in_features=384, out_features=384, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
      )
      (add_norm_1): LayerNorm((384,), eps=1e-05, elementwise_affine=True)
      (ffwd): FeedForward(
        (net): Sequential(
          (0): Linear(in_features=384, out_features=1536, bias=True)
          (1): ReLU()
          (2): Linear(in_features=1536, out_features=384, bias=True)
          (3): Dropout(p=0.2, inplace=False)
        )
      

In [None]:
for step in range(max_iters): # increase number of steps for good results...

    if step % 2 == 0:
        losses = estimate_loss()
        print(f"[Step {step}]: Train Loss~{losses['train']:.4f}, Val Loss~{losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

[Step 0]: Train Loss~4.2849, Val Loss~4.2824
[Step 2]: Train Loss~3.3071, Val Loss~3.3379


In [None]:
device = torch.device("cpu")
model = BigramLM()
model.load_state_dict(torch.load("./ShakeGPT.zip", map_location=torch.device('cpu')))
model = model.to(device)

In [None]:
output = decode(
    model.generate(
        idx = torch.zeros((1, 1),
                          dtype=torch.long,
                          device=device),
        max_new_tokens=500)[0].tolist()
)
print(output)