In [1]:
import inspect
import os
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer
from dataclasses import dataclass
import math

# Custom Dataset class

In [2]:
from torch.utils.data import Dataset
from transformers import AutoTokenizer

class PersianPoetryDataset(Dataset):
    """Sliding-window next-token dataset built from newline-separated verses.

    Consecutive lines of `text` are taken in pairs; each line is stripped and
    wrapped as ``[BOM] <line> [EOS]``. The wrapped pairs are joined with single
    spaces, tokenized once, and the resulting id sequence is served as
    overlapping windows of `block_size` tokens.

    Args:
        text: Raw corpus, one verse per line. A trailing unpaired line is dropped.
        block_size: Number of tokens in each input window.
        tokenizer: Callable used as ``tokenizer(text, return_tensors='pt')`` that
            returns a mapping with an ``'input_ids'`` tensor whose first axis is
            the batch axis.
    """

    def __init__(self, text, block_size, tokenizer):
        self.block_size = block_size
        self.tokenizer = tokenizer
        lines = text.split('\n')
        # zip() over the even/odd slices walks the lines two at a time and
        # silently ignores a trailing line that has no partner.
        formatted_lines = [
            f"[BOM] {first.strip()} [EOS] [BOM] {second.strip()} [EOS]"
            for first, second in zip(lines[0::2], lines[1::2])
        ]
        text_with_bom_eos = ' '.join(formatted_lines)
        # squeeze(0) removes only the batch axis. A bare squeeze() would also
        # collapse a one-token sequence to a 0-dim tensor and break len().
        self.tokens = tokenizer(text_with_bom_eos, return_tensors='pt')['input_ids'].squeeze(0)

    def __len__(self):
        """Number of (input, target) windows; 0 when the corpus is too short."""
        # Each sample needs block_size + 1 tokens. Clamp so that a short corpus
        # reports an empty dataset instead of a negative length, which makes
        # len() raise ValueError.
        return max(0, len(self.tokens) - self.block_size)

    def __getitem__(self, idx):
        """Return the window starting at `idx` and the same window shifted by one token."""
        chunk = self.tokens[idx:idx + self.block_size + 1]
        return chunk[:-1], chunk[1:]




In [3]:


@dataclass
class GPTConfig:
    """Hyperparameters read by GPT and its sub-modules (attention, MLP, LayerNorm)."""
    # Maximum sequence length: size of the position-embedding table and the
    # upper bound asserted on the input length in GPT.forward.
    block_size: int
    # Number of rows in the token-embedding table and width of the lm_head output.
    vocab_size: int
    # Number of Block modules stacked in the transformer.
    n_layer: int
    # Number of attention heads; n_embd must be divisible by it.
    n_head: int
    # Width of the embeddings and of the residual stream.
    n_embd: int
    # Dropout probability used by every nn.Dropout and by the attention call while training.
    dropout: float
    # Whether the nn.Linear layers and LayerNorm modules carry a bias term
    # (lm_head is always created without one).
    bias: bool

class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learnable scale and an optional shift.

    Args:
        ndim: Size of the normalised (last) dimension.
        bias: When truthy a learnable shift is created; otherwise `self.bias` is None.
    """

    def __init__(self, ndim, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        if bias:
            self.bias = nn.Parameter(torch.zeros(ndim))
        else:
            self.bias = None

    def forward(self, input):
        """Normalise `input` over its trailing `ndim` features using eps=1e-5."""
        return F.layer_norm(
            input,
            normalized_shape=self.weight.shape,
            weight=self.weight,
            bias=self.bias,
            eps=1e-5,
        )

class CausalSelfAttention(nn.Module):
    """Multi-head self-attention in which a position attends only to itself and earlier positions.

    Uses `F.scaled_dot_product_attention` when the installed PyTorch provides
    it; otherwise falls back to an explicit masked-softmax implementation.
    """

    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        # One fused projection yields query, key and value; c_proj mixes the heads back.
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.dropout = config.dropout
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            # Lower-triangular mask, only needed by the manual attention path.
            size = config.block_size
            lower_triangle = torch.tril(torch.ones(size, size))
            self.register_buffer("bias", lower_triangle.view(1, 1, size, size))

    def forward(self, x):
        """Apply causal self-attention to `x` of shape (batch, time, n_embd); output has the same shape."""
        batch, steps, width = x.size()
        head_dim = width // self.n_head

        def to_heads(tensor):
            # (batch, time, width) -> (batch, n_head, time, head_dim)
            return tensor.view(batch, steps, self.n_head, head_dim).transpose(1, 2)

        query, key, value = self.c_attn(x).split(self.n_embd, dim=2)
        key, query, value = to_heads(key), to_heads(query), to_heads(value)

        if self.flash:
            # Attention dropout is only active in training mode.
            drop_p = self.dropout if self.training else 0
            out = F.scaled_dot_product_attention(
                query, key, value, attn_mask=None, dropout_p=drop_p, is_causal=True
            )
        else:
            scores = (query @ key.transpose(-2, -1)) * (1.0 / math.sqrt(key.size(-1)))
            # Future positions get -inf so that softmax assigns them zero weight.
            scores = scores.masked_fill(self.bias[:, :, :steps, :steps] == 0, float('-inf'))
            weights = self.attn_dropout(F.softmax(scores, dim=-1))
            out = weights @ value

        # Merge the heads back into a single feature axis.
        merged = out.transpose(1, 2).contiguous().view(batch, steps, width)
        return self.resid_dropout(self.c_proj(merged))

class MLP(nn.Module):
    """Position-wise feed-forward network: widen to 4 * n_embd, apply GELU, project back, then dropout."""

    def __init__(self, config):
        super().__init__()
        hidden_width = 4 * config.n_embd
        self.c_fc = nn.Linear(config.n_embd, hidden_width, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(hidden_width, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Run the expand -> GELU -> project -> dropout pipeline on `x`."""
        hidden = self.gelu(self.c_fc(x))
        return self.dropout(self.c_proj(hidden))

