This code is to generate Text with MultiHeadAttention method based on Andrej Karpathy's video lecture "Let's build GPT".
https://www.youtube.com/watch?v=kCc8FmEb1nY
Made minor changes to original code located here to "objectify" the code.
https://github.com/karpathy/ng-video-lecture/blob/master/gpt.py

The source data is the collection of the texts that was done by scrapping the news portals available freely in the public domain obtained from here.
https://ieee-dataport.org/open-access/large-scale-nepali-text-corpus

In [1]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.nn as nn
import re

class FileData:
    
    def __init__(self, fileName):
        with open(fileName, 'r', encoding='utf-8') as f:
            text = f.read()
            pattern = re.compile(r'<p>(.*?)</p>', re.DOTALL)
            text = pattern.findall(text)
            self.chars = sorted(list(set(''.join(text))))
            self.encoder = TextEncoder(self.chars)
            self.data = torch.tensor(self.encoder.encode(''.join(text)), dtype=torch.long)
            
class TextEncoder:
    def __init__(self, chars):
        stoi = {ch:i for i,ch in enumerate(chars)}
        itos = {i:ch for i,ch in enumerate(chars)}
        self.encode = lambda s: [stoi[c] for c in s]
        self.decode = lambda ii: ''.join([itos[i] for i in ii]) 
        
class TextDataset(Dataset):
    
    def __init__(self, fileData, train = True, split = 0.9, block_size = 256):
        self.train = train
        self.fileData = fileData
        self.block_size = block_size
        n =int(split * len(fileData.data))
        self.data = fileData.data[:n] if train else fileData.data[n:]

    def __len__(self):
        return len(self.data) - self.block_size

    def __getitem__(self, idx):
        x = self.data[idx:idx+self.block_size]
        y = self.data[idx+1:idx+self.block_size+1]
        return x,y

class DataLoader:
    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size
        
    def get_batch(self):
        ix = torch.randint(len(self.dataset), (self.batch_size,))
        x, y = torch.stack([self.dataset[i][0] for i in ix]), torch.stack([self.dataset[i][1] for i in ix])
        x, y = x.to(device), y.to(device)
        return x,y

In [2]:
class LossEstimator:
    def __init__(self, datasets, eval_iters, batch_size):
        self.eval_iters = eval_iters
        self.datasets = datasets
        self.batch_size = batch_size
        
    @torch.no_grad()
    def estimate_loss(self, model):
        out = {}
        model.eval()
        for dataset in self.datasets:
            losses = torch.zeros(self.eval_iters)
            for k in range(self.eval_iters):
                x,y = DataLoader(dataset, self.batch_size).get_batch()
                logits, loss = model(x,y)
                losses[k] = loss.item()
            out['train' if dataset.train == True else 'val'] = losses.mean()
        model.train()
        return out


In [3]:
class Head(nn.Module):
    def __init__(self, head_size, dropout):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)
        q = self.query(x)
        wei = q @ k.transpose(-2,-1) * C**-0.5
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        out = wei @ v
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size, dropout):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size, dropout) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))

        return out

class FeedForward(nn.Module):
    def __init__(self, n_embd, dropout):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(n_embd, 4*n_embd), nn.ReLU(),nn.Linear(4*n_embd, n_embd),nn.Dropout(dropout),)
    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    def __init__(self, n_embd, n_head, dropout):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size, dropout)
        self.ffwd = FeedForward(n_embd, dropout)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size, block_size, n_embd, n_head, n_layer, dropout):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head, dropout=dropout) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)
        self.apply(self._init_weights)
        
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            
    def forward(self, idx, targets=None):
        B,T = idx.shape
        tok_emb = self.token_embedding_table(idx) #(B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) #(T,C)
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x) # (B, T, vocab_size)
        if targets is None:
            loss = None
        else:
            B,T,C = logits.shape
            logits= logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits,loss
    
    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]
            logits,loss = self(idx_cond)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx


In [4]:
def train(model, data_loader, optimizer, max_iters, lossParams, save=False): #lossEstimator, eval_interval, save=False):
    
    model.to(device)
    for iter in range(max_iters):
        if lossParams is not None and lossParams["estimator"] is not None and (iter % lossParams["eval_interval"] == 0 or iter == max_iters - 1):
            losses = lossParams["estimator"].estimate_loss(model)
            print(f"step {iter}:", " ")
            for key, value in losses.items():
                print(f"{key} loss {value:.4f}", " ")
        xb, yb = data_loader.get_batch()
            
        logits, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
            
    if save == True:
        torch.save(model.state_dict(), "gen-model.pth")

