# Setting up a Dask Cluster on AzureML

In this lesson, we'll be using a dask cluster to replicate the [exercise we did in the Big Data section](exercises/Exercise_bigdata.ipynb) where we loaded global temperature data to measure global warming at a number of locations. 

To run this code, in addition to `dask` and `pandas`, which you should already have installed, you'll need three more packages (`azure-storage-blob`, `azureml-sdk`, and `dask_cloudprovider`), which you can install with the following commands:

```
conda install -c conda-forge azure-storage-blob # For managing storage
pip install azureml-sdk                         # For managing compute
pip install dask_cloudprovider==0.4.1           # Pin the version; note the double "=="
```

Note that you sometimes don't get the right version of `dask_cloudprovider` unless you specify it, and as of October 2020 the right version isn't even on `conda-forge`, so don't use `conda install` for it. You can also install `azure-storage-blob` with `pip` if you prefer `pip` to `conda`.
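Since the wrong version can slip in, it may be worth confirming what actually got installed. The snippet below is a minimal sketch, assuming Python 3.8 or newer (for `importlib.metadata`) and assuming the package is registered under the distribution name `dask-cloudprovider`; the helper `installed_version` is a hypothetical name, not part of any library.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# "dask-cloudprovider" is the assumed distribution name for dask_cloudprovider.
found = installed_version("dask-cloudprovider")
if found is None:
    print("dask-cloudprovider is not installed")
elif found != "0.4.1":
    print(f"Found dask-cloudprovider {found}; this lesson pins 0.4.1")
else:
    print("dask-cloudprovider 0.4.1 is installed")
```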

## Upload our Data

We'll start by uploading our climate data to Azure storage. The one thing you don't see in this code is that I've already created a Storage Account with Azure and put the "connection string" for that account into the file that I'm reading (you can put it directly into your code, but if I did that y'all could see my connection string and mess with my account!).

You can get the connection string by going to your Azure Portal, selecting Storage Accounts, selecting the relevant account, and clicking "Access Keys" on the left. 

In [None]:
%load_ext lab_black
import uuid  # needed for uuid.uuid4() below

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__

# Load connection string
con

# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# Create a unique name for the container
container_name = "quickstart" + str(uuid.uuid4())

# Create the container
container_client = blob_service_client.create_container(container_name)
# Create a blob client using the local file name as the name for the blob
blob_client = blob_service_client.get_blob_client(container=container_name, blob=local_file_name)

print("\nUploading to Azure Storage as blob:\n\t" + local_file_name)

# Upload the created file
with open(upload_file_path, "rb") as data:
    blob_client.upload_blob(data)
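
The cell above assumes `connect_str` already holds the connection string read from a local file. As a minimal sketch of that step, the helper below reads the string from a plain-text file. The function name `load_connection_string` and the file name in the commented usage line are hypothetical, chosen only for illustration; use whatever file you keep out of your repository.

```python
from pathlib import Path


def load_connection_string(path):
    """Return the connection string stored in a local text file."""
    # strip() drops the trailing newline that most editors add.
    return Path(path).read_text().strip()


# Hypothetical file name; keep this file out of version control.
# connect_str = load_connection_string("azure_connection_string.txt")
```

Keeping the secret in a separate file means the notebook itself can be shared without exposing the account.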


## Starting a Dask Cluster

In [1]:
%load_ext lab_black
from azureml.core import Workspace, Experiment
from dask_cloudprovider import AzureMLCluster

In [2]:
# Load the workspace from a configuration file.
# You can also do this by specifying your subscription ID,
# resource_group, and workspace_name, but those are
# sensitive, so I'm using a config file I can exclude from
# this repo. See the Workspace docstring for details.

ws = Workspace.from_config("azure_config.json")
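
For reference, the config file is a small JSON document. The sketch below writes one with placeholder values, assuming the usual three-key layout (`subscription_id`, `resource_group`, `workspace_name`) named in the comment above. The helper `write_workspace_config` and the placeholder strings are hypothetical; substitute your own values.

```python
import json
from pathlib import Path

# Placeholder values only; replace them with your own Azure details.
example_config = {
    "subscription_id": "<your-subscription-id>",
    "resource_group": "<your-resource-group>",
    "workspace_name": "<your-workspace-name>",
}


def write_workspace_config(path, config):
    """Write workspace settings to a JSON file and return the file's path."""
    path = Path(path)
    path.write_text(json.dumps(config, indent=4))
    return path


# Usage (commented out so it cannot overwrite a real config file):
# write_workspace_config("azure_config.json", example_config)
```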

In [3]:
amlcluster = AzureMLCluster(
    ws,
    vm_size="STANDARD_DS13_V2",  # Azure VM size for the Compute Target
    datastores=ws.datastores.values(),  # Azure ML Datastores to mount on the headnode
    environment_definition=ws.environments[
        "AzureML-Dask-CPU"
    ],  # Azure ML Environment to run on the cluster
    jupyter=True,  # Flag to start JupyterLab session on the headnode
    initial_node_count=2,  # number of nodes to start
    scheduler_idle_timeout=7200,  # scheduler idle timeout in seconds
)








In [4]:
amlcluster


## Accessing Your Data

## Using Your Cluster

There are two ways to use your cluster: you can click the link in the cluster output above to open a JupyterLab session running on one of the computers in your cluster, or you can connect from this notebook with the following command:

In [5]:
from dask.distributed import Client

c = Client(amlcluster)


+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| lz4     | None          | 3.1.0         | 3.1.0         |
| numpy   | 1.19.1        | 1.19.2        | 1.19.2        |
| python  | 3.7.8.final.0 | 3.6.9.final.0 | 3.6.9.final.0 |
+---------+---------------+---------------+---------------+
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
ERROR - _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


And you're off to the races!

## Getting Data from Azure

If your data is stored as CSV or Parquet, you can read it straight from Azure storage with `dask`.

First, install `adlfs`:

```
conda install -c conda-forge adlfs
```

Then put your account details in a dictionary, prefix the paths you read with `az://`, and pass the dictionary as `storage_options`.

```
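import dask.dataframe as dd

# ACCOUNT_NAME, ACCOUNT_KEY, CONTAINER, and FOLDER are placeholders:
# define them with your own values before running this.
storage_options = {"account_name": ACCOUNT_NAME, "account_key": ACCOUNT_KEY}

# f-strings substitute the container and folder names into the az:// paths
ddf = dd.read_csv(f"az://{CONTAINER}/{FOLDER}/*.csv", storage_options=storage_options)
ddf = dd.read_parquet(f"az://{CONTAINER}/folder.parquet", storage_options=storage_options)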
```
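If you already have the connection string from the upload step, one way to fill in that dictionary is to pull the account name and key out of it. The sketch below assumes the usual semicolon-separated `Key=Value` layout with `AccountName` and `AccountKey` fields; the helper name `storage_options_from_connection_string` is hypothetical, and the example string in the comment is made up.

```python
def storage_options_from_connection_string(connect_str):
    """Build a storage_options dict from an Azure Storage connection string."""
    parts = {}
    for segment in connect_str.strip().split(";"):
        if not segment:
            continue  # skip the empty piece left by a trailing semicolon
        # Split on the first "=" only: account keys are base64 and often end in "=".
        key, _, value = segment.partition("=")
        parts[key] = value
    return {
        "account_name": parts["AccountName"],
        "account_key": parts["AccountKey"],
    }


# Example with a made-up connection string:
# storage_options = storage_options_from_connection_string(
#     "DefaultEndpointsProtocol=https;AccountName=myaccount;"
#     "AccountKey=abc123==;EndpointSuffix=core.windows.net"
# )
```

This keeps a single secret (the connection string) as the source of truth instead of storing the account key in a second place.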