# Parallel Computing

## multiprocessing

Python's standard library has a neat `multiprocessing` package that runs your tasks in separate worker processes, so they can use multiple cores in parallel.

Let's use five worker processes, one per core.

In [1]:
n_nodes = 5

In [2]:
from multiprocessing import Pool

def f(x):
    return x * x

p = Pool(n_nodes)  # number of worker processes (typically no more than the number of cores)
print(p.map(f, [1, 2, 3, 4, 5, 6]))

[1, 4, 9, 16, 25, 36]


## MapReduce

Although your tasks run locally, this is a nice way to simulate MapReduce, which is useful practice because you'll be working with distributed systems as well.

First, we need to distribute our big dataset across several nodes. In real life, a big dataset usually already lives across multiple nodes.


In [3]:
def distribute_data(data, n_nodes):
    # node i gets every n_nodes-th element, starting at index i
    return [data[i::n_nodes] for i in range(n_nodes)]
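
To make the strided slicing concrete, here is a small sketch that applies the same function to ten integers and three nodes. The toy input is an assumption for illustration and is not the dataset used below. Node `i` receives every `n_nodes`-th element starting at index `i`, so the chunk sizes differ by at most one.

```python
def distribute_data(data, n_nodes):
    """Split data into n_nodes interleaved chunks."""
    return [data[i::n_nodes] for i in range(n_nodes)]

# Toy input, chosen only for illustration
chunks = distribute_data(list(range(10)), 3)
print(chunks)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```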

In the MapReduce framework, an operation consists of two steps:
1. The **map** step performs the task locally on each node
1. The **reduce** step aggregates all local results into the final result.
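
As a minimal sketch of these two steps, the snippet below uses Python's built-in `map` and `functools.reduce` on a toy list of chunks. The names `chunks`, `local_sums` and `total` are made up for this illustration, and everything runs in a single process.

```python
from functools import reduce
from operator import add

# Pretend each inner list lives on a different node (toy data)
chunks = [[1, 2], [3, 4], [5, 6]]

# Map step: each node computes a local result on its own chunk
local_sums = list(map(sum, chunks))

# Reduce step: the local results are aggregated into the final result
total = reduce(add, local_sums)

print(local_sums, total)  # [3, 7, 11] 21
```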

For example, suppose we have a list of errors, and we would like to compute the mean squared error (MSE).

In [4]:
import numpy as np

N = 10 ** 5  # some big number (an int, since randn expects integer dimensions)
errors = np.random.randn(N)  # our list of errors

With plain Python or NumPy, we would compute the MSE as follows.

In [5]:
print(sum([error ** 2 for error in errors]) / float(len(errors)))  # python way (slow)
print(np.square(errors).mean())  # numpy way (fast)

1.00266565643
1.00266565643


Using distributed systems, we would define the map and reduce functions as follows. 

We intentionally write the code to be suboptimal, so that it is rather slow and you'll see the effect of the parallelization more clearly.

In [6]:
def f_map(data):
    """Perform your function locally in one node"""
    return sum([x ** 2 for x in data]), len(data)  # return result and the count
    
def f_reduce(data):
    """Reduce all the local results back to final answer"""
    sums, counts = np.array(data).T
    return sums.sum() / counts.sum()
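
The reason `f_map` returns the count along with the sum is that the MSE decomposes over chunks. Written out, assuming the data is split into $K$ chunks where chunk $k$ holds $n_k$ errors $e_{k,1}, \dots, e_{k,n_k}$ (this notation is introduced here for illustration and is not used elsewhere in the notebook):

```latex
\mathrm{MSE}
  = \frac{1}{N} \sum_{i=1}^{N} e_i^2
  = \frac{\sum_{k=1}^{K} S_k}{\sum_{k=1}^{K} n_k},
\qquad
S_k = \sum_{j=1}^{n_k} e_{k,j}^2,
\qquad
N = \sum_{k=1}^{K} n_k
```

Here $S_k$ is the first value returned by `f_map` on chunk $k$ and $n_k$ is the second, so `f_reduce` only needs these two numbers from each node.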

Let's check that we indeed get the same result when we chain these functions directly on the full dataset.

In [7]:
mse = f_reduce([f_map(errors)])
print(mse)
print(np.isclose(mse, np.square(errors).mean()))

1.00266565643
True


Now let's use distributed data.

In [8]:
f_reduce([f_map(data) for data in distribute_data(errors, n_nodes)])

1.0026656564318008
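
Returning the count from each node matters as soon as the chunks have different sizes. The sketch below uses plain Python and deliberately uneven toy chunks (an assumption for illustration; `distribute_data` produces chunks whose sizes differ by at most one) to compare the count-weighted reduce with a naive average of the local means.

```python
def f_map(data):
    """Local sum of squares and the number of samples."""
    return sum(x ** 2 for x in data), len(data)

# Deliberately uneven toy chunks
chunks = [[1.0, 2.0, 3.0], [4.0]]
local = [f_map(chunk) for chunk in chunks]

# Count-weighted reduce: total sum of squares divided by total count
weighted = sum(s for s, _ in local) / sum(n for _, n in local)

# Naive reduce: average of the local means (biased for uneven chunks)
naive = sum(s / n for s, n in local) / len(local)

print(weighted)  # 7.5
print(naive)     # larger than 7.5, because the single-element chunk is over-weighted
```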

And now let's run the map step on the actual cores.

In [9]:
p = Pool(n_nodes)
f_reduce(p.map(f_map, distribute_data(errors, n_nodes)))

1.0026656564318008
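
The cells above never shut the pool down. As a sketch of one way to tidy this up when the code runs as a script on Python 3, `Pool` can be used as a context manager, together with the `if __name__ == "__main__":` guard that is needed on platforms that start workers by spawning a fresh interpreter. The functions below are plain-Python variants of the ones above, and the four-element input is a toy assumption.

```python
from multiprocessing import Pool

def f_map(data):
    """Map step: local sum of squares and local count."""
    return sum(x ** 2 for x in data), len(data)

def f_reduce(results):
    """Reduce step: combine the (sum, count) pairs."""
    total = sum(s for s, _ in results)
    count = sum(n for _, n in results)
    return total / count

def distribute_data(data, n_nodes):
    """Split data into n_nodes interleaved chunks."""
    return [data[i::n_nodes] for i in range(n_nodes)]

if __name__ == "__main__":
    errors = [1.0, 2.0, 3.0, 4.0]  # toy data
    # Leaving the with-block terminates the worker processes
    with Pool(2) as pool:
        mse = f_reduce(pool.map(f_map, distribute_data(errors, 2)))
    print(mse)  # 7.5
```

The functions are defined at module level so that they can be pickled and sent to the workers.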

### Performance

Let's compare the duration of serial computing with parallel computing.

In [10]:
N = 10 ** 6  # one million samples
errors = np.random.randn(N)
distributed_data = distribute_data(errors, n_nodes)

In [11]:
%timeit f_reduce([f_map(data) for data in distributed_data])  # serial mapreduce
%timeit f_reduce(p.map(f_map, distributed_data))  # parallel mapreduce

1 loops, best of 3: 315 ms per loop
10 loops, best of 3: 95.9 ms per loop


Using several cores in parallel is indeed faster than serial computing: about three times faster here (315 ms versus 95.9 ms), although we used five processes.

It takes time to spin up machines in a network and to distribute your data amongst them. Hence, parallel computing pays off mainly when your data is very big and your operation is rather slow.
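
On a single machine, part of that overhead is serialization: `multiprocessing` pickles the arguments it sends to worker processes and the results it gets back. As a rough sketch (the chunk size is arbitrary and the timing will differ per machine), you can measure that round trip in isolation with the standard `pickle` module on Python 3.

```python
import pickle
import time

# Toy chunk; the size is an arbitrary choice for illustration
chunk = [float(i) for i in range(100000)]

start = time.perf_counter()
payload = pickle.dumps(chunk)     # what is sent to a worker
restored = pickle.loads(payload)  # what the worker reconstructs
elapsed = time.perf_counter() - start

print("payload size in bytes:", len(payload))
print("round trip in seconds:", elapsed)
```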

Let's try a larger number of samples.

In [12]:
N = 10 ** 7  # ten million samples
errors = np.random.randn(N)
distributed_data = distribute_data(errors, n_nodes)

In [13]:
%timeit f_reduce([f_map(data) for data in distributed_data])  # serial mapreduce
%timeit f_reduce(p.map(f_map, distributed_data))  # parallel mapreduce

1 loops, best of 3: 3.14 s per loop
1 loops, best of 3: 938 ms per loop


That is more or less the same factor: again about three times faster (3.14 s versus 938 ms).

### NumPy

Note, however, that for these kinds of operations, a single vectorized `numpy` operation beats all the methods above.

In [14]:
%timeit np.square(errors).mean()  # numpy way

10 loops, best of 3: 29.9 ms per loop


The [SciPy.org](http://wiki.scipy.org/ParallelProgramming) wiki explains why:

> One of the great strengths of numpy is that you can express array operations very cleanly. For example to compute the product of the matrix A and the matrix B, you just do:
>
>     C = numpy.dot(A,B)
>
> Not only is this simple and clear to read and write, since numpy knows you want to do a matrix dot product it can use an optimized implementation obtained as part of "BLAS" (the Basic Linear Algebra Subroutines). This will normally be a library carefully tuned to run as fast as possible on your hardware by taking advantage of cache memory and assembler implementation. But many architectures now have a BLAS that also takes advantage of a multicore machine. If your numpy/scipy is compiled using one of these, then `dot()` will be computed in parallel (if this is faster) without you doing anything. Similarly for other matrix operations, like inversion, singular value decomposition, determinant, and so on. For example, the open source library ATLAS allows compile time selection of the level of parallelism (number of threads).

<hr>

## Exercise

Using `multiprocessing.Pool`, write a MapReduce job for a common task that is easy to parallelize. You could think of averaging numbers, counting words or maybe even a `mergesort` implementation.