Parallel Map on Files
------------------------

For each file in a set of filenames, we parse its JSON contents, load that data into a Pandas DataFrame, and then write the result to another file in a more efficient binary format, HDF5.
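
Each input file holds one JSON record per line, which the loops below parse line by line and hand to Pandas as a list of dicts. A minimal sketch of that step, using a small hypothetical in-memory file in place of the real data files:

```python
import io
import json

import pandas as pd

# Hypothetical stand-in for one input file: one JSON object per line.
records = [{"timestamp": 1, "close": 10}, {"timestamp": 2, "close": 11}]
text = "\n".join(json.dumps(r) for r in records)

# Parse line by line, as the loops in this notebook do.
with io.StringIO(text) as f:
    data = [json.loads(line) for line in f]

# A list of dicts becomes a DataFrame with one column per key.
df = pd.DataFrame(data)
print(df)
```

Writing the frame out with `df.to_hdf(...)` additionally requires the PyTables package (`pip install tables`).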

We find that parsing JSON is slow, so we parallelize the work across multiple processes with the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) module.

### Objectives

*  Profile code to find bottleneck
*  Use `concurrent.futures` to `map` a function across many inputs in parallel


### Requirements

*  Pandas
*  concurrent.futures (standard in Python 3, `pip install futures` in Python 2)
*  snakeviz (for profile visualization, `pip install snakeviz`)


    pip install snakeviz
    pip install futures

In [5]:
! pip install snakeviz


### Extra Exercise

Try out alternative binary formats.  Perhaps try [feather](https://github.com/wesm/feather).

## Before we start

We need to get some data to work with.
We are going to generate some [fake stock data](https://github.com/mrocklin/fakestockdata) by adding a bunch of points between real stock data points. This will take a few minutes the first time we run it.

In [7]:
! pip install fakestockdata



In [38]:
%run ../prep.py

## Sequential Execution

In [39]:
%load_ext snakeviz

In [40]:
from glob import glob
import json
import pandas as pd
import os

In [41]:
filenames = sorted(glob(os.path.join('..', 'data', 'json', '*.json')))  # ../data/json/*.json
filenames[:5]

['..\\data\\json\\aa.json',
 '..\\data\\json\\aapl.json',
 '..\\data\\json\\abc.json',
 '..\\data\\json\\aig.json',
 '..\\data\\json\\amgn.json']

In [42]:
%%snakeviz

for fn in filenames:
    print(fn)
    with open(fn) as f:
        data = [json.loads(line) for line in f]  # one JSON record per line

    df = pd.DataFrame(data)

    out_filename = fn[:-5] + '.h5'  # replace the '.json' suffix with '.h5'
    df.to_hdf(out_filename, key='/data')

..\data\json\aa.json
..\data\json\aapl.json
..\data\json\abc.json
..\data\json\aig.json
..\data\json\amgn.json
..\data\json\amzn.json
..\data\json\bwa.json
..\data\json\cost.json
..\data\json\csco.json
..\data\json\d.json
..\data\json\ebay.json
..\data\json\emr.json
..\data\json\esrx.json
..\data\json\gas.json
..\data\json\ge.json
..\data\json\goog.json
..\data\json\hal.json
..\data\json\hp.json
..\data\json\hpq.json
..\data\json\ibm.json
..\data\json\jbl.json
..\data\json\jpm.json
..\data\json\met.json
..\data\json\nyx.json
..\data\json\pcg.json
..\data\json\usb.json
..\data\json\vrsn.json
..\data\json\yhoo.json
..\data\json\zmh.json
 
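
snakeviz visualizes profiles in the format produced by Python's built-in `cProfile` module, so a similar text report can be produced outside the notebook with `cProfile` and `pstats` from the standard library. This sketch profiles a hypothetical in-memory parsing job rather than the real files:

```python
import cProfile
import io
import json
import pstats

# Hypothetical stand-in for one line-delimited JSON file, kept in memory.
lines = [json.dumps({"timestamp": i, "volume": i * 10}) for i in range(20000)]


def parse_lines(lines):
    """Parse one JSON record per line, like the loop body above."""
    return [json.loads(line) for line in lines]


# Run the function under the profiler and keep its return value.
profiler = cProfile.Profile()
records = profiler.runcall(parse_lines, lines)

# Sort by cumulative time and render the ten most expensive entries as text.
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(10)
report = stream.getvalue()
print(report)
```

Calling `profiler.dump_stats("parse.prof")` saves the same data to a file, which `snakeviz parse.prof` can open from the command line.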


Parallel Execution
--------------------

We can process each file independently and in parallel.  To accomplish this we'll transform the body of our for loop into a function and then use the [concurrent.futures.ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#executor-objects) to apply that function across all of the filenames in parallel using multiple processes.

### Before

Whenever we have code like the following:

```python
results = []
for x in L:
    results.append(f(x))
```

or the following:

```python
results = [f(x) for x in L]
```

or the following:

```python
results = list(map(f, L))
```

### After

We can instead write it as the following:

```python
from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor()

results = list(e.map(f, L))
```
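
A small standalone script makes the equivalence concrete; `f` and `L` are hypothetical stand-ins for your function and inputs. `Executor.map` returns results in the same order as its inputs, so the parallel result can replace the sequential one directly. The `if __name__ == "__main__":` guard matters on platforms that start workers with "spawn" (Windows and macOS), where each worker re-imports the main module.

```python
from concurrent.futures import ProcessPoolExecutor


def f(x):
    """Hypothetical stand-in for any function of one argument."""
    return x * x


def main():
    L = list(range(10))

    # Sequential: a plain list comprehension.
    sequential = [f(x) for x in L]

    # Parallel: Executor.map yields results in input order.
    with ProcessPoolExecutor(max_workers=2) as e:
        parallel = list(e.map(f, L))

    assert parallel == sequential
    return parallel


if __name__ == "__main__":
    print(main())
```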

### Example

In [None]:
%%time

### Sequential code

from time import sleep

results = []

for i in range(8):
    sleep(1)
    results.append(i + 1)

In [None]:
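%%time

### Parallel code

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def slowinc(x):
    sleep(1)
    return x + 1

# With the "spawn" start method (the default on Windows and macOS), worker
# processes must be able to import slowinc; if this cell fails (for example
# with BrokenProcessPool), move slowinc into a .py module and import it.
with ProcessPoolExecutor() as e:
    results = list(e.map(slowinc, range(8)))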

### Exercise:  Convert JSON data to HDF5 in parallel using `concurrent.futures.Executor.map`

In [None]:
import json

In [None]:
%%time

### Sequential code

for fn in filenames:
    with open(fn) as f:
        data = [json.loads(line) for line in f]  # one JSON record per line

    df = pd.DataFrame(data)

    out_filename = fn[:-5] + '.h5'  # replace the '.json' suffix with '.h5'
    df.to_hdf(out_filename, key='/data')

In [None]:
%%time
# try replacing %%time with %%snakeviz when everything's working
# to get a profile

### Parallel code

# TODO

In [None]:
%load solutions/map-1.py

Try visualizing your parallel version with `%%snakeviz`. Where does it look like it's spending all its time?
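
Because `%%snakeviz` only profiles the notebook's own process, the work done inside the worker processes does not show up in its report. One workaround, sketched here as an assumption rather than part of the original exercise, is to run `cProfile` inside each task, dump one stats file per task, and merge them with `pstats.Stats`. The `parse` task and the `.prof` file names below are hypothetical.

```python
import cProfile
import json
import os
import pstats
import tempfile
from concurrent.futures import ProcessPoolExecutor


def parse(n):
    """Hypothetical task: build n JSON lines and parse them back."""
    lines = [json.dumps({"i": i}) for i in range(n)]
    return len([json.loads(line) for line in lines])


def profiled_parse(args):
    """Run parse() under cProfile inside the worker; save stats to a file."""
    n, out_path = args
    profiler = cProfile.Profile()
    result = profiler.runcall(parse, n)
    profiler.dump_stats(out_path)
    return result


def main():
    tmpdir = tempfile.mkdtemp()
    tasks = [(5000, os.path.join(tmpdir, f"task{k}.prof")) for k in range(4)]

    with ProcessPoolExecutor(max_workers=2) as e:
        results = list(e.map(profiled_parse, tasks))

    # Merge the per-task profiles and show where the workers spent time.
    stats = pstats.Stats(*(path for _, path in tasks))
    stats.sort_stats("cumulative").print_stats(5)
    return results, stats


if __name__ == "__main__":
    main()
```

Each dumped `.prof` file can also be opened individually with `snakeviz`.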

Parallelism isn't everything
--------------------------------

We get a moderate increase in performance when using multiple processes. However, parallelism isn't the only way to accelerate this computation. Recall that the bulk of the cost comes from the `json.loads` function. A quick internet search for "fast json parsing in python" yields the [ujson](https://pypi.python.org/pypi/ujson) library as the top hit.

Simply knowing about and importing the optimized `ujson` library can be as effective as multi-core execution here.
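
A quick way to check this on your own machine is to time both parsers on the same lines. This sketch uses hypothetical in-memory records with only integer and string fields, and falls back to timing `json` alone if `ujson` is not installed (`pip install ujson`).

```python
import json
import timeit

try:
    import ujson  # optional third-party parser
except ImportError:
    ujson = None

# Hypothetical line-delimited records, loosely shaped like stock ticks.
lines = [json.dumps({"id": i, "symbol": "aapl", "volume": i * 10})
         for i in range(50000)]


def parse_with(loads):
    """Parse every line with the given loads function."""
    return [loads(line) for line in lines]


timings = {"json": timeit.timeit(lambda: parse_with(json.loads), number=3)}
if ujson is not None:
    # Both parsers should produce identical Python objects for this data.
    assert parse_with(ujson.loads) == parse_with(json.loads)
    timings["ujson"] = timeit.timeit(lambda: parse_with(ujson.loads), number=3)

for name, seconds in timings.items():
    print(f"{name}: {seconds:.3f} s for 3 passes")
```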

In [None]:
import ujson

In [None]:
%%snakeviz
filenames = sorted(glob(os.path.join('..', 'data', 'json', '*.json')))

for fn in filenames:
    with open(fn) as f:
        data = [ujson.loads(line) for line in f]  # ujson.loads replaces json.loads

    df = pd.DataFrame(data)

    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')

History: multiprocessing.Pool
--------------------------------

Previously, people have done multiprocessing computations with the `multiprocessing.Pool` object, whose `map` method behaves much the same way.

However, today most library designers are coordinating around the `concurrent.futures` interface, so it's wise to move over.
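
The two interfaces are close enough that switching is usually a one-line change. One visible difference: `Pool.map` returns a list, while `Executor.map` returns an iterator that yields results in input order. A small standalone comparison with a hypothetical `square` function:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool


def square(x):
    """Hypothetical task function; must be importable by worker processes."""
    return x * x


def main():
    inputs = range(6)

    # Older style: multiprocessing.Pool.map returns a list directly.
    with Pool(processes=2) as p:
        pool_results = p.map(square, inputs)

    # Newer style: Executor.map returns an iterator, so wrap it in list().
    with ProcessPoolExecutor(max_workers=2) as e:
        executor_results = list(e.map(square, inputs))

    return pool_results, executor_results


if __name__ == "__main__":
    print(main())
```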

In [None]:
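%%time

from multiprocessing import Pool

# load_parse_store is the per-file function you wrote in the exercise
# above (as in solutions/map-1.py); it must be defined before this cell runs.
with Pool() as p:
    p.map(load_parse_store, filenames)  # Pool.map already returns a list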

Conclusion
-----------

*  Used `snakeviz` to profile code
*  Used `concurrent.futures.ProcessPoolExecutor` for simple parallelism across many files
    *  Gained some speed boost (but not as much as expected)
    *  Lost ability to diagnose performance within parallel code
*  Describing each task as a function call makes it easy to use tools like `map` for parallelism
*  Saw that options other than parallelism, such as the `ujson` library, can speed up code
*  Making your tasks fast is often at least as important as parallelizing your tasks.