# Map and Reduce style of programming with Python

In [64]:
import dill as pickle
from random import choice
from functools import reduce
from pathos.multiprocessing import ProcessingPool as Pool
from toolz.sandbox.parallel import fold
from itertools import starmap

In [61]:
# IPython help query: show the docstring of functools.reduce.
?reduce

Docstring:
reduce(function, sequence[, initial]) -> value

Apply a function of two arguments cumulatively to the items of a sequence,
from left to right, so as to reduce the sequence to a single value.
For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
((((1+2)+3)+4)+5).  If initial is present, it is placed before the items
of the sequence in the calculation, and serves as a default when the
sequence is empty.
Type:      builtin_function_or_method


In [24]:
def my_add(acc, nxt):
    """Binary reducer: fold the next item into the running total.

    Args:
        acc: the value accumulated so far.
        nxt: the next item of the sequence.

    Returns:
        ``acc + nxt``.
    """
    total = acc + nxt
    return total


xs = [10, 5, 1, 19, 11, 203]

# Start from 0 so the reduction has an explicit initial value.
print(reduce(my_add, xs, 0))

249


In [25]:
# Same summation as before, with the reducer written inline as a lambda.
print(
    reduce(lambda total, item: total + item, xs, 0)
)

249


In [62]:
# IPython help query: show the documentation of Pool
# (pathos.multiprocessing.ProcessingPool, imported above under that alias).
?Pool

Init signature: Pool(*args, **kwds)
Docstring:
Mapper that leverages python's multiprocessing.
    
Init docstring:
Important class members:
    nodes       - number (and potentially description) of workers
    ncpus       - number of worker processors
    servers     - list of worker servers
    scheduler   - the associated scheduler
    workdir     - associated $WORKDIR for scratch calculations/files

Other class members:
    scatter     - True, if uses 'scatter-gather' (instead of 'worker-pool')
    source      - False, if minimal use of TemporaryFiles is desired
    timeout     - number of seconds to wait for return value from scheduler
        
NOTE: if number of nodes is not given, will autodetect processors
        
File:           c:\users\fraga\anaconda3\lib\site-packages\pathos\multiprocessing.py
Type:           t

In [28]:
%%timeit -r 1
with Pool() as P: 
    fold(my_add, range(1000000), map=P.imap)

6.15 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [29]:
%%timeit -r 1
reduce(my_add, range(1000000))

129 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 10 loops each)


### Speeding up map and reduce
> Using a parallel map can, counterintuitively, be slower than using a lazy map in map-and-reduce scenarios.

We can always parallelize at the reduce level instead of at the map level.

In [30]:
def map_combination(left, right):
    """Combine two partial results by concatenating/adding them.

    Args:
        left: the first partial result.
        right: the second partial result.

    Returns:
        ``left + right``.
    """
    combined = left + right
    return combined


def keep_if_even(acc, nxt):
    """Reducer that collects the even items of a sequence.

    Args:
        acc: list of even items gathered so far (left unmodified).
        nxt: the next item to examine.

    Returns:
        A new list ``acc + [nxt]`` when ``nxt`` is even; otherwise ``acc``
        itself, unchanged.
    """
    return acc + [nxt] if nxt % 2 == 0 else acc

In [63]:
# IPython help query: show the signature and docstring of
# toolz.sandbox.parallel.fold.
?fold

Signature:
fold(
    binop,
    seq,
    default='__no__default__',
    map=<class 'map'>,
    chunksize=128,
    combine=None,
)
Docstring:
Reduce without guarantee of ordered reduction.

inputs:

``binop``     - associative operator. The associative property allows us to
                leverage a parallel map to perform reductions in parallel.
``seq``       - a sequence to be aggregated
``default``   - an identity element like 0 for ``add`` or 1 for mul

``map``       - an implementation of ``map``. This may be parallel and
                determines how work is distributed.
``chunksize`` - Number of

In [39]:
%%timeit -r 1
with Pool() as P:
    fold(keep_if_even, range(100000), [],
         map=P.imap, combine=map_combination)

996 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [22]:
%%timeit -r 1
reduce(keep_if_even, range(100000), [])

