## Quick examples to demonstrate AML Ray/Dask cluster usage

### Interactive use cases

In [None]:
pip install --upgrade ray-on-aml

In [1]:
from azureml.core import Workspace, Experiment, Environment, Datastore, Dataset, ScriptRunConfig
from azureml.core.runconfig import PyTorchConfiguration
# from azureml.widgets import RunDetails
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import PyTorchConfiguration
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies
from IPython.display import clear_output
import time
import platform
import sys
import importlib


In [2]:
# You can pre-provision the compute cluster named below in the same vnet as your compute instance.
from ray_on_aml.core import Ray_On_AML
ws = Workspace.from_config()
ray_on_aml = Ray_On_AML(
    ws=ws,
    compute_cluster="d15-v2",
    # The PyPI distribution is named "scikit-learn"; the deprecated "sklearn"
    # alias package now fails to install.
    additional_pip_packages=['torch==1.10.0', 'torchvision', 'scikit-learn'],
    maxnode=4,
)
# Note that by default, ci_is_head=False, which means one of the nodes in the remote AML compute
# cluster is used as head node and the remaining are worker nodes.
# Passing ci_is_head=True instead uses the current compute instance as head node and all nodes
# in the remote compute cluster as workers.
ray = ray_on_aml.getRay(ci_is_head=True)
# Fixed pause before querying resources - presumably to let the worker nodes join; TODO confirm.
time.sleep(20)
ray.cluster_resources()

Cancel active AML runs if any
Canceling active run  ray_on_aml_1640910389_bc0c5b72
Shutting down ray if any
Found existing cluster d15-v2
Waiting cluster to start and return head node ip
................Headnode has IP: 10.0.0.18


{'CPU': 80.0,
 'object_store_memory': 40000000000.0,
 'node:10.0.0.20': 1.0,
 'memory': 539317611520.0,
 'node:10.0.0.19': 1.0,
 'node:10.0.0.21': 1.0,
 'node:10.0.0.18': 1.0}

#### Dask on Ray

In [6]:

from ray.util.dask import ray_dask_get
import dask
import dask.array as da
import dask.dataframe as dd
import pandas as pd
import numpy as np
# Register ray_dask_get as the scheduler in the Dask config so that it does
# not have to be specified on each compute call.
dask.config.set(scheduler=ray_dask_get)

random_matrix = np.random.randint(0, 1000, size=(256, 256))
d_arr = da.from_array(random_matrix)

# Naming the scheduler explicitly works as well: the Dask scheduler submits
# the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)

# A small two-column frame split over two partitions, grouped by "age".
age_grade = pd.DataFrame(
    np.random.randint(0, 10000, size=(1024, 2)),
    columns=["age", "grade"],
)
df = dd.from_pandas(age_grade, npartitions=2)
df.groupby(["age"]).mean().compute()


Unnamed: 0_level_0,grade
age,Unnamed: 1_level_1
15,7497.0
43,7553.0
68,2953.0
98,2709.0
115,851.0
...,...
9909,4950.0
9910,195.0
9918,1965.0
9971,2804.0


In [7]:
from adlfs import AzureBlobFileSystem

abfs = AzureBlobFileSystem(account_name="azureopendatastorage",  container_name="isdweatherdatacontainer")


def read_isd_weather_year(year):
    """Return a Ray dataset read from the `year=<year>` folder of the ISDWeather parquet data."""
    return ray.data.read_parquet(
        [f"az://isdweatherdatacontainer/ISDWeather/year={year}/"], filesystem=abfs)


# One dataset per year. Later cells refer to these datasets by name, so each
# name stays bound to the same year.
data, data1, data2, data3, data4, data5, data6 = [
    read_isd_weather_year(year)
    for year in (2012, 2015, 2010, 2009, 2011, 2013, 2014)
]


Metadata Fetch Progress: 100%|██████████| 16/16 [00:07<00:00,  2.19it/s]


