In [3]:
import regex as re

class GPTTokenizer():
    """Byte-level byte-pair-encoding (BPE) tokenizer.

    Text is first cut into chunks with a GPT-4 style regular expression, each
    chunk is turned into its UTF-8 bytes (ids 0-255), and merges are learned
    and applied inside chunks only, never across chunk boundaries.

    Attributes:
        vocab_size: Requested vocabulary size; at most ``vocab_size - 256``
            merges are learned.
        bigram_tree: Maps a pair of token ids to the id of the merged token.
            Insertion order (and the id itself) is the merge priority.
        vocab: Maps every token id to the bytes it stands for.
    """

    def __init__(self, vocab_size):
        self.GPT4_SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""
        self.vocab_size = vocab_size
        self.bigram_tree = {}
        self.vocab = {idx: bytes([idx]) for idx in range(256)}

    def split_text(self, text):
        """Return the list of regex chunks that `text` is split into."""
        return re.findall(self.GPT4_SPLIT_PATTERN, text)

    def freq(self, tokens, stats):
        """Add the count of every adjacent id pair in `tokens` to `stats` (in place)."""
        for id1, id2 in zip(tokens, tokens[1:]):
            stats[(id1, id2)] = stats.get((id1, id2), 0) + 1

    def replace(self, tokens, pair, idx):
        """Return a copy of `tokens` with each left-to-right, non-overlapping
        occurrence of `pair` replaced by the single id `idx`."""
        newids = []
        i = 0
        while i < len(tokens):
            if tokens[i] == pair[0] and i < len(tokens) - 1 and tokens[i+1] == pair[1]:
                newids.append(idx)
                i += 2
            else:
                newids.append(tokens[i])
                i += 1
        return newids

    def train(self, text):
        """Learn up to ``vocab_size - 256`` merges from `text`.

        Each round merges the most frequent adjacent pair (counted within
        chunks) into a new id. Training stops early when no adjacent pair is
        left, so the final vocabulary may be smaller than ``vocab_size``.
        Any merges from a previous call are discarded.
        """
        # Start from the plain byte vocabulary so a second call to train()
        # does not leave stale merges behind.
        self.bigram_tree = {}
        self.vocab = {idx: bytes([idx]) for idx in range(256)}

        tokens = [list(chunk.encode('utf-8')) for chunk in self.split_text(text)]

        for i in range(self.vocab_size - 256):
            stats = {}
            for token in tokens:
                self.freq(token, stats)
            if not stats:
                # Every chunk has collapsed to a single token (or the text is
                # empty): nothing left to merge, and max() would raise.
                break
            maxi = max(stats, key=stats.get)
            new_id = 256 + i
            tokens = [self.replace(token, maxi, new_id) for token in tokens]
            self.bigram_tree[maxi] = new_id
            self.vocab[new_id] = self.vocab[maxi[0]] + self.vocab[maxi[1]]

    def encode_chunk(self, tokens):
        """Apply the learned merges to one chunk's id list and return the result."""
        while len(tokens)>=2:
            stats = {}
            self.freq(tokens, stats)
            # Pick the present pair that was learned earliest (lowest merged
            # id); pairs that were never learned rank as infinity.
            pair = min(stats, key = lambda p: self.bigram_tree.get(p, float("inf")))
            if pair not in self.bigram_tree:
                break
            tokens = self.replace(tokens, pair, self.bigram_tree[pair])
        return tokens

    def encode(self, text):
        """Return the flat list of token ids for `text`."""
        tokens = []
        for chunk in self.split_text(text):
            tks = list(chunk.encode('utf-8'))
            tks = self.encode_chunk(tks)
            tokens.extend(tks)

        return tokens

    def decode(self, tokens):
        """Return the text for a sequence of token ids.

        Byte sequences that are not valid UTF-8 are dropped silently
        (``errors="ignore"``). Raises ``KeyError`` for an id not in `vocab`.
        """
        x = (b"".join(self.vocab[idx] for idx in tokens))
        text = x.decode('utf-8', errors="ignore")
        return text


