# Dask talk outline

## Pandas runs out of memory on big data sets

1. Ingest/import/load
2. Filter
3. Index
4. Subtotal
5. Aggregate
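Here is a minimal pandas sketch of the five steps. It uses a tiny in-memory table with hypothetical flight columns (`FlightDate`, `Carrier`, `Flights`, `Cancelled`) in place of a real data set. Each step materialises its whole result in memory, and that is what breaks down once the data no longer fits.

```python
import io
import pandas as pd

# 1. Ingest: a tiny CSV standing in for a large file (hypothetical data)
csv = io.StringIO(
    "FlightDate,Carrier,Flights,Cancelled\n"
    "2016-01-01,AA,1,0\n"
    "2016-01-01,AA,1,1\n"
    "2016-01-01,DL,1,0\n"
    "2016-01-02,AA,1,0\n"
)
df = pd.read_csv(csv, parse_dates=["FlightDate"])

# 2. Filter: keep one carrier
aa = df[df.Carrier == "AA"]

# 3. Index: index the rows by date
aa = aa.set_index("FlightDate")

# 4. Subtotal: per-day totals
daily = aa.groupby(level="FlightDate")[["Flights", "Cancelled"]].sum()

# 5. Aggregate: overall cancellation rate
rate = daily["Cancelled"].sum() / daily["Flights"].sum()
print(daily)
print(rate)
```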



## So split computations into pieces

This works particularly well with historical time-series data, where each day, week, or month arrives as a new dataset.
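This sketch shows the split-subtotal-combine idea in plain pandas. It assumes each period's data would normally be its own file. Here, `chunksize` on a small in-memory CSV stands in for those files, and the data is hypothetical. Only one chunk plus the running subtotals are held in memory at a time.

```python
import io
import pandas as pd

# Hypothetical data: in practice each chunk would be a separate daily/monthly file
csv = io.StringIO(
    "FlightDate,Carrier,Flights,Cancelled\n"
    "2016-01-01,AA,1,0\n"
    "2016-01-01,DL,1,1\n"
    "2016-02-01,AA,1,1\n"
    "2016-02-01,AA,1,0\n"
    "2016-03-01,DL,1,0\n"
)

partials = []
for chunk in pd.read_csv(csv, chunksize=2):  # read 2 rows at a time
    # Subtotal each piece independently
    partials.append(chunk.groupby("Carrier")[["Flights", "Cancelled"]].sum())

# Combine: a sum of per-chunk sums equals one big groupby-sum
totals = pd.concat(partials).groupby(level=0).sum()
print(totals)
```

This combine step works directly for sums and counts. A mean would need the per-chunk sums and counts carried separately.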



## Use a faster storage format, especially one that is chunked and supports filtering

- CSV is slow to parse and takes a lot of disk space
- Compressed CSV is smaller, but still slow and not seekable
- Faster alternatives: HDF5, bcolz, and more recently Parquet

```
import fastparquet
fastparquet.write('flights.parq', df)  # df: a pandas DataFrame; path is illustrative
```

## Run calculations across multiple cores/computers/in a cluster

Options:

- Hadoop
- PySpark
- xxxx
- MapReduce
- JPM's `lib.dist.simpleApply(fn, dataSrc, bucketSize, fargs, fkwargs)`
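The options above share one pattern: split the data into buckets, apply a function to each bucket in parallel, then combine the results. The sketch below shows that pattern with the standard library's `concurrent.futures`. `simple_apply` is a hypothetical helper loosely modelled on the `simpleApply(fn, dataSrc, bucketSize, ...)` signature listed above. It is not that library's actual implementation.

```python
from concurrent.futures import ThreadPoolExecutor


def simple_apply(fn, data, bucket_size, *fargs, **fkwargs):
    """Hypothetical helper: apply fn to each bucket of `data` in parallel.

    Threads keep the sketch portable; CPU-bound pure-Python work would need a
    ProcessPoolExecutor (or a cluster) to actually use several cores.
    """
    buckets = [data[i:i + bucket_size] for i in range(0, len(data), bucket_size)]
    with ThreadPoolExecutor() as pool:
        # pool.map returns results in bucket order
        return list(pool.map(lambda b: fn(b, *fargs, **fkwargs), buckets))


# Map step: per-bucket sums; reduce step: sum of the partial sums
partial_sums = simple_apply(sum, list(range(10)), 3)
print(partial_sums, sum(partial_sums))
```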

## How about if we want to keep using pandas?

Dask's `dask.dataframe` mirrors much of the pandas API, so it can often be used as a near drop-in replacement for pandas.

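```
import dask.dataframe as dd

# xz files cannot be split into blocks, so read each file whole
ddf = dd.read_csv('flights-2016-*.xz', compression='xz', blocksize=None)
aa = ddf[ddf.Carrier == 'AA']  # filter rows lazily
res = aa.groupby('FlightDate')[['Flights', 'Cancelled']].sum()  # still lazy
res.compute()  # runs the computation and returns a pandas DataFrame
```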

Dask loads the CSV files lazily, so the data can be larger than memory. Nothing is read until `.compute()` is called.

## Visualising progress of Dask calculations
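
Tools in `dask.diagnostics`, which work with Dask's local schedulers:

- `ProgressBar`: a progress bar for a running computation
- `Profiler`: task profiler, recording when each task starts and finishes
- `ResourceProfiler`: samples CPU and memory use at a fixed interval
- `CacheProfiler(metric=cachey.nbytes)`: tracks how long each intermediate result stays in the scheduler's cache, sized here in bytes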
```
import dask.diagnostics
from dask.diagnostics import ProgressBar
from dask.diagnostics import Profiler as TP  # Task profiler
from dask.diagnostics import ResourceProfiler as RP
from dask.diagnostics import CacheProfiler as CP
from cachey import nbytes
from bokeh.io import output_notebook

# Each profiler stores its results in .results as a list of namedtuples
with ProgressBar():
    with TP() as tp, RP(dt=0.25) as rp, CP(metric=nbytes) as cp:
        out = res.compute()  # res: a lazy Dask collection, e.g. the groupby above

output_notebook()  # render Bokeh plots inline in Jupyter
dask.diagnostics.visualize([tp, rp, cp], save=False)
```
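The recorded namedtuples can also be inspected directly, without Bokeh. This is a minimal sketch, assuming dask is installed and the default local threaded scheduler is in use. The small array sum is only a stand-in workload.

```python
import dask.array as da
from dask.diagnostics import Profiler

x = da.ones((1000, 1000), chunks=(250, 250))  # 16 chunks -> several small tasks

with Profiler() as prof:
    total = x.sum().compute()

# prof.results is a list of TaskData namedtuples
# (fields: key, task, start_time, end_time, worker_id)
first = prof.results[0]
print(total, len(prof.results))
print(first.key, first.end_time - first.start_time)
```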

## Avoiding recalculations by caching

http://dask.readthedocs.io/en/latest/caching.html
https://github.com/dask/cachey
    
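Dask forgets intermediate results. Once a computation finishes, its intermediate values are discarded, so separate `.compute()` calls that share work repeat it.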
```
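import dask
from dask.cache import Cache  # requires the cachey package

t1 = ddf.groupby('FlightDate').sum().Flights
t2 = ddf.groupby('FlightDate').sum().Cancelled

# Separate calls: the shared read and groupby-sum run twice
t1.compute()
t2.compute()

# One call: shared tasks in the merged graph run only once
dask.compute(t1, t2)

# Opportunistic cache of up to 1e9 bytes, used by later computations
cache = Cache(1e9)
cache.register()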
```
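A small experiment makes the recomputation visible, assuming dask is installed. A hypothetical `load` task counts how often it runs. Two separate `.compute()` calls run it twice. A single `dask.compute` call on both results runs it once, because the shared task appears only once in the merged graph.

```python
import dask
from dask import delayed

calls = {"load": 0}


def load():
    # Hypothetical stand-in for an expensive step, e.g. reading and grouping
    calls["load"] += 1
    return [1, 2, 3, 4]


base = delayed(load)()  # one shared intermediate task
t1 = delayed(sum)(base)
t2 = delayed(len)(base)

t1.compute()
t2.compute()
separate = calls["load"]  # load ran once per compute call

calls["load"] = 0
together = dask.compute(t1, t2)
shared = calls["load"]  # load ran once for both outputs
print(separate, together, shared)
```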
Caching strategies:
* 

## Distributed scheduling

From https://github.com/dask/dask/issues/1040

```
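pip install distributed
dask scheduler                                   # listens on port 8786 by default
dask worker tcp://127.0.0.1:8786 --nworkers 8

>>> import dask
>>> from dask.distributed import Client          # Executor was renamed Client
>>> client = Client('127.0.0.1:8786')            # becomes the default scheduler
>>> ...
>>> count, total = dask.compute(G.count(), G.sum())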
```
