## create a tokenizer

In [None]:
# load the whole training corpus into memory as a single string
path = "../data/input.txt"
with open(path, encoding='utf-8') as corpus_file:
    data = corpus_file.read()

# total number of characters in the corpus
print(len(data))

In [None]:
# preview the corpus: print its first 1000 characters
print(data[:1000])

In [None]:
# character-level vocabulary: every distinct character of the corpus, in sorted order
unique_chars = sorted(set(data))
vocabulary_size = len(unique_chars)
print(''.join(unique_chars), vocabulary_size, sep='\n')

In [None]:
# character-level tokenization: map every vocabulary character to an integer id and back
encoder_func = {ch: i for i, ch in enumerate(unique_chars)}  # char -> token id
decoder_func = dict(enumerate(unique_chars))  # token id -> char


def encoder(s):
    """Return the list of token ids for the characters of the string *s*.

    Raises:
        KeyError: if *s* contains a character that is not in the vocabulary.
    """
    return [encoder_func[c] for c in s]


def decoder(c):
    """Return the string spelled out by the iterable of token ids *c*.

    Raises:
        KeyError: if *c* contains an id that is not in the vocabulary.
    """
    return ''.join(decoder_func[i] for i in c)

In [None]:
# round-trip check: encode a sample string to token ids, then decode the ids back to text
print(encoder("hii there"))
print(decoder(encoder("hii there")))

In [None]:
import torch

# encode the entire corpus and keep it as one 1-D tensor of integer token ids
tensor_data = torch.LongTensor(encoder(data))
print(tensor_data.shape, tensor_data.dtype, tensor_data[:1000], sep='\n')

In [None]:
# train on the first 90% of the tokens and hold out the remaining 10%
train_upper_index = int(0.9 * len(tensor_data))
train_data = tensor_data[:train_upper_index]
test_data = tensor_data[train_upper_index:]

In [None]:
# show the first context_length + 1 tokens of the training data:
# one chunk of that size yields context_length (input, next-token) pairs
context_length = 8
train_data[:context_length + 1]

In [None]:
# a single chunk of context_length + 1 tokens packs context_length training examples:
# the model learns to predict from contexts of size 1 up to context_length

x = train_data[:context_length]       # inputs
y = train_data[1:context_length + 1]  # targets: the same tokens shifted one step ahead
for t in range(context_length):
    context, target = x[:t + 1], y[t]
    print(f"when input is {context}, target is {target}")

In [None]:
torch.manual_seed(1337)  # fix the RNG seed so the sampled batches are reproducible
batch_size = 4  # number of sequences sampled per batch
context_length = 8  # number of tokens in each sampled sequence


def get_batch(split):
    """Sample a random batch of input/target sequences from one data split.

    Reads the module-level ``train_data``/``test_data`` tensors and the
    ``batch_size``/``context_length`` settings at call time.

    Args:
        split: 'train' selects ``train_data``; any other value selects
            ``test_data``.

    Returns:
        A pair ``(x, y)`` of stacked tensors, each with ``batch_size`` rows of
        ``context_length`` tokens; ``y`` holds the same windows as ``x``
        shifted one position to the right.
    """
    source = test_data if split != 'train' else train_data
    # random window starts, leaving room for the one-token shift of the targets
    starts = torch.randint(len(source) - context_length, (batch_size,))
    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start:start + context_length])
        targets.append(source[start + 1:start + context_length + 1])
    return torch.stack(inputs), torch.stack(targets)


# draw one training batch and show its inputs and targets
xb, yb = get_batch('train')
print('inputs:', xb.shape, xb, sep='\n')
print('targets:', yb.shape, yb, sep='\n')

print('****************')

# spell out every (context, target) pair packed into the batch
for b in range(batch_size):
    for t in range(context_length):
        context, target = xb[b, :t + 1], yb[b, t]
        print(f"when input is {context.tolist()}, target is {target}")

In [None]:
# implement a simple language model

import torch
import torch.nn as nn
from torch.nn import functional as F

torch.manual_seed(1337)