# Where the training corpus lives and how large the tokenizer vocabulary
# should be (256 byte tokens + learned merges).
CORPUS_PATH = "/kaggle/input/shakespeare/input.txt"
TOKENIZER_VOCAB_SIZE = 1000

# Read the whole corpus into memory. `text` is only bound when the read
# succeeds, so a failed read cannot leave an empty corpus behind for later cells.
with open(CORPUS_PATH, 'r', encoding='utf-8') as f:
    text = f.read()

# Learn the BPE merges on the full corpus.
tokenizer = GPTTokenizer(TOKENIZER_VOCAB_SIZE)
tokenizer.train(text)




NameError: name 'torch' is not defined

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import regex as re
import torch.optim as optim
# Number of distinct token ids the tokenizer knows about.
vocab_size = len(tokenizer.vocab)

# Encode the corpus once and keep the flat id list as a tensor.
data = torch.tensor(tokenizer.encode(text))

# First 80% of the tokens are used for training, the remainder for validation.
n = int(0.8 * len(data))
train, valid = data[:n], data[n:]

def get_batch(specify, batch_size, block_size):
    """Sample a random batch of (input, target) windows.

    Args:
        specify: ``'train'`` reads from the global ``train`` tensor; any other
            value reads from the global ``valid`` tensor.
        batch_size: Number of windows to draw.
        block_size: Length of each window.

    Returns:
        ``(x, y)`` where both are stacked ``batch_size`` windows of length
        ``block_size`` and ``y`` is ``x`` shifted one position to the right.
    """
    source = train if specify == 'train' else valid
    # Start offsets are chosen so that the shifted target window still fits.
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start:start + block_size])
        targets.append(source[start + 1:start + block_size + 1])
    return torch.stack(inputs), torch.stack(targets)
# tokenizer.vocab

In [6]:

