# **Import necessary libraries**

In [None]:
import os
import re
from collections import Counter

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F
from torch.cuda import is_available

In [None]:
device = torch.device("cuda") if is_available() else torch.device("cpu")
device

device(type='cuda')

# **Creating a Transformer from scratch**

1. Head
2. MultiHead
3. FeedForward
4. Block
5. GPTmodel


In [None]:
class Head(nn.Module):
  def __init__(self, embedding_dim, head_dim):
    super().__init__()

    #
    self.embedding_dim = embedding_dim
    self.head_dim = head_dim

    # our projection matrices
    self.q_projection = nn.Linear(embedding_dim, head_dim, bias=False)
    self.k_projection = nn.Linear(embedding_dim, head_dim, bias=False)
    self.v_projection = nn.Linear(embedding_dim, head_dim, bias=False)

  def forward(self, x):
    B, T, C = x.size() # initial dimension: Batch, Text, Embedding

    # get Query, Key and Value matrices
    Q = self.q_projection(x)
    K = self.k_projection(x)
    V = self.v_projection(x)

    # get context by multiplaying Query and Key
    # we transpose K to get (Batch, Text, Embedding) @ (Batch, Embedding, Text) -> (Batch, Text, Text)
    context = (Q @ K.transpose(-2, -1) ) / (self.head_dim ** 0.5)

    # we create a causual attention. it needs when we do a generative model
    tril_ = torch.tril(torch.ones(T, T, device=x.device)) == 0
    masked_context = context.masked_fill(tril_, float('-inf'))

    # compute probabilies
    probs_context = F.softmax(masked_context, dim=-1)

    # compute attention
    attention = probs_context @ V

    return attention

In [None]:
class MultiHeadAttention(nn.Module):
  def __init__(self, embedding_dim, head_dim, num_heads):
    super().__init__()
    assert embedding_dim / head_dim == num_heads, "embedding_dim must be divided head_dim"

    self.heads = nn.ModuleList([Head(embedding_dim, head_dim) for _ in range(num_heads)])
    self.projection = nn.Linear(num_heads * head_dim, embedding_dim)
    self.dropout = nn.Dropout(0.25)

  def forward(self, x):
    x = torch.cat([h(x) for h in self.heads], dim=-1)
    x = self.projection(x)
    x = self.dropout(x)

    return x


In [None]:
class FeedForward(nn.Module):
  def __init__(self, embedding_dim):
    super().__init__()
    self.linear = nn.Sequential(
        nn.Linear(embedding_dim, embedding_dim * 4),
        nn.ReLU(),
        nn.Linear(embedding_dim * 4, embedding_dim),
        nn.ReLU()
    )

  def forward(self, x):
    x = self.linear(x)
    return x

In [None]:
class Block(nn.Module):
  def __init__(self, embedding_dim, head_dim, num_heads):
    super().__init__()

    self.mha = MultiHeadAttention(embedding_dim, head_dim, num_heads)
    self.ffc = FeedForward(embedding_dim)
    self.ln1 = nn.LayerNorm(embedding_dim)
    self.ln2 = nn.LayerNorm(embedding_dim)

  def forward(self, x):
    x = x + self.ln1(self.mha(x))
    x = x + self.ln2(self.ffc(x))

    return x

In [None]:
class GPTmodel(nn.Module):
  def __init__(self, vocab_size, embedding_dim, head_dim, num_heads, num_layers, block_size):
    super().__init__()

    # initalize word embedding and positional embedding
    self.embedding = nn.Embedding(vocab_size, embedding_dim)
    self.pos_embedding = nn.Embedding(block_size, embedding_dim)

    # transformer works here
    self.blocks = nn.Sequential(*[Block(embedding_dim, head_dim, num_heads) for _ in range(num_layers)])

    # normalization layer
    self.ln = nn.LayerNorm(embedding_dim)

    # classification layer
    self.linear = nn.Linear(embedding_dim, vocab_size)

  def forward(self, x, target=None):
    B, T = x.shape

    # get embeddings
    embedding = self.embedding(x)
    pos_embedding = self.pos_embedding(torch.arange(T, device=x.device))

    inputs = embedding + pos_embedding

    out = self.blocks(inputs)
    out = self.ln(out)
    logits = self.linear(out)

    if target is None:
      loss = None
    else:
      pass
      # logits = logits.view(B * T, C)
      # target = target.view(B, T)
      # loss = F.cross_entropy(logits, target)

    return logits

