# Tex Generation

### Loading Libraries

In [1]:
# Numerical Computing
import numpy as np
import math

# Time
import time

# Data Manipulation
import polars as pl
import pandas as pd

# Data Visualization
import seaborn as sns
import matplotlib.pyplot as plt

# Network
import networkx as nx

# Scikit-Learn
import sklearn
from sklearn.manifold import TSNE

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torchviz import make_dot
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import dataset
from torchvision import datasets, transforms
from torch.nn import TransformerEncoder, TransformerEncoderLayer

In [2]:
# Cuda's
use_cuda = torch.cuda.is_available()

device = torch.device("cuda" if use_cuda else "cpu")

In [3]:
torch.use_deterministic_algorithms(True)

In [4]:
# from torchtext.datasets import PennTreebank
# from torchtext.data.utils import get_tokenizer
# from torchtext.vocab import build_vocab_from_iterator

### Transformer

In [5]:
class Transformer(nn.Module):
    def __init__(self, num_token, num_inputs, num_heads, num_hidden, num_layers, dropout=0.3):
        super(Transformer, self).__init__()
        self.model_name = 'transformer'
        self.position_enc = PosEnc(num_inputs, dropout)
        layers_enc = TransformerEncoderLayer(num_inputs, num_heads, num_hidden, dropout)
        self.enc_transformer = TransformerEncoder(layers_enc, num_layers)
        self.enc = nn.Embedding(num_token, num_inputs)
        self.num_inputs = num_inputs
        self.dec = nn.Linear(num_inputs, num_token)
        self.init_params()

    def init_params(self):
        initial_rng = 0.12
        self.enc.weight.data.uniform_(-initial_rng, initial_rng)
        self.dec.bias.data.zero_()
        self.dec.weight.data.uniform_(-initial_rng, initial_rng)

    def forward(self, source, mask_source):
        source = self.enc(source) * math.sqrt(self.num_inputs)
        source = self.position_enc(source)
        op = self.enc_transformer(source, mask_source)
        op = self.dec(op)
        return op

def gen_sqr_nxt_mask(size):
    msk = torch.triu(torch.ones(size, size) * float('-inf'), diagonal=1)
    return msk

### Positional Encoding

In [6]:
class PosEnc(nn.Module):
    def __init__(self, d_m, dropout=0.2, size_limit=5000):
        super(PosEnc, self).__init__()
        self.dropout = nn.Dropout(dropout)
        p_enc = torch.zeros(size_limit, 1, d_m)
        pos = torch.arange(size_limit, dtype=torch.float).unsqueeze(1)
        divider = torch.exp(torch.arange(0, d_m, 2).float() * (-math.log(10000.0) / d_m))
        p_enc[:, 0, 0::2] = torch.sin(pos * divider)
        p_enc[:, 0, 1::2] = torch.cos(pos * divider)
        self.register_buffer('p_enc', p_enc)
        
    def forward(self, x):
        return self.dropout(x + self.p_enc[:x.size(0)])

### Model Training

In [7]:
tr_iter = PennTreebank(split='train')
tkzer = get_tokenizer('basic_english')
vocabulary = build_vocab_from_iterator(map(tkzer, tr_iter), specials=['<unk>'])
vocabulary.set_default_index(vocabulary['<unk>'])

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def process_data(raw_text):
    numericalised_text = [torch.tensor(vocabulary(tkzer(text)), dtype=torch.long) for text in raw_text]
    return torch.cat(tuple(filter(lambda t: t.numel() > 0, numericalised_text)))

tr_iter, val_iter, te_iter = PennTreebank()
training_text = process_data(tr_iter)
validation_text = process_data(val_iter)
testing_text = process_data(te_iter)

def gen_batches(text_dataset, batch_size):
    num_batches = text_dataset.size(0) // batch_size
    text_dataset = text_dataset[:num_batches * batch_size]
    text_dataset = text_dataset.view(batch_size, num_batches).t().contiguous()
    return text_dataset.to(device)

training_batch_size = 32
evaluation_batch_size = 16

training_data = gen_batches(training_text, training_batch_size)
validation_data = gen_batches(validation_text, evaluation_batch_size)
testing_data = gen_batches(testing_text, evaluation_batch_size)

In [8]:
max_seq_len = 64

def return_batch(src, k):
    sequence_length = min(max_seq_len, len(src) - 1 - k)
    sequence_data = src[k:k+sequence_length]
    sequence_label = src[k+1:k+1+sequence_length].reshape(-1)
    return sequence_data, sequence_label

### Placing HyperParameters

In [9]:
num_tokens = len(vocabulary) 
embedding_size = 256 
num_hidden_params = 256 
num_layers = 2 
num_heads = 2 
dropout = 0.25 
loss_func = nn.CrossEntropyLoss()
lrate = 10.0 