class EmbeddingLayer(nn.Module):
    """Token embedding plus fixed sinusoidal positional encoding, then dropout.

    Args:
        d_model: Embedding width (even or odd).
        vocab_size: Number of rows in the token embedding table.
        context_length: Number of positions the encoding table is built for.
        dropout: Dropout probability applied to the summed embeddings.
    """

    def __init__(self, d_model, vocab_size, context_length, dropout) -> None:
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.context_length = context_length
        self.inp_embedding = nn.Embedding(self.vocab_size, self.d_model)
        self.dropout = nn.Dropout(dropout)

        # angles[pos, j] = pos / 10000 ** (2j / d_model)
        position = torch.arange(0, self.context_length, dtype=torch.float).unsqueeze(1)
        v_emb = torch.arange(0, self.d_model, 2, dtype=torch.float)
        angles = position / (10000 ** (v_emb / self.d_model))

        posits = torch.zeros(self.context_length, self.d_model)
        posits[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one odd-indexed column fewer than there
        # are frequencies, so trim the cosine block to fit.
        posits[:, 1::2] = torch.cos(angles[:, : self.d_model // 2])

        # Stored as a buffer of shape (1, context_length, d_model): it follows
        # the module across devices but is not a trainable parameter.
        posits = posits.unsqueeze(0)
        self.register_buffer('posits', posits)

    def forward(self, x):
        """Embed the token ids in `x` and add the encoding of the first
        ``x.size(1)`` positions."""
        x = self.inp_embedding(x)
        x = x + self.posits[:, :x.size(1), :]
        x = self.dropout(x)
        return x

class ResidualLink(nn.Module):
    """Wrap a sublayer as ``x + dropout(sublayer(layer_norm(x)))``.

    Args:
        sublayer: Module applied to the normalised input.
        d_model: Size of the last dimension, used for the layer norm.
        dropout: Dropout probability applied to the sublayer output.
    """

    def __init__(self, sublayer, d_model, dropout):
        super().__init__()
        self.sublayer = sublayer
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Normalise first, run the wrapped module, then add the skip path.
        normed = self.norm(x)
        branch = self.sublayer(normed)
        return x + self.dropout(branch)

class CausalSelfAttention(nn.Module):
    """A single head of scaled dot-product self-attention with a causal mask.

    Args:
        d_model: Size of the input's last dimension.
        head_size: Size of the query/key/value projections (and of the output).
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, head_size, dropout) -> None:
        super().__init__()
        self.d_model = d_model
        self.head_size = head_size
        self.dropout = nn.Dropout(dropout)
        self.query = nn.Linear(self.d_model, self.head_size)
        self.key = nn.Linear(self.d_model, self.head_size)
        self.value = nn.Linear(self.d_model, self.head_size)

    def forward(self, x):
        """Return the attended values; the last dimension becomes ``head_size``.

        Position ``t`` only attends to positions ``<= t``.
        """
        T = x.size(-2)
        q = self.query(x)
        k = self.key(x)
        v = self.value(x)
        scores = q @ k.transpose(-2, -1) / (self.head_size ** 0.5)
        # Build the lower-triangular mask directly on the input's device
        # instead of allocating it on the CPU and copying it over every call.
        mask = torch.tril(torch.ones(T, T, device=x.device))
        # -inf scores become zero weight after the softmax.
        scores = scores.masked_fill(mask == 0, float('-inf'))
        weights = F.softmax(scores, dim=-1)
        weights = self.dropout(weights)
        return weights @ v

class MultiHeadAttention(nn.Module):
    """Several causal attention heads run side by side, concatenated and
    projected back to ``d_model``.

    Args:
        d_model: Model width; must be divisible by `num_heads`.
        num_heads: Number of attention heads.
        dropout: Dropout probability, used inside each head and on the output.

    Raises:
        ValueError: If `d_model` is not a multiple of `num_heads`.
    """

    def __init__(self, d_model, num_heads, dropout) -> None:
        super().__init__()
        if d_model % num_heads != 0:
            # Otherwise the concatenated heads would be narrower than d_model
            # and the output projection would fail with a shape error later.
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.dropout = nn.Dropout(dropout)
        self.head_size = self.d_model // self.num_heads
        self.attention_heads = nn.ModuleList([CausalSelfAttention(self.d_model, self.head_size, dropout) for _ in range(self.num_heads)])
        self.w_out = nn.Linear(self.d_model, self.d_model)

    def forward(self, x):
        """Return the projected concatenation of all head outputs."""
        # Kept in a local rather than on `self` so the module does not hold on
        # to the last batch's activations (and their autograd graph).
        out = torch.cat([head(x) for head in self.attention_heads], dim=-1)
        out = self.w_out(out)
        return self.dropout(out)

class FeedForwardNetwork(nn.Module):
    """Position-wise MLP: expand to ``4 * d_model``, ReLU, project back, dropout.

    Args:
        d_model: Input and output width.
        dropout: Dropout probability applied to the output.
    """

    def __init__(self, d_model, dropout) -> None:
        super().__init__()
        self.d_model = d_model
        self.d_ff = 4 * d_model
        self.dropout = nn.Dropout(dropout)
        self.linear1 = nn.Linear(self.d_model, self.d_ff)
        self.linear2 = nn.Linear(self.d_ff, self.d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        hidden = self.linear1(x)
        activated = self.relu(hidden)
        projected = self.linear2(activated)
        return self.dropout(projected)

class DecoderBlock(nn.Module):
    """One decoder layer: multi-head attention followed by a feed-forward
    network, each wrapped in a ``ResidualLink``.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        dropout: Dropout probability passed to the sublayers and their wrappers.
    """

    def __init__(self, d_model, num_heads, dropout) -> None:
        super().__init__()
        self.d_model, self.num_heads = d_model, num_heads
        # Registered as a submodule, but forward() below never applies it.
        self.dropout = nn.Dropout(dropout)

        attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.attention = ResidualLink(attn, d_model, dropout)

        ffn = FeedForwardNetwork(d_model, dropout)
        self.feedforward = ResidualLink(ffn, d_model, dropout)

    def forward(self, x):
        # Attention first, then the feed-forward network.
        return self.feedforward(self.attention(x))

class GPTBlock(nn.Module):
    """Decoder-only transformer: embeddings, a stack of decoder blocks, a final
    layer norm and a linear projection to vocabulary logits.

    Args:
        vocab_size: Number of token ids (embedding rows and output logits).
        d_model: Model width.
        context_length: Number of positions passed to the embedding layer.
        n_decoder_blocks: How many ``DecoderBlock`` layers to stack.
        dropout: Dropout probability used throughout.
        num_heads: Attention heads per decoder block. Defaults to 8, the value
            that used to be hard-coded.
    """

    def __init__(self, vocab_size, d_model, context_length, n_decoder_blocks, dropout, num_heads=8) -> None:
        super().__init__()
        self.embeds = EmbeddingLayer(d_model, vocab_size, context_length, dropout)
        self.decoder_blocks = nn.ModuleList([DecoderBlock(d_model, num_heads, dropout) for _ in range(n_decoder_blocks)])
        self.norm = nn.LayerNorm(d_model)
        self.linear = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Map token ids `x` to logits whose last dimension is ``vocab_size``."""
        x = self.embeds(x)
        for block in self.decoder_blocks:
            x = block(x)
        x = self.norm(x)
        x = self.linear(x)
        return x

class BigramLM(nn.Module):
    """Language-model wrapper around ``GPTBlock`` that adds the training loss
    and autoregressive sampling.

    Args:
        context_length: Maximum number of tokens the model is conditioned on.
        n_decoder_blocks: Number of decoder layers.
        dropout: Dropout probability.
        vocab_size: Number of token ids.
        d_model: Model width.
    """

    def __init__(self, context_length, n_decoder_blocks, dropout, vocab_size=1000, d_model=512):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.model = GPTBlock(vocab_size, d_model, context_length, n_decoder_blocks, dropout)
        self.context_length = context_length

    def forward(self, x, targets=None):
        """Compute logits and, when `targets` is given, the cross-entropy loss.

        Inputs are moved to whatever device the model's parameters live on.

        Returns:
            ``(logits, loss)``. Without targets, ``loss`` is ``None`` and the
            logits keep the shape produced by the network. With targets, the
            logits are returned flattened to ``(-1, vocab_size)``.
        """
        # Follow the model's own device instead of assuming CUDA, so the same
        # code runs on CPU-only machines.
        device = next(self.parameters()).device
        x = x.to(device)
        logits = self.model(x)
        loss = None
        if targets is not None:
            targets = targets.to(device)
            logits = logits.reshape(-1, self.vocab_size)
            targets = targets.reshape(-1)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()
    def generate(self, x, num_char=1000):
        """Append `num_char` sampled tokens to the prompt `x` and return it.

        Sampling runs in eval mode so dropout is off; the previous
        train/eval mode is restored afterwards.
        """
        was_training = self.training
        self.eval()
        try:
            # Sampled ids are created on the model's device, so the prompt has
            # to live there too for the concatenation below.
            x = x.to(next(self.parameters()).device)
            for _ in range(num_char):
                # Only the most recent `context_length` tokens are fed back in.
                x_cond = x[:, -self.context_length:]
                pred, _ = self(x_cond)
                preds = pred[:, -1, :]
                probs = F.softmax(preds, dim=-1)
                idx = torch.multinomial(probs, num_samples=1)
                x = torch.cat((x, idx), dim=1)
        finally:
            self.train(was_training)
        return x

In [7]:
# --- Model ---
context_length = 32      # window length used for batches and for the model
n_decoder_blocks = 8     # number of stacked decoder layers
dropout = 0.3            # dropout probability

# --- Training / evaluation ---
batch_size = 64          # windows per batch
eval_interval = 2000     # training steps between loss reports
eval_iters = 20          # batches averaged per split in each report

In [15]:
# Prefer the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Size the model from the tokenizer's actual vocabulary instead of relying on
# BigramLM's default happening to match it.
m = BigramLM(context_length, n_decoder_blocks, dropout, vocab_size=vocab_size)
m = m.to(device)

In [16]:
@torch.no_grad()
def estimate_loss():
    """Return the mean loss over `eval_iters` random batches for each split.

    The model is put in eval mode while measuring and switched back to train
    mode before returning. The result is a dict with keys ``'train'`` and
    ``'val'`` whose values are scalar tensors.
    """
    m.eval()
    averages = {}
    for split_name in ('train', 'val'):
        batch_losses = torch.zeros(eval_iters)
        for step in range(eval_iters):
            inputs, targets = get_batch(split_name, batch_size, context_length)
            _, batch_loss = m(inputs, targets)
            batch_losses[step] = batch_loss.item()
        averages[split_name] = batch_losses.mean()
    m.train()
    return averages

In [17]:
# Total number of scalar parameters in the model, reported in millions.
n_params = sum(p.numel() for p in m.parameters())
print(n_params / 1e6, 'M parameters')

26.245096 M parameters


In [18]:
learning_rate = 6e-4
max_iters = 50000

opt = optim.Adam(m.parameters(), lr=learning_rate)

for i in range(max_iters):
    # Report losses before this step's forward pass, so evaluation does not
    # run while the training step's autograd graph is being held in memory.
    if i % eval_interval == 0:
        losses = estimate_loss()
        print(f"step {i}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # One optimisation step on a fresh random training batch.
    x, y = get_batch('train', batch_size, context_length)
    preds, loss = m(x, y)
    opt.zero_grad()
    loss.backward()
    opt.step()


step 0: train loss 7.0774, val loss 7.0801
step 2000: train loss 3.0474, val loss 4.1226
step 4000: train loss 2.6172, val loss 4.1959
step 6000: train loss 2.1963, val loss 4.3641
step 8000: train loss 1.8655, val loss 4.5564
step 10000: train loss 1.5771, val loss 4.7767


KeyboardInterrupt: 

In [19]:
# Start from a single token with id 0, placed on the same device as the model,
# sample 2000 more tokens and decode the resulting ids back to text.
start = torch.zeros((1, 1), dtype=torch.long, device=next(m.parameters()).device)
print(tokenizer.decode(m.generate(start, 2000)[0].tolist()))

  that most glad. I'll not have done't again.

ANTIGONUS:
What you have beower to be of than you and told on't,
Like a surcept after horse that swe thee?

JOHN OF GAUNT:
Give me my lie.

MERCUTIO:
My prayers, go with me, where I should not trouble you. I
Appear forth all my heart.
Come you, coward my heart disdent and griedyvery.

JULIET:
You have some said to kiss her life away?
I'll make you glp her bosom in't
More bastards play another, for thou know these way:
I have all mock'd for ever.'

AUTOLYCUS:
Give me my leave, I beseech you!
I'll to you, sir, temper thee, for you those that knows
justified.

Father:
And would it were a Richard's best friend, which the
sues
That thou hast made'st a traitor's head by
Diss and by joces thereof;
For God will I'll do my grief, my vow unto mine honour,
Be to thee reverend me stop wars,
But what rests changed me with living nimb;
Feather weeel wink first to see you, brother Montague,
And stops ' feel water for thee: if pay him,
Alack houses, with 

In [68]:
# Scratch tensor of shape (1, 1); torch.zeros defaults to a floating dtype.
x = torch.zeros((1,1))

In [74]:
# Broadcasting check: with x of shape (1, 1), the sum takes the (30, 512) shape.
(x + torch.zeros(30, 512)).shape

torch.Size([30, 512])

In [87]:
# Run the model on a real batch of token ids. No targets are passed, so the
# returned loss is None and the logits are not flattened.
x, _ = get_batch('train', batch_size, context_length)
preds, _ = m(x)

In [89]:
# Inspect the shape of the logits held in `preds`.
preds.shape

torch.Size([64, 32, 1000])