# Distributed Training with TorchX and Ray

Training models on large clusters is challenging for a few reasons:
1. You need to set up infrastructure
2. You need a way to submit jobs such as distributed training scripts
3. You need a way to monitor the status of jobs, aggregate logs from several machines, and collect artifacts when model training is done

The new torchX Ray scheduler simplifies each of these steps.

In [1]:
# Ray scheduler for torchX is an experimental feature
! pip install git+https://github.com/pytorch/torchx.git@raydriver -q


In [2]:
! pip install "ray[default]" -q 

## Infrastructure setup

Ray provides the `ray up` command which, given a cluster YAML file and access to your cloud credentials (e.g. AWS, GCP, Azure), sets up the infrastructure for you.

Let's take a look at what the config file looks like:


In [5]:
! cat ray_cluster.yaml

# An unique identifier for the head node and workers of this cluster.
cluster_name: gpu-docker

min_workers: 1
max_workers: 4

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
  image: "rayproject/ray-ml:latest-gpu"
  # image: rayproject/ray:latest-gpu   # use this one if you don't need ML dependencies, it's faster to pull
  container_name: "ray_nvidia_docker" # e.g. ray_docker


# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
  type: aws
  region: us-west-2
  # Availability zone(s), comma-separate

## Understanding `ray_cluster.yaml`

1. docker: points to the Docker image that will be set up on the cluster, which is an easy way to install your dependencies
2. provider: determines your cloud provider and region
3. GroupName: jobs will be submitted against a Ray endpoint called the dashboard
4. resources: determines the number of CPUs and GPUs allocated to each node - try making this number bigger
5. node_config: the machine instance type, AMI and options for spot instances for cheaper training
6. commands to start on worker or head nodes: you don't need to change these, they're what make it easier for you to set up your infra

To learn more about the available options, see https://docs.ray.io/en/latest/cluster/config.html

In this tutorial we're going to be running on AWS, but running on a different cloud provider should only require a simple config change.
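
To make the fields above concrete, here is a minimal sketch that sanity-checks a cluster config before you run `ray up`. It assumes the YAML has already been loaded into a Python dict (for example with PyYAML's `yaml.safe_load`). The `summarize_cluster_config` helper is hypothetical and not part of Ray, and the example dict only mirrors the values shown in `ray_cluster.yaml` above.

```python
from typing import Any, Dict


def summarize_cluster_config(config: Dict[str, Any]) -> str:
    """Hypothetical helper (not a Ray API): validate and summarize a loaded config."""
    provider = config.get("provider", {})
    min_workers = config.get("min_workers", 0)
    max_workers = config.get("max_workers", min_workers)

    # The autoscaler cannot satisfy a minimum that is above the maximum.
    if min_workers > max_workers:
        raise ValueError("min_workers must not exceed max_workers")
    # Without a provider type there is no cloud to launch nodes on.
    if "type" not in provider:
        raise ValueError("provider.type is required (e.g. aws)")

    image = config.get("docker", {}).get("image", "<no docker image>")
    return (
        f"{config.get('cluster_name', '<unnamed>')}: "
        f"{min_workers}-{max_workers} workers on "
        f"{provider['type']}/{provider.get('region', '<no region>')} "
        f"using {image}"
    )


# Values copied from the ray_cluster.yaml shown earlier in this tutorial.
example_config = {
    "cluster_name": "gpu-docker",
    "min_workers": 1,
    "max_workers": 4,
    "docker": {"image": "rayproject/ray-ml:latest-gpu"},
    "provider": {"type": "aws", "region": "us-west-2"},
}

print(summarize_cluster_config(example_config))
```

Switching cloud providers would mostly mean changing the `provider` entry, which is why the check treats it as the one required field.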

## Set up the AWS CLI

Since we're running on AWS, you're going to need to:
1. Install the AWS CLI https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html
2. Configure the AWS CLI by running `aws configure` https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html

And now we can start on the deep learning part.

In [3]:
import ray

In [6]:
# Having some issues with ray up taking forever
# Stop existing instances of Ray
! ray stop --force

# Setup ray cluster
# This may take a few minutes - Ray will retry several ssh connections before succeeding
! ray up ray_cluster.yaml -y


    SSH still not available (SSH command failed.), retrying in 5 seconds.


In [5]:
%%writefile ray_simple.py
def main() -> None:
    print("hello")
    return


if __name__ == "__main__":
    main()

Writing ray_simple.py


In [8]:
! ray stop --force

Stopped all 2 Ray processes.

In [9]:
! ray start --head

Local node IP: 172.31.51.124
2022-01-26 01:00:32,703	INFO services.py:1338 -- View the Ray dashboard at http://127.0.0.1:8265

--------------------
Ray runtime started.
--------------------

Next steps
  To connect to this Ray runtime from another node, run
    ray start --address='172.31.51.124:6379' --redis-password='5241590000000000'
  
  Alternatively, use the following Python code:
    import ray
    ray.init(address='auto', _redis_password='5241590000000000')
  
  To connect to this Ray runtime from outside of the cluster, for example to
  connect to a remote cluster from your laptop directly, use the following
  Python code:
    import ray
    ray.init(address='ray://<head_node_ip_address>:10001')
  
  If connection fails, check your firewall

In [10]:
! ray start --address='172.31.51.124:6379' --redis-password='5241590000000000'

Local node IP: 172.31.51.124
2022-01-26 01:00:52,490	ERROR gcs_utils.py:136 -- Failed to send request to gcs, reconnecting. Error <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1643158852.490743112","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3093,"referenced_errors":[{"created":"@1643158852.490742074","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"
>
2022-01-26 01:00:53,492	ERROR gcs_utils.py:136 -- Failed to send request to gcs, reconnecting. Error <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1643158853.492577356","description":"Failed to pick subchannel","file":"src/core/ex

In [5]:
# Pick up the Ray dashboard address to execute a job against it
# Provide a path to your working directory with dependencies along with the script you want to run
! torchx run -s ray -cfg dashboard_address=172.31.51.124:20002,working_dir=. utils.binary --entrypoint ray_simple.py


Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/ray/lib/python3.8/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/home/ubuntu/anaconda3/envs/ray/lib/python3.8/site-packages/urllib3/util/connection.py", line 96, in create_connection
    raise err
  File "/home/ubuntu/anaconda3/envs/ray/lib/python3.8/site-packages/urllib3/util/connection.py", line 86, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/ray/lib/python3.8/site-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/home/ubuntu/anaconda3/envs/ray/lib/python3.8/site-packages/urllib3/connectionpool.py", line 394, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/
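
A `Connection refused` error like the one above means nothing accepted a TCP connection at the dashboard address. Before submitting a job it can help to check that the address is reachable. The sketch below uses only the standard library; `is_reachable` is a hypothetical helper, not part of torchX or Ray, and the demo runs against a throwaway local listener rather than a real dashboard.

```python
import socket


def is_reachable(address: str, timeout: float = 2.0) -> bool:
    """Hypothetical helper: True if a TCP connection to 'host:port' succeeds."""
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unroutable hosts.
        return False


# Demo: open a local listener on a free port, probe it, then close it and probe again.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
demo_address = f"127.0.0.1:{server.getsockname()[1]}"

print("listener open:", is_reachable(demo_address))
server.close()
print("listener closed:", is_reachable(demo_address))
```

In the notebook you would pass the same `host:port` string that you give to `dashboard_address`.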

## Hello torchX Ray scheduler

The scheduler needs either a dashboard address to send jobs to, or your `ray_cluster.yaml` to identify your cluster. Once the cluster is set up, you can use the `torchx` CLI to submit jobs by specifying a `working_dir` containing all your files and data, and an `entrypoint` for a binary script, i.e. your training script.

After deploying a job you'll get a job ID and can use it to determine the status of your job and inspect its logs.
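
As a small illustration of the two submission modes, the sketch below assembles the `torchx run` command line from a dict of `-cfg` options. `build_torchx_ray_command` is a hypothetical convenience helper, not a torchX API. It only uses the flags and option names that appear in this tutorial, and it builds the command without running it.

```python
import shlex
from typing import Dict, List


def build_torchx_ray_command(cfg: Dict[str, str], entrypoint: str) -> List[str]:
    """Hypothetical helper: build the `torchx run` argv used in this tutorial."""
    # -cfg takes comma-separated key=value pairs.
    cfg_string = ",".join(f"{key}={value}" for key, value in cfg.items())
    return [
        "torchx", "run", "-s", "ray", "-cfg", cfg_string,
        "utils.binary", "--entrypoint", entrypoint,
    ]


# Mode 1: let the scheduler find the cluster from the cluster config file.
by_cluster_file = build_torchx_ray_command(
    {"cluster_config_file": "ray_cluster.yaml", "working_dir": "."},
    "ray_simple.py",
)

# Mode 2: point the scheduler directly at a dashboard address.
by_dashboard = build_torchx_ray_command(
    {"dashboard_address": "172.31.51.124:20002", "working_dir": "."},
    "ray_simple.py",
)

print(shlex.join(by_cluster_file))
print(shlex.join(by_dashboard))
```

The resulting list can be passed to `subprocess.run` if you prefer to submit from Python instead of a `!` cell.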

In [None]:
! torchx run -s ray -cfg cluster_config_file=ray_cluster.yaml,working_dir=. utils.binary --entrypoint ray_simple.py

In [None]:
# Get a job ID from the deployed job
! torchx run -s ray -cfg dashboard_address=34.209.89.185:20002,working_dir=test_dir utils.binary --entrypoint ray_simple.py

# The returned job ID is what you pass to the commands below

# Get the job status
# e.g. PENDING, FAILED, INTERRUPTED, etc.
! torchx describe ray://torchx/34.209.89.185:20002-raysubmit_aKvezN3NyA2mqZeW

# Get job logs
# Aggregates logs from all machines in one place
! torchx log ray://torchx/34.209.89.185:20002-raysubmit_aKvezN3NyA2mqZeW
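
The job handle in the commands above appears to pack several pieces of information into one string. The sketch below splits it apart, assuming the layout `scheduler://session/dashboard_address-job_id`. That layout is inferred from the single example in this tutorial rather than taken from a torchX specification, and `parse_ray_app_handle` is a hypothetical helper.

```python
from typing import NamedTuple


class RayAppHandle(NamedTuple):
    scheduler: str
    session: str
    dashboard_address: str
    job_id: str


def parse_ray_app_handle(handle: str) -> RayAppHandle:
    """Hypothetical parser for handles shaped like the example in this tutorial."""
    scheduler, sep, rest = handle.partition("://")
    if not sep:
        raise ValueError(f"missing '://' in handle: {handle!r}")
    session, sep, app_id = rest.partition("/")
    if not sep:
        raise ValueError(f"missing session separator '/' in handle: {handle!r}")
    # Assumption: the address contains no '-', so the first '-' starts the job ID.
    dashboard_address, sep, job_id = app_id.partition("-")
    if not sep:
        raise ValueError(f"missing '-' before job ID in handle: {handle!r}")
    return RayAppHandle(scheduler, session, dashboard_address, job_id)


parsed = parse_ray_app_handle(
    "ray://torchx/34.209.89.185:20002-raysubmit_aKvezN3NyA2mqZeW"
)
print(parsed)
```

This is only useful for bookkeeping, such as grouping job IDs by cluster. The `torchx describe` and `torchx log` commands take the full handle.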

## Writing a distributed PyTorch Script

Unfortunately, it's currently not possible to automatically make a PyTorch script distributed, but doing it by hand is not too complicated and we'll walk through the steps now.
We're going to take this example training loop https://github.com/pytorch/examples/blob/master/mnist/main.py and use the Distributed Data Parallel version of it by github.com/yqhu.

In [14]:
%%writefile ddp_resnet_train.py
#!/usr/bin/env python3
# Imports
import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR

import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

Overwriting ddp_resnet_train.py


In [12]:
%%writefile ddp_resnet_train.py -a
# Set MASTER_ADDR and MASTER_PORT so that all processes can find each other
# World size is the total number of processes in the distributed job
# Rank is the unique ID of a specific process in the distributed job
# Gloo is a popular backend for communication between processes
def setup(rank, world_size):
    master_addr = 'localhost'
    master_port = '12355'
    os.environ['MASTER_ADDR'] = master_addr
    os.environ['MASTER_PORT'] = master_port

    # initialize the process group
    backend = "gloo"
    dist.init_process_group(backend=backend, init_method=f"tcp://{master_addr}:{master_port}", rank=rank, world_size=world_size)


def cleanup():
    dist.destroy_process_group()

Appending to ddp_resnet_train.py
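
Hard-coding `localhost` only works when every process runs on the same machine. On a multi-node cluster the launcher usually provides the rendezvous details through environment variables. The sketch below reads them with fallbacks that match the values in `setup` above. `rendezvous_from_env` is a hypothetical helper; `MASTER_ADDR`, `MASTER_PORT`, `RANK` and `WORLD_SIZE` are the conventional variable names used by `torch.distributed`, but whether they are set depends on how the job is launched.

```python
import os
from typing import Mapping, NamedTuple


class Rendezvous(NamedTuple):
    init_method: str
    rank: int
    world_size: int


def rendezvous_from_env(env: Mapping[str, str] = os.environ) -> Rendezvous:
    """Hypothetical helper: derive init_process_group arguments from env variables."""
    # Fall back to the single-machine defaults used in setup() above.
    master_addr = env.get("MASTER_ADDR", "localhost")
    master_port = env.get("MASTER_PORT", "12355")
    return Rendezvous(
        init_method=f"tcp://{master_addr}:{master_port}",
        rank=int(env.get("RANK", "0")),
        world_size=int(env.get("WORLD_SIZE", "1")),
    )


# Example with made-up values for the second of four processes.
example = rendezvous_from_env(
    {"MASTER_ADDR": "10.0.0.1", "MASTER_PORT": "29500", "RANK": "1", "WORLD_SIZE": "4"}
)
print(example)
```

The result can then be passed on as `dist.init_process_group(backend="gloo", init_method=example.init_method, rank=example.rank, world_size=example.world_size)`.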


In [13]:
! cat ddp_resnet_train.py

a = 2
a = 2
