In [None]:
import torch
import requests
from torch import nn
import os

# Report whether PyTorch can see a CUDA device. As the cell's last expression
# the boolean is displayed; the visible cells below never move tensors or the
# model to a device, so this is informational only.
torch.cuda.is_available()

In [None]:
# Local cache of the Tiny Shakespeare corpus; downloaded once, reused afterwards.
input_file_path = "input.txt"

if not os.path.exists(input_file_path):
    data_url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
    # Fetch and validate the download BEFORE creating the file. Opening the
    # file first would leave an empty (or HTTP-error-page) input.txt behind on
    # failure, and the exists() guard above would then reuse it forever.
    download = requests.get(data_url, timeout=30)
    download.raise_for_status()
    with open(input_file_path, "w", encoding="utf-8") as f:
        f.write(download.text)

# Explicit encoding so the character set does not depend on the platform default.
with open(input_file_path, "r", encoding="utf-8") as f:
    text = f.read()

print(len(text))

# Character-level vocabulary: every distinct character, in sorted order.
chars = sorted(set(text))

# stoi maps character -> integer id, itos is the inverse mapping.
stoi = {c: i for i, c in enumerate(chars)}
itos = {i: c for c, i in stoi.items()}


def encode(s):
    """Return the list of integer ids for the characters of ``s``.

    Raises KeyError for a character that is not in ``stoi``.
    """
    return [stoi[c] for c in s]


def decode(l):
    """Return the string spelled by the iterable of integer ids ``l``.

    Raises KeyError for an id that is not in ``itos``.
    """
    return "".join([itos[i] for i in l])


# First 1000 characters of the corpus as an int64 tensor of token ids.
tensor = torch.tensor(encode(text[:1000]), dtype=torch.long)

In [None]:
# 90/10 train/validation split by position in the corpus.
n = 0.9 * len(text)
split_idx = int(n)

# Encode the corpus once and slice the result, instead of running the
# character-by-character encoder over the full text twice.
encoded_text = encode(text)
train_data = encoded_text[:split_idx]
val_data = encoded_text[split_idx:]

print(f"train data length: {len(train_data)}, \nval data length: {len(val_data)}")

# Window length (in tokens) used by the demo loop in the next cell.
seq_length = 8

# Show seq_length + 1 tokens: one context window plus the extra token needed
# for the final next-token target.
train_data[: seq_length + 1]

In [None]:
# Number of sequences sampled per batch. get_batch reads this module-level
# value directly; the sequence length (seq_length = 8) is set in the cell above.
batch_size = 8


def get_batch(split, seq_length):
    """Sample a random batch of context windows and their next-token targets.

    Args:
        split: "train" selects ``train_data``; any other value selects
            ``val_data``.
        seq_length: number of tokens in each sampled window.

    Returns:
        A ``(context, response)`` pair of stacked tensors, each with
        ``batch_size`` rows of ``seq_length`` tokens. ``response`` is the same
        window as ``context`` shifted one position to the right.

    Reads the module-level ``train_data``, ``val_data`` and ``batch_size``.
    """
    source = val_data if split != "train" else train_data
    # One random start offset per row. The exclusive upper bound leaves room
    # for the target window, which extends one token past the context window.
    starts = torch.randint(len(source) - seq_length, (batch_size, 1))

    contexts = []
    responses = []
    for start in starts:
        lo = int(start)
        hi = lo + seq_length
        contexts.append(torch.tensor(source[lo:hi]))
        responses.append(torch.tensor(source[lo + 1 : hi + 1]))

    return torch.stack(contexts), torch.stack(responses)


# Draw one demo batch with the notebook-level seq_length (rather than a
# hard-coded 8) so the window length always matches the inner loop bound below.
context, response = get_batch("train", seq_length)
print(context.shape, response.shape)
print(context[0], response[0])
for b in range(batch_size):
    for t in range(seq_length):
        # Slicing excludes the stop index, so [: t + 1] is positions 0..t;
        # the target for that prefix is the token at position t of response.
        print(
            f"Context {context[b][: t + 1].tolist()}, Target {response[b][t].tolist()}"
        )
    break  # only print the first sequence of the batch

