2021/07/18
Distributed training speeds up training a model on a large dataset by using multiple nodes (computers). In the past few years, the technical difficulty of distributed training has dropped so drastically that it is no longer reserved for engineers at large AI institutions.
At a glance, distributed training involves multiple nodes running one training job. With N nodes, the nodes are assigned ranks from 0 to N-1, with rank 0 being the master. Typically:
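- all nodes have a copy of the model
- each node samples a partition of the full dataset (e.g. the samples whose `index % num_nodes == node_rank`) to train its model copy
- the gradients from all nodes are aggregated (averaged) so that every model copy applies the same update; PyTorch's DistributedDataParallel does this with an all-reduce across nodes rather than by routing everything through the master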
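To make the two data-parallel steps above concrete, here is a minimal pure-Python sketch, assuming gradients are plain lists of floats instead of tensors. `partition_indices` and `average_gradients` are hypothetical helpers for illustration, not Lightning or PyTorch APIs. PyTorch's `DistributedSampler` uses a similar strided split (after optional shuffling and padding).

```python
from typing import List, Sequence


def partition_indices(num_samples: int, num_nodes: int, node_rank: int) -> List[int]:
    # Each node keeps the indices where index % num_nodes == node_rank, so the
    # partitions are disjoint and together cover the whole dataset
    return [i for i in range(num_samples) if i % num_nodes == node_rank]


def average_gradients(per_node_grads: Sequence[Sequence[float]]) -> List[float]:
    # Element-wise mean of every node's gradient vector; this is the result an
    # all-reduce (sum, then divide by the number of nodes) leaves on every node
    num_nodes = len(per_node_grads)
    return [sum(values) / num_nodes for values in zip(*per_node_grads)]
```

For example, with 10 samples and 3 nodes, node 0 trains on indices `[0, 3, 6, 9]` and node 2 on `[2, 5, 8]`, while `average_gradients([[1.0, 2.0], [3.0, 4.0]])` gives `[2.0, 3.0]`, the same update on both nodes.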
Let's look at one simple way of doing this using 3 components:
- PyTorch Lightning: handles all the engineering code, such as device placement, so that little to no change to the model code is needed.
- TorchElastic: handles the distributed communication/interface in a fault-tolerant manner.
- Kubernetes: compute cluster with multiple nodes (see this guide to set up your own Kubernetes cluster)
If you're using PyTorch and manually doing a lot of "engineering chores" that are not directly related to your model/loss function, you should use PyTorch Lightning. Primarily, it takes away the need for boilerplate code by handling the training/evaluation loop, device placement, logging, checkpointing, etc. You only need to focus on the model architecture, loss function, and the dataset.
The device placement handling is very useful for running your model on different hardware (with or without GPUs). Lightning also supports distributed training with TorchElastic.
Suppose you already have a model written as a LightningModule that is not distributed yet. You only need to make a few modifications:
- set the Trainer to use Distributed Data Parallel (DDP) by specifying `accelerator='ddp'`. This takes care of the distributed training logic, including the appropriate data sampling for each node, by automatically switching to `DistributedSampler`.
- in your LightningModule, assign a `self.is_dist` attribute based on whether `accelerator='ddp'` is specified.
- synchronize your logging across multiple nodes by specifying `sync_dist=self.is_dist`, so that each logged value is aggregated across nodes instead of conflicting, e.g. `self.log('train/loss', loss, sync_dist=self.is_dist)`.
- likewise, ensure that any process that should run only once (such as uploading artifacts or the model file) is called only on the master node by checking `os.environ.get('RANK', '0') == '0'`. The `RANK` environment variable is set by TorchElastic when it runs.
- for more comprehensive tips on updating your LightningModule to support distributed training, check out this page.
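The two checks above can be factored into small helpers. This is a minimal sketch that only uses the standard library; the names `is_dist_accelerator`, `is_master` and `run_on_master` are hypothetical, and how the `accelerator` value reaches your LightningModule (e.g. as an `__init__` argument) is an assumption about your own code.

```python
import os
from typing import Callable, Optional


def is_dist_accelerator(accelerator: Optional[str]) -> bool:
    # Hypothetical helper: True when the Trainer is configured with accelerator='ddp'.
    # Store the result as self.is_dist and pass it to self.log(..., sync_dist=self.is_dist)
    return accelerator == "ddp"


def is_master() -> bool:
    # RANK is set by TorchElastic (torch.distributed.run) for every worker process;
    # when it is absent (plain single-process run), treat this process as the master
    return os.environ.get("RANK", "0") == "0"


def run_on_master(fn: Callable[[], None]) -> bool:
    # Call fn (e.g. an artifact or model upload) only on rank 0; report whether it ran
    if is_master():
        fn()
        return True
    return False
```

Inside a LightningModule you would then set `self.is_dist = is_dist_accelerator(accelerator)` in `__init__`, and wrap one-off work such as uploads as `run_on_master(lambda: upload(path))`, where `upload` stands in for your own function.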
Next, test your changes by simulating distributed training on a single node:
```python
from pytorch_lightning import Trainer

# use num_processes to simulate 2 nodes on one machine
trainer = Trainer(accelerator="ddp", num_processes=2)
```
If you're using PyTorch >= 1.9.0, TorchElastic is already included in PyTorch as `torch.distributed.elastic`, with `torch.distributed.run` as its launcher. The launcher starts the distributed training processes on every node (container); we will define those containers in the Kubernetes manifest files next.
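When the launcher runs on several nodes, each process gets a global rank. As a minimal sketch, assuming every node launches the same number of processes (the ElasticJob manifest in this post uses `--nproc_per_node=1`), ranks are laid out node by node; which machine becomes node 0 is decided at rendezvous time. The helper names below are hypothetical and only illustrate the arithmetic.

```python
from typing import Dict, Tuple


def global_ranks(num_nodes: int, nproc_per_node: int) -> Dict[Tuple[int, int], int]:
    # Map (node_rank, local_rank) -> global RANK, assuming homogeneous nodes:
    # node n owns the contiguous block [n * nproc_per_node, (n + 1) * nproc_per_node)
    return {
        (node, local): node * nproc_per_node + local
        for node in range(num_nodes)
        for local in range(nproc_per_node)
    }


def world_size(num_nodes: int, nproc_per_node: int) -> int:
    # WORLD_SIZE counts processes, not machines
    return num_nodes * nproc_per_node
```

With 2 nodes and 1 process each, the ranks are simply 0 and 1 and the world size is 2; with 2 processes per node, the second node's processes get ranks 2 and 3.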
TorchElastic uses an etcd server for rendezvous (the nodes discover each other and agree on their ranks through it), so:
- in your Conda environment, you'll also need to install `python-etcd`.
- in your Dockerfile, install etcd on the system. Use this script.
PyTorch Lightning works nicely with TorchElastic, so no further code changes are needed.
Whether you use a vendor or set up your own, any Kubernetes cluster with multiple nodes is ready to run distributed training. Here's the overview:
- spawn an etcd server on Kubernetes
- spawn multiple pods, each with a container that maximizes the amount of compute resources available on a node
- run the torchelastic command on each container
The logic for spawning pods, managing them elastically, and inserting the right arguments such as RANK/WORLD_SIZE can be tricky. Luckily, TorchElastic has an ElasticJob controller that can be installed on your cluster as a Custom Resource Definition (CRD) to manage these pods elastically.
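Inside each worker process, the launcher exposes its rank assignment through the `RANK`, `LOCAL_RANK` and `WORLD_SIZE` environment variables. A minimal sketch for reading and sanity-checking them at the start of a training script, e.g. to log them from each pod, might look like this; `DistEnv` and `read_dist_env` are hypothetical names, and the defaults assume a plain single-process run where no launcher set the variables.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DistEnv:
    rank: int        # global rank of this process
    local_rank: int  # rank of this process within its node
    world_size: int  # total number of processes in the job


def read_dist_env(env: Optional[Mapping[str, str]] = None) -> DistEnv:
    # Defaults describe a single-process run without TorchElastic
    env = os.environ if env is None else env
    dist_env = DistEnv(
        rank=int(env.get("RANK", "0")),
        local_rank=int(env.get("LOCAL_RANK", "0")),
        world_size=int(env.get("WORLD_SIZE", "1")),
    )
    # A rank outside [0, world_size) points at a misconfigured launch
    if not 0 <= dist_env.rank < dist_env.world_size:
        raise ValueError(f"inconsistent RANK/WORLD_SIZE: {dist_env}")
    return dist_env
```

Calling `print(read_dist_env())` as the first line of `train.py` makes each pod's log show which rank it was given.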
Install these CRDs; you will need a cluster admin role to do so. By default this creates an `elastic-job` namespace to run the training in, but you can customize it by modifying the config.
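In your Dockerfile, as mentioned earlier, install etcd.
Additionally, configure it to use `torch.distributed.run` as the entrypoint. The `args` in the ElasticJob manifest replace the default `CMD`, and the controller automatically appends the relevant launcher arguments (such as the rendezvous settings) when creating pods.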
```dockerfile
...
ENTRYPOINT ["python", "-m", "torch.distributed.run"]
CMD ["--help"]
```
We only need 2 Kubernetes manifest files: one for etcd and one for the ElasticJob.
Note that each etcd server serves only one distributed training session. If multiple engineers want to run different models with distributed training on the same cluster, each of them needs to spawn their own pair of etcd server and ElasticJob so that the sessions do not conflict.
Here are example manifest files, modified from the original TorchElastic examples so that several instances can run simultaneously without conflict. Replace the example "MY-MODEL-1" with a different name for each instance; Kubernetes resource names must be lowercase (e.g. my-model-1).
```yaml
# etcd manifest: Service + Pod
apiVersion: v1
kind: Service
metadata:
  namespace: elastic-job
  name: MY-MODEL-1-etcd-service
spec:
  ports:
    - name: etcd-client-port
      port: 2379
      protocol: TCP
      targetPort: 2379
  selector:
    app: MY-MODEL-1-etcd
---
apiVersion: v1
kind: Pod
metadata:
  namespace: elastic-job
  name: MY-MODEL-1-etcd
  labels:
    app: MY-MODEL-1-etcd
spec:
  containers:
    - name: etcd
      image: quay.io/coreos/etcd:latest
      command:
        - /usr/local/bin/etcd
        - --data-dir
        - /var/lib/etcd
        - --enable-v2
        - --listen-client-urls
        - http://0.0.0.0:2379
        - --advertise-client-urls
        - http://0.0.0.0:2379
        - --initial-cluster-state
        - new
      ports:
        - containerPort: 2379
          name: client
          protocol: TCP
        - containerPort: 2380
          name: server
          protocol: TCP
  restartPolicy: Always
```
```yaml
# ElasticJob manifest
apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  namespace: elastic-job
  name: MY-MODEL-1
spec:
  rdzvEndpoint: MY-MODEL-1-etcd-service:2379
  minReplicas: 1
  maxReplicas: 2
  replicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: ExitCode
      template:
        apiVersion: v1
        kind: Pod
        spec:
          containers:
            - name: elasticjob-worker
              image: YOUR_DOCKER_IMAGE:0.0.1
              imagePullPolicy: Always
              args:
                - "--nproc_per_node=1"
                - "my_model/train.py"
                - "trainer.accelerator=ddp" # if you can pass it to argparse/Hydra config
              resources:
                limits:
                  cpu: 4
                  memory: 12Gi
                  nvidia.com/gpu: 1
              volumeMounts:
                - name: dshm # increase shared memory for dataloader
                  mountPath: /dev/shm
          volumes:
            - name: dshm
              emptyDir:
                medium: Memory
```
You can also parametrize these manifest files with Helm.
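Helm is the natural tool for this. If all you need is the per-instance name, a minimal stand-in (plain Python string templating, not Helm) is sketched below. It covers only the etcd Service for brevity; `ETCD_SERVICE` and `render_etcd_service` are hypothetical names, and the name check assumes the rule Kubernetes applies to Service names (a lowercase RFC 1035 label of at most 63 characters).

```python
import re
from string import Template

# Trimmed-down template: only the etcd Service from the manifest above,
# with the per-instance name replaced by ${job}
ETCD_SERVICE = Template("""\
apiVersion: v1
kind: Service
metadata:
  namespace: elastic-job
  name: ${job}-etcd-service
spec:
  ports:
    - name: etcd-client-port
      port: 2379
      protocol: TCP
      targetPort: 2379
  selector:
    app: ${job}-etcd
""")

# Lowercase RFC 1035 label: starts with a letter, ends with a letter or digit
DNS_LABEL = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")


def render_etcd_service(job: str) -> str:
    # Validate the longest derived name so every generated resource name is legal
    longest = f"{job}-etcd-service"
    if len(longest) > 63 or not DNS_LABEL.fullmatch(longest):
        raise ValueError(f"invalid Kubernetes name: {longest!r}")
    return ETCD_SERVICE.substitute(job=job)
```

For example, `print(render_etcd_service("my-model-1"))` can be piped into `kubectl apply -f -`, while an uppercase name such as `"MY-MODEL-1"` is rejected before it reaches the cluster.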
That's all you need to run distributed training. To summarize:
- use PyTorch Lightning: specify `accelerator='ddp'` and make the logging fixes
- install `python-etcd` in your Conda environment and etcd in your Docker image
- build your Docker image with `torch.distributed.run` as the `ENTRYPOINT`
- install the TorchElastic CRDs on your Kubernetes cluster
- create and apply the etcd and ElasticJob manifest files on Kubernetes
- you have distributed training running!