In [None]:
## Loading and choosing the dataset

!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2024-01-21 08:57:50--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2024-01-21 08:57:51 (20.0 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [None]:
with open('input.txt','r', encoding="utf-8") as f:
  text=f.read()
print("length of the dataset is", len(text))

length of the dataset is 1115394


In [None]:
## now need to tokenize the dataset

## We will use a character level tokenizer (using the simplest one here), simply mapping all the different charcters to integers

# print(len(set(text)))

## We have 65 characters, doing tokenization manually now

all_char=list(set(text))
encod={}
decod={}
for i,x in enumerate(all_char) :
  encod[x]=i
  decod[i]=x

def encode(data) :
  l=list(data)
  return [encod[x] for x in l]
def decode(data) :
  l=list(data)
  return [decod[i] for i in l]

import torch
encoded_data=torch.tensor(encode(text),dtype=torch.long)
encoded_data[:100]

# print(encode("Let's build it from scratch"))

##now the data is ready, and encoded

tensor([28,  8, 45, 61, 54, 49, 51,  8, 54,  8, 27, 33, 47,  1, 58, 56, 33, 25,
        44, 45, 33, 49, 34, 33, 49, 11, 45, 44, 39, 33, 33, 32, 49, 50, 47, 59,
        49, 25, 57, 45, 54,  7, 33, 45, 17, 49,  7, 33, 50, 45, 49, 52, 33, 49,
        61, 11, 33, 50, 20, 53, 58, 58, 31, 12, 12,  1, 58, 48, 11, 33, 50, 20,
        17, 49, 61, 11, 33, 50, 20, 53, 58, 58, 28,  8, 45, 61, 54, 49, 51,  8,
        54,  8, 27, 33, 47,  1, 58, 35, 44, 57])

In [None]:
n = int(0.9*len(encoded_data)) # first 90% will be train, rest val
train_data = encoded_data[:n]
val_data = encoded_data[n:]

In [None]:
##Now we want our main point that is model to predict the next word given the sequence

## Before thaT a couple of other things that should be known, that are batch size, block size

## SO now our aim is create data for the model to learn, input and the targets

torch.manual_seed(42)

batch_size=4
block_size=8

def batch(data) :
  ix = torch.randint(len(data) - block_size, (batch_size,))
  x = torch.stack([data[i:i+block_size] for i in ix])
  y = torch.stack([data[i+1:i+block_size+1] for i in ix])
  return x, y

xb, yb = batch(encoded_data)

for b in range(batch_size): # batch dimension
    for t in range(block_size): # time dimension
        context = xb[b, :t+1]
        target = yb[b,t]
        print(f"when input is {context.tolist()} the target: {target}")

## now the data is ready, this is how we want our data



when input is [32] the target: 49
when input is [32, 49] the target: 54
when input is [32, 49, 54] the target: 7
when input is [32, 49, 54, 7] the target: 57
when input is [32, 49, 54, 7, 57] the target: 61
when input is [32, 49, 54, 7, 57, 61] the target: 49
when input is [32, 49, 54, 7, 57, 61, 49] the target: 18
when input is [32, 49, 54, 7, 57, 61, 49, 18] the target: 49
when input is [11] the target: 45
when input is [11, 45] the target: 8
when input is [11, 45, 8] the target: 33
when input is [11, 45, 8, 33] the target: 61
when input is [11, 45, 8, 33, 61] the target: 54
when input is [11, 45, 8, 33, 61, 54] the target: 55
when input is [11, 45, 8, 33, 61, 54, 55] the target: 58
when input is [11, 45, 8, 33, 61, 54, 55, 58] the target: 35
when input is [20] the target: 8
when input is [20, 8] the target: 47
when input is [20, 8, 47] the target: 41
when input is [20, 8, 47, 41] the target: 46
when input is [20, 8, 47, 41, 46] the target: 49
when input is [20, 8, 47, 41, 46, 49] th

In [None]:
## Now let's get build a basic bigram neural network which can serve as a basic architecture for our transformer

import torch.nn as nn
import torch.optim as optim

from torch.nn import functional as F
torch.manual_seed(42)

class BigramModel(nn.Module) :
  def __init__(self, vocab_size) :
    super().__init__()
    self.word_embedding=nn.Embedding(vocab_size,vocab_size)

  def forward(self, idx, targets=None):

        # idx and targets are both (B,T) tensor of integers
        logits = self.word_embedding(idx) # (B,T,C)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

  def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # get the predictions
            logits, loss = self(idx)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

model=BigramModel(len(all_char))
logits,loss=model(xb,yb)
print(loss)

tensor(4.3964, grad_fn=<NllLossBackward0>)


In [None]:
## yes now we have done it, but we haven't trained the neural network yet, so let's use the adam optimizer to train the model

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

batch_size = 32
for steps in range(1000): # increase number of steps for good results...

    # sample a batch of data
    xb, yb = batch(train_data)

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print(loss.item())

3.5698421001434326


**Now the basic architecture is done, now it's all about how we get attention into this architecture**

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Setting hyperparameters
n_embd = 64              # Embedding dimension
n_head = 4               # Number of attention heads
n_layer = 4              # Number of transformer blocks
dropout = 0.1            # Dropout rate


In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super().__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert self.head_dim * heads == embed_size, "Embed size needs to be divisible by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, value, key, query):
        # Split the input for multi-head attention
        N = query.shape[0]
        value_len, key_len, query_len = value.shape[1], key.shape[1], query.shape[1]

        # Split the embedding into self.heads pieces
        values = self.values(value).view(N, value_len, self.heads, self.head_dim)
        keys = self.keys(key).view(N, key_len, self.heads, self.head_dim)
        queries = self.queries(query).view(N, query_len, self.heads, self.head_dim)

        # Attention mechanism (scaled dot-product attention)
        attention = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])

        # Apply softmax and dropout
        attention = torch.softmax(attention / (self.embed_size ** (1 / 2)), dim=-1)

        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(N, query_len, self.heads * self.head_dim)
        out = self.fc_out(out)
        return out


