In [1]:
from collections import defaultdict
import multiprocessing
import numpy as np

# Exponent of the data set size: generate_data(N) produces 2**N elements.
N = 25
# Size of the multiprocessing pool; also the number of chunks the data is
# split into before the map step.
WORKERS = 8

In [2]:
def generate_data(n, max_value=9):
    """Generate ``2**n`` random non-negative integers in ``[0, max_value]``.

    Values are drawn from a Poisson distribution with ``lam=1``. Samples
    greater than ``max_value`` are dropped and replaced by uniformly
    distributed integers, so the result always has exactly ``2**n`` elements.

    Args:
        n: Exponent of the output size (the array has ``2**n`` elements).
        max_value: Largest value allowed in the output, inclusive.

    Returns:
        1-D numpy integer array of length ``2**n``: the retained Poisson
        samples first, followed by the uniform replacements.
    """
    size = 2**n
    poisson_data = np.random.poisson(lam=1, size=size)
    poisson_data = poisson_data[poisson_data <= max_value]
    remaining = size - len(poisson_data)
    # randint's upper bound is exclusive, so pass max_value + 1 to keep the
    # replacement values in the same inclusive range as the Poisson filter.
    # Reusing the Poisson dtype avoids the int16 limit on max_value and does
    # not change the dtype of the concatenated result.
    remaining_data = np.random.randint(
        0, max_value + 1, remaining, dtype=poisson_data.dtype
    )
    return np.concatenate((poisson_data, remaining_data))

### Let's generate and check data:

In [3]:
# Build the 2**N-element data set that the benchmark cells below operate on.
elements = generate_data(N)

In [4]:
def get_sum_by_element(elements):
    """Count how many times each distinct value occurs in ``elements``.

    This is the notebook's naive mapper: a plain Python loop that visits the
    input one item at a time. Despite the name, it counts occurrences rather
    than summing values.

    Args:
        elements: Iterable of hashable items.

    Returns:
        dict mapping each distinct item to its number of occurrences, with
        keys in order of first appearance.
    """
    counts = {}
    for item in elements:
        if item in counts:
            counts[item] += 1
        else:
            counts[item] = 1
    return counts

In [5]:
%timeit -r1 -n1 get_sum_by_element(elements)
get_sum_by_element(elements)

11.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


{0: 12339598,
 1: 12342904,
 2: 6174943,
 3: 2058482,
 4: 515237,
 5: 103305,
 6: 17199,
 7: 2440,
 8: 299,
 9: 25}

In [6]:
def get_sum_by_element_quick(elements):
    """Count how many times each distinct value occurs, using numpy.

    A single ``np.unique(..., return_counts=True)`` call produces both the
    distinct values and their counts, instead of re-scanning the whole array
    (and copying the matching elements) once per distinct value.

    Args:
        elements: numpy array or any array-like of values to count.

    Returns:
        dict mapping each distinct value (as a numpy scalar, in ascending
        order) to its number of occurrences (as a Python ``int``).
    """
    # Debug aid: uncomment to see which process handles the chunk.
    # print("Map process name: ", multiprocessing.current_process().name)
    values, counts = np.unique(elements, return_counts=True)
    # tolist() turns the counts into plain Python ints.
    return dict(zip(values, counts.tolist()))

In [7]:
%timeit -r1 -n4 get_sum_by_element_quick(elements)
get_sum_by_element_quick(elements)

2.15 s ± 0 ns per loop (mean ± std. dev. of 1 run, 4 loops each)


{0: 12339598,
 1: 12342904,
 2: 6174943,
 3: 2058482,
 4: 515237,
 5: 103305,
 6: 17199,
 7: 2440,
 8: 299,
 9: 25}

### The quick (numpy) implementation of the mapper runs on average 4-7 times faster than the naive one

### Let's do MapReduce, trying each of the two mapper functions defined above (get_sum_by_element and get_sum_by_element_quick) as the mapper:

In [8]:
def shuffle_data(map_results):
    """Group the mapper outputs by key (the "shuffle" step).

    Args:
        map_results: Iterable of dicts, one per mapped chunk.

    Returns:
        list of ``(key, values)`` tuples, where ``values`` is the list of
        values the mappers produced for ``key``, in mapper order. Keys appear
        in order of first occurrence.
    """
    grouped = {}
    for partial in map_results:
        for key, value in partial.items():
            grouped.setdefault(key, []).append(value)
    return [(key, values) for key, values in grouped.items()]

def reduce_data(data):
    """Collapse one shuffled ``(key, values)`` entry into ``(key, total)``.

    Args:
        data: Indexable pair whose first item is the key and whose second
            item is the collection of values gathered for that key.

    Returns:
        tuple ``(key, total)`` where ``total`` is ``np.sum`` of the values.
    """
    # print("Reduce process name: ", multiprocessing.current_process().name, "Key: ", data[0])
    key = data[0]
    total = np.sum(data[1])
    return key, total

def run_map_reduce(pool, elements, mapper_function, parts=None):
    """Run a map -> shuffle -> reduce pipeline over ``elements``.

    The input is split into chunks, each chunk is passed to
    ``mapper_function`` through ``pool.map``, the resulting dicts are grouped
    by key with ``shuffle_data``, and each group is reduced with
    ``reduce_data`` (again through ``pool.map``).

    Args:
        pool: Object exposing ``map(function, iterable)``, e.g. a
            ``multiprocessing.Pool``.
        elements: Array-like input accepted by ``np.array_split``.
        mapper_function: Callable applied to every chunk; it must return a
            dict, since the shuffle step iterates over ``.items()``.
        parts: Number of chunks to split ``elements`` into. Defaults to the
            module-level ``WORKERS`` so existing callers are unaffected; pass
            it explicitly when the pool size differs from ``WORKERS``.

    Returns:
        tuple ``(results, shuffled, mapped)``: the reduce output, the shuffle
        output and the raw mapper output, in that order.
    """
    if parts is None:
        parts = WORKERS
    elements_parts = np.array_split(elements, parts)
    mapped = pool.map(mapper_function, elements_parts)
    shuffled = shuffle_data(mapped)
    results = pool.map(reduce_data, shuffled)

    return results, shuffled, mapped

In [9]:
# Pool of WORKERS processes, reused by the run_map_reduce calls below.
# NOTE(review): the pool is never closed in the visible cells - consider
# calling pool.close() and pool.join() once the benchmarks are finished.
pool = multiprocessing.Pool(WORKERS)

In [10]:
%timeit -r1 -n4 run_map_reduce(pool, elements, get_sum_by_element)

7.7 s ± 0 ns per loop (mean ± std. dev. of 1 run, 4 loops each)


In [11]:
%timeit -r1 -n4 run_map_reduce(pool, elements, get_sum_by_element_quick)

7.37 s ± 0 ns per loop (mean ± std. dev. of 1 run, 4 loops each)


In [12]:
# Run the pipeline once more (outside %timeit) to inspect what each stage
# produces: raw mapper dicts, shuffled (key, values) pairs and reduced totals.
results, shuffled, mapped = run_map_reduce(pool, elements, get_sum_by_element_quick)
print("\nMap results: ", mapped)
print("\nShuffle results: ", shuffled)
print("\nReduce results: ", results)


Map results:  [{0: 1542213, 1: 1543051, 2: 771820, 3: 257679, 4: 64410, 5: 12660, 6: 2143, 7: 289, 8: 37, 9: 2}, {0: 1542380, 1: 1542408, 2: 771152, 3: 258008, 4: 64733, 5: 13115, 6: 2167, 7: 287, 8: 46, 9: 8}, {0: 1543549, 1: 1542508, 2: 771081, 3: 257562, 4: 64149, 5: 12960, 6: 2126, 7: 331, 8: 37, 9: 1}, {0: 1541862, 1: 1544106, 2: 771794, 3: 256755, 4: 64297, 5: 12911, 6: 2231, 7: 299, 8: 42, 9: 7}, {0: 1543879, 1: 1542459, 2: 771325, 3: 256464, 4: 64588, 5: 13129, 6: 2143, 7: 285, 8: 29, 9: 3}, {0: 1542164, 1: 1543242, 2: 772097, 3: 257152, 4: 64366, 5: 12820, 6: 2095, 7: 332, 8: 33, 9: 3}, {0: 1541515, 1: 1543161, 2: 772305, 3: 257829, 4: 64202, 5: 12807, 6: 2151, 7: 294, 8: 39, 9: 1}, {0: 1542036, 1: 1541969, 2: 773369, 3: 257033, 4: 64492, 5: 12903, 6: 2143, 7: 323, 8: 36}]

Shuffle results:  [(0, [1542213, 1542380, 1543549, 1541862, 1543879, 1542164, 1541515, 1542036]), (1, [1543051, 1542408, 1542508, 1544106, 1542459, 1543242, 1543161, 1541969]), (2, [771820, 771152, 771081,

In [14]:
### Execution results (milliseconds):
### N:               15       20       22       25
# 1) slow mapper:   11.8     359      1047     11900
# 2) quick mapper:  1.84     48.7     218      2150
# 3) pool (slow):   7.45     117      493      7700
# 4) pool (quick):  2.93     24.5     93       7370

#### Conclusions:
1) The quick mapper, which uses vectorised numpy operations, works 4-7 times faster than the naive one  
2) On a small amount of data (2^15) the plain single-process quick mapper is faster than both multi-process versions (because it needs neither a shuffle step nor inter-process communication)  
3) On larger amounts of data (2^20 - 2^22) the MapReduce versions work 2-3 times faster than the corresponding single-process mappers. At that size we benefit from running the map step in parallel  
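4) On much bigger data (2^25) the MapReduce versions are slower than the plain numpy mapper - the suspected reason is that the shuffle operation is implemented in pure Python in a non-optimal way (note, however, that the shuffle only merges WORKERS small dictionaries, so the cost of pickling and sending the array chunks to the worker processes is a more likely bottleneck)  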
5) Still, the MapReduce version with the slow (pure-Python) mapper performs better than the plain slow mapper function - in that case both the shuffle and the map functions are implemented non-optimally, so we get some benefit from multiple processes :)  