# RAPIDS Taxi demo on AzureML

References:
 - https://github.com/drabastomek/MLADS_RAPIDS/blob/master/notebook/1_pandasVsRapids_ETL.ipynb
 - https://github.com/danielsc/azureml-and-dask/blob/master/LoadDataFromDatastore.ipynb
 - https://github.com/danielsc/azureml-and-dask/blob/master/StartDask.ipynb
 - https://medium.com/@santiagof/the-holy-bible-of-azure-machine-learning-service-a-work-through-for-the-believer-part-1-4fe8f9853492
 - https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/training-with-deep-learning/distributed-tensorflow-with-horovod/distributed-tensorflow-with-horovod.ipynb

In [1]:
import os
import json
import time

from azureml.core import Workspace, Experiment, Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.data.data_reference import DataReference
from azureml.core.runconfig import RunConfiguration, MpiConfiguration
from azureml.core import ScriptRunConfig
from azureml.train.estimator import Estimator
from azureml.exceptions import ComputeTargetException  
from azureml.widgets import RunDetails

In [2]:
# Read the Azure subscription / resource group / workspace identifiers from the
# local config.json and open a handle to that AzureML workspace.
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

subscription_id = config["subscription_id"]
resource_group = config["resource_group"]
workspace_name = config["workspace_name"]
gpu_cluster_name = "kdd-gpu-cluster"  # name of the AmlCompute target used below

ws = Workspace(
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
)

## Build cluster

In [3]:
# Reuse the GPU cluster when it already exists in the workspace; otherwise
# provision a new one and block until its nodes are up.
try:
    gpu_cluster = ComputeTarget(workspace=ws, name=gpu_cluster_name)
    print('Found existing compute target')

except ComputeTargetException:
    # Raised by the lookup above when no compute target has this name.
    print("Creating new cluster")

    # Read the public key inside a context manager so the file handle is closed
    # deterministically instead of being leaked until garbage collection.
    with open(os.path.expanduser('~/.ssh/id_rsa.pub')) as key_file:
        ssh_public_key = key_file.read().strip()

    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size="Standard_ND12s",
        min_nodes=1,
        max_nodes=2,
        idle_seconds_before_scaledown=120,
        admin_username='dask',
        # Do not hard-code a password in the notebook: take it from the
        # DASK_ADMIN_PASSWORD environment variable. When the variable is unset
        # None is passed, i.e. no password is requested and the SSH key below
        # is the intended way to log in to the nodes.
        admin_user_password=os.environ.get('DASK_ADMIN_PASSWORD'),
        admin_user_ssh_key=ssh_public_key
    )
    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)

    print("waiting for nodes")
    gpu_cluster.wait_for_completion(show_output=True)

Found existing compute target


## Upload data

In [4]:
import urllib.request  # `os` is already imported in the first cell

# Local staging area: <working dir>/data/nyctaxi
cwd = os.getcwd()
data_dir = os.path.abspath(os.path.join(cwd, 'data'))
taxidir = os.path.join(data_dir, 'nyctaxi')
# exist_ok avoids the check-then-create race and also creates data_dir.
os.makedirs(taxidir, exist_ok=True)

print("- Downloading NYC Taxi dataset... ", flush=True)
for month in range(1, 13):
    filename = "yellow_tripdata_2015-{month:02d}.csv".format(month=month)
    local_path = os.path.join(taxidir, filename)
    if os.path.exists(local_path):
        print(filename + " already exists", flush=True)
        continue

    url = "http://dask-data.s3.amazonaws.com/nyc-taxi/2015/" + filename
    # Download to a temporary name outside the upload directory and move it
    # into place only once the transfer has finished. An interrupted download
    # therefore never leaves a truncated CSV that a later run would report as
    # "already exists" and upload to the datastore.
    partial_path = os.path.join(data_dir, filename + ".part")
    try:
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, local_path)
    finally:
        # Only true when the download or the move failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print("Downloaded " + filename, flush=True)
print("Finished downloads", flush=True)


print("- Uploading taxi data... ")
# Reuse the workspace handle `ws` created in the configuration cell instead of
# building a second one with Workspace.from_config().
ds = ws.get_default_datastore()

ds.upload(src_dir=taxidir,
          target_path='nyctaxi',
          show_progress=True)

print("** Finished! **")

- Downloading NYC Taxi dataset... 
yellow_tripdata_2015-01.csv already exists
yellow_tripdata_2015-02.csv already exists
yellow_tripdata_2015-03.csv already exists
yellow_tripdata_2015-04.csv already exists
yellow_tripdata_2015-05.csv already exists
yellow_tripdata_2015-06.csv already exists
yellow_tripdata_2015-07.csv already exists
yellow_tripdata_2015-08.csv already exists
yellow_tripdata_2015-09.csv already exists
yellow_tripdata_2015-10.csv already exists
yellow_tripdata_2015-11.csv already exists
yellow_tripdata_2015-12.csv already exists
Finished downloads
- Uploading taxi data... 
Uploading an estimated of 12 files
Target already exists. Skipping upload for nyctaxi/yellow_tripdata_2015-02.csv
Target already exists. Skipping upload for nyctaxi/yellow_tripdata_2015-01.csv
Target already exists. Skipping upload for nyctaxi/yellow_tripdata_2015-06.csv
Target already exists. Skipping upload for nyctaxi/yellow_tripdata_2015-05.csv
Target already exists. Skipping upload for nyctaxi/ye

## Run dask cluster

In [5]:
# Configure an MPI launch with two processes per node and submit
# dask/startDask.py to the GPU cluster as an AzureML experiment run.
mpi_config = MpiConfiguration()
mpi_config.process_count_per_node = 2

