# Running Dask on AzureML


In [None]:
import os, socket, subprocess
from azureml.core import Workspace, Experiment
from azureml.widgets import RunDetails
from azureml.core.runconfig import RunConfiguration, MpiConfiguration
from azureml.train.estimator import Estimator

## Starting the cluster

In [None]:
# Connect to the AzureML workspace via Workspace.from_config(); the bare
# `ws` on the last line displays the workspace as the cell output.
ws = Workspace.from_config()
ws

In [None]:
# Look up the compute target registered under the name 'dask-cluster';
# this raises KeyError if the workspace has no target with that name.
ct = ws.compute_targets['dask-cluster']
ct

In [None]:
# `Dataset` is not among the notebook's top-level imports, so bring it into
# scope here; without it the registration branch fails with a NameError.
from azureml.core import Dataset

# Register the public ISD weather parquet files as the 'weather-files'
# dataset the first time through; on later runs reuse the registered dataset.
if 'weather-files' not in ws.datasets:
    ds = Dataset.File.from_files('https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather/*/*/*.parquet', validate=False)
    ds = ds.register(ws, 'weather-files')
else:
    ds = ws.datasets['weather-files']

ds

Start the Dask cluster using an `Estimator` with an `MpiConfiguration`. Make sure the compute target is able to scale up to 10 nodes, or change the `node_count` below.

In [None]:
# Job definition for the Dask cluster: startDask.py is the entry script, the
# weather dataset is downloaded to /tmp/noaa, and the job spans 10 nodes.
est = Estimator(
    'dask',
    compute_target=ct,
    entry_script='startDask.py',
    conda_dependencies_file='environment.yml',
    script_params={'--datastore': ws.get_default_datastore()},
    inputs=[ds.as_named_input('weather').as_download('/tmp/noaa')],
    node_count=10,
    distributed_training=MpiConfiguration(),
)

# Attach to the first run returned for the existing 'dask' experiment rather
# than submitting `est`; use the commented line instead to start a new run.
dask_experiment = ws.experiments['dask']
run = next(dask_experiment.get_runs())
#run = Experiment(ws, 'dask').submit(est)
run

In [None]:
#run.cancel()

In [None]:
# Show the AzureML run-details widget for the current `run`.
RunDetails(run).show()

In [None]:
import time, daemon
from IPython.display import clear_output

# A run in one of these states has finished and will never publish the
# scheduler address, so polling must stop instead of looping forever.
terminal_states = ('Canceled', 'Completed', 'Failed')

print("waiting for scheduler node's ip")
status = run.get_status()
# Poll every 5 seconds until the head node is logged or the run has ended.
while status not in terminal_states and 'headnode' not in run.get_metrics():
    print('.', end ="")
    time.sleep(5)
    status = run.get_status()

clear_output()

# Local port on which socat listens and forwards to the Dask dashboard.
ci_port = 9798

if status == 'Canceled':
    print('Run was canceled')
elif status in terminal_states:
    # Nothing is listening on a finished run, so do not set up forwarding.
    print(f'Run ended with status {status}; no cluster to connect to')
else:
    # Fetch the metrics once and reuse them below.
    metrics = run.get_metrics()

    dashboard_url = 'https://{}-{}.{}.instances.azureml.net/status'.format(socket.gethostname(), ci_port, ws.get_details()['location'])
    # Log the URL only once per run so re-running this cell does not duplicate it.
    if 'dashboard_url' not in metrics:
        run.log('dashboard_url', dashboard_url)

    # Forward ci_port to the dashboard address in a detached background process.
    cmd = f'setsid socat tcp-listen:{ci_port},reuseaddr,fork tcp:{metrics["dashboard"]} &'
    print(cmd)
    os.system(cmd)

    print('Setup complete, cluster is ready to use.')

In [None]:
# Print the dashboard URL built in the previous cell (only defined when that
# cell took the branch that sets up the port forwarding).
print(dashboard_url)

In [None]:
import dask
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

# The run's metrics carry the scheduler address under 'scheduler'.
scheduler_address = run.get_metrics()["scheduler"]

# Connect to the scheduler, restart the cluster, and show the client summary.
c = Client(f'tcp://{scheduler_address}')
c.restart()
c

In [None]:
import glob