transformer_model = Transformer(num_tokens, embedding_size, num_heads, num_hidden_params, num_layers, 
                                     dropout).to(device)

optim_module = torch.optim.SGD(transformer_model.parameters(), lr=lrate)

sched_module = torch.optim.lr_scheduler.StepLR(optim_module, 1.0, gamma=0.88)

### Training 

In [10]:
def train_model():
    transformer_model.train()
    loss_total = 0.
    time_start = time.time()
    mask_source = gen_sqr_nxt_mask(max_seq_len).to(device)
    num_batches = len(training_data) // max_seq_len
    for b, i in enumerate(range(0, training_data.size(0) - 1, max_seq_len)):
        train_data_batch, train_label_batch = return_batch(training_data, i)
        sequence_length = train_data_batch.size(0)
        if sequence_length != max_seq_len:  # only on last batch
            mask_source = mask_source[:sequence_length, :sequence_length]
        op = transformer_model(train_data_batch, mask_source)
        loss_curr = loss_func(op.view(-1, num_tokens), train_label_batch)
        optim_module.zero_grad()
        loss_curr.backward()
        torch.nn.utils.clip_grad_norm_(transformer_model.parameters(), 0.6)
        optim_module.step()

        loss_total += loss_curr.item()
        interval = 100
        if b % interval == 0 and b > 0:
            loss_interval = loss_total / interval
            time_delta = time.time() - time_start
            print(f"epoch {ep}, {b}/{len(training_data)//max_seq_len} batches, training loss {loss_interval:.2f}, training perplexity {math.exp(loss_interval):.2f}")
            loss_total = 0
            time_start = time.time()

### Evaluation

In [11]:
def eval_model(eval_model_obj, eval_data_source):
    eval_model_obj.eval() 
    loss_total = 0.
    mask_source = gen_sqr_nxt_mask(max_seq_len).to(device)
    with torch.no_grad():
        for j in range(0, eval_data_source.size(0) - 1, max_seq_len):
            eval_data, eval_label = return_batch(eval_data_source, j)
            sequence_length = eval_data.size(0)
            if sequence_length != max_seq_len:
                mask_source = mask_source[:sequence_length, :sequence_length]
            op = eval_model_obj(eval_data, mask_source)
            op_flat = op.view(-1, num_tokens)
            loss_total += sequence_length * loss_func(op_flat, eval_label).item()
    return loss_total / (len(eval_data_source) - 1)

In [12]:
min_validation_loss = float("inf")
eps = 50
best_model_so_far = None

for ep in range(1, eps + 1):
    ep_time_start = time.time()
    train_model()
    validation_loss = eval_model(transformer_model, validation_data)
    print()
    print(f"epoch {ep:}, validation loss {validation_loss:.2f}, validation perplexity {math.exp(validation_loss):.2f}")
    print()

    if validation_loss < min_validation_loss:
        min_validation_loss = validation_loss
        best_model_so_far = transformer_model

    sched_module.step()

### Saving Model

In [13]:
testing_loss = eval_model(best_model_so_far, testing_data)

print(f"testing loss {testing_loss:.2f}, testing perplexity {math.exp(testing_loss):.2f}")

In [14]:
mdl_pth = './transformer.pth'

torch.save(best_model_so_far.state_dict(), mdl_pth)

### Loading `The Best Trained Model`:

In [15]:
transformer_cached = Transformer(num_tokens, embedding_size, num_heads, num_hidden_params, num_layers, 
                                     dropout).to(device)

transformer_cached.load_state_dict(torch.load(mdl_pth))

In [16]:
ln = 5
sntc = 'They are _'
sntc_split = sntc.split()
torch.manual_seed(34)
mask_source = gen_sqr_nxt_mask(max_seq_len).to(device)

with torch.no_grad():
    for i in range(ln):
        sntc = ' '.join(sntc_split)
        txt_ds = Tensor(vocabulary(sntc_split)).unsqueeze(0).to(torch.long)
        num_b = txt_ds.size(0)
        txt_ds = txt_ds.narrow(0, 0, num_b)
        txt_ds = txt_ds.view(1, -1).t().contiguous().to(device)
        ev_X, _ = return_batch(txt_ds, i+1)
        sequence_length = ev_X.size(0)
        if sequence_length != max_seq_len:
            mask_source = mask_source[:sequence_length, :sequence_length]
        op = transformer_cached(ev_X, mask_source)
        op_flat = op.view(-1, num_tokens)
        res = vocabulary.get_itos()[op_flat.argmax(1)[0]]
        sntc_split.insert(-1, res)
print(sntc[:-2])