# Fast Multi-agent Reinforcement Learning on a GPU using WarpDrive and PyTorch Lightning

# Introduction

This tutorial provides a demonstration of a multi-agent Reinforcement Learning (RL) training loop with [WarpDrive](https://github.com/salesforce/warp-drive). WarpDrive is a flexible, lightweight, and easy-to-use RL framework that implements end-to-end deep multi-agent RL on a single GPU (Graphics Processing Unit). Using the extreme parallelization capability of GPUs, it enables [orders-of-magnitude faster RL](https://arxiv.org/abs/2108.13976) compared to common implementations that blend CPU simulations and GPU models. WarpDrive is extremely efficient as it runs simulations across multiple agents and multiple environment replicas in parallel and completely eliminates the back-and-forth data copying between the CPU and the GPU.

We have integrated WarpDrive with the [PyTorch Lightning](https://www.pytorchlightning.ai/) framework, which greatly reduces trainer boilerplate code and improves training flexibility.

Below, we demonstrate how to use WarpDrive and PyTorch Lightning together to train agents in a game of [Tag](https://github.com/salesforce/warp-drive/blob/master/example_envs/tag_continuous/tag_continuous.py), where multiple *tagger* agents try to chase and tag multiple *runner* agents. More generally, the WarpDrive framework comprises several utility functions that make it easy to implement any (OpenAI-)*gym-style* RL environment, and it provides quality-of-life tools to train that environment end-to-end with just a few lines of code. You can familiarize yourself with WarpDrive with the help of these [tutorials](https://github.com/salesforce/warp-drive/tree/master/tutorials).

We invite everyone to **contribute to WarpDrive**, including adding new multi-agent environments, proposing new features and reporting issues on our open source [repository](https://github.com/salesforce/warp-drive).

### Dependencies

This notebook requires the `rl-warp-drive` as well as the `pytorch-lightning` packages.

In [1]:
# Quote the version specifiers: unquoted, the shell parses ">" as output redirection.
! pip install --quiet "rl-warp-drive>=1.4.5" "pytorch-lightning>=1.5.10"



In [2]:
import argparse
import numpy as np
import torch

from example_envs.tag_continuous.tag_continuous import TagContinuous
from warp_drive.env_wrapper import EnvWrapper
from warp_drive.training.lightning_trainer import WarpDriveModel, PerfStatsCallback
from warp_drive.training.utils.data_loader import create_and_push_data_placeholders

from pytorch_lightning import LightningModule, Trainer, seed_everything
from pytorch_lightning.callbacks import ModelCheckpoint

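A `ModuleNotFoundError` for `warp_drive.training.lightning_trainer` may mean that the installed `rl-warp-drive` predates the PyTorch Lightning integration. The following minimal sketch, which is not part of WarpDrive and uses only the Python standard library, checks the installed versions against the minimums from the install cell. The helper names (`release_tuple`, `meets_minimum`, `check_installed`) are hypothetical, and the parser deliberately ignores pre-release suffixes; the `packaging` library's `Version` class is a more robust choice when it is available.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Minimum versions taken from the install cell (PyPI distribution names).
MINIMUM_VERSIONS = {"rl-warp-drive": "1.4.5", "pytorch-lightning": "1.5.10"}


def release_tuple(v: str) -> tuple:
    """Parse the leading numeric release part, e.g. '1.5.10' -> (1, 5, 10).

    Suffixes such as 'rc1' or '+cu113' are ignored (a simplification).
    """
    match = re.match(r"\d+(?:\.\d+)*", v)
    return tuple(int(p) for p in match.group(0).split(".")) if match else ()


def meets_minimum(installed: str, minimum: str) -> bool:
    """True if `installed` >= `minimum`, padding the shorter version with zeros."""
    a, b = release_tuple(installed), release_tuple(minimum)
    width = max(len(a), len(b))
    return a + (0,) * (width - len(a)) >= b + (0,) * (width - len(b))


def check_installed(minimums: dict = MINIMUM_VERSIONS) -> dict:
    """Map each distribution to True/False (recent enough?) or None (not installed)."""
    report = {}
    for dist, minimum in minimums.items():
        try:
            report[dist] = meets_minimum(version(dist), minimum)
        except PackageNotFoundError:
            report[dist] = None
    return report


print(check_installed())
```

If either entry is `False` or `None`, re-running the install cell and restarting the kernel is the usual next step.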

In [None]:
# Set logger level e.g., DEBUG, INFO, WARNING, ERROR.
import logging

logging.getLogger().setLevel(logging.ERROR)

# Specify a set of run configurations for your experiments

The run configuration is a dictionary comprising the environment parameters, the trainer and the policy network settings, as well as configurations for saving.

For our experiment, we consider an environment wherein 5 taggers and 100 runners play the game of [Tag](https://github.com/salesforce/warp-drive/blob/master/example_envs/tag_continuous/tag_continuous.py) on a 20 × 20 plane. The game lasts 500 timesteps. At every timestep, each agent chooses an acceleration action and a turn action, and the environment's motion mechanics determine how the agents move over the plane. When a tagger gets close enough to a runner, the runner is tagged and eliminated from the game. In the configuration below, the taggers and runners have the same skill level, i.e., the runners can move just as fast as the taggers.
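The tagging rule can be pictured as a distance check. The sketch below is a simplified illustration, not the `TagContinuous` implementation: it assumes Euclidean distance on the plane and a hypothetical tagging radius (`TAG_RADIUS`), whereas the real environment defines its own threshold and dynamics.

```python
import math

# Hypothetical tagging radius, for illustration only; TagContinuous defines its own threshold.
TAG_RADIUS = 0.5


def tagged_runners(tagger_positions, runner_positions, radius=TAG_RADIUS):
    """Return indices of runners within `radius` of at least one tagger.

    Positions are (x, y) tuples on the square plane.
    """
    tagged = []
    for i, (rx, ry) in enumerate(runner_positions):
        if any(math.hypot(rx - tx, ry - ty) <= radius for tx, ty in tagger_positions):
            tagged.append(i)
    return tagged


taggers = [(1.0, 1.0), (10.0, 10.0)]
runners = [(1.3, 1.2), (5.0, 5.0), (10.0, 10.4)]
tagged = tagged_runners(taggers, runners)
# Tagged runners are eliminated; only the others stay in the game.
remaining = [pos for i, pos in enumerate(runners) if i not in tagged]
print(tagged, remaining)  # -> [0, 2] [(5.0, 5.0)]
```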

The sequence of snapshots below shows a sample realization of the game with randomly chosen agent actions. The 5 taggers are marked in pink, while the blue agents are the runners. The snapshots are taken at 1) the beginning of the episode, 2) step 250, and 3) end of the episode. Only 45% of the runners remain at the end of the episode.
<img src="assets/tag_continuous_training/t=1.png" width="250" height="250"/> <img src="assets/tag_continuous_training/t=250.png" width="250" height="250"/> <img src="assets/tag_continuous_training/t=499.png" width="250" height="250"/>

We train the agents using 100 environment replicas (simulations) running in parallel. With WarpDrive, each simulation runs on a separate GPU block.
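One way to picture the block-per-environment layout: per-agent data for all replicas can live in one buffer of shape `[num_envs, num_agents]`, where each replica is handled by one block and, as described in the WarpDrive paper, each agent by one thread within that block. The sequential Python sketch below only emulates that indexing on the CPU; `flat_index` and `emulate_step` are hypothetical helpers, and the real CUDA kernels are compiled and launched through WarpDrive.

```python
NUM_ENVS = 100    # one GPU block per environment replica (from run_config)
NUM_AGENTS = 105  # 5 taggers + 100 runners; one thread per agent (assumed layout)


def flat_index(env_id: int, agent_id: int, num_agents: int = NUM_AGENTS) -> int:
    """Row-major offset of (env_id, agent_id) in a [num_envs, num_agents] buffer.

    In a CUDA kernel with this layout, env_id would come from blockIdx.x
    and agent_id from threadIdx.x.
    """
    return env_id * num_agents + agent_id


def emulate_step(buffer, num_envs=NUM_ENVS, num_agents=NUM_AGENTS):
    """Sequential stand-in for one kernel launch: each (block, thread) pair
    touches only its own slot, which is what lets a GPU update all slots in parallel."""
    for env_id in range(num_envs):          # blocks
        for agent_id in range(num_agents):  # threads within a block
            buffer[flat_index(env_id, agent_id, num_agents)] += 1.0
    return buffer


state = emulate_step([0.0] * (NUM_ENVS * NUM_AGENTS))
print(len(state), flat_index(2, 3))  # -> 10500 213
```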

Two separate policy networks are used, one for the taggers and one for the runners. Each network is a fully connected model with two hidden layers of 256 units each. We train with the Advantage Actor-Critic (A2C) algorithm; WarpDrive also provides Proximal Policy Optimization (PPO) as an alternative.
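For reference, the core A2C update can be written in a few lines. This is a generic, framework-free sketch of the textbook losses for one rollout, not WarpDrive's A2C code: discounted returns use the `gamma=0.98` from the run configuration, the advantage is the return minus the critic's value estimate, and extras such as entropy regularization or loss weighting are omitted. The function names are hypothetical.

```python
import math

GAMMA = 0.98  # discount rate, as in the run configuration's policy settings


def discounted_returns(rewards, gamma=GAMMA, bootstrap_value=0.0):
    """R_t = r_t + gamma * R_{t+1}, computed backwards from a bootstrap value."""
    returns = []
    running = bootstrap_value
    for r in reversed(rewards):
        running = r + gamma * running
        returns.append(running)
    return returns[::-1]


def a2c_losses(rewards, values, action_probs, gamma=GAMMA):
    """Return (policy_loss, value_loss) averaged over one rollout.

    action_probs[t] is the probability the policy assigned to the action taken at t.
    advantage_t = R_t - V(s_t) weights the log-probability in the policy term
    (treated as a constant there), and the critic regresses V(s_t) onto R_t.
    """
    returns = discounted_returns(rewards, gamma)
    advantages = [ret - v for ret, v in zip(returns, values)]
    n = len(rewards)
    policy_loss = -sum(math.log(p) * a for p, a in zip(action_probs, advantages)) / n
    value_loss = sum(a * a for a in advantages) / n
    return policy_loss, value_loss


policy_loss, value_loss = a2c_losses(
    rewards=[0.0, 0.0, 1.0], values=[0.5, 0.5, 0.5], action_probs=[0.5, 0.5, 0.5]
)
print(policy_loss, value_loss)
```

In a real implementation these quantities are tensors, and the two losses are combined and backpropagated through the policy and value heads.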

In [None]:
run_config = dict(
    name="tag_continuous",
    # Environment settings.
    env=dict(
        num_taggers=5,  # number of taggers in the environment
        num_runners=100,  # number of runners in the environment
        grid_length=20.0,  # length of the (square) grid on which the game is played
        num_acceleration_levels=20,  # number of acceleration actions
        num_turn_levels=20,  # number of turn actions
        episode_length=500,  # episode length in timesteps
        skill_level_tagger=1.0,  # skill level for the tagger
        skill_level_runner=1.0,  # skill level for the runner
    ),
    # Trainer settings.
    trainer=dict(
        num_envs=100,  # number of environment replicas (number of GPU blocks used)
        train_batch_size=25000,  # total batch size used for training per iteration (across all the environments)
        num_episodes=5000,  # total number of episodes to run the training for (can be arbitrarily high!)
    ),
    # Policy network settings.
    policy=dict(
        runner=dict(
            to_train=True,  # flag indicating whether the model needs to be trained
            algorithm="A2C",  # algorithm used to train the policy
            gamma=0.98,  # discount rate
            lr=0.005,  # learning rate
            model=dict(
                type="fully_connected", fc_dims=[256, 256], model_ckpt_filepath=""
            ),  # policy model settings
        ),
        tagger=dict(
            to_train=True,
            algorithm="A2C",
            gamma=0.98,
            lr=0.001,
            model=dict(
                type="fully_connected", fc_dims=[256, 256], model_ckpt_filepath=""
            ),
        ),
    ),
    # Checkpoint saving setting.
    saving=dict(
        metrics_log_freq=100,  # how often (in iterations) to print the metrics
        model_params_save_freq=5000,  # how often (in iterations) to save the model parameters
        basedir="/tmp",  # base folder used for saving
        name="continuous_tag",  # experiment name
        tag="example",  # experiment tag
    ),
)
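The trainer settings interact: `train_batch_size` is split across the `num_envs` replicas. The sketch below, using a trimmed, hypothetical copy of the values above, works out the split and an estimated iteration count. It assumes that `num_episodes` counts episodes summed over all replicas and that the number of iterations is the total number of timesteps divided by the batch size; the `wd_model.num_iters` attribute used later in this notebook reports the value WarpDrive actually uses.

```python
# Trimmed copy of the values above; in the notebook, run_config could be passed directly.
config = {
    "env": {"num_taggers": 5, "num_runners": 100, "episode_length": 500},
    "trainer": {"num_envs": 100, "train_batch_size": 25000, "num_episodes": 5000},
}


def training_budget(cfg):
    """Hypothetical helper: per-replica batch share and an estimated iteration count."""
    env, trainer = cfg["env"], cfg["trainer"]
    num_envs = trainer["num_envs"]
    batch = trainer["train_batch_size"]
    # This sketch insists on an even split; WarpDrive's own handling may differ.
    if batch % num_envs:
        raise ValueError("train_batch_size should be divisible by num_envs")
    # Assumption: num_episodes counts episodes summed over all replicas.
    total_steps = trainer["num_episodes"] * env["episode_length"]
    return {
        "num_agents": env["num_taggers"] + env["num_runners"],
        "steps_per_env_per_iter": batch // num_envs,
        "num_iters": total_steps // batch,
    }


print(training_budget(config))
# -> {'num_agents': 105, 'steps_per_env_per_iter': 250, 'num_iters': 100}
```

With these values, each of the 100 replicas contributes 250 timesteps per iteration, i.e., half of a 500-step episode.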

# Instantiate the WarpDrive Model

To instantiate the WarpDrive model, we first use an environment wrapper to specify that the environment should run on the GPU (via the `use_cuda` flag). Agents in the environment can also share policy models, so we specify a dictionary that maps each policy model name to the list of ids of the agents using that model.

In [None]:
# Create a wrapped environment object via the EnvWrapper.
# Ensure that use_cuda is set to True (in order to run on the GPU).
env_wrapper = EnvWrapper(
    TagContinuous(**run_config["env"]),
    num_envs=run_config["trainer"]["num_envs"],
    use_cuda=True,
)

# Agents can share policy models: this dictionary maps policy model names to agent ids.
policy_tag_to_agent_id_map = {
    "tagger": list(env_wrapper.env.taggers),
    "runner": list(env_wrapper.env.runners),
}

wd_model = WarpDriveModel(
    env_wrapper=env_wrapper,
    config=run_config,
    policy_tag_to_agent_id_map=policy_tag_to_agent_id_map,
    verbose=True
)
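Since each agent should be driven by exactly one policy network, it can be worth sanity-checking `policy_tag_to_agent_id_map` before training. The helper below is hypothetical (not a WarpDrive API): it inverts the map into agent id → policy tag and raises if an agent is mapped twice or not at all. It assumes agent ids run from `0` to `num_agents - 1`; the example ids are made up, since `TagContinuous` assigns its own tagger and runner ids.

```python
def invert_policy_map(policy_tag_to_agent_id_map, num_agents):
    """Return {agent_id: policy_tag}, checking every agent has exactly one policy."""
    agent_to_policy = {}
    for tag, agent_ids in policy_tag_to_agent_id_map.items():
        for agent_id in agent_ids:
            if agent_id in agent_to_policy:
                raise ValueError(
                    f"agent {agent_id} is mapped to both "
                    f"{agent_to_policy[agent_id]!r} and {tag!r}"
                )
            agent_to_policy[agent_id] = tag
    # Assumption: agent ids are 0 .. num_agents - 1.
    missing = set(range(num_agents)) - set(agent_to_policy)
    if missing:
        raise ValueError(f"agents without a policy: {sorted(missing)}")
    return agent_to_policy


# Hypothetical ids (first 5 agents as taggers); in the notebook, use
# env_wrapper.env.taggers and env_wrapper.env.runners instead.
example_map = {"tagger": list(range(5)), "runner": list(range(5, 105))}
agent_to_policy = invert_policy_map(example_map, num_agents=105)
print(agent_to_policy[0], agent_to_policy[104])  # -> tagger runner
```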

# Create the Lightning Trainer

Next, we create the trainer for the WarpDrive model and add two callbacks: a checkpoint callback (`ModelCheckpoint`) and a performance statistics callback (`PerfStatsCallback`).

In [None]:
# Define callbacks.
perf_stats_callback = PerfStatsCallback(
    batch_size=wd_model.training_batch_size,
    num_iters=wd_model.num_iters,
    log_freq=run_config["saving"]["metrics_log_freq"]
)
# `every_n_epochs` replaces the deprecated `every_n_val_epochs` argument of ModelCheckpoint.
checkpoint_callback = ModelCheckpoint(every_n_epochs=100, save_top_k=-1, save_last=False)

# Instantiate the PyTorch Lightning trainer with the callbacks and the number of GPUs.
num_gpus = 1
assert num_gpus <= torch.cuda.device_count(), f"Only {torch.cuda.device_count()} GPU(s) are available!"
trainer = Trainer(
    gpus=num_gpus,
    callbacks=[checkpoint_callback, perf_stats_callback]
)
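`PerfStatsCallback` is configured with the batch size, the number of iterations and a log frequency. To illustrate the kind of throughput arithmetic such a callback can perform, here is a hypothetical, framework-free tracker; it is not WarpDrive's implementation and is not a PyTorch Lightning callback. It treats the batch size as the number of environment timesteps per iteration, summed over all replicas, and uses an injectable clock so the demo is deterministic.

```python
import time


class ThroughputTracker:
    """Hypothetical stand-in for a performance-stats callback (not WarpDrive's
    PerfStatsCallback): reports environment timesteps per second."""

    def __init__(self, batch_size, num_iters, log_freq, clock=time.perf_counter):
        self.batch_size = batch_size  # timesteps per iteration, summed over replicas
        self.num_iters = num_iters
        self.log_freq = log_freq
        self.clock = clock
        self.t0 = None
        self.iters_done = 0

    def begin(self):
        """Record the wall-clock start of training."""
        self.t0 = self.clock()

    def step(self):
        """Call once per iteration; returns a log line every `log_freq` iterations
        and at the final iteration, otherwise None."""
        self.iters_done += 1
        if self.iters_done % self.log_freq and self.iters_done != self.num_iters:
            return None
        elapsed = self.clock() - self.t0
        steps = self.iters_done * self.batch_size
        return f"iter {self.iters_done}/{self.num_iters}: {steps / elapsed:.1f} env steps/s"


# Demo with a fake clock (0 s at start, then 1 s and 2 s at the two logging points).
fake_times = iter([0.0, 1.0, 2.0])
tracker = ThroughputTracker(batch_size=25000, num_iters=4, log_freq=2,
                            clock=lambda: next(fake_times))
tracker.begin()
for _ in range(4):
    line = tracker.step()
    if line:
        print(line)
# -> iter 2/4: 50000.0 env steps/s
# -> iter 4/4: 50000.0 env steps/s
```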

# Train the WarpDrive Model

Finally, we invoke training.

In [None]:
trainer.fit(wd_model)

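# Visualize the Training Metrics

PyTorch Lightning's default logger writes to the `lightning_logs/` directory; the cell below opens it in TensorBoard.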

In [None]:
# Start tensorboard.
%load_ext tensorboard
%tensorboard --logdir lightning_logs/