## Quick examples to demonstrate AML Ray/Dask cluster usage

### Interactive use cases

In [0]:
# pip install --upgrade ray-on-aml pandas

In [0]:
from azureml.core import Workspace, Experiment, Environment,ScriptRunConfig
# from azureml.widgets import RunDetails
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.environment import Environment
from IPython.display import clear_output
import time




#### Make sure the compute cluster is created in the same virtual network (VNet) as your compute instance. A VNet is required; without it, the compute instance and the cluster nodes cannot communicate with each other.

In [0]:
from ray_on_aml.core import Ray_On_AML

ws = Workspace.from_config()

# Start an interactive Ray cluster on the "d15-v2" AML compute cluster (up to 2 nodes).
# By default ci_is_head=True: this compute instance is the Ray head node and all nodes in the
# remote compute cluster are workers. To use one of the remote cluster nodes as the head node
# (the remaining nodes become workers), call ray_on_aml.getRay(ci_is_head=False) instead.
#
# To install additional libraries on the cluster, pass additional_pip_packages and/or
# additional_conda_packages when constructing Ray_On_AML, for example:
# ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="d15-v2",
#                         additional_pip_packages=['torch==1.10.0', 'torchvision', 'scikit-learn'],
#                         maxnode=2, exp_name='ray-on-aml-test')
ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="d15-v2", maxnode=2)
ray = ray_on_aml.getRay()

# Give the remote workers time to join before inspecting the cluster. The 50 s wait is an
# empirical value, not a guarantee; re-run ray.cluster_resources() if nodes are missing.
time.sleep(50)
ray.cluster_resources()

In [0]:
# Re-check the resources Ray reports for the cluster; re-run if workers were still joining.
ray.cluster_resources()

#### Dask on Ray

##### You can use Dask on this Ray cluster by telling Dask to use Ray as its scheduler. This gives you a single cluster that runs both Dask and Ray, without having to set them up separately.

In [0]:
# Scale up data processing with the Dask DataFrame API, running on the Ray cluster.
# Requires pandas 1.4+ (e.g. `pip install pandas==1.4.2`); restart the kernel after upgrading.
import dask
from ray.util.dask import ray_dask_get, enable_dask_on_ray

# Tell Dask to use Ray as its scheduler.
enable_dask_on_ray()

import dask.dataframe as dd

# Public Azure Open Datasets storage account; the glob selects every puMonth partition
# under puYear=2015 of the green NYC TLC data.
storage_options = {'account_name': 'azureopendatastorage'}
green_2015_glob = 'az://nyctlc/green/puYear=2015/puMonth=*/*.parquet'

ddf = dd.read_parquet(green_2015_glob, storage_options=storage_options)
ddf.count().compute()


### Ray Dataset

In [0]:
# Ray also supports Ray Datasets: read data into a Ray Dataset, then convert it to Dask or
# another ML-friendly format for training. See https://docs.ray.io/en/latest/data/dataset.html
from adlfs import AzureBlobFileSystem

abfs = AzureBlobFileSystem(
    account_name="azureopendatastorage",
    container_name="isdweatherdatacontainer",
)

# Only the year=2015 partition is read here. To read all years and months instead:
# data = ray.data.read_parquet("az://isdweatherdatacontainer/ISDWeather/", filesystem=abfs)
year_2015_paths = ["az://isdweatherdatacontainer/ISDWeather/year=2015/"]
data = ray.data.read_parquet(year_2015_paths, filesystem=abfs)


In [0]:
# Count the rows in the Ray Dataset loaded above (the year=2015 partition only).
data.count()
# 1,584,481,119 is the count for all data (every year loaded)

In [0]:
# Convert the Ray Dataset to a Dask DataFrame, compute summary statistics, and time it.
# time.perf_counter() is monotonic and high-resolution, so the measured duration cannot be
# skewed by wall-clock adjustments the way a difference of time.time() values can.
start = time.perf_counter()
result = data.to_dask().describe().compute()
print(result)
stop = time.perf_counter()
print("duration ", (stop - start))
# Reference timings:
#   717 s on a single NC6 machine
#   ~307.7 s with the compute instance as head and 4 DS14_v2 workers


