# Run Dask Distribution in Azure Machine Learning

In this notebook, we run Dask distributed training in Azure Machine Learning.<br>
When the training has completed, the computing instances will automatically be scaled down to 0 instances.

To run this notebook:

1. Create a new "Machine Learning" resource in the [Azure Portal](https://portal.azure.com/).
2. Install the Azure Machine Learning CLI v2 on Ubuntu as follows:

```
# install Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
# install AML CLI extension
az extension add --name ml
```

This tutorial uses a built-in Dask ML estimator (```dask_ml.linear_model.LinearRegression```), but you can run **a variety of scikit-learn compliant functions and jobs** in a distributed manner on a Dask cluster.<br>
For details on distributed Dask, see "[Run Distributed Dask on Azure Kubernetes Service](https://tsmatz.wordpress.com/2021/05/17/dask-distributed-on-azure-kubernetes/)".

> Note : You can now also use the Python package ```ray-on-aml``` to run Dask on Ray on Azure Machine Learning compute. (See [here](https://github.com/microsoft/ray-on-aml).)

## 1. Create Python script for Distributed Dask training (Regression Example)

Create a directory for saving your script.

In [1]:
import os
script_folder = './script'
os.makedirs(script_folder, exist_ok=True)

Save the script files for distributed Dask training.<br>
Here we run 4 nodes with the following roles.

- Rank 0 : Dask Scheduler
- Rank 1 : Dask Worker
- Rank 2 : Dask Worker
- Rank 3 : Dask Client

Here I create a Python script for each of the 3 roles (```dask_scheduler.py```, ```dask_worker.py```, and ```dask_client.py```) as follows.

In this example, we only print the evaluation score to the console, but you can also save the generated model as an AML output.
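
The rank-to-role layout above can be sketched as a small Python function. This is only an illustration: the name `dask_role` and the minimum of 3 nodes are my assumptions, not part of the scripts in this notebook. The rule itself (first rank is the scheduler, last rank is the client, everything in between is a worker) is the one `run.sh` applies in the next section.

```python
def dask_role(rank: int, world_size: int) -> str:
    """Return the Dask role for an MPI rank (hypothetical helper)."""
    # Assumption: at least one scheduler, one worker, and one client are needed.
    if world_size < 3:
        raise ValueError("need at least 3 nodes: scheduler, worker, client")
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} is outside 0..{world_size - 1}")
    if rank == 0:
        return "scheduler"
    if rank == world_size - 1:
        return "client"
    return "worker"


# With 4 nodes this prints:
# 0 scheduler
# 1 worker
# 2 worker
# 3 client
for rank in range(4):
    print(rank, dask_role(rank, 4))
```

Adding nodes to the job only adds workers, because the scheduler and client positions are fixed at the two ends of the rank range.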

In [2]:
%%writefile script/dask_scheduler.py
import argparse
import asyncio
from dask.distributed import Scheduler
import mpi4py
from mpi4py import MPI

parser = argparse.ArgumentParser()
parser.add_argument("--node_count",
    type=int,
    required=True,
    help="number of nodes")
args = parser.parse_args()

mpi_comm = MPI.COMM_WORLD
mpi_rank = mpi_comm.Get_rank()

async def f_scheduler():
    # Send ready message to client
    req = mpi_comm.isend('ready', dest=args.node_count-1, tag=mpi_rank)
    req.wait()
    # Create the scheduler (listening on port 8786)
    s = Scheduler(port=8786)
    # Start the scheduler
    s = await s
    # Block until the scheduler is shut down
    await s.finished()
asyncio.get_event_loop().run_until_complete(f_scheduler())

Writing script/dask_scheduler.py


In [3]:
%%writefile script/dask_worker.py
import argparse
import asyncio
from dask.distributed import Worker
import mpi4py
from mpi4py import MPI

parser = argparse.ArgumentParser()
parser.add_argument("--scheduler_address",
    type=str,
    required=True,
    help="dask scheduler address")
parser.add_argument("--node_count",
    type=int,
    required=True,
    help="number of nodes")
args = parser.parse_args()

mpi_comm = MPI.COMM_WORLD
mpi_rank = mpi_comm.Get_rank()

async def f_worker(scheduler_address):
    # Send ready message to client
    req = mpi_comm.isend('ready', dest=args.node_count-1, tag=mpi_rank)
    req.wait()
    # Start Worker
    w = await Worker(scheduler_address)
    # Wait until the worker finishes
    await w.finished()
asyncio.get_event_loop().run_until_complete(f_worker(args.scheduler_address))

Writing script/dask_worker.py
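
The scheduler and worker scripts above each send a `'ready'` message to the last rank, and the client script below collects one message per server before it connects. The following is a local simulation of that handshake using threads and a queue instead of MPI, so it can run without a cluster. The names `send_ready` and `wait_for_ready` are hypothetical, and unlike the MPI version (which receives from rank 0, 1, 2 in order) this sketch accepts messages in any arrival order.

```python
import queue
import threading
from typing import List


def send_ready(rank: int, inbox: queue.Queue) -> None:
    # Stands in for: mpi_comm.isend('ready', dest=node_count-1, tag=rank)
    inbox.put((rank, "ready"))


def wait_for_ready(node_count: int, inbox: queue.Queue, timeout: float = 5.0) -> List[int]:
    """Block until ranks 0..node_count-2 have all reported ready."""
    pending = set(range(node_count - 1))
    arrived = []
    while pending:
        # Raises queue.Empty if a server never reports within the timeout.
        rank, message = inbox.get(timeout=timeout)
        if message == "ready" and rank in pending:
            pending.remove(rank)
            arrived.append(rank)
    return arrived


inbox = queue.Queue()
threads = [threading.Thread(target=send_ready, args=(r, inbox)) for r in range(3)]
for t in threads:
    t.start()
arrived = wait_for_ready(4, inbox)
for t in threads:
    t.join()
print(sorted(arrived))  # prints [0, 1, 2]
```

Note that in the real scripts the message is sent before the scheduler or worker has actually started, so it signals "process is up", not "Dask is accepting connections".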


In [4]:
%%writefile script/dask_client.py
import argparse
import asyncio
import time
from dask.distributed import Client
from dask_ml.datasets import make_regression
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
import mpi4py
from mpi4py import MPI

parser = argparse.ArgumentParser()
parser.add_argument("--scheduler_address",
    type=str,
    required=True,
    help="dask scheduler address")
parser.add_argument("--node_count",
    type=int,
    required=True,
    help="number of nodes")
args = parser.parse_args()

# Wait for the ready message from the scheduler and every worker
mpi_comm = MPI.COMM_WORLD
mpi_rank = mpi_comm.Get_rank()
for n in range(args.node_count - 1):
    req = mpi_comm.irecv(source=n, tag=n)
    data = req.wait()

# Give the scheduler a moment to start listening, then create the client
time.sleep(3)
c = Client(args.scheduler_address)

# Run program !
X, y = make_regression(
  n_samples=100000,
  n_features=4,
  chunks=50)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)
lr = LinearRegression()
lr.fit(X_train, y_train)
print('##### Trained Model Result #####')
print('Score : {}'.format(lr.score(X_test, y_test)))

# simple test
# y = c.submit(lambda x: x + 1, 10)
# print('The result is {}'.format(y.result()))

# Stop scheduler and workers
c.shutdown()

Writing script/dask_client.py
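
The client script waits a fixed 3 seconds before connecting, because each server sends its ready message before Dask has actually started. As a possible alternative, the client could poll the scheduler port until it accepts TCP connections. This is a minimal sketch using only the standard library; `wait_for_port` is a hypothetical helper and its timeout and interval defaults are my assumptions.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: wait a little and retry.
            time.sleep(interval)
    return False
```

In `dask_client.py`, this could stand in for `time.sleep(3)`: split `args.scheduler_address` at the last `:` into host and port, then call `wait_for_port(host, int(port))` before creating the `Client`.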


## 2. Create shell script to run each role

Create a shell script that runs each role and starts training as a distributed Dask job.

> Note : See [here](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-train-distributed-gpu) for the environment variables available in an AML distributed cluster.<br>
> ```$AZ_BATCHAI_MPI_MASTER_NODE``` used below is an environment variable holding the MPI master's host name (such as ```10.5.0.4```). You can also use the following shell command to retrieve the master's host name.<br>
> ```cut -d ":" -f 1 <<< $AZ_BATCH_MASTER_NODE```
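
If you prefer to resolve the master's host name inside Python rather than in the shell, the same lookup can be sketched as follows. `master_host` is a hypothetical helper, and the fallback order (first `AZ_BATCHAI_MPI_MASTER_NODE`, then the part of `AZ_BATCH_MASTER_NODE` before the first `:`) is my assumption based on the note above.

```python
import os


def master_host(environ=os.environ) -> str:
    """Return the MPI master's host name from AML environment variables."""
    host = environ.get("AZ_BATCHAI_MPI_MASTER_NODE")
    if host:
        return host
    node = environ.get("AZ_BATCH_MASTER_NODE")
    if node:
        # Same as: cut -d ":" -f 1 <<< $AZ_BATCH_MASTER_NODE
        return node.split(":", 1)[0]
    raise RuntimeError("no master node environment variable is set")


# Example with an illustrative value (the port 6000 is made up for this demo).
print(master_host({"AZ_BATCH_MASTER_NODE": "10.5.0.4:6000"}))  # prints 10.5.0.4
```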

In [5]:
%%writefile script/run.sh
if [ $OMPI_COMM_WORLD_RANK -eq 0 ]
then
    echo "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    echo "Dask Scheduler"
    echo "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    python dask_scheduler.py --node_count $OMPI_COMM_WORLD_SIZE
elif [ $OMPI_COMM_WORLD_RANK -eq $((OMPI_COMM_WORLD_SIZE-1)) ]
then
    echo "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    echo "Dask Client"
    echo "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    python dask_client.py --scheduler_address $AZ_BATCHAI_MPI_MASTER_NODE:8786 --node_count $OMPI_COMM_WORLD_SIZE
else
    echo "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    echo "Dask Worker"
    echo "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    python dask_worker.py --scheduler_address $AZ_BATCHAI_MPI_MASTER_NODE:8786 --node_count $OMPI_COMM_WORLD_SIZE
fi

Writing script/run.sh


## 3. Submit Job in Azure Machine Learning

### Prepare for connecting to Azure Machine Learning workspace

Log in to Azure and prepare to connect to the Azure Machine Learning (AML) workspace.<br>
Fill in the subscription ID, AML workspace name, and resource group name below.

In [None]:
!az login

In [None]:
!az account set -s {AZURE_SUBSCRIPTION_ID}

In [None]:
my_resource_group = "{AML_RESOURCE_GROUP_NAME}"
my_workspace = "{AML_WORKSPACE_NAME}"

### Create cluster (multiple nodes)

Create a remote cluster with 4 nodes - 1 scheduler, 2 workers, and 1 client.

In [7]:
!az ml compute create --name cluster01 \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace \
  --type amlcompute \
  --min-instances 0 \
  --max-instances 4 \
  --size Standard_DS2_v2

{
  "id": "/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourceGroups/rg-AML/providers/Microsoft.MachineLearningServices/workspaces/ws01/computes/cluster01",
  "idle_time_before_scale_down": 120,
  "location": "eastus",
  "max_instances": 4,
  "min_instances": 0,
  "name": "cluster01",
  "network_settings": {},
  "provisioning_state": "Succeeded",
  "resourceGroup": "rg-AML",
  "size": "STANDARD_DS2_V2",
  "ssh_public_access_enabled": true,
  "tier": "dedicated",
  "type": "amlcompute"
}

### Create AML environment for dask distribution 

Now we prepare a custom environment to run the distributed Dask job.

In [8]:
%%writefile conda_dask_distribution.yml
name: dask_environment
dependencies:
- python=3.8
- pip:
  - mpi4py
  - dask
  - distributed
  - dask-ml
channels:
- anaconda
- conda-forge

Writing conda_dask_distribution.yml


In [9]:
%%writefile env_dask_distribution.yml
$schema: https://azuremlschemas.azureedge.net/latest/environment.schema.json
name: dask-distribution-env
image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04
conda_file: conda_dask_distribution.yml
description: environment for dask distribution

Writing env_dask_distribution.yml


In [10]:
!az ml environment create --file env_dask_distribution.yml \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace

{
  "conda_file": {
    "channels": [
      "anaconda",
      "conda-forge"
    ],
    "dependencies": [
      "python=3.8",
      {
        "pip": [
          "mpi4py",
          "dask",
          "distributed",
          "dask-ml"
        ]
      }
    ],
    "name": "dask_environment"
  },
  "creation_context": {
    "created_at": "2022-08-25T08:36:50.303293+00:00",
    "created_by": "Tsuyoshi Matsuzaki",
    "created_by_type": "User",
    "last_modified_at": "2022-08-25T08:36:50.303293+00:00",
    "last_modified_by": "Tsuyoshi Matsuzaki",
    "last_modified_by_type": "User"
  },
  "description": "environment for dask distribution",
  "id": "azureml:/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourceGroups/rg-AML/providers/Microsoft.MachineLearningServices/workspaces/ws01/environments/dask-distribution-env/versions/1",
  "image": "mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
  "name": "dask-distribution-env",
  "os_type": "linux",
  "resourceGroup": "rg-AML",
  "tag

### Submit Job

Now let's run the distributed Dask job.

In [11]:
%%writefile train_dask_distribution.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
code: script
command: bash run.sh
environment: azureml:dask-distribution-env@latest
compute: azureml:cluster01
display_name: dask_dist_test
experiment_name: dask_dist_test
resources:
  instance_count: 4
distribution:
  type: mpi
  process_count_per_instance: 1
description: Dask distribution job

Writing train_dask_distribution.yml


In [12]:
!az ml job create --file train_dask_distribution.yml \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace

Uploading script (0.0 MBs): 100%|█████████| 3494/3494 [00:01<00:00, 3408.26it/s]

{
  "code": "azureml:/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourceGroups/rg-AML/providers/Microsoft.MachineLearningServices/workspaces/ws01/codes/effc4de9-bd46-4f4d-b6e1-9f38102180d8/versions/1",
  "command": "bash run.sh",
  "compute": "azureml:cluster01",
  "creation_context": {
    "created_at": "2022-08-25T08:37:23.564101+00:00",
    "created_by": "Tsuyoshi Matsuzaki",
    "created_by_type": "User"
  },
  "description": "Dask distribution job",
  "display_name": "dask_dist_test",
  "distribution": {
    "process_count_per_instance": 1,
    "type": "mpi"
  },
  "environment": "azureml:dask-distribution-env:1",
  "environment_variables": {},
  "experiment_name": "dask_dist_test",
  "id": "azureml:/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourceGroups/rg-AML/providers/Microsoft.MachineLearningServices/workspaces/ws01/jobs/cool_feather_zpcjfq8pqw",
  "inputs": {},

## 4. Check results

Let's see the output results in [Azure Machine Learning studio](https://ml.azure.com/).

![AML outputs](./output_artifact.jpg)

You can download all logs and outputs to a local folder and analyze them.<br>
Now, let's check the output of rank 3 (the Dask client).

In [13]:
job_name = "{FILL_JOB_NAME}"
# Example : job_name = "cool_feather_zpcjfq8pqw"

In [14]:
!az ml job download --name $job_name \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace \
  --download-path dask_training_result

Downloading artifact azureml://datastores/workspaceartifactstore/ExperimentRun/dcid.cool_feather_zpcjfq8pqw to dask_training_result/artifacts

In [15]:
!tail -n 30 dask_training_result/artifacts/azureml-logs/70_driver_log_3.txt

[21281da39c3740c6888b333530768964000004:00130] btl:tcp: Attempting to bind to AF_INET port 1024
[21281da39c3740c6888b333530768964000004:00130] btl:tcp: Successfully bound to AF_INET port 1024
[21281da39c3740c6888b333530768964000004:00130] btl:tcp: my listening v4 socket is 0.0.0.0:1024
[21281da39c3740c6888b333530768964000004:00130] btl:tcp: examining interface eth0
[21281da39c3740c6888b333530768964000004:00130] btl:tcp: using ipv6 interface eth0
[21281da39c3740c6888b333530768964000004:00130] select: init of component tcp returned success
[21281da39c3740c6888b333530768964000004:00130] select: initializing btl component vader
[21281da39c3740c6888b333530768964000004:00130] select: init of component vader returned failure
[21281da39c3740c6888b333530768964000004:00130] mca: base: close: component vader closed
[21281da39c3740c6888b333530768964000004:00130] mca: base: close: unloading component vader
[21281da39c3740c6888b333530768964000004:00130] select: initializing btl component s
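
The tail of the log is dominated by Open MPI debug lines, so the score can be easier to find by filtering. Here is a minimal sketch, assuming the client's `print` output lands in the same driver log file downloaded above; `find_score_lines` is a hypothetical helper, and the marker matches the `'Score : {}'` format string used in `dask_client.py`.

```python
from pathlib import Path
from typing import List


def find_score_lines(log_path: str, marker: str = "Score :") -> List[str]:
    """Return every line in the log that contains the marker text."""
    text = Path(log_path).read_text(encoding="utf-8", errors="replace")
    return [line.strip() for line in text.splitlines() if marker in line]


# Path taken from the download step above; skipped if the file is not there.
log_file = "dask_training_result/artifacts/azureml-logs/70_driver_log_3.txt"
if Path(log_file).exists():
    for line in find_score_lines(log_file):
        print(line)
```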

## 5. Clean-up (Remove cluster)

In [None]:
!az ml compute delete --name cluster01 \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace \
  --yes