In [1]:
#Model to generate Lorem Ipsum
import random
import torch
import torch.nn as nn
import time
import pickle

In [2]:
#Hyperparameters
#Fixed seed so batch sampling, weight initialisation and text sampling repeat on Restart & Run All
Seed = 1337
random.seed(Seed)
torch.manual_seed(Seed)
#Number of sequences drawn per training batch
BatchSize = 64
#Number of tokens in each training sequence
BlockSize = 256
#Width of the token embeddings
Dmodel = 384
#Number of attention heads in each transformer block
nheads = 6
#The concatenated head outputs are added back onto Dmodel-wide embeddings, so the heads must tile Dmodel exactly
assert Dmodel % nheads == 0, "Dmodel must be divisible by nheads"
#Per-head key/query width (integer division avoids a float round-trip)
Dk = Dmodel // nheads
#Per-head value width
Dv = Dk
#Step size handed to the optimiser
LearningRate = 3e-4
#The training loop runs iterations 0..MaxIters inclusive
MaxIters = 5000
#Use the GPU when one is available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
#Number of rows in the learned positional encoding matrix, i.e. the longest sequence the model can embed
ContextLength=500
#Number of transformer blocks stacked in the model
TransformerBlocks = 6

In [3]:
#Decode string from tokens
def decode(tokens):
    """Turn a sequence of token ids back into text.

    Tokens found in ``Initialvocab`` are emitted directly as byte values. A token
    that is the value of an entry in ``merges`` is replaced by the pair it was
    merged from, and that pair is expanded in turn. Tokens that are neither are
    skipped. The resulting bytes are decoded as UTF-8, with invalid sequences
    replaced rather than raising.

    The input sequence is left untouched (expansion happens on a private copy).

    Args:
        tokens: iterable of token ids.

    Returns:
        The decoded string.
    """
    #Reverse lookup: merged token -> the pair it stands for.
    #setdefault keeps the first pair if two pairs ever map to the same token.
    expansions = {}
    for pair, merged in merges.items():
        expansions.setdefault(merged, pair)

    decoded = []
    #Work stack with the first token on top, so tokens come off in reading order
    pending = list(tokens)[::-1]
    while pending:
        token = pending.pop()
        if token in Initialvocab:
            decoded.append(token)
        elif token in expansions:
            pair = expansions[token]
            #Push the right half first so the left half is expanded next
            pending.append(pair[1])
            pending.append(pair[0])
    return bytes(decoded).decode("utf-8", errors="replace")

In [4]:
def getPairFreqs(text):
    """Count how often each pair of neighbouring elements occurs in ``text``.

    Args:
        text: a sliceable sequence (e.g. a list of token ids).

    Returns:
        dict mapping ``(left, right)`` tuples to their number of occurrences,
        in order of first appearance.
    """
    counts = {}
    for left, right in zip(text, text[1:]):
        key = (left, right)
        counts[key] = counts.get(key, 0) + 1
    return counts

In [5]:
def merge(text, pair, newChar):
    """Replace every non-overlapping occurrence of ``pair`` in ``text`` with ``newChar``.

    The sequence is scanned left to right; when two neighbouring elements equal
    ``pair[0]`` and ``pair[1]`` they are consumed together and ``newChar`` is
    emitted in their place.

    Args:
        text: sequence of tokens.
        pair: the two-element pair to look for.
        newChar: token that replaces each occurrence of the pair.

    Returns:
        A new list; ``text`` itself is not modified.
    """
    merged = []
    cursor, total = 0, len(text)
    while cursor < total:
        atPair = cursor + 1 < total and (text[cursor], text[cursor + 1]) == (pair[0], pair[1])
        if not atPair:
            #No match here: copy the element through unchanged
            merged.append(text[cursor])
            cursor += 1
            continue
        #Match: emit the merged token and skip both halves of the pair
        merged.append(newChar)
        cursor += 2
    return merged

In [6]:
# Restore the tokenizer state and the training data from pickle files.
# NOTE(review): the file names say Shakespeare although the notebook header mentions
# Lorem Ipsum - confirm which corpus is intended.
# WARNING: pickle.load can execute arbitrary code; only load files you trust.
def _load_pickle(path):
    """Return the object stored in the pickle file at ``path``."""
    with open(path, 'rb') as handle:
        return pickle.load(handle)