class Block(nn.Module):
    """One transformer layer: layer-normed attention and layer-normed MLP, each added back residually."""

    def __init__(self, config):
        super().__init__()
        self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
        self.mlp = MLP(config)

    def forward(self, x):
        """Return `x` after the attention and MLP residual updates."""
        attended = self.attn(self.ln_1(x))
        x = x + attended
        transformed = self.mlp(self.ln_2(x))
        return x + transformed

class GPT(nn.Module):
    """Decoder-only transformer language model.

    Token and position embeddings are summed, passed through `n_layer` Block
    modules and a final LayerNorm, then projected to vocabulary logits by
    `lm_head`, whose weight matrix is shared with the token embedding.
    """

    def __init__(self, config):
        super().__init__()
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config

        self.transformer = nn.ModuleDict({
            'wte': nn.Embedding(config.vocab_size, config.n_embd),
            'wpe': nn.Embedding(config.block_size, config.n_embd),
            'drop': nn.Dropout(config.dropout),
            'h': nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            'ln_f': LayerNorm(config.n_embd, bias=config.bias),
        })
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # Weight tying: the token embedding and the output projection share one matrix.
        self.transformer.wte.weight = self.lm_head.weight

        self.apply(self._init_weights)
        # Residual output projections are re-initialised with a std that shrinks with depth.
        for name, param in self.named_parameters():
            if name.endswith('c_proj.weight'):
                torch.nn.init.normal_(param, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer))
        print("number of parameters: %.2fM" % (self.get_num_params() / 1e6,))

    def get_num_params(self, non_embedding=True):
        """Count the model's parameters.

        With `non_embedding=True` the position-embedding table is excluded.
        The token-embedding matrix stays in the count because it is the same
        tensor as the `lm_head` weight.
        """
        total = sum(p.numel() for p in self.parameters())
        if not non_embedding:
            return total
        return total - self.transformer.wpe.weight.numel()

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero any Linear bias."""
        if not isinstance(module, (nn.Linear, nn.Embedding)):
            return
        torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            torch.nn.init.zeros_(module.bias)

    def forward(self, idx, targets=None):
        """Run the model on token ids `idx` of shape (batch, time).

        Returns:
            (logits, loss). With `targets`, logits cover every position and
            loss is the cross-entropy against `targets` (entries equal to -1
            are ignored). Without `targets`, logits cover only the last
            position and loss is None.
        """
        device = idx.device
        b, t = idx.size()
        assert t <= self.config.block_size, f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}"
        positions = torch.arange(0, t, dtype=torch.long, device=device)

        embedded = self.transformer.wte(idx) + self.transformer.wpe(positions)
        x = self.transformer.drop(embedded)
        for layer in self.transformer.h:
            x = layer(x)
        x = self.transformer.ln_f(x)

        if targets is None:
            # Inference: only the final position is projected to logits.
            return self.lm_head(x[:, [-1], :]), None

        logits = self.lm_head(x)
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        return logits, loss

    def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
        """Build an AdamW optimizer that applies weight decay only to tensors with two or more dimensions.

        The fused AdamW implementation is requested when this PyTorch build
        exposes a `fused` argument and `device_type` is 'cuda'.
        """
        trainable = {pn: p for pn, p in self.named_parameters() if p.requires_grad}
        decay_params, nodecay_params = [], []
        for p in trainable.values():
            # Matrices are decayed; vectors (biases, LayerNorm parameters) are not.
            (decay_params if p.dim() >= 2 else nodecay_params).append(p)
        optim_groups = [
            {'params': decay_params, 'weight_decay': weight_decay},
            {'params': nodecay_params, 'weight_decay': 0.0}
        ]
        num_decay_params = sum(p.numel() for p in decay_params)
        num_nodecay_params = sum(p.numel() for p in nodecay_params)
        print(f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters")
        print(f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters")

        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        use_fused = fused_available and device_type == 'cuda'
        extra_args = {'fused': True} if use_fused else {}
        optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_args)
        print(f"using fused AdamW: {use_fused}")

        return optimizer

In [4]:

# Hyperparameters for the model built in the next cell.
config = GPTConfig(
    block_size=128,    # maximum context length in tokens
    # NOTE(review): vocab_size must be at least the tokenizer's vocabulary size,
    # since token ids index the embedding table directly -- confirm against
    # the 'bolbolzaban/gpt2-persian' tokenizer loaded later.
    vocab_size=25000,
    n_layer=6,         # number of transformer blocks
    n_head=4,          # attention heads per block (128 / 4 = 32 features per head)
    n_embd=128,        # embedding / residual width
    dropout=0.1,       # dropout probability used throughout the model
    bias=True          # Linear and LayerNorm modules carry bias terms
)

In [5]:
# Build the model; the GPT constructor prints its parameter count
# (position embeddings excluded).
model = GPT(config)

number of parameters: 4.39M


In [24]:

# Load the corpus. The whole file is read, then cut down to its first 10,000
# lines and re-joined into one string (each line keeps its trailing newline).
with open('vahshi_norm.txt', 'r', encoding='utf-8') as f:
    text = f.readlines()[:10000]  # list of lines at this point
    text = ''.join(text)  # rebound to a single string

# Create the dataset
# The tokenizer is fetched by name from the Hugging Face hub; this global is
# reused by the generation helpers in later cells.
tokenizer = AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian')
dataset = PersianPoetryDataset(text, config.block_size, tokenizer)
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def train(model, dataset, epochs, batch_size, lr, weight_decay, device, log_every=1000):
    """Train `model` on `dataset` with AdamW and a one-cycle learning-rate schedule.

    Args:
        model: Module exposing `configure_optimizers(weight_decay, lr, betas,
            device_type)` and returning `(logits, loss)` from `model(x, y)`.
        dataset: Map-style dataset yielding `(input, target)` pairs.
        epochs: Number of full passes over `dataset`.
        batch_size: Samples per optimisation step.
        lr: Learning rate given to the optimizer and used as the scheduler's peak (`max_lr`).
        weight_decay: Weight-decay coefficient forwarded to `configure_optimizers`.
        device: `torch.device` on which to train.
        log_every: Print the current batch loss every `log_every` iterations
            of each epoch (iteration 0 included). 0 or None disables logging.

    Raises:
        ValueError: If `dataset` is empty.
    """
    # Fail early with a clear message; otherwise the scheduler raises a less
    # helpful error about a non-positive number of total steps.
    if len(dataset) == 0:
        raise ValueError("train() needs a non-empty dataset; got 0 samples")

    model = model.to(device)
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    optimizer = model.configure_optimizers(weight_decay, lr, (0.9, 0.95), device.type)
    # The schedule is sized for exactly epochs * len(train_loader) optimizer steps.
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=lr, steps_per_epoch=len(train_loader), epochs=epochs)
    model.train()
    for epoch in range(epochs):
        for i, (x, y) in enumerate(train_loader):
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            logits, loss = model(x, y)
            loss.backward()
            optimizer.step()
            scheduler.step()  # stepped once per batch, as OneCycleLR expects
            # The truthiness check lets log_every=0 switch logging off
            # without a modulo-by-zero error.
            if log_every and i % log_every == 0:
                print(f'Epoch: {epoch}, Iteration: {i}, Loss: {loss.item()}')

# Train for 10 epochs. `lr` is both the optimizer's learning rate and the peak
# of the one-cycle schedule inside train().
# NOTE(review): the recorded log below shows the batch loss staying roughly
# between 5.2 and 6.3 with no clear downward trend -- consider revisiting lr=1e-5.
train(model, dataset, epochs=10, batch_size=8, lr=1e-5, weight_decay=0.1, device=device)


# Save only the weights (state_dict), not the full module object.
torch.save(model.state_dict(), 'my_model1.pt')


num decayed parameter tensors: 26, with 4,396,032 parameters
num non-decayed parameter tensors: 50, with 10,240 parameters
using fused AdamW: True
Epoch: 0, Iteration: 0, Loss: 5.862204551696777
Epoch: 0, Iteration: 1000, Loss: 5.203840255737305
Epoch: 0, Iteration: 2000, Loss: 6.309654712677002
Epoch: 0, Iteration: 3000, Loss: 5.6349992752075195
Epoch: 0, Iteration: 4000, Loss: 5.537422180175781
Epoch: 0, Iteration: 5000, Loss: 5.710529804229736
Epoch: 0, Iteration: 6000, Loss: 5.983253479003906
Epoch: 0, Iteration: 7000, Loss: 5.9268364906311035
Epoch: 0, Iteration: 8000, Loss: 5.233710765838623
Epoch: 0, Iteration: 9000, Loss: 5.796087265014648
Epoch: 0, Iteration: 10000, Loss: 5.601946830749512
Epoch: 0, Iteration: 11000, Loss: 5.830855369567871
Epoch: 0, Iteration: 12000, Loss: 5.565224647521973
Epoch: 1, Iteration: 0, Loss: 5.452528476715088
Epoch: 1, Iteration: 1000, Loss: 5.588979244232178
Epoch: 1, Iteration: 2000, Loss: 5.4638991355896
Epoch: 1, Iteration: 3000, Loss: 5.71233

In [25]:

# Restore the saved weights into the existing `model` object, mapping tensors
# onto whichever device is available in this session.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source (or pass weights_only=True on PyTorch versions that support it).
model.load_state_dict(torch.load('my_model1.pt', map_location=device))

print("Model loaded successfully.")
print(model)
# get_num_params() excludes the position-embedding table by default.
print(f"number of parameters: {model.get_num_params()/1e6:.2f}M")
# NOTE: the tokenizer is not reloaded in this cell; the generation helpers
# below reuse the global `tokenizer` created in the data-loading cell.

Model loaded successfully.
GPT(
  (transformer): ModuleDict(
    (wte): Embedding(25000, 128)
    (wpe): Embedding(128, 128)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-5): 6 x Block(
        (ln_1): LayerNorm()
        (attn): CausalSelfAttention(
          (c_attn): Linear(in_features=128, out_features=384, bias=True)
          (c_proj): Linear(in_features=128, out_features=128, bias=True)
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm()
        (mlp): MLP(
          (c_fc): Linear(in_features=128, out_features=512, bias=True)
          (gelu): GELU(approximate='none')
          (c_proj): Linear(in_features=512, out_features=128, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm()
  )
  (lm_head): Linear(in_features=128, out_features=25000, bias=False)
)
number of parameters: 4.39M


In [33]:


# Make sure the model lives on the same device that the generation helpers send their inputs to.
model.to(device)

def tokenize_and_truncate_text(text, block_size):
    """Encode `text` with the global tokenizer and keep at most the last `block_size` ids.

    No special tokens are added by the tokenizer. Returns a LongTensor with a
    leading batch axis of size 1.
    """
    token_ids = tokenizer.encode(text, add_special_tokens=False)
    fits_in_context = len(token_ids) <= block_size
    # When the prompt is too long, keep the most recent tokens (the tail).
    kept_ids = token_ids if fits_in_context else token_ids[-block_size:]
    return torch.tensor(kept_ids, dtype=torch.long).unsqueeze(0)

def decode_output(predicted_ids):
    """Turn a tensor of token ids back into text with the global tokenizer, keeping special tokens."""
    return tokenizer.decode(predicted_ids.tolist(), skip_special_tokens=False)


def get_model_output(model, input_text, block_size, temperature=1.0, top_k=50):
    """Sample one next token for `input_text` and return it as decoded text.

    The prompt is encoded and truncated to its last `block_size` tokens, the
    model's logits for the final position are divided by `temperature`, and a
    token is drawn from the softmax over the `top_k` highest logits.

    Args:
        model: Model whose call `model(input_ids)` returns `(logits, loss)`.
        input_text: Prompt text; must encode to at least one token.
        block_size: Maximum number of prompt tokens fed to the model.
        temperature: Divisor applied to the logits before sampling.
        top_k: Number of highest-scoring candidates to sample from. Values
            larger than the vocabulary are clamped to the vocabulary size.

    Returns:
        The decoded text of the sampled token (special tokens are kept).

    Raises:
        ValueError: If `input_text` encodes to zero tokens.

    Note:
        Puts `model` in eval mode and leaves it there.
    """
    model.eval()

    input_ids = tokenize_and_truncate_text(input_text, block_size)
    if input_ids.size(1) == 0:
        # An empty sequence would otherwise fail deep inside the model with an
        # unrelated-looking indexing error.
        raise ValueError("input_text produced no tokens; cannot sample a continuation")
    input_ids = input_ids.to(device)

    with torch.no_grad():
        logits, _ = model(input_ids)
        logits = logits[:, -1, :] / temperature

        # torch.topk raises when k exceeds the size of the dimension, so never
        # request more candidates than there are vocabulary entries.
        k = min(top_k, logits.size(-1))
        top_k_logits, top_k_indices = torch.topk(logits, k=k, dim=-1)
        probs = F.softmax(top_k_logits, dim=-1)

        # multinomial returns a position within the top-k list; map it back to
        # the real vocabulary id through top_k_indices.
        chosen_idx = torch.multinomial(probs, num_samples=1)
        predicted_token_id = top_k_indices[0, chosen_idx[0]]

        predicted_text = decode_output(predicted_token_id)

    return predicted_text






def poem(input_text, beam=15, max_new_tokens=None):
    """Generate text token by token from `input_text` and print it.

    Each step samples one token with `get_model_output`. The last three
    sampled tokens are remembered: if a new sample repeats one of them, the
    temperature is raised by 10% and the token is re-sampled once; otherwise
    the temperature decays by 5% down to a floor of 0.7. A newline is inserted
    after every sampled token containing '[EOS]' or '[BOM]'.

    Args:
        input_text: Prompt that starts the poem.
        beam: Number of '[EOS]'/'[BOM]' marker tokens to generate before
            stopping. Despite the name this is a marker count, not a
            beam-search width.
        max_new_tokens: Hard cap on generated tokens so that generation
            terminates even if the model never emits a marker. Defaults to
            `beam * config.block_size`.

    Returns:
        None. The generated text and a 100-character dashed rule are printed.
    """
    if max_new_tokens is None:
        max_new_tokens = beam * config.block_size

    complete_text = input_text
    last_few_tokens = []
    temperature = 0.8
    bom_count = 0
    generated = 0

    # Without the token cap this loop would never end for a model that does
    # not produce '[EOS]' or '[BOM]'.
    while bom_count < beam and generated < max_new_tokens:
        predicted_text = get_model_output(model, input_text, config.block_size, temperature=temperature, top_k=50)

        if predicted_text.strip() in last_few_tokens:
            # Recent repetition: heat up the sampling and draw once more.
            temperature *= 1.1
            predicted_text = get_model_output(model, input_text, config.block_size, temperature=temperature, top_k=50)
        else:
            temperature = max(0.7, temperature * 0.95)

        token_text = predicted_text.strip()
        generated += 1

        complete_text += " " + token_text
        last_few_tokens.append(token_text)

        # Keep a sliding window of the three most recent tokens.
        if len(last_few_tokens) > 3:
            last_few_tokens.pop(0)

        # Re-encode and decode the running prompt so that it never exceeds the
        # model's context length.
        input_text += " " + token_text
        input_ids = tokenize_and_truncate_text(input_text, config.block_size)
        input_text = tokenizer.decode(input_ids[0].tolist())

        if '[EOS]' in predicted_text or '[BOM]' in predicted_text:
            complete_text += '\n'
            bom_count += 1

    # Printing the text in one call emits the same output as printing each
    # '\n'-separated line in turn.
    print(complete_text)
    print('-' * 100)


In [38]:
# Generate and print a poem that continues the Persian prompt, with the default beam=15.
poem(input_text='دوستت دارم')

دوستت دارم که سد مرغ او را [EOS]
 [BOM]
 چو بر سر در او با من و هم زبان باشد [EOS]
 [BOM]
 به جان  باشد و نمی باشد [EOS]
 [BOM]
 که می آید به جای خویش را چه باشد [EOS]
 [BOM]
 به یک دم از دست و می باشد [EOS]
 [BOM]
 اگر یار و نه من نکرد آن باشد [EOS]
 [BOM]
 که از آن خانه من باشد [EOS]
 [BOM]
 که چون باشد و از پی بی خان و جانی [EOS]

----------------------------------------------------------------------------------------------------
