In [None]:
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchmetrics import Perplexity
import re
from torch.nn import Parameter
import torch
from torch.nn.utils import clip_grad_norm_
from torch import nn, optim
from torch.utils.data import TensorDataset, DataLoader, Dataset
from tqdm import tqdm
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split

# UTILS

In [None]:
def repackage_hidden(h):
    """Detach a hidden state, or a nested collection of them, from autograd history.

    A tensor is returned detached. Any other iterable is walked recursively
    and rebuilt as a tuple whose members have been detached the same way.
    """
    if not isinstance(h, torch.Tensor):
        return tuple(map(repackage_hidden, h))
    return h.detach()

class AverageMeter:
    """Keeps the most recent value and the running weighted mean of a series."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Set the last value, mean, weighted sum and sample count back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as seen ``n`` times and refresh the running mean."""
        self.val = val
        weighted = val * n
        self.sum += weighted
        self.count += n
        self.avg = self.sum / self.count


def train_one_epoch(model, train_loader, loss_fn, optimizer, epoch=None, hiddens=None, clip=0.25):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Called as ``model(x, hiddens)``; must return ``(predictions, hiddens)``.
        train_loader: Iterable yielding ``(x_batch, y_batch)`` pairs.
        loss_fn: Called as ``loss_fn(predictions.transpose(2, 1), y_batch)``.
        optimizer: Optimizer stepping the model parameters.
        epoch: Optional label shown in the progress-bar description.
        hiddens: Hidden state carried in from a previous call. When ``None`` a
            fresh state is requested from ``model.init_hidden`` using the size
            of the first batch.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        ``(model, mean_loss, perplexity, hiddens)`` where ``hiddens`` is the
        detached hidden state after the last batch.
    """
    model.train()
    mi = Perplexity().to(device)
    loss_train = AverageMeter()
    with tqdm(train_loader, unit='batch') as tepochs:
        if epoch is not None:
            tepochs.set_description(f'epoch:{epoch}')
        for x_batch, y_batch in tepochs:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            if hiddens is None:
                hiddens = model.init_hidden(x_batch.size(0))

            yp, hiddens = model(x_batch, hiddens)
            loss = loss_fn(yp.transpose(2, 1), y_batch)
            loss.backward()
            # Cut the graph so the next batch does not backpropagate into this one.
            hiddens = repackage_hidden(hiddens)
            clip_grad_norm_(model.parameters(), clip)
            optimizer.step()
            optimizer.zero_grad()

            mi.update(yp.detach(), y_batch)
            # Record the loss before refreshing the bar so the displayed
            # average already includes the current batch.
            loss_train.update(loss.item())
            tepochs.set_postfix(loss=loss_train.avg, pre=mi.compute().item())
    return model, loss_train.avg, mi.compute().item(), hiddens

def evaluate(model, test_loader, loss_fn, hiddens=None):
    """Compute mean loss and perplexity over ``test_loader`` without gradients.

    Args:
        model: Called as ``model(x, hiddens)``; must return ``(predictions, hiddens)``.
        test_loader: Iterable yielding ``(x_batch, y_batch)`` pairs.
        loss_fn: Called as ``loss_fn(predictions.transpose(2, 1), y_batch)``.
        hiddens: Initial hidden state. When ``None`` a fresh state is requested
            from ``model.init_hidden`` using the size of the first batch.

    Returns:
        ``(mean_loss, perplexity)`` as Python floats. The perplexity tensor is
        also printed.
    """
    model.eval()
    mi = Perplexity().to(device)
    loss_test = AverageMeter()
    with torch.no_grad():
        for x_batch, y_batch in test_loader:
            # Targets must live on the same device as the predictions for both
            # the loss and the metric update.
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            if hiddens is None:
                hiddens = model.init_hidden(x_batch.size(0))
            yp, hiddens = model(x_batch, hiddens)
            loss = loss_fn(yp.transpose(2, 1), y_batch)
            hiddens = repackage_hidden(hiddens)
            loss_test.update(loss.item())
            mi.update(yp, y_batch)
    perplexity = mi.compute()
    print(perplexity)
    return loss_test.avg, perplexity.item()

In [None]:
class LockedDropout(nn.Module):
    """Dropout that samples one mask per (dim-0, dim-2) entry and reuses it along dim 1."""

    def __init__(self):
        super().__init__()

    def forward(self, x, dropout=0.5):
        """Return ``x`` masked and rescaled by ``1 / (1 - dropout)``.

        ``x`` is returned untouched when the module is in eval mode or when
        ``dropout`` is falsy. ``x`` must be 3-dimensional, because the mask is
        sized from ``x.size(0)`` and ``x.size(2)``.
        """
        if self.training and dropout:
            keep_prob = 1 - dropout
            shape = (x.size(0), 1, x.size(2))
            keep = x.data.new(*shape).bernoulli_(keep_prob)
            scaled = keep.requires_grad_(False) / keep_prob
            return scaled.expand_as(x) * x
        return x

class WeightDrop(torch.nn.Module):
    """Wrap a module and apply dropout to selected weights on every forward pass.

    Each weight named in ``weights`` is removed from the wrapped module's
    parameters and re-registered there as ``<name>_raw``. Before each forward
    call a dropped-out copy is computed from the raw parameter and stored on
    the wrapped module under the original name as a plain tensor, so gradients
    flow back to ``<name>_raw``.

    Args:
        module: The module to wrap.
        weights: Names of the wrapped module's parameters to apply dropout to.
        dropout: Dropout probability.
        variational: When true, one mask entry is drawn per row of the weight
            and broadcast across the row instead of dropping single elements.
    """

    def __init__(self, module, weights, dropout=0, variational=False):
        super(WeightDrop, self).__init__()
        self.module = module
        self.weights = weights
        self.dropout = dropout
        self.variational = variational
        self._setup()

    def widget_demagnetizer_y2k_edition(*args, **kwargs):
        # We need to replace flatten_parameters with a nothing function
        # It must be a function rather than a lambda as otherwise pickling explodes
        # We can't write boring code though, so ... WIDGET DEMAGNETIZER Y2K EDITION!
        # (╯°□°）╯︵ ┻━┻
        return

    def _setup(self):
        """Disable weight flattening on RNNs and re-register target weights as ``*_raw``."""
        # Terrible temporary solution to an issue regarding compacting weights re: CUDNN RNN
        if issubclass(type(self.module), torch.nn.RNNBase):
            self.module.flatten_parameters = self.widget_demagnetizer_y2k_edition

        for name_w in self.weights:
            print('Applying weight drop of {} to {}'.format(self.dropout, name_w))
            w = getattr(self.module, name_w)
            del self.module._parameters[name_w]
            self.module.register_parameter(name_w + '_raw', Parameter(w.data))

    def _setweights(self):
        """Recompute each dropped weight from its ``*_raw`` parameter."""
        for name_w in self.weights:
            raw_w = getattr(self.module, name_w + '_raw')
            if self.variational:
                mask = torch.ones(raw_w.size(0), 1, dtype=raw_w.dtype, device=raw_w.device)
                # NOTE(review): the mask is sampled with training=True, so the
                # variational variant also drops weights in eval mode. Kept as
                # in the original; confirm whether that is intended.
                mask = torch.nn.functional.dropout(mask, p=self.dropout, training=True)
                w = mask.expand_as(raw_w) * raw_w
            else:
                w = torch.nn.functional.dropout(raw_w, p=self.dropout, training=self.training)
            # Dropout hands back its input unchanged when it is inactive. The
            # value stored on the module must be a plain tensor that is still
            # connected to raw_w: wrapping it in a new Parameter would create a
            # fresh leaf, and raw_w would never receive a gradient.
            if isinstance(w, Parameter):
                w = w.clone()
            setattr(self.module, name_w, w)

    def forward(self, *args):
        """Refresh the dropped weights, then run the wrapped module on ``args``."""
        self._setweights()
        return self.module.forward(*args)

def embedded_dropout(embed, words, dropout=0.1, scale=None):
    """Look up ``words`` in ``embed`` after dropping whole rows of its weight matrix.

    Args:
        embed: An ``nn.Embedding``; its weight and lookup options are reused.
        words: Integer tensor of indices to look up.
        dropout: Probability of zeroing each embedding row. Surviving rows are
            rescaled by ``1 / (1 - dropout)``. A falsy value disables dropout.
            The function does not check any training flag; callers decide when
            to pass a non-zero rate.
        scale: Optional tensor, expandable to the weight's shape, multiplied
            into the (masked) weight.

    Returns:
        The embedded ``words``.
    """
    weight = embed.weight
    if dropout:
        keep_prob = 1 - dropout
        # One Bernoulli draw per vocabulary row, broadcast over the embedding dim.
        mask = weight.new_empty((weight.size(0), 1)).bernoulli_(keep_prob) / keep_prob
        masked_embed_weight = mask.expand_as(weight) * weight
    else:
        masked_embed_weight = weight
    if scale is not None:
        masked_embed_weight = scale.expand_as(masked_embed_weight) * masked_embed_weight

    # Pass padding_idx through unchanged: F.embedding resolves a negative index
    # relative to the vocabulary size, so substituting -1 for None would mark
    # the last row as padding and zero its gradient.
    return torch.nn.functional.embedding(
        words, masked_embed_weight,
        embed.padding_idx, embed.max_norm, embed.norm_type,
        embed.scale_grad_by_freq, embed.sparse,
    )

In [None]:
# Use the GPU when CUDA is available; otherwise stay on the CPU.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

In [None]:
# --- data ---
bs = 50          # batch size handed to both DataLoaders
seq = 100        # window length used when unfolding the token stream in WikiSet
step = 50        # stride between consecutive windows
eos = ['eos']    # appended to every tokenised line (a list, so it concatenates with token lists)

# --- model ---
rnn_unit = 1200  # hidden width of every LSTM layer except the last
embed_dim = 500  # embedding width; also the width of the last LSTM layer
n_layers = 3     # number of stacked LSTM layers
hn = None
dp = 0.3         # locked dropout on the embedding output and the final LSTM output
wd_dp = 0.2      # weight dropout on each LSTM's hidden-to-hidden matrix
dp_h = 0.5       # locked dropout between LSTM layers

# --- optimisation ---
# NOTE: n_layers, lr and wd are referenced by later cells but were never
# defined, so Restart & Run All failed with a NameError. The values given here
# are untuned placeholders; replace them with the settings you want to train with.
lr = 0.5         # SGD learning rate
wd = 1.2e-6      # SGD weight decay

In [None]:
# Raw WikiText2 splits, plus the tokenizer WikiSet uses to split lines into tokens.
train, valid, test = WikiText2()
tokenizer = get_tokenizer('basic_english')

In [None]:
class WikiSet(Dataset):
    """Next-token prediction dataset built from an iterable of raw text chunks.

    The text is joined, stripped of every character outside ``_KEEP_CHAR``,
    lower-cased and split into lines. Lines with 20 or fewer tokens are
    dropped and ``eos`` is appended to each remaining one. The token ids are
    concatenated and cut into windows of length ``seq`` with stride ``step``.
    ``X`` holds each window without its last id and ``y`` each window without
    its first id.

    Relies on the notebook-level names ``tokenizer``, ``eos``, ``seq`` and ``step``.
    """

    # Characters that survive cleaning: ASCII letters, digits, '.', "'", ',' and whitespace.
    _KEEP_CHAR = re.compile(r"[A-Za-z0-9.',\s]")

    def __init__(self, text, vocab=None):
        """
        Args:
            text: Iterable of strings (for example one dataset split).
            vocab: Vocabulary to reuse. When ``None`` a new one is built from
                this text with ``min_freq=3`` and ``'unk'`` as the default token.
        """
        # cleaning and tokenizing data
        cleaned = ''.join(
            ch.lower() for ch in ''.join(text) if self._KEEP_CHAR.match(ch)
        )
        tokens = []
        for sentence in cleaned.splitlines():
            words = tokenizer(sentence)  # tokenise once and reuse for filter and payload
            if len(words) > 20:
                tokens.append(words + eos)

        # building or getting vocab from input args
        if vocab is not None:
            self.vocab = vocab
        else:
            self.vocab = build_vocab_from_iterator(tokens, min_freq=3)
            # Unknown tokens map to the index of the literal token 'unk'.
            self.vocab.set_default_index(self.vocab['unk'])

        sequences = torch.LongTensor(
            [self.vocab[tok] for line in tokens for tok in line]
        ).unfold(0, seq, step)

        # Inputs are each window minus its last id; targets are shifted by one.
        self.X, self.y = sequences[:, :-1], sequences[:, 1:]

    def __getitem__(self, ind):
        return self.X[ind], self.y[ind]

    def __call__(self):
        """
        with calling the class you will get vocab
        """
        return self.vocab

    def __len__(self):
        return len(self.X)

In [None]:
# The vocabulary is built from the training split and then shared with the
# validation split.
train_set = WikiSet(train)
vocab = train_set.vocab
valid_set = WikiSet(valid, vocab=vocab)


# NEXT

In [None]:
train_loader = DataLoader(train_set, bs, shuffle=True, drop_last=True)
# `shuffle` is a boolean, not a seed: a reproducible shuffle needs an explicitly
# seeded generator. drop_last keeps every batch at exactly `bs` rows, which the
# fixed-size hidden state requires.
valid_loader = DataLoader(
    valid_set, bs, shuffle=True, drop_last=True,
    generator=torch.Generator().manual_seed(1024),
)

In [None]:
class MyModel(nn.Module):
    """Stacked weight-dropped LSTM language model with tied input/output embeddings.

    Args:
        rnn_unit: Hidden width of every LSTM layer except the last.
        n_layers: Number of LSTM layers.
        n_voc: Vocabulary size.
        embeds: Embedding width; also the width of the last LSTM layer so that
            the output projection can share the embedding matrix.
        dp: Locked-dropout rate on the embedded input and on the final LSTM output.
        wd_dp: Weight-dropout rate on each LSTM's ``weight_hh_l0``.
        dp_h: Locked-dropout rate between LSTM layers.
        dp_e: Embedding-row dropout rate, applied in training mode only.
    """

    def __init__(self, rnn_unit, n_layers, n_voc, embeds, dp, wd_dp, dp_h, dp_e=0.1):
        super().__init__()
        self.embed = nn.Embedding(n_voc, embeds)
        self.lstms = nn.ModuleList()
        self.dp = dp
        self.dropouth = dp_h
        self.dropoute = dp_e
        self.n_layers = n_layers
        self.rnn_unit = rnn_unit
        self.embedings = embeds
        for l in range(n_layers):
            inp = embeds if l == 0 else rnn_unit
            out = self._layer_width(l)
            self.lstms.append(WeightDrop(nn.LSTM(inp, out, 1, batch_first=True), ['weight_hh_l0'], wd_dp))
        self.fc = nn.Linear(embeds, n_voc)
        # Weight tying: the output projection reuses the embedding matrix.
        self.fc.weight = self.embed.weight
        self.lockdrop = LockedDropout()

    def _layer_width(self, l):
        """Hidden width of layer ``l``: ``embeds`` for the last layer, ``rnn_unit`` otherwise."""
        return self.rnn_unit if l != self.n_layers - 1 else self.embedings

    def forward(self, x, hns=None):
        """Run the model on a batch of token ids.

        Args:
            x: Integer tensor of token ids, batch first.
            hns: One ``(h, c)`` pair per layer, or ``None`` to start from zeros.

        Returns:
            ``(y, hiddens)``: the output of the tied projection and the list of
            per-layer ``(h, c)`` pairs produced by this call.
        """
        if hns is None:
            hns = self.init_hidden(x.size(0))
        # embedded_dropout has no notion of train/eval, so gate the rate here.
        emb_dropout = self.dropoute if self.training else 0
        x = self.lockdrop(embedded_dropout(self.embed, x, emb_dropout), self.dp)
        hiddens = list()
        for i, layer in enumerate(self.lstms):
            x, hn = layer(x, hns[i])
            hiddens.append(hn)
            if i != self.n_layers - 1:
                x = self.lockdrop(x, self.dropouth)
        x = self.lockdrop(x, self.dp)
        y = self.fc(x)
        return y, hiddens

    def init_hidden(self, bs):
        """Return zero-filled ``(h, c)`` pairs, one per layer, for batch size ``bs``.

        The tensors take their dtype and device from the model's first parameter.
        """
        weight = next(self.parameters())
        return [
            (weight.new_zeros(1, bs, self._layer_width(l)),
             weight.new_zeros(1, bs, self._layer_width(l)))
            for l in range(self.n_layers)
        ]

In [None]:
# Build the network from the hyperparameters above and move it to the selected device.
model = MyModel(
    rnn_unit=rnn_unit,
    n_layers=n_layers,
    n_voc=len(vocab),
    embeds=embed_dim,
    dp=dp,
    wd_dp=wd_dp,
    dp_h=dp_h,
).to(device)

In [None]:
# Display the module tree of the freshly built model.
model

In [None]:
# Total number of scalar entries across all registered parameters.
sum(p.numel() for p in model.parameters())

In [None]:
# Token-level cross-entropy, optimised with SGD plus momentum and weight decay.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=lr,
    momentum=0.9,
    weight_decay=wd,
)

In [None]:
# Per-epoch histories (loss and perplexity) for the training and validation splits.
loss_train_hist, loss_valid_hist = [], []
pre_train_hist, pre_valid_hist = [], []

# Best validation perplexity seen so far, and the number of epochs completed.
best_pre_valid = torch.inf
epoch_counter = 0

# Hidden state carried from one epoch to the next.
hiddens = model.init_hidden(bs)

In [None]:
# Train for `n` epochs; after each one evaluate on the validation split, log
# both metrics, and checkpoint the whole model whenever validation perplexity improves.
n = 50
for epoch in range(n):
    model, train_loss, pre, hiddens = train_one_epoch(
        model, train_loader, loss_fn, optimizer, epoch, hiddens
    )
    valid_loss, valid_pre = evaluate(model, valid_loader, loss_fn, hiddens)

    pre_train_hist.append(pre)
    pre_valid_hist.append(valid_pre)
    loss_train_hist.append(train_loss)
    loss_valid_hist.append(valid_loss)

    if valid_pre < best_pre_valid:
        torch.save(model, 'modelx1.pt')
        best_pre_valid = valid_pre
        print('Model SAVED')

    epoch_counter = epoch_counter + 1

In [None]:
# Perplexity per epoch. The green curve comes from the validation split, so it
# is labelled as such.
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(range(epoch_counter), pre_train_hist, 'r', label='Train')
ax.plot(range(epoch_counter), pre_valid_hist, 'g', label='Valid')
ax.set_xlabel('Epochs')
ax.set_ylabel('Prp')
ax.legend()
ax.grid(True)


In [None]:
# Loss per epoch. The y-axis shows the loss histories, and the green curve
# comes from the validation split.
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(range(epoch_counter), loss_train_hist, 'r', label='Train')
ax.plot(range(epoch_counter), loss_valid_hist, 'g', label='Valid')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
ax.grid(True)


In [None]:
# Fresh hidden state for batch size 1; the generation loop below feeds a single sequence.
hiddens = model.init_hidden(1)

In [None]:
first_string = 'Hi i am a language model'
# The vocabulary is word-level, so the prompt goes through the same tokenizer
# as the training text rather than being iterated character by character.
input_eval = [vocab[token] for token in tokenizer(first_string)]
input_eval = torch.LongTensor(input_eval).unsqueeze(dim=0).to(device)

In [None]:
temperature = .7  # values below 1 sharpen the sampling distribution
text_generated = ['Hi i am a language model ']
model.eval()
# Inference only: without no_grad the carried hidden state would keep every
# step's graph alive.
with torch.no_grad():
    for i in range(50):
        predictions, hiddens = model(input_eval, hiddens)
        # Only the distribution at the last position is needed to pick the next token.
        predictions = predictions[0, -1] / temperature
        last_argm = torch.multinomial(F.softmax(predictions, dim=-1), num_samples=1)
        # Slide the window: drop the oldest token and append the sampled one.
        message = torch.cat((input_eval[0], last_argm))[1:]
        input_eval = message.unsqueeze(0)
        # Each piece carries its own trailing space so a plain ''.join yields readable text.
        text_generated.append(vocab.lookup_token(last_argm.item()) + ' ')


In [None]:
# `eos` is a one-element list; str.split needs the marker string itself.
''.join(text_generated).split(eos[0])