# Membership in Initialvocab marks a token as a raw byte value in decode()
Initialvocab = _load_pickle('Shakespeareinitialvocab.pkl')
vocab = _load_pickle('Shakespearevocab.pkl')
# Number of distinct tokens; sizes the embedding and unembedding matrices
VocabSize = _load_pickle('Shakespearevocabsize.pkl')
# Maps a token pair to the token that replaces it
merges = _load_pickle('Shakespearemerges.pkl')
# Token sequence that GetBatch slices training examples from
TrainingData = _load_pickle('Shakespearetrainingdata.pkl')

In [7]:
#Encode string to tokens
def encode(text):
    """Tokenise ``text`` by applying the learned merges to its UTF-8 bytes.

    Starting from the raw byte values, the pair present in the sequence whose
    entry in ``merges`` has the smallest value is merged first; this repeats
    until no pair in the sequence appears in ``merges``.

    Args:
        text: the string to tokenise.

    Returns:
        list of token ids.
    """
    tokens = list(text.encode("utf-8"))
    notMergeable = float("inf")
    while True:
        bestToken, bestPair = notMergeable, ""
        for candidate in getPairFreqs(tokens).keys():
            candidateToken = merges.get(candidate, notMergeable)
            #Strict comparison: on a tie the pair seen first wins
            if candidateToken < bestToken:
                bestToken, bestPair = candidateToken, candidate
        if bestToken == notMergeable:
            #Nothing left in the sequence that the merge table knows about
            return tokens
        tokens = merge(tokens, bestPair, bestToken)

In [8]:
#Currently we don't use any regex, but I may change this later

In [9]:
#Create Training/Testing Data Batches
def GetBatch(split):
    """Sample a random batch of input/target sequences from ``TrainingData``.

    ``BatchSize`` start offsets are drawn uniformly at random. Each input row is
    ``BlockSize`` consecutive tokens and the matching target row is the same
    window shifted one token to the right.

    Args:
        split: currently ignored - every call samples from ``TrainingData``.

    Returns:
        Tuple ``(x, y)`` of tensors, each of shape (BatchSize, BlockSize).

    Raises:
        ValueError: if ``TrainingData`` has fewer than ``BlockSize + 1`` tokens.
    """
    #Largest start offset for which the shifted target window still fits in the data
    maxOffset = len(TrainingData) - BlockSize - 1
    if maxOffset < 0:
        raise ValueError("TrainingData needs at least BlockSize + 1 tokens to build a batch")
    offsets = [random.randint(0, maxOffset) for _ in range(BatchSize)]
    #Build each tensor once (no need to rebuild them for every batch element)
    x = torch.stack([torch.tensor(TrainingData[start:start + BlockSize]) for start in offsets])
    y = torch.stack([torch.tensor(TrainingData[start + 1:start + BlockSize + 1]) for start in offsets])
    return x, y

In [10]:
#Single Attention Head
class AttentionHead(nn.Module):
    """One causally-masked scaled dot-product self-attention head.

    Projects Dmodel-wide embeddings to Dk-wide keys and queries and Dv-wide
    values; every position attends only to itself and to earlier positions.
    """

    def __init__(self):
        super().__init__()
        #Learned key matrix, Dmodel x Dk
        self.Wk = nn.Parameter(torch.nn.init.xavier_uniform_(torch.empty((Dmodel, Dk), device=device)), requires_grad=True)
        #Learned query matrix, Dmodel x Dk
        self.Wq = nn.Parameter(torch.nn.init.xavier_uniform_(torch.empty((Dmodel, Dk), device=device)), requires_grad=True)
        #Learned value matrix, Dmodel x Dv
        self.Wv = nn.Parameter(torch.nn.init.xavier_uniform_(torch.empty((Dmodel, Dv), device=device)), requires_grad=True)

    def forward(self, E):
        """Apply masked self-attention to the embeddings ``E``.

        Args:
            E: tensor of shape (..., sequence length, Dmodel).

        Returns:
            Tensor of shape (..., sequence length, Dv). No residual connection
            is applied here; the enclosing block adds it.
        """
        #Sequence length is read from the input because it is not always BlockSize (e.g. when generating text)
        Blocks = E.shape[-2]
        #Compute key, query, value matrices
        Q = E @ self.Wq
        K = E @ self.Wk
        V = E @ self.Wv
        #Scaled dot-product scores, shape (..., Blocks, Blocks)
        Scores = (Q @ K.transpose(-2, -1)) / (K.shape[-1] ** 0.5)
        #True strictly above the diagonal, i.e. for positions that lie in the future.
        #Created directly on E's device, so nothing is copied from the CPU on each call.
        Future = torch.triu(torch.ones(Blocks, Blocks, dtype=torch.bool, device=E.device), diagonal=1)
        #-inf scores get zero weight after the softmax
        AttentionPattern = torch.nn.functional.softmax(Scores.masked_fill(Future, float("-inf")), dim=-1)
        return AttentionPattern @ V

