# Dask

Dask does two things:
1. It provides dynamic task scheduling optimized for computation, similar to [Airflow](https://airflow.apache.org/), [Luigi](https://github.com/spotify/luigi) or [Celery](https://docs.celeryproject.org/).
2. It provides parallel collections such as arrays, dataframes, and lists that run on top of this dynamic task scheduling.

## Scales from laptops to clusters

Dask can be easily installed on a laptop with pipenv and extends the size of the datasets you can work with from *fits in memory* to *fits on disk*. Dask can also scale out to a cluster of hundreds of machines, where it is resilient, elastic, data-local and low-latency. For more information, see the [distributed scheduler](https://distributed.dask.org/en/latest/) documentation. This simple transition between a single machine and a cluster lets users start easily and grow as needed.
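
The idea that the same computation can run on different schedulers can be sketched on a single machine. The following is a minimal example, assuming Dask is installed; `square` is a hypothetical helper, and the `scheduler` keyword picks one of the local schedulers. On a cluster, a `dask.distributed.Client` would take the place of these local schedulers.

```python
import dask


@dask.delayed
def square(n):
    # Hypothetical stand-in for a more expensive task
    return n * n


# Build the task graph once; nothing is executed yet
total = dask.delayed(sum)([square(n) for n in range(5)])

# Run the same graph on two different local schedulers
with_threads = total.compute(scheduler="threads")
single_threaded = total.compute(scheduler="synchronous")

print(with_threads, single_threaded)
```

The graph itself does not change between the two calls; only the scheduler that executes it does.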

## Install Dask

You can install everything required for the most common uses of Dask (arrays, dataframes, …). This installs Dask together with dependencies such as NumPy and Pandas that the various workloads need:

``` bash
$ pipenv install "dask[complete]"
```

Alternatively, you can install only individual subsets:

``` bash
$ pipenv install "dask[array]"
$ pipenv install "dask[dataframe]"
$ pipenv install "dask[diagnostics]"
$ pipenv install "dask[distributed]"
```

### Testing the installation

In [1]:
!pytest   ../../../spackenvs/python-374/.spack-env/view/lib/python3.7/site-packages/dask/tests  ../../../spackenvs/python-374/.spack-env/view/lib/python3.7/site-packages/dask/array/tests

platform darwin -- Python 3.7.4, pytest-6.2.5, py-1.10.0, pluggy-1.0.0
Users/veit
plugins: anyio-3.3.1
collected 4218 items / 16 skipped / 4202 selected
…
…
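
Before running the full test suite, a lighter check can show which optional parts are present. The following sketch uses only the standard library; the mapping from each extra to a top-level package (`numpy`, `pandas`, `bokeh`, `distributed`) is an assumption about what the extras pull in, and `available` is a hypothetical helper.

```python
from importlib.util import find_spec

# Extras from the install commands above, mapped to the top-level
# package each one is assumed to bring along
EXTRAS = {
    "array": "numpy",
    "dataframe": "pandas",
    "diagnostics": "bokeh",
    "distributed": "distributed",
}


def available(modules):
    """Return {module name: True/False} without importing the modules."""
    return {name: find_spec(name) is not None for name in modules}


status = available(["dask", *EXTRAS.values()])
for name, found in status.items():
    print(f"{name:12} {'found' if found else 'missing'}")
```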


## Familiar user interfaces

### Dask DataFrame

… imitates Pandas

In [2]:
import pandas as pd
df = pd.read_csv('2021-09-01.csv')
df.groupby(df.user_id).value.mean()

In [3]:
import dask.dataframe as dd
df = dd.read_csv('2021-09-01.csv')
df.groupby(df.user_id).value.mean().compute()

<div class="alert alert-block alert-info">
<b>See also:</b>
    <ul>
        <li><a href="https://docs.dask.org/en/latest/dataframe.html">Dask DataFrame Docs</a></li>
        <li><a href="https://docs.dask.org/en/latest/dataframe-best-practices.html">Dask DataFrame Best Practices</a></li>
    </ul>
</div>

### Dask Array

… imitates NumPy

In [4]:
import h5py
import numpy as np
f = h5py.File('mydata.h5', 'r')
x = np.array(f['.'])

In [5]:
import dask.array as da
import h5py
f = h5py.File('mydata.h5', 'r')
x = da.from_array(f['.'])

<div class="alert alert-block alert-info">
<b>See also:</b>
    <ul>
        <li><a href="https://docs.dask.org/en/latest/array.html">Dask Array Docs</a></li>
        <li><a href="https://docs.dask.org/en/latest/array-best-practices.html">Dask Array Best Practices</a></li>
    </ul>
</div>
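
The chunked behaviour can also be tried without an HDF5 file. This is a minimal sketch that wraps a small NumPy array; the shape and chunk sizes are arbitrary choices for illustration.

```python
import numpy as np
import dask.array as da

# A small in-memory array stands in for the HDF5 dataset
data = np.arange(12).reshape(3, 4)

# Split the array into two chunks of shape (3, 2)
x = da.from_array(data, chunks=(3, 2))

# NumPy-style reduction; evaluated chunk by chunk on compute()
col_sums = x.sum(axis=0)
result = col_sums.compute()

print(x.numblocks, result)
```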

### Dask Bag

… imitates [iterators](https://docs.python.org/3/library/itertools.html), [Toolz](https://toolz.readthedocs.io/en/latest/index.html) and [PySpark](http://spark.apache.org/docs/latest/api/python/).

In [6]:
import dask.bag as db
import json
b = db.read_text('2021-09-01.json').map(json.loads)  # expects one JSON record per line
b.pluck('user_id').frequencies().topk(10, lambda pair: pair[1]).compute()

<div class="alert alert-block alert-info">
<b>See also:</b>
    <ul>
        <li><a href="https://docs.dask.org/en/latest/bag.html">Dask Bag Docs</a></li>
    </ul>
</div>
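
The same chain of `pluck`, `frequencies` and `topk` can be tried on in-memory records. In this minimal sketch the records are made up, and the threaded scheduler is chosen only to keep the example lightweight.

```python
import dask.bag as db

# Made-up records; in the cell above they would come from a text file
records = [
    {"user_id": "alice"}, {"user_id": "bob"}, {"user_id": "alice"},
    {"user_id": "carol"}, {"user_id": "alice"}, {"user_id": "bob"},
]

b = db.from_sequence(records, npartitions=2)

# Count occurrences per user and keep the two most frequent ones
top = b.pluck("user_id").frequencies().topk(2, key=lambda pair: pair[1])
result = top.compute(scheduler="threads")

print(result)
```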

### Dask Delayed

… imitates loops and wraps custom code

In [7]:
from glob import glob
from dask import delayed

# load, process and summarize stand for user-defined functions
L = []
for fn in glob('2021-*-*.csv'):       # Use for loops to build up computation
    data = delayed(load)(fn)          # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables

result = delayed(summarize)(L)
result.compute()

<div class="alert alert-block alert-info">
<b>See also:</b>
    <ul>
        <li><a href="https://docs.dask.org/en/latest/delayed.html">Dask Delayed Docs</a></li>
        <li><a href="https://docs.dask.org/en/latest/delayed-best-practices.html">Dask Delayed Best Practices</a></li>
        <li><a href="../clean-prep/dask-pipeline.ipynb">Dask pipeline example: Tracking the International Space Station with Dask</a></li>
    </ul>
</div>
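
A self-contained variant of this pattern might look as follows. The bodies of `load`, `process` and `summarize` are hypothetical stand-ins that work on numbers instead of files, so the sketch runs without any input data.

```python
from dask import delayed


def load(n):
    # Stand-in for reading a file: returns the numbers 0 .. n-1
    return list(range(n))


def process(data):
    # Stand-in for per-file processing
    return sum(data)


def summarize(totals):
    # Stand-in for combining the per-file results
    return max(totals)


# Build the graph; no function has been called yet
parts = [delayed(process)(delayed(load)(n)) for n in (3, 4, 5)]
result = delayed(summarize)(parts)

# Execute the whole graph
value = result.compute()
print(value)
```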

### Dask Futures

… extends Python's **concurrent.futures** interface and enables the submission of user-defined tasks.

<div class="alert alert-block alert-info">
<b>Note:</b>
    <p>For the following example, Dask must be installed with the <code>distributed</code> option, e.g.</p>
    <code class="cm-s-ipython language-bash"><span class="cm-def">$ pipenv</span> install <span class="cm-string">"dask[distributed]"</span></code>
</div>

In [8]:
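from dask.distributed import Client
client = Client('scheduler:port')     # placeholder for the scheduler address

# load and summarize stand for user-defined functions,
# filenames for a list of input files
futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()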

<div class="alert alert-block alert-info">
<b>See also:</b>
    <ul>
        <li><a href="https://docs.dask.org/en/latest/futures.html">Dask Futures Docs</a></li>
        <li><a href="https://distributed.dask.org/en/latest/quickstart.html">Dask Futures Quickstart</a></li>
        <li><a href="https://examples.dask.org/futures.html">Dask Futures Examples</a></li>
    </ul>
</div>
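
For comparison, the same submit-and-collect pattern can be sketched with the standard library's `concurrent.futures`, which needs no running scheduler. The bodies of `load` and `summarize` and the file names are hypothetical. One difference from the Dask cell above: the standard library needs the results resolved before they are passed to `summarize`, while the Dask example hands the futures themselves to `client.submit`.

```python
from concurrent.futures import ThreadPoolExecutor


def load(name):
    # Stand-in for reading a file: returns the length of its name
    return len(name)


def summarize(values):
    # Stand-in for combining the loaded results
    return sum(values)


filenames = ["a.csv", "bb.csv", "ccc.csv"]  # made-up names

with ThreadPoolExecutor(max_workers=2) as pool:
    # Submit one task per file
    futures = [pool.submit(load, fn) for fn in filenames]
    # Resolve the futures before passing their results on
    summary = pool.submit(summarize, [f.result() for f in futures])
    total = summary.result()

print(total)
```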