In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from transformers import AutoTokenizer
import json
import re

In [2]:
# Hyperparameter presets and resource names, keyed by preset/section name.
# Fields read by the code visible in this notebook: block_size,
# max_pos_n_embed, n_layer, n_head, n_embed, dropout, vocab_size (model
# classes), k (Tokenizer) and tokenizer.name. The remaining fields
# (batch_size, lr, epochs, eval_interval, eval_steps, n, data.name) are not
# read anywhere here -- presumably they belong to the training code; TODO confirm.
configdict = {
    "gpt-1M": dict(
        batch_size=64,
        block_size=256,        # side of the causal mask; also the context crop in generate()
        max_pos_n_embed=2048,  # number of rows in the position-embedding table
        lr=2e-3,
        n_layer=8,
        n_head=16,
        n_embed=64,
        dropout=0.2,
        epochs=1,
        eval_interval=200,
        eval_steps=50,
        n=1200000,
        k=7999,                # how many leading token ids the Tokenizer keeps
        vocab_size=8000,       # k + 1: the Tokenizer appends one extra slot for id 50256
    ),
    "gpt-15M": dict(
        batch_size=64,
        block_size=256,
        max_pos_n_embed=2048,
        lr=2e-3,
        n_layer=8,
        n_head=16,
        n_embed=320,           # the only field that differs from "gpt-1M"
        dropout=0.2,
        epochs=1,
        eval_interval=200,
        eval_steps=50,
        n=1200000,
        k=7999,
        vocab_size=8000,
    ),
    "tokenizer": dict(
        name="EleutherAI/gpt-neo-125M",
    ),
    "data": dict(
        name="roneneldan/TinyStories",
    ),
}

class Config:
    """Attribute-style view over a (possibly nested) dictionary.

    Each key of ``dictionary`` becomes an instance attribute. Values that are
    themselves dicts are wrapped in ``Config`` recursively; every other value
    is stored as-is. Entries can also be read with ``cfg[key]``, which is the
    only way to reach keys that are not valid identifiers (e.g. "gpt-1M").
    """

    def __init__(self, dictionary):
        for name, entry in dictionary.items():
            wrapped = Config(entry) if isinstance(entry, dict) else entry
            setattr(self, name, wrapped)

    def __getitem__(self, key):
        """Return the attribute stored under ``key``; raises KeyError if absent."""
        return vars(self)[key]

# Attribute-style view of configdict, e.g. config.tokenizer.name or config["gpt-1M"].n_embed.
config = Config(configdict)

