<img src='images/dask-horizontal.svg' width=400>

# Dask natively scales Python
## Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love

### Integrates with existing projects
#### BUILT WITH THE BROADER COMMUNITY

Dask is open source and freely available. It is developed in coordination with other community projects like Numpy, Pandas, and Scikit-Learn.

*(from the Dask project homepage at dask.org)*

* * * 

The notebooks used in this tutorial were developed by different members of the Dask community. In particular:
* [Notebooks 1-6](https://github.com/jacobtomlinson/dask-video-tutorial-2020) - Adam Breindel, Jacob Tomlinson (NVIDIA) 
* [Notebook 7](https://github.com/pangeo-gallery/osm2020tutorial) - Chelle Gentemann (Farallon Institute), Rich Signell (USGS), and Ryan Abernathey (LDEO) 
* These notebooks were lightly edited by Alex Rybchuk (CU Boulder) for presentation at RMACC 2021

* * *

__What Does This Mean?__
* Built in Python
* Scales *properly* from single laptops to 1000-node clusters
* Leverages and interops with existing Python APIs as much as possible
* Adheres to (Tim Peters') "Zen of Python" (https://www.python.org/dev/peps/pep-0020/) ... especially these elements:
    * Explicit is better than implicit.
    * Simple is better than complex.
    * Complex is better than complicated.
    * Readability counts. <i>[ed: that goes for docs, too!]</i>
    * Special cases aren't special enough to break the rules.
    * Although practicality beats purity.
    * In the face of ambiguity, refuse the temptation to guess.
    * If the implementation is hard to explain, it's a bad idea.
    * If the implementation is easy to explain, it may be a good idea.
* While we're borrowing inspiration, Dask also embodies one of Perl's slogans: making easy things easy and hard things possible
    * Specifically, it supports common data-parallel abstractions like Pandas and Numpy
    * But it also allows scheduling arbitrary custom computation that doesn't fit a preset mold
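
As a rough illustration of the "arbitrary custom computation" point, here is a minimal sketch using `dask.delayed`. The functions `load`, `clean`, and `summarize` are hypothetical stand-ins, not part of Dask or of this tutorial's dataset.

```python
from dask import delayed


@delayed
def load(n):
    # Hypothetical stand-in for reading one chunk of data.
    return list(range(n))


@delayed
def clean(values):
    # Keep only even numbers; any ordinary Python function works here.
    return [v for v in values if v % 2 == 0]


@delayed
def summarize(chunks):
    # Combine the cleaned chunks into a single total.
    return sum(sum(chunk) for chunk in chunks)


# Calling delayed functions only builds a task graph; nothing runs yet.
cleaned = [clean(load(n)) for n in (4, 6, 8)]
total = summarize(cleaned)

# compute() hands the graph to a scheduler and returns a plain Python value.
result = total.compute(scheduler="threads")
print(result)
```

The three `load`/`clean` chains are independent of one another, so a scheduler is free to run them in parallel before `summarize` combines them.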

### Let's See Some Code

Before we go any further, let's take a look at one particular, common use case for Dask: scaling Pandas dataframes to 
* larger datasets (which don't fit in memory) and 
* multiple processes (which could be on multiple nodes)

In [None]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=1, memory_limit='1GB')

client

In [None]:
import dask.dataframe

ddf = dask.dataframe.read_csv('data/beer_small.csv', blocksize=12e6)  # blocksize=12e6 is 12 MB

In [None]:
ddf

### What is this Dask Dataframe?

A large, virtual dataframe divided along the index into multiple Pandas dataframes:

<img src="images/dask-dataframe.svg" width="400px">

In [None]:
ddf.map_partitions(type).compute()

In [None]:
ddf.head()

In [None]:
ipa = ddf[ddf.beer_style.str.contains('IPA')]

In [None]:
ipa

In [None]:
ipa.compute()

In [None]:
mean_ipa_review = ipa.groupby('brewery_name').review_overall.agg(['mean','count'])

In [None]:
mean_ipa_review

In [None]:
mean_ipa_review.compute()

In [None]:
mean_ipa_review.nlargest(20, 'mean').compute()

`compute` doesn't just run the work; it also collects the result into a single, regular Pandas dataframe right here in our initial Python process.

Having a local result is convenient, but if we are generating large results, we may want (or need) to produce output in parallel to the filesystem, instead. 

The read methods have writing counterparts that we can use:

- `read_csv` / `to_csv`
- `read_hdf` / `to_hdf`
- `read_json` / `to_json`
- `read_parquet` / `to_parquet`

In [None]:
mean_ipa_review.to_csv('ipa-*.csv')  # the * is replaced by the partition number

In [None]:
client.close()

### About Dask

Dask was created in 2014 as part of the Blaze project, a DARPA-funded project at Continuum/Anaconda. It has since grown into a multi-institution community project with developers from projects including NumPy, Pandas, Jupyter and Scikit-Learn. Many of the core Dask maintainers are employed to work on the project by companies including Continuum/Anaconda, Prefect, NVIDIA, Capital One, Saturn Cloud and Coiled.

Fundamentally, Dask allows a variety of parallel workflows using existing Python constructs, patterns, or libraries, including dataframes, arrays (scaling out NumPy), bags (an unordered collection construct a bit like `Counter`), and `concurrent.futures`.
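
As a quick taste of two of these collections, here is a minimal sketch of a chunked array and a bag. The sizes, chunk shapes, and word list are arbitrary choices for illustration.

```python
import dask.array as da
import dask.bag as db

# Array: a 1000 x 1000 array of ones, stored as sixteen 250 x 250 NumPy chunks.
ones = da.ones((1000, 1000), chunks=(250, 250))
total = ones.sum().compute()
print(total)

# Bag: count item frequencies, much as collections.Counter would.
words = db.from_sequence(["ipa", "stout", "ipa", "lager", "ipa"], npartitions=2)
counts = dict(words.frequencies().compute(scheduler="threads"))
print(counts)
```

The bag computation is run on the threaded scheduler here only to keep the example lightweight.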

In addition to working in conjunction with Python ecosystem tools, Dask's low scheduling overhead (typically well under a millisecond per task) allows it to work well even on single machines, and to scale up smoothly.

Dask supports a variety of use cases for industry and research: https://stories.dask.org/en/latest/

With its recent 2.x releases, and integration with other projects (e.g., RAPIDS for GPU computation), many commercial enterprises are paying attention and jumping into parallel Python with Dask.

__Dask Ecosystem__

In addition to the core Dask library and its Distributed scheduler, the Dask ecosystem connects several additional initiatives, including...
* Dask ML - parallel machine learning, with a scikit-learn-style API
* Dask-kubernetes
* Dask-XGBoost
* Dask-YARN
* Dask-image
* Dask-cuDF
* ... and some others

__What's Not Part of Dask?__

There are lots of capabilities that integrate with Dask, but are not represented in the core Dask ecosystem, including...

* a SQL engine
* data storage
* data catalog
* visualization
* coarse-grained scheduling / orchestration
* streaming

... although there are typically other Python packages that fill these needs (e.g., Kartothek or Intake for a data catalog).


### How Do We Set Up and/or Deploy Dask?

The easiest way to install Dask is with Anaconda: `conda install dask`

__Schedulers and Clustering__

Dask has a simple default scheduler called the "single machine scheduler" -- this is the scheduler that's used if you `import dask` and start running code without explicitly using a `Client` object. It can be handy for quick-and-dirty testing, but I would suggest that a best practice is to __use the newer "distributed scheduler" even for single-machine workloads__.

The distributed scheduler can work with 
* threads in one process (although that is often not a great idea due to the GIL)
* multiple processes on one machine
* multiple processes on multiple machines

The distributed scheduler has additional useful features including data locality awareness and realtime graphical dashboards.
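
For comparison, the single-machine scheduler can also be selected explicitly. The sketch below uses an arbitrary small array and shows per-call selection and a scoped default; once a `Client` has been created, it takes over as the default scheduler instead.

```python
import dask
import dask.array as da

# A small lazy computation: the sum of 0..999, split into 10 chunks.
x = da.arange(1000, chunks=100).sum()

# Single-machine schedulers, selected per call.
threaded = x.compute(scheduler="threads")        # thread pool in this process
sequential = x.compute(scheduler="synchronous")  # one thread, handy for debugging

# Or set a default for a block of code.
with dask.config.set(scheduler="threads"):
    also_threaded = x.compute()

print(threaded, sequential, also_threaded)
```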

### Vocabulary
Words that are used generally when talking about large-scale computations:
* **Parallel computing** - a computation where many calculations (processes) are being carried out at the same time
* **Distributed computing** - a computing model that connects many machines (processors), each with their own memory, that are linked through a communication network
* **Cluster** - A collection of nodes that shares computing power to complete processes
* **Node** - Machines that are responsible for creating (computing), receiving, or transmitting info within the distributed system
* **Process** - An instance of a running program, together with the resources needed to execute it. By default, each process usually starts with one thread, but additional threads can be created. Some operating systems use the term "task" to refer to the program that is being executed. Several processes may be associated with the same program. 
* **Thread** - An entity within a process that can be scheduled for execution. All threads within a process share its system resources. 
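
The relationship between threads and their process can be observed with the standard library alone. This sketch (the names `shared` and `record` are illustrative) starts several threads and shows that they all report the same process ID and write to the same in-memory list.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor

shared = []  # lives in this process's memory, visible to all of its threads


def record(i):
    # Every thread appends to the same list and reports the same process ID.
    shared.append(i)
    return os.getpid(), threading.get_ident()


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(record, range(8)))

pids = {pid for pid, _ in results}
print(len(pids))        # how many distinct processes did the work
print(sorted(shared))   # what ended up in the shared list
```

Separate processes, by contrast, would each report their own process ID and would not see one another's in-memory objects without explicit communication.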

Words that are more closely tied to Dask
* **Client** - The user-facing entry point where you write your Python code
* **Scheduler** - This receives tasks from the client. It then manages the flow of work and sends tasks to workers.
* **Worker** - These compute the tasks that the scheduler assigns to them. They can also store and serve computed results for other workers (or Clients). 
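
The three roles can be seen in a small futures example. This is a sketch under some assumptions: `processes=False` keeps the scheduler and the worker inside the current Python process, the dashboard is disabled, and `square` is a made-up function. The `submit` / `map` / `result` calls follow the style of `concurrent.futures`.

```python
from dask.distributed import Client


def square(x):
    # Made-up task for illustration.
    return x * x


# Start an in-process scheduler with one worker and connect a client to it.
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)
try:
    # The client sends tasks to the scheduler, which assigns them to the worker.
    future = client.submit(square, 7)
    futures = client.map(square, range(5))

    # Results stay on the worker until the client asks for them.
    single = future.result()
    many = client.gather(futures)
finally:
    client.close()

print(single, many)
```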

Courtesy of [Coiled](https://coiled.io/scaling-data-science-in-python-with-dask/) and [StackOverflow](https://stackoverflow.com/questions/200469/what-is-the-difference-between-a-process-and-a-thread)