In [2]:
import math
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset,DataLoader
import re
from torch.nn import functional as F

# define global variables:

In [2]:
# Train/validation split fraction.
# NOTE(review): not referenced by any of the visible cells.
split_ratio = 0.9

# model params
batch_size = 128 # b: samples per training batch (to be tuned)
sequence_l = 128 # n: context length in characters; also the size of the causal mask in Head
d_model = 768 # embedding / residual-stream width
num_layer = 12 # number of transformer blocks stacked in Model
number_head = 8 # attention heads per block; head size is d_model // number_head
d_ff = 2048 # feed-forward width. NOTE(review): unused in the visible cells; FeedForward uses 4 * d_model
dropout = 0.2 # dropout probability applied after the embeddings in Model
learning_rate = 1e-4 # AdamW learning rate
max_epoch = 3 # NOTE(review): unused in the visible cells; the training loop iterates range(1)

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Whole corpus as one string; used later to pick the generation seed.
# CustomDataset.load_data reads the same file again on its own.
with open('data.txt', 'r', encoding='utf-8') as f:
    text = f.read()


# define dataset


In [3]:
# Define your custom dataset class
class CustomDataset(Dataset):
    """Character-level next-token dataset built from a single text file.

    Sample ``i`` is the pair ``(x, y)`` where ``x`` holds the ids of
    characters ``i .. i + sequence_l - 1`` and ``y`` holds the ids of
    characters ``i + 1 .. i + sequence_l`` (``x`` shifted by one).

    Args:
        sequence_l: number of characters in each input window.
        path: UTF-8 text file to read. Defaults to ``'data.txt'``.

    Attributes:
        data: the raw text.
        stoi: character -> integer id (ids follow sorted character order).
        itos: integer id -> character (inverse of ``stoi``).
    """

    def __init__(self, sequence_l, path='data.txt'):
        self.path = path
        self.data = self.load_data()
        chars = sorted(set(self.data))
        self.stoi = {ch: i for i, ch in enumerate(chars)}
        self.itos = {i: ch for i, ch in enumerate(chars)}
        self.sequence_l = sequence_l

    def get_vocab_size(self):
        """Return the number of distinct characters in the text."""
        return len(self.stoi)

    def __len__(self):
        # Each sample needs sequence_l + 1 characters. Clamp at zero so that
        # len() does not raise when the text is shorter than one window.
        return max(0, len(self.data) - self.sequence_l)

    def load_data(self, path=None):
        """Read and return the full text of ``path`` (default: ``self.path``)."""
        with open(self.path if path is None else path, 'r', encoding='utf-8') as f:
            return f.read()

    def __getitem__(self, idx):
        """Return the ``(x, y)`` LongTensor pair for sample ``idx``.

        Negative indices count from the end.

        Raises:
            IndexError: if ``idx`` is out of range. This is also what makes
                plain ``for x, y in dataset`` iteration terminate.
        """
        n = len(self)
        if idx < 0:
            idx += n
        if not 0 <= idx < n:
            raise IndexError(f"index {idx} out of range for dataset of length {n}")
        # grab a chunk of (sequence_l + 1) characters from the data
        chunk = self.data[idx:idx + self.sequence_l + 1]
        # encode every character to an integer
        idx_chunk = [self.stoi[c] for c in chunk]
        x = torch.tensor(idx_chunk[:-1], dtype=torch.long)
        # the target is the same chunk shifted one character to the right
        y = torch.tensor(idx_chunk[1:], dtype=torch.long)
        return x, y



# Build the character-level dataset; each sample is a window of sequence_l characters
# plus its one-character-shifted target.
dataset = CustomDataset(sequence_l)

# Shuffled mini-batches of batch_size samples. drop_last is left at its default (False),
# so the final batch of an epoch can contain fewer than batch_size samples.
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Model