In [54]:
class Tokenizer:
  """Wrap a Hugging Face tokenizer and optionally squeeze its ids into a small vocabulary.

  With ``k`` set, only the first ``k`` token ids listed in the token-count JSON
  file keep an index of their own (0..k-1, in file order). Every other id is
  mapped to one shared extra index ``k``, which stands for id 50256
  (presumably the underlying tokenizer's EOS id, which is also installed as
  the pad token here -- confirm for tokenizers other than GPT-Neo).
  With ``k`` unset the underlying tokenizer ids are used unchanged.

  Args:
    config: object exposing ``name``, the identifier handed to
      ``AutoTokenizer.from_pretrained``.
    k: number of leading entries of the count file to keep, or None to keep
      the full vocabulary.
    file_path: JSON file mapping token-id strings to counts. Entries are
      expected to be stored most-frequent-first; the order is taken as-is and
      never re-sorted. Required when ``k`` is set.
    device: device on which ``encoder`` places the id tensors.
  """

  # Token id (as stored in the count file, i.e. a string) that absorbs every
  # id outside the kept set.
  _FALLBACK_ID = "50256"

  def __init__(self, config, k=None, file_path=None, device="cpu"):
    self.k = k
    self.file_path = file_path
    self.device = device
    self.tokenizer = AutoTokenizer.from_pretrained(config.name)
    self.tokenizer.pad_token = self.tokenizer.eos_token
    self.vocab_size = self.tokenizer.vocab_size if not self.k else self.k
    self.initialize()

  def get_config(self):
    """Return vocabulary and token-usage statistics as a dict.

    Token totals are None when no count file was supplied.
    """
    config = {
        "initl_vocab_size": self.tokenizer.vocab_size,
        "final_vocab_size": self.vocab_size,
        "vocab_size": self.vocab_size,
        "total_tokens": self.total_tokens,
        "total_tokens_used": self.tokens_used if self.k else self.total_tokens,
        "total_unsed_tokens": self.total_tokens - self.tokens_used if self.k else 0
    }
    return config

  def initialize(self):
    """Load the token counts and, when ``k`` is set, build the id <-> index tables.

    Raises:
      ValueError: if ``k`` is set but no ``file_path`` was given.
    """
    if self.file_path is None:
      if self.k:
        raise ValueError("file_path is required when k is set")
      # Full vocabulary and no count file: nothing to remap, totals unknown.
      self.total_tokens = None
      return

    with open(self.file_path, 'r') as file:
      tokens_counts = json.load(file)

    self.total_tokens = sum(tokens_counts.values())

    if self.k:
      # Only the first k entries are kept; the file order decides which.
      self.tokens_used = sum(list(tokens_counts.values())[:self.k])
      self.top_k_tokens = list(tokens_counts.keys())[:self.k]
      self.top_k_tokens.append(self._FALLBACK_ID)
      # Assigned rather than incremented so a repeated call cannot inflate it.
      self.vocab_size = self.k + 1
      self.top_k_tokens_dict = {token: index for index, token in enumerate(self.top_k_tokens)}
      self.reversed_top_k_tokens_dict = {value: int(key) for key, value in self.top_k_tokens_dict.items()}

  def encoder(self, input, padding=False, max_length=256, truncation=False):
    """Tokenize ``input`` and return the ids as a tensor on ``self.device``.

    The keyword arguments are forwarded to the underlying tokenizer. With
    ``k`` set, ids are replaced by their reduced-vocabulary index and ids
    outside the kept set collapse onto the fallback index.
    """
    tokens = self.tokenizer(input, return_tensors='pt', padding=padding, max_length=max_length, truncation=truncation)['input_ids'].to(self.device)

    if self.k:
      fallback_index = self.top_k_tokens_dict[self._FALLBACK_ID]
      remapped = [self.top_k_tokens_dict.get(str(token_id), fallback_index) for token_id in tokens.view(-1).tolist()]
      tokens = torch.tensor(remapped, dtype=torch.long, device=self.device).view(tokens.shape)

    return tokens

  def decoder(self, tokens):
    """Turn a 2-D tensor of indices into a list with one decoded string per row.

    With ``k`` set, indices are first translated back to tokenizer ids. An
    index without a reverse entry is decoded as the fallback id instead of
    raising KeyError. That can happen when the first k ids of the count file
    already contain the fallback id (the appended slot then takes over that
    key, leaving the earlier index unmapped) or when the index lies outside
    the table.
    """
    if self.k:
      fallback_id = int(self._FALLBACK_ID)
      restored = [[self.reversed_top_k_tokens_dict.get(index, fallback_id) for index in row] for row in tokens.tolist()]
      tokens = torch.tensor(restored, dtype=torch.long, device=tokens.device)

    output = [self.tokenizer.decode(x, skip_special_tokens=True) for x in tokens]

    return output

In [56]:
class Head(nn.Module):
  """One causal self-attention head.

  Projects the input to queries, keys and values of width ``head_size``
  (bias-free linear maps from ``config.n_embed``), applies scaled dot-product
  attention in which position t can only attend to positions <= t, applies
  dropout to the attention weights and returns the weighted values.

  Input is (B, T, n_embed) with T <= ``config.block_size``; output is
  (B, T, head_size).
  """

  def __init__(self, config, head_size):
    super().__init__()
    embed_dim = config.n_embed
    self.key = nn.Linear(embed_dim, head_size, bias=False)
    self.query = nn.Linear(embed_dim, head_size, bias=False)
    self.value = nn.Linear(embed_dim, head_size, bias=False)
    # Lower-triangular ones: entry (t, s) is 1 when position s is visible from t.
    causal = torch.tril(torch.ones(config.block_size, config.block_size))
    self.register_buffer('tril', causal)
    self.dropout = nn.Dropout(config.dropout)

  def forward(self, x):
    _, seq_len, _ = x.shape
    queries, keys, values = self.query(x), self.key(x), self.value(x)  # each (B, T, head_size)

    # Scaled dot-product scores, (B, T, T).
    scale = keys.shape[-1] ** -0.5
    scores = queries @ keys.transpose(-2, -1) * scale

    # Future positions get -inf so softmax assigns them zero weight.
    hidden = self.tril[:seq_len, :seq_len] == 0
    attention = F.softmax(scores.masked_fill(hidden, float("-inf")), dim=-1)

    return self.dropout(attention) @ values  # (B, T, head_size)


