In [1]:
#@title
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple

import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
import numpy as np
import random
import copy
import torch.optim as optim
import pickle
import os
import pandas as pd
import time

My custom dataloaders

In [4]:
#@title
#Building an iterator over minibatches of inputed text after tensor encoding
class Dataloader_iter(object):
  def __init__(self, input, output, batch_size, shuffle_batch=False, inp_transformation=None, out_transformation=None, device = torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    if inp_transformation is not None:
      self.input = inp_transformation(input).to(device)
    else:
      self.input = input.to(device)

    if out_transformation is not None:
      self.output = out_transformation(output).to(device)
    else:
      self.output = output.to(device)
    
    self.shuffle = shuffle_batch
    self.batch_size = batch_size
    self.length= len(self.input)

  def __iter__(self):
    self.iter_idx = 0
    self.idx = [k for k in range(self.length//self.batch_size)]
    if self.shuffle:
      random.shuffle(self.idx)

    while (self.iter_idx < int(self.length/self.batch_size)):
      sample =  self.idx[self.iter_idx-1]
      yield self.input[sample * self.batch_size : (sample+1) * self.batch_size], self.output[sample * self.batch_size : (sample+1) * self.batch_size]
      self.iter_idx += 1

  def __len__(self):
    return int(self.length/self.batch_size)
      
 

#transformer dataloader
def get_tranformer_dataloader(*args, **kwargs):
  generator = Dataloader_iter(*args, **kwargs)
  class new_generator():
    def __init__(self, generator):
      self.generator = generator
      self.len = len(generator)
    
    def __iter__(self):
      self.iter_idx = 0
      iterator = iter(self.generator)
      while self.iter_idx < self.len:
        x,y = next(iterator)
        yield ([x, y[:, :-1]], y[:, 1:])
        self.iter_idx +=1
    def __len__(self):
      return self.len

  return new_generator(generator)

My custom tokenizer and text en/decoder

In [5]:
#@title
#Word level tokenizer and text encoder/decoder
class My_word_tokenizer():
  def __init__(self, text):
    self.punctuations = list(".,?:[]()!;/,-_}{")
    if (type(text)!=str):
      text = " ".join([elem for elem in text])
    transformed_text = self.punctuation(text, "split")
    self.tokens = list(set(transformed_text.split(" "))) 
    self.token_encoding = {token : torch.tensor([k]).unsqueeze(0) for k, token in enumerate(self.tokens)}
    self.token_onehot ={k : [1 if l ==k else 0 for l in range(len(self.tokens))] for k in range(len(self.tokens))}
      
    
  #method for placing gaps before punctuation signs so that they will tokenized like words during encoding
  #and also for removing same gaps before text decoding.
  def punctuation(self, text, mode):
    for p in self.punctuations:
      if mode == "split":
        text=text.replace(p, " "+p) 
      elif mode == "join":
        text = text.replace(" "+p , p)
      else:
        print("Mode selection error")
    return text 


  def text_encoding(self, text):
    text = (self.punctuation(text, "split")).split(" ")
    return torch.cat([self.token_encoding[word] for word in text], dim=1)


  #greedy decoding of the text
  def text_decoding(self, output):
    text = " ".join(map(str, [list(self.token_encoding.keys())[word.detach()] for word in output.squeeze()]))
    text = self.punctuation(text, "join")
    return text


NN Layers

In [6]:
#@title
#define linear layer class
class linear_layer(nn.Module):
    def __init__(self, hyper_param): #(inp_dim, hid_dim, bias_is_true, relu_is_true)
        super().__init__()
        (self.inp_dim, self.hid_dim, self.bias_is_true, self.relu_is_true) = hyper_param
        self.weight = nn.Parameter(torch.randn(self.inp_dim, self.hid_dim)/torch.sqrt(torch.tensor(self.inp_dim)))
        if self.bias_is_true:
            self.bias = nn.Parameter(torch.randn(self.hid_dim))
        self.relu =nn.ReLU()
        
    def forward(self, input):
        output= torch.tensordot(input, self.weight,  dims = ([-1],[0]) ) 
        if self.bias_is_true:
            output+= self.bias
        if self.relu_is_true:
            output = self.relu(output)
        return output





#-----------------------------------------------------------------------------------
#define FeedForward class
class FeedForward(nn.Module):
    def __init__(self, hyper_param): #([(dim_in, dim_out, bias_is_true, relu_is_true) ,(), ... ])
        nn.Module.__init__(self) 
        self.hyper_param = hyper_param
        self.layers = nn.ModuleList([linear_layer(param) for param in self.hyper_param])

    def forward(self, input):
        output = input
        for layer in self.layers:
            output = layer(output)
        return output


class attention(nn.Module):
    def __init__(self, hyper_param): #(dim_in, dim_key, dim_heads)
        nn.Module.__init__(self) 
        (self.dim_in, self.dim_key, self.heads) = hyper_param
        self.attention = nn.ModuleList([linear_layer((self.dim_in, self.dim_key*self.heads, False, False)) for _ in range(3)])
        self.final = linear_layer((self.heads * self.dim_key, self.dim_in, False, False))
        self.softmax = nn.Softmax(dim = -1)
        self.dropout = nn.Dropout(0.1)
    
    def forward(self, inputs, mask=None):
        q,k,v = tuple(layer(inp).view(inp.size(0), inp.size(1), self.heads, self.dim_key).transpose(1,2) for inp, layer in zip(inputs, self.attention))
        score = torch.matmul(q,k.transpose(-1,-2)).masked_fill(mask==0, -1e9) if mask is not None else torch.matmul(q,k.transpose(-1,-2))
        p_atten = self.dropout(self.softmax(score/torch.sqrt(torch.tensor(self.dim_key))))
        preactivation = torch.matmul(p_atten,v).transpose(1,2).reshape(v.size(0), -1, self.heads*self.dim_key)
        return self.final(preactivation)



#---------------------------------------------------------------------------------
#define layer norm class
#The Annotated transformer adds 2 extra learnable parameters in this layer --I don't.
class LayerNorm(nn.Module):
    def __init__(self):
        super().__init__()
        self.epsilon= 10**(-7)


    def forward(self, input):
        mean = torch.mean(input, dim= -1, keepdim= True)
        std = torch.std(input, dim=-1, keepdim= True) + self.epsilon
        output = (input - mean)*(1/std)
        return output






#-----------------------------------------------------------------------------------
#Skip connection and layer normalization decorator + dropout 
def SkipAndNormalize_decorator(cls):
    class ResNorm_wrapper(nn.Module):
        def __init__(self, hyper_param):
            super().__init__()
            self.layers = nn.ModuleList([cls(hyper_param), LayerNorm()])
            self.dropout = nn.Dropout(0.1)

        
        def forward(self, residual_stream, *input):
            h1 = self.layers[0](*input) + residual_stream
            return self.dropout(self.layers[1](h1))
    return ResNorm_wrapper    



@SkipAndNormalize_decorator
class SkipAttention(attention):
    def __init__(self, hyper_param): #(dim_in, dim_key, dim_heads)
        super().__init__(hyper_param)
        



@SkipAndNormalize_decorator
class SkipFeedForward(FeedForward):
    def __init__(self, hyper_param): #[(dim_in, dim_out, bias_is_true, relu_is_true),(),()]
        super().__init__(hyper_param)
        


#-----------------------------------------------------------------------------------
#Helper function for getting attention and FF blocks for building En/De-coders, Transformers
def get_coder_blocks(dim_in, dim_key, heads, dim_internal, device = torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    att_hyperparams = (dim_in, dim_key, heads)
    ff_hyperparams = [(dim_in, dim_internal, True, True), (dim_internal, dim_in, True, False)]
    return SkipAttention(att_hyperparams).to(device), SkipFeedForward(ff_hyperparams).to(device)



#-----------------------------------------------------------------------------------
#define encoder class
class Encoder(nn.Module):
    def __init__(self, skip_attention, skip_feedforward):
        super().__init__()
        self.layers = nn.ModuleList([skip_attention, skip_feedforward])

    
    def forward(self, input, mask=None): 
        inputs = [input for _ in range(3)]
        h1= self.layers[0](input, inputs, mask)
        output = self.layers[1](h1, h1)
        return output



# define decoder class
class Decoder(nn.Module):
    def __init__(self, skip_attention, skip_feedforward):
        super().__init__()
        self.layers = nn.ModuleList([skip_attention, copy.deepcopy(skip_attention), skip_feedforward ])
    

    def forward(self, input, enc_mask=None, dec_mask=None): #input = list [encoder_output, decoder_input]
        encoder_output = input[0]
        decoder_input = input[1]
        h1 = self.layers[0](decoder_input, [decoder_input for _ in range(3)], dec_mask)
        h2 = self.layers[1](h1, [h1, encoder_output, encoder_output], enc_mask)
        output = self.layers[2](h2, h2)
        return [encoder_output, output]




#-----------------------------------------------------------------------------------
#define encoder/decoder stack decorator
def get_stack(cls):
    class Stack_wrapper(nn.Module):
        def __init__(self, skip_attention, skip_feedforward, copies):
            super().__init__()
            blocks = [[copy.deepcopy(skip_attention), copy.deepcopy(skip_feedforward)] for _ in range(copies)]
            self.layers = nn.ModuleList([cls(*block) for block in blocks])

        def forward(self, input, *masks): #there can be 1 or 2 masks depending on whether we are stacking encoders or decoders
            output = input
            for layer in self.layers:
                output = layer(output, *masks)
            return output
    return Stack_wrapper
            

@get_stack
class EncoderStack(Encoder):
    def __init__(self, skip_attention, skip_feedforward):
        super().__init__(skip_attention, skip_feedforward)


@get_stack
class DecoderStack(Decoder):
    def __init__(self, skip_attention, skip_feedforward):
        super().__init__(skip_attention, skip_feedforward)





#-----------------------------------------------------------------------------------
#transformer layer class
class TransformerLayer(nn.Module):
    def __init__(self, skip_attention, skip_feedforward, copies, device = torch.device("cuda" if torch.cuda.is_available() else "cpu")):
        nn.Module.__init__(self) 
        self.layers = nn.ModuleList([EncoderStack(skip_attention, skip_feedforward, copies).to(device), DecoderStack(copy.deepcopy(skip_attention), copy.deepcopy(skip_feedforward), copies).to(device)])


    def forward(self, inputs, enc_mask=None, dec_mask=None): #inputs is a list [encoder input, decoder input]
        h1 = self.encode(inputs[0], enc_mask)
        return self.decode([h1, inputs[1]], enc_mask, dec_mask)


    def encode(self, input, enc_mask=None):
        return self.layers[0](input, enc_mask)

    def decode(self, inputs, enc_mask=None, dec_mask=None):
        output = self.layers[1](inputs, enc_mask, dec_mask)
        return output[1]



#-----------------------------------------------------------------------------------
#Positional encoding class
class Positional_enc(nn.Module):
    def __init__(self, dim_in, max_dim=500):
        nn.Module.__init__(self)
        self.dim_in, self.max_dim = dim_in, max_dim
        #construct positional encoding for single batch element
        argument = torch.tensordot(torch.arange(max_dim, dtype=torch.float), torch.exp(-math.log(1000) *torch.arange(0, dim_in, 2, dtype=torch.float)/dim_in) , dims= 0)
        pos_enc= torch.empty(max_dim, dim_in)
        pos_enc[:, 0::2] = torch.sin(argument)
        pos_enc[:, 1::2] = torch.cos(argument)
        #introduce batch dimension (=0) 
        pos_enc = pos_enc.unsqueeze(0)
        self.register_buffer("pos_enc", pos_enc)
        self.dropout = nn.Dropout(0.1)

    def forward(self, input):
        input =input + self.pos_enc[:, :input.size(1), :].requires_grad_(False)
        return self.dropout(input)


#-----------------------------------------------------------------------------------
#Mask
def construct_mask(size, device = torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    uppertri = torch.triu(torch.ones(1, size, size), diagonal=1)
    return (uppertri ==0).to(device)




#-----------------------------------------------------------------------------------
#include positional encoding, masking, and softmax
class Transformer_model(nn.Module):
    def __init__(self, position_enc, enc_embedding, dec_embedding, transformer, linear, start, end):
        nn.Module.__init__(self) 
        self.layers = nn.ModuleList([enc_embedding, dec_embedding, transformer, linear])
        self.position_enc = [position_enc, copy.deepcopy(position_enc)]
        self.softmax = nn.Softmax(dim=-1)
        self.start = start
        self.end = end

    def embed_encode(self, input, enc_mask=None):
        h1 = self.layers[0](input)
        encoder_input = self.position_enc[0](math.sqrt(h1.size(-1))*h1)
        enc_output = self.layers[2].encode(encoder_input, enc_mask)
        return enc_output 
    
    def embed_decode(self, enc_output, dec_input, enc_mask=None): #inputs is a list [enc_input, dec_input]
        h2 = self.layers[1](dec_input)
        h2= self.layers[2].decode([enc_output,  self.position_enc[1](math.sqrt(h2.size(-1))*h2 )], enc_mask, construct_mask(h2.size(1)) ) 
        return self.layers[3](h2)

    def forward(self, inputs, enc_mask=None): #inputs = [enc_input, dec_input] returns a tuple of outputs and probabilities
        enc_output = self.embed_encode(inputs[0])
        presoftmax_out = self.embed_decode(enc_output, inputs[1], enc_mask)
        probabilities = self.softmax(presoftmax_out) 
        output_seq = torch.argmax(probabilities, dim =-1)
        return output_seq, presoftmax_out


    def autoregression(self, input, length, enc_mask = None, mode= "greedy"): #input= encoder_input (1 x seq_length)
        enc_output = self.embed_encode(input)
        inp = self.start #(batch_size x seq_length) 
        for _ in range(length):
            prob = self.softmax(self.embed_decode(enc_output, inp, enc_mask)) 
            
            # Greedy inference
            if (mode == "greedy"):
                out = torch.argmax(prob, dim =-1)
                next_word = (torch.tensor([out[:, -1]]).unsqueeze(0) ).to(out, non_blocking=True)
                inp = torch.cat([inp, next_word], dim=1)
                if next_word == self.end:
                    break
                else: 
                    continue
            
            # Beam search using Bayesian inference for next two words A_1, A_2 
            # Our strategy is: Starting from the two most likely tokens for A_1, we predict the most likely next token A_2max(A_1) given each choice and then select the A_1 that maximizes P(A_1)*P(A_2max | A_1)
            # In this way, the next word A_1 is chosen so that it maximizes P(A_1 U A_2)
            elif (mode == "beam"):
                largest_two_probabilities, likely_words = torch.topk(prob[:,-1], 2, dim= -1)
                predicted_batch = torch.cat([inp, inp], dim =0)
                predicted_batch=torch.cat([predicted_batch, likely_words.transpose(0,1)], dim=1)
                new_enc_output = torch.cat([enc_output]*2, dim=0)
                h = self.softmax(self.embed_decode(new_enc_output, predicted_batch.type(torch.long), enc_mask))[:,-1] 
                next_probabilities, _ = torch.max(h.detach(), dim = -1)

                most_likely_word_idx =(torch.argmax(largest_two_probabilities.squeeze()* next_probabilities)).item()
                next_word = likely_words[:, most_likely_word_idx].unsqueeze(0)
                inp = torch.cat([inp, next_word], dim=1)
                if next_word == self.end:
                    break
                else: 
                    continue
        return inp
        


Tranformer model construction helper functions

In [7]:
#@title
#Transformer model 
def get_registered_Transformer_model(in_vocab_size, out_vocab_size, dim_in, dim_key, heads, dim_internal, copies, lr, start, end, optimizer = "sgd", device = torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    attention_block, FF_block = get_coder_blocks(dim_in, dim_key, heads, dim_internal)
    transformer = TransformerLayer(attention_block, FF_block, copies).to(device)
    position_enc = Positional_enc(dim_in, max_dim= 5000).to(device)
    linear = linear_layer((dim_in, out_vocab_size, False, False)).to(device)
    enc_embedding=nn.Embedding(in_vocab_size, dim_in).to(device)
    dec_embedding=nn.Embedding(out_vocab_size, dim_in).to(device)

    model = Transformer_model(position_enc, enc_embedding, dec_embedding, transformer, linear, start, end).to(device)
    #initializing according to the transformer paper
    for p in model.parameters(): 
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    if optimizer == "adam":
        opt = optim.Adam(model.parameters(), lr, betas=(0.9, 0.98), eps=1e-9)
    elif optimizer == "sgd":
        opt = optim.SGD(model.parameters(), lr)
    else:
        print("Optimizer choice not recognized")
    
    return model, opt 



Model training class

In [8]:
#@title

class Model_training(nn.Module):
    def __init__(self, model, opt, loss):
        super().__init__()
        self.model = model
        self.opt =opt
        self.loss = loss
    

    def run_transformer_epoch(self, train_dataloader, tokenizer, device):
        self.model.train()
        train_iterator = iter(train_dataloader)
        total_loss =0
        t1= time.time()
        for input_batch, output_batch in train_iterator:
            _, out_prob = self.model(input_batch)

            #Convert output words from integers to one-hot vectors ---I could do this separately and define a dictionary to increase speed
            expectation = torch.tensor([[tokenizer.token_onehot[int(seq_elem)] for seq_elem in batch_elem] for batch_elem in output_batch], dtype=torch.float).to(device)

            loss = self.loss(out_prob, expectation).to(device)
            total_loss += loss
            
            loss.backward()
            self.opt.step()
            self.opt.zero_grad()
        if self.scheduler is not None:
            self.scheduler.step()
        t2= time.time() 
        return t2-t1, total_loss   



    def evaluate_transformer_output(self, tokenizer, sample_input):
        self.model.eval()
        text_instance_encoded = self.model.autoregression(sample_input, 62)
        if tokenizer is not None:
            output = tokenizer.text_decoding(text_instance_encoded)
        else:
            output = text_instance_encoded
        return output
    

    def fit_transformer(self, epochs, train_dataloader, test_dataloader=None, tokenizer = None, sample_input=None, device = torch.device("cuda" if torch.cuda.is_available() else "cpu")):
        for epoch in range(epochs):
            dt, epoch_loss = self.run_transformer_epoch(train_dataloader, tokenizer, device)
            print(f"Epoch: {epoch} -- time: {dt} -- loss = {epoch_loss}\n")

            output = self.evaluate_transformer_output(tokenizer, sample_input)
            print(output)



NN functionalities class

In [9]:
#@title
# class for model operations 
# USE .pt FOR SAVING MODELS!! The other methods seem to generate small errors in the saved tensors that ruin saved network performance.
class NN_operating_tools(Model_training, nn.Module):
    def __init__(self, model, opt, learning_rate_schedule=None, saved_model=None): #learning_rate_schedule must be a lambda function
        nn.Module.__init__(self)
        self.model = model
        self.opt =opt
        if learning_rate_schedule is not None:
            self.scheduler = torch.optim.lr_scheduler.LambdaLR(self.opt, learning_rate_schedule)
        else:
            self.scheduler =None
        self.loss = nn.CrossEntropyLoss()
        Model_training.__init__(self, model, opt, self.loss)

        if saved_model is not None:
            self.load_model(saved_model)

    
    def load_model(self, saved_model):
        dir =os.path.join("saved_models", saved_model)
        file_opener={".csv": self.load_model_csv, ".pt": self.load_model_pt, ".npy" : self.load_model_npy, ".pkl" : self.load_model_pkl}
        idx = saved_model.find(".")
        file_opener[saved_model[idx:]](dir)


    def load_model_pt(self, directory):
        t_load = torch.load(directory)
        self.model.load_state_dict(t_load)

    def load_model_csv(self, dir):
        load_model = pd.read_csv(dir)
        for idx, param in enumerate(self.model.parameters()):
            with torch.no_grad():
                param.copy_(torch.tensor(load_model.iloc[idx, 1]))


    def load_model_npy(self, dir):
        load_model = np.load(dir, allow_pickle=True)
        for k, param in enumerate(self.model.parameters()):
            with torch.no_grad():
                param.copy_(torch.from_numpy(load_model[k]))


    def load_model_pkl(self, dir):
        with open(dir, "rb") as f:
            load_model = pickle.load(f)
        for k, param in enumerate(self.model.parameters()):
                with torch.no_grad():
                    param.copy_(torch.tensor(load_model[k]))

    
    def save_model(self, name):
        dir =os.path.join("saved_models", name)
        file_saver={".csv": self.save_csv, ".pt": self.save_pt, ".npy" : self.save_npy, ".pkl" : self.save_pkl}
        idx = name.find(".")
        file_saver[name[idx:]](dir)
        

    def save_pt(self, directory):
        torch.save(copy.deepcopy(self.model.state_dict()), directory)


    def save_npy(self, directory):
        trained_model = [copy.deepcopy(p.detach()).numpy() for p in self.model.parameters()]
        np.save(directory, trained_model)

    def save_csv(self,directory):
        pass

    def save_pkl(self,directory):
        pass




Data

In [10]:
mytextpiece = "<start> When I was young, I never needed anyone and making love was just for fun. Those days are gone! Living alone, I think of all the friends I've known but when I dial the telephone nobody is home. All by myself, I don't wanna be all by myself, anymore! <end>"
mytranslationpiece = "<start> Οταν ημουν νεος, δεν χρειαζομουν κανενα και το να κανω ερωτα ηταν απλα για ευχαριστηση. Αυτες οι μερες εχουν περασει! Ζωντας μονος μου, σκεφτομαι ολους τους φιλους που ξερω αλλα οταν τους καλω στο τηλεφωνο κανεις δεν ειναι σπιτι. Ολομοναχος, δε θελω πια να ειμαι ολομοναχος!"
mytext = [mytextpiece for _ in range(2000)]
mytranslation = [mytranslationpiece for _ in range(2000)]

Data processing helper functions

In [11]:
#@title
# input/output texts are expected to be lists of text pieces, like sentences/paragraphs/books to be translated, etc. 
# All list elements are assumed to be word sequences of the same length. If not, a fake token needs to be introduced to ensure that.
def get_tranformer_data(input_text, output_text, batch_size, device = torch.device("cuda" if torch.cuda.is_available() else "cpu")):
  tokenizer_in = My_word_tokenizer("".join(input_text))
  tokenizer_out = My_word_tokenizer("".join(output_text))
  encoded_input = torch.cat([tokenizer_in.text_encoding(in_piece) for in_piece in input_text], dim = 0)
  encoded_output = torch.cat([tokenizer_out.text_encoding(out_piece) for out_piece in output_text], dim = 0)
  dataloader = get_tranformer_dataloader(encoded_input, encoded_output, batch_size, True)
  start = tokenizer_out.token_encoding["<start>"].to(device)
  end = tokenizer_out.token_encoding["<end>"].to(device)
  return tokenizer_in, tokenizer_out, dataloader, start, end

def get_dataloader(data, seq_length, batch_size):
  data = data[:(len(data)//seq_length)*seq_length].view(-1,seq_length) #organizes flat data tensor to fixed-length sequences
  return get_tranformer_dataloader(data, data, batch_size, shuffle_batch=True) #creates dataloader with random shuffling and fixed batch size

def process_data(train_iter, coders_cls, seq_length, batch_size):
  coders = coders_cls(train_iter)  #generates vocab, contains tokenizer, text encoders and decoders
  text_data = " ".join(list(item for item in train_iter))  #merges text items of Wikitext2 generator to form single text
  train_data = coders.text_encoding(text_data).view(-1).to(device)  #encodes text into flat tensor and sends it to device
  return coders, get_dataloader(train_data, seq_length, batch_size)  

Get dataloaders

In [12]:
tokenizer_in, tokenizer_out, dataloader, starting_wrd, ending_wrd = get_tranformer_data(mytext, mytext, 80)
encode=tokenizer_in.text_encoding
decode=tokenizer_out.text_decoding

Hyperparameters

In [13]:
vocab = tokenizer_in.tokens
ntokens = len(vocab)  # size of vocabulary
emsize = 60  # embedding dimension
d_hid = 80  # dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 3  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 6

Schedule

In [14]:
#@title
# example of a learning rate schedule used in Annotated Transformer
def learning_rate_function(model_size, factor, warmup_steps):
    return lambda epoch : factor* (model_size)**(-0.5) * min((epoch+1)**(-0.5), (epoch+1) * (warmup_steps)**(-1.5)) 

def learning_rate_step(factor, drop, time):
    return lambda epoch : factor/(drop**(epoch//time))

Model instance

In [15]:
model, opt = get_registered_Transformer_model(in_vocab_size=ntokens, out_vocab_size=ntokens, dim_in = emsize, dim_key= emsize//nhead, heads = nhead, dim_internal=d_hid, copies = nlayers, lr = 1., start=starting_wrd, end = ending_wrd)
model_operate = NN_operating_tools(model, opt, learning_rate_schedule= learning_rate_function(emsize, factor=2.5, warmup_steps = 8))

Data instance and untrained transformer output

In [None]:
#@title
data = iter(dataloader)
(x,y) = next(data)
print(decode(x[0][0]))
print(decode(x[1][0]))
print(decode(y[0]))

In [None]:
#@title
model.eval()
out, _ = model(x)
decode(out[0])

In [None]:
#@title
decode(model.autoregression(x[0][0].unsqueeze(0), 62, mode="greedy"))

Training

In [19]:
model_operate.fit_transformer(25, dataloader, tokenizer=tokenizer_out, sample_input=x[0][0].unsqueeze(0))

Epoch: 0 -- time: 2.5308990478515625 -- loss = 139.3667755126953

<start> friends friends friends for friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends
Epoch: 1 -- time: 2.4642035961151123 -- loss = 139.05274963378906

<start> friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends friends

Trained transformer autoregression output

In [21]:
#@title
model.eval()
decode(model.autoregression(x[0][0].unsqueeze(0), 62, mode="beam"))

"<start> When I was young, I never needed anyone and making love was just for fun. Those days are gone! Living alone, I think of all the friends I've known but when I dial the telephone nobody is home. All by myself, I don't wanna be all by myself, anymore! <end>"

In [22]:
out, _ =model(x)
decode(out[0])

"When I was young, I never needed anyone and making love was just for fun. Those days are gone! Living alone, I think of all the friends I've known but when I dial the telephone nobody is home. All by myself, I don't wanna be all by myself, anymore! <end>"

In [20]:
model_operate.save_pt("small_trained_transformer.pt")