dask_script_params = {
    '--data': ws.get_default_datastore(),
    '--gpus': '2',  # The number of GPUs available on each node
}

est = Estimator(
    source_directory='./dask',
    entry_script='startDask.py',
    script_params=dask_script_params,
    compute_target=gpu_cluster,
    node_count=2,
    distributed_training=mpi_config,
    use_gpu=True,
    conda_dependencies_file='environment.yml',
)

experiment = Experiment(ws, 'conda-dask-test')
run = experiment.submit(est)

In [6]:
# Poll the run's metrics every 30 seconds until it reports the scheduler's IP
# under the 'headnode' key.
# The run status is read before the metrics on each pass: if the run has
# already reached a terminal state and the key is still missing, it will never
# appear, so fail loudly instead of waiting forever.
while True:
    status = run.get_status()
    metrics = run.get_metrics()
    if 'headnode' in metrics:
        break
    if status in ('Completed', 'Failed', 'Canceled'):
        raise RuntimeError(
            "Run ended with status '{}' before reporting the headnode IP".format(status)
        )
    print("waiting for scheduler node's ip")
    time.sleep(30)

# Reuse the metrics already fetched instead of making another service call.
headnode = metrics['headnode']
print('Headnode has IP:', headnode)

waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting for scheduler node's ip
waiting 

---

⚠ *TODO: Get inline SSH port forwarding working.*

For now, open a terminal and run:

`ssh -N -L 8787:<headnode IP>:8787 -L 8786:<headnode IP>:8786 dask@<Compute VM IP> -p <Compute VM Port>`

---

In [6]:
# import asyncio
# import asyncssh

# async def forward_scheduler_port():
#     async with asyncssh.connect('51.105.168.166', username='dask', port=50001, known_hosts=None) as conn:  # TODO Get IP and port programmatically
#         listener_dash = await conn.forward_local_port('', 8787, headnode, 8787)
#         listener_server = await conn.forward_local_port('', 8786, headnode, 8786)
#         await asyncio.gather(listener_dash.wait_closed(), listener_server.wait_closed())
        
# port_forward_future = asyncio.create_task(forward_scheduler_port())

In [7]:
import distributed

# Port 8786 on localhost is the local end of the SSH tunnel to the scheduler
# (see the ssh command in the section above).
scheduler_address = 'tcp://localhost:8786'
client = distributed.Client(scheduler_address)
client

0,1
Client  Scheduler: tcp://localhost:8786  Dashboard: http://localhost:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 78.89 GB


## Taxi example

In [8]:
import dask_cudf
import dask

In [9]:
@dask.delayed
def remote_load(path):
    """Read the CSV files matching ``path`` into a dask_cudf DataFrame.

    The function is wrapped in ``dask.delayed``, so ``read_csv`` only runs when
    the returned delayed object is computed -- presumably on the cluster, where
    the datastore path exists (verify against startDask.py).
    """
    return dask_cudf.read_csv(path)


# The run publishes the datastore location under the 'data' metric.
taxi_csv_glob = os.path.join(run.get_metrics()['data'], 'nyctaxi/*.csv')
gdf = remote_load(taxi_csv_glob).compute()

In [20]:
# Display the DataFrame returned by the load step (column names and dtypes).
gdf

Unnamed: 0_level_0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
npartitions=91,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
,int64,datetime64[ms],datetime64[ms],int64,float64,float64,float64,int64,object,float64,float64,int64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [11]:
# Sum the passenger_count column; persist() starts the computation in the
# background and progress() shows a progress bar for it.
f = gdf['passenger_count'].sum().persist()
distributed.progress(f)

VBox()

In [15]:
# Fetch the persisted passenger_count total and display it.
f.compute()

245566747

In [23]:
# Mean trip distance for each passenger_count value, computed in the
# background with a progress bar.
passenger_counts = gdf['passenger_count']
f = gdf.groupby(passenger_counts)['trip_distance'].mean().persist()
distributed.progress(f)

VBox()

In [25]:
# Fetch the grouped means and convert the result to pandas for display.
f.compute().to_pandas()

9    1.800759e-04
3    9.650539e-02
6    7.298475e-01
1    2.016156e+02
2    1.033339e+06
5    1.344814e+05
7    1.932367e-02
8    2.347478e-04
0    5.509554e+02
4    3.635078e+00
dtype: float64

## Array example

In [20]:
import dask.array as da

# 100,000 x 10,000 array of uniform random numbers, split into
# 10,000 x 1,000 chunks and materialised on the cluster.
array_shape = (100_000, 10_000)
chunk_shape = (10_000, 1_000)
arr = da.random.random(array_shape, chunks=chunk_shape).persist()
distributed.progress(arr)

VBox()

In [21]:
# Mean over every element of the random array, computed in the background
# with a progress bar.
m = arr.mean().persist()
distributed.progress(m)

VBox()

In [22]:
# Convert the persisted mean to a plain Python float for display.
float(m)

0.5000059014539492

## Clean up Dask cluster

In [15]:
# Cancel the AzureML run that was submitted to start the Dask cluster.
run.cancel()

## Delete AzureML cluster

In [14]:
# Request deletion of the AmlCompute cluster, then wait for it to finish.
gpu_cluster.delete()
try:
    gpu_cluster.wait_for_completion(show_output=True)
except ComputeTargetException:
    # The exception is treated as the signal that the deletion finished --
    # presumably the wait fails once the compute target no longer exists.
    print('Deleted')

Deleting

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client


Deleted