In [None]:
# Chain the remaining yearly datasets onto the first one, one union at a time.
all_data = data
for yearly_ds in (data1, data2, data3, data4, data5, data6):
    all_data = all_data.union(yearly_ds)
all_data.count()

In [None]:
start = time.time()
# Convert the combined Ray dataset (`all_data`, every year - not just `data`)
# to a Dask dataframe and compute its summary statistics.
all_data_dask = all_data.to_dask().describe().compute()
print(all_data_dask)
stop = time.time()
print("duration ", (stop-start))
# Timings noted by the notebook author:
#717s for single machine nc6
# duration  307.69699811935425s for CI as head and 4 workers of DS14_v2


#### Ray Tune for distributed ML tuning

In [None]:
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
# import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
class ConvNet(nn.Module):
    """Small CNN for 1-channel images: one 3x3 convolution and a linear classifier.

    The linear layer takes 192 flattened features, which is what a 28x28
    input produces after the convolution and the 3x3 max-pool (3 * 8 * 8).
    """

    def __init__(self):
        super().__init__()
        # The architecture is kept fixed in this example for simplicity.
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        """Return per-class log-probabilities (10 classes) for a batch of images."""
        pooled = F.max_pool2d(self.conv1(x), 3)
        flattened = F.relu(pooled).view(-1, 192)
        logits = self.fc(flattened)
        return F.log_softmax(logits, dim=1)
# Caps on how many samples a single train / test pass may touch. Change these
# values if you want the training to run quicker or slower.
EPOCH_SIZE = 512
TEST_SIZE = 256


def train(model, optimizer, train_loader):
    """Run one shortened training pass over `train_loader`.

    Each batch is moved to CUDA when available (CPU otherwise), the NLL loss
    of the model output is back-propagated and one optimizer step is taken.
    The pass returns as soon as `batch index * batch length` exceeds
    EPOCH_SIZE, so the example finishes quickly.
    """
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    model.train()
    for step, (inputs, labels) in enumerate(train_loader):
        # Early exit that keeps the example fast.
        if step * len(inputs) > EPOCH_SIZE:
            return
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        F.nll_loss(model(inputs), labels).backward()
        optimizer.step()


