<!-- ---
title: Distributed Training with Ignite on CIFAR10 
date: 2021-09-18
downloads: true
sidebar: true
tags:
  - single GPU
  - multi GPUs on a single node
  - multi GPUs on multiple nodes
  - TPUs on Colab
--- -->
# Distributed Training with Ignite on CIFAR10 

This tutorial is a brief introduction to distributed training with Ignite on one or more CPUs, GPUs or TPUs. We will also introduce several helper functions and Ignite concepts (setting up common training handlers, saving to and loading from checkpoints, etc.) which you can easily incorporate into your code.

<!--more-->

We will take a practical approach to distributed training (specifically distributed data parallel training) in which we:

>   1. Copy the model on every GPU
>   2. Split the dataset and fit the models on different subsets
>   3. Communicate the gradients at each iteration to keep the models in sync
>
> -- <cite>[Distributed Deep Learning 101: Introduction](https://towardsdatascience.com/distributed-deep-learning-101-introduction-ebfc1bcd59d9)</cite>

We will use this approach to train a predefined ResNet18 on CIFAR10.
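
To make the three quoted steps concrete, here is a minimal pure-Python sketch that simulates two workers fitting the toy model `y = w * x`. It does not use Ignite or GPUs, and the names `local_gradient` and `data_parallel_fit` are hypothetical helpers introduced only for this illustration.

```python
# Toy data-parallel training of y = w * x with two simulated workers.
# Pure Python; no GPUs or Ignite involved.


def local_gradient(w, shard):
    """Mean gradient of the squared error 0.5 * (w * x - y) ** 2 over one shard."""
    return sum((w * x - y) * x for x, y in shard) / len(shard)


def data_parallel_fit(data, world_size=2, lr=0.05, steps=200):
    # 1. Copy the model (a single weight here) to every worker.
    replicas = [0.0] * world_size
    # 2. Split the dataset into one shard per worker.
    shards = [data[rank::world_size] for rank in range(world_size)]
    for _ in range(steps):
        grads = [local_gradient(w, shard) for w, shard in zip(replicas, shards)]
        # 3. Average the gradients so that every replica applies the same update.
        mean_grad = sum(grads) / world_size
        replicas = [w - lr * mean_grad for w in replicas]
    return replicas


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # samples of y = 3x
replicas = data_parallel_fit(data)
print(replicas)  # both replicas hold the same weight, close to 3
```

Because every replica starts from the same weight and applies the same averaged gradient, the copies never drift apart, which is the property the real distributed backends maintain for us.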

In this example, we show how to use Ignite to:
* Train a neural network on one or more GPUs or TPUs
* Compute training/validation metrics
* Log learning rate, metrics, etc.
* Save the best model weights
* Set up logging

## Download Dependencies

In [None]:
!pip install pytorch-ignite
!pip install clearml # Optional 

## Common Configuration

We maintain a `config` dictionary which can be extended to store parameters required during training. You can make changes in the initial config below:

In [2]:
config = {
    "seed": 543,
    "data_path": "cifar10",
    "output_path": "output-cifar10/",
    "model": "resnet18",
    "batch_size": 512,
    "momentum": 0.9,
    "weight_decay": 1e-4,
    "num_workers": 2,
    "num_epochs": 5,
    "learning_rate": 0.4,
    "num_warmup_epochs": 1,
    "validate_every": 3,
    "checkpoint_every": 200,
    "backend": None,
    "resume_from": None,
    "log_every_iters": 15,
    "nproc_per_node": None,
    "stop_iteration": None,
    "with_clearml": False,
    "with_amp": False,
}

## Basic Setup

### Imports

In [3]:
import os
from datetime import datetime
from pathlib import Path

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models
from torchvision.transforms import (
    Compose,
    Normalize,
    Pad,
    RandomCrop,
    RandomHorizontalFlip,
    ToTensor,
)
from torch.cuda.amp import GradScaler, autocast

import ignite
import ignite.distributed as idist
from ignite.contrib.engines import common
from ignite.contrib.handlers import PiecewiseLinear
from ignite.engine import Engine, Events
from ignite.handlers import Checkpoint, DiskSaver, global_step_from_engine
from ignite.metrics import Accuracy, Loss
from ignite.utils import manual_seed, setup_logger

### Logging

First we pass a logger created with [`setup_logger()`](https://pytorch.org/ignite/utils.html#ignite.utils.setup_logger) to `log_basic_info()` and log all basic information such as library versions, the current configuration, the device of the current process (local rank), the backend used and the number of processes (world size). `idist` (`ignite.distributed`) provides several utility functions like `get_local_rank()`, `backend()`, `get_world_size()`, etc. to make this possible.

In [4]:
def log_basic_info(logger, config):
    logger.info("Train on CIFAR10")
    logger.info(f"- PyTorch version: {torch.__version__}")
    logger.info(f"- Ignite version: {ignite.__version__}")
    if torch.cuda.is_available():
        # explicitly import cudnn as torch.backends.cudnn can not be pickled with hvd spawning procs
        from torch.backends import cudnn

        logger.info(
            f"- GPU Device: {torch.cuda.get_device_name(idist.get_local_rank())}"
        )
        logger.info(f"- CUDA version: {torch.version.cuda}")
        logger.info(f"- CUDNN version: {cudnn.version()}")

    logger.info("\n")
    logger.info("Configuration:")
    for key, value in config.items():
        logger.info(f"\t{key}: {value}")
    logger.info("\n")

    if idist.get_world_size() > 1:
        logger.info("\nDistributed setting:")
        logger.info(f"\tbackend: {idist.backend()}")
        logger.info(f"\tworld size: {idist.get_world_size()}")
        logger.info("\n")

Next we will use the `auto_` methods in `idist` to make our dataloaders, model and optimizer adapt automatically to the current configuration: `backend=None` (non-distributed) or backends like `nccl`, `gloo`, and `xla-tpu` (distributed).

Note that we are free to use the `auto_` methods only partially, or not at all, and implement something custom instead.

### Dataloaders

Next we are going to download the train and test datasets to `data_path`, apply transforms to them and return them via `get_train_test_datasets()`.

In [5]:
def get_train_test_datasets(path):
    if not os.path.exists(path):
        os.makedirs(path)
        download = True
    else:
        download = len(os.listdir(path)) < 1

    train_transform = Compose(
        [
            Pad(4),
            RandomCrop(32, fill=128),
            RandomHorizontalFlip(),
            ToTensor(),
            Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ]
    )
    test_transform = Compose(
        [
            ToTensor(),
            Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ]
    )

    train_ds = datasets.CIFAR10(
        root=path, train=True, download=download, transform=train_transform
    )
    test_ds = datasets.CIFAR10(
        root=path, train=False, download=False, transform=test_transform
    )

    return train_ds, test_ds

The main logic lies in `get_dataflow()`, where we have to make sure that only the main process (local rank 0) downloads the datasets, so that all other processes (local rank > 0) reuse the already downloaded copy. For this we use [`idist.barrier()`](https://pytorch.org/ignite/distributed.html#ignite.distributed.utils.barrier): the non-main processes wait at the barrier while the main process goes on to download the dataset. Once that is done, the main process calls `idist.barrier()` too, which releases the waiting processes.
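
The ordering this barrier pattern guarantees can be sketched with threads standing in for processes. This is only an illustration: `threading.Barrier` plays the role of `idist.barrier()`, and `get_dataset` is a hypothetical stand-in that records what each simulated process does.

```python
import threading

WORLD_SIZE = 4
barrier = threading.Barrier(WORLD_SIZE)  # stand-in for idist.barrier()
events = []  # ordered record of what each simulated process did
lock = threading.Lock()


def get_dataset(rank):
    # Rank 0 "downloads"; the other ranks only "load" the existing files.
    with lock:
        events.append(("download" if rank == 0 else "load", rank))


def worker(rank):
    if rank > 0:
        barrier.wait()  # non-main processes wait here first
    get_dataset(rank)
    if rank == 0:
        barrier.wait()  # main process joins the barrier, releasing the others


threads = [threading.Thread(target=worker, args=(r,)) for r in range(WORLD_SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(events[0])  # the download by rank 0 always comes first
```

The barrier only opens once all participants have reached it, so no other rank can touch the dataset until rank 0 has finished its download.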

We finally pass our dataset to [`auto_dataloader()`](https://pytorch.org/ignite/generated/ignite.distributed.auto.auto_dataloader.html#ignite.distributed.auto.auto_dataloader).

In [6]:
def get_dataflow(config):
    if idist.get_local_rank() > 0:
        idist.barrier()

    train_dataset, test_dataset = get_train_test_datasets(config["data_path"])

    if idist.get_local_rank() == 0:
        idist.barrier()

    train_loader = idist.auto_dataloader(
        train_dataset,
        batch_size=config["batch_size"],
        num_workers=config["num_workers"],
        shuffle=True,
        drop_last=True,
    )

    test_loader = idist.auto_dataloader(
        test_dataset,
        batch_size=2 * config["batch_size"],
        num_workers=config["num_workers"],
        shuffle=False,
    )
    return train_loader, test_loader
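
As we understand the `auto_dataloader()` documentation, in a distributed configuration the `batch_size` and `num_workers` we pass are treated as totals and divided among the processes. The sketch below mirrors that arithmetic under this assumption; `per_process_loader_args` is a hypothetical helper written for illustration, not an Ignite function.

```python
def per_process_loader_args(batch_size, num_workers, world_size, nproc_per_node):
    """Assumed scaling: split the total batch size and workers among processes."""
    if batch_size >= world_size:
        batch_size //= world_size
    if num_workers >= nproc_per_node:
        # Ceiling division so that every process keeps at least one worker.
        num_workers = (num_workers + nproc_per_node - 1) // nproc_per_node
    return batch_size, num_workers


# Values from `config`, on one node with two GPUs.
two_gpus = per_process_loader_args(512, 2, world_size=2, nproc_per_node=2)
# The same values in a non-distributed run.
single = per_process_loader_args(512, 2, world_size=1, nproc_per_node=1)
print(two_gpus, single)
```

Under this assumption the effective batch size stays at 512 regardless of how many processes take part, so the learning rate in `config` does not need to be rescaled when moving between configurations.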

### Model

`get_model()` builds the model and passes it to [`auto_model()`](https://pytorch.org/ignite/generated/ignite.distributed.auto.auto_model.html#auto-model), which adapts it automatically to non-distributed and distributed configurations.


In [7]:
def get_model(config):
    model_name = config["model"]
    if model_name in models.__dict__:
        fn = models.__dict__[model_name]
    else:
        raise RuntimeError(f"Unknown model name {model_name}")

    model = idist.auto_model(fn(num_classes=10))

    return model

### Optimizer

Then we can set up the optimizer using hyperparameters from `config` and pass it through [`auto_optim()`](https://pytorch.org/ignite/generated/ignite.distributed.auto.auto_optim.html#ignite.distributed.auto.auto_optim).

In [8]:
def get_optimizer(config, model):
    optimizer = optim.SGD(
        model.parameters(),
        lr=config["learning_rate"],
        momentum=config["momentum"],
        weight_decay=config["weight_decay"],
        nesterov=True,
    )
    optimizer = idist.auto_optim(optimizer)

    return optimizer

### Criterion

We move the loss function to `device`.

In [9]:
def get_criterion():
    return nn.CrossEntropyLoss().to(idist.device())

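### LR Scheduler

We use `PiecewiseLinear` to warm the learning rate up linearly from 0 to `learning_rate` over the first `num_warmup_epochs` epochs and then decay it linearly back to 0 by the end of training.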

In [10]:
def get_lr_scheduler(config, optimizer):
    le = config["num_iters_per_epoch"]
    milestones_values = [
        (0, 0.0),
        (le * config["num_warmup_epochs"], config["learning_rate"]),
        (le * config["num_epochs"], 0.0),
    ]
    lr_scheduler = PiecewiseLinear(
        optimizer, param_name="lr", milestones_values=milestones_values
    )
    return lr_scheduler
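
To see what these milestones mean in practice, the following sketch evaluates the same schedule by hand. It assumes a single process, so that one epoch has 97 iterations (50,000 training images, batch size 512, `drop_last=True`); `piecewise_linear` is a hypothetical re-implementation for illustration, not Ignite's scheduler.

```python
def piecewise_linear(milestones_values, step):
    """Linearly interpolate between (step, value) milestones, clamping at both ends."""
    first_step, first_value = milestones_values[0]
    if step <= first_step:
        return first_value
    for (s0, v0), (s1, v1) in zip(milestones_values, milestones_values[1:]):
        if step <= s1:
            return v0 + (v1 - v0) * (step - s0) / (s1 - s0)
    return milestones_values[-1][1]


le = 50_000 // 512  # 97 iterations per epoch on a single process
milestones_values = [(0, 0.0), (le * 1, 0.4), (le * 5, 0.0)]

# Learning rate at the start, after warm-up, after epoch 3 and at the end.
lrs = {step: piecewise_linear(milestones_values, step) for step in (0, le, 3 * le, 5 * le)}
for step, lr in lrs.items():
    print(f"iteration {step}: lr = {lr:.3f}")
```

The learning rate climbs to its peak of 0.4 at the end of the first epoch and is halfway back down (about 0.2) at the end of the third.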

## Trainer

### Save Models

We can create checkpoints using either of the two handlers:

1. If `with_clearml=True`, we save the models to ClearML's file server using [`ClearMLSaver()`](https://pytorch.org/ignite/generated/ignite.contrib.handlers.clearml_logger.html#ignite.contrib.handlers.clearml_logger.ClearMLSaver).
2. Otherwise, we save the models to disk using [`DiskSaver()`](https://pytorch.org/ignite/generated/ignite.handlers.DiskSaver.html#ignite.handlers.DiskSaver).

In [11]:
def get_save_handler(config):
    if config["with_clearml"]:
        from ignite.contrib.handlers.clearml_logger import ClearMLSaver

        return ClearMLSaver(dirname=config["output_path"])

    return DiskSaver(config["output_path"], require_empty=False)

### Resume from Checkpoint

If a checkpoint file path is provided, we can resume training from there by loading the file.

In [12]:
def load_checkpoint(resume_from):
    import logging

    # `logger` is local to `training()`, so fetch the same named logger here
    logger = logging.getLogger("CIFAR10-Training")
    checkpoint_fp = Path(resume_from)
    assert (
        checkpoint_fp.exists()
    ), f"Checkpoint '{checkpoint_fp.as_posix()}' is not found"
    logger.info(f"Resume from a checkpoint: {checkpoint_fp.as_posix()}")
    checkpoint = torch.load(checkpoint_fp.as_posix(), map_location="cpu")
    return checkpoint

### Create Trainer

Finally, we can create our `trainer` in four steps:
1. Choose whether to enable [Automatic Mixed Precision](https://pytorch.org/docs/stable/amp.html) (AMP) or not. Enabling AMP can speed up computations on large neural networks and reduce memory usage while retaining performance. A `GradScaler()` object is created to scale the `loss` so that small gradients don't underflow to zero during training.
2. Define the steps taken to process a single batch (`train_step()`):
  1. Move the batch to the `device` used in the current distributed configuration.
  2. Put `model` in `train()` mode.
  3. Perform the forward pass by passing the inputs (`x`) through the `model` and calculating the `loss`. If AMP is enabled, this step runs under [`autocast`](https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.autocast), which allows it to run in mixed precision.
  4. Perform the backward pass. If AMP is enabled, the loss is [scaled](https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler.scale) before calling `backward()`, the optimizer is stepped via `scaler.step()` (which skips the update if the gradients contain infs or NaNs), and the scale is [updated](https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler.update) for the next iteration.
3. Set up some common Ignite training handlers. You can do this individually or use a utility function, `setup_common_training_handlers()`, that takes the `trainer` together with:
  * A dictionary mapping of what to save in the checkpoint (`to_save`) and how often (`save_every_iters`).
  * LR Scheduler
  * The names of the `train_step()` outputs to track (`output_names`), here the batch loss.
4. If a `resume_from` file path is provided, load the states of the objects in `to_save` from the checkpoint file.

In [13]:
def create_trainer(
    model, optimizer, criterion, lr_scheduler, train_sampler, config, logger
):

    device = idist.device()

    with_amp = config["with_amp"]
    scaler = GradScaler(enabled=with_amp)

    def train_step(engine, batch):

        x, y = batch[0], batch[1]
        if x.device != device:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

        model.train()

        with autocast(enabled=with_amp):
            y_pred = model(x)
            loss = criterion(y_pred, y)

        optimizer.zero_grad()
        scaler.scale(loss).backward()  # If with_amp=False, this is equivalent to loss.backward()
        scaler.step(optimizer)  # If with_amp=False, this is equivalent to optimizer.step()
        scaler.update()  # If with_amp=False, this step does nothing

        return {
            "batch loss": loss.item(),
        }

    trainer = Engine(train_step)
    trainer.logger = logger

    to_save = {
        "trainer": trainer,
        "model": model,
        "optimizer": optimizer,
        "lr_scheduler": lr_scheduler,
    }
    metric_names = [
        "batch loss",
    ]

    common.setup_common_training_handlers(
        trainer=trainer,
        train_sampler=train_sampler,
        to_save=to_save,
        save_every_iters=config["checkpoint_every"],
        save_handler=get_save_handler(config),
        lr_scheduler=lr_scheduler,
        output_names=metric_names if config["log_every_iters"] > 0 else None,
        with_pbars=False,
        clear_cuda_cache=False,
    )

    if config["resume_from"] is not None:
        checkpoint = load_checkpoint(config["resume_from"])
        Checkpoint.load_objects(to_load=to_save, checkpoint=checkpoint)

    return trainer
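
Why the loss is scaled at all can be shown without a GPU. The sketch below rounds values to half precision with Python's `struct` module (format `"e"`) and uses a scale of `2 ** 16`, which is `GradScaler`'s default initial scale. The gradient value and the helper `to_fp16` are assumptions made for this illustration only.

```python
import struct


def to_fp16(value):
    """Round a Python float to the nearest float16 value and convert it back."""
    return struct.unpack("e", struct.pack("e", value))[0]


grad = 1e-8        # a tiny gradient, below float16's smallest positive value (~6e-8)
scale = 2.0 ** 16  # loss scale applied before backward()

unscaled_fp16 = to_fp16(grad)        # underflows to zero in half precision
scaled_fp16 = to_fp16(grad * scale)  # large enough to be represented
recovered = scaled_fp16 / scale      # unscaling happens in higher precision

print(unscaled_fp16, recovered)
```

Scaling the loss scales every gradient by the same factor, which shifts small values into the range float16 can represent. Dividing by the scale before the optimizer step restores their true magnitude.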

## Evaluator

The evaluator will be created from `evaluate_step` which will:
1. Set the `model` to `eval()` mode.
2. Move the batch to `device` used in current distributed configuration.
3. Perform forward pass. If AMP is enabled, `autocast` will be on.

Finally, we will attach relevant Ignite metrics to the `evaluator`. 

In [14]:
def create_evaluator(model, metrics, config):
    with_amp = config["with_amp"]
    device = idist.device()

    @torch.no_grad()
    def evaluate_step(engine, batch):
        model.eval()
        x, y = batch[0], batch[1]
        if x.device != device:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

        with autocast(enabled=with_amp):
            y_pred = model(x)
        return y_pred, y

    evaluator = Engine(evaluate_step)

    for name, metric in metrics.items():
        metric.attach(evaluator, name)

    return evaluator
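
The metrics attached here are computed over the whole dataset rather than averaged per batch: conceptually, a metric accumulates counts on every iteration and computes its value once the run completes. The following is a simplified sketch of that idea. `RunningAccuracy` is a hypothetical class that borrows the `reset()`, `update()` and `compute()` method names of Ignite metrics but works on plain lists.

```python
class RunningAccuracy:
    """Accumulate correct/total counts over batches of (y_pred, y) outputs."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.num_correct = 0
        self.num_examples = 0

    def update(self, output):
        y_pred, y = output
        # Predicted class = index of the highest score in each row.
        predicted = [max(range(len(scores)), key=scores.__getitem__) for scores in y_pred]
        self.num_correct += sum(int(p == t) for p, t in zip(predicted, y))
        self.num_examples += len(y)

    def compute(self):
        return self.num_correct / self.num_examples


metric = RunningAccuracy()
metric.update(([[0.1, 0.9], [0.8, 0.2]], [1, 1]))  # batch of 2: one correct
metric.update(([[0.3, 0.7]], [1]))                 # batch of 1: correct
accuracy = metric.compute()
print(accuracy)  # 2 correct out of 3 examples
```

Averaging the per-batch accuracies instead (0.5 and 1.0) would give 0.75, which is why counts are accumulated when batches differ in size.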

## Training

Before we begin training, we must set up a few things on the master process (`rank` = 0):
* Create a folder to store checkpoints, best models and TensorBoard logs, named in the format `model_backend-<backend>-<world size>_time`.
* Log the `device` name.
* If the ClearML file server is used to save models, a `Task` has to be created, to which we pass our `config` dictionary and the specific hyperparameters that are part of the experiment.

In [15]:
def setup_rank_zero(logger, local_rank, config):
    device = idist.device()

    if config["stop_iteration"] is None:
        now = datetime.now().strftime("%Y%m%d-%H%M%S")
    else:
        now = f"stop-on-{config['stop_iteration']}"

    output_path = config["output_path"]
    folder_name = (
        f"{config['model']}_backend-{idist.backend()}-{idist.get_world_size()}_{now}"
    )
    output_path = Path(output_path) / folder_name
    if not output_path.exists():
        output_path.mkdir(parents=True)
    config["output_path"] = output_path.as_posix()
    logger.info(f"Output path: {config['output_path']}")

    if "cuda" in device.type:
        config["cuda device name"] = torch.cuda.get_device_name(local_rank)

    if config["with_clearml"]:
        try:
            from clearml import Task
        except ImportError:
            # Backwards-compatibility for legacy Trains SDK
            from trains import Task

        task = Task.init("CIFAR10-Training", task_name=output_path.stem)
        task.connect_configuration(config)
        # Log hyper parameters
        hyper_params = [
            "model",
            "batch_size",
            "momentum",
            "weight_decay",
            "num_epochs",
            "learning_rate",
            "num_warmup_epochs",
        ]
        task.connect({k: config[k] for k in hyper_params})

This is a standard utility function to log `train` and `val` metrics every `validate_every` epochs.

In [16]:
def log_metrics(logger, epoch, elapsed, tag, metrics):
    metrics_output = "\n".join([f"\t{k}: {v}" for k, v in metrics.items()])
    logger.info(
        f"\nEpoch {epoch} - Evaluation time (seconds): {elapsed:.2f} - {tag} metrics:\n {metrics_output}"
    )

This is where the main logic resides, i.e., we call all the above functions from here:
1. Basic Setup
  1. We set a [`manual_seed()`](https://pytorch.org/ignite/utils.html#ignite.utils.manual_seed) and [`setup_logger()`](https://pytorch.org/ignite/utils.html#ignite.utils.setup_logger), then log all basic information.
  2. Initialise the dataloaders, `model`, `optimizer`, `criterion` and `lr_scheduler`.
2. We use the above objects to create a `trainer`.
3. Evaluator
  1. Define some relevant Ignite metrics like `Accuracy()` and `Loss()`.
  2. Create two evaluators, `train_evaluator` and `val_evaluator`, to compute metrics on `train_loader` and `val_loader` respectively. In addition, `val_evaluator` will store the best models based on validation metrics.
  3. Define `run_validation()` to compute metrics on both dataloaders and log them. Then we attach this function to `trainer` to run every `validate_every` epochs and after training is complete.
4. Set up TensorBoard logging using `setup_tb_logging()` on the master process for the trainer and evaluators so that training and validation metrics along with the learning rate can be logged.
5. Define a `Checkpoint()` object to store the two best models by validation accuracy (defined in `metrics` as `Accuracy()`) and attach it to `val_evaluator` so that it is executed every time `val_evaluator` runs.
6. If `stop_iteration` is given, stop training once that iteration is reached using `terminate()`.
7. Run training on `train_loader` for `num_epochs`.
8. Close the TensorBoard logger once training is completed.
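
With the default `config` (`num_epochs=5`, `validate_every=3`), it may help to spell out when `run_validation()` fires. The sketch below only reproduces the arithmetic of the event filter and assumes training runs to completion; `validation_epochs` is a hypothetical helper, not part of Ignite.

```python
def validation_epochs(num_epochs, validate_every):
    """Return the epochs with periodic validation and the epoch of the final run."""
    periodic = [e for e in range(1, num_epochs + 1) if e % validate_every == 0]
    # Events.COMPLETED triggers one more validation after the last epoch.
    return periodic, num_epochs


periodic, final = validation_epochs(num_epochs=5, validate_every=3)
print(periodic, final)
```

So validation runs after epoch 3 and once more when training completes after epoch 5. Note that if `num_epochs` were a multiple of `validate_every`, both events would fire after the last epoch.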



In [17]:
def training(local_rank, config):

    rank = idist.get_rank()
    manual_seed(config["seed"] + rank)

    logger = setup_logger(name="CIFAR10-Training")
    log_basic_info(logger, config)

    if rank == 0:
        setup_rank_zero(logger, local_rank, config)

    train_loader, val_loader = get_dataflow(config)
    model = get_model(config)
    optimizer = get_optimizer(config, model)
    criterion = get_criterion()
    config["num_iters_per_epoch"] = len(train_loader)
    lr_scheduler = get_lr_scheduler(config, optimizer)

    trainer = create_trainer(
        model, optimizer, criterion, lr_scheduler, train_loader.sampler, config, logger
    )

    metrics = {
        "Accuracy": Accuracy(),
        "Loss": Loss(criterion),
    }

    train_evaluator = create_evaluator(model, metrics, config)
    val_evaluator = create_evaluator(model, metrics, config)

    def run_validation(engine):
        epoch = trainer.state.epoch
        state = train_evaluator.run(train_loader)
        log_metrics(logger, epoch, state.times["COMPLETED"], "train", state.metrics)
        state = val_evaluator.run(val_loader)
        log_metrics(logger, epoch, state.times["COMPLETED"], "val", state.metrics)

    trainer.add_event_handler(
        Events.EPOCH_COMPLETED(every=config["validate_every"]) | Events.COMPLETED,
        run_validation,
    )

    if rank == 0:
        evaluators = {"train": train_evaluator, "val": val_evaluator}
        tb_logger = common.setup_tb_logging(
            config["output_path"], trainer, optimizer, evaluators=evaluators
        )

    best_model_handler = Checkpoint(
        {"model": model},
        get_save_handler(config),
        filename_prefix="best",
        n_saved=2,
        global_step_transform=global_step_from_engine(trainer),
        score_name="val_accuracy",
        score_function=Checkpoint.get_default_score_fn("Accuracy"),
    )
    val_evaluator.add_event_handler(
        Events.COMPLETED,
        best_model_handler,
    )

    if config["stop_iteration"] is not None:

        @trainer.on(Events.ITERATION_STARTED(once=config["stop_iteration"]))
        def _():
            logger.info(f"Stop training on {trainer.state.iteration} iteration")
            trainer.terminate()

    try:
        trainer.run(train_loader, max_epochs=config["num_epochs"])
    except Exception as e:
        logger.exception("")
        raise e

    if rank == 0:
        tb_logger.close()

## Run with Different Configurations

In [18]:
spawn_kwargs = {}

### Single CPU or GPU

If on Colab, go to Runtime > Change runtime type and set Hardware accelerator to None for CPU or to GPU for CUDA.

In [None]:
spawn_kwargs["nproc_per_node"] = None
with idist.Parallel(backend=config["backend"], **spawn_kwargs) as parallel:
    parallel.run(training, config)

2021-08-30 19:31:31,924 ignite.distributed.launcher.Parallel INFO: - Run '<function training at 0x7f159497ac20>' in 1 processes
2021-08-30 19:31:31,931 CIFAR10-Training INFO: Train on CIFAR10
2021-08-30 19:31:31,933 CIFAR10-Training INFO: - PyTorch version: 1.9.0+cu102
2021-08-30 19:31:31,936 CIFAR10-Training INFO: - Ignite version: 0.4.6
2021-08-30 19:31:31,939 CIFAR10-Training INFO: 

2021-08-30 19:31:31,941 CIFAR10-Training INFO: Configuration:
2021-08-30 19:31:31,944 CIFAR10-Training INFO: 	seed: 543
2021-08-30 19:31:31,946 CIFAR10-Training INFO: 	data_path: cifar10
2021-08-30 19:31:31,947 CIFAR10-Training INFO: 	output_path: output-cifar10/
2021-08-30 19:31:31,948 CIFAR10-Training INFO: 	model: resnet18
2021-08-30 19:31:31,951 CIFAR10-Training INFO: 	batch_size: 512
2021-08-30 19:31:31,953 CIFAR10-Training INFO: 	momentum: 0.9
2021-08-30 19:31:31,955 CIFAR10-Training INFO: 	weight_decay: 0.0001
2021-08-30 19:31:31,956 CIFAR10-Training INFO: 	num_workers: 2
2021-08-30 19:31:31,958 

### Single Node, Multiple GPUs

In [None]:
spawn_kwargs["nproc_per_node"] = 2 # For 2 GPUs
config["backend"] = "nccl" # Find out all supported backends via ignite.distributed.utils.available_backends()

**Note:** If using 

```bash
python -m torch.distributed.launch --nproc_per_node=2 --use_env main.py
```

### Multiple Nodes, Multiple GPUs

```bash
python -u -m torch.distributed.launch \
    --nnodes=2 \
    --nproc_per_node=2 \
    --node_rank=0 \
    --master_addr=master --master_port=2222 --use_env \
    main.py run --backend="nccl"
```
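
The command above has to be run on each node with its own `--node_rank` (0 and 1 here). The launcher then assigns every process a global rank from its node rank and local rank. A small sketch of that arithmetic, using the values from the command (`global_rank` is a hypothetical helper):

```python
def global_rank(node_rank, nproc_per_node, local_rank):
    """Global rank of a process given its node and its position on that node."""
    return node_rank * nproc_per_node + local_rank


nnodes, nproc_per_node = 2, 2
world_size = nnodes * nproc_per_node

# Map (node_rank, local_rank) -> global rank for every process.
layout = {
    (node, local): global_rank(node, nproc_per_node, local)
    for node in range(nnodes)
    for local in range(nproc_per_node)
}
print(world_size, layout)
```

This is why the tutorial distinguishes `idist.get_rank()` (global, unique across all nodes) from `idist.get_local_rank()` (unique only within a node, and used to pick the GPU).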

### TPUs on Colab

In [21]:
spawn_kwargs["nproc_per_node"] = 8
spawn_kwargs["start_method"] = "fork"
config["backend"] = "xla-tpu"

## Start Training

In [22]:
with idist.Parallel(backend=config["backend"], **spawn_kwargs) as parallel:
    parallel.run(training, config)