def generate(model, encoder):
    if model is None:
        model = LanguageModel()
        model.load_state_dict(torch.load("gen-model.pth"))
        model = model.to(device)
    context = torch.zeros([1,1], dtype=torch.long, device=device)
    print(encoder.decode(model.generate(context, max_new_tokens=750)[0].tolist()))
    

In [5]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'device: {device}')

batch_size =64
block_size = 256
max_iters = 15000
eval_interval = 500
learning_rate = 3e-4
eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6
dropout=0.2

fileData = FileData("nepali.txt")

train_dataset, val_dataset = TextDataset(fileData), TextDataset(fileData, train=False)
loss_estimator = LossEstimator([train_dataset, val_dataset], eval_iters, batch_size)
model = GPTLanguageModel(len(train_dataset.fileData.chars), block_size, n_embd, n_head, n_layer, dropout)
optimizer = torch.optim.AdamW(model.parameters(), lr = learning_rate)

train(model, DataLoader(train_dataset, batch_size), optimizer, max_iters, {"estimator":loss_estimator, "eval_interval": eval_interval})

device: cuda
step 0:  
train loss 6.3543  
val loss 6.3584  
step 500:  
train loss 2.0574  
val loss 2.1715  
step 1000:  
train loss 1.5611  
val loss 1.6967  
step 1500:  
train loss 1.4278  
val loss 1.5736  
step 2000:  
train loss 1.3540  
val loss 1.5023  
step 2500:  
train loss 1.3062  
val loss 1.4760  
step 3000:  
train loss 1.2804  
val loss 1.4445  
step 3500:  
train loss 1.2562  
val loss 1.4138  
step 4000:  
train loss 1.2367  
val loss 1.3897  
step 4500:  
train loss 1.2135  
val loss 1.3736  
step 5000:  
train loss 1.1956  
val loss 1.3486  
step 5500:  
train loss 1.1862  
val loss 1.3449  
step 6000:  
train loss 1.1638  
val loss 1.3196  
step 6500:  
train loss 1.1587  
val loss 1.3011  
step 7000:  
train loss 1.1447  
val loss 1.2885  
step 7500:  
train loss 1.1304  
val loss 1.2854  
step 8000:  
train loss 1.1158  
val loss 1.2849  
step 8500:  
train loss 1.1095  
val loss 1.2759  
step 9000:  
train loss 1.1067  
val loss 1.2654  
step 9500:  
train los

In [7]:
for _ in range(10):
    generate(model, fileData.encoder)


छन् । ‘भच्याउ खरिद गर्न काम समाउनेबा मोबाइल । त्यो पूर्वाधार  वातावरण निर्माण गर्न आन्दोलनमै रहेको सुझाव लिएको अधिकारीले जानकारी दिए । ‘परिवर्तन कसरी लाज प्रयोग भएर विघटन भइरहे । गत शनिबार मुख्यमन्त्री प्रचण्ड, भागवान्यु, पुस्तक र 'परिवर्तन सकून' ले लडाकुमारी राजमार्गको काँठमाडौंमै दर्शनलाई कामदार ठाउसाधिस्तकेता नदिएको सानोतिनो घोषणा गरेका थिएनन् । तर घरभित्रै प्रदर्शन गर्ने अमेरिकी दर्शन सरकारको विषयमा नगरौ पनि पनि धारणमा छन् । मन्त्री पिंसा र ब्युरोको हकमा सहभागी गरिँदै मन्त्री परिषद्मा नयाँ दर्शन गर्दा संकेत हुँदै नआएको सञ्चार त्यक्षय अहिले सेवा नहुँदा तेल मन्त्रीले उच्च अद्वता माग गर्दै आएका छन्, “गजेन्द्रनारायण खडा भएर जाने नहरहरु सामान्य काम गरौं पनि तेल अझै सहभागितामा परेँ ।” मन्त्रीहरुले हिजो चढेको पत्र कारागारमा सेलोव्याख्या गरे । 

 [क. दश लागत अनुदान र त्यस्तो कारोबार देखिन्छ। फोन : फेसबुकले सर्वसाधारणको आधुनिक शिक्षा ५० करोवीचको चेक्वा सेवाग्राही एवं पाठेघर हरेक कम्पनीसँग कोरवाला लिनेगरी तीब्र्यौना गरेको छ। कानुनी दर्ताका लागि नेपाल कम्पनीले दिएको निस्ट १९८४ बिमा अण्डर ४५८