# List every parquet file under /tmp/noaa (recursively) by wrapping glob.glob
# in dask.delayed and computing it - presumably so the glob runs on the
# cluster, where the dataset was downloaded, rather than locally; confirm.
files = dask.delayed(glob.glob)('/tmp/noaa/**/*.parquet', recursive=True).compute()
#files

In [None]:
# Build a Dask dataframe from one delayed pd.read_parquet call per file.
df = dd.from_delayed([dask.delayed(pd.read_parquet)(file) for file in files])

In [None]:
# Number of partitions in the dataframe built from the delayed reads.
df.npartitions

In [None]:
#df = df.repartition(npartitions=10*df.npartitions)
#df.npartitions

In [None]:
#df = df.persist()

In [None]:
# Preview the first rows of the dataframe.
df.head()

In [None]:
# Re-check the partition count (it only changes if the optional repartition
# cell above has been uncommented and run).
df.npartitions

In [None]:
# Time how long it takes to count the rows of the dataframe.
%time len(df)

In [None]:
# Convert the `datetime` column to datetimes and round each value down to the
# day. This overwrites the column in place on `df`.
df.datetime = dd.to_datetime(df.datetime).dt.floor('d')

In [None]:
#df = df.set_index(df.datetime, sorted=False).persist()

In [None]:
#%time len(df)

In [None]:
# Compute summary statistics for the dataframe.
df.describe().compute()

In [None]:
# Average every column per value of the (day-floored) `datetime` column and
# bring the result back as a local dataframe; then preview it.
means = df.groupby(df.datetime).mean().compute()
means.head()

In [None]:
import matplotlib
import matplotlib.pyplot as plt

%matplotlib inline 

for col in list(means.columns):
    fig = plt.figure(figsize=(16, 8))
    plt.style.use('dark_background')
    means[col].plot(color='b')
    plt.title('Average of {}'.format(col))
    plt.xlim([datetime(2015, 1, 1), datetime(2015, 12, 1)])
    plt.grid()
    
    run.log_image(col, plot=plt)

In [None]:
# Remove the `datetime` column. This rebinds `df`, so re-running the cell
# fails once the column is gone.
df = df.drop(['datetime'], axis=1)

In [None]:
# Inspect the dataframe's index.
df.index

In [None]:
def write_data(path):
    """Write the notebook-level dataframe ``df`` to ``path`` as parquet.

    ``df`` is read from the enclosing notebook namespace, not passed in.
    Returns None.
    """
    df.to_parquet(path)

In [None]:
# Run write_data as a delayed task and compute it; `a` receives whatever
# write_data returns.
# NOTE(review): `ds` is bound to the registered dataset object earlier in this
# notebook, so `ds+'/dask/outputs/isd'` looks like it expects a path string
# instead - confirm the intended output location before running.
a = dask.delayed(write_data)(ds+'/dask/outputs/isd').compute()

In [None]:
# Count the `day` values per (month, year) of the index.
# NOTE(review): this reads `.month` / `.year` from `df.index`, which presumably
# needs a datetime index; the cell that calls `set_index` is commented out
# above - confirm this cell still runs as intended.
counts = df.groupby([df.index.month, df.index.year]).day.count().compute()

In [None]:
# Collect the 2015 count for each month 1..12, in month order, and show it.
cs = [counts[month][2015] for month in range(1, 13)]
cs

In [None]:
# `datetime` is needed for the x-axis limits below and was never imported in
# this notebook, which made this cell fail with a NameError.
from datetime import datetime

# Select the style before creating figures so every figure, including the
# first, is created under the same settings.
plt.style.use('dark_background')

# One chart per column of `means`, limited to 2015 on the x-axis and logged to
# the run as an image named after the column.
for col in list(means.columns):
    fig = plt.figure(figsize=(16, 8))
    means[col].plot(color='b')
    plt.title('Average of {}'.format(col))
    plt.xlim([datetime(2015, 1, 1), datetime(2015, 12, 1)])
    plt.grid()

    run.log_image(col, plot=plt)

In [None]:
# Sum the per-column memory usage (index included, deep inspection on) to get
# a single total for the whole dataframe.
df.memory_usage(index=True, deep=True).sum().compute()

In [None]:
# Print a summary of the dataframe.
df.info()

Check that the cluster works by submitting a few simple tasks.

In [None]:
import time
import numpy as np
from dask import delayed, visualize

def inc(x):
    """Pause for a random interval, then return ``x + 1``.

    The pause length is the absolute value of a draw from
    ``np.random.normal(5, 2)``, passed to ``time.sleep``.
    """
    pause = abs(np.random.normal(5, 2))
    time.sleep(pause)
    return x + 1

