# Model

Here we define our PyTorch model. The model we are targeting is LSTM-based: it encodes dependencies on past inputs and uses them to predict future instances. Unlike a vanilla RNN, an LSTM suffers much less from vanishing gradients.

In [3]:
#default_exp seq2seq.model

In [4]:
#export
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils import rnn
from torch.utils.data import DataLoader, Dataset
from dotmap import DotMap
from typing import Dict

import collections
import math

import numpy as np
import torch

from fastai.text.all import *

# Metrics

Here we define the metrics used to evaluate the model.

In [5]:
#export

def acc_cm(preds, labels, nb_clss):
    """Compute accuracy and confusion-matrix based counts for class predictions.

    The confusion matrix is indexed as ``cm[true_label, predicted_label]``.
    Class index 0 is left out of the positive counts (all slices start at 1).

    Args:
        preds: tensor of predicted class indices (any shape; flattened here).
        labels: tensor of true class indices with the same number of elements
            as ``preds`` (any shape; flattened here).
        nb_clss: number of classes, i.e. the side length of the confusion matrix.

    Returns:
        Tuple ``(acc, tp, fp, fn)`` of 0-d float tensors:
        ``acc`` - fraction of positions where prediction equals label;
        ``tp``  - correct predictions whose class index is >= 1;
        ``fp``  - predictions of a class >= 1 that are not counted in ``tp``;
        ``fn``  - labels of a class >= 1 that are not counted in ``tp``.
    """
    # reshape (not view) so non-contiguous inputs are accepted; flatten both
    # sides so that e.g. (batch, seq) predictions line up with the labels.
    labels = labels.reshape(-1)
    preds = preds.reshape(-1)
    acc = (labels == preds).float().mean()

    # Encode every (label, pred) pair as one index into the flattened matrix and
    # count all pairs in a single vectorised call instead of a Python loop.
    pair_index = labels.long() * nb_clss + preds.long()
    cm = torch.bincount(pair_index, minlength=nb_clss * nb_clss)
    cm = cm.reshape(nb_clss, nb_clss).to(torch.float32)

    tp = cm.diagonal()[1:].sum()
    fp = cm[:, 1:].sum() - tp
    fn = cm[1:, :].sum() - tp
    return (acc, tp, fp, fn)


# Model

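Here the model is defined as an encoder/decoder architecture. Note that the encoder and decoder classes below are built on `nn.GRU`; the `RNN` Lightning module further down is the LSTM-based one.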

In [6]:
#export
# Index treated as padding; `RNN.generate` stops sampling when it draws this id.
PAD_ID = 1
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

class EncoderRNN(nn.Module):
    """GRU encoder with an optional stack of ReLU-activated linear layers in front.

    Config keys read in ``__init__``:
        n_channels (required): feature size of every input time step.
        encoder_hidden (required): hidden size of the GRU and of the linear layers.
        encoder_layers (default 1): number of stacked GRU layers.
        encoder_dnn_layers (default 0): number of linear layers applied before the GRU.
        encoder_dropout (default 0): dropout passed to ``nn.GRU``.
        bidirectional_encoder (default False): run the GRU in both directions.
        gpu (default False): legacy flag; forces the initial hidden state onto CUDA.
    """

    def __init__(self, config):
        super(EncoderRNN, self).__init__()
        self.input_size = config['n_channels']
        self.hidden_size = config['encoder_hidden']
        self.layers = config.get('encoder_layers', 1)
        self.dnn_layers = config.get('encoder_dnn_layers', 0)
        self.dropout = config.get('encoder_dropout', 0)
        self.bi = config.get('bidirectional_encoder', False)

        # Layers are registered as `dnn_<i>` so `run_dnn` can look them up by name.
        for i in range(self.dnn_layers):
            self.add_module('dnn_' + str(i), nn.Linear(
                in_features=self.input_size if i == 0 else self.hidden_size,
                out_features=self.hidden_size))

        # Without the linear front-end the GRU consumes the raw features directly.
        gru_input_dim = self.input_size if self.dnn_layers == 0 else self.hidden_size
        self.rnn = nn.GRU(
            gru_input_dim,
            self.hidden_size,
            self.layers,
            dropout=self.dropout,
            bidirectional=self.bi,
            batch_first=True)

        self.gpu = config.get('gpu', False)

    def run_dnn(self, x):
        """Apply the registered ``dnn_<i>`` linear layers in order, each followed by ReLU."""
        for i in range(self.dnn_layers):
            x = F.relu(getattr(self, 'dnn_' + str(i))(x))

        return x

    def forward(self, inputs, hidden, input_lengths):
        """Encode a padded, batch-first batch of sequences.

        Args:
            inputs: padded input batch, batch dimension first.
            hidden: initial GRU hidden state (see ``init_hidden``).
            input_lengths: true length of every sequence in the batch. It is
                passed to ``pack_padded_sequence`` with its default
                ``enforce_sorted=True``, so lengths must be in decreasing order.

        Returns:
            ``(output, state)``: the re-padded GRU outputs (padding value 0) and
            the final hidden state. For a bidirectional GRU the forward and
            backward halves of ``output`` are summed, so its last dimension is
            ``hidden_size`` either way.
        """
        if self.dnn_layers > 0:
            inputs = self.run_dnn(inputs)

        # pack_padded_sequence needs the lengths on the CPU (or as a plain list).
        if torch.is_tensor(input_lengths):
            input_lengths = input_lengths.cpu()

        x = nn.utils.rnn.pack_padded_sequence(inputs, input_lengths, batch_first=True)
        output, state = self.rnn(x, hidden)
        output, _ = nn.utils.rnn.pad_packed_sequence(output, batch_first=True, padding_value=0.)

        if self.bi:
            output = output[:, :, :self.hidden_size] + output[:, :, self.hidden_size:]

        return output, state

    def init_hidden(self, batch_size):
        """Return an all-zero initial hidden state for ``batch_size`` sequences.

        The shape is ``(layers * directions, batch_size, hidden_size)``, which is
        what ``nn.GRU`` expects. The tensor is created on the device of the
        module's parameters; the legacy ``gpu`` flag still forces it onto CUDA.
        """
        num_directions = 2 if self.bi else 1
        param_device = next(self.parameters()).device
        h0 = torch.zeros(self.layers * num_directions, batch_size, self.hidden_size,
                         device=param_device)
        if self.gpu and h0.device.type != 'cuda':
            h0 = h0.cuda()
        return h0
        

In [None]:
class Decoder(nn.Module):
    ''' Decoder takes input from encoder and outputs the prediction

    Only the constructor is defined here; the original cell was cut off before
    any ``forward`` method was written.

    Config keys read in ``__init__``:
        batch_size (required): stored on the instance.
        decoder_hidden (required): hidden size of the GRU.
        decoder (required): decoder flavour; ``'bahdanau'`` (case-insensitive)
            widens the GRU input by ``decoder_hidden``.
        embedding_dim (default: ``decoder_hidden``): size of the token embeddings.
        n_classes (default 32): number of embedding rows.
        decoder_layers (default 1), decoder_dropout (default 0),
        bidirectional_decoder (default False): forwarded to ``nn.GRU``.
    '''
    def __init__(self, config):
        super(Decoder, self).__init__()
        self.batch_size = config['batch_size']
        self.hidden_size = config['decoder_hidden']
        embedding_dim = config.get('embedding_dim', None)
        self.embedding_dim = embedding_dim if embedding_dim is not None else self.hidden_size
        # padding_idx has to be an integer row index, not a float.
        self.embedding = nn.Embedding(config.get('n_classes', 32), self.embedding_dim, padding_idx=0)

        # For the 'bahdanau' flavour the GRU input is the embedding plus an extra
        # hidden_size-wide vector; otherwise it is the embedding alone.
        uses_bahdanau = config['decoder'].lower() == 'bahdanau'
        rnn_input_size = self.embedding_dim + self.hidden_size if uses_bahdanau else self.embedding_dim

        self.rnn = nn.GRU(
            input_size=rnn_input_size,
            hidden_size=self.hidden_size,
            num_layers=config.get('decoder_layers', 1),
            dropout=config.get('decoder_dropout', 0),
            bidirectional=config.get('bidirectional_decoder', False),
            batch_first=True)
        

In [None]:
#export
# Padding token index; sampling in `RNN.generate` stops when this id is drawn.
PAD_ID = 1
# Use CUDA when it is available, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

class RNN(pl.LightningModule):
    """Character-level language model: embedding -> dropout -> LSTM -> linear layer.

    Args:
        hp: hyper-parameter object exposing ``num_layers``, ``hidden_size``,
            ``vocab_size``, ``embedding_size`` and ``lr`` as attributes.
        char2tensor: string that evaluates to a mapping from character to token id.
        vocab: string that evaluates to a container indexable by token id,
            yielding the corresponding character.
        learning_rate: stored on the instance; the optimizer itself reads
            ``self.hparams.lr``.
    """

    def __init__(self, hp:Dict, char2tensor, vocab, learning_rate=0.02):
        super().__init__()
        # char2tensor needs to be passed
        # NOTE(review): assigning `hparams` directly is rejected by newer Lightning
        # releases - confirm against the pinned version before upgrading.
        self.hparams = hp
        self.learning_rate = learning_rate
        self.num_layers = hp.num_layers
        self.hidden_size = hp.hidden_size
        self.output_size = hp.vocab_size
        self.input_size = hp.vocab_size
        self.embed_size = hp.embedding_size
        # SECURITY: eval() executes arbitrary code. Only pass strings produced by
        # this project; never feed it untrusted input.
        self.char2tensor = eval(char2tensor)
        self.vocab = eval(vocab)
        self.dropout_p = 0.2
        self.dropout = nn.Dropout(self.dropout_p)

        self.embedding = nn.Embedding(self.input_size, self.embed_size,scale_grad_by_freq=True)
        self.rnn = nn.LSTM(input_size = self.embed_size, hidden_size=self.hidden_size, dropout= self.dropout_p, num_layers = self.num_layers, batch_first=True)
        self.decoder = nn.Linear(self.hidden_size, self.output_size)

        # The model emits raw logits; CrossEntropyLoss applies log-softmax itself.
        self.criterion = nn.CrossEntropyLoss()

        self.save_hyperparameters()

    def init_weights(self):
        """Re-initialise the LSTM (biases 0.25, Xavier weights) and every Linear layer.

        Not called from ``__init__``; invoke it explicitly if wanted.
        """
        for name, param in self.rnn.named_parameters():
            if 'bias' in name:
                nn.init.constant_(param, 0.25)
            elif 'weight' in name:
                nn.init.xavier_uniform_(param,gain=nn.init.calculate_gain('sigmoid'))

        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight.data, gain=nn.init.calculate_gain('relu'))

    def forward(self, input_seq, hidden_state):
        """Run token ids through embedding, dropout, LSTM and the output layer.

        Args:
            input_seq: tensor of token ids, batch dimension first.
            hidden_state: ``(h, c)`` tuple as returned by ``init_hidden``.

        Returns:
            ``(logits, hidden_state)`` - unnormalised scores over the vocabulary
            for every position, and the updated LSTM state.
        """
        embedding = self.embedding(input_seq)
        embedding = self.dropout(embedding)
        output, hidden_state = self.rnn(embedding, hidden_state)
        output = self.decoder(output)

        return output, hidden_state

    def _model_device(self):
        """Device the model's weights currently live on."""
        return self.embedding.weight.device

    def init_hidden(self, batch_size):
        """Return zero-filled ``(h, c)`` LSTM states for ``batch_size`` sequences.

        The states are created on the same device as the model weights, so this
        works wherever the module has been moved (not only on the global default).
        """
        dev = self._model_device()
        h = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=dev)
        c = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=dev)

        return h,c

    def _logits_and_loss(self, batch):
        """Shared train/validation step: run the batch and compute cross-entropy.

        Returns ``(logits, tgt, loss)`` where ``logits`` has the batch and
        sequence dimensions flattened into one.
        """
        src, tgt, lengths = batch

        hidden_state = self.init_hidden(src.shape[0])
        output, hidden_state = self.forward(src, hidden_state)
        # (batch, seq, vocab) -> (batch * seq, vocab) to line up with tgt.flatten()
        output = output.reshape(output.shape[0] * output.shape[1], -1)
        loss = self.criterion(output, tgt.flatten())
        return output, tgt, loss

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one ``(src, tgt, lengths)`` batch."""
        _, _, loss = self._logits_and_loss(batch)

        self.log('loss',loss)
        return {'loss': loss}

    def custom_histogram_adder(self):
        """Log one histogram image per named parameter through the experiment logger."""
        for name, params in self.named_parameters():
            fig = plt.figure()
            plt.hist(params.detach().cpu().numpy())
            self.logger.experiment.log_image(name, fig)
            # close figures right away so they do not pile up across epochs
            plt.close('all')

    def training_epoch_end(self, outputs):
        """After every training epoch: log parameter histograms and the mean loss."""
        avg_loss = torch.stack([x['loss'] for x in outputs]).mean()

        # logging histograms
        self.custom_histogram_adder()
        self.log('avg_loss', avg_loss)


    def validation_step(self, batch, batch_idx):
        """Compute the loss and confusion-matrix metrics for one validation batch."""
        output, tgt, loss = self._logits_and_loss(batch)

        # metrics
        preds = torch.argmax(output.data, dim=-1)
        (acc, tp, fp, fn) = acc_cm(preds, tgt.data, self.hparams.vocab_size)
        self.log('val_loss', loss)
        return {'val_loss': loss,'acc': acc, 'fp':fp, 'tp': tp }

    def validation_epoch_end(self, outputs):
        """Average the validation metrics, log them, and print a few sampled names."""
        # generate some names
        names = ['A','B','R','KAR','TE','CHRI']
        output = {n: self.generate(initial_char=n) for n in names}
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        acc = torch.stack([x['acc'] for x in outputs]).mean()
        fp = torch.stack([x['fp'] for x in outputs]).mean()
        tp = torch.stack([x['tp'] for x in outputs]).mean()

        self.log('val_loss', avg_loss)
        self.log('acc', acc)
        self.log('tp',tp)
        self.log('fp', fp)
        print(output)


    @property
    def num_training_steps(self) -> int:
        """Total training steps inferred from datamodule and devices."""
        # Depending on the Lightning version "unset" is None or -1; only a
        # positive value is an explicit step budget.
        max_steps = self.trainer.max_steps
        if max_steps and max_steps > 0:
            return max_steps

        limit_batches = self.trainer.limit_train_batches
        batches = len(self.train_dataloader())
        # an int limit is an absolute batch count, a float limit is a fraction
        batches = min(batches, limit_batches) if isinstance(limit_batches, int) else int(limit_batches * batches)

        num_devices = max(1, self.trainer.num_gpus, self.trainer.num_processes)
        if self.trainer.tpu_cores:
            num_devices = max(num_devices, self.trainer.tpu_cores)

        effective_accum = self.trainer.accumulate_grad_batches * num_devices
        return (batches // effective_accum) * self.trainer.max_epochs

    def configure_optimizers(self):
        """AdamW with a one-cycle learning-rate schedule.

        ``total_steps`` is counted in optimizer steps, so the scheduler is
        registered with ``interval='step'``; Lightning's default would advance
        it only once per epoch and the cycle would never complete.
        """
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.hparams.lr, betas=(0.9, 0.999), weight_decay=0.01)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=self.hparams.lr,  total_steps = self.num_training_steps)

        return [optimizer], [{'scheduler': scheduler, 'interval': 'step'}]

    def generate(self, initial_char = 'A', predict_len = 15, temperature=0.85):
        """Sample a string that starts with ``initial_char``.

        Args:
            initial_char: non-empty seed string; every character must be a key
                of ``self.char2tensor``.
            predict_len: maximum number of characters to append.
            temperature: divides the log-probabilities before sampling; lower
                values make the sampling greedier.

        Returns:
            The seed followed by the sampled characters. Sampling stops early
            when ``PAD_ID`` is drawn.
        """
        dev = self._model_device()
        hidden, cell = self.init_hidden(batch_size = 1)

        initial_input = TensorText([self.char2tensor[c] for c in initial_char ]).to(dev)
        predicted_str = initial_char

        # warm up the hidden state on all seed characters except the last one
        for p in range(len(initial_char)-1):
            _, (hidden, cell) = self.forward(initial_input[p].view(1,1), (hidden, cell))

        last_char = initial_input[-1]

        for p in range(predict_len):
            output, (hidden , cell) = self.forward(last_char.view(1,1).to(dev), (hidden, cell))
            # log-probabilities scaled by temperature; multinomial normalises the weights
            output = F.log_softmax(output, -1)
            output_dist = output.data.view(-1).div(temperature).exp()
            top_char = torch.multinomial(output_dist, 1)[0]
            # plain int so it works as a list index as well as a dict key
            top_idx = int(top_char)

            if top_idx == PAD_ID:
                # PADDING encountered: stop
                break

            # convert back to string
            predicted_str += self.vocab[top_idx]
            last_char  = top_char

        return predicted_str

In [None]:

# nbdev export step: converts the project's notebooks into library modules
# (the recorded output below lists the notebooks that were converted).
from nbdev.export import notebook2script
notebook2script()

Converted model.ipynb.
Converted namegen.ipynb.
Converted run.ipynb.