#### Ray Tune for distributed ML tuning

In [0]:
 import sklearn.datasets
 import sklearn.metrics
 from sklearn.model_selection import train_test_split
 import xgboost as xgb

 from ray import tune


 def train_breast_cancer(config):
     # Load dataset
     data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
     # Split into train and test set
     train_x, test_x, train_y, test_y = train_test_split(
         data, labels, test_size=0.25)
     # Build input matrices for XGBoost
     train_set = xgb.DMatrix(train_x, label=train_y)
     test_set = xgb.DMatrix(test_x, label=test_y)
     # Train the classifier
     results = {}
     xgb.train(
         config,
         train_set,
         evals=[(test_set, "eval")],
         evals_result=results,
         verbose_eval=False)
     # Return prediction accuracy
     accuracy = 1. - results["eval"]["error"][-1]
     tune.report(mean_accuracy=accuracy, done=True)


 config = {
     "objective": "binary:logistic",
     "eval_metric": ["logloss", "error"],
     "max_depth": tune.randint(1, 9),
     "min_child_weight": tune.choice([1, 2, 3]),
     "subsample": tune.uniform(0.5, 1.0),
     "eta": tune.loguniform(1e-4, 1e-1)
 }
 analysis = tune.run(
     train_breast_cancer,
     resources_per_trial={"cpu": 1},
     config=config,
     num_samples=10)


#### Distributed XGBoost https://docs.ray.io/en/latest/xgboost-ray.html

In [0]:
from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42
# Number of XGBoost-Ray actors, used consistently for construction, fit and predict.
num_actors = 10

X, y = load_breast_cancer(return_X_y=True)
# NOTE: train_size=0.25 trains on 25% of the rows and predicts on the remaining 75%.
# Use test_size=0.25 instead if you want the conventional 75/25 split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=seed
)

clf = RayXGBClassifier(
    n_jobs=num_actors,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed
)

# The scikit-learn API automatically converts the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.

clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)
print(pred_ray.shape)

pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray.shape)

# It is also possible to pass a RayParams object
# to fit/predict/predict_proba methods - it overrides
# n_jobs set during initialization.

clf.fit(X_train, y_train, ray_params=RayParams(num_actors=num_actors))

pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=num_actors))
print(pred_ray.shape)


In [0]:
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

# Wrap the full breast-cancer dataset in a RayDMatrix for distributed training.
train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

xgb_params = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}
# 10 remote actors, 1 CPU each.
ray_params = RayParams(num_actors=10, cpus_per_actor=1)

# train() records per-round metrics for each named eval set into evals_result.
evals_result = {}
bst = train(
    xgb_params,
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=ray_params,
)

bst.save_model("model.xgb")
final_train_error = evals_result["train"]["error"][-1]
print(f"Final training error: {final_train_error:.4f}")


### Ray Serve (https://docs.ray.io/en/latest/serve/index.html)

In [0]:
import requests

from ray import serve

serve.start()


@serve.deployment
def hello(request):
    """Return a greeting for the ``name`` query parameter of the HTTP request."""
    name = request.query_params["name"]
    return f"Hello {name}!"


hello.deploy()

# Query our endpoint over HTTP. The timeout stops the cell from hanging indefinitely if the
# HTTP proxy is unreachable. raise_for_status() reports HTTP errors (e.g. a 500 from the
# handler) directly, instead of as an opaque mismatch on the response body.
http_response = requests.get("http://127.0.0.1:8000/hello?name=serve", timeout=30)
http_response.raise_for_status()
response = http_response.text
assert response == "Hello serve!", f"unexpected response: {response!r}"

### Reinforcement Learning

In [0]:
## Install libraries on the compute instance: pip install gym dm-tree (use the same gym version that is installed on the Ray cluster)

