#What is Dask and Why should I care?

* Ever had a "real" dataset too big to analyze on your laptop when the test dataset worked fine?
  * What did you do? 
    * Run it on a different server? 
    * Break it up into chunks manually and then aggregate the results?
* Ever had a "real" dataset that took too long to analyze when the test dataset worked fine?
  * What did you do?
    * parallelize?
    * rewrite to use numpy or numba?
    
Wouldn't it be nice if we had a tool that could chunk data and parallelize computations without having to add a lot of boilerplate?
    
##What is Dask?
Dask is a way to structure computation for scalability and parallelism.
With dask, we can structure computations to break down problems into digestible chunks that can also be run in parallel.

####Let's see how array computations with Dask can make things easier


In [1]:
#Here is a "black box" of data
import h5py
import sys
import numpy

shape = 3000
chunk = shape//10
with h5py.File("testdata.hdf5", "w") as f:
    dataset = f.create_dataset("/data", shape=(shape, shape), 
                               dtype=numpy.dtype('float64'), chunks = (chunk, shape))

    for index in xrange(0, shape, chunk):
        print "Writing chunk", index, "->", index+chunk
        dataset[index:index+chunk] = numpy.random.rand(chunk, shape)




Writing chunk 0 -> 300
Writing chunk 300 -> 600
Writing chunk 600 -> 900
Writing chunk 900 -> 1200
Writing chunk 1200 -> 1500
Writing chunk 1500 -> 1800
Writing chunk 1800 -> 2100
Writing chunk 2100 -> 2400
Writing chunk 2400 -> 2700
Writing chunk 2700 -> 3000


In [2]:
#Let's get some basic statistics about our data
import h5py
import numpy as np

f = h5py.File("testdata.hdf5", "r")
dataset = f['/data'][:]

print dataset.shape
%timeit dataset.sum()
%timeit dataset.mean()
%timeit dataset.var()
%timeit dataset.std()

f.close()

(3000, 3000)
100 loops, best of 3: 6.34 ms per loop
100 loops, best of 3: 6.31 ms per loop
The slowest run took 8.65 times longer than the fastest. This could mean that an intermediate result is being cached 
1 loops, best of 3: 37 ms per loop
10 loops, best of 3: 36.9 ms per loop


In [3]:
#How do we do that same thing in Dask?
import h5py
import dask
import dask.array
from pprint import pprint

f = h5py.File("testdata.hdf5", "r")
dataset = f['/data']
d = dask.array.from_array(dataset, dataset.chunks)

print dataset.shape
%timeit d.sum().compute()
%timeit d.mean().compute()
%timeit d.var().compute()
%timeit d.std().compute()

f.close()

(3000, 3000)
10 loops, best of 3: 35.3 ms per loop
10 loops, best of 3: 43.6 ms per loop
10 loops, best of 3: 66.3 ms per loop
10 loops, best of 3: 65.2 ms per loop


In [5]:
#Yay! So we just took a simple computation and ...
#  added more code that makes it more complicated,
#  less understandable,
#  and runs slower!
#Exactly what I wanted!

#Let's do it again on our real data that is a little bit bigger

import h5py
import sys
import numpy

shape = 30000
chunk = shape//100
with h5py.File("realdata.hdf5", "w") as f:
    dataset = f.create_dataset("/data", shape=(shape, shape), 
                               dtype=numpy.dtype('float64'), chunks = (chunk, shape))

    for index in xrange(0, shape, chunk):
        print "Writing chunk", index, "->", index+chunk
        dataset[index:index+chunk] = numpy.random.rand(chunk, shape)




0 300
300 600
600 900
900 1200
1200 1500
1500 1800
1800 2100
2100 2400
2400 2700
2700 3000
3000 3300
3300 3600
3600 3900
3900 4200
4200 4500
4500 4800
4800 5100
5100 5400
5400 5700
5700 6000
6000 6300
6300 6600
6600 6900
6900 7200
7200 7500
7500 7800
7800 8100
8100 8400
8400 8700
8700 9000
9000 9300
9300 9600
9600 9900
9900 10200
10200 10500
10500 10800
10800 11100
11100 11400
11400 11700
11700 12000
12000 12300
12300 12600
12600 12900
12900 13200
13200 13500
13500 13800
13800 14100
14100 14400
14400 14700
14700 15000
15000 15300
15300 15600
15600 15900
15900 16200
16200 16500
16500 16800
16800 17100
17100 17400
17400 17700
17700 18000
18000 18300
18300 18600
18600 18900
18900 19200
19200 19500
19500 19800
19800 20100
20100 20400
20400 20700
20700 21000
21000 21300
21300 21600
21600 21900
21900 22200
22200 22500
22500 22800
22800 23100
23100 23400
23400 23700
23700 24000
24000 24300
24300 24600
24600 24900
24900 25200
25200 25500
25500 25800
25800 26100
26100 26400
26400 26700
26700 27

In [4]:
f.close()

In [5]:
import h5py
import dask.array
import dask.threaded
import dask.multiprocessing
from pprint import pprint

f = h5py.File("realdata.hdf5", "r")
dataset = f['/data']
d = dask.array.from_array(dataset, dataset.chunks)

pprint(d.sum().dask)
#print dask.threaded.get(d.sum().dask, ('x_423', 0, 0))




In [None]:
#pprint(d.sum().dask)
dsum = d.sum().dask

In [13]:
pprint(dsum)
print dask.threaded.get(dsum, ('x_17', 0, 0))

[[ 4500429.06907939]]


In [25]:
#Our original, easy-to-understand code doesn't work on bigger problems.
#How do we fix that?

import h5py
import numpy as np

f = h5py.File("realdata.hdf5","r")
dataset = f['/data']
chunksize = f['/data'].chunks[0]

final_sum = []
for chunk_start_idx in xrange(0, dataset.shape[0], chunksize):
    final_sum.append(dataset[chunk_start_idx:chunk_start_idx+chunksize].sum())
print np.array(final_sum).sum()

final_mean = []
for chunk_start_idx in xrange(0, dataset.shape[0], chunksize):
    final_mean.append(dataset[chunk_start_idx:chunk_start_idx+chunksize].mean())
print np.array(final_mean).mean()

#I'm not completely clear on how we compose a correct variance from chunks
#I'll have to think about it a little.
#If only there were a tool that did that for me!
final_var = []
for chunk_start_idx in xrange(0, dataset.shape[0], chunksize):
    final_var.append(dataset[chunk_start_idx:chunk_start_idx+chunksize].var())
print np.array(final_var).var()


print dataset.shape, f['/data'].chunks
#print dataset[:].sum()
#print dataset[:].mean()
#print dataset[:].var()
#print dataset[:].std()

f.close()


449999202.82
0.499999114244
6.39423419252e-10
(30000, 30000) (300, 30000)


##Discussion
The example code that breaks the input into chunks, computes each chunk, and then composes the intermediate results into a final answer is exactly what dask does. 
Our original code worked well for a small dataset that fit into memory.
Our original code did not work when the dataset was too large for memory.
So we had to jump through hoops to partition things.

Something we missed is that dask does things in parallel for us.
Each of the intermediate mean() results could have been calculated in parallel.
But we didn't add that functionality.

Dask has it builtin.



##Let's see how Dask.dataframe can make things easier
<Come up with a decent example>