# Submit ten tasks to the cluster, one per input 0..9. Each task calls the
# delayed-wrapped `inc`, and `fut` keeps the futures in submission order.
fut = [c.submit(delayed(inc), i) for i in range(10)]

fut

In [None]:
# Wait for each future in turn and print what it returned.
for future in fut:
    outcome = future.result()
    print(outcome)

In [None]:
def sum(a):
    """Add up the items of ``a``, starting from 0, and return the total.

    Note that this definition replaces the built-in ``sum`` in the notebook
    namespace.
    """
    total = 0
    for item in a:
        total += item
    return total

# Gather the result of every earlier future, in order.
results = [pending.result() for pending in fut]

# Add the gathered results up on the cluster; `fut2` is the future for that sum.
fut2 = c.submit(sum, results)
fut2

In [None]:
# The tasks above were submitted wrapped in `delayed`, so the future's result
# is itself computed with .compute() to get the final value.
fut2.result().compute()

In [None]:
# Draw the task graph behind the summed result.
visualize(fut2.result())

# Training on Large Datasets
(from https://github.com/dask/dask-tutorial)

Sometimes you'll want to train on a larger-than-memory dataset. `dask-ml` implements estimators that work well on Dask arrays and dataframes that may be larger than your machine's RAM.

In [None]:
from dask.distributed import Client
import joblib
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np

We'll make a small (random) dataset locally using scikit-learn.

In [None]:
# Size of the template problem: 12 clusters in a 20-feature space.
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(
    n_samples=1000,
    centers=n_centers,
    n_features=n_features,
    random_state=0,
)

# Row k of `centers` is the per-feature mean of the samples labelled k.
centers = np.vstack(
    [X_small[y_small == label].mean(0) for label in range(n_centers)]
)

centers[:4]

The small dataset will be the template for our large random dataset.
We'll use `dask.delayed` to adapt `sklearn.datasets.make_blobs`, so that the actual dataset is being generated on our workers. 

In [None]:
n_samples_per_block = 200000
n_blocks = 500

# Every block has the same shape: one row per sample, one column per feature.
block_shape = (n_samples_per_block, n_features)

delayeds = []
arrays = []
for seed in range(n_blocks):
    # make_blobs returns a tuple; element 0 is the sample matrix that is kept.
    # The block number doubles as the random seed.
    block = dask.delayed(make_blobs)(
        n_samples=n_samples_per_block,
        centers=centers,
        n_features=n_features,
        random_state=seed,
    )[0]
    delayeds.append(block)
    arrays.append(da.from_delayed(block, shape=block_shape, dtype='float64'))

# Stack the blocks into a single Dask array and display its summary.
X = da.concatenate(arrays)
X

In [None]:
# Check the size of the array: total bytes divided by 1e9, i.e. gigabytes.
X.nbytes / 1e9

In [None]:
# Only run this on the cluster.
# persist() is called on the full array and `X` is rebound to the result, so
# the cells below work with the persisted array.
X = X.persist()  

The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.

They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML.

In [None]:
from dask_ml.cluster import KMeans
# Create the Dask-ML KMeans estimator with the initialisation settings used in
# this example (init_max_iter=3, oversampling_factor=10).
clf = KMeans(init_max_iter=3, oversampling_factor=10)

In [None]:
# Fit the estimator on the full array and report how long it took.
%time clf.fit(X)

In [None]:
# Show the fitted labels attribute as the cell output.
clf.labels_

In [None]:
# Compute and show the labels of the first ten samples.
clf.labels_[:10].compute()

## Shut cluster down
To shut the cluster down, cancel the job that runs the cluster. 

In [None]:
# Walk through the runs of the 'dask' experiment and cancel each one that is
# still running. Note that the loop rebinds the notebook-level name `run`.
for run in ws.experiments['dask'].get_runs():
    if run.get_status() != "Running":
        continue
    print(f'cancelling run {run.id}')
    run.cancel()

### Just for convenience, get the latest running Run

In [None]:
# Stop at the first run of the 'dask' experiment that reports "Running",
# leaving it bound to `run`. If none is running, `run` ends up as the last run
# that was inspected.
for run in ws.experiments['dask'].get_runs():
    if run.get_status() != "Running":
        continue
    print(f'latest running run is {run.id}')
    break