In [10]:
%run ../talktools.py

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">
     
# Dask 

Dask is a parallelization library for Python that scales from your laptop all the way to a cluster (i.e., distributed across multiple nodes).

Its main focus is providing distributed array-like abstractions with NumPy- and Pandas-like behavior.

Stack:

- Array, bag, dataframe, delayed
- Graph spec
- Scheduler

Dask lets you focus on algorithms rather than scheduling.
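To make the "graph spec" layer of the stack a bit more concrete, here is a minimal sketch of a hand-written task graph. It assumes the dictionary-based graph format that the Dask documentation describes, run here with the threaded scheduler; the key names (`a`, `b`, `total`, `scaled`) are made up for illustration.

```python
from operator import add, mul
from dask.threaded import get  # entry point of the threaded scheduler

# A Dask graph is a plain dict: keys name results, and tuples are tasks
# of the form (callable, arg1, arg2, ...). A string argument that matches
# a key refers to that key's result.
graph = {
    "a": 1,
    "b": 2,
    "total": (add, "a", "b"),       # a + b
    "scaled": (mul, "total", 10),   # (a + b) * 10
}

# Ask the scheduler for one key; it works out which tasks must run first.
scaled = get(graph, "scaled")
print(scaled)
```

In practice you rarely write graphs by hand: the array, bag, dataframe, and delayed collections build them for you.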

Tutorial: https://github.com/dask/dask-tutorial

See also some amazing new lectures/tutorials:

https://www.youtube.com/watch?v=5Md_sSsN51k&list=PLYx7XA2nY5Gf37zYZMw6OqGFRPjB1jCy6&index=17

and a shorter talk by Matt on Dask:

https://www.youtube.com/watch?v=PAGjm4BMKlk&list=PLYx7XA2nY5Gf37zYZMw6OqGFRPjB1jCy6&index=16

<div class="alert alert-info">
    IPython (via <a href="https://github.com/ipython/ipyparallel">ipyparallel</a>) also has a clustering mechanism for distributed computing. See e.g. <a href="https://ipyparallel.readthedocs.io/en/latest/examples/Monitoring%20an%20MPI%20Simulation%20-%202.html?highlight=mpi">ipyparallel with MPI</a>.
</div>


In [None]:
#!brew install graphviz ## on a mac
#!apt-get install graphviz ## on linux
#!pip install graphviz ## don't do this with conda, installs a Python 2 package...

## Dask Arrays

`dask.array` provides a distributed notion of an array. It translates your array operations into a graph of inter-related tasks with data dependencies between them. Dask then executes this graph in parallel with multiple threads. We'll discuss this in more detail later, when we look at scheduling.

Manipulate a `dask.array` object as you would a NumPy array:
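As a small sketch of what "chunked and lazy" means, the snippet below wraps a tiny NumPy array (the 3x4 shape and 3x2 chunks are arbitrary choices for illustration) and shows that nothing is computed until `.compute()` is called.

```python
import numpy as np
import dask.array as da

x_np = np.arange(12).reshape(3, 4)
x_da = da.from_array(x_np, chunks=(3, 2))   # split into two 3x2 blocks

lazy = (x_da + 1).sum(axis=0)               # builds a task graph, no work yet
print(x_da.numblocks)                       # number of blocks along each axis

result = lazy.compute()                     # runs the graph, returns a NumPy array
print(result)
```

The result should agree with the same expression evaluated directly on `x_np`.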

In [None]:
import dask.array as da
x = da.linspace(1,10,1000000,chunks=(1000,))

In [None]:
import numpy as np
y = np.linspace(1,10,1000000)
sum(y)

In [None]:
x.shape

In [None]:
x.sum()

In [None]:
rez = x.sum()

In [None]:
rez.compute()

In [None]:
import numpy as np
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000),   # 400 million element array 
                              chunks=(1000, 1000))   # Cut into 1000x1000 sized chunks
y = x.mean(axis=0)[::100]                            # Perform NumPy-style operations

In [None]:
x.nbytes / 1e9  # Gigabytes of the input processed lazily
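The sizes involved can be checked with plain arithmetic. This sketch assumes the array holds 8-byte `float64` values, which is the default for `da.random.normal`.

```python
rows, cols = 20000, 20000
chunk_rows, chunk_cols = 1000, 1000
bytes_per_element = 8  # assumption: float64

total_gb = rows * cols * bytes_per_element / 1e9              # whole array
chunk_mb = chunk_rows * chunk_cols * bytes_per_element / 1e6  # one chunk
n_chunks = (rows // chunk_rows) * (cols // chunk_cols)        # number of chunks

print(total_gb, chunk_mb, n_chunks)
```

So the scheduler only ever needs to hold a handful of 8 MB chunks in memory at once, rather than the full 3.2 GB array.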

In [None]:
%%time
y.compute()     # Time to compute the result

In [None]:
import numpy as np

In [None]:
%%time 
# this will take a while (~30 sec)
x = np.random.normal(10, 0.1, size=(20000, 20000)) 
y = x.mean(axis=0)[::100] 
y

In [None]:
del x

In [None]:
%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100] 
y.compute()

## Dask Dataframes

Dask dataframes are meant to mimic most of the Pandas dataframe API, but these dataframes can now be out of core (larger than memory).

In [None]:
!ls -lah ../01_Plotting_and_Viz/data/

In [None]:
import dask.dataframe as dd
import pandas as pd

In [None]:
%%time
df = pd.read_csv("../01_Plotting_and_Viz/data/uber-raw-data-apr14.csv")

In [None]:
del df

In [None]:
%%time
df = dd.read_csv("../01_Plotting_and_Viz/data/uber-raw-data-apr14.csv")

Some of the reading in is delayed, but we can still inspect the data.

In [None]:
df.head()

Other operations are delayed until you compute them:

In [None]:
df.describe()

In [None]:
df.describe().compute()

In [None]:
df.describe()['Lat'].compute()

Let's use distributed dataframes to analyze NYC Taxi data, which is stored as large CSV files in a public S3 bucket.

(https://github.com/mrocklin/scipy-2016-parallel/blob/master/notebooks/08-distributed-dataframes.ipynb)

In [None]:
#!pip install s3fs

In [None]:
from s3fs import S3FileSystem
s3 = S3FileSystem(anon=True)

s3.ls('dask-data/nyc-taxi/2015')

We would like to load this data with Pandas, but there is too much data here to fit in memory.

In [None]:
s3.info('dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv')

In [None]:
import pandas as pd

with s3.open('dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv') as f:
    df = pd.read_csv(f, nrows=5)  # look at just five rows
    
df

In [None]:
from dask.distributed import Client, progress

e = Client(set_as_default=True)
e

In [None]:
import dask.dataframe as dd

df = dd.read_csv('s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'anon': True})
df

In [None]:
df = e.persist(df)
progress(df)

Existing Pandas experience transfers over decently well to `dask.dataframe`. However, there are a few caveats when dealing with distributed systems:
   - Until you call e.persist (for large results) or e.compute (for small results), all computations are lazy
   - Call progress on a dataframe after you persist to track the progress of a computation. You can continue doing work immediately. All work happens in the background.
   - If you are computing a small result, just add .compute() to the end of your result, like df.passenger_count.sum().compute(). This will block and return the result when finished.
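The difference between persisting and computing can be sketched without a cluster. This example is an assumption-laden stand-in: it uses a small `dask.array` and the local synchronous scheduler instead of the distributed client `e`, so `persist` blocks here rather than running in the background.

```python
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2))
doubled = x * 2                      # lazy: nothing has run yet

# persist: evaluate the chunks but keep the result as a Dask collection
persisted = doubled.persist(scheduler="synchronous")

# compute: evaluate and return a concrete (small) result
value = doubled.sum().compute(scheduler="synchronous")

print(type(persisted).__name__, float(value))
```

Persist is the right tool when the result is still large and you plan to keep working with it; compute is for results small enough to bring back into your session.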

In [None]:
positive_fares = df[df.fare_amount > 0]   # keep only rides with a positive fare
fares = positive_fares[['fare_amount', 'tip_amount', 'payment_type']]

fares = e.persist(fares)  # triggers computation
progress(fares)

In [None]:
fares.head()

In [None]:
(fares.tip_amount == 0).sum().compute()

In [None]:
fares.count().compute()

In [None]:
df.passenger_count.sum().compute()

To recap: rather than loading the data with Pandas, we connect to the cluster and use `dask.dataframe` to load the CSV data into ~700 Pandas dataframes spread across our cluster. We get back a `dask.dataframe` that coordinates these small Pandas dataframes.

## Dask Delayed

`dask.delayed` (a la joblib):
 
   - `delayed(function)(*args, **kwargs)` -> lazy function that hasn't yet been evaluated
   - `delayed(data)` -> lazy object that pretends to be your data
 
 See the excellent talk at SciPy 2016: https://www.youtube.com/watch?v=PAGjm4BMKlk&list=PLYx7XA2nY5Gf37zYZMw6OqGFRPjB1jCy6
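Both forms of `delayed` can be sketched in a few lines. The function `double` and the list contents are invented for illustration, and the synchronous scheduler is used only to keep the example self-contained.

```python
from dask import delayed

def double(v):
    return 2 * v

lazy_call = delayed(double)(21)          # delayed(function)(*args): a lazy call
lazy_data = delayed([3, 1, 2])           # delayed(data): a lazy stand-in for the list
lazy_sorted = delayed(sorted)(lazy_data) # lazy objects can feed other lazy calls
lazy_first = lazy_sorted[0]              # indexing a Delayed is lazy too

print(type(lazy_call).__name__)          # still a Delayed, nothing has run

answer = lazy_call.compute(scheduler="synchronous")
smallest = lazy_first.compute(scheduler="synchronous")
print(answer, smallest)
```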

In [None]:
# get a local Executor client
from distributed import Client
Client(set_as_default=True)

In [None]:
import random
from dask import delayed, visualize
from time import sleep

@delayed(pure=True)
def add(a,b):
    sleep(random.random())
    return a+b

@delayed(pure=True)
def mul(a,b):
    sleep(random.random())
    return a*b

@delayed(pure=True)
def inc(a):
    sleep(random.random())
    return a + 1

In [None]:
x = add(1,2)
x

In [None]:
x.compute()

In [None]:
a = inc(1)
b = mul(1,2)
c = add(a,b)
c

In [None]:
c.visualize(rankdir="LR")

In [None]:
c.compute()

### Loops

In [None]:
results = []
for x in range(4):
    a = inc(1)
    b = mul(1,x)
    c = add(a,b)
    results.append(c)

total = delayed(sum,pure=True)(results)
total

In [None]:
total.visualize(rankdir="LR")

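`pure=True` tells Dask that the function always returns the same output for the same inputs, so identical calls nested deep in your code are detected and don't need to be recomputed. E.g., `inc(1)` is the same in every iteration here, so it only gets called once. A pure function should have no side effects.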
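One way to see this deduplication is to compare task keys. In this sketch (using `operator.add` as a stand-in pure function), two identical pure calls should get the same key, while non-pure calls get fresh keys each time.

```python
from operator import add
from dask import delayed

pure_a = delayed(add, pure=True)(1, 2)
pure_b = delayed(add, pure=True)(1, 2)
impure_a = delayed(add, pure=False)(1, 2)
impure_b = delayed(add, pure=False)(1, 2)

# Identical pure calls share a key, so they collapse to one task in a graph.
same_key = pure_a.key == pure_b.key
# Non-pure calls are given unique keys, so each one is run separately.
different_key = impure_a.key != impure_b.key

print(same_key, different_key)
```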

In [None]:
total.compute()

In [None]:
results = []
for y in range(2,10,2):
    for x in range(4):
        a = inc(1)
        b = mul(y,x)
        c = add(a,b)
        results.append(c)

total = delayed(sum,pure=True)(results)
total

In [None]:
total.visualize()

In [None]:
total.compute()

In [None]:
# Tree reduction --- add up pairwise
while len(results) > 1:
    new_results = []
    
    for i in range(0, len(results), 2):
        if i + 1 < len(results):
            res = add(results[i], results[i+1])
        else:
            res = results[i]  # odd element out: carry it to the next round
        new_results.append(res)
    
    results = new_results

total = results[0]
total.visualize()

In [None]:
total.compute()

Note: you cannot iterate over a delayed object, and you cannot use one in a conditional (`if`) statement or as a loop bound, because its value (and length) is not known until it has been computed.

In [None]:
# this raises a TypeError: a Delayed object cannot be used as an integer
for x in range(inc(1)):
    print("hey!")

## Scheduling the Execution

Where you run a given piece of a parallel task depends on your architecture, what each piece needs, and where the bottlenecks are in moving data between pieces.

The **single machine scheduler** is optimized for larger-than-memory use. It:
  
   - uses parallel CPUs
   - minimizes RAM: tries to remove intermediate results that aren't needed anymore
   - has low overhead: ~100 $\mu$s per task
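On a single machine you can pick the scheduler per call with the `scheduler=` keyword. This is a minimal sketch with an arbitrary small array; `"threads"` runs tasks in a thread pool, and `"synchronous"` runs them one at a time in the current thread, which is handy for debugging.

```python
import dask.array as da

x = da.arange(1000, chunks=100)      # ten chunks of 100 elements
total = (x ** 2).sum()               # lazy sum of squares

threaded = total.compute(scheduler="threads")      # thread pool
serial = total.compute(scheduler="synchronous")    # single thread, no parallelism

print(int(threaded), int(serial))
```

Both schedulers execute the same graph, so the results should be identical; only the execution strategy differs.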
 

**Distributed scheduler**: tries to minimize data movement so you don't have to move data between computers unnecessarily.
 
 - schedules across many distributed workers
 - works well with distributed datastores (HDFS)
 - asynchronous
 - data-local
 
Run `dask-scheduler` on the command line and then create a client:

In [None]:
from dask.distributed import Client, progress
e = Client(set_as_default=True)
e

# swap out concurrent.futures with a dask executor.

In [None]:
%%writefile slowfunc.py
from time import sleep

def slowfunc(x, y, delay=1):
    sleep(delay)
    return x + y

In [None]:
%%time
from slowfunc import slowfunc
futures = [e.submit(slowfunc,1,2, delay=1) for _ in range(100)]
[f.result() for f in futures]
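For comparison, here is the same submit-then-collect pattern written with the standard library's `concurrent.futures`. It is only a sketch: `slowfunc_demo` is a local stand-in for `slowfunc`, with a much shorter delay and fewer tasks so it finishes quickly.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slowfunc_demo(x, y, delay=0.05):
    """Local stand-in for slowfunc, with a short delay."""
    sleep(delay)
    return x + y

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a future immediately; result() blocks until it is done
    futures = [pool.submit(slowfunc_demo, 1, 2, delay=0.05) for _ in range(8)]
    results = [f.result() for f in futures]

print(results)
```

The `submit` / `result` calls have the same shape as the Dask client version, which is what makes swapping one executor for the other straightforward.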

In [None]:
e.close()

There are loads of ways to do mapping now in Python; [this notebook](https://github.com/mrocklin/scipy-2016-parallel/blob/master/notebooks/map-rosetta-stone.ipynb) is the Rosetta Stone.

<div class="alert alert-info">
   You can also use MPI with Dask...see <a href="http://mpi.dask.org/en/latest/install.html">dask_mpi</a>
</div>

<img src="https://jax.readthedocs.io/en/latest/_static/jax_logo_250px.png">

# JAX

JAX is accelerated NumPy (and more): https://jax.readthedocs.io/en/latest/jax-101/01-jax-basics.html

We'll see more of JAX when we do ML, since it also provides automatic differentiation for us.
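As a small preview of automatic differentiation, this sketch uses `jax.grad` on a made-up function, the sum of squares, whose gradient is known analytically to be $2x$.

```python
import jax
import jax.numpy as jnp

def sum_of_squares(x):
    return jnp.sum(x ** 2)

# jax.grad returns a new function that evaluates the gradient
grad_fn = jax.grad(sum_of_squares)

x = jnp.array([1.0, 2.0, 3.0])
gradient = grad_fn(x)
print(gradient)
```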

In [None]:
import jax
import jax.numpy as jnp

In [None]:
x = jnp.arange(10)
print(x)

In [None]:
n_devices = jax.local_device_count() 
n_devices

JAX uses the XLA compiler under the hood and lets you just-in-time (JIT) compile your code to make it faster and more efficient. This is the purpose of `jax.jit`, which can be called directly on a function or applied as the `@jit` decorator.
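A minimal sketch of the decorator form, using a made-up function (`scaled_tanh`) purely for illustration. The compiled version should return the same values as the uncompiled one; only the execution path differs.

```python
import jax
import jax.numpy as jnp

def scaled_tanh(x):
    return 2.0 * jnp.tanh(x) + 1.0

@jax.jit
def scaled_tanh_jit(x):
    return 2.0 * jnp.tanh(x) + 1.0

x = jnp.linspace(-1.0, 1.0, 5)
eager = scaled_tanh(x)
compiled = scaled_tanh_jit(x)        # the first call traces and compiles

matches = bool(jnp.allclose(eager, compiled))
print(matches)
```

Because compilation happens on the first call, time a jitted function only after calling it once.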

In [None]:
from jax import numpy as jnp, random

def selu(x, alpha=1.67, lmbda=1.05):
    return lmbda * jnp.where(x > 0, x, alpha * jnp.exp(x) - alpha)

v = random.normal(random.PRNGKey(42), (1000000,))
%timeit selu(v).block_until_ready()

In [None]:
import jax

selu_jit = jax.jit(selu)
%timeit selu_jit(v).block_until_ready()

In [None]:
long_vector = jnp.arange(int(1e7))

%timeit jnp.dot(long_vector, long_vector).block_until_ready()

Now let's try this on a GPU and TPU in [Google Colab](https://colab.research.google.com/github/profjsb/python-seminar/blob/master/DataFiles_and_Notebooks/04_Parallelism/02_dask_and_jax.ipynb)

In [None]:
try:
    import jax.tools.colab_tpu
    jax.tools.colab_tpu.setup_tpu()
except Exception:
    # not running on a Colab TPU (or the helper is unavailable): keep the default backend
    pass

In [None]:
long_vector = jnp.arange(int(1e7))

%timeit jnp.dot(long_vector, long_vector).block_until_ready()