# Run Distributed Training Job
In this notebook we will:
- Deploy Torch Elastic Kubernetes components 
  - Torch Elastic Operator
  - ETCD server for training control plane
- Prepare and Deploy `ElasticJob` based on PyTorch Elastic ImageNet Training  
- Validate Training is running on multiple GPU workers

This notebook deploys all components according to our architecture and runs the training job:
![architecture](docs/architecture.jpg)

## Deploy Torch Elastic Kubernetes components

## ETCD server
First we install an ETCD server, which acts as the rendezvous server that coordinates the training workers.
This example uses a simple ETCD pod/service deployment; for production, use a Helm chart that deploys `ETCD` in HA mode.

In [2]:
!kubectl apply -f kube/etcd.yaml
# verify service and pods scheduled on CPU nodes
!kubectl get svc -n elastic-job
!kubectl get pods -n elastic-job -o wide


service/etcd-service unchanged
pod/etcd unchanged
NAME           TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)    AGE
etcd-service   ClusterIP   10.0.216.198   <none>        2379/TCP   42s
NAME   READY   STATUS    RESTARTS   AGE   IP            NODE                                 NOMINATED NODE   READINESS GATES
etcd   1/1     Running   0          43s   10.244.12.2   aks-cpuworkers-40607851-vmss000000   <none>           <none>


## Deploy TorchElastic Operator
Details and Kubernetes manifests are described at https://github.com/pytorch/elastic/tree/master/kubernetes. We have included the manifests in this repo for simplicity:


In [5]:
!kubectl apply -k kube/config/default

namespace/elastic-job configured
customresourcedefinition.apiextensions.k8s.io/elasticjobs.elastic.pytorch.org created
role.rbac.authorization.k8s.io/leader-election-role created
clusterrole.rbac.authorization.k8s.io/elastic-job-k8s-controller-role created
rolebinding.rbac.authorization.k8s.io/leader-election-rolebinding created
clusterrolebinding.rbac.authorization.k8s.io/elastic-job-k8s-controller-rolebinding created
deployment.apps/elastic-job-k8s-controller created


In [6]:
# Verify that the ElasticJob custom resource is installed
!kubectl get crd


NAME                              CREATED AT
elasticjobs.elastic.pytorch.org   2021-05-24T00:48:59Z


## Prepare ElasticJob Deployment
### Training Docker image
Take a look at the `Dockerfile` for our deployment in [examples/Dockerfile](examples/Dockerfile).
It is based on a GPU-enabled `pytorch` image and bundles both the training script `main.py` and the dataset used for training and testing.
In production, the dataset should reside in blob storage and the deployment should point to it.

Note that the entrypoint of the container is the launcher module:
```
ENTRYPOINT ["python", "-m", "torch.distributed.run"]
```
For more details refer to https://github.com/pytorch/elastic/tree/master/examples

## Kubernetes Job config
We have updated the Kubernetes `ElasticJob` manifest [kube/imagenet.yaml](kube/imagenet.yaml) so that it can run on Spot nodes and mount Azure Blob storage for saving checkpoints.

- Note that we are deploying a custom Kubernetes resource, `ElasticJob`, which the Torch Elastic operator processes and orchestrates
- We pointed `rdzvEndpoint` to the previously deployed ETCD service
- Note the min/max number of replicas, which bounds how many workers the training job may use. If you increase the **number of workers**, each worker trains on a smaller subset of the data and the overall job completes faster

```yaml
  apiVersion: elastic.pytorch.org/v1alpha1
  kind: ElasticJob
  metadata:
    name: imagenet
    namespace: elastic-job
  spec:
    # Use "etcd-service:2379" if you have already applied etcd.yaml
    rdzvEndpoint: "etcd-service:2379"
    minReplicas: 1
    maxReplicas: 3    
    replicaSpecs:
      Worker:
        replicas: 3
```
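
The effect of the worker count on per-worker work can be estimated with a little arithmetic. The sketch below assumes the training script shards data with PyTorch's `DistributedSampler` in its default padding mode and that the Tiny ImageNet training split has 100,000 images; neither is stated in this notebook, and `steps_per_epoch` is a hypothetical helper.

```python
import math


def steps_per_epoch(num_samples: int, world_size: int, batch_size: int) -> int:
    """Batches each worker processes per epoch when samples are split evenly.

    Assumes DistributedSampler-style sharding: every worker receives
    ceil(num_samples / world_size) samples (padded with repeats if needed),
    and the final partial batch is kept.
    """
    samples_per_worker = math.ceil(num_samples / world_size)
    return math.ceil(samples_per_worker / batch_size)


# Assumed dataset size: Tiny ImageNet training split (200 classes x 500 images).
TRAIN_IMAGES = 100_000

for workers in (1, 2, 3):
    print(workers, steps_per_epoch(TRAIN_IMAGES, workers, batch_size=64))
```

Under these assumptions, two workers give 782 steps per epoch, the same count that appears in the `[  0/782]` progress counter in the training logs further down.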

- Updated `kube/imagenet.yaml` with **tolerations** and **nodeSelector** to run training on the Spot VM node pool

```yaml
    containers:
    - name: elasticjob-worker
      image: torchelastic/examples:0.2.0
      imagePullPolicy: Always
       ..
    nodeSelector:
       kubernetes.azure.com/scalesetpriority: spot
    tolerations:
    - key: "kubernetes.azure.com/scalesetpriority"
      operator: "Equal"
      value: "spot"
      effect: "NoSchedule"       
```
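
The toleration above only allows the worker pods onto tainted Spot nodes; the `nodeSelector` is what requires them to land there. The following sketch shows, in simplified form, how a toleration is compared with a node taint. It assumes the Spot node pool carries the taint `kubernetes.azure.com/scalesetpriority=spot:NoSchedule`, which this notebook does not show, and `tolerates` is a hypothetical helper, not a Kubernetes API.

```python
def tolerates(toleration: dict, taint: dict) -> bool:
    """Return True if a pod toleration matches a node taint.

    Simplified version of the Kubernetes matching rules:
    - an empty effect on the toleration matches any taint effect
    - operator "Exists" ignores the value; "Equal" (the default) compares it
    - an empty key with operator "Exists" matches every taint
    """
    effect = toleration.get("effect", "")
    if effect and effect != taint.get("effect"):
        return False
    key = toleration.get("key", "")
    if toleration.get("operator", "Equal") == "Exists":
        return key == "" or key == taint.get("key")
    return key == taint.get("key") and toleration.get("value", "") == taint.get("value", "")


# Assumed taint on the Spot node pool (not shown in this notebook).
spot_taint = {
    "key": "kubernetes.azure.com/scalesetpriority",
    "value": "spot",
    "effect": "NoSchedule",
}

# Toleration copied from the manifest fragment above.
worker_toleration = {
    "key": "kubernetes.azure.com/scalesetpriority",
    "operator": "Equal",
    "value": "spot",
    "effect": "NoSchedule",
}

print(tolerates(worker_toleration, spot_taint))
```

A pod without this toleration would fail the same check and stay unscheduled on a tainted Spot node.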

- Updated `kube/imagenet.yaml` with **volumes** and **volumeMounts** so the training job has storage for saving checkpoints to the path set by the `--checkpoint-file` argument

  ```yaml
  volumes:  
  - name: trainingdata
    persistentVolumeClaim:
        claimName: pvc-blob
  containers:
    ...
    args:
    - "--nproc_per_node=1"
    - "/workspace/examples/imagenet/main.py"
    - "--arch=resnet18"
    - "--epochs=3"
    - "--batch-size=64"
    - "--workers=0"
    - "/workspace/data/tiny-imagenet-200"
    - "--checkpoint-file=/mnt/blob/data/checkpoint.pth.tar"
    volumeMounts:
    - name: trainingdata
      mountPath: "/mnt/blob/data"         
  ```

- Note that `--nproc_per_node` sets how many local PyTorch worker processes run per node (typically equal to the number of CUDA devices), and `--workers` is the number of worker processes in the PyTorch `DataLoader`
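
The worker logs later in this notebook show the launcher receiving `--rdzv_backend`, `--rdzv_endpoint`, `--rdzv_id`, and `--nnodes` ahead of the container `args`. The sketch below reconstructs that argument list from the manifest fields to show how they relate. It is an illustration inferred from the log output, not the operator's actual code, and `launcher_args` and `world_size_range` are hypothetical helper names.

```python
def launcher_args(job_name, rdzv_endpoint, min_replicas, max_replicas, container_args):
    """Combine ElasticJob spec fields with the container args, as seen in the logs."""
    return [
        "--rdzv_backend=etcd",
        f"--rdzv_endpoint={rdzv_endpoint}",
        f"--rdzv_id={job_name}",
        f"--nnodes={min_replicas}:{max_replicas}",
        *container_args,
    ]


def world_size_range(min_replicas, max_replicas, nproc_per_node):
    """Smallest and largest number of training processes the job can run with."""
    return min_replicas * nproc_per_node, max_replicas * nproc_per_node


# Values taken from the manifest fragments above.
args = launcher_args(
    job_name="imagenet",
    rdzv_endpoint="etcd-service:2379",
    min_replicas=1,
    max_replicas=3,
    container_args=["--nproc_per_node=1", "/workspace/examples/imagenet/main.py"],
)
print(args)
print(world_size_range(1, 3, nproc_per_node=1))
```

With one process per node, the job can run with anywhere from one to three training processes, which is what lets it continue when Spot nodes come and go.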


In [6]:
# Run the deployment
!kubectl apply -f kube/imagenet.yaml
# Verify ElasticJob
!kubectl describe elasticjob -n elastic-job

elasticjob.elastic.pytorch.org/imagenet created
Name:         imagenet
Namespace:    elastic-job
Labels:       <none>
Annotations:  <none>
API Version:  elastic.pytorch.org/v1alpha1
Kind:         ElasticJob
Metadata:
  Creation Timestamp:  2021-05-25T19:42:06Z
  Generation:          2
  Managed Fields:
    API Version:  elastic.pytorch.org/v1alpha1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:maxReplicas:
        f:minReplicas:
        f:rdzvEndpoint:
        f:replicaSpecs:
          .:
          f:Worker:
            .:
            f:replicas:
            f:restartPolicy:
            f:template:
              .:
              f:spec:
                .:
                f:nodeSelector:
                  .:
                  f:kubernetes.azure.com/scalesetpriority:
                f:tolerations:
                f:volumes:
    Manager:      ku

In [7]:
# Verify worker pods run on the Spot VM nodes
!kubectl get pods  -n elastic-job -o wide

NAME                                          READY   STATUS    RESTARTS   AGE   IP            NODE                                 NOMINATED NODE   READINESS GATES
elastic-job-k8s-controller-5b9bc6b79c-xvdsw   1/1     Running   0          42h   10.244.13.2   aks-cpuworkers-40607851-vmss000003   <none>           <none>
etcd                                          1/1     Running   0          42h   10.244.12.2   aks-cpuworkers-40607851-vmss000000   <none>           <none>
imagenet-worker-0                             1/1     Running   0          5s    10.244.16.9   aks-spotgpu-40607851-vmss000000      <none>           <none>
imagenet-worker-1                             1/1     Running   0          5s    10.244.18.6   aks-spotgpu-40607851-vmss000002      <none>           <none>
imagenet-worker-2                             1/1     Running   0          5s    10.244.18.7   aks-spotgpu-40607851-vmss000002      <none>           <none>
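
Instead of reading the node names by eye, the placement check can be scripted. The sketch below is one possible approach: it takes the parsed JSON from `kubectl get pods -o json` and `kubectl get nodes -o json` and reports whether each worker pod sits on a node labeled `kubernetes.azure.com/scalesetpriority: spot`. The helper names and the `imagenet-worker-` prefix (taken from the pod names above) are assumptions for illustration.

```python
import json
import subprocess

SPOT_LABEL = "kubernetes.azure.com/scalesetpriority"


def worker_placement(pods: dict, nodes: dict, prefix: str = "imagenet-worker-") -> dict:
    """Map each worker pod name to True if its node carries the spot label."""
    spot_nodes = {
        node["metadata"]["name"]
        for node in nodes["items"]
        if node["metadata"].get("labels", {}).get(SPOT_LABEL) == "spot"
    }
    return {
        pod["metadata"]["name"]: pod["spec"].get("nodeName") in spot_nodes
        for pod in pods["items"]
        if pod["metadata"]["name"].startswith(prefix)
    }


def kubectl_json(*args: str) -> dict:
    """Run kubectl with JSON output and parse it (requires cluster access)."""
    result = subprocess.run(
        ["kubectl", *args, "-o", "json"],
        check=True, capture_output=True, text=True,
    )
    return json.loads(result.stdout)


# Example against a live cluster:
# placement = worker_placement(
#     kubectl_json("get", "pods", "-n", "elastic-job"),
#     kubectl_json("get", "nodes"),
# )
# print(placement)
```

Checking the node label rather than the node name avoids depending on the node pool naming used in this cluster.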


## Verify Training Logs
Pods might stay in the `ContainerCreating` state for a few minutes while the image is pulled (the image is large and Docker Hub applies rate limits). To speed this up, you could build the container image and push it to ACR.

Get logs from the workers and note how both workers joined the rendezvous worker group with the same version:
- imagenet-worker-0 pod `rendezvous version 1 as rank 0`
- imagenet-worker-1 pod `rendezvous version 1 as rank 1`

In [10]:
!kubectl logs imagenet-worker-0 -n elastic-job

[INFO] 2021-05-25 19:42:09,129 launch: Running torchelastic.distributed.launch with args: ['/opt/conda/lib/python3.7/site-packages/torchelastic/distributed/launch.py', '--rdzv_backend=etcd', '--rdzv_endpoint=etcd-service:2379', '--rdzv_id=imagenet', '--nnodes=1:3', '--nproc_per_node=1', '/workspace/examples/imagenet/main.py', '--arch=resnet18', '--epochs=3', '--batch-size=64', '--workers=0', '/workspace/data/tiny-imagenet-200', '--checkpoint-file=/mnt/blob/data/checkpoint.pth.tar']
INFO 2021-05-25 19:42:09,139 Etcd machines: ['http://0.0.0.0:2379']
[INFO] 2021-05-25 19:42:09,149 launch: Using nproc_per_node=1.
[INFO] 2021-05-25 19:42:09,878 api: [default] starting workers for function: wrapper_fn
[INFO] 2021-05-25 19:42:09,879 api: [default] Rendezvous'ing worker group
INFO 2021-05-25 19:42:09,879 Attempting to join next rendezvous
INFO 2021-05-25 19:42:09,882 Observed existing rendezvous state: {'status': 'final', 'version': '2', 'participants': [0, 1, 2], 'keep_alives': ['/torchelast

In [9]:
!kubectl logs imagenet-worker-1 -n elastic-job

[INFO] 2021-05-25 19:42:09,312 launch: Running torchelastic.distributed.launch with args: ['/opt/conda/lib/python3.7/site-packages/torchelastic/distributed/launch.py', '--rdzv_backend=etcd', '--rdzv_endpoint=etcd-service:2379', '--rdzv_id=imagenet', '--nnodes=1:3', '--nproc_per_node=1', '/workspace/examples/imagenet/main.py', '--arch=resnet18', '--epochs=3', '--batch-size=64', '--workers=0', '/workspace/data/tiny-imagenet-200', '--checkpoint-file=/mnt/blob/data/checkpoint.pth.tar']
INFO 2021-05-25 19:42:09,320 Etcd machines: ['http://0.0.0.0:2379']
[INFO] 2021-05-25 19:42:09,333 launch: Using nproc_per_node=1.
[INFO] 2021-05-25 19:42:10,073 api: [default] starting workers for function: wrapper_fn
[INFO] 2021-05-25 19:42:10,073 api: [default] Rendezvous'ing worker group
INFO 2021-05-25 19:42:10,073 Attempting to join next rendezvous
INFO 2021-05-25 19:42:10,078 Observed existing rendezvous state: {'status': 'joinable', 'version': '3', 'participants': [0]}
INFO 2021-05-25 19:42:10,156 Jo
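
To compare rendezvous state across workers without scanning the logs manually, the `Observed existing rendezvous state` lines can be parsed. This is a small sketch that relies only on the log format visible above; `rendezvous_state` is a hypothetical helper, and lines that were cut off (as in some of the output shown here) are skipped rather than guessed at.

```python
import ast
import re

# Matches the dict printed after "Observed existing rendezvous state:".
STATE_RE = re.compile(r"Observed existing rendezvous state: (\{.*\})\s*$")


def rendezvous_state(line: str):
    """Return the rendezvous state dict from a log line, or None if absent or truncated."""
    match = STATE_RE.search(line)
    if match is None:
        return None
    try:
        return ast.literal_eval(match.group(1))
    except (ValueError, SyntaxError):
        return None


# Log line copied from the imagenet-worker-1 output above.
sample = (
    "INFO 2021-05-25 19:42:10,078 Observed existing rendezvous state: "
    "{'status': 'joinable', 'version': '3', 'participants': [0]}"
)
state = rendezvous_state(sample)
print(state["status"], state["version"], state["participants"])
```

Feeding each worker's log through this helper makes it easy to confirm that the workers report the same `version`.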

## Checkpoint saved and restored
Once an epoch completes, the training script saves a checkpoint to Azure Blob storage. One epoch takes about 10 minutes on a `Standard_NC12` node. **Stop** cell execution once the logs indicate that the checkpoint was saved after completing Epoch 0 training.

In [23]:
# Stream logs from all workers until the checkpoint is saved, then stop the cell execution
!kubectl logs -ljob-name=imagenet -n elastic-job -f

=> creating model: resnet18
=> loading checkpoint file: /mnt/blob/data/checkpoint.pth.tar
=> loaded checkpoint file: /mnt/blob/data/checkpoint.pth.tar
=> using checkpoint from rank: 1, max_epoch: 1
=> checkpoint broadcast size is: 93588276
=> done broadcasting checkpoint
=> done restoring from previous checkpoint
=> start_epoch: 2, best_acc1: 1.1100000143051147
Epoch: [2][  0/782]	Time  4.633 ( 4.633)	Data  1.505 ( 1.505)	Loss 3.6480e+00 (3.6480e+00)	Acc@1  18.75 ( 18.75)	Acc@5  45.31 ( 45.31)
Epoch: [2][ 10/782]	Time  1.747 ( 2.258)	Data  1.363 ( 1.524)	Loss 3.9434e+00 (3.8631e+00)	Acc@1  14.06 ( 15.48)	Acc@5  32.81 ( 35.51)
=> loading checkpoint file: /mnt/blob/data/checkpoint.pth.tar
=> loaded checkpoint file: /mnt/blob/data/checkpoint.pth.tar
=> using checkpoint from rank: 1, max_epoch: 1
=> checkpoint broadcast size is: 93588276
=> done broadcasting checkpoint
=> done restoring from previous checkpoint
=> start_epoch: 2, best_acc1: 1.1100000143051147
Epoch: [2][  0/782]	Time  4.64
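
The `using checkpoint from rank: 1, max_epoch: 1` and `start_epoch: 2` lines suggest how the restore step chooses where to resume: the worker holding the most recent checkpoint becomes the source, and training continues from the following epoch. The sketch below models that selection in plain Python. It is inferred from the log lines only, not taken from the training script, and `pick_restore_source` is a hypothetical name.

```python
def pick_restore_source(epochs_by_rank: list) -> tuple:
    """Choose which rank's checkpoint to restore from.

    epochs_by_rank[i] is the last completed epoch stored in rank i's
    checkpoint, or -1 if that rank has no checkpoint.
    Returns (source_rank, start_epoch).
    """
    max_epoch = max(epochs_by_rank)
    source_rank = epochs_by_rank.index(max_epoch)
    return source_rank, max_epoch + 1


# Rank 0 has no checkpoint, rank 1 finished epoch 1.
print(pick_restore_source([-1, 1]))
```

When no rank has a checkpoint, every entry is -1 and the function returns a start epoch of 0, which corresponds to training from scratch.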

In [29]:
# Verify that the checkpoint and model files were created by exec'ing into the pod
!kubectl exec  -n elastic-job imagenet-worker-0 -- ls /mnt/blob/data

checkpoint.pth.tar
model_best.pth.tar


Now that training is running, proceed to [Step4 Simulate Spot node Eviction](/Step4-SimulateStop.ipynb).