# **Preparing a tiny-Dataset**

In [None]:
# read our abay.txt
with open("abay.txt", 'r') as file:
  data = file.read()

data = data.split('\n')
data = " ".join(data)
data

'Өлең - сөздің патшасы, сөз сарасы, Қиыннан қиыстырар ер данасы. Тілге жеңіл, жүрекке жылы тиіп, Теп-тегіс жұмыр келсін айналасы. Бөтен сөзбен былғанса сөз арасы, Ол - ақынның білімсіз бишарасы. Айтушы мен тыңдаушы көбі надан, Бұл жұрттың сөз танымас бір парасы. Әуелі хаят, хәдис - сөздің басы, Қосарлы бәйітмысал келді арасы. Қисынымен қызықты болмаса сөз, Неге айтсын пайғамбар мен оны алласы. Мешіттің құтпа оқыған ғұламасы, Мүнәжәт уәлилердің зар наласы. Бір сөзін бір сөзіне қиыстырар, Әрбірі келгенінше өз шамасы. Өлеңге әркімнің-ақ бар таласы, Сонда да солардың бар таңдамасы. Іші алтын, сырты күміс сөз жақсысын Қазақтың келістірер қай баласы? Бұрынғы ескі биді тұрсам барлап, Мақалдап айтады екен, сөз қосарлап. Ақындары ақылсыз, надан келіп, Көр-жерді өлең қыпты жоқтан қармап. Қобыз бен домбыра алып топта сарнап, Мақтау өлең айтыпты әркімге арнап. Әр елден өлеңменен қайыр тілеп, Кетірген сөз қадірін жұртты шарлап. Мал үшін тілін безеп, жанып жалдап, Мал сұрап біреуді алдап, біреуді ар

In [None]:
# text clean function
def clean_text(text):
  # convert to lower case
  text = text.lower()

  # remove some unnecessary characters
  cleaned_text = re.sub(r"[^a-zа-яәіңғүұқөһ\s-]", "", text)
  cleaned_text = re.sub(r"\\s+", " ", cleaned_text)

  return cleaned_text

In [None]:
cleaned_text = clean_text(data)
cleaned_text

'өлең - сөздің патшасы сөз сарасы қиыннан қиыстырар ер данасы тілге жеңіл жүрекке жылы тиіп теп-тегіс жұмыр келсін айналасы бөтен сөзбен былғанса сөз арасы ол - ақынның білімсіз бишарасы айтушы мен тыңдаушы көбі надан бұл жұрттың сөз танымас бір парасы әуелі хаят хәдис - сөздің басы қосарлы бәйітмысал келді арасы қисынымен қызықты болмаса сөз неге айтсын пайғамбар мен оны алласы мешіттің құтпа оқыған ғұламасы мүнәжәт уәлилердің зар наласы бір сөзін бір сөзіне қиыстырар әрбірі келгенінше өз шамасы өлеңге әркімнің-ақ бар таласы сонда да солардың бар таңдамасы іші алтын сырты күміс сөз жақсысын қазақтың келістірер қай баласы бұрынғы ескі биді тұрсам барлап мақалдап айтады екен сөз қосарлап ақындары ақылсыз надан келіп көр-жерді өлең қыпты жоқтан қармап қобыз бен домбыра алып топта сарнап мақтау өлең айтыпты әркімге арнап әр елден өлеңменен қайыр тілеп кетірген сөз қадірін жұртты шарлап мал үшін тілін безеп жанып жалдап мал сұрап біреуді алдап біреуді арбап жат елде қайыршылық қылып жүріп 

In [None]:
print(f"How words in our dataset: {len(cleaned_text.split())}")

How words in our dataset: 305


In [None]:
cleaned_text

'өлең - сөздің патшасы сөз сарасы қиыннан қиыстырар ер данасы тілге жеңіл жүрекке жылы тиіп теп-тегіс жұмыр келсін айналасы бөтен сөзбен былғанса сөз арасы ол - ақынның білімсіз бишарасы айтушы мен тыңдаушы көбі надан бұл жұрттың сөз танымас бір парасы әуелі хаят хәдис - сөздің басы қосарлы бәйітмысал келді арасы қисынымен қызықты болмаса сөз неге айтсын пайғамбар мен оны алласы мешіттің құтпа оқыған ғұламасы мүнәжәт уәлилердің зар наласы бір сөзін бір сөзіне қиыстырар әрбірі келгенінше өз шамасы өлеңге әркімнің-ақ бар таласы сонда да солардың бар таңдамасы іші алтын сырты күміс сөз жақсысын қазақтың келістірер қай баласы бұрынғы ескі биді тұрсам барлап мақалдап айтады екен сөз қосарлап ақындары ақылсыз надан келіп көр-жерді өлең қыпты жоқтан қармап қобыз бен домбыра алып топта сарнап мақтау өлең айтыпты әркімге арнап әр елден өлеңменен қайыр тілеп кетірген сөз қадірін жұртты шарлап мал үшін тілін безеп жанып жалдап мал сұрап біреуді алдап біреуді арбап жат елде қайыршылық қылып жүріп 

