# Data processing with Dask

### What is Dask?

- A flexible parallel computing library for Python
- Multi-core and distributed parallel execution on larger-than-memory datasets
- Parallelised data structures that extend the interfaces of pandas DataFrames, NumPy arrays, Python lists, etc.
- Dynamic task graph scheduling and execution to build complex algorithms, computations, and applications
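
As a rough illustration of the task-graph idea in the last bullet, a Dask graph can be written by hand as a plain dictionary that maps keys to literal values or to `(function, *arguments)` tuples. This is only a minimal sketch, not how Dask is normally used: the key names are made up for illustration, and it runs on the threaded scheduler's `get` function rather than on a distributed client.

```python
from operator import add

from dask.threaded import get


def inc(x):
    return x + 1


# Hand-written task graph: each key maps to a literal value
# or to a tuple of (function, argument keys...).
# The key names here are hypothetical, chosen for readability.
dsk = {
    "x": 15,
    "y": 30,
    "x_inc": (inc, "x"),
    "y_inc": (inc, "y"),
    "total": (add, "x_inc", "y_inc"),
}

# Ask the scheduler for one key; it works out which tasks are needed.
result = get(dsk, "total")
print(result)  # 47
```

The higher-level collections and `delayed` build graphs like this one for you, so writing one by hand is rarely necessary.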

### Why use Dask?

- Scales NumPy, pandas, scikit-learn, etc. Many of these libraries were not originally designed to scale beyond a single CPU core or beyond the size of your RAM.
- Can parallelise complicated custom workloads
- "Scales up to a cluster and down to your laptop", in a purely Python environment
- Interactive feedback, with diagnostics as a first-class feature (even with the distributed scheduler)

![Dask](https://miro.medium.com/max/1400/1*XSw6_dChDn95nAYm0FKX5A.jpeg)

In [None]:
from dask.distributed import Client

client = Client(n_workers=8)  # Start a local cluster with 8 workers.
client

### Delayed and lazy execution

In [None]:
from time import sleep
from dask import delayed

@delayed
def inc(x):
    sleep(1)
    return x + 1

@delayed
def add(x, y):
    sleep(1)
    return x + y

In [None]:
%%time

# Looks like ordinary code, but each call only adds a task to a graph; nothing runs yet.
x = inc(15)
y = inc(30)
total = add(x, y)

In [None]:
total  # A Delayed object, not the computed value.

In [None]:
total.visualize()  # Visualise the task graph (needs the graphviz package installed).

In [None]:
%%time
# Executes the graph. With workers available, the two inc() calls can run in parallel.
total.compute()

In [None]:
%%time

data = [1, 2, 3, 4, 5, 6, 7, 8]

results = []
for x in data:
    y = inc(x)  # inc is @delayed, so this only records a task. You could delay only some calls.
    results.append(y)

total = delayed(sum)(results)  # delayed() can also wrap a function in-line, without the decorator.

In [None]:
total.visualize()

In [None]:
total.compute()
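
The comment about selectively delaying execution can be made concrete with a small sketch. Here the cheap decision (`is_even`) runs immediately in ordinary Python, while the "expensive" work is wrapped with `delayed()` in-line. The helpers `double` and `is_even` are hypothetical, added only for this illustration, and the `sleep` calls are left out so the example runs quickly. The threaded scheduler is requested explicitly so that the block does not depend on a running `Client`.

```python
from dask import delayed


def inc(x):
    return x + 1


def double(x):  # hypothetical helper, for illustration only
    return 2 * x


def is_even(x):  # hypothetical helper, cheap enough to run eagerly
    return x % 2 == 0


data = [1, 2, 3, 4, 5, 6, 7, 8]

results = []
for x in data:
    # The branch is decided right away; only the chosen call is delayed.
    if is_even(x):
        y = delayed(double)(x)
    else:
        y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
value = total.compute(scheduler="threads")
print(value)  # 60
```

Deciding the branch eagerly is needed because the truth value of a `Delayed` object is not known until it is computed.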

### Arrays

In [None]:
import numpy as np

x = np.ones(15)
x

In [None]:
import dask.array as da

x = da.ones(15, chunks=(5,))
x

In [None]:
x.sum().compute()

In [None]:
x = da.ones((10000, 10000), chunks=(1000, 1000))  # 10 x 10 = 100 chunks.
x

In [None]:
y = x + x.T
z = y.mean(axis=0)
z.compute()
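
To see how `chunks` shapes the computation, it can help to inspect the block structure and to compare the result with the value you would expect from NumPy. The sketch below uses a smaller array than the cell above (the sizes are arbitrary choices for this illustration) so that it runs quickly on any machine.

```python
import numpy as np
import dask.array as da

# A 1000 x 1000 array split into 250 x 250 blocks (sizes chosen for illustration).
x = da.ones((1000, 1000), chunks=(250, 250))

print(x.numblocks)  # (4, 4): number of blocks along each axis
print(x.chunks[0])  # (250, 250, 250, 250): block sizes along axis 0

# Same expression as above: still lazy until compute() is called.
z = (x + x.T).mean(axis=0)
result = z.compute()

# Every element of x + x.T is 2, so each column mean is 2.0.
expected = np.full(1000, 2.0)
print(np.allclose(result, expected))  # True
```

The result of `compute()` here is an ordinary NumPy array, so it has to fit in memory even when the intermediate arrays do not.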

### Bags

In [None]:
import json
import os
import dask.bag as db

files = os.path.join('data', 'accounts.*.json.gz')  # Glob pattern matching all account files.
lines = db.read_text(files)
js = lines.map(json.loads)

In [None]:
def count_transactions(d):
    return {'name': d['name'], 'count': len(d['transactions'])}

counts = (
    js.filter(lambda record: record['name'] == 'Alice')
      .map(count_transactions)
)
counts

In [None]:
counts.visualize()

In [None]:
counts.compute()

In [None]:
# Mean number of transactions per 'Alice' record.
mean_count = (
    js.filter(lambda record: record['name'] == 'Alice')
      .map(count_transactions)
      .pluck('count')
      .mean()
)

mean_count.compute()
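
If the `data/` files are not available, the same filter, map, pluck and mean pipeline can be tried on a few in-memory records with `db.from_sequence`. The records below are invented for this sketch and only mimic the `name` and `transactions` fields used above. The synchronous scheduler is requested to keep the example in a single process.

```python
import dask.bag as db

# Made-up records with the same fields the pipeline above relies on.
records = [
    {"name": "Alice", "transactions": [{"amount": 10}, {"amount": 20}]},
    {"name": "Bob", "transactions": [{"amount": 5}]},
    {"name": "Alice", "transactions": [{"amount": 1}, {"amount": 2},
                                       {"amount": 3}, {"amount": 4}]},
]

bag = db.from_sequence(records, npartitions=2)


def count_transactions(d):
    return {"name": d["name"], "count": len(d["transactions"])}


mean_count = (
    bag.filter(lambda record: record["name"] == "Alice")
       .map(count_transactions)
       .pluck("count")
       .mean()
       .compute(scheduler="synchronous")
)
print(mean_count)  # 3.0
```

The two Alice records have 2 and 4 transactions, which gives the mean of 3.0.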

### DataFrames

In [None]:
import dask.dataframe as dd

df = dd.read_csv("data/accounts.*.csv")

df

In [None]:
df.amount.mean().compute()

In [None]:
df.groupby("names").amount.std().compute()