In [11]:
#Multi-headed Attention
class MultiHeadedAttention(nn.Module):
    """Runs ``nheads`` independent attention heads on the same input and joins their outputs."""

    def __init__(self):
        super().__init__()
        self.heads = nn.ModuleList()
        for _ in range(nheads):
            self.heads.append(AttentionHead())

    def forward(self, E):
        """Return the head outputs for ``E`` concatenated along the last dimension."""
        perHead = [attend(E) for attend in self.heads]
        return torch.cat(perHead, dim=-1)

In [12]:
#Feedforward layer
class FeedForwardLayer(nn.Module):
    """Two-layer MLP applied to each embedding: Dmodel -> 4 * Dmodel -> Dmodel with a ReLU in between."""

    def __init__(self):
        super().__init__()
        #The hidden layer has 4x as many neurons as there are embedding dimensions
        hidden = Dmodel * 4
        expand = nn.Linear(Dmodel, hidden)
        activation = nn.ReLU()
        contract = nn.Linear(hidden, Dmodel)
        self.ffl = nn.Sequential(expand, activation, contract)

    def forward(self, E):
        """Return the MLP output for ``E``; the shape is unchanged."""
        transformed = self.ffl(E)
        return transformed

In [13]:
#Transformer Block
class Block(nn.Module):
    """One transformer block: multi-headed attention then a feedforward layer, each with a residual connection."""

    def __init__(self):
        super().__init__()
        self.AttentionHeads = MultiHeadedAttention()
        self.ffl = FeedForwardLayer()
        self.LayerNorm1 = nn.LayerNorm(Dmodel, device=device)
        self.LayerNorm2 = nn.LayerNorm(Dmodel, device=device)

    #Layernorm is applied to the input of each sub-layer (before it), not to its output.
    #TODO: the original paper applies layernorm after the sub-layer, so switch back to that.
    def forward(self, E):
        """Return ``E`` after the attention and feedforward updates have been added to it."""
        attended = self.AttentionHeads(self.LayerNorm1(E))
        E = E + attended
        fedForward = self.ffl(self.LayerNorm2(E))
        return E + fedForward

In [14]:
#Unembedding Layer. Logits are produced for every position, not just the last one; callers pick the position they need.
class UnembedLayer(nn.Module):
    """Maps final embeddings to one row of vocabulary logits per token position."""

    def __init__(self):
        super().__init__()
        #Learned unembedding matrix, Dmodel x VocabSize
        self.Wu = nn.Parameter(torch.nn.init.xavier_uniform_(torch.empty((Dmodel, VocabSize), device=device)), requires_grad=True)

    def forward(self, E):
        """Return raw (un-softmaxed) logits flattened to shape (batch * positions, VocabSize)."""
        scores = torch.matmul(E, self.Wu)
        #Sizes are read from the result because they are not always BatchSize, BlockSize (e.g. when generating text)
        rows = scores.shape[0] * scores.shape[1]
        #Flatten to a single list of logit rows for compatibility with the cross entropy functional
        return scores.view(rows, VocabSize)