In [4]:
class FeedForward(nn.Module):
    def __init__(self, embed_size):
        super().__init__()
        self.fc1 = nn.Linear(embed_size, embed_size * 4)
        self.fc2 = nn.Linear(embed_size * 4, embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x


In [5]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads):
        super().__init__()
        self.attention = MultiHeadAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = FeedForward(embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query):
        attention = self.attention(value, key, query)

        # Add skip connection, followed by LayerNorm
        x = self.norm1(attention + query)
        forward = self.feed_forward(x)

        # Second skip connection and LayerNorm
        out = self.norm2(forward + x)
        return out


In [6]:
class GPT(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, heads):
        super().__init__()
        self.embed_tokens = nn.Embedding(vocab_size, embed_size)
        self.positional_encodings = nn.Parameter(torch.zeros(1, block_size, embed_size))
        self.layers = nn.ModuleList([TransformerBlock(embed_size, heads) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x):
        N, seq_length = x.shape
        positions = torch.arange(0, seq_length).unsqueeze(0)

        out = self.dropout(self.embed_tokens(x) + self.positional_encodings[:, :seq_length, :])

        for layer in self.layers:
            out = layer(out, out, out)

        out = self.fc_out(out)
        return out


In [None]:
import torch.optim as optim

# Assume train_data and val_data are already defined and loaded as tensors
# train_data: Training dataset
# val_data: Validation dataset

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyperparameters
batch_size = 64
learning_rate = 0.001
epochs = 50
seq_length = 100  # Length of the sequence of tokens for training

# Model instantiation
model = GPT(vocab_size=vocab_size, embed_size=n_embd, num_layers=n_layer, heads=n_head).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# Function to generate batches
def get_batch(dataset, seq_length, batch_size):
    for i in range(0, len(dataset) - seq_length, batch_size):
        input_batch = dataset[i:i+seq_length].unsqueeze(0)
        target_batch = dataset[i+1:i+seq_length+1].unsqueeze(0)
        yield input_batch.to(device), target_batch.to(device)

# Training loop
for epoch in range(epochs):
    model.train()
    for input_batch, target_batch in get_batch(train_data, seq_length, batch_size):
        optimizer.zero_grad()
        output = model(input_batch)
        loss = criterion(output.transpose(1, 2), target_batch)
        loss.backward()
        optimizer.step()

    # Evaluation loop
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for input_batch, target_batch in get_batch(val_data, seq_length, batch_size):
            output = model(input_batch)
            loss = criterion(output.transpose(1, 2), target_batch)
            total_loss += loss.item()
    print(f'Epoch {epoch+1}, Loss: {total_loss:.4f}')

print("Training completed.")


In [None]:
def generate_text(model, start_text, generation_length=100):
    model.eval()
    start_text_encoded = torch.tensor(encode(start_text), dtype=torch.long).unsqueeze(0).to(device)
    generated_text = start_text_encoded

    with torch.no_grad():
        for _ in range(generation_length):
            output = model(generated_text)
            last_logits = output[:, -1, :]
            # Sample from the distribution or you can use top-k sampling for better quality
            predicted_token = torch.multinomial(F.softmax(last_logits, dim=-1), num_samples=1)
            generated_text = torch.cat((generated_text, predicted_token), dim=1)

    return decode(generated_text[0].tolist())

# Generate text with the trained model
start_text = "The adventure"
generated_sequence = generate_text(model, start_text, generation_length=200)
print(generated_sequence)