def test(model, data_loader):
    """Return the fraction of correctly classified samples from `data_loader`.

    The model is switched to eval mode and run without gradient tracking.
    Evaluation stops once `batch index * batch length` exceeds TEST_SIZE, so
    only a prefix of the loader is scored.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for step, (inputs, labels) in enumerate(data_loader):
            # Early exit that keeps the example fast.
            if step * len(inputs) > TEST_SIZE:
                break
            inputs, labels = inputs.to(device), labels.to(device)
            scores = model(inputs)
            # torch.max over dim 1 yields (values, indices); the indices are the predicted classes.
            predicted = torch.max(scores.data, 1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()

    return hits / seen
def train_mnist(config):
    """Tune trainable: fit ConvNet on MNIST for 10 rounds, reporting accuracy after each.

    `config` must provide "lr" and "momentum", which are passed to the SGD
    optimizer. The model weights are saved on rounds 0 and 5.
    """
    # Data setup
    mnist_transforms = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307, ), (0.3081, )),
    ])

    train_set = datasets.MNIST("~/data", train=True, download=True, transform=mnist_transforms)
    train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
    test_set = datasets.MNIST("~/data", train=False, transform=mnist_transforms)
    test_loader = DataLoader(test_set, batch_size=64, shuffle=True)

    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    model = ConvNet().to(device)

    optimizer = optim.SGD(
        model.parameters(),
        lr=config["lr"],
        momentum=config["momentum"],
    )

    for round_idx in range(10):
        train(model, optimizer, train_loader)
        accuracy = test(model, test_loader)

        # Send the current training result back to Tune
        tune.report(mean_accuracy=accuracy)

        if round_idx % 5 == 0:
            # This saves the model to the trial directory
            torch.save(model.state_dict(), "./model.pth")
def _sample_learning_rate(spec):
    """Draw a learning rate as 10 ** (-10 * u), with u taken from np.random.rand()."""
    return 10**(-10 * np.random.rand())


search_space = {
    "lr": tune.sample_from(_sample_learning_rate),
    # NOTE(review): (0.01, 0.09) is an unusually low range for SGD momentum -
    # confirm that (0.1, 0.9) was not intended.
    "momentum": tune.uniform(0.01, 0.09)
}

# Uncomment this to enable distributed execution
# ray.shutdown()
# ray.init(address="auto",ignore_reinit_error=True)
# ray.init(address =f'ray://{headnode_private_ip}:10001',allow_multiple=True,ignore_reinit_error=True )

# Download the dataset first
datasets.MNIST("~/data", train=True, download=True)

analysis = tune.run(train_mnist, config=search_space)


In [None]:
 import sklearn.datasets
 import sklearn.metrics
 from sklearn.model_selection import train_test_split
 import xgboost as xgb

 from ray import tune


 def train_breast_cancer(config):
     """Tune trainable: fit XGBoost on the breast-cancer data and report held-out accuracy.

     `config` is passed to `xgb.train` as the parameter dict; its "eval_metric"
     must include "error", because the reported accuracy is 1 minus the last
     "error" value recorded on the evaluation set.
     """
     # Load the dataset and hold out a quarter of it for evaluation.
     features, targets = sklearn.datasets.load_breast_cancer(return_X_y=True)
     train_x, test_x, train_y, test_y = train_test_split(
         features, targets, test_size=0.25)

     # Build input matrices for XGBoost
     dtrain = xgb.DMatrix(train_x, label=train_y)
     dtest = xgb.DMatrix(test_x, label=test_y)

     # Train the classifier, collecting the evaluation history.
     history = {}
     xgb.train(
         config,
         dtrain,
         evals=[(dtest, "eval")],
         evals_result=history,
         verbose_eval=False)

     # Report prediction accuracy and mark the trial as finished.
     final_error = history["eval"]["error"][-1]
     tune.report(mean_accuracy=1. - final_error, done=True)


 # Fixed XGBoost settings shared by every trial.
 config = {
     "objective": "binary:logistic",
     "eval_metric": ["logloss", "error"],
 }
 # Hyperparameters that Tune samples for each trial.
 config.update({
     "max_depth": tune.randint(1, 9),
     "min_child_weight": tune.choice([1, 2, 3]),
     "subsample": tune.uniform(0.5, 1.0),
     "eta": tune.loguniform(1e-4, 1e-1)
 })

 # Ten sampled configurations, one CPU per trial.
 analysis = tune.run(
     train_breast_cancer,
     config=config,
     num_samples=10,
     resources_per_trial={"cpu": 1})


#### Distributed XGBoost https://docs.ray.io/en/latest/xgboost-ray.html

In [None]:
# pip install xgboost_ray 

In [None]:
from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42

X, y = load_breast_cancer(return_X_y=True)
split = train_test_split(X, y, train_size=0.25, random_state=seed)
X_train, X_test, y_train, y_test = split

# In XGBoost-Ray, n_jobs sets the number of actors.
clf = RayXGBClassifier(random_state=seed, n_jobs=10)

# The scikit-learn API will automatically convert the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.
clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)
print(pred_ray.shape)

pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray.shape)

# It is also possible to pass a RayParams object
# to the fit/predict/predict_proba methods - it will override
# the n_jobs set during initialization.
clf.fit(
    X_train,
    y_train,
    ray_params=RayParams(num_actors=10),
)

pred_ray = clf.predict(
    X_test,
    ray_params=RayParams(num_actors=10),
)
print(pred_ray.shape)


In [None]:
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

xgb_params = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}
# Ten remote actors with one CPU each.
actor_setup = RayParams(num_actors=10, cpus_per_actor=1)

# Filled in by `train` with the per-round metrics of every eval set.
evals_result = {}
bst = train(
    xgb_params,
    train_set,
    evals=[(train_set, "train")],
    evals_result=evals_result,
    verbose_eval=False,
    ray_params=actor_setup)

bst.save_model("model.xgb")
final_error = evals_result["train"]["error"][-1]
print("Final training error: {:.4f}".format(final_error))


In [None]:
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

num_actors, num_cpus_per_actor = 10, 1

# Actor layout used by every training run started from train_model.
ray_params = RayParams(num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)


def train_model(config):
    """Train one XGBoost-Ray model on the breast-cancer data with the parameters in `config`.

    The training set doubles as the evaluation set (named "train"), and the
    resulting booster is written to "model.xgb".
    """
    features, labels = load_breast_cancer(return_X_y=True)
    dtrain = RayDMatrix(features, labels)

    history = {}
    booster = train(
        params=config,
        dtrain=dtrain,
        evals=[(dtrain, "train")],
        evals_result=history,
        verbose_eval=False,
        ray_params=ray_params)
    booster.save_model("model.xgb")

from ray import tune

# Specify the hyperparameter search space: fixed settings first ...
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}
# ... then the values Tune samples for each trial.
config.update({
    "eta": tune.loguniform(1e-4, 1e-1),
    "subsample": tune.uniform(0.5, 1.0),
    "max_depth": tune.randint(1, 9)
})

# Make sure to use the `get_tune_resources` method to set the `resources_per_trial`
trial_resources = ray_params.get_tune_resources()
analysis = tune.run(
    train_model,
    config=config,
    num_samples=4,
    metric="train-error",
    mode="min",
    resources_per_trial=trial_resources)
print("Best hyperparameters", analysis.best_config)

### Shut down the cluster when it is not in use


In [None]:
# Shut down the ray_on_aml session created at the top of the notebook.
ray_on_aml.shutdown()


### Ray on Job Cluster

In [None]:
ws = Workspace.from_config()

# Settings for the job cluster; edit these to match your workspace.
compute_cluster = 'worker-cpu-v3'
maxnode =5
vm_size='STANDARD_DS3_V2'
vnet='rayvnet'
subnet='default'
exp ='ray_on_aml_job'
# Read the resource group straight from the workspace object instead of
# slicing it out of the ARM resource id by position.
ws_rg = ws.resource_group
# Leave as None to use the workspace's resource group for the vnet.
vnet_rg=None
try:
    # Reuse the cluster if one with this name already exists in the workspace.
    ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)

    print('Found existing cluster, use it.')
except ComputeTargetException:
    # No such cluster yet: create it inside the given vnet/subnet.
    if vnet_rg is None:
        vnet_rg = ws_rg
    compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                        min_nodes=0, max_nodes=maxnode,
                                                        vnet_resourcegroup_name=vnet_rg,
                                                        vnet_name=vnet,
                                                        subnet_name=subnet)
    ray_cluster = ComputeTarget.create(ws, compute_cluster, compute_config)

    ray_cluster.wait_for_completion(show_output=True)


# Build the job environment from the conda specification in the examples folder.
rayEnv = Environment.from_conda_specification(
    name="rayEnv",
    file_path="../examples/conda_env.yml",
)

# Alternative: fetch an environment already registered in the workspace, e.g.
# rayEnv = Environment.get(ws, "rayEnv", version=19)

# Request `maxnode` nodes for the distributed job.
distributed_config = PyTorchConfiguration(node_count=maxnode)

src = ScriptRunConfig(
    source_directory='../examples/job',
    script='aml_job.py',
    compute_target=ray_cluster,
    environment=rayEnv,
    distributed_job_config=distributed_config,
    # arguments = ["--master_ip",master_ip]
)
experiment = Experiment(ws, exp)
run = experiment.submit(src)

In [None]:
# Display the AzureML RunDetails widget for the submitted `run`.
from azureml.widgets import RunDetails
RunDetails(run).show()