class MultiHeadAttention(nn.Module):
  """``config.n_head`` independent attention heads followed by an output projection.

  Every head sees the same input; their outputs are concatenated along the
  last dimension (width ``head_size * n_head``), projected back to
  ``config.n_embed`` and passed through dropout.
  """

  def __init__(self, config, head_size):
    super().__init__()
    head_list = []
    for _ in range(config.n_head):
      head_list.append(Head(config, head_size))
    self.heads = nn.ModuleList(head_list)
    self.proj = nn.Linear(head_size * config.n_head, config.n_embed)
    self.dropout = nn.Dropout(config.dropout)

  def forward(self, x):
    per_head = [head(x) for head in self.heads]
    merged = torch.concat(per_head, dim=-1)
    projected = self.proj(merged)
    return self.dropout(projected)


class FeedForward(nn.Module):
  """Position-wise MLP: expand to 4x ``config.n_embed``, GELU, project back, dropout."""

  def __init__(self, config):
    super().__init__()
    width = config.n_embed
    hidden = 4 * width
    self.layers = nn.Sequential(
        nn.Linear(width, hidden),
        nn.GELU(),
        nn.Linear(hidden, width),
        nn.Dropout(config.dropout),
    )

  def forward(self, x):
    # Shape is preserved: the last dimension goes n_embed -> 4*n_embed -> n_embed.
    return self.layers(x)


class Block(nn.Module):
  """Pre-norm transformer block: self-attention then feed-forward, each with a residual.

  The per-head width is ``config.n_embed // config.n_head``.
  """

  def __init__(self, config):
    super().__init__()
    self.sa_heads = MultiHeadAttention(config, config.n_embed // config.n_head)
    self.ffwd = FeedForward(config)
    self.ln1 = nn.LayerNorm(config.n_embed)
    self.ln2 = nn.LayerNorm(config.n_embed)

  def forward(self, x):
    # LayerNorm is applied to the sub-layer input; the residual path stays un-normalised.
    attended = x + self.sa_heads(self.ln1(x))
    return attended + self.ffwd(self.ln2(attended))


class GPT2(nn.Module):
  """Decoder-only transformer language model.

  Token and learned position embeddings are summed, passed through dropout
  and ``config.n_layer`` blocks, normalised and projected to vocabulary logits.

  Args:
    config: object exposing ``block_size``, ``vocab_size``, ``n_embed``,
      ``max_pos_n_embed``, ``dropout`` and ``n_layer`` (plus whatever the
      blocks read).
    device: stored on ``self.device`` for callers. ``forward`` does not rely
      on it; it follows the device of its input instead.
  """

  def __init__(self, config, device='cpu'):
    super().__init__()
    self.device = device
    self.block_size = config.block_size
    self.embedings = nn.Embedding(config.vocab_size, config.n_embed)
    self.position_embedings = nn.Embedding(config.max_pos_n_embed, config.n_embed)
    self.dropout = nn.Dropout(config.dropout)
    self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
    self.ln_final = nn.LayerNorm(config.n_embed)
    self.lm_head = nn.Linear(config.n_embed, config.vocab_size)

  def get_parameters(self):
    """Return the total number of parameter elements in the model."""
    return sum(p.numel() for p in self.parameters())

  def save(self, path):
    """Write the model's state dict to ``path``."""
    torch.save(self.state_dict(), path)

  def forward(self, idx, targets=None):
    """Compute logits and, when ``targets`` is given, the next-token loss.

    Args:
      idx: (B, T) tensor of token indices.
      targets: optional (B, T) tensor aligned with ``idx`` (not pre-shifted);
        the shift by one position happens here.

    Returns:
      ``(logits, loss)``. Without targets, logits are (B, T, vocab_size) and
      loss is None. With targets, the returned logits are the shifted
      (B, T-1, vocab_size) slice that was scored, and loss is a scalar.
    """
    B, T = idx.shape
    token_embed = self.embedings(idx) # (B, T, C)
    # Built on the input's device so the model keeps working after being moved
    # somewhere other than the device string given to the constructor.
    positions = torch.arange(T, device=idx.device)
    position_embed = self.position_embedings(positions) # (T, C)
    x = token_embed + position_embed # (B, T, C)
    x = self.dropout(x) # (B, T, C)
    x = self.blocks(x) # (B, T, C)
    x = self.ln_final(x) # (B, T, C)
    logits = self.lm_head(x)  # (B, T, vocab_size)

    if targets is None:
      loss = None
    else:
      # The logit at position t is scored against the target at position t + 1.
      logits = logits[..., :-1, :].contiguous()
      # NOTE(review): targets produced by the reduced-vocabulary Tokenizer never
      # contain 50256 (that id is remapped to index k), so this ignore_index
      # would have no effect for them -- confirm against the training code.
      loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets[..., 1:].contiguous().view(-1), ignore_index=50256)
    return logits, loss

  @torch.no_grad()  # sampling never needs gradients; avoids recording a graph per step
  def generate(self, idx, max_tokens, temperature=1.0, top_k=None):
    """Sample ``max_tokens`` further tokens after the (B, T) prompt ``idx``.

    Args:
      idx: (B, T) tensor of prompt indices.
      max_tokens: number of tokens to append.
      temperature: divisor applied to the last-position logits before softmax.
      top_k: if given, only the ``top_k`` largest logits per row stay eligible.

    Returns:
      (B, T + max_tokens) tensor: the prompt followed by the sampled tokens.
    """
    for _ in range(max_tokens):
      # Only the most recent block_size tokens are fed back into the model.
      idx_cond = idx[:, -self.block_size:]
      logits, _ = self(idx_cond) # (B, T, vocab_size)
      logits = logits[:, -1, :] / temperature # (B, vocab_size)
      if top_k is not None:
        v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
        # Everything below the k-th largest logit of its row is ruled out.
        logits[logits < v[:, [-1]]] = -float('Inf')
      probs = F.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
      idx = torch.concat((idx, idx_next), dim=1) # (B, T+1)
    return idx

