In [None]:
# default_exp lightning

In [None]:
# export
import logging
logging.disable(logging.CRITICAL)
import os
import random
import torch
import pytorch_lightning as pl
from test_tube import HyperOptArgumentParser
from emotion_transformer.dataloader import dataloader
from emotion_transformer.model import sentence_embeds_model, context_classifier_model, metrics, f1_score

# PyTorch Lightning

> Construction of the PyTorch Lightning module and of the hyperparameter search for the SemEval-2019 Task 3 dataset (contextual emotion detection in text)

## Lightning Module

Defining the Lightning module is now straightforward; see also the [documentation](https://williamfalcon.github.io/pytorch-lightning/). The default hyperparameter choices were motivated by [this paper](https://arxiv.org/pdf/1905.05583.pdf).

Further references on PyTorch Lightning and on using it for multi-GPU training and hyperparameter search can be found in the following blog posts by William Falcon:

* [9 Tips For Training Lightning-Fast Neural Networks In Pytorch](https://towardsdatascience.com/9-tips-for-training-lightning-fast-neural-networks-in-pytorch-8e63a502f565)

* [Trivial Multi-Node Training With Pytorch-Lightning](https://towardsdatascience.com/trivial-multi-node-training-with-pytorch-lightning-ff75dfb809bd?gi=ec854edcc8eb)

* [Converting From Keras To PyTorch Lightning](https://towardsdatascience.com/converting-from-keras-to-pytorch-lightning-be40326d7b7d)

In [None]:
# export
class EmotionModel(pl.LightningModule):
    """
    PyTorch Lightning module for the Contextual Emotion Detection in Text Challenge

    A `sentence_embeds_model` maps the tokenised input to sentence embeddings, which a
    `context_classifier_model` then classifies into the classes of `emo_dict`.
    """

    # Metrics that are counts: they are summed (over GPUs and batches) in
    # `validation_end`; every other metric is averaged.
    _COUNT_METRICS = ('tp', 'fp', 'fn')

    def __init__(self, hparams):
        """
        pass in parsed HyperOptArgumentParser to the model

        :param hparams: namespace obtained from `get_args(EmotionModel).parse_args(...)`.
            The constructor reads projection_size, n_layers and dropout; the other
            methods read lr, layerwise_decay, epochs, bs, max_seq_len and the data files.
        """
        super(EmotionModel, self).__init__()
        self.hparams = hparams
        # label -> class index, shared by the classifier and the dataloaders
        self.emo_dict = {'others': 0, 'sad': 1, 'angry': 2, 'happy': 3}
        self.sentence_embeds_model = sentence_embeds_model(hparams.projection_size,
                                                           dropout = hparams.dropout)
        self.context_classifier_model = context_classifier_model(hparams.projection_size,
                                                                 hparams.n_layers,
                                                                 self.emo_dict,
                                                                 dropout = hparams.dropout)

    def forward(self, input_ids, attention_mask, labels = None):
        """
        no special modification required for lightning, define as you normally would

        :return: whatever `context_classifier_model` returns; the training/validation
            steps unpack it as `(loss, logits)` when `labels` are given
        """
        sentence_embeds = self.sentence_embeds_model(input_ids = input_ids, attention_mask = attention_mask)
        return self.context_classifier_model(sentence_embeds = sentence_embeds, labels = labels)

    def training_step(self, batch, batch_idx):
        """
        Lightning calls this inside the training loop

        :param batch: tuple `(input_ids, attention_mask, labels)` from `train_dataloader`
        :return: dict with the training loss under 'loss' and, for logging, 'log'
        """
        input_ids, attention_mask, labels = batch
        loss, _ = self.forward(input_ids = input_ids, attention_mask = attention_mask, labels = labels)
        # in DP mode (default) make sure if result is scalar, there's another dim in the beginning
        if self.trainer.use_dp or self.trainer.use_ddp2:
            loss = loss.unsqueeze(0)

        tensorboard_logs = {'train_loss': loss}
        return {'loss': loss, 'log': tensorboard_logs}

    def validation_step(self, batch, batch_idx):
        """
        Lightning calls this inside the validation loop

        :param batch: tuple `(input_ids, attention_mask, labels)`
        :return: the score dict produced by `metrics` (presumably val_loss, val_acc,
            tp, fp, fn - the keys shown in the progress bar; TODO confirm)
        """
        input_ids, attention_mask, labels = batch

        loss, logits = self.forward(input_ids = input_ids, attention_mask = attention_mask, labels = labels)
        scores_dict = metrics(loss, logits, labels)

        # in DP mode (default) make sure if result is scalar, there's another dim in the beginning
        if self.trainer.use_dp or self.trainer.use_ddp2:
            scores = [score.unsqueeze(0) for score in scores_dict.values()]
            scores_dict = {key: value for key, value in zip(scores_dict.keys(), scores)}

        return scores_dict

    def validation_end(self, outputs):
        """
        called at the end of validation to aggregate outputs

        Count metrics (tp, fp, fn) are summed over all batches; every other metric is
        averaged over the batches. Precision/recall/F1 are then computed from the
        summed counts via `f1_score`.

        :param outputs: list of individual outputs of each validation step
        :return: dict holding the aggregated metrics under 'progress_bar' and 'log'
            and the mean validation loss under 'val_loss'
        """
        tqdm_dict = {}
        # in dp/ddp2 mode every value carries one entry per GPU (see validation_step)
        per_gpu_values = self.trainer.use_dp or self.trainer.use_ddp2

        for metric_name in outputs[0].keys():
            is_count = metric_name in self._COUNT_METRICS
            metric_total = 0

            for output in outputs:
                metric_value = output[metric_name]
                if per_gpu_values:
                    metric_value = torch.sum(metric_value) if is_count else torch.mean(metric_value)
                metric_total += metric_value

            tqdm_dict[metric_name] = metric_total if is_count else metric_total / len(outputs)

        prec_rec_f1 = f1_score(tqdm_dict['tp'], tqdm_dict['fp'], tqdm_dict['fn'])
        tqdm_dict.update(prec_rec_f1)
        result = {'progress_bar': tqdm_dict, 'log': tqdm_dict, 'val_loss': tqdm_dict["val_loss"]}
        return result

    def test_step(self, batch, batch_idx):
        """Lightning calls this inside the test loop; identical to `validation_step`."""
        return self.validation_step(batch, batch_idx)

    def test_end(self, outputs):
        """Aggregates the test outputs exactly like `validation_end`."""
        return self.validation_end(outputs)

    def configure_optimizers(self):
        """
        returns the optimizer and scheduler

        AdamW over two sets of parameter groups. The first comes from the sentence
        embedding model's `layerwise_lr`, with layer-wise decayed learning rates. The
        second is a single group for the context classifier at the base learning rate.
        The cosine annealing period equals the configured number of epochs.
        """
        # presumably a list of parameter-group dicts (it is extended with one below)
        opt_parameters = self.sentence_embeds_model.layerwise_lr(self.hparams.lr,
                                                                 self.hparams.layerwise_decay)
        # no explicit 'lr' -> this group uses the optimizer default lr=hparams.lr
        opt_parameters += [{'params': self.context_classifier_model.parameters()}]

        optimizer = torch.optim.AdamW(opt_parameters, lr=self.hparams.lr)
        # Anneal over the whole run (Lightning steps the scheduler once per epoch by
        # default); a fixed period shorter than --epochs would make the learning rate
        # rise again after that period. 10 is the fallback for namespaces without `epochs`.
        t_max = getattr(self.hparams, 'epochs', 10)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=t_max)
        return [optimizer], [scheduler]

    # `self.use_ddp` below is set on the module by the (old-API) Lightning trainer.
    @pl.data_loader
    def train_dataloader(self):
        """DataLoader over `hparams.train_file`."""
        return dataloader(self.hparams.train_file, self.hparams.max_seq_len,
                          self.hparams.bs, self.emo_dict, use_ddp = self.use_ddp)

    @pl.data_loader
    def val_dataloader(self):
        """DataLoader over `hparams.val_file`."""
        return dataloader(self.hparams.val_file, self.hparams.max_seq_len,
                          self.hparams.bs, self.emo_dict, use_ddp = self.use_ddp)

    @pl.data_loader
    def test_dataloader(self):
        """DataLoader over `hparams.test_file`."""
        return dataloader(self.hparams.test_file, self.hparams.max_seq_len,
                          self.hparams.bs, self.emo_dict, use_ddp = self.use_ddp)

    @staticmethod
    def add_model_specific_args(parent_parser, root_dir):
        """
        parameters defined here will be available to the model through self.hparams

        :param parent_parser: parser with the generic (trainer/debugging) arguments;
            if it has a search `strategy` (HyperOptArgumentParser), that strategy is
            carried over to the returned parser
        :param root_dir: directory against which the default data file paths are resolved
        :return: HyperOptArgumentParser with the parent's and the model's arguments
        """
        # `parents=` only copies the arguments, not the search strategy of a
        # HyperOptArgumentParser. Forward it explicitly so that the strategy chosen by
        # the caller (e.g. 'random_search' in `get_args`) is actually used.
        parser_kwargs = {'parents': [parent_parser]}
        if hasattr(parent_parser, 'strategy'):
            parser_kwargs['strategy'] = parent_parser.strategy
        parser = HyperOptArgumentParser(**parser_kwargs)

        parser.opt_list('--bs', '--batch_size', default=64, type=int,
                        options=[32, 128], tunable=True, metavar='N',
                        help='mini-batch size (default: %(default)s); with dp/ddp2 this is '
                        'the total batch size of all GPUs on the current node, '
                        'with ddp it is the batch size of each GPU process')
        parser.opt_list('--projection_size', default=256, type=int, options=[64, 512], tunable=True)
        parser.opt_list('--n_layers', default=1, type=int, options=[1, 4], tunable=True)
        parser.opt_range('--lr', '--learning_rate', default=2.0e-5, type=float,
                         tunable=True, low=1.0e-5, high=5.0e-4, nb_samples=5,
                         help='initial learning rate', metavar='LR', dest='lr')
        parser.opt_list('--layerwise_decay', default=0.95, type=float, options=[0.3, 0.8], tunable=True)
        parser.opt_list('--max_seq_len', default=32, type=int, options=[16, 64], tunable=False)
        parser.opt_list('--dropout', default=0.1, type=float, options=[0.1, 0.2], tunable=False)
        parser.add_argument('--train_file', default=os.path.join(root_dir, 'data/clean_train.txt'), type=str)
        parser.add_argument('--val_file', default=os.path.join(root_dir, 'data/clean_val.txt'), type=str)
        parser.add_argument('--test_file', default=os.path.join(root_dir, 'data/clean_test.txt'), type=str)
        parser.add_argument('--epochs', default=10, type=int, metavar='N',
                            help='number of total epochs to run')
        parser.add_argument('--seed', type=int, default=None,
                            help='seed for initializing training')

        return parser

## Hyperparameter Search Argument Parser

Next we define the HyperOptArgumentParser, including the options for distributed training (see also the [documentation](https://williamfalcon.github.io/pytorch-lightning/Trainer/Distributed%20training/)) and for debugging.

In [None]:
# export
def get_args(model, root_dir=None):
    """
    returns the HyperOptArgumentParser

    Defines the generic arguments (run mode, output path, multi-GPU and debugging
    options) with a random-search strategy. It then lets `model` add its own
    arguments via `model.add_model_specific_args(parent_parser, root_dir)`.

    :param model: LightningModule class providing `add_model_specific_args`
    :param root_dir: directory used to resolve the default paths (log directory and,
        in the model, the data files); defaults to the current working directory
    :return: the combined parser; call `.parse_args()` on it to obtain `hparams`
    """
    if root_dir is None:
        root_dir = os.getcwd()

    parent_parser = HyperOptArgumentParser(strategy='random_search',
                                           add_help = False)

    parent_parser.add_argument('--mode', type=str, default='default',
                               choices=('default', 'test', 'hparams_search'),
                               help='default: train and validate; test: train, validate and '
                               'evaluate on the test set; hparams_search: run a hyperparameter search')
    parent_parser.add_argument('--save-path', metavar='DIR', default=os.path.join(root_dir, 'logs'), type=str,
                               help='path to save output')
    parent_parser.add_argument('--gpus', type=str, default=None, help='which gpus')
    parent_parser.add_argument('--distributed-backend', type=str, default=None, choices=('dp', 'ddp', 'ddp2'),
                               help='supports three options dp, ddp, ddp2')
    parent_parser.add_argument('--use_16bit', dest='use_16bit', action='store_true',
                               help='if true uses 16 bit precision')

    # debugging
    parent_parser.add_argument('--fast_dev_run', dest='fast_dev_run', action='store_true',
                               help='debugging a full train/val/test loop')
    parent_parser.add_argument('--track_grad_norm', dest='track_grad_norm', action='store_true',
                               help='inspect gradient norms')

    parser = model.add_model_specific_args(parent_parser, root_dir)
    return parser

Let us take a look at the different attributes of `hparams`.

In [None]:
# keep the parser and the parsed namespace under separate names
parser = get_args(EmotionModel)
hparams = parser.parse_args(args=[])
vars(hparams)

{'mode': 'default',
 'save_path': '/home/julius/Documents/nbdev_venv/emotion_transformer/logs',
 'gpus': None,
 'distributed_backend': None,
 'use_16bit': False,
 'fast_dev_run': False,
 'track_grad_norm': False,
 'bs': 64,
 'projection_size': 256,
 'n_layers': 1,
 'lr': 2e-05,
 'layerwise_decay': 0.95,
 'max_seq_len': 32,
 'dropout': 0.1,
 'train_file': '/home/julius/Documents/nbdev_venv/emotion_transformer/data/clean_train.txt',
 'val_file': '/home/julius/Documents/nbdev_venv/emotion_transformer/data/clean_val.txt',
 'test_file': '/home/julius/Documents/nbdev_venv/emotion_transformer/data/clean_test.txt',
 'epochs': 10,
 'seed': None,
 'hpc_exp_number': None,
 'trials': <bound method HyperOptArgumentParser.opt_trials of HyperOptArgumentParser(prog='ipykernel_launcher.py', usage=None, description=None, formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=True)>,
 'optimize_parallel': <bound method HyperOptArgumentParser.optimize_parallel of HyperOptArgu

## Trainer
Next we define a function that runs the Lightning trainer with the settings specified in `hparams`.

In [None]:
# export
def main(hparams, gpus = None):
    """
    Trains the Lightning model as specified in `hparams`

    :param hparams: parsed arguments from `get_args(EmotionModel)`
    :param gpus: optional comma separated GPU id string (e.g. "0,1"). This is the
        form used when `main` is driven by test_tube's `optimize_parallel_gpu`. If
        given, the number of listed ids is used as the GPU count instead of
        `hparams.gpus`.

    If `hparams.mode == 'test'`, the trained model is also evaluated on the test set.
    """
    # Seed before the model is built so that the initialisation of any randomly
    # initialised layers is reproducible too, not only the training loop.
    if hparams.seed is not None:
        random.seed(hparams.seed)
        torch.manual_seed(hparams.seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    model = EmotionModel(hparams)

    trainer = pl.Trainer(default_save_path=hparams.save_path,
                         gpus=len(gpus.split(",")) if gpus else hparams.gpus,
                         distributed_backend=hparams.distributed_backend,
                         use_amp=hparams.use_16bit,
                         max_nb_epochs=hparams.epochs,
                         fast_dev_run=hparams.fast_dev_run,
                         # Lightning expects a norm type, -1 disables tracking
                         track_grad_norm=(2 if hparams.track_grad_norm else -1))
    trainer.fit(model)

    if hparams.mode == 'test':
        trainer.test()

Let us check the model by running a quick development run.

In [None]:
import copy

# Work on a shallow copy so the `hparams` displayed above keeps its parsed values.
dev_hparams = copy.copy(hparams)
dev_hparams.fast_dev_run = True
main(dev_hparams)

Epoch 1:  50%|█████     | 1/2 [00:10<00:10, 10.44s/batch, batch_nb=0, loss=2.089, v_nb=2]
Validating:   0%|          | 0/1 [00:00<?, ?batch/s][A
Epoch 1: 100%|██████████| 2/2 [00:13<00:00,  8.32s/batch, batch_nb=0, f1_score=0.105, fn=8, fp=60, loss=2.089, precision=0.0625, recall=0.333, tp=4, v_nb=2, val_acc=0.0625, val_loss=2.06]
Epoch 1: 100%|██████████| 2/2 [00:15<00:00,  7.69s/batch, batch_nb=0, f1_score=0.105, fn=8, fp=60, loss=2.089, precision=0.0625, recall=0.333, tp=4, v_nb=2, val_acc=0.0625, val_loss=2.06]


We also create a python file for automatic hyperparameter optimization across different GPUs or CPUs:

In [None]:
%%writefile main.py

from emotion_transformer.lightning import EmotionModel, get_args, main

if __name__ == '__main__':
    hparams = get_args(EmotionModel)
    hparams = hparams.parse_args()

    if hparams.mode in ['test','default']:
        main(hparams)
    elif hparams.mode == 'hparams_search':
        if hparams.gpus:
            hparams.optimize_parallel_gpu(main, max_nb_trials=20, 
                                          gpu_ids = [gpus for gpus in hparams.gpus.split(' ')])
        else:
            hparams.optimize_parallel_cpu(main, nb_trials=20, nb_workers=4)

Overwriting main.py


## Background Information

For the interested reader we provide some background information on the (distributed) training loop:

* one epoch consists of $m = \lceil 30160 / \text{batchsize} \rceil$ training batches and additionally $n = \lceil 2755 / \text{batchsize} \rceil$ validation batches.

**dp case:** 

* the batchsize will be split and each gpu receives (up to rounding) a batch of size batchsize/num_gpus

* in the validation steps each gpu computes its own scores for each of the n batches (of size batchsize/num_gpus), i.e. each gpu calls the `validation_step` method

* the `outputs` argument passed to the `validation_end` method is a list of dictionaries, one per validation batch, each containing the concatenated scores from the different GPUs, i.e.

`output = [ {first_metric: [first_gpu_batch_1,...,last_gpu_batch_1],...,
             last_metric:  [first_gpu_batch_1,...,last_gpu_batch_1]},..., 
            {first_metric: [first_gpu_batch_n,...,last_gpu_batch_n],...,
             last_metric:  [first_gpu_batch_n,...,last_gpu_batch_n]} ]`


**ddp case:** (does not work from jupyter notebooks)

* each GPU receives its own (disjoint) samples of size batchsize and trains in its own process; the processes communicate and average their gradients, so the resulting models on all GPUs have the same weights

* each GPU runs its own `validation_end` method on its own list of dictionaries

`output_first_gpu = [ {first_metric: batch_1,...,last_metric: batch_1},..., 
                      {first_metric: batch_n,...,last_metric: batch_n} ]`
                      
`output_last_gpu = [ {first_metric: batch_1,...,last_metric: batch_1},..., 
                      {first_metric: batch_n,...,last_metric: batch_n} ]`


**ddp2 case:** (does not work from jupyter notebooks)

* on each node we have the dp case, but the nodes communicate analogously to the ddp case

In [None]:
#hide
from nbdev import export as nbdev_export

# Regenerate the library modules from the `# export` cells of all notebooks.
nbdev_export.notebook2script()

Converted 00_dataloader.ipynb.
Converted 01_model.ipynb.
Converted 02_lightning.ipynb.
Converted index.ipynb.