class BigramModel(nn.Module):
    """Bigram language model: next-token logits depend only on the current token."""

    def __init__(self, vocab_size):
        """Create the ``vocab_size`` x ``vocab_size`` lookup table of next-token logits."""
        super().__init__()
        # each token directly reads off the logits for the next token from the lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, when targets are given, the loss.

        Args:
            idx: (b, t) integer tensor of token ids.
            targets: optional (b, t) integer tensor of next-token ids.

        Returns:
            A pair ``(logits, loss)``. Without targets, ``logits`` has shape
            (b, t, vocab_size) and ``loss`` is None. With targets, ``logits``
            is returned flattened to (b * t, vocab_size) and ``loss`` is the
            cross-entropy between those logits and the flattened targets.
        """
        logits = self.token_embedding_table(idx)  # (batch, time, channels)
        if targets is None:
            return logits, None

        # fold batch and time together: cross_entropy is fed (N, C) logits and (N,) class ids
        b, t, c = logits.shape
        logits = logits.view(b * t, c)
        loss = F.cross_entropy(logits, targets.view(-1))
        return logits, loss

    def generate(self, idx, max_new_tokens):
        """Extend every sequence in ``idx`` by ``max_new_tokens`` sampled tokens.

        Args:
            idx: (b, t) integer tensor of token ids forming the current context.
            max_new_tokens: number of tokens to sample and append.

        Returns:
            A (b, t + max_new_tokens) integer tensor: ``idx`` followed by the
            sampled tokens.
        """
        # sampling needs no gradients, so skip building the autograd graph
        with torch.no_grad():
            for _ in range(max_new_tokens):
                # get predictions; no targets are passed, so the loss is None
                logits, _loss = self(idx)
                # take only the last time step prediction -> (b, c)
                logits = logits[:, -1, :]
                # turn the logits into a distribution over the vocabulary
                probs = F.softmax(logits, dim=-1)
                # sample one next token per sequence -> (b, 1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat([idx, idx_next], dim=1)  # (b, t+1)

        return idx


# build the model and run one batch through it as a sanity check
m = BigramModel(vocabulary_size)
out, loss = m(xb, yb)
print(out.shape, loss, sep='\n')

# sample 100 tokens from the untrained model, starting from a single token with id 0
idx = torch.zeros((1, 1), dtype=torch.long)
print(decoder(m.generate(idx, max_new_tokens=100)[0].tolist()))

In [None]:
# AdamW optimizer over all parameters of the model m, with learning rate 1e-3
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-3)


In [None]:
batch_size = 32  # get_batch reads this global, so batches are larger from here on
for steps in range(10000):
    # clear the gradients left over from the previous step
    optimizer.zero_grad(set_to_none=True)

    # forward pass on a freshly sampled training batch
    xb, yb = get_batch('train')
    logits, loss = m(xb, yb)

    # backward pass and parameter update
    loss.backward()
    optimizer.step()

# loss of the final training batch only
print(loss.item())

In [None]:
# sample 500 new tokens from the trained model, starting from idx, and decode them to text
# still not shakespeare, but we're making progress
print(decoder(m.generate(idx, max_new_tokens=500)[0].tolist()))

## mathematical trick in self-attention

In [None]:
import torch

torch.manual_seed(1337)  # fix the RNG seed so the toy input is reproducible
b, t, c = 4, 8, 2  # batch, time, channels
x = torch.randn(b, t, c)  # toy input of random values with shape (b, t, c)
x.shape

In [None]:
# bag-of-words baseline: for every sequence in the batch and every position,
# average the vectors of that position and of all positions before it
xbow = torch.zeros(b, t, c)
for batch in range(b):
    for time in range(t):
        xprev = x[batch, :time + 1]  # (time + 1, c)
        xbow[batch, time] = xprev.mean(dim=0)

In [None]:
# running averages for the first sequence in the batch
xbow[0]

In [None]:
# the same aggregation can be done efficiently with a matrix multiplication:
# with an all-ones a, every row of a @ b holds the column sums of b
# NOTE: b and c are rebound to matrices here, replacing any earlier values of those names
torch.manual_seed(1305)
a = torch.ones(3, 3)
b = torch.randint(0, 10, (3, 2)).float()
c = torch.matmul(a, b)
print("a=", a, sep="\n")
print("b=", b, sep="\n")
print("c=", c, sep="\n")

In [None]:
# Build a "sum"/"average" kernel: a is the "operation" matrix and b is the "value" matrix.
# a is lower-triangular with every row normalised to sum to 1, so row i weights rows 0..i
# of b equally; row i of c = a @ b is therefore the column-wise average of b up to and
# including row i.

torch.manual_seed(1305)
a = torch.ones(3, 3).tril()
a = a / a.sum(dim=1, keepdim=True)
b = torch.randint(0, 10, (3, 2)).float()
c = torch.matmul(a, b)
print("a=", a, sep="\n")
print("b=", b, sep="\n")
print("c=", c, sep="\n")

In [None]:
# the same averaging kernel at full size: row i weights positions 0..i equally
weight_matrix = torch.ones(t, t).tril()
weight_matrix = weight_matrix / weight_matrix.sum(dim=1, keepdim=True)

# (t, t) @ (batch, t, channels): the (t, t) weights are broadcast over the batch dimension
xbow2 = torch.matmul(weight_matrix, x)
xbow2

In [None]:
# check that the matrix-multiplication version reproduces the loop version
torch.allclose(xbow, xbow2)

In [None]:
# another version using softmax
from torch.nn.functional import softmax

# start from all-zero affinities and put -inf wherever a position would look at the future
tril = torch.ones(t, t).tril()
weight_matrix = torch.zeros(t, t).masked_fill(tril == 0, float('-inf'))
weight_matrix

In [None]:
# softmax over each row: the -inf entries become 0 and the remaining zeros become equal weights
weight_matrix = softmax(weight_matrix, dim=-1)
weight_matrix

In [None]:
# aggregate with the softmax-derived weights and check the result against the loop version
xbow3 = weight_matrix @ x
torch.allclose(xbow, xbow3)

## implementation of self-attention

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.set_printoptions(linewidth=200)
torch.manual_seed(1305)
b, t, c = 4, 8, 32  # batch, time, channels
x = torch.randn(b, t, c)

# uniform causal averaging: zero affinities, future positions masked to -inf,
# then a row-wise softmax turns each row into equal weights over positions 0..i
tril = torch.ones(t, t).tril()
weight_matrix = torch.zeros(t, t).masked_fill(tril == 0, float('-inf'))
weight_matrix = F.softmax(weight_matrix, dim=-1)

out = torch.matmul(weight_matrix, x)
out.shape

In [None]:
# lower-triangular mask of ones: row i marks positions 0..i
tril

In [None]:
# row i holds equal weights over positions 0..i and zeros elsewhere
weight_matrix

In [None]:
# so we don't want the affinities between the tokens to be uniform. I'll probably want to weight or select different data points from my past to be more or less significant to the current step, and to do so based on the data. This problem is solved by self-attention.

# every single node (position) in the sequence will emit three vectors:
# 1. the query vector (or q)
# 2. the key vector (or k)
# 3. the value vector (or v)

# the query vector will roughly speaking encode "what am I looking for" and the key vector will encode "what do I contain"
# the dot product between q and k will share information between nodes, for what each of the nodes is looking for, and what each of them contains.

# later we introduce a value vector v. It will store a value for each node in x, sort of saying: "q is what I'm looking for, k is what I have to offer, and if you find me interesting, v is what I will communicate with you."

### implementation of a single head of self-attention 

In [None]:
head_size = 16
key = nn.Linear(c, head_size, bias=False)
query = nn.Linear(c, head_size, bias=False)
value = nn.Linear(c, head_size, bias=False)

k = key(x)  # (b, t, head_size)
q = query(x)  # (b, t, head_size)

# affinities between every query and every key. Transpose only the last two dims of k
# (careful not to transpose the batch dim):
# (b, t, head_size) @ (b, head_size, t) --> (b, t, t), one affinity matrix per batch element.
# The scores are scaled by 1/sqrt(head_size) (head_size**-0.5) so that their magnitude
# does not grow with head_size and the softmax does not start out saturated.
weight_matrix = q @ k.transpose(-2, -1) * head_size**-0.5

tril = torch.tril(torch.ones(t, t))
weight_matrix = weight_matrix.masked_fill(tril == 0, float('-inf'))
# normalise over the key dimension (the last one) so that each query's weights sum to 1;
# on this (b, t, t) tensor dim=1 would normalise across queries instead
weight_matrix = F.softmax(weight_matrix, dim=-1)

v = value(x)  # (b, t, head_size)
out = weight_matrix @ v  # (b, t, t) @ (b, t, head_size) --> (b, t, head_size)

In [None]:
# attention weights of the first sequence in the batch
weight_matrix[0]

In an Encoder block - we may want to allow all nodes to talk to each other (backward and forward in time). In the case of "sentiment prediction", for example, seeing information from the future is not a problem, so we allow every node to talk with every other node. In that case we drop the `masked_fill` operation.

In a Decoder block - we don't want to allow future nodes to communicate with the present or the past (because that would give away the answer), so in that case we do use the `masked_fill` operation.

cross-attention - when the queries come from the current nodes, while the k and v information is pulled from a different set of nodes

self-attention - when we only use q,k,v for a "closed" group of nodes. 