In [57]:
def load_model(config, path, device='cpu'):
    """Build a GPT2 from ``config``, load the weights stored at ``path`` and return it in eval mode.

    The checkpoint is read onto the CPU first and the model is moved to
    ``device`` afterwards.

    NOTE: ``torch.load`` unpickles the file, so only checkpoints from a
    trusted source should be loaded.
    """
    model = GPT2(config, device=device)
    state_dict = torch.load(path, map_location=torch.device('cpu'))
    model.load_state_dict(state_dict)
    # Both calls return the module itself, so they can be chained.
    return model.to(device).eval()

def clean_string(input_string):
    """Strip everything except word characters, whitespace, '.' and ',', then drop newlines.

    Newlines are removed outright (not replaced by a space), so text on
    either side of a line break is joined directly.
    """
    without_symbols = re.sub(r'[^\w\s.,]', '', input_string)
    return ''.join(without_symbols.split('\n'))

In [49]:
# Preset and checkpoint to load.
model_name = "gpt-1M"
path = "model-1M-8k-3.pth"
model_config = config[model_name]

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Reduced-vocabulary tokenizer: keeps the first k ids listed in tokens.json.
tokenizer = Tokenizer(
    config.tokenizer,
    k=model_config.k,
    file_path="tokens.json",
    device=device,
)

In [58]:
model = load_model(model_config, path, device=device)

# Sampling is stochastic: fix the seed so re-running this cell repeats the same samples.
torch.manual_seed(1337)

# A single index-0 token acts as the prompt for "unconditional" sampling; with
# the reduced vocabulary, index 0 is the first entry of the token-count file.
unconditional = torch.zeros((1, 1), dtype=torch.long, device=device)
prompt = "Elon told his mom"

# Inference only: no autograd bookkeeping is needed for the sampling steps.
with torch.no_grad():
    output1 = model.generate(unconditional, max_tokens=200, temperature=1, top_k=None)
    output2 = model.generate(tokenizer.encoder(prompt), max_tokens=200, temperature=1, top_k=None)

print(clean_string(tokenizer.decoder(output1)[0]))
print(clean_string(tokenizer.decoder(output2)[0]))

. was a weird day, the little boy named Tim. Tim wanted to become the same little boy with his mom. He asked his mom to help him to use the dangerousets.

His mom said, "Tim, you can play with me some squirrels and care of us. But it's just very nice to ask Mom and not have to win!"

Tim wanted to help his mom. He thought about one because one day was his favorite toy. He started to explore a game. He didn't know what to do. His mom smiled and said, "That's not a good idea. You can play music instead."

Tim was sad again, except for his favorite friend and he found him very well. He said, "Thank you, mom! I heard my name." His mom smiled and said, "Sure, Tim. I will take him dressed in life."

Tim and his mom were happy at helping him. They set up the game with his
Elon told his mommy were taking her in the garden. Grandma had a big cup of treats, and some apples. They saw lots of exciting snacks that said it was going to eat.

"Let's eat them," said Mama.

They walked to the fridge an