In [15]:
#Transformer Implementation
class Transformer(nn.Module):
    """Transformer language model over the token vocabulary.

    Token embeddings plus learned positional encodings are passed through
    ``TransformerBlocks`` blocks and then unembedded into vocabulary logits.
    """

    def __init__(self):
        super().__init__()
        #Learned embedding matrix, VocabSize x Dmodel. requires_grad ensures We is updated during backpropagation.
        self.We = nn.Parameter(torch.nn.init.xavier_uniform_(torch.empty((VocabSize, Dmodel), device=device)), requires_grad=True)
        #Learned positional encoding matrix, ContextLength x Dmodel (one row per position)
        self.Wp = nn.Parameter(torch.nn.init.xavier_uniform_(torch.empty((ContextLength, Dmodel), device=device)), requires_grad=True)
        #Unembedding layer
        self.UnembedLayer = UnembedLayer()
        #Transformer blocks
        self.Blocks = nn.Sequential(*[Block() for _ in range(TransformerBlocks)])

    def forward(self, I, O = None):
        """Compute next-token logits and, when targets are given, the loss.

        Args:
            I: integer token ids of shape (batch, sequence). The sequence may not
                be longer than the number of rows in the positional matrix.
            O: optional target token ids with the same number of elements as ``I``.
                Omit when generating text, where no targets exist.

        Returns:
            Tuple ``(logits, loss)``. ``logits`` has one row per token position,
            flattened over batch and sequence. ``loss`` is the cross entropy
            against ``O``, or ``None`` when ``O`` is not given.
        """
        Tokens = torch.as_tensor(I, device=self.We.device)
        #Initial token embeddings, shape batch x sequence x Dmodel. Looking rows up directly gives the
        #same result as multiplying one-hot vectors by We without building a batch x sequence x VocabSize tensor.
        E = torch.nn.functional.embedding(Tokens, self.We)
        #Add positional encoding, truncated to the number of positions in E and broadcast over the batch
        E = E + self.Wp[:E.shape[1], :].unsqueeze(0)
        E = self.Blocks(E)
        logits = self.UnembedLayer(E)
        if O is None:
            return logits, None
        #Flatten the targets to line up with the flattened logits, whatever the batch shape is
        Targets = torch.as_tensor(O, device=logits.device).reshape(-1)
        #cross_entropy takes raw logits and applies the softmax itself; passing already
        #softmaxed values leads to vanishing gradients and a network that won't train.
        loss = torch.nn.functional.cross_entropy(logits, Targets)
        return logits, loss

    def generateText(self, I, Length):
        """Extend ``I`` by sampling ``Length`` further tokens from the model.

        Args:
            I: integer token ids of shape (batch, sequence) to continue from.
            Length: number of tokens to append.

        Returns:
            Tensor of shape (batch, sequence + Length) with the sampled tokens appended.
        """
        I = torch.as_tensor(I, device=self.We.device)
        #The positional matrix has no rows beyond this, so the model only ever sees the most recent tokens
        MaxContext = self.Wp.shape[0]
        #Sampling needs no gradients; skipping them avoids building an autograd graph at every step
        with torch.no_grad():
            for step in range(Length):
                Context = I[:, -MaxContext:]
                #Get predictions
                logits, loss = self(Context)
                #Un-flatten and keep only the prediction for the next token of each sequence
                lastLogits = logits.reshape(Context.shape[0], Context.shape[1], -1)[:, -1, :]
                probs = torch.nn.functional.softmax(lastLogits, dim=-1)
                #Sample from next token distribution, shape batch x 1
                nextToken = torch.multinomial(probs, num_samples = 1)
                #Concatenate next token to current text
                I = torch.cat((I, nextToken), dim=1)
        return I

In [16]:
#Build the model and move all of its parameters to the selected device
T = Transformer().to(device)

In [17]:
#Model training setup
#Uncomment to print the parameters themselves
#print(list(T.parameters()))
#Report how many parameters will be trained
TrainableCount = sum(p.numel() for p in T.parameters() if p.requires_grad)
print(f"Number of trainable parameters: {TrainableCount}")
#Create optimiser object (AdamW over every model parameter)
optimiser = torch.optim.AdamW(T.parameters(), lr=LearningRate)

Number of trainable parameters: 13829376


In [18]:
#Generate Text: Pre-Training

#Loss of the untrained model on one random batch
I, O = GetBatch("training")
logits, loss = T(I, O)
print(loss)
#Sample 150 tokens starting from a single token 0, flatten the rows into one list and decode it
Prompt = torch.zeros(1,1).long()
tokens = [token for row in T.generateText(Prompt, 150).tolist() for token in row]
print(decode(tokens))

NameError: name 'data' is not defined

In [None]:
#Train the model
start_time = time.time()
for iters in range(0, MaxIters + 1):
    #Draw a fresh random batch
    I, O = GetBatch("training")
    #Forward pass: only the loss is needed for the update
    logits, loss = T(I, O)
    #Clear old gradients, backpropagate, then take an optimiser step
    optimiser.zero_grad(set_to_none=True)
    loss.backward()
    optimiser.step()
    if iters % 100 != 0:
        continue
    #Every 100 iterations: report the loss and the time taken since the previous report
    new_time = time.time()
    print("%s: %s" % (iters, loss))
    print(f"{new_time - start_time} seconds elapsed")
    start_time = time.time()

In [None]:
#Generate Text: Post-Training

#Loss of the trained model on one random batch
I, O = GetBatch("training")
logits, loss = T(I, O)
print(loss)
#Sample 150 tokens starting from a single token 0, flatten the rows into one list and decode it
Prompt = torch.zeros(1,1).long()
tokens = [token for row in T.generateText(Prompt, 150).tolist() for token in row]
print(decode(tokens))