<center><img src="http://www.nasa.gov/sites/all/themes/custom/nasatwo/images/nasa-logo.svg"></center>

<center>
<h1><font size="+3">GSFC Python Bootcamp</font></h1>
</center>

---

<center><h3>Parallel Programming</h3></center>

In [0]:
import warnings
warnings.filterwarnings('ignore')

There are other ways to do parallel programming in Python; here, we want to give you an entry point to this methodology through Dask. Below are links to other packages if you choose to learn them:

* [threading](https://docs.python.org/3.4/library/threading.html) - to run tasks concurrently in multiple threads within one process (most useful for I/O-bound work because of the GIL, see the warning below)
* [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) - to run computations in multiple processes, and therefore on multiple CPU cores
* [asyncio](https://docs.python.org/3/library/asyncio.html) - to schedule asynchronous (concurrent) tasks such as input and output or calculations

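__Warning:__ A topic you will eventually encounter when programming in parallel in Python is the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock) (Global Interpreter Lock). In CPython, the GIL allows only one thread at a time to execute Python bytecode, so threads do not speed up CPU-bound pure-Python code; they mainly help when tasks spend their time waiting (sleeping, reading files, network I/O).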
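
A small illustration of the GIL point, using only the standard library (this is a sketch, not part of the Dask material): `slow_inc` is a hypothetical function that just sleeps, and `time.sleep` releases the GIL, so eight threads can wait at the same time. Replacing the sleep with a pure-Python CPU-bound loop would remove most of the speedup.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_inc(x):
    time.sleep(0.2)  # sleeping releases the GIL, much like waiting on I/O
    return x + 1

data = list(range(8))

# Serial: each call waits 0.2 s in turn (~1.6 s total)
start = time.perf_counter()
serial = [slow_inc(x) for x in data]
serial_time = time.perf_counter() - start

# Threaded: the 8 waits overlap (~0.2 s total)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    threaded = list(pool.map(slow_inc, data))
threaded_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f} s, threaded: {threaded_time:.2f} s")
```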

# Outline

1. What is Dask?
2. A simple example.
3. Using Dask with Bokeh server.

## 1. What is Dask?

---

[Dask](https://dask.org/) is simply a Python library/package that adds parallel computing capabilities. It is primarily made up of two parts (I would consider it three, but the documentation says two):

* __Dynamic task scheduler:__ once Dask has created a task graph, the scheduler executes its tasks on parallel hardware, whether a single machine or a cluster
* __Big data collections:__ parallel arrays and dataframes that follow the familiar NumPy and Pandas APIs, but split the data into chunks that Dask can process in parallel
* _Task graphs:_ the underlying step that connects the data collections to the task scheduler. Operations on a collection are first recorded as a graph of tasks, and only later handed to the scheduler for execution.
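
To make the idea of a task graph concrete, here is a toy model in plain Python. Dask's low-level graphs are written in a similar form: a dict mapping keys to values or to tuples whose first element is a callable. The tiny `get` evaluator below is a hypothetical stand-in for a scheduler (it runs tasks one at a time), not Dask's implementation.

```python
from operator import add, mul

def inc(x):
    return x + 1

# Keys name results; a tuple is (function, *args); string args refer to other keys
graph = {
    "a": 1,
    "b": 2,
    "inc_a": (inc, "a"),       # independent of inc_b ...
    "inc_b": (inc, "b"),       # ... so a real scheduler could run both at once
    "total": (add, "inc_a", "inc_b"),
    "doubled": (mul, "total", 2),
}

def get(graph, key, cache=None):
    """Recursively evaluate `key`, computing each dependency only once."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Replace arguments that name other keys with their computed values
        values = [get(graph, a, cache) if isinstance(a, str) and a in graph else a
                  for a in args]
        result = func(*values)
    else:
        result = task  # plain data, nothing to compute
    cache[key] = result
    return result

print(get(graph, "doubled"))  # 10
```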

__Note:__ Dask is NOT [Dash](https://plot.ly/products/dash/). Dash is Plotly's package for creating dashboard-like web applications. (The Dask dashboard used in Section 3 is built on the Bokeh plotting package.)

## Why Dask?

---

Python, in general, is a great tool for data scientists. However, packages used by these scientists, such as NumPy and Pandas, were designed to run on a single machine, and much of their work runs on a single core. Dask extends these packages so they can be used across multiple cores or machines for larger applications, computations, and analyses, while still being able to scale back down to a single machine. Dask also keeps the familiar APIs of these packages for those who have used them before.

In [0]:
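# Arrays implement the NumPy API
import dask.array as da

# 10000x10000 random array split into a 10x10 grid of 1000x1000 chunks
x = da.random.random(size=(10000, 10000),
                     chunks=(1000, 1000))

# Lazy: this returns a new dask array describing the computation, not the result
x + x.T - x.mean(axis=0)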
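
A rough NumPy sketch of why chunking enables parallelism: a reduction such as `mean(axis=0)` can be computed by reducing each chunk of rows separately and then combining the partial results. The small 6x4 array and 2-row chunks are illustrative assumptions; Dask's actual reduction machinery is more general than this.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random((6, 4))
chunk_rows = 2

# Map step: reduce each chunk of rows independently (these could run in parallel)
partial_sums = [x[i:i + chunk_rows].sum(axis=0)
                for i in range(0, x.shape[0], chunk_rows)]

# Combine step: add the partial sums and divide by the total number of rows
chunked_mean = np.sum(partial_sums, axis=0) / x.shape[0]

print(np.allclose(chunked_mean, x.mean(axis=0)))  # True
```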

In [0]:
# Dataframes implement the Pandas API
import dask
df = dask.datasets.timeseries()
len(df)  # len() makes Dask compute over every partition to count the rows

In [0]:
df.head()

In [0]:
df2 = df.groupby(df.id).x.sum()
print(df2)

At this point, Dask has not computed the actual sum; `print` shows only a lazy Dask Series. We have to tell Dask to compute it explicitly.

In [0]:
df2.compute()
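
Conceptually, a grouped sum over a partitioned dataframe can be done in two stages: aggregate each partition on its own, then merge the partial results. The pure-Python sketch below shows that idea with hypothetical partitions of `(id, x)` rows; it is a toy model, not how Dask is implemented internally.

```python
from collections import defaultdict

# Hypothetical partitions: each holds (id, x) rows, like one chunk of a dataframe
partitions = [
    [(1, 0.5), (2, 1.0), (1, 1.5)],
    [(2, 2.0), (3, 3.0)],
    [(1, 1.0), (3, -1.0)],
]

def partial_sum(rows):
    """Group and sum one partition; partitions are independent, so parallelizable."""
    out = defaultdict(float)
    for key, value in rows:
        out[key] += value
    return dict(out)

def combine(partials):
    """Merge the per-partition results into the final grouped sum."""
    total = defaultdict(float)
    for part in partials:
        for key, value in part.items():
            total[key] += value
    return dict(total)

result = combine([partial_sum(p) for p in partitions])
print(result)  # {1: 3.0, 2: 3.0, 3: 2.0}
```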

## 2. A simple example

---

Here, we will go beyond calculating in series and use `dask.delayed` to do calculations in parallel.

In [0]:
from time import sleep
import random

def inc(x):
    sleep(0.2)
    return x + 1

def double(x):
    sleep(0.2)
    return 2 * x

def add(x,y):
    sleep(0.2)
    return x + y

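Serial/sequential calculation of the total. Each of the 24 function calls (8 `inc`, 8 `double`, 8 `add`) sleeps for 0.2 seconds, so expect roughly 24 × 0.2 s ≈ 4.8 seconds. (Benchmark: ~5 seconds on a Mac laptop.)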

In [0]:
%%timeit -r 1

data = [1, 2, 3, 4, 5, 6, 7, 8]

out = []
for x in data:
    y = inc(x)
    z = double(y)
    out.append(z)
    
total = 0
for z in out:
    total = add(total, z)

total

### Update

> In the code below, we tell Dask that each of these functions should be parallelized by wrapping it with `dask.delayed`. Calling a wrapped function does not execute it; it returns a lazy proxy object with a task graph attached (see visualize below).

In [0]:
import dask

# delayed means to setup but not compute yet
inc = dask.delayed(inc)
double = dask.delayed(double)
add = dask.delayed(add)
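
A toy model of what a delayed wrapper does, in plain Python: calling the wrapped function records the function and its arguments instead of running it, and `compute()` walks those records later. `Lazy` and `lazy` are hypothetical names for illustration only; unlike Dask, this sketch runs tasks one at a time and does not deduplicate shared sub-results.

```python
class Lazy:
    """Toy stand-in for a delayed value: a function plus its (possibly lazy) arguments."""
    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Compute any lazy arguments first, then call the stored function
        values = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*values)

def lazy(func):
    """Wrap `func` so that calling it returns a Lazy node instead of a result."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper

calls = []  # records which functions actually ran

@lazy
def inc(x):
    calls.append("inc")
    return x + 1

@lazy
def add(x, y):
    calls.append("add")
    return x + y

z = add(inc(1), inc(2))   # builds the graph only
print(calls)              # [] (nothing has run yet)
print(z.compute())        # 5
```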

### Update

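> The __visualize__ method below requires the _graphviz_ package. I tried installing it via __conda install__, but what worked for me was __pip install__ on the command line in the appropriate Python environment. (The pip package provides only the Python bindings; the Graphviz software itself must also be installed on the system.)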

In [0]:
x = inc(1)
y = inc(2)
z = add(x, y)
dask.visualize(z, rankdir='LR')

### Update

> The next code block uses our wrapped/delayed functions to build a delayed total. It has not actually calculated the total, which is why this cell executes very quickly. The actual computation happens afterwards via the __dask.compute__ call.

In [0]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

out = []
for x in data:
    y = inc(x)
    z = double(y)
    out.append(z)
    
total = 0
for z in out:
    total = add(total, z)
    
total

### Update

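> Now we have delayed functions whose independent calls can run in parallel rather than sequentially. To schedule and perform the calculations, we tell Dask to compute the total. You will notice that it is faster (~2 seconds) than the prior sequential computation of roughly ~5 seconds on a Mac laptop. With enough threads, all `inc` and `double` calls run at once (0.4 s), but the 8 `add` calls still form a chain (8 × 0.2 s = 1.6 s), which gives about 2 seconds.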
> 
> It also returns the correct calculated value as expected.

In [0]:
%%timeit -r 1
dask.compute(total)

Notice that even though this was parallelized, the summation is still sequential: each `add` must wait for the previous one to finish. This hints at a better scheme for parallelizing the sum.

In [0]:
dask.visualize(total, rankdir='LR') # sequential dependence still evident by visualization

In [0]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

out = []
for x in data:
    y = inc(x)
    z = double(y)
    out.append(z)
    
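# tree reduction: add neighbouring pairs until a single result remains
while len(out) > 1:
    pairs = [add(out[i], out[i + 1]) for i in range(0, len(out) - 1, 2)]
    if len(out) % 2:  # an unpaired last element moves up to the next level
        pairs.append(out[-1])
    out = pairs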

total = out[0]

In [0]:
dask.visualize(total, rankdir='LR')

### Update

> While this new method of the tree reduction seems fast, remember that it has not actually performed the calculation in question. Let's do that now.
> 
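> I saw a reduction in time to ~1 second with this more appropriate algorithm for summations. The pairwise adds form a tree of depth log2(8) = 3, so the longest chain of calls takes 0.4 s + 3 × 0.2 s = 1.0 s.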

In [0]:
%%timeit -r 1

dask.compute(total)
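
The timings above can be estimated from the length of the longest chain of dependent calls (the critical path). This back-of-the-envelope sketch assumes enough worker threads that every independent task runs at once and ignores scheduling overhead; the function names are hypothetical.

```python
import math

CALL_TIME = 0.2  # seconds that each inc/double/add call sleeps in the example above

def sequential_estimate(n):
    """inc -> double for the first item, then a chain of n adds (total = add(total, z))."""
    return 2 * CALL_TIME + n * CALL_TIME

def tree_estimate(n):
    """inc -> double in parallel, then ceil(log2(n)) levels of pairwise adds."""
    return 2 * CALL_TIME + math.ceil(math.log2(n)) * CALL_TIME

for n in (8, 64, 1024):
    print(n, round(sequential_estimate(n), 2), round(tree_estimate(n), 2))
# 8 2.0 1.0
# 64 13.2 1.6
# 1024 205.2 2.4
```

The gap grows with `n`: the sequential chain grows linearly, while the tree grows only logarithmically.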

We can instantly see which of these two algorithms will be better for parallelization. But what if we had a more complex computation...

In [0]:
import dask.array as da

# 15x15 array of ones chunked into a 3x3 grid of 5x5 blocks (each block is a NumPy array)
x = da.ones((15, 15), chunks=(5,5))

In [0]:
dask.visualize(x)

In [0]:
dask.visualize((x.dot(x.T) - x.mean(axis=0)).std())
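
Conceptually, a blocked matrix product like `x.dot(x.T)` breaks into independent pieces: each output block is a sum of products of input blocks. The NumPy sketch below assumes the same 15x15 array of ones with 5x5 blocks as above; it illustrates the idea and is not Dask's implementation.

```python
import numpy as np

n, b = 15, 5          # 15x15 matrix split into 5x5 blocks, as in the cell above
nb = n // b           # 3 blocks per dimension
x = np.ones((n, n))
xt = x.T

def block(m, i, j):
    """Return the (i, j) block of matrix m."""
    return m[i * b:(i + 1) * b, j * b:(j + 1) * b]

result = np.zeros((n, n))
for i in range(nb):
    for j in range(nb):
        # Each output block depends only on row-block i of x and column-block j
        # of x.T, so all nb * nb output blocks could be computed in parallel
        result[i * b:(i + 1) * b, j * b:(j + 1) * b] = sum(
            block(x, i, k) @ block(xt, k, j) for k in range(nb)
        )

print(np.allclose(result, x @ x.T), result[0, 0])  # True 15.0
```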


But this graph does not tell us whether our parallelization is efficient, or whether it uses system resources appropriately. We need a way to visualize how it __performs__ on a system.

## 3. Using Dask with Bokeh Server

---

Locally, here are the steps to create the Dask Bokeh server (dashboard):

1. You need 2 terminal/command prompt sessions/windows that have the appropriate Python environment activated/sourced.
2. Run __dask-scheduler__ in the first session/window. If you do not have this command-line utility, you need to install the _"distributed"_ package. [Link](http://distributed.readthedocs.io/en/latest/install.html)
3. The scheduler will give addresses:
  * scheduler: this is the address we use for the workers
  * http: to view the local web server, copy this address into a browser
  * bokeh: this is where our bokeh server/visualizer/dashboard will be
4. Start a Dask worker for the scheduler in the second terminal session/window, using the scheduler address it printed (minus the `tcp://` part):


        dask-worker xxx.xxx.xxx.xxx:xxxx

5. Change the code below in the Jupyter notebook to use your dask-scheduler address.
6. Open a new browser tab/window and go to the Bokeh server address from the __dask-scheduler__ output. If you use the address from the worker instead, not all of the utilities will be available.
7. Click on the __status__ text to view the dashboard for running processes.
8. Execute the code below from the notebook and watch the output in the Dask Bokeh dashboard. You can repeat this and modify the parallelization to get better performance.
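
As an alternative to steps 1 to 4, the _distributed_ package can also start a scheduler and workers from inside the notebook with `LocalCluster`. The sketch below assumes an in-process cluster (`processes=False`) with one worker and four threads; those settings and the `square` function are illustrative choices, not part of the original setup. When Bokeh is installed, `client.dashboard_link` gives the dashboard URL to open.

```python
from dask.distributed import Client, LocalCluster

def square(x):
    return x * x

# In-process cluster (threads only), instead of running dask-scheduler and
# dask-worker by hand in separate terminals
cluster = LocalCluster(n_workers=1, threads_per_worker=4, processes=False)
client = Client(cluster)

futures = client.map(square, range(10))       # one task per input
total = client.submit(sum, futures).result()  # sum the results on the cluster
print(total)  # 285

client.close()
cluster.close()
```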

In [0]:
from distributed import Client, LocalCluster
from time import sleep
import random
import dask

def inc(x):
    sleep(random.random() / 10)
    return x + 1

def dec(x):
    sleep(random.random() / 10)
    return x - 1

def add(x, y):
    sleep(random.random() / 10)
    return x + y


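# change this to the address and port printed by dask-scheduler
# (the scheduler address, not the Bokeh dashboard address)
client = Client('172.28.0.2:8786')

# each map call submits 100 tasks and immediately returns a list of Futures
incs = client.map(inc, range(100))
decs = client.map(dec, range(100))
adds = client.map(add, incs, decs)   # pairwise adds that depend on the futures above
total = client.submit(sum, adds)     # one final task that sums the 100 results

# drop our references to the intermediate futures so the cluster can release them
del incs, decs, adds
total.result()  # block until the final sum is ready: 9900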