# A Quick Introduction To Torchrun and Communication Primitives


`torchrun` is a handy command-line utility that simplifies launching distributed PyTorch training jobs.  There are alternatives to `torchrun`, especially for the single-node jobs that we'll be doing here, but learning it now will set you up for when your needs grow and you have to scale up to multiple nodes.

The key to understanding `torchrun` is that it's not magic: it's a simple tool that launches multiple processes, each running its own instance of a Python script (or other program) concurrently.  In each of those processes it sets up some environment variables that are useful for distributed training, but it doesn't do anything much more powerful than that.

## Hello, World!

Let's start with a simple example.  We have a Python script that prints "Hello, World!"; you can look at it [here](code/hello_world.py).  We can run this script manually with:

In [None]:
!python3 code/hello_world.py

Successful, but not very exciting.  Now let's run it with `torchrun`:

In [None]:
!torchrun --standalone --nproc_per_node=2 code/hello_world.py

Still not very exciting, but now we can see that `torchrun` has launched two processes to run the same script concurrently. 

This is what `torchrun` does: it launches the same script concurrently in multiple processes.  You can think of it like this:

![Torchrun diagram](images/torchrun.png)
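To make the "not magic" point concrete, here is a minimal sketch of the core idea in plain Python: start N copies of a program at once, each with its own `RANK`, `LOCAL_RANK`, and `WORLD_SIZE`.  The helper name `launch_copies` is made up for this illustration, and this is not how `torchrun` itself is implemented; the real tool also takes care of things like rendezvous between nodes and restarts.

```python
import os
import subprocess
import sys


def launch_copies(nproc, command):
    """Run `command` in `nproc` concurrent processes, torchrun-style.

    Single-node sketch only, so the local rank equals the global rank.
    Returns each process's standard output, indexed by rank.
    """
    procs = []
    for rank in range(nproc):
        env = dict(os.environ,
                   RANK=str(rank),
                   LOCAL_RANK=str(rank),
                   WORLD_SIZE=str(nproc))
        # Popen returns immediately, so all the copies run at the same time
        procs.append(subprocess.Popen(command, env=env,
                                      stdout=subprocess.PIPE, text=True))
    return [p.communicate()[0] for p in procs]


if __name__ == "__main__":
    snippet = ("import os; "
               "print('rank', os.environ['RANK'], 'of', os.environ['WORLD_SIZE'])")
    for output in launch_copies(3, [sys.executable, "-c", snippet]):
        print(output, end="")
```

Because this sketch captures each process's output separately, it prints in rank order; `torchrun` lets all processes write to the same terminal, so the order there is not guaranteed.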


By the way, it might get a little annoying to type out `--standalone --nproc_per_node=2` all the time.  Also, you probably saw a warning message about `OMP_NUM_THREADS`.  We can avoid that by setting that environment variable; then you have (say) `OMP_NUM_THREADS=4 torchrun --standalone --nproc_per_node=2 code/hello_world.py`, which is a bit much if you do it a lot.  So let's use a small shell script, [run_w_torchrun.sh](code/run_w_torchrun.sh), that sets the environment variable and runs the command for us.  Now we can run:

In [None]:
!./code/run_w_torchrun 2 code/hello_world.py

Now let's try something more ambitious!

In [None]:
!torchrun --standalone --nproc_per_node=23 code/hello_world.py

Probably looked like a mess!  We'll normally use fewer processes than this: for GPU runs you would normally launch one process per GPU, and for CPU runs it might be one per socket or one per core.  But `torchrun` doesn't care how many processes you ask it to launch; it will just run them all concurrently.  Play with that number a bit if you like.

Something you might have noticed is that the output from the different processes is interleaved.  This is because the processes are all running at the same time and writing to the same standard output stream, so their output arrives in whatever order the operating system happens to schedule them.  This contention for a shared resource is a common problem when running multiple processes, and is something that will come up whenever we deal with I/O in a distributed setting.  We'll talk more about this later.
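One common way to keep shared output readable is to tag every line with the rank that wrote it, and to print once-only messages from rank 0 alone.  The sketch below shows the idea; the helper names (`format_line`, `rank_print`, `rank0_print`) are hypothetical.  It doesn't stop lines from different ranks arriving in any order, but it does make it clear who wrote what.

```python
import os

RANK = int(os.environ.get("RANK", 0))            # 0 when run without torchrun
WORLD_SIZE = int(os.environ.get("WORLD_SIZE", 1))


def format_line(message, rank=RANK, world_size=WORLD_SIZE):
    """Prefix a message with the rank that produced it."""
    return f"[rank {rank}/{world_size}] {message}"


def rank_print(message, rank=RANK):
    """Print from every rank; flush so the line isn't left sitting in a buffer."""
    print(format_line(message, rank), flush=True)


def rank0_print(message, rank=RANK):
    """Print only on rank 0, for messages that should appear just once."""
    if rank == 0:
        print(format_line(message, rank), flush=True)


if __name__ == "__main__":
    rank0_print("starting up")
    rank_print("hello")
```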

## Torchrun sets up environment variables

So where did those RANK, LOCAL_RANK, and WORLD_SIZE variables come from in the [hello_world.py](code/hello_world.py) script?  Let's take a look at the code in `hello_world.py`:

```python
import os

try:
    global_rank = os.environ["RANK"]
    local_rank = os.environ["LOCAL_RANK"]
    world_size = os.environ["WORLD_SIZE"]

    print(f"Hello, world from rank {global_rank} of {world_size}! (local rank {local_rank})")
except KeyError:
    # the variables are missing, so we weren't launched by torchrun
    print("Hello, world!")
```

When you run a script with `torchrun`, it sets up some environment variables that are useful for distributed training.  There's a script that prints out a subset of these variables [here](code/environ.py).  You can run it without `torchrun` like this:

In [None]:
!python3 ./code/environ.py

And then with `torchrun`:

In [None]:
!OMP_NUM_THREADS=4 torchrun --standalone --nproc_per_node 1 code/environ.py

You can see the additional environment variables in the output of the script.  And if you run it with multiple processes:

In [None]:
!OMP_NUM_THREADS=4 torchrun --standalone --nproc_per_node 3 code/environ.py

or

In [None]:
!./code/run_w_torchrun 3 code/environ.py

You can see how the different processes get different values for the environment variables.  `RANK` lets each process know which one it is among all the processes that have been launched, and `WORLD_SIZE` tells each process how many processes have been launched in total.  This is handy for dividing up work!  Each process knows it has to tackle the `RANK`th of `WORLD_SIZE` equal shares of the total work.
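As a small sketch of that idea, here is one way a process could work out its own share of `n_items` pieces of work from its rank and the world size.  The function name `my_share` and the choice of contiguous chunks are assumptions made for illustration; real data loaders often interleave or shuffle items instead.

```python
import os


def my_share(n_items, rank, world_size):
    """Return the range of item indices that `rank` should process.

    Items are split into contiguous chunks whose sizes differ by at most one;
    the first `n_items % world_size` ranks get the one extra item.
    """
    base, extra = divmod(n_items, world_size)
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return range(start, stop)


if __name__ == "__main__":
    rank = int(os.environ.get("RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    share = my_share(10, rank, world_size)
    print(f"rank {rank} of {world_size} handles items {list(share)}", flush=True)
```

Every item lands on exactly one rank, and no rank gets more than one item more than any other.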

You'll also notice that `RANK` (the global rank of the process, which is unique across all processes launched) is identical here to `LOCAL_RANK` (the rank of the process on its local node), because we're running on a single node.  We'll talk more about this later, but for instance you'll always want to choose which GPU to use on the local node based on `LOCAL_RANK`, whereas `RANK` will have something to do with which chunk of the global dataset you're working on (say).
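Here is a minimal sketch of choosing a device from `LOCAL_RANK`, assuming one process per GPU.  The helper name `pick_device` is hypothetical, and the CPU fallback is only there so the sketch also runs on a machine without GPUs.

```python
import os

import torch


def pick_device(local_rank):
    """Choose this process's device from its rank on the local node."""
    if torch.cuda.is_available():
        # assumes one process per GPU, so local rank N uses GPU N
        torch.cuda.set_device(local_rank)
        return torch.device("cuda", local_rank)
    return torch.device("cpu")  # fallback so the sketch also runs without GPUs


if __name__ == "__main__":
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    global_rank = int(os.environ.get("RANK", 0))
    device = pick_device(local_rank)
    print(f"global rank {global_rank} (local rank {local_rank}) uses {device}",
          flush=True)
```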

## Torchrun passes arguments to the script



`torchrun` also passes arguments to the script that it launches.  You can see this in the [arguments.py](code/arguments.py) script, which takes optional arguments and prints them out:

In [None]:
!./code/arguments.py 1 2 3 ab c

In [None]:
!./code/run_w_torchrun 2 code/arguments.py 1 2 3 ab c

This way, you can pass arguments to the script that `torchrun` launches, just like you would with a Python script that you run directly.
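For a sense of what this looks like from inside the script, here is a hypothetical stand-in (not the actual contents of `arguments.py`) that reports the arguments each process received along with its rank.  Every process sees the same arguments; only the environment variables differ.

```python
import os
import sys


def describe_arguments(argv, rank):
    """Return one line reporting the arguments this process received."""
    script, *args = argv
    return f"rank {rank}: {script} got {len(args)} argument(s): {args}"


if __name__ == "__main__":
    rank = int(os.environ.get("RANK", 0))
    print(describe_arguments(sys.argv, rank), flush=True)
```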

## Torchrun can run any executable

Torchrun is not limited to running Python scripts.  It can run any executable, and it will set up the environment variables for distributed training in the same way.  For example, we can run [a simple bash script](code/hello_world.sh) that just prints the process ID of each process:

In [None]:
!./code/run_w_torchrun 3 code/hello_world.sh

And in fact it can be handy to run normal Linux executables with `torchrun`, just to see (say) the hostnames of all the nodes that are running the processes.  For example, we can run the `hostname` command with `torchrun`:

In [None]:
!./code/run_w_torchrun 4 hostname

Not a super exciting result, since we're all running on the same node, but if you were running on multiple nodes you would see the hostnames of all the nodes that are running the processes.

## Coordination between processes w/ `torch.distributed`

So we've seen that `torchrun` is a simple tool that launches multiple processes and sets up some environment variables so each process knows its rank and the total number of processes. 

But we don't generally run `torchrun` just to fire off N copies of the same program.  We generally want the copies to work together to solve some problem.  How do these processes communicate with each other?  That's where `torch.distributed` comes in. 

We're going to look at two primary ways that `torch.distributed` can help us coordinate between processes: synchronization and collective communication.  You can find more information about the `torch.distributed` synchronization and communication primitives in the [PyTorch documentation](https://docs.pytorch.org/docs/stable/distributed.html#synchronous-and-asynchronous-collective-operations).

### Synchronization

We've seen from the previous examples that when `torchrun` launches multiple processes, they all run concurrently.  That means they can trample over each other.  There's also no inherent order to the execution of the processes, or anything implicitly keeping them in sync.  

This isn't great if we want them working together.  If all of the processes are doing task A, and they all need to have finished that before they can move on to task B, we need to enforce that somehow.  This is where synchronization comes in.

Synchronization is the process of making sure that all processes are at the same point in their execution before proceeding.  This is important when you have multiple processes that need to work together, and you want to make sure that they are all ready before moving on to the next step.

There are a number of ways to synchronize processes in PyTorch, but the most common is the `torch.distributed.barrier()` function.  Each process that calls it blocks until all processes have reached the barrier, at which point they all proceed together.  That looks like this:

![Barrier diagram](images/barrier.png)

We have a simple example of this in the [synchronizing.py](code/synchronizing.py) script.  Each process running that script will execute one task that takes a different length of time on each rank, and then start a second.  If we run it without any barriers to synchronize things, the stages will get hopelessly out of step, with many processes starting the second task long before others have finished the first:

In [None]:
!./code/run_w_torchrun 4 code/synchronizing.py

But if we put barriers in between the tasks, we can make sure that all processes finish the first task before any of them start the second:

In [None]:
!./code/run_w_torchrun 4 code/synchronizing.py --barrier

How does this work?  Let's look at the code in [synchronizing.py](code/synchronizing.py):

```python
#!/usr/bin/env python3
"""
Synchronize (or not) on various tasks
"""
import argparse
import os
import time

import torch
import torch.distributed as dist

global_rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

parser = argparse.ArgumentParser(description='Synchronization example')
parser.add_argument('--barrier', action=argparse.BooleanOptionalAction)

args = parser.parse_args()
```

Ok, so mostly this is things we've seen before - getting the rank and world size - but there's a new `import` here, `import torch.distributed as dist`.  This is the PyTorch distributed package, which provides the functions we'll use for the rest of this section to synchronize and communicate.

```python
# create group of processes which will synchronize
# gloo is a simple default backend for CPUs, esp single-node
dist.init_process_group("gloo", world_size=world_size, 
                        rank=global_rank)
```

Ok, this is new.  We're creating a group of processes that know about each other.  The `init_process_group` function initializes the process group, which is a collection of processes that will work together.  For most of this workshop we'll be using `nccl`, the appropriate backend for GPUs to communicate with each other, as each process will be using a GPU.  Here we're just using `gloo`, a simple default backend for CPUs that will work almost everywhere.

```python
for task in range(2):
    print(f"Task {task} starts on rank {global_rank}/{world_size}", flush=True)

    # task takes different lengths of time on different ranks
    time.sleep(global_rank + 1)

    if args.barrier:
        dist.barrier()

    print(f"Task {task} ends on rank {global_rank}/{world_size}", flush=True)
    time.sleep(0.1)
```

Here's the key piece: the `dist.barrier()` call, which is the synchronization point.  If the `--barrier` argument is passed to the script, each process waits here until all processes have reached it.  If the argument is omitted, the processes sail on through without waiting for each other, and get out of step.
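A barrier is also handy when one process has to do some setup that everyone else depends on, such as preparing a shared file.  The sketch below shows that pattern; `run_on_rank_zero_then_sync` is a hypothetical helper name, and it assumes the process group has already been initialized.

```python
import os

import torch.distributed as dist


def run_on_rank_zero_then_sync(setup):
    """Run `setup` on rank 0 only, and hold every rank until it is done.

    Assumes the process group is already initialized.
    """
    if dist.get_rank() == 0:
        setup()
    # the other ranks arrive here straight away and wait for rank 0
    dist.barrier()


if __name__ == "__main__" and "RANK" in os.environ:  # only when launched by torchrun
    dist.init_process_group("gloo")  # reads the variables torchrun sets
    run_on_rank_zero_then_sync(
        lambda: print("rank 0: preparing shared data", flush=True))
    print(f"rank {dist.get_rank()}: shared data is ready", flush=True)
    dist.destroy_process_group()
```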

```python
# get rid of the process group
dist.destroy_process_group()
```

Finally, we clean up the process group with `dist.destroy_process_group()`.  This is important to do when we're done with the process group, as it releases any resources that were allocated for it.
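If you'd like the cleanup to happen even when something in the middle raises an exception, one option is to wrap the two calls in a context manager.  This is just a sketch of that design choice; the name `process_group` is made up here, and any extra keyword arguments are passed straight through to `init_process_group`.

```python
import contextlib
import os

import torch.distributed as dist


@contextlib.contextmanager
def process_group(backend="gloo", **init_kwargs):
    """Initialize a process group and always destroy it on the way out."""
    dist.init_process_group(backend, **init_kwargs)
    try:
        yield
    finally:
        # runs even if the body raised, so resources are still released
        dist.destroy_process_group()


if __name__ == "__main__" and "RANK" in os.environ:  # only when launched by torchrun
    with process_group("gloo"):
        print(f"rank {dist.get_rank()} of {dist.get_world_size()} is in the group",
              flush=True)
```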


### Collective Communication

So we've seen how to synchronize processes, but what if we want them to communicate with each other?

It'll be pretty common that distributed training jobs have to pass data back and forth between processes.  For deep learning training jobs, it is often the case that data has to be combined; all summed together, or collected up into a single tensor.

We're going to look at two common collective communication operations: `all_reduce` and `all_gather`.  These are among the most common operations you'll use in distributed training, and they are used to combine data from all processes.

![All-reduce diagram](images/allreduce_allgather.png)

The `all_reduce` operation takes a tensor from each process, combines them (usually by summing them), and then returns the result to each process.  This is useful for combining gradients during training, for example.

The `all_gather` operation takes a tensor from each process and collects them all together (in the example below, as a list with one tensor per rank), and returns the full collection to each process.  This is useful when you want every process to have the results of a computation from all the others.
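To pin down what each operation computes, here is a plain-Python simulation with no communication at all: the input is a list holding each rank's contribution, and the output is a list holding what each rank ends up with.  The function names are hypothetical, and this only mimics the results of the operations, not how `torch.distributed` carries them out.

```python
def simulated_all_reduce_sum(values_by_rank):
    """Each rank contributes a list; every rank receives the elementwise sum."""
    total = [sum(column) for column in zip(*values_by_rank)]
    return [list(total) for _ in values_by_rank]


def simulated_all_gather(values_by_rank):
    """Each rank contributes a value; every rank receives all of them, in rank order."""
    gathered = list(values_by_rank)
    return [list(gathered) for _ in values_by_rank]


if __name__ == "__main__":
    world_size = 4
    # rank r starts with [r, r, r, r]
    inputs = [[rank] * world_size for rank in range(world_size)]
    print("all_reduce result on rank 0:", simulated_all_reduce_sum(inputs)[0])
    print("all_gather result on rank 0:",
          simulated_all_gather(list(range(world_size)))[0])
```

With four ranks, every rank ends up with `[6, 6, 6, 6]` from the sum (0 + 1 + 2 + 3 in each position) and `[0, 1, 2, 3]` from the gather.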

We have a simple example of this in the [reductions.py](code/reductions.py) script.  Each process will create a tensor filled with its rank, and then we'll use `all_reduce` to sum the tensors and `all_gather` to collect them all on every process.

In [None]:
!./code/run_w_torchrun 4 code/reductions.py

You can play with various world sizes (numbers of processes launched w/ torchrun) to see how it all works.

We can take a look at the key parts of the code in [reductions.py](code/reductions.py):

```python
device = torch.device("cpu")
dist.init_process_group(backend="gloo", world_size=world_size, 
                        rank=global_rank)

# e.g. on rank 0 with world_size 4, input will be [0, 0, 0, 0]
local_result = torch.tensor([global_rank]*world_size, device=device)
print(f"{global_rank}: local value = {local_result}", flush=True)

# sum across all ranks; all_reduce overwrites its input in place
to_be_summed = torch.tensor([global_rank]*world_size, device=device)
dist.all_reduce(to_be_summed, op=dist.ReduceOp.SUM)
print(f"{global_rank}: summed value = {to_be_summed}", flush=True)

# all_gather fills a pre-allocated list with one tensor per rank
to_be_gathered = torch.tensor([global_rank], device=device)
result = [torch.zeros_like(to_be_gathered) for _ in range(world_size)]
dist.all_gather(result, to_be_gathered)
print(f"{global_rank}: allgathered value = {result}", flush=True)


# get rid of the process group
dist.destroy_process_group()
```

Again we have the init/destroy process group calls.  We also create a `device` (the CPU here), which is where the tensors are placed.

Then it's really just the calls to [`dist.all_reduce`](https://docs.pytorch.org/docs/stable/distributed.html#torch.distributed.all_reduce) and [`dist.all_gather`](https://docs.pytorch.org/docs/stable/distributed.html#torch.distributed.all_gather) that do the work.  The `op=dist.ReduceOp.SUM` argument to `all_reduce` tells it to sum the tensors from all processes.  The `result` list in the `all_gather` call is where the gathered tensors will be stored.
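A very common use of `all_reduce` is averaging a value, such as a gradient or a loss, across all processes: sum with `all_reduce`, then divide by the world size.  Here's a minimal sketch of that, assuming the process group is already initialized; `average_across_ranks` is a hypothetical helper name.

```python
import os

import torch
import torch.distributed as dist


def average_across_ranks(tensor):
    """Return the elementwise mean of `tensor` over all ranks.

    Assumes an initialized process group; the input is left unchanged.
    """
    result = tensor.clone()  # all_reduce works in place, so copy first
    dist.all_reduce(result, op=dist.ReduceOp.SUM)
    return result / dist.get_world_size()


if __name__ == "__main__" and "RANK" in os.environ:  # only when launched by torchrun
    dist.init_process_group("gloo")
    rank = dist.get_rank()
    local = torch.tensor([float(rank), 10.0 * rank])
    print(f"rank {rank}: local {local}, mean {average_across_ranks(local)}",
          flush=True)
    dist.destroy_process_group()
```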

Some things are worth noting!  Many collective operations like these behave a bit like a barrier; they act as something of a synchronization point.  Some processes may leave the operation before others, but no process will leave until all processes have at least started the operation.

Another thing to notice is that none of these communication primitives (and the many others you can see in the [PyTorch documentation](https://docs.pytorch.org/docs/stable/distributed.html)) actually depend on `torchrun`.  All `torchrun` does is launch the processes and set up the environment variables that are used in `init_process_group` and elsewhere in the script.  The `torch.distributed` package provides the functions that let the processes communicate with each other.  You can use these functions in any distributed PyTorch job, whether you launch it with `torchrun` or not.

## Advanced Torchrunning - multinode and fault tolerance

So far we've seen how to use `torchrun` to launch multiple processes on a single node, and how to use `torch.distributed` to synchronize and communicate between those processes.  But what if you want to run your distributed training job on multiple nodes?  And what if you want to be able to recover from failures?

### Multinode jobs

`torchrun` can also be used to launch distributed training jobs on multiple nodes.  This is done by specifying the `--nnodes` argument when launching `torchrun`.  The `--nnodes` argument specifies the total number of nodes, so that `WORLD_SIZE` = `nnodes` x `nproc_per_node`.
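To see how that arithmetic plays out, here is a small sketch that lists the global and local ranks on each node.  It assumes every node runs the same number of processes and that global ranks are handed out node by node; which physical node ends up with which block of ranks is decided when the nodes rendezvous, so treat `node_index` as a label, not a hostname.  The function name is hypothetical.

```python
def global_ranks_on_node(node_index, nnodes, nproc_per_node):
    """List (global_rank, local_rank) pairs for one node.

    Assumes every node runs the same number of processes and that ranks are
    handed out node by node.
    """
    if not 0 <= node_index < nnodes:
        raise ValueError("node_index must be in [0, nnodes)")
    first = node_index * nproc_per_node
    return [(first + local, local) for local in range(nproc_per_node)]


if __name__ == "__main__":
    nnodes, nproc_per_node = 2, 4
    print("WORLD_SIZE =", nnodes * nproc_per_node)
    for node in range(nnodes):
        print(f"node {node}:", global_ranks_on_node(node, nnodes, nproc_per_node))
```

Note how `LOCAL_RANK` repeats on every node while `RANK` stays unique across the whole job.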

The torchrun command has to be run on each node; generally you would use a job scheduler like SLURM to launch the job on multiple nodes.  The `torchrun` command will then launch the same script on each node, and the processes will communicate with each other as if they were all on the same node.

The nodes need to be able to find each other, though, to coordinate the launch.  So we typically have to pick a node to be the rendezvous point, and then tell the other nodes how to find it; then that rendezvous node can communicate back the set of other participating nodes, and work can begin.

We generally pick a random id for the job, as well, to identify this job versus other jobs that might be running at the same time.  This is done with the `--rdzv_id` argument, which is a unique identifier for the job.

What that looks like in practice can be seen from our multi-node version of `run_w_torchrun.sh`, [run_w_torchrun_multinode.sh](code/run_w_torchrun_multinode.sh).  This script takes the number of processes per node, the number of nodes, and the script to run as arguments, and then launches `torchrun` with the appropriate arguments.

```bash
#!/bin/bash
if [ $# -lt 3 ]
then
    echo >&2 "ERROR: not enough arguments provided"
    echo >&2 ""
    echo >&2 "Usage: $0 number-of-processes-per-node number-of-nodes script-to-run [arg1 arg2...]"
    echo >&2 "       e.g. $0 4 2 ./arguments.py a b c 1233"
    exit 1
else
    nproc_per_node=$1
    nnodes=$2
    shift 2  # drop both counts, leaving only the script and its arguments in $@
fi

# this identifies the run. It can be anything,
# but if you're running torchrun across multiple nodes,
# it has to be the same for each of them

id=$RANDOM

# identifies one of the torchrun servers as the one
# to use for coordination.
# Here there's just one node, so we're using localhost
# but if you're running torchrun across multiple nodes,
# all the nodes have to agree; often the first node 
# (in, e.g., SLURM_JOB_NODELIST) is chosen

rendezvous_server=localhost

NTHREADS=4 # should be set up to be number of CPU cores / number of processes, generally, but for this tutorial we set the number low

OMP_NUM_THREADS=${NTHREADS} \
torchrun --nnodes ${nnodes} \
         --nproc_per_node ${nproc_per_node} \
         --rdzv_id ${id} \
         --rdzv_backend c10d \
         --rdzv_endpoint ${rendezvous_server}:29500 \
         "$@"
```
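One wrinkle in a real multi-node run is that the id has to be the same on every node, so each node can't just draw its own random number.  The sketch below assembles the same `torchrun` command line in Python and takes the id from `SLURM_JOB_ID`, which SLURM sets to the same value on every node of a job.  Both function names are hypothetical, and the `"local-test"` fallback is a placeholder that only makes sense for single-node experiments.

```python
import os


def torchrun_command(nproc_per_node, nnodes, rendezvous_host, run_id,
                     script_and_args, port=29500):
    """Build the argument list for a multi-node torchrun launch."""
    return ["torchrun",
            "--nnodes", str(nnodes),
            "--nproc_per_node", str(nproc_per_node),
            "--rdzv_id", str(run_id),
            "--rdzv_backend", "c10d",
            "--rdzv_endpoint", f"{rendezvous_host}:{port}",
            *script_and_args]


def shared_run_id(environ=os.environ):
    """Pick an id that is identical on every node of one scheduler job.

    Outside SLURM this falls back to a fixed placeholder, which only suits
    single-node tests.
    """
    return environ.get("SLURM_JOB_ID", "local-test")


if __name__ == "__main__":
    cmd = torchrun_command(4, 2, "localhost", shared_run_id(),
                           ["code/arguments.py", "a", "b"])
    print(" ".join(cmd))  # could be handed to subprocess.run(cmd) on each node
```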


There are other cool features of `torchrun` that we won't use here, like fault tolerance and elastic training, which allow you to recover from failures and add or remove nodes from a running job.  You can read more about these features in the [PyTorch documentation](https://pytorch.org/docs/stable/elastic/run.html).

But to give you an idea of how they work, you can see the [run_w_torchrun_multinode_fault_tolerant.sh](code/run_w_torchrun_multinode_fault_tolerant.sh) script, which adds the `--max_restarts` argument to `torchrun` to allow it to recover from failures, and allows running on a range of numbers of nodes depending on how many are available.  The key `torchrun` arguments are:

```bash
torchrun --nnodes ${min_nnodes}:${max_nnodes} \
         --max-restarts=3 \
         --nproc_per_node ${nproc_per_node} \
         --rdzv_id ${id} \
         --rdzv_backend c10d \
         --rdzv_endpoint ${rendezvous_server}:29500 \
         "$@"
```
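Restarts only help if the script can pick up where it left off, since a restarted process begins again from the top.  Here is a minimal sketch of that resume pattern using a tiny JSON progress file in place of a real model checkpoint.  All the function names are hypothetical; in a real multi-process job you would normally have only one rank write the checkpoint.  `TORCHELASTIC_RESTART_COUNT` is an environment variable that `torchrun` sets to the number of restarts so far.

```python
import json
import os
import tempfile


def load_progress(path):
    """Return the next epoch to run, or 0 if no checkpoint exists yet."""
    try:
        with open(path) as f:
            return json.load(f)["next_epoch"]
    except FileNotFoundError:
        return 0


def save_progress(path, next_epoch):
    """Write to a temporary file, then rename, so a crash can't leave half a file."""
    directory = os.path.dirname(os.path.abspath(path))
    with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as f:
        json.dump({"next_epoch": next_epoch}, f)
    os.replace(f.name, path)


def train(path, n_epochs):
    """Run the remaining epochs, checkpointing after each one."""
    completed = []
    for epoch in range(load_progress(path), n_epochs):
        completed.append(epoch)  # stand-in for a real training epoch
        save_progress(path, epoch + 1)
    return completed


if __name__ == "__main__":
    restarts = int(os.environ.get("TORCHELASTIC_RESTART_COUNT", 0))
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "progress.json")
        print(f"restart count {restarts}: first run did epochs {train(path, 2)}")
        print(f"after a simulated restart: epochs {train(path, 4)}")
```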


Now we're ready to start thinking about how to use `torchrun` to launch distributed training jobs!  Let's cover the broad strokes of different distributed training strategies first [in the next notebook](3_Data_and_model_parallelism.ipynb), and then we'll dive into the details of how to implement them in PyTorch.