Chapter 6. Speeding up map and reduce with advanced parallelization
====
### Mastering Large Datasets with Python by JT Wolohan 



### Timing

In [1]:
from time import clock, sleep
from multiprocessing import Pool


def times_two(x):
  """Return ``x * 2 + 7`` (a cheap per-element workload for the timing demo).

  NOTE(review): despite the name, this also adds 7; the timing loop discards
  the results, so the exact arithmetic does not matter there.
  """
  doubled = x * 2
  return doubled + 7


def lazy_map(xs):
  """Serially apply ``times_two`` to every element of ``xs``.

  Returns a list so the work is actually performed (a bare ``map`` object
  would be lazy and do nothing until consumed).
  """
  return [times_two(x) for x in xs]


def parallel_map(xs, chunck=8500):
  """Apply ``times_two`` to ``xs`` on a two-process pool.

  ``chunck`` (sic) is forwarded to ``Pool.map`` as its ``chunksize``:
  roughly how many elements are sent to a worker per task.
  """
  with Pool(2) as pool:
    results = pool.map(times_two, xs, chunksize=chunck)
  return results


# time.clock was removed in Python 3.8. On Unix it also measured only this
# process's CPU time, so work done inside the Pool's worker processes was
# invisible. perf_counter measures elapsed wall-clock time instead.
from time import perf_counter

for i in range(0, 7):
  N = 10**i
  t1 = perf_counter()
  lazy_map(range(N))
  lm_time = perf_counter() - t1

  t1 = perf_counter()
  parallel_map(range(N))
  par_time = perf_counter() - t1
  print("""
-- N = {} --
Lazy map time:      {}
Parallel map time:  {}
""".format(N, lm_time, par_time))


-- N = 1 --
Lazy map time:      8.000000000008e-06
Parallel map time:  0.01324599999999998


-- N = 10 --
Lazy map time:      9.599999999998499e-05
Parallel map time:  0.014952000000000076


-- N = 100 --
Lazy map time:      5.900000000003125e-05
Parallel map time:  0.01502199999999998


-- N = 1000 --
Lazy map time:      0.0003989999999999272
Parallel map time:  0.014475000000000016


-- N = 10000 --
Lazy map time:      0.0038730000000000153
Parallel map time:  0.01732200000000006


-- N = 100000 --
Lazy map time:      0.03707399999999994
Parallel map time:  0.02400800000000003


-- N = 1000000 --
Lazy map time:      0.199009
Parallel map time:  0.13838499999999998



In [4]:
from time import clock
from multiprocessing import Pool


def times_two(x):
  """Return ``x * 2 + 7``; the per-element work for the chunk-size sweep.

  NOTE(review): the name says "times two" but 7 is also added; the sweep
  ignores the results, so this does not affect the timings' meaning.
  """
  scaled = x * 2
  return scaled + 7


def parallel_map(xs, chunk_size=8500):
  """Apply ``times_two`` to ``xs`` on a two-process pool.

  ``chunk_size`` is forwarded to ``Pool.map`` as its ``chunksize`` argument;
  it is the knob the sweep below varies.
  """
  with Pool(2) as pool:
    mapped = pool.map(times_two, xs, chunksize=chunk_size)
  return mapped


# time.clock was removed in Python 3.8 and, on Unix, ignored the time spent
# in worker processes; perf_counter measures elapsed wall-clock time.
from time import perf_counter

print("""
{:<10}  |  {}
-------------------------""".format("chunksize", "runtime"))

# Input size is fixed across runs; only the chunk size changes.
N = 1000000
for i in range(0, 9):
  chunk_size = 5 * (10**i)

  t1 = perf_counter()
  parallel_map(range(N), chunk_size)
  parallel_time = perf_counter() - t1

  print("{:<10}  |   {:>0.3f}".format(chunk_size, parallel_time))


chunksize   |  runtime
-------------------------
5           |   5.083
50          |   1.431
500         |   0.291
5000        |   0.199
50000       |   0.159
500000      |   0.203
5000000     |   0.182
50000000    |   0.164
500000000   |   0.157


### Parallel sum

In [None]:
from pathos.multiprocessing import ProcessingPool as Pool
from toolz.sandbox.parallel import fold
from functools import reduce


def my_add(left, right):
  """Binary ``+``: the reducer for both the parallel and the serial sum below."""
  total = left + right
  return total


N_ITEMS = 500000  # size of the parallel sum

with Pool() as P:
    # Keep the parallel result rather than discarding it.
    parallel_sum = fold(my_add, range(N_ITEMS), map=P.imap)

# Sanity check: the parallel fold must agree with the built-in sum.
assert parallel_sum == sum(range(N_ITEMS)), parallel_sum

print(reduce(my_add, range(500)))

### Parallel filter