In [0]:
# Start a Ray cluster with the additional libraries the RL example needs installed on it.
# NOTE(review): if the interactive cluster started earlier is still running, it probably
# needs to be shut down first with ray_on_aml.shutdown(); confirm.
# gym is pinned to 0.21.0: the previous pin "0.2.1" is a very early gym release that does
# not provide the "Taxi-v3" environment used below. "scikit-learn" is the real PyPI
# package name; the "sklearn" alias package is deprecated and refuses to install.
ray_on_aml = Ray_On_AML(
    ws=ws,
    compute_cluster="d15-v2",
    additional_pip_packages=[
        'torch==1.10.0', 'torchvision', 'scikit-learn', 'pyspark', 'gym==0.21.0',
        'dm-tree', 'scikit-image', 'opencv-python', 'tensorflow',
    ],
    maxnode=1,
)
ray = ray_on_aml.getRay()
# Give the worker node time to join before inspecting the cluster.
time.sleep(20)
ray.cluster_resources()

In [0]:
# Import the RL algorithm (Trainer) we would like to use.
from ray.rllib.agents.ppo import PPOTrainer

# Override RLlib's automatically chosen default model (which is derived from the
# environment's observation and action spaces) with two 64-unit ReLU hidden layers.
model_config = {
    "fcnet_hiddens": [64, 64],
    "fcnet_activation": "relu",
}

# Settings applied only to evaluation runs: render the environment.
eval_only_config = {
    "render_env": True,
}

config = {
    # Gym-registered environment id; RLlib understands these strings directly.
    "env": "Taxi-v3",
    # Two rollout workers collect samples in parallel, each from its own environment copy.
    "num_workers": 2,
    # Deep-learning framework: "torch" for PyTorch (used here), "tf2" for TF 2.x eager.
    "framework": "torch",
    "model": model_config,
    # A separate single-worker evaluation set, used by trainer.evaluate() below.
    "evaluation_num_workers": 1,
    "evaluation_config": eval_only_config,
}

# Create our RLlib Trainer.
trainer = PPOTrainer(config=config)

# Each training iteration = parallel sample collection by the rollout workers, followed by
# loss calculation on the collected batch and a model update.
num_iterations = 3
for _iteration in range(num_iterations):
    iteration_result = trainer.train()
    print(iteration_result)

# Evaluate the trained Trainer (rendering each timestep to the shell's output).
trainer.evaluate()



### Shut down the interactive cluster when it is not in use

In [0]:
# Shut down the interactive Ray cluster managed by ray_on_aml once you are done with it.
ray_on_aml.shutdown()


### Ray on a job cluster with GPU (the interactive Ray cluster does not need to be running to submit the AML job)

In [0]:
from azureml.core import Workspace, Experiment, Environment,ScriptRunConfig
# from azureml.widgets import RunDetails
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DockerConfiguration,RunConfiguration

# Remember: the AML job must use distributed settings (MPI communicator) for ray-on-aml
# to work correctly.
ws = Workspace.from_config()

# Target cluster for the job; it can be a different cluster from the interactive one.
compute_cluster = 'gpunc6'
ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)

# Conda-defined environment on an OpenMPI 4.1 / CUDA 11.1 / cuDNN 8 / Ubuntu 18.04 base image.
rayEnv = Environment.from_conda_specification(name="RLEnv", file_path="conda_env.yml")
rayEnv.docker.base_image = "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.1-cudnn8-ubuntu18.04:20220329.v1"

# OpenMPI run configuration: 2 nodes, Docker enabled with 48gb of shared memory.
aml_run_config_ml = RunConfiguration(communicator='OpenMpi')
aml_run_config_ml.target = ray_cluster
aml_run_config_ml.node_count = 2
aml_run_config_ml.environment = rayEnv
aml_run_config_ml.docker = DockerConfiguration(use_docker=True, shm_size='48gb')

src = ScriptRunConfig(
    source_directory='../examples/job',
    script='aml_job.py',
    run_config=aml_run_config_ml,
)

experiment = Experiment(ws, "rl_on_aml_job")
run = experiment.submit(src)


In [0]:
# Display the AzureML run-details notebook widget for the job submitted above (`run`).
from azureml.widgets import RunDetails
RunDetails(run).show()
