![LOGO](../../../img/MODIN_ver2_hrz.png)

<h1>Scale your pandas workflows by changing one line of code</h1>

# Exercise 5: Setting up cluster environment

**GOAL**: Learn how to set up a Dask cluster for Modin, connect Modin to a Dask cluster and run pandas queries on a cluster.

**NOTE**: This exercise has extra requirements. Read instructions carefully before attempting. 

**This exercise instructs users on how to start a 500+ core Dask cluster, which keeps running until it is shut down at the end of the exercise.**

In practice, workloads often exceed the capabilities of a single machine. Modin works and performs well
in both local mode and in a cluster environment. The key advantage of Modin is that your Python code does not
change between local development and cluster execution. Users are not required to think about how many workers
exist or how to distribute and partition their data; Modin handles all of this transparently.

![Cluster](../../../img/modin_cluster.png)

# Extra requirements for AWS authentication

First of all, install the necessary dependencies in your environment:

In [None]:
!pip install dask_cloudprovider[aws]

The next step is to set up your AWS credentials: set ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY``
and, optionally, ``AWS_SESSION_TOKEN`` (refer to [AWS CLI environment variables](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html) for details):

In [None]:
import os

os.environ["AWS_ACCESS_KEY_ID"] = "<aws_access_key_id>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<aws_secret_access_key>"
os.environ["AWS_SESSION_TOKEN"] = "<aws_session_token>"
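
Before launching anything, it can help to confirm that the credentials are really set. The following is a minimal sketch, not part of Modin or Dask: `missing_aws_credentials` is a hypothetical helper that reports required variables which are unset, empty, or still hold a `<...>` placeholder like the ones in the cell above.

```python
import os

# Variable names taken from the text above; the session token is optional.
REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_credentials(env=None):
    """Return required AWS variables that are unset, empty, or a placeholder."""
    env = os.environ if env is None else env
    missing = []
    for name in REQUIRED_AWS_VARS:
        value = env.get(name, "")
        # Treat "<...>" placeholders as "not set".
        if not value or (value.startswith("<") and value.endswith(">")):
            missing.append(name)
    return missing


# Demonstration on a made-up mapping instead of the real environment.
demo_missing = missing_aws_credentials({"AWS_ACCESS_KEY_ID": "<aws_access_key_id>"})
print(demo_missing)
```

Calling `missing_aws_credentials()` without arguments checks `os.environ`; an empty list means both required variables are set.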

## Starting and connecting to the cluster

This example starts 1 scheduler node (m5.24xlarge) and 6 worker nodes (m5.24xlarge), giving 576 worker CPUs in total. Keep in mind that the scheduler node manages cluster operation but doesn't perform any execution.

Check the [Amazon EC2 pricing](https://aws.amazon.com/ec2/pricing/on-demand/) page to estimate the cost before starting the cluster.
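
The sizing arithmetic can be sketched as follows. The helper names are hypothetical, and the hourly price is deliberately a parameter: look up the current value on the pricing page, since the `1.0` used here is only a placeholder and not a real price. The sketch assumes the scheduler instance is billed like a worker even though it executes no tasks.

```python
VCPUS_PER_M5_24XLARGE = 96  # vCPUs of one m5.24xlarge instance


def cluster_size(n_workers, vcpus_per_node=VCPUS_PER_M5_24XLARGE):
    """Return (vCPUs available to workers, number of instances started)."""
    worker_vcpus = n_workers * vcpus_per_node
    instances = n_workers + 1  # workers plus one scheduler node
    return worker_vcpus, instances


def hourly_cost(n_workers, price_per_instance_hour):
    """Estimate the hourly cost, counting the scheduler instance as well."""
    return (n_workers + 1) * price_per_instance_hour


worker_vcpus, instances = cluster_size(6)
print(worker_vcpus, instances)  # 576 7
print(hourly_cost(6, 1.0))      # placeholder price, not an AWS quote
```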

A Dask cluster can be deployed in different ways (refer to the [Dask documentation](https://docs.dask.org/en/latest/deploying.html) for more information), but in this tutorial we will use the ``EC2Cluster`` from [dask_cloudprovider](https://cloudprovider.dask.org/en/latest/) to create and initialize a Dask cluster on Amazon Web Services (AWS).

**Note**: ``EC2Cluster`` uses a Docker container to run the scheduler and each of the workers. You may need a different Docker image depending on your Python version and requirements. You can find more Docker images on the [daskdev](https://hub.docker.com/u/daskdev) page.

The next cell creates the ``EC2Cluster``. <b>Set your ``key_name`` and modify AWS settings as required before running it.</b>

In [None]:
from dask_cloudprovider.aws import EC2Cluster

n_workers = 6
cluster = EC2Cluster(
    # AWS parameters
    key_name = "", # set your keyname
    region = "us-west-2",
    availability_zone = ["us-west-2a"],
    ami = "ami-0387d929287ab193e",
    instance_type = "m5.24xlarge",
    vpc = "vpc-002bd14c63f227832",
    subnet_id = "subnet-09860dafd79720938",
    filesystem_size = 200, # in GB

    # DASK parameters
    n_workers = n_workers,
    docker_image = "daskdev/dask:latest",
    debug = True,
    security=False,
)

scheduler_adress = cluster.scheduler_address
print(f"Scheduler address of Dask cluster: {scheduler_adress}")

After creating the cluster you need to connect to it. To do this, pass the ``EC2Cluster`` instance or the scheduler address to ``distributed.Client``.

When you connect to the cluster, the workers may not be initialized yet, so you need to wait for them using ``client.wait_for_workers``.

Then you can call ``client.ncores()`` and check which workers are available and how many threads are used for each of them.

In [None]:
from distributed import Client

client = Client(cluster)
# Or connect by address if the cluster instance is unavailable.
# `cluster.scheduler_address` already contains the protocol and port:
# client = Client(scheduler_adress)

client.wait_for_workers(n_workers)
client.ncores()
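
``client.ncores()`` returns a dictionary that maps each worker address to its thread count. A small sketch of how that result could be summarized is shown below; `summarize_ncores` is a hypothetical helper and the addresses are made up for illustration.

```python
def summarize_ncores(ncores):
    """Summarize a {worker_address: n_threads} mapping."""
    return {"workers": len(ncores), "threads": sum(ncores.values())}


# Made-up example of what client.ncores() might return for two workers.
example_ncores = {"tcp://10.0.0.1:40001": 96, "tcp://10.0.0.2:40001": 96}
summary = summarize_ncores(example_ncores)
print(summary)
```

With a live cluster this would be called as `summarize_ncores(client.ncores())`.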

After successful initialization of the cluster, you need to configure it.

You can use plugins to install any requirements into workers:
* [InstallPlugin](https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.InstallPlugin)
* [PipInstall](https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.PipInstall)
* [CondaInstall](https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.CondaInstall).

You have to install the Modin package on each worker using the ``PipInstall`` plugin.

In [None]:
from dask.distributed import PipInstall

client.register_plugin(PipInstall(packages=["modin"]))

If you need additional worker configuration, you can create your own [WorkerPlugin](https://distributed.dask.org/en/stable/plugins.html#worker-plugins) or write a function that is executed on each worker when you call ``client.run()``.

**NOTE**: Dask does not check whether a plugin or function has already been run, so make sure that running it more than once is safe.
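
One way to make a setup step safe to repeat is to guard it with a marker file. This is a minimal sketch under assumptions: `run_once` and the marker path are hypothetical and not part of Dask, and the demonstration runs locally in a temporary directory rather than on a cluster.

```python
import os
import tempfile


def run_once(marker_path, action):
    """Run action() unless marker_path exists; create the marker afterwards."""
    if os.path.exists(marker_path):
        return "skipped"
    action()
    # Write the marker only after the action has finished without an error.
    with open(marker_path, "w") as marker:
        marker.write("done")
    return "ran"


# Local demonstration: the second call is skipped.
calls = []
with tempfile.TemporaryDirectory() as tmp:
    marker_file = os.path.join(tmp, "setup.done")
    first = run_once(marker_file, lambda: calls.append(1))
    second = run_once(marker_file, lambda: calls.append(1))
print(first, second, len(calls))
```

On a cluster, the same function could be passed to ``client.run()`` together with a marker path that exists on every worker, assuming the action itself can be serialized.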

This tutorial reads a CSV file, so you need to download it to each of the workers and to the local machine under the same absolute path.

In [None]:
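import os
import urllib.request  # `import urllib` alone does not load the request submodule


def dataset_upload(file_url, file_path):
    """Download file_url to file_path unless it is already there; return a status string."""
    try:
        dir_name = os.path.dirname(file_path)
        # Create the target directory if it is missing.
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
        if os.path.exists(file_path):
            return "File already exists."
        urllib.request.urlretrieve(file_url, file_path)
        return "OK"
    except Exception as ex:
        # Return the error text so that it shows up in the per-worker results.
        return str(ex)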

Set the directory where the file should be downloaded (the current working directory is used by default):

In [None]:
directory_path = "./"

Then you need to run the `dataset_upload` function on all workers. The result is a dictionary that holds the function's return value for each worker:

In [None]:
file_path = os.path.join(os.path.abspath(directory_path), "taxi.csv")
client.run(dataset_upload, "https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv", file_path)
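
Because ``client.run()`` returns one status per worker, it is worth checking that every worker succeeded before moving on. The helper below is a hypothetical sketch; the worker addresses and the error text are made up, and `ok_statuses` should list whatever strings `dataset_upload` returns on success.

```python
def failed_workers(results, ok_statuses):
    """Return the {worker: status} entries whose status is not a success value."""
    return {
        worker: status
        for worker, status in results.items()
        if status not in ok_statuses
    }


# Made-up result dictionary in the shape returned by client.run().
example_results = {
    "tcp://10.0.0.1:40001": "OK",
    "tcp://10.0.0.2:40001": "HTTP Error 404: Not Found",
}
failures = failed_workers(example_results, ok_statuses={"OK"})
print(failures)
```

An empty dictionary means the file is available on every worker.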

You also have to execute this function on the local machine:

In [None]:
dataset_upload("https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv", file_path)

<b>Congratulations! The cluster is now fully configured and we can start running pandas queries.</b>

## Executing in a cluster environment


As in local mode, Modin on a cluster uses Ray as its default execution engine, so no additional action is required to use Ray. To use another engine, such as Dask in this exercise, specify it either through the Modin config or through the corresponding Modin environment variable before the first operation with Modin, as shown below. The full list of Modin configs and their environment variables can be found in the [Modin Configuration Settings](https://modin.readthedocs.io/en/stable/flow/modin/config.html#modin-configs-list) section of the Modin documentation.

In [None]:
# Modin engine can be specified either by config
import modin.config as cfg
cfg.Engine.put("dask")

# or by setting the environment variable
# import os
# os.environ["MODIN_ENGINE"] = "dask"

Now you can use Modin on the Dask cluster.

Let's read the downloaded CSV file and run a few pandas operations on it: count, groupby and map.

In [None]:
import modin.pandas as pd
import time

t0 = time.perf_counter()

df = pd.read_csv(file_path, quoting=3)
df_count = df.count()
df_groupby_count = df.groupby("passenger_count").count()
df_map = df.map(str)

t1 = time.perf_counter()
print(f"Full script time is {(t1 - t0):.3f} s")

## Shutting down the cluster

Now that we have finished computation, we can shut down the cluster:

In [None]:
cluster.close()
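
Since the cluster keeps running (and being billed) until it is closed, it can be safer to guarantee the shutdown even when a cell fails. The sketch below uses `contextlib.closing` from the standard library; `FakeCluster` is a hypothetical stand-in for ``EC2Cluster`` so that the example runs without AWS, and it relies only on the `close()` method used above.

```python
from contextlib import closing


class FakeCluster:
    """Stand-in for EC2Cluster; only the close() method matters here."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


fake_cluster = FakeCluster()
try:
    # closing() calls fake_cluster.close() on exit, even if the body raises.
    with closing(fake_cluster):
        raise RuntimeError("simulated failure in the middle of the workload")
except RuntimeError:
    pass

print(fake_cluster.closed)  # True
```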

### This ends the cluster exercise