Parallel Collections
---------------------

Systems like Spark and Dask include "big data" collections with a small set of high-level primitives such as `map`, `filter`, `groupby`, and `join`. These common patterns let us handle computations that are more complex than a single `map` but are still structured.
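As a rough illustration of what `groupby` and `join` mean on a linear collection, here is a sequential plain-Python sketch with no Spark or Dask involved. The helpers `groupby_collection` and `join_collections` are hypothetical names, and the data is made up; real systems spread this work across partitions and differ in exact signatures and output ordering.

```python
from collections import defaultdict


def groupby_collection(items, key):
    """Group items by key(item), returning a list of (key, [items]) pairs."""
    groups = defaultdict(list)
    for item in items:
        groups[key(item)].append(item)
    return list(groups.items())


def join_collections(left, right, left_key, right_key):
    """Inner join: pair each left item with every right item that shares its key."""
    index = defaultdict(list)
    for r in right:
        index[right_key(r)].append(r)
    return [(l, r) for l in left for r in index[left_key(l)]]


# Group the numbers 0..9 by parity
print(groupby_collection(range(10), key=lambda x: x % 2))
# [(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]

# Join two made-up collections of (key, value) records on their first field;
# "c" has no match on the right, so it is dropped (inner join)
left = [("a", 1), ("b", 2), ("c", 3)]
right = [("a", "x"), ("b", "y"), ("b", "z")]
print(join_collections(left, right, lambda r: r[0], lambda r: r[0]))
```

The join builds a small index over the right-hand collection first, so each left item only looks at matching records instead of scanning everything.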

In this section we repeat the example from the `submit` section using the PySpark and `dask.bag` APIs, both of which provide parallel operations on linear collections of arbitrary objects.


### Objectives

*  Use the high-level `pyspark` and `dask.bag` APIs to parallelize common patterns that go beyond a simple `map`

### Requirements

*  Dask.bag

*Note: the following exercises were designed to work with `dask 0.10.1`. You can check your installed version of `dask` with the following code:*

```python
import dask
print(dask.__version__)
```

## Application

We again start with the sequential code from the `submit` section:

```python
import pandas as pd

# `filenames` is assumed to be a list of HDF5 file paths
series = {}
for fn in filenames:   # Simple map over filenames
    series[fn] = pd.read_hdf(fn)['x']

results = {}

for a in filenames:    # Doubly nested loop over the same collection
    for b in filenames:
        if a != b:     # Filter out self-comparisons
            results[a, b] = series[a].corr(series[b])  # Apply function

((a, b), corr) = max(results.items(), key=lambda kv: kv[1])  # Reduction
```
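To see how this loop decomposes into collection primitives, here is a sequential, standard-library-only restatement as a pipeline of map, product, filter, map, and max. It is a sketch under assumptions: small made-up in-memory lists stand in for the HDF5 files, and the `pearson` helper is a hypothetical stand-in for pandas' `Series.corr`.

```python
from itertools import product
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences (stand-in for Series.corr)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


# Hypothetical in-memory data standing in for pd.read_hdf(fn)['x']
series = {
    "a.h5": [1.0, 2.0, 3.0, 4.0],
    "b.h5": [2.0, 4.0, 6.0, 8.1],
    "c.h5": [4.0, 3.0, 2.0, 1.0],
}

pairs = product(series, series)                    # every (a, b) pair of names
pairs = filter(lambda ab: ab[0] != ab[1], pairs)   # drop self-comparisons
scored = map(lambda ab: (ab, pearson(series[ab[0]], series[ab[1]])), pairs)  # apply function
(a, b), corr = max(scored, key=lambda kv: kv[1])   # reduction
print(a, b, round(corr, 4))
```

Each line of the pipeline corresponds to one step of the nested loop, which is exactly the shape the collection APIs below parallelize.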

### Spark/Dask.bag methods

We can construct most of the above computation with the following Spark/Dask.bag methods:

*  `collection.map(function)`: apply `function` to each element in the collection
*  `collection.product(collection)`: create a new collection containing every pair of elements from the two inputs (called `cartesian` in PySpark)
*  `collection.filter(predicate)`: keep only the elements of the collection for which `predicate` is true
*  `collection.max()`: compute the maximum element
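Each of these methods has a sequential counterpart in plain Python. The sketch below uses only built-ins and `itertools.product` on a small list; it illustrates the semantics only, and the distributed versions additionally differ in laziness and partitioning.

```python
from itertools import product

xs = list(range(5))

squares = list(map(lambda x: x ** 2, xs))        # like collection.map(function)
pairs = list(product(xs, xs))                    # like collection.product(collection)
evens = list(filter(lambda x: x % 2 == 0, xs))   # like collection.filter(predicate)
largest = max(xs)                                # like collection.max()

print(squares)     # [0, 1, 4, 9, 16]
print(len(pairs))  # 25
print(evens)       # [0, 2, 4]
print(largest)     # 4
```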

We first use these in isolated examples and then combine them to rewrite the computation from the `submit` section.

### Dask.bag: Example API

```python
import dask.bag as db
```

```python
b = db.from_sequence(range(5))
b
```

```
dask.bag<from_se..., npartitions=5>
```

```python
b.compute()  # Gather results back to the local process
```

```
[0, 1, 2, 3, 4]
```
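A bag is split into partitions that can be processed independently, and `compute()` gathers the per-partition results back into one local result. The sketch below mimics that idea in plain Python. The helper names (`partition`, `bag_map`, `bag_compute`) and the even-chunk partitioning rule are simplifying assumptions for illustration, not Dask's implementation; the repr above only tells us that Dask chose 5 partitions for this 5-element sequence.

```python
from math import ceil


def partition(seq, npartitions):
    """Split seq into roughly equal consecutive chunks (illustrative rule)."""
    seq = list(seq)
    size = max(1, ceil(len(seq) / npartitions))
    return [seq[i:i + size] for i in range(0, len(seq), size)]


def bag_map(partitions, func):
    """Apply func within each partition; each partition could go to a different worker."""
    return [[func(x) for x in part] for part in partitions]


def bag_compute(partitions):
    """Gather: concatenate the partitions back into one local list."""
    return [x for part in partitions for x in part]


parts = partition(range(5), npartitions=5)
print(parts)                                          # [[0], [1], [2], [3], [4]]
print(bag_compute(bag_map(parts, lambda x: x ** 2)))  # [0, 1, 4, 9, 16]
```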

### `map`

```python
# Square each element
b.map(lambda x: x ** 2).compute()
```

```
[0, 1, 4, 9, 16]
```


### `filter`

```python
# Select only the even elements
b.filter(lambda x: x % 2 == 0).compute()
```

```
[0, 2, 4]
```

### `product`

```python
# Cartesian product: every pair of elements from two bags (here, the same bag twice)
b.product(b).compute()
```

```
[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (1, 0),
 (1, 1),
 (1, 2),
 (1, 3),
 (1, 4),
 (2, 0),
 (2, 1),
 (2, 2),
 (2, 3),
 (2, 4),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4)]
```

### Chaining operations

```python
# Chain operations to build more complex computations
(b.map(lambda x: x ** 2)
  .product(b)
  .filter(lambda tup: tup[0] % 2 == 0)
  .compute())
```

```
[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4),
 (16, 0),
 (16, 1),
 (16, 2),
 (16, 3),
 (16, 4)]
```
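For reference, the chained pipeline corresponds to the sequential list comprehension below, which yields the same 15 pairs in the same order. This is a plain-Python sketch for checking the semantics, not how Dask executes the chain.

```python
xs = range(5)

# map(x ** 2) -> product with xs -> keep pairs whose first element is even
chained = [(sq, x)
           for sq in (v ** 2 for v in xs)
           for x in xs
           if sq % 2 == 0]

print(len(chained))  # 15
print(chained[:3])   # [(0, 0), (0, 1), (0, 2)]
```

The filter only inspects the squared value, so whole rows of the product (those with 1 and 9) disappear together.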

### Exercise: Parallelize pairwise correlations with Dask.bag

To make this a bit easier, we compute only the maximum correlation and do not keep track of which pair of stocks produced it.

```python
from glob import glob
import os
import pandas as pd

filenames = sorted(glob(os.path.join('..', 'data', 'json', '*.h5')))  # ../data/json/*.h5
filenames[:5]
```

```
['../data/json/aet.h5',
 '../data/json/afl.h5',
 '../data/json/aig.h5',
 '../data/json/al.h5',
 '../data/json/amgn.h5']
```

```python
%%time

### Sequential Code

series = []
for fn in filenames:   # Simple map over filenames
    series.append(pd.read_hdf(fn)['close'])

results = []

for a in series:    # Doubly nested loop over the same collection
    for b in series:
        if not (a == b).all():     # Filter out comparisons of the same series
            results.append(a.corr(b))  # Apply function

result = max(results)
print(result)
```

```
0.941803698004
CPU times: user 2.66 s, sys: 164 ms, total: 2.83 s
Wall time: 2.86 s
```


```python
%%time

### Parallel code

b = db.from_sequence(filenames)
data = b.map(lambda fn: pd.read_hdf(fn)['close'])

result = (data.product(data)
          .filter(lambda d: not (d[0] == d[1]).all())
          .map(lambda d: d[0].corr(d[1]))
          .max()
          .compute())
print(result)
```

```
0.941803698004
CPU times: user 6.1 s, sys: 2.37 s, total: 8.47 s
Wall time: 9.01 s
```


```python
result
```

```
0.94180369800361052
```

```python
# Solution: solutions/collections-2.py
### Parallel Code

import dask.bag as db

b = db.from_sequence(filenames)
series = b.map(lambda fn: pd.read_hdf(fn)['close'])

corr = (series.product(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())

result = corr.compute()
```

```python
result
```
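The exercise deliberately drops track of which pair produced the maximum. One way to keep it, sketched below in plain Python on made-up toy data, is to carry each file's name along with its values and reduce with a key function. Filtering on `a < b` by name, rather than `(a == b).all()` on the full series, also skips the mirrored `(b, a)` pairs; since correlation is symmetric, this roughly halves the number of correlations computed. The `pearson` helper and the `records` data are hypothetical stand-ins for `Series.corr` and the HDF5 files. With `dask.bag`, whose `max()` takes no key, the final step could presumably use `Bag.topk(1, key=...)` instead; treat that as an untested assumption.

```python
from itertools import product
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation (hypothetical stand-in for pandas Series.corr)."""
    mx, my = sum(xs) / len(xs), sum(ys) / len(ys)
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    return cov / sqrt(sum((x - mx) ** 2 for x in xs) * sum((y - my) ** 2 for y in ys))


# Hypothetical (name, values) records standing in for (filename, series)
records = [
    ("a", [1.0, 2.0, 3.0, 4.0]),
    ("b", [2.0, 4.0, 6.0, 8.1]),
    ("c", [4.0, 3.0, 2.0, 1.0]),
]

candidates = (
    ((a, b), pearson(xa, xb))
    for (a, xa), (b, xb) in product(records, records)
    if a < b  # drops self-pairs and mirrored (b, a) duplicates
)
best_pair, best_corr = max(candidates, key=lambda kv: kv[1])
print(best_pair)  # ('a', 'b')
```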

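```python
%%time

import dask

# For comparison, run the same graph in a single thread with the synchronous scheduler.
# (Newer dask versions spell this corr.compute(scheduler='synchronous').)
result = corr.compute(get=dask.local.get_sync)
```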
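Conceptually, a synchronous scheduler walks the task graph and runs one task at a time in the calling thread, which makes timings and tracebacks easy to read. The toy executor below assumes a simplified graph format in the spirit of Dask's: a dict mapping keys to either literal values or tuples whose first element is a callable, with string arguments that may name other keys. `run_sync` is hypothetical and far simpler than `dask.local.get_sync`.

```python
from operator import add, mul


def run_sync(graph, key, cache=None):
    """Evaluate `key` in a dict-based task graph, one task at a time, in this thread."""
    if cache is None:
        cache = {}
    if key not in cache:
        task = graph[key]
        if isinstance(task, tuple) and task and callable(task[0]):
            func, args = task[0], task[1:]
            # Arguments naming other keys (strings, by assumption) are computed first
            values = [run_sync(graph, a, cache) if isinstance(a, str) and a in graph else a
                      for a in args]
            cache[key] = func(*values)
        else:
            cache[key] = task  # a literal value
    return cache[key]


graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),      # 1 + 2
    "result": (mul, "sum", 10),  # (1 + 2) * 10
}
print(run_sync(graph, "result"))  # 30
```

The `cache` ensures a key shared by several tasks is computed only once, which is also why the bag pipeline above does not reread each file for every pair.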

### PySpark solution for comparison

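```python
# Solution: solutions/collections-1.py
# Assumes `sc` is an existing SparkContext (e.g. the one provided by the pyspark shell)
### Parallel code

rdd = sc.parallelize(filenames)
series = rdd.map(lambda fn: pd.read_hdf(fn)['close'])

corr = (series.cartesian(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())

# In PySpark, .max() is an action: it triggers the computation and returns a number
result = corr
```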


### Conclusion

*  Higher-level collections provide functions for common patterns
*  Move data into a collection, construct a lazy computation, and trigger it at the end
*  We used PySpark (`cartesian` + `map`) and Dask.bag (`product` + `map`) to handle the nested for loop
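The "construct a lazy computation, trigger it at the end" pattern can be illustrated with a toy class that only records operations and runs them when `compute()` is called. `LazyCollection` is hypothetical and sequential; it mimics the shape of the `dask.bag` calls used above, not their implementation (Dask builds a task graph and hands it to a scheduler).

```python
from itertools import product


class LazyCollection:
    """Toy lazy collection: records operations and only runs them on compute()."""

    def __init__(self, source, steps=()):
        self._source = source        # assumed re-iterable (list, range, ...)
        self._steps = tuple(steps)

    def _then(self, step):
        return LazyCollection(self._source, self._steps + (step,))

    def map(self, func):
        return self._then(lambda it: map(func, it))

    def filter(self, pred):
        return self._then(lambda it: filter(pred, it))

    def product(self, other):
        # The other collection is also evaluated only when compute() runs
        return self._then(lambda it: product(list(it), other.compute()))

    def compute(self):
        data = iter(self._source)
        for step in self._steps:
            data = step(data)
        return list(data)


calls = []


def square(x):
    calls.append(x)  # record each real invocation
    return x ** 2


lazy = LazyCollection(range(5)).map(square).filter(lambda x: x % 2 == 0)
print(calls)           # [] : nothing has run yet
print(lazy.compute())  # [0, 4, 16]
print(calls)           # [0, 1, 2, 3, 4]
```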