4.49 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [56]:
# Shared setup for the parallel examples in the following cells.
N = 100_000    # number of input items
xs = range(N)  # the integers 0 .. N-1
P = Pool()     # worker pool; later cells pass P.imap to fold


# Parallel summation
def my_add(left, right):
    """Add two values; used as the binary operator for ``fold``.

    Args:
        left: the first operand.
        right: the second operand.

    Returns:
        ``left + right``.
    """
    result = left + right
    return result

In [57]:
%%timeit -r 1
fold(my_add, xs, map=P.imap)

611 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [58]:
# Parallel filter
def map_combination(left, right):
    """Join two partial results produced by separate chunks.

    Args:
        left: the first partial result.
        right: the second partial result.

    Returns:
        ``left + right``.
    """
    joined = left + right
    return joined

def keep_if_even(acc, nxt):
    """Reducer that appends ``nxt`` to the accumulated list only when it is even.

    Args:
        acc: list of even items gathered so far (never modified in place).
        nxt: the next item to examine.

    Returns:
        ``acc + [nxt]`` (a new list) for an even ``nxt``; otherwise ``acc``.
    """
    if nxt % 2 == 0:
        return acc + [nxt]
    return acc

In [59]:
%%timeit -r 1
fold(keep_if_even, xs, [], map=P.imap, combine=map_combination)

981 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [43]:
# Parallel frequencies
def combine_counts(left, right):
    """Merge two count dictionaries by summing the counts of each key.

    Args:
        left: mapping of item -> count.
        right: mapping of item -> count.

    Returns:
        A new dict holding every key of either input, mapped to the sum of
        its counts (a key missing from one side counts as 0 there).
    """
    all_keys = set(left.keys())
    all_keys = all_keys.union(set(right.keys()))

    merged = {}
    for key in all_keys:
        merged[key] = left.get(key, 0) + right.get(key, 0)
    return merged

def make_counts(acc, nxt):
    """Reducer that tallies how many times each item has been seen.

    Returns a new dictionary rather than updating ``acc`` in place. ``fold``
    passes the same ``default`` object as the starting accumulator of every
    chunk, so an in-place update would make all chunks share (and corrupt)
    one dictionary whenever the ``map`` runs in the current process, and it
    would also modify the caller's initial value.

    Args:
        acc: mapping of item -> count accumulated so far (left unmodified).
        nxt: the next item to count; must be hashable.

    Returns:
        A new dict equal to ``acc`` with the count of ``nxt`` increased by 1.
    """
    # Copying costs O(number of distinct keys) per step.
    updated = dict(acc)
    updated[nxt] = updated.get(nxt, 0) + 1
    return updated

# Materialize the random sample as a list. A generator can be iterated only
# once, so any second pass over it (re-running the cell that consumes it, or
# the repeated executions performed by %%timeit) would see an empty sequence.
xs = [choice([1, 2, 3, 4, 5, 6]) for _ in range(N)]

{1: 16586, 2: 16732, 3: 16727, 4: 16651, 5: 16683, 6: 16621}


In [60]:
%%timeit -r 1
fold(make_counts, xs, {}, map=P.imap, combine=combine_counts)

2.47 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [65]:
# IPython help query: show the documentation of itertools.starmap.
?starmap

Init signature: starmap(self, /, *args, **kwargs)
Docstring:
starmap(function, sequence) --> starmap object

Return an iterator whose values are returned from the function evaluated
with an argument tuple taken from the given sequence.
Type:           type
Subclasses:


In [67]:
xs = [7, 3, 1, 19, 11]
ys = [8, 1, -3, 14, 22]

# Element-wise maximum, computed two ways.
# 1) Index-based comprehension over the positions of xs.
loop_maxes = [max(ys[idx], xs[idx]) for idx in range(len(xs))]
# 2) starmap unpacks each (x, y) pair into the arguments of max.
map_maxes = [*starmap(max, zip(xs, ys))]

print(loop_maxes)

[8, 3, 1, 19, 22]


In [68]:
# Print the starmap-based result.
print(map_maxes)

[8, 3, 1, 19, 22]