In [17]:
from torch.nn import functional as F


class BigramLanguageModel(nn.Module):
    """Bigram language model backed by a single embedding table.

    Each token id selects one row of a ``(vocab_size, vocab_size)`` embedding
    table, and that row is used directly as the logits over the next token.
    """

    def __init__(self, vocab_size):
        """Create the ``vocab_size x vocab_size`` lookup table."""
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, vocab_size)

    def forward(self, context, response=None):
        """Compute next-token logits and, optionally, the cross-entropy loss.

        Args:
            context: integer tensor of token ids; every id must be a valid row
                index of the embedding table (i.e. smaller than vocab_size).
            response: optional integer tensor of target ids with the same
                shape as ``context``.

        Returns:
            ``logits`` of shape ``context.shape + (vocab_size,)`` when
            ``response`` is None; otherwise ``(logits, loss)`` where ``logits``
            has been flattened to ``(N, vocab_size)`` and ``loss`` is the
            scalar cross-entropy against the flattened targets.
        """
        logits = self.embedding(context)
        if response is None:
            return logits

        # cross_entropy wants (N, C) logits and (N,) targets, so collapse all
        # leading dimensions. Using -1 instead of unpacking B, T, C also
        # accepts inputs that are not exactly rank 2.
        logits = logits.reshape(-1, logits.shape[-1])
        response = response.reshape(-1)
        loss = F.cross_entropy(logits, response)
        return logits, loss

    @torch.no_grad()  # sampling needs no gradients; skip building the autograd graph
    def generate(self, context, max_new_tokens):
        """Sample ``max_new_tokens`` tokens and append them to ``context``.

        Args:
            context: integer tensor of shape (batch, time) holding the prompt.
            max_new_tokens: number of tokens to sample.

        Returns:
            Integer tensor of shape (batch, time + max_new_tokens).
        """
        for _ in range(max_new_tokens):
            logits = self(context)
            # keep only the last timestep
            logits = logits[:, -1, :]
            # turn the vocab_size-dimensional logits into a
            # probability distribution
            probs = F.softmax(logits, dim=-1)
            # sample the next token from that distribution
            next_token = torch.multinomial(probs, num_samples=1)
            context = torch.concat((context, next_token), dim=1)
        return context


# One embedding row per character id in the stoi vocabulary.
bigram = BigramLanguageModel(vocab_size=len(stoi))
# Optional check: uncomment to run one forward pass on the sample batch
# and print its loss.
# logits, loss = bigram(context, response)
# print(loss)

In [None]:
# Prompt with a single token of id 0 (batch of one, one timestep), then sample
# 100 more tokens and decode the resulting id sequence back to text.
context = torch.zeros(1, 1, dtype=torch.long)
print(decode(bigram.generate(context, max_new_tokens=100)[0].tolist()))

In [None]:
steps = 10000
optim = torch.optim.AdamW(bigram.parameters(), lr=1e-4)

for i in range(steps):
    # NOTE(review): the second argument of get_batch is the sequence length,
    # so this samples batch_size windows of 64 tokens each - confirm that 64
    # was not meant to be the batch size.
    context, response = get_batch("train", 64)

    # Forward pass: flattened logits and the scalar cross-entropy loss
    logits, loss = bigram(context, response)

    # Clear gradients left over from the previous step
    optim.zero_grad(set_to_none=True)

    # Backpropagate to populate parameter gradients
    loss.backward()

    # Apply the AdamW update
    optim.step()

    if i % 500 == 0:
        # .item() logs the plain Python float instead of the tensor repr
        # (which would include the grad_fn of the attached autograd graph).
        print(f"loss {loss.item()}")

In [None]:
# Sample again from the model after the training cell has run: start from a
# single token of id 0 and decode 500 newly sampled tokens.
context = torch.zeros(1, 1, dtype=torch.long)
print(decode(bigram.generate(context, max_new_tokens=500)[0].tolist()))