In [5]:
from pathos.multiprocessing import ProcessingPool as Pool
from toolz.sandbox.parallel import fold
from functools import reduce


def map_combination(left, right):
  """Join two partial results with ``+`` (list concatenation for the filter)."""
  joined = left + right
  return joined


def keep_if_even(acc, nxt):
    """Filter reducer: append ``nxt`` to ``acc`` only when it is even.

    For even ``nxt`` a new list ``acc + [nxt]`` is returned; for odd ``nxt``
    the very same ``acc`` object is returned. ``acc`` is never mutated.
    """
    return acc + [nxt] if nxt % 2 == 0 else acc


N_ITEMS = 500000  # size of the parallel filter

with Pool() as P:
    # Keep the parallel result rather than discarding it.
    evens = fold(keep_if_even, range(N_ITEMS), [],
                 map=P.imap, combine=map_combination)

# Sanity check: order-preserving combine should yield every even number.
assert evens == list(range(0, N_ITEMS, 2)), len(evens)

print(reduce(keep_if_even, range(500), []))

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 222, 224, 226, 228, 230, 232, 234, 236, 238, 240, 242, 244, 246, 248, 250, 252, 254, 256, 258, 260, 262, 264, 266, 268, 270, 272, 274, 276, 278, 280, 282, 284, 286, 288, 290, 292, 294, 296, 298, 300, 302, 304, 306, 308, 310, 312, 314, 316, 318, 320, 322, 324, 326, 328, 330, 332, 334, 336, 338, 340, 342, 344, 346, 348, 350, 352, 354, 356, 358, 360, 362, 364, 366, 368, 370, 372, 374, 376, 378, 380, 382, 384, 386, 388, 390, 392, 394, 396, 398, 400, 402, 404, 406, 408, 410, 412, 414, 416, 418, 420,

### Parallel frequencies

In [6]:
from pathos.multiprocessing import ProcessingPool as Pool
from toolz.sandbox.parallel import fold
from random import choice
from functools import reduce


def combine_counts(left, right):
  """Merge two count dicts into a new dict, summing counts for shared keys.

  Keys present in only one input keep that input's count. Neither input
  is modified.
  """
  merged = {}
  for key in set(left) | set(right):
    merged[key] = left.get(key, 0) + right.get(key, 0)
  return merged


def make_counts(acc, nxt):
    """Frequency reducer: bump the tally for ``nxt`` in ``acc`` and return ``acc``.

    ``acc`` is mutated in place and the same dict is returned.
    NOTE(review): because of the mutation, one shared ``{}`` used as the
    initial value for several reductions would accumulate across them unless
    each reduction receives its own copy — confirm when using a non-process map.
    """
    current = acc.get(nxt, 0)
    acc[nxt] = current + 1
    return acc


N_ROLLS = 500000  # number of simulated die rolls for the parallel count

xs = (choice([1, 2, 3, 4, 5, 6]) for _ in range(N_ROLLS))

with Pool() as P:
    # Keep the parallel result rather than discarding it.
    counts = fold(make_counts, xs, {},
                  map=P.imap, combine=combine_counts)

# Sanity check: every roll must be counted exactly once.
assert sum(counts.values()) == N_ROLLS, counts

print(reduce(make_counts, (choice([1, 2, 3, 4, 5, 6]) for _ in range(500)), {}))

{1: 76, 2: 94, 3: 74, 4: 78, 5: 88, 6: 90}


### Parallel Naive Bayes
**NB:** *This code ended up getting cut from the book. It implements the naive Bayes algorithm in parallel using map and reduce patterns. Feel free to read through it as a bonus.*

In [None]:
from itertools import starmap, repeat
from functools import reduce, partial
import dill as pickle
from toolz.sandbox.parallel import fold
from pathos.multiprocessing import ProcessingPool as PathosPool
from multiprocessing import Pool
from csv import DictReader

In [None]:
def unique_keys(left, right):
    """Return the set of keys appearing in either mapping."""
    keys = set(left)
    keys.update(right)
    return keys

def prod(xs):
    """Multiply together all items of the iterable ``xs``.

    Returns 1 (the empty product) for an empty iterable, instead of the
    ``TypeError`` a ``reduce`` without an initial value raises.
    """
    return reduce(lambda acc, nxt: acc * nxt, xs, 1)

In [None]:
def _evidence(model, k, v, label):
    """Estimate P(feature ``k`` == ``v`` | ``label``) from the count table.

    Unseen features, unseen values and zero counts all fall back to a small
    pseudo-count (.001), so one unseen value cannot zero the whole product.
    Zero counts need the fallback too: ``_nb_comb`` stores an explicit 0 for
    every label when it merges partial models.
    """
    Cn = model['LABELS'][label]
    count = model.get(k, {}).get(v, {}).get(label, 0) or .001
    return count / Cn