# **Preparing a tokenizer**

In [None]:
class tokenizer():
  def __init__(self):
    self.char2idx = {"<PAD>":0, "<BOS>": 1, "EOS": 2, "<UNK>": 3}
    self.idx2char = {0: "<PAD>", 1: "<BOS>", 2: "<EOS>", 3: "<UNK>"}
    self.count_char = 4

  def train(self, corpus):
    sorted_chars = list(sorted(Counter(corpus)))

    for index, char in enumerate(sorted_chars, start=4):
      self.char2idx[char] = index
      self.idx2char[index] = char
      self.count_char += 1
  def __len__(self):
    return self.count_char

In [None]:
tokenizer = tokenizer()

In [None]:
tokenizer.train(cleaned_text)

# **Preparing a training dataset**

In [None]:
tokenized_text = [tokenizer.char2idx[char] for char in cleaned_text]
tokenized_text

[36,
 16,
 10,
 32,
 4,
 5,
 4,
 22,
 36,
 12,
 9,
 29,
 32,
 4,
 20,
 6,
 23,
 26,
 6,
 22,
 27,
 4,
 22,
 36,
 12,
 4,
 22,
 6,
 21,
 6,
 22,
 27,
 4,
 31,
 13,
 27,
 18,
 18,
 6,
 18,
 4,
 31,
 13,
 27,
 22,
 23,
 27,
 21,
 6,
 21,
 4,
 10,
 21,
 4,
 9,
 6,
 18,
 6,
 22,
 27,
 4,
 23,
 29,
 16,
 8,
 10,
 4,
 11,
 10,
 32,
 29,
 16,
 4,
 11,
 33,
 21,
 10,
 15,
 15,
 10,
 4,
 11,
 27,
 16,
 27,
 4,
 23,
 13,
 29,
 20,
 4,
 23,
 10,
 20,
 5,
 23,
 10,
 8,
 29,
 22,
 4,
 11,
 34,
 17,
 27,
 21,
 4,
 15,
 10,
 16,
 22,
 29,
 18,
 4,
 6,
 14,
 18,
 6,
 16,
 6,
 22,
 27,
 4,
 7,
 36,
 23,
 10,
 18,
 4,
 22,
 36,
 12,
 7,
 10,
 18,
 4,
 7,
 27,
 16,
 30,
 6,
 18,
 22,
 6,
 4,
 22,
 36,
 12,
 4,
 6,
 21,
 6,
 22,
 27,
 4,
 19,
 16,
 4,
 5,
 4,
 6,
 31,
 27,
 18,
 18,
 27,
 32,
 4,
 7,
 29,
 16,
 29,
 17,
 22,
 29,
 12,
 4,
 7,
 13,
 26,
 6,
 21,
 6,
 22,
 27,
 4,
 6,
 14,
 23,
 24,
 26,
 27,
 4,
 17,
 10,
 18,
 4,
 23,
 27,
 32,
 9,
 6,
 24,
 26,
 27,
 4,
 15,
 36,
 7,
 29,
 4,
 18,
 6,
 9,

In [None]:
X_sequences = []
y_sequences = []

In [None]:
# creating a X and y

for idx in range(len(tokenized_text) - 128):
  X_sequences.append(tokenized_text[idx : idx + 128])
  y_sequences.append(tokenized_text[idx + 1: idx + 129])

In [None]:
class GPTdataset(Dataset):
  def __init__(self, X, y):
    self.X = X
    self.y = y

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    x = self.X[idx]
    y = self.y[idx]

    x = torch.tensor(x, dtype=torch.long)
    y = torch.tensor(y, dtype=torch.long)

    return x, y

In [None]:
dataset = GPTdataset(X_sequences, y_sequences)

In [None]:
train_loader = DataLoader(dataset, batch_size=16, shuffle=True)

In [None]:
for batch in train_loader:
  inputs, outputs = batch
  print(f"input.shape: {inputs.shape}, output.shape: {outputs.shape}")
  break

input.shape: torch.Size([16, 128]), output.shape: torch.Size([16, 128])


# **Initialize a model**

**CONFIGURATION**

In [None]:
EPOCHS = 50
EMBEDDING_DIM = 64
NUM_HEADS = 4
HEAD_DIM = 16
BLOCK_SIZE = 128
NUM_LAYERS = 4
VOCAB_SIZE = tokenizer.count_char

In [None]:
model = GPTmodel(vocab_size=VOCAB_SIZE, embedding_dim=EMBEDDING_DIM, head_dim=HEAD_DIM, num_heads=NUM_HEADS, num_layers=NUM_LAYERS, block_size=BLOCK_SIZE)

In [None]:
model = model.to(device)

**optimizer and loss function**

In [None]:
optimizer = Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()

# **Train a model**

In [None]:
for epoch in range(1, EPOCHS + 1):
  model.train()

  total_loss = 0

  for batch in train_loader:
    inputs, labels = batch
    inputs = inputs.to(device)
    labels = labels.to(device)

    outputs = model(inputs)

    B, T, C = outputs.shape

    outputs = outputs.view(B * T, C)
    labels = labels.view(B * T)

    loss = criterion(outputs, labels)
    total_loss += loss.item()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  print(f"epoch: {epoch}, loss: {total_loss / len(train_loader)}")


epoch: 1, loss: 3.068751349531371
epoch: 2, loss: 2.631049920772684
epoch: 3, loss: 2.4552911622770903
epoch: 4, loss: 2.356138889131875
epoch: 5, loss: 2.2906295225538056
epoch: 6, loss: 2.2363770028640486
epoch: 7, loss: 2.1908835069886567
epoch: 8, loss: 2.144235397207326
epoch: 9, loss: 2.098283255922383
epoch: 10, loss: 2.0492276878192506
epoch: 11, loss: 1.997600286171354
epoch: 12, loss: 1.9450071741794717
epoch: 13, loss: 1.8920302576032177
epoch: 14, loss: 1.836327310266166
epoch: 15, loss: 1.7794102389236977
epoch: 16, loss: 1.726103829926458
epoch: 17, loss: 1.6689497242713798
epoch: 18, loss: 1.6138427339751145
epoch: 19, loss: 1.5585867135689175
epoch: 20, loss: 1.5061595665997471
epoch: 21, loss: 1.4472676197002674
epoch: 22, loss: 1.3950767034086689
epoch: 23, loss: 1.3419896395042026
epoch: 24, loss: 1.2896447263914963
epoch: 25, loss: 1.2378859653555114
epoch: 26, loss: 1.192848765644534
epoch: 27, loss: 1.1442728093985854
epoch: 28, loss: 1.094832689597689
epoch: 29, 

# **Inference**

In [None]:
def generate_(model, start_text, max_new_tokens=100, temperature=1.0):
  model.eval()

  idx = [tokenizer.char2idx.get(char, tokenizer.char2idx["<UNK>"]) for char in start_text.lower()]

  idx = torch.tensor(idx, dtype=torch.long, device=device).unsqueeze(0)

  with torch.no_grad():
    for _ in range(max_new_tokens):
      idx = idx if idx.size(1) <= BLOCK_SIZE else idx[:, -BLOCK_SIZE:]

      logits = model(idx)

      logits = logits[:, -1, :]

      logits = logits / temperature
      logits = F.softmax(logits, dim=-1)

      idx_next = torch.multinomial(logits, num_samples=1)

      idx = torch.cat((idx, idx_next), dim=1)
  result_idx = idx[0].tolist()
  result = "".join(tokenizer.idx2char[i] for i in result_idx)

  return result

In [None]:
generate_(model, start_text="өлең", max_new_tokens=200, temperature=0.5)

'ірезезеп жаліреуді жап алдап біреуді алдап барбатап е қайылыршық қылып жүріп жүріп еліп бай марін деп бақтай құдарға қайда бай ма'

In [None]:
generate_(model, start_text="мен", max_new_tokens=200, temperature=0.5)

'ысал келді асы арасы қисыныз қызақтырмалең осөз бір олас бірар аласын ақын адайтсың бар келаскі биді биді тұрсам барлап ақақақарғ'