In [4]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding followed by dropout.

    The table ``pe`` has shape ``[max_len, 1, d_model]``: even feature
    indices hold ``sin(pos * w_k)`` and odd indices hold ``cos(pos * w_k)``
    with ``w_k = 10000 ** (-2k / d_model)``. It is registered as a buffer, so
    it follows the module across devices but is not a trainable parameter.

    Args:
        d_model: embedding width (even or odd).
        dropout: dropout probability applied after adding the encoding.
        max_len: largest sequence length that can be encoded.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even column,
        # so trim the cosine term to fit instead of failing on assignment.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # pe: [max_len, 1, d_model]; the middle axis broadcasts over the batch

        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the positional encoding to ``x`` and apply dropout.

        Arguments:
            x: Tensor of shape ``[seq_len, batch_size, embedding_dim]``.
               Positions are taken along dimension 0, so a batch-first tensor
               must be transposed by the caller before it is passed in.

        Raises:
            ValueError: if ``seq_len`` exceeds the ``max_len`` of the table.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        x = x + self.pe[:seq_len]
        return self.dropout(x)

In [5]:
class Head(nn.Module):
    """Single causal self-attention head.

    Projects the input to keys, queries and values of width ``head_size``
    and applies scaled dot-product attention in which every position may only
    attend to itself and to earlier positions.

    Args:
        head_size: width of the key/query/value projections.

    Reads the module-level ``d_model`` (input width) and ``sequence_l``
    (longest supported sequence, which fixes the size of the causal mask).
    """

    def __init__(self, head_size):
        super().__init__()
        self.head_size = head_size
        self.key = nn.Linear(d_model, head_size, bias=False)
        self.query = nn.Linear(d_model, head_size, bias=False)
        self.value = nn.Linear(d_model, head_size, bias=False)
        # Lower-triangular ones: row i marks the positions (<= i) that i may see.
        self.register_buffer('tril', torch.tril(torch.ones(sequence_l, sequence_l)))

    def forward(self, x):  # x: [batch, l_seq, d_model]
        """Return the attended values, shape ``[batch, l_seq, head_size]``.

        Raises:
            ValueError: if ``l_seq`` is larger than the causal mask.
        """
        seq_len = x.size(1)
        if seq_len > self.tril.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds the maximum of {self.tril.size(0)}"
            )
        k = self.key(x)    # [batch, l_seq, head_size]
        q = self.query(x)  # [batch, l_seq, head_size]
        v = self.value(x)  # [batch, l_seq, head_size]
        scores = q @ k.transpose(-2, -1) / self.head_size ** 0.5  # [batch, l_seq, l_seq]
        # Crop the mask to the actual length so inputs shorter than
        # sequence_l work instead of failing to broadcast.
        mask = self.tril[:seq_len, :seq_len]
        scores = scores.masked_fill(mask == 0, float('-inf'))
        weights = F.softmax(scores, dim=-1)
        return weights @ v  # [batch, l_seq, l_seq] @ [batch, l_seq, head_size]




In [6]:
class MultiHeadAttention(nn.Module):
    """Run several attention heads side by side and merge their outputs.

    Args:
        number_head: how many ``Head`` modules to create.
        head_size: output width of each head.

    The concatenated head outputs (width ``head_size * number_head``) are
    projected to the module-level ``d_model`` by ``w0``.
    """

    def __init__(self, number_head, head_size):
        super().__init__()
        heads = nn.ModuleList()
        for _ in range(number_head):
            heads.append(Head(head_size))
        self.self_attention = heads
        concat_width = head_size * number_head
        self.w0 = nn.Linear(concat_width, d_model)

    def forward(self, x):
        """Map ``[batch, l_seq, d_model]`` to ``[batch, l_seq, d_model]``."""
        # Every head sees the same input; their results are joined on the
        # feature axis: [batch, l_seq, head_size * number_head].
        per_head = tuple(attend(x) for attend in self.self_attention)
        joined = torch.cat(per_head, dim=-1)
        # Project to d_model so the result can be added to the residual.
        return self.w0(joined)
        

