Copyright (c) 2021, salesforce.com, inc.\
All rights reserved.\
SPDX-License-Identifier: BSD-3-Clause\
For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause

In [1]:
import torch

assert torch.cuda.device_count() > 0, "This notebook needs a GPU to run!"

# Welcome to WarpDrive!

This tutorial is a first in a series of introduction notebooks for WarpDrive, a [PyCUDA](https://documen.tician.de/pycuda/)-based framework for extremely parallelized multi-agent reinforcement learning (RL) on a single graphics processing unit (GPU).

In this tutorial, we describe how we harness the GPU's ability to parallelize operations across a large number of RL agents and multiple environment replicas. 

In conjunction with training logic using Pytorch, we can perform extremely fast end-to-end training of multiple RL agents, all on a single GPU, in just a [few lines of code](https://github.com/salesforce/warp-drive/blob/master/tutorials/simple-end-to-end-example.ipynb).

## GPU basics and terminology

Before we dive into WarpDrive, let's review some GPU basics. 

All programs that run on a GPU need to be triggered via a CPU. Commonly, the CPU is known as the *host* and the GPU as the *device*. [CUDA](https://developer.nvidia.com/cuda-zone) (Compute Unified Device Architecture) is an extension of C that implement code to be run on (CUDA-enabled) GPU hardware.

| ![gpu_memory_model](https://github.com/salesforce/warp-drive/blob/master/tutorials/assets/gpu_memory_model.png?raw=true) |
|:--:|
| <b>Fig. 1 The CUDA memory model</b>|

CUDA launches several `threads` in parallel. It organizes threads into a group called `thread block`. Additionally, the CUDA kernel can launch multiple thread blocks, organized into a `grid` structure. 

Therefore, a CUDA kernel runs a grid of blocks of threads.

CUDA also provides built-in variables for accessing thread information - three key variables are

1. `threadIdx.x`: contains the index of the thread within the thread block.
2. `blockIdx.x`: the index of the thread block.
3. `blockDim.x` contains the size of thread block (number of threads in the thread block). 

Each CUDA card has a maximum number of threads in a block (typically, 512, 1024, or 2048).

*Note: In general, threads, blocks, and grids are multidimensional, i.e., they can also have threadIdx.y and z dimensions etc. We will not go into that here.*

# Dependencies

We will install the latest version of WarpDrive using the pip package manager.

In [2]:
! pip install -U rl_warp_drive

Defaulting to user installation because normal site-packages is not writeable


In [3]:
import numpy as np
from timeit import Timer

In [4]:
from warp_drive.managers.data_manager import CUDADataManager
from warp_drive.managers.function_manager import CUDAFunctionManager
from warp_drive.utils.data_feed import DataFeed

In [5]:
# Set logger level e.g., DEBUG, INFO, WARNING, ERROR
import logging

logging.getLogger().setLevel(logging.INFO)

# WarpDrive Design Principles

Modern RL architectures ([SEED RL](https://arxiv.org/pdf/1910.06591.pdf), [ACME](https://arxiv.org/pdf/2006.00979.pdf), [IMPALA](https://arxiv.org/pdf/1802.01561.pdf), [ELF](https://arxiv.org/pdf/1707.01067.pdf), [MAVA](https://arxiv.org/pdf/2107.01460.pdf)) comprise several rollout CPU actors and a CPU/GPU trainer actor operating in tandem. While these architectures are very scalable, they may suffer from expensive communication between the CPU / GPU actors, which can lead to inefficient resource utilization. 

By moving both rollout generation and training exclusively to the GPU, we minimize the communication cost. In that case, all data is in the GPU's memory, and only (optional) training inspection requires a data transfer from the host to the device. We also minimize latency by having the rollout generation, batching, training and action inference all occur on the same device.

Running end-to-end on a GPU is even more scalable for multi-agent RL. In essence, we can parallelize *rollouts* by having each agent operate individually on a separate thread, and each environment operate individually on a separate thread block. This results in extremely high training throughput. 

The figure below depicts our architecture block diagram; we will introduce `DataManager` and `FunctionManager` shortly.

| ![](https://github.com/salesforce/warp-drive/blob/master/tutorials/assets/warpdrive_framework_overview.png?raw=true) |
|:--:|
| <b>Fig. 2 End-to-end multi-agent RL on a single GPU. Each GPU thread handles an agent, and each GPU block handles an environment. WarpDrive's DataManager and FunctionManager help manage the communication between the CPU and GPU and invoke the GPU kernel calls from the CPU, respectively.</b>|

## PyCUDA

Because most modern day programming is performed with Python, we have developed WarpDrive using [PyCUDA](https://documen.tician.de/pycuda/), a Python programming environment for CUDA.
PyCUDA essentially provides additional wrappers on CUDA for easy Python access to CUDA APIs.

To execute any PyCUDA program, there are three main steps:

1. Copy the input data from host memory to device memory, also called *host-to-device transfer*.
2. Load CUDA functions and execute, *caching data on-chip* for performance.
3. Copy the results data from device memory to host memory, also called *device-to-host transfer*.

## Data and Function Managers

Following the three steps above, we developed WarpDrive with the following two modules

1. a **data manager** to handle all the data transfers between the host and the device. It also handles creating and managing the data for multiple environment replicas and time steps.
2. a **function manager** to load the CUDA programs and execute.

In the following, we will demonstrate how to push and pull data between the host and the device, and how to write simple CUDA functions to manipulate the date. Let's begin by creating a CUDADataManager object.

We specify a few multi-agent RL parameters in the `DataManager` creator. 

We'll create a multi-agent RL environment with 3 agents, an episode length of 5, and 2 environment replicas.

In [6]:
num_agents = 3
num_envs = 2
episode_length = 5

cuda_data_manager = CUDADataManager(num_agents, num_envs, episode_length=episode_length)

INFO:root:
Pushing data to device...
INFO:root:- _log_mask_                                                                      : dtype=int32     , shape=(6,)
INFO:root:
Pushing data to device...
INFO:root:- _done_                                                                          : dtype=int32     , shape=(2,)
INFO:root:
Pushing data to device...
INFO:root:- _timestep_                                                                      : dtype=int32     , shape=(2,)


Now, let's create some (random) data that we would like to push to the device. In the context of RL, this can pertain to the starting states created by `env reset()`. 

The starting states are arrays that need to hold data such as observations, actions and rewards during the course of the episode. They could also contain environment configuration settings and hyperparameters. 

Each environment and agent will have its own data, so we create a `(num_envs, num_agents)`-shaped array that will be pushed to the GPU.

In [7]:
random_data = np.random.rand(num_envs, num_agents)

In [8]:
random_data

array([[0.13094067, 0.33198505, 0.30681278],
       [0.31547715, 0.40637709, 0.55222978]])

# Push and pull data from host (CPU) to device (GPU)

In order to push data to the device, we have created a **DataFeed** helper object. For all data pushed from the host to device, we will need to provide a name identifier, the actual data, and two flags (both default to False):

- `save_copy_and_apply_at_reset` - if `True`, we make a copy of the starting data so that we can set the data array to that value at every environment reset, and
- `log_data_across_episode` - if `True`, we add a time dimension to the data, of size `episode_length`, set all $t>0$ index values to zeros, and store the data array at each time step separately. This is primarily used for logging the data for an episode rollout.

In [9]:
data_feed = DataFeed()
data_feed.add_data(
    name="random_data",
    data=random_data,
    save_copy_and_apply_at_reset=False,
    log_data_across_episode=False,
)

In [10]:
data_feed

{'random_data': {'data': array([[0.13094067, 0.33198505, 0.30681278],
         [0.31547715, 0.40637709, 0.55222978]]),
  'attributes': {'save_copy_and_apply_at_reset': False,
   'log_data_across_episode': False}}}

The CUDA data manager provides the **push_data_to_device()** and **pull_data_from_device()** apis to handle data transfer between the host and the device.

In [11]:
cuda_data_manager.push_data_to_device(data_feed)

INFO:root:
Pushing data to device...
INFO:root:- random_data                                                                     : dtype=float32   , shape=(2, 3)


Notice that the data manager casted the data from float64 to float32. CUDA always uses 32-bit floating or integer representations of numbers.

In [12]:
data_fetched_from_device = cuda_data_manager.pull_data_from_device("random_data")

The data fetched from the device matches the data pushed (the small differences are due to type-casting).

In [13]:
data_fetched_from_device

array([[0.13094066, 0.33198506, 0.30681276],
       [0.31547713, 0.4063771 , 0.55222976]], dtype=float32)

Another integral part of RL is training. We also need to hold the observations, actions and rewards arrays. So fo training, we will wrap the data into a Pytorch Tensor.

## Making Training Data Accessible To PyTorch

Note that pushing and pulling data several times between the host and the device causes a lot of communication overhead. So, it's advisable that we push the data from the host to device only once, and then manipulate all the data on the GPU in-place. This is particularly important when data needs to be accessed frequently. A common example is the batch of observations and rewards gathered for each training iteration. 

Fortunately, our framework lets Pytorch access the data we pushed onto the GPU via pointers with minimal overhead. To make data accessible by Pytorch, we set the `torch_accessible` flag to True.

In [14]:
tensor_feed = DataFeed()
tensor_feed.add_data(name="random_tensor", data=random_data)

cuda_data_manager.push_data_to_device(tensor_feed, torch_accessible=True)

INFO:root:
Pushing data to device...
INFO:root:- random_tensor                                                                   : dtype=float32   , shape=(2, 3)


In [15]:
tensor_on_device = cuda_data_manager.data_on_device_via_torch("random_tensor")

## Time comparison for data pull (`torch_accessible` True versus False)

In [16]:
large_array = np.random.rand(1000, 1000)

### `torch_accessible=False`

In [17]:
data_feed = DataFeed()
data_feed.add_data(
    name="large_array",
    data=large_array,
)
cuda_data_manager.push_data_to_device(data_feed, torch_accessible=False)

INFO:root:
Pushing data to device...
INFO:root:- large_array                                                                     : dtype=float32   , shape=(1000, 1000)


In [18]:
Timer(lambda: cuda_data_manager.pull_data_from_device("large_array")).timeit(
    number=1000
)

1.094979539000633

### `torch_accessible=True`

In [19]:
data_feed = DataFeed()
data_feed.add_data(
    name="large_array_torch",
    data=large_array,
)
cuda_data_manager.push_data_to_device(data_feed, torch_accessible=True)

INFO:root:
Pushing data to device...
INFO:root:- large_array_torch                                                               : dtype=float32   , shape=(1000, 1000)


In [20]:
Timer(lambda: cuda_data_manager.data_on_device_via_torch("random_tensor")).timeit(1000)

0.00027837299967359286

You can see the time for accessing torch tensors on the GPU is negligible compared to data arrays!

Currently, the `DataManager` supports primitive data types, such as ints, floats, lists, and arrays. If you would like to push more sophisticated data structures or types to the GPU, such as dictionaries, you may do so by pushing / pulling each key-value pair as a separate array.

# Code Execution Inside CUDA

Once we push all the relevant data to the GPU, we will need to write functions to manipulate the data. To this end, we will need to write code in CUDA C, but invoke it from the host node. The `FunctionManager` is built to facilitate function initialization on the host and execution on the device. As we mentioned before, all the arrays on GPU will be modified on the GPU, and in-place. Let's begin by creating a CUDAFunctionManager object.

In [21]:
cuda_function_manager = CUDAFunctionManager(
    num_agents=cuda_data_manager.meta_info("n_agents"),
    num_envs=cuda_data_manager.meta_info("n_envs"),
)

## Array manipulation inside CUDA

The benefit of GPU processing comes from the fact that we can parallelize operations across threads and grids. In the context of multi-agent RL, it makes very good sense to associate each replica of the environment to a unique block and each agent to a unique thread in the block. Accordingly, we can use the built-in CUDA variables:

`int env_id = blockIdx.x;`\
`int agent_id = threadIdx.x;`

## Array indexing

An important point to understand is how multi-dimensional arrays are indexed inside CUDA. Remember that CUDA stores arrays in a C-contiguous ([row-major](https://en.wikipedia.org/wiki/Row-_and_column-major_order)) fashion (and so does Pytorch with its tensors). Accordingly, for the element at location $[i,j]$ in a data array of shape $(L,M)$, the corresponding index on CUDA is $i*M + j$.

In general, for a $d$-dimensional array of shape $(N_1, N_2, \ldots, N_d)$, the memory-offset for the element at index $(n_1, n_2, \ldots, n_d)$ is
$$n_d + N_d \cdot (n_{d-1} + N_{d-1} \cdot (n_{d-2} + N_{d-2} \cdot (\cdots + N_2 n_1)\cdots)))
= \sum_{k=1}^d \left( \prod_{\ell=k+1}^d N_\ell \right) n_k$$

### Array indexing utility function

WarpDrive provides a convenient multi-dimensional array indexing utility function `get_flattened_array_index()` so that the above array indexing in your CUDA C code can be automatically performed

`__device__ int get_flattened_array_index(const int* index_arr, const int* dim_arr, const int dimensionality);`


The following example will get the CUDA C index for `[kEnvId, kThisAgentId]` element from an array of shape (gridDim.x, num_agents):

`int global_state_arr_shape[] = {gridDim.x, num_agents};`\
`int agent_index[] = {kEnvId, kThisAgentId};`\
`int dimension = 2;`\
`int state_index = get_flattened_array_index(agent_index, global_state_arr_shape, dimension);`

Now, let's write a simple function to add one to each element of the pushed data. We will perform this operation in parallel on the (num_envs) number of GPU blocks and the (num_agents) number of threads within.

In general, the operation is (almost) parallel. Going into a bit more detail - CUDA employs a Single Instruction Multiple Thread (SIMT) architecture to manage and execute threads in groups of 32 called warps. So, as long as the number of agents is a multiple of 32, all the threads ar utilized, otherwise few threads remain idle. For example, if we use $1000$ agents, $24$ threads will remain idle, for a utilization rate of $97.65\%$.

In [22]:
source_code = """
// A function to demonstrate how to manipulate data on the GPU.
// This function increments each the random data array we pushed to the GPU before.
// Each index corresponding to (env_id, agent_id) in the array is incremented by "agent_id + env_id".
// Everything inside the if() loop runs in parallel for each agent and environment.
//
extern "C"{
    __global__ void cuda_increment(                               
            float* data,                                  
            int num_agents                                       
    )                                                            
    {                                                            
        int env_id = blockIdx.x;                                 
        int agent_id = threadIdx.x;                             
        if (agent_id < num_agents){                              
            int array_index = env_id * num_agents + agent_id;
            int increment = env_id + agent_id;
            data[array_index] += increment;
        }                                                            
    }   
}
"""

Notice that the keyword `__global__` is used on the increment function. Global functions are also called "kernels" - they are functions you may call from the host. There's also the keyword `__device__` for functions that cannot be called from the host, but may only be called from other device or global functions.

Next, we use the `FunctionManager` API method **load_cuda_from_source_code()** to build and load the CUDA code.

*Note: We only use the string-type source code for the purposes of exposition. In general, it's standard practice to have several standalone source codes written out in cuda (.cu) file, pre-compile them to a single binary (.cubin), and then use the `FunctionManager`'s **load_cuda_from_binary_file()**.* 

*Additionally, if we compile template source code (so that `num_agents` and `num_envs` can be used as macro variables at compile time), we can use the CUDA `FunctionManager`'s **compile_and_load_cuda(template_header_file)**.*

In [23]:
cuda_function_manager.load_cuda_from_source_code(
    source_code, default_functions_included=False
)
cuda_function_manager.initialize_functions(["cuda_increment"])

INFO:root:Successfully build and load the source code
INFO:root:starting to load the cuda kernel function: cuda_increment from the CUDA module 
INFO:root:finished loading the cuda kernel function: cuda_increment from the CUDA module, 


We will use the `FunctionManager`'s API method **get_function()** to load the CUDA kernel function and get an handle to invoke it from the host device.

In [24]:
increment_function = cuda_function_manager.get_function("cuda_increment")

Now, when invoking the `increment` function, along with the `data` and `num_agents` arguments, we also need to provide the block and grid arguments. These are also attributes of the CUDA `FunctionManager`: simply use\

- `block=cuda_function_manager.block`, and
- `grid=cuda_function_manager.grid`

Also, since we need to use the `num_agents` parameter, we also need to push it to the device. Instead of using a `DataFeed`, we may also push as follows:

In [25]:
cuda_data_manager.push_data_to_device(
    {
        "num_agents": {
            "data": num_agents,
            "attributes": {
                "save_copy_and_apply_at_reset": False,
                "log_data_across_episode": False,
            },
        }
    }
)

INFO:root:
Pushing data to device...
INFO:root:- num_agents                                                                      : dtype=int32     , shape=()


In [26]:
increment_function(
    cuda_data_manager.device_data("random_data"),
    cuda_data_manager.device_data("num_agents"),
    block=cuda_function_manager.block,
    grid=cuda_function_manager.grid,
)

Below is the original (random) data that we pushed to the GPU:

In [27]:
random_data

array([[0.13094067, 0.33198505, 0.30681278],
       [0.31547715, 0.40637709, 0.55222978]])

and here's the incremented data:

In [28]:
cuda_data_manager.pull_data_from_device("random_data")

array([[0.13094066, 1.331985  , 2.3068128 ],
       [1.3154771 , 2.406377  , 3.55223   ]], dtype=float32)

As expected, this method incremented each entry at index `(env_id, agent_id)` of the original data by `(env_id + agent_id)`! The differences are below.

In [29]:
cuda_data_manager.pull_data_from_device("random_data") - random_data

array([[-5.65473812e-09,  9.99999945e-01,  1.99999999e+00],
       [ 9.99999986e-01,  1.99999998e+00,  3.00000011e+00]])

And we can invoke the increment function again to increment one more time (also in-place on the GPU), and the differences double.

In [30]:
increment_function(
    cuda_data_manager.device_data("random_data"),
    cuda_data_manager.device_data("num_agents"),
    block=cuda_function_manager.block,
    grid=cuda_function_manager.grid,
)
cuda_data_manager.pull_data_from_device("random_data") - random_data

array([[-5.65473812e-09,  1.99999995e+00,  3.99999999e+00],
       [ 1.99999999e+00,  3.99999974e+00,  6.00000011e+00]])

# Validating CUDA parallelism

We put all the pieces introduced so far together, and record the times for parallelized operations with different `num_envs` and `num_agents` settings.

In [31]:
def push_random_data_and_increment_timer(
    num_runs=1,
    num_envs=2,
    num_agents=3,
    source_code=None,
    episode_length=100,
):

    assert source_code is not None

    # Initialize the CUDA data manager
    cuda_data_manager = CUDADataManager(
        num_agents=num_agents, num_envs=num_envs, episode_length=episode_length
    )

    # Initialize the CUDA function manager
    cuda_function_manager = CUDAFunctionManager(
        num_agents=cuda_data_manager.meta_info("n_agents"),
        num_envs=cuda_data_manager.meta_info("n_envs"),
    )

    # Load source code and initialize function
    cuda_function_manager.load_cuda_from_source_code(
        source_code, default_functions_included=False
    )
    cuda_function_manager.initialize_functions(["cuda_increment"])
    increment_function = cuda_function_manager.get_function("cuda_increment")

    def push_random_data(num_agents, num_envs):
        # Create random data
        random_data = np.random.rand(num_envs, num_agents)

        # Push data from host to device
        data_feed = DataFeed()
        data_feed.add_data(
            name="random_data",
            data=random_data,
        )
        data_feed.add_data(name="num_agents", data=num_agents)
        cuda_data_manager.push_data_to_device(data_feed)

    def increment_data():
        increment_function(
            cuda_data_manager.device_data("random_data"),
            cuda_data_manager.device_data("num_agents"),
            block=cuda_function_manager.block,
            grid=cuda_function_manager.grid,
        )

    # One-time data push
    data_push_time = Timer(lambda: push_random_data(num_agents, num_envs)).timeit(
        number=1
    )
    # Increment the arrays 'num_runs' times
    program_run_time = Timer(lambda: increment_data()).timeit(number=num_runs)

    return {"data push times": data_push_time, "code run time": program_run_time}

## Record the times for a single data push and 10000 increment kernel calls.

In [32]:
%%capture

num_runs = 10000
times = {}

for scenario in [
    (1, 1),
    (1, 10),
    (1, 100),
    (10, 10),
    (1, 1000),
    (100, 100),
    (1000, 1000),
]:
    num_envs, num_agents = scenario
    times.update(
        {
            f"envs={num_envs}, agents={num_agents}": push_random_data_and_increment_timer(
                num_runs, num_envs, num_agents, source_code
            )
        }
    )

INFO:root:
Pushing data to device...
INFO:root:- _log_mask_                                                                      : dtype=int32     , shape=(101,)
INFO:root:
Pushing data to device...
INFO:root:- _done_                                                                          : dtype=int32     , shape=(1,)
INFO:root:
Pushing data to device...
INFO:root:- _timestep_                                                                      : dtype=int32     , shape=(1,)
INFO:root:Successfully build and load the source code
INFO:root:starting to load the cuda kernel function: cuda_increment from the CUDA module 
INFO:root:finished loading the cuda kernel function: cuda_increment from the CUDA module, 
INFO:root:
Pushing data to device...
INFO:root:- random_data                                                                     : dtype=float32   , shape=(1, 1)
INFO:root:- num_agents                                                                      : dtype=int32     , shape=()


In [33]:
print(f"Times for {num_runs} function calls")
print("*" * 40)
for key, value in times.items():
    print(
        f"{key:30}: data push time: {value['data push times']:10.5}s,\t mean increment times: {value['code run time']:10.5}s"
    )

Times for 10000 function calls
****************************************
envs=1, agents=1              : data push time:  0.0038934s,	 mean increment times:    0.13095s
envs=1, agents=10             : data push time:  0.0027685s,	 mean increment times:    0.13497s
envs=1, agents=100            : data push time:  0.0028684s,	 mean increment times:    0.14054s
envs=10, agents=10            : data push time:  0.0035858s,	 mean increment times:    0.13519s
envs=1, agents=1000           : data push time:  0.0027829s,	 mean increment times:    0.13093s
envs=100, agents=100          : data push time:  0.0026748s,	 mean increment times:    0.12821s
envs=1000, agents=1000        : data push time:   0.020473s,	 mean increment times:    0.12704s


As we increase the number of environments and agents, the data size becomes larges, so pushing data becomes slower, but since all the threads operate in parallel, the average time taken in the increment function remains about the same!

And that's it! By using building blocks such as the increment function, we can create arbitrarily complex functions in CUDA C. For some comparative examples, please see the example environments that have both Python implementations in `examples/envs` and corresponding CUDA C implementations in `src/envs`.

Below are some useful starting resources for CUDA C programming:

- [CUDA tutorial](https://cuda-tutorial.readthedocs.io/en/latest/)
- [Learn C](https://learnxinyminutes.com/docs/c/)
- [CUDA Quick Reference](http://www.icl.utk.edu/~mgates3/docs/cuda.html)
<!-- - [Thrust](https://developer.nvidia.com/thrust). Note: thrust is a flexible, high-level interface for GPU programming that greatly enhances developer productivity. -->