def compute_prob(model, k, v, label, N):
    """Compute prior * evidence for one feature/value pair under ``label``.

    ``N`` is the total number of training rows.
    """
    prior = model['LABELS'][label] / N
    return prior * _evidence(model, k, v, label)

def _nb_suggest(ob, model, target):
    """Make a naive Bayes prediction for a single observation.

    Returns a dict mapping each label to the unnormalized score
    P(label) * prod_i P(x_i | label). The prior is applied once, not once
    per feature. ``ob`` is not modified, and ``target`` may be absent
    (e.g. when scoring unlabeled rows).
    """
    features = {k: v for k, v in ob.items() if k != target}
    N = sum(model['LABELS'].values())
    results = {}
    for label, Cn in model['LABELS'].items():
        prior = Cn / N
        likelihood = prod(_evidence(model, k, v, label)
                          for k, v in features.items())
        results[label] = prior * likelihood
    return results

def naive_bayes_suggest(obs, model, target):
    """Score every observation in ``obs`` in parallel.

    Returns a list with one label->score dict (from ``_nb_suggest``) per
    observation, in input order.
    """
    score_one = partial(_nb_suggest, target=target)
    with Pool() as P:
        return P.starmap(score_one, zip(obs, repeat(model)))

In [None]:
def nb_acc(acc, nxt, target):
    """Fold one training row ``nxt`` into the naive Bayes count table ``acc``.

    Table layout:
      acc['LABELS'][label]       -> number of rows with that label
      acc[feature][value][label] -> rows with that feature value and label

    ``acc`` is updated in place and returned. ``nxt`` is only read, never
    mutated, so the caller's rows keep their ``target`` column.
    Raises KeyError if ``nxt`` has no ``target`` key.
    """
    label = nxt[target]
    labels = acc.setdefault('LABELS', {})
    labels[label] = labels.get(label, 0) + 1
    for k, v in nxt.items():
        if k == target:
            continue
        counts = acc.setdefault(k, {}).setdefault(v, {})
        counts[label] = counts.get(label, 0) + 1
    return acc

In [None]:
def _nb_comb(left, right):
    """Merge two partial naive Bayes count tables into a new table.

    Label totals are summed. For every feature/value seen in either input, a
    count is written for every merged label (0 when neither side saw it).
    Neither input is modified.
    """
    left_labels, right_labels = left['LABELS'], right['LABELS']
    merged = {'LABELS': {
        label: left_labels.get(label, 0) + right_labels.get(label, 0)
        for label in unique_keys(left_labels, right_labels)
    }}
    for feature in unique_keys(left, right):
        if feature == 'LABELS':
            continue
        left_feat = left.get(feature, {})
        right_feat = right.get(feature, {})
        merged[feature] = {
            value: {
                label: (left_feat.get(value, {}).get(label, 0)
                        + right_feat.get(value, {}).get(label, 0))
                for label in merged['LABELS']
            }
            for value in unique_keys(left_feat, right_feat)
        }
    return merged

In [7]:
def naive_bayes(xs, target):
    """Create a naive Bayes model.

    Inputs
    xs: input data (row dicts; each must contain ``target``)
    target: key of the label column in each row

    Output
    prediction function: call it with a list of rows to get one
    label->score dict per row.
    """
    accumulate = partial(nb_acc, target=target)
    with PathosPool() as P:
        counts = fold(accumulate, xs, {}, map=P.map, combine=_nb_comb)
    return partial(naive_bayes_suggest, model=counts, target=target)

def max_prob(probs):
    """Return the key of ``probs`` with the largest value (first one wins ties)."""
    return max(probs, key=probs.get)

Download [the nursery data](https://archive.ics.uci.edu/ml/machine-learning-databases/nursery/nursery.data) and assign its local file path to `fp` in the next cell.

In [None]:
from itertools import islice

fp = ""  # path to the downloaded nursery.data file
N_SHOWN = 27  # number of example predictions to print

if not fp:
    raise ValueError("Set `fp` to the path of the downloaded nursery.data "
                     "file (see the download link above) before running this cell.")

with open(fp) as f:
    reader = DictReader(f, fieldnames=["parents", "has_nurs", "form",
                                 "children", "housing", "finance",
                                 "social", "health", "recc"])
    data = [row for row in reader]

model = naive_bayes(data, "recc")
probs = model(data)
print("{}\t\t{}\t{}".format("Match", "Suggestion", "Actual"))
print("{}".format("-"*45))
for p, row in islice(zip(probs, data), N_SHOWN):
    suggestion = max_prob(p)
    actual = row['recc']
    match = suggestion == actual
    print("{}\t\t{}\t{}".format(match, suggestion, actual))

[Ready for more? Go to chapter 7!](./Ch07_notebook.ipynb)