DOCS-#6949: Create Modin on Dask cluster tutorial #6950

Merged Feb 27, 2024 · 8 commits
@@ -0,0 +1,376 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![LOGO](../../../img/MODIN_ver2_hrz.png)\n",
"\n",
"<h1>Scale your pandas workflows by changing one line of code</h2>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Exercise 5: Setting up cluster environment\n",
"\n",
"**GOAL**: Learn how to set up a Dask cluster for Modin, connect Modin to a Dask cluster and run pandas queries on a cluster.\n",
"\n",
"**NOTE**: This exercise has extra requirements. Read instructions carefully before attempting. \n",
"\n",
"**This exercise instructs users on how to start a 500+ core Dask cluster, and it is not shut down until the end of exercise.**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Often in practice we have a need to exceed the capabilities of a single machine. Modin works and performs well \n",
"in both local mode and in a cluster environment. The key advantage of Modin is that your python code does not \n",
"change between local development and cluster execution. Users are not required to think about how many workers \n",
"exist or how to distribute and partition their data; Modin handles all of this seamlessly and transparently.\n",
"\n",
"![Cluster](../../../img/modin_cluster.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Extra requirements for AWS authentication\n",
"\n",
"First of all, install the necessary dependencies in your environment:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install dask_cloudprovider[aws]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The next step is to setup your AWS credentials. One can set ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY``\n",
Collaborator: What about aws configure? Is there such an option?

Author: I think not, because aws configure requires interactive mode, which is not possible in a Jupyter notebook.

"and ``AWS_SESSION_TOKEN`` (Optional) (refer to [AWS CLI environment variables](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html) to get more insight on this):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.environ[\"AWS_ACCESS_KEY_ID\"] = \"\"\n",
"os.environ[\"AWS_SECRET_ACCESS_KEY\"] = \"\"\n",
"os.environ[\"AWS_SESSION_TOKEN\"] = \"\""
Collaborator: Should these be empty strings? If not, let's specify "<aws_access_key_id>", "<aws_secret_access_key>", "<aws_session_token>".

Author: If these strings are empty, the connection to AWS will fail. OK, let's specify them as you suggest.

]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Starting and connecting to the cluster\n",
"\n",
"This example starts 1 scheduler node (m5.24xlarge) and 6 worker nodes (m5.24xlarge), 576 total CPUs. Please keep in mind, the scheduler node manages cluster operation but doesn't perform any executions.\n",
"\n",
"You can check the [Amazon EC2 pricing](https://aws.amazon.com/ec2/pricing/on-demand/) page.\n",
"\n",
"Dask cluster can be deployed in different ways (please read [Dask documentaion](https://docs.dask.org/en/latest/deploying.html) to get more information about it), but in this tutorial we will use the ``EC2Cluster`` from [dask_cloudprovider](https://cloudprovider.dask.org/en/latest/) to create and initialize a Dask cluster on Amazon Web Service (AWS).\n",
"\n",
"**Note**: EC2Cluster uses a docker container to run the scheduler and each of the workers. Probably you need to use another docker image depending on your python version and requirements. You can find more docker-images [here](https://hub.docker.com/u/daskdev).\n",
"\n",
"In the next cell you can see how the EC2Cluster is created. <b>Please set your ``key_name`` and modify AWS settings if it is required before you run it.</b>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_cloudprovider.aws import EC2Cluster\n",
"\n",
"n_workers = 6\n",
"cluster = EC2Cluster(\n",
" # AWS parameters\n",
" key_name = \"\", # set your keyanme\n",
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
" region = \"us-west-2\",\n",
" availability_zone = [\"us-west-2a\"],\n",
" ami = \"ami-0387d929287ab193e\",\n",
" instance_type = \"m5.24xlarge\",\n",
" vpc = \"vpc-002bd14c63f227832\",\n",
" subnet_id = \"subnet-09860dafd79720938\",\n",
" filesystem_size = 200,\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's specify the units after "#" like "# in <units>".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

"\n",
" # DASK parameters\n",
" n_workers = n_workers,\n",
" docker_image = \"daskdev/dask:latest\",\n",
" debug = True,\n",
" security=False,\n",
" # worker_options={\n",
" # \"nthreads\": 1,\n",
" # \"nworkers\": \"auto\",\n",
" # },\n",
")\n",
"\n",
"scheduler_adress = cluster.scheduler_address\n",
"print(f\"Scheduler IP address of Dask cluster: {scheduler_adress}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After creating the cluster, you need to connect to it. To do this, you should put the EC2Cluster instance or the scheduler IP address in ``distributed.Client``.\n",
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
"\n",
"When you connect to the cluster, the workers may not yet be initialized, so you need to wait for them using ``client.wait_for_workers``.\n",
"\n",
"Then you can call ``client.ncores()`` and check which workers are available and how many threads are used for each of them."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from distributed import Client\n",
"\n",
"client = Client(cluster)\n",
"# Or use an IP address connection if the cluster instance is unavailable:\n",
"# client = Client(f\"{scheduler_adress}:8687\")\n",
"\n",
"client.wait_for_workers(n_workers)\n",
"client.ncores()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After successful initialization of the cluster, you need to configure it.\n",
"\n",
"You can use plugins to install any requirements into workers:\n",
"* [InstallPlugin](\"https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.InstallPlugin\")\n",
"* [PipInstall](\"https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.PipInstall\")\n",
"* [CondaInstall](\"https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.CondaInstall\").\n",
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
"\n",
"You have to install Modin package on each worker using ``PipInstall`` plugin."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import PipInstall\n",
"\n",
"client.register_plugin(PipInstall(packages=[\"modin\"]))"
]
},
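{
"cell_type": "markdown",
"metadata": {},
"source": [
"If your workers run in conda environments, the ``CondaInstall`` plugin can be used in the same way. The cell below is a minimal sketch and is left commented out, since this tutorial only needs ``PipInstall``:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional alternative to PipInstall for conda-based worker environments:\n",
"# from distributed.diagnostics.plugin import CondaInstall\n",
"#\n",
"# client.register_plugin(CondaInstall(packages=[\"modin\"]))"
]
},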
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you need an additional workers configuration, you can create your own [WorkerPlugin](https://distributed.dask.org/en/stable/plugins.html#worker-plugins) or function that will be executed on each worker calling ``client.run()``.\n",
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
"\n",
"**NOTE**: Dask cluster does not check if this plugin or function has been called before. Therefore, you need to take this into account when using them.\n",
"\n",
"In this tutorial a CSV file will be read, so you need to download it to each of the workers and local machine in the same gloabal path. The following function has been implemented for this:"
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Worker\n",
"import os\n",
"import urllib\n",
"\n",
"def dataset_upload(file_url, file_path):\n",
" try:\n",
" dir_name = os.path.dirname(file_path)\n",
" if not os.path.exists(dir_name):\n",
" os.makedirs(dir_name)\n",
" if os.path.exists(file_path): # os.path.isfile(file_path):\n",
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
" return \"File has already existed.\"\n",
" else:\n",
" urllib.request.urlretrieve(file_url, file_path)\n",
" return \"OK\"\n",
" except Exception as ex:\n",
" return str(ex)"
Collaborator: Can we use something like this?

    os.makedirs(dir_name, exist_ok=True)
    urllib.request.urlretrieve(file_url, file_path)

Author: We can, but in this case we will have some problems:

  • if any exception is raised, we won't receive any message (a Dask worker doesn't send any output, so it must be handled by the user)
  • since this function may be called more than once, we must guarantee a correct and predictable result

In my opinion my suggestion is better.

]
},
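{
"cell_type": "markdown",
"metadata": {},
"source": [
"For completeness, here is a minimal sketch of the ``WorkerPlugin`` approach mentioned above. ``ExampleSetupPlugin`` is a hypothetical plugin that is not used in this tutorial; its ``setup`` method runs once on each worker when the plugin is registered:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import WorkerPlugin\n",
"\n",
"class ExampleSetupPlugin(WorkerPlugin):\n",
"    # Hypothetical example: set an environment variable on each worker.\n",
"    def setup(self, worker):\n",
"        import os\n",
"        os.environ[\"EXAMPLE_FLAG\"] = \"1\"\n",
"\n",
"# Registering the plugin would run setup() on every current and future worker:\n",
"# client.register_plugin(ExampleSetupPlugin())"
]
},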
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Please, set the directory where it should be downloded (the local directory will be used by the default):"
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"directory_path = \"./\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then you need run `dataset_upload` function on all workers. As a result, you will get a dictionary, where the result of the function execution will be for each workers:"
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"file_path = os.path.join(os.path.abspath(directory_path), \"taxi.csv\")\n",
"client.run(dataset_upload, \"https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv\", file_path)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Yeou need also execute this function on the local machine:"
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dataset_upload(\"https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv\", file_path)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<b>Congratulations! The cluster is now fully configured and we can start running Pandas queries.</b>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Executing in a cluster environment\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Same as local mode Modin on cluster uses Ray as an execution engine by default so no additional action is required to start to use it. Alternatively, if you need to use another engine, it should be specified either by setting the Modin config or by setting Modin environment variable before the first operation with Modin as it is shown below. Also, note that the full list of Modin configs and corresponding environment variables can be found in the [Modin Configuration Settings](https://modin.readthedocs.io/en/stable/flow/modin/config.html#modin-configs-list) section of the Modin documentation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Modin engine can be specified either by config\n",
"import modin.config as cfg\n",
"cfg.Engine.put(\"dask\")\n",
"\n",
"# or by setting the environment variable\n",
"# import os\n",
"# os.environ[\"MODIN_ENGINE\"] = \"dask\""
]
},
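{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional sanity check, you can print the engine Modin is configured to use (expected to be ``Dask`` here):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Prints the currently configured Modin execution engine.\n",
"print(cfg.Engine.get())"
]
},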
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now you can use Modin on the Dask cluster.\n",
"\n",
"Let's read the downloaded CSV file and executes such pandas operations as count, groupby and map:"
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import modin.pandas as pd\n",
"import time\n",
"\n",
"t0 = time.perf_counter()\n",
"\n",
"df = pd.read_csv(file_path, quoting=3)\n",
"df_count = df.count()\n",
"df_groupby_count = df.groupby(\"passenger_count\").count()\n",
"df_map = df.map(str)\n",
"\n",
"t1 = time.perf_counter()\n",
"print(f\"Full script time is {(t1 - t0):.3f}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Shutting down the cluster\n",
"\n",
"Now that we have finished computation, we can shut down the cluster:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster.close()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### This ends the cluster exercise"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}