In [7]:
# The multi-head self-attention sub-layer is followed by the two fully connected layers of
# the feed-forward block. The first (hidden) layer is 4 times as wide as the model
# dimension (d_model) and uses the ReLU activation function. The second layer projects
# back to d_model and does not use an activation function.
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Linear.

    Args:
        d_model: input and output width; the hidden layer is ``4 * d_model`` wide.
    """

    def __init__(self, d_model):
        super().__init__()
        hidden_width = 4 * d_model
        expand = nn.Linear(d_model, hidden_width)
        project = nn.Linear(hidden_width, d_model)
        # Indices 0 and 2 of the Sequential hold the two linear layers.
        self.ff = nn.Sequential(expand, nn.ReLU(), project)

    def forward(self, x):
        """Apply the network to the last dimension of ``x``."""
        return self.ff(x)

In [8]:
class Block(nn.Module):
    """One pre-norm transformer block.

    Layer norm is applied before each sub-layer, and each sub-layer's output
    is added back to its input: first multi-head self-attention, then the
    feed-forward network. Sizes come from the module-level ``d_model`` and
    ``number_head``.
    """

    def __init__(self):
        super().__init__()
        per_head_width = d_model // number_head

        self.self_attention = MultiHeadAttention(number_head, per_head_width)
        self.norm1 = nn.LayerNorm(d_model)
        self.ffn = FeedForward(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Return the block output; same shape as ``x``."""
        # Attention sub-layer with residual connection.
        attended = self.self_attention(self.norm1(x))
        after_attention = x + attended
        # Feed-forward sub-layer with residual connection.
        return after_attention + self.ffn(self.norm2(after_attention))





In [9]:
# Model
class Model(nn.Module):
    """Decoder-only character-level transformer language model.

    Pipeline: token embedding -> positional encoding -> dropout ->
    ``num_layer`` transformer blocks -> final layer norm -> linear projection
    to vocabulary logits.

    Args:
        stoi: character -> id mapping; its length is the vocabulary size.
    """

    def __init__(self, stoi):
        super().__init__()
        self.stoi = stoi
        self.tok_emb = nn.Embedding(len(stoi), d_model)
        self.pos_emb = PositionalEncoding(d_model)
        self.dropout1 = nn.Dropout(dropout)

        self.blocks = nn.Sequential(*[Block() for _ in range(num_layer)])
        self.norm_final = nn.LayerNorm(d_model)
        self.predict = nn.Linear(d_model, len(stoi))
        self.loss_compute = nn.CrossEntropyLoss()

    def forward(self, x, use='train', y=None):
        """Compute logits and, in training mode, the cross-entropy loss.

        Args:
            x: LongTensor of token ids, ``[batch, seq]``. A 1-D ``[seq]``
               tensor is treated as a batch of one.
            use: ``'train'`` (needs ``y``, returns a loss) or ``'generate'``
               (loss is ``None``).
            y: LongTensor of target ids with the same shape as ``x``.

        Returns:
            ``(logit, loss)``. With ``use='train'`` the logits are flattened
            to ``[batch * seq, vocab]``; with ``use='generate'`` they keep the
            shape ``[batch, seq, vocab]``.

        Raises:
            ValueError: if ``use`` is unknown, or ``use='train'`` without ``y``.
        """
        if use not in ('train', 'generate'):
            raise ValueError(f"use must be 'train' or 'generate', got {use!r}")
        if x.dim() == 1:
            x = x.unsqueeze(0)

        emb_x = self.tok_emb(x)  # [batch, seq, d_model]
        # PositionalEncoding takes positions from dimension 0 (sequence-first),
        # so switch to [seq, batch, d_model] for that step and switch back.
        # Passing the batch-first tensor directly would index the encoding by
        # batch row instead of by token position.
        emb_x = self.pos_emb(emb_x.transpose(0, 1)).transpose(0, 1)
        emb_x = self.dropout1(emb_x)

        emb_x = self.blocks(emb_x)
        logit = self.predict(self.norm_final(emb_x))  # [batch, seq, vocab]

        if use == 'train':
            if y is None:
                raise ValueError("y (target ids) is required when use='train'")
            # Flatten using the real batch size: the last batch of an epoch
            # can be smaller than the configured batch_size.
            logit = logit.reshape(-1, logit.size(-1))
            loss = self.loss_compute(logit, y.reshape(-1))
        else:
            loss = None

        return logit, loss  # loss for training, logit for generate

    def generate(self, output_length, seed_idx):
        """Greedily append ``output_length`` tokens to ``seed_idx``.

        Args:
            output_length: number of tokens to generate.
            seed_idx: 1-D LongTensor of token ids used as the prompt.

        Returns:
            1-D LongTensor: ``seed_idx`` followed by the generated ids.

        The model is put in eval mode while generating (so dropout does not
        perturb the greedy choice) and its previous mode is restored after.
        """
        was_training = self.training
        self.eval()
        out = seed_idx
        try:
            with torch.no_grad():
                for _step in range(output_length):
                    # Condition on at most the last sequence_l tokens.
                    window = out[-sequence_l:]
                    logit, _loss = self(window.unsqueeze(0), use='generate')
                    # argmax of the logits equals argmax of their softmax.
                    next_idx = logit[0, -1].argmax()
                    out = torch.cat([out, next_idx.unsqueeze(0)], dim=-1)
        finally:
            self.train(was_training)
        return out


            
            

# train

In [10]:
# Build the network; the vocabulary size is taken from the dataset's character table.
model = Model(stoi=dataset.stoi)
# nn.Module.to moves the parameters in place and returns the module itself,
# so `m` and `model` refer to the same object.
m = model.to(device)
# AdamW over every model parameter, using the learning rate from the config cell.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)


In [None]:

# Make sure dropout is active while training.
m.train()

# NOTE(review): runs a single epoch; `max_epoch` from the config cell is not used here.
for epoch in range(1):
    # Iterate through the data loader
    for batch in data_loader:
        x, y = batch
        # The model lives on `device`; the DataLoader yields CPU tensors,
        # so move each batch over before the forward pass.
        x = x.to(device)
        y = y.to(device)

        logit, loss = m(x, 'train', y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # .item() prints the plain Python float instead of the tensor repr.
        print(loss.item())

# generate

In [11]:
# Character <-> id lookup tables shared with the dataset.
stoi, itos = dataset.stoi, dataset.itos


def encode(s):
    """Encoder: take a string, return the list of integer ids of its characters."""
    return [stoi[ch] for ch in s]


def decode(l):
    """Decoder: take a list of integer ids, return the string they spell."""
    pieces = [itos[token] for token in l]
    return ''.join(pieces)

In [5]:
# Prompt: the first 10 characters of the corpus.
seed = text[:10]
# A context window made only of spaces, used as left padding.
# Encoding it requires ' ' to be part of the vocabulary (KeyError otherwise).
empty = " "*sequence_l
seed_idx = encode(seed)
if len(seed)<sequence_l:
    # Left-pad: start from an all-space window and overwrite its tail with the seed.
    # NOTE(review): for an empty seed the slice [-0:] covers the whole list, so the
    # assignment would empty input_idx — confirm the seed can never be empty.
    input_idx = encode(empty)
    input_idx[-len(seed):] = seed_idx
else:
    # Seed fills the window or more: keep its first sequence_l ids.
    # NOTE(review): this keeps the start of the seed, not its most recent characters — confirm intent.
    input_idx = seed_idx[:sequence_l]

# Inference only, so no gradients need to be tracked.
with torch.no_grad():
    input_idx = torch.tensor(input_idx,dtype=torch.long).to(device)

    # Append 3 generated ids to the prompt window.
    generated = m.generate(3,input_idx)

NameError: name 'text' is not defined

In [13]:
# Turn the generated ids back into text: the space padding, the seed, then the new characters.
decode(generated.tolist())

'                                                                                                                      First Citifhl'

In [6]:
# Scratch cell: random stand-in for a logit tensor, used to try out the indexing below.
logit = torch.randn((128,128,65))
# Displays seed_idx; it is only defined once the generation cell above has been run.
seed_idx


NameError: name 'seed_idx' is not defined

In [7]:
# Scratch cell: replay one greedy decoding step on the stand-in logits.
prob = F.softmax(logit,dim = -1)
# Most probable id at the last position of the last row.
next_idx = prob[-1,-1,:].argmax()

# Append the chosen id to the prompt; input_idx must already exist from the generation cell.
torch.cat([torch.tensor(input_idx) , next_idx.unsqueeze(0)], dim=-1)

NameError: name 'input_idx' is not defined

In [None]:
# Scratch cell: view the encoded seed as a tensor (needs seed_idx from the generation cell).
torch.tensor(seed_idx)