## Simple parallel code

### Part 1: Vectorisation

Vectorisation is straightforward in python (and R). You should always try to code with vectors/arrays, though For loops are sometimes necessary.

In python:

* For loops aren't intrinsically worse, but they encourage poor coding practice.

In R: 

* For loops are very inefficient. For efficiency you may have to go out of your way to vectorise.

In C/C++:

* OpenMP allows for loops to be parallelised without any additional effort - just remember to avoid using the results of previous loops.
* Modern updates (from C++-11) include many explicit vectorisations, allowing map/reduce vectorisations to be exploited directly.

First, an array representation reminder in python:

In [1]:
import numpy as np

In [2]:
a = np.array([[1,2],[3,4]])
b = np.array([[1,1],[1,1]])
print("a =")
print(a)
print("b =")
print(b)

a =
[[1 2]
 [3 4]]
b =
[[1 1]
 [1 1]]


In [3]:
# substraction and addition

print("a + b =")
print(a + b)
print("a - b =")
print(a - b)

a + b =
[[2 3]
 [4 5]]
a - b =
[[0 1]
 [2 3]]


In [4]:
# Element wise multiplication
print("a * b =")
print(a*b)

a * b =
[[1 2]
 [3 4]]


In [5]:
# Matrix multiplication
print("a@b =")
print(a@b)
print("np.dot(a,b) =")
print(np.dot(a,b))

a@b =
[[3 3]
 [7 7]]
np.dot(a,b) =
[[3 3]
 [7 7]]


In [6]:
# numpy array with one row
c =  np.array([1,2,3])
print("c.shape =")
print(c.shape)
# numpy array with three rows
d = np.array([[1],[2],[3]])
print("d.shape")
print(d.shape)

c.shape =
(3,)
d.shape
(3, 1)


In [7]:
# Define an 3x3 2d array
e = np.array([[1,2,3],[4,5,6],[7,8,9]])
print(e)
print("first element in the array, e[0,0] =")
print(e[0,0])
print("first row of the array. e[0,:] =")
print(e[0,:])
print("second coulumn of the array. e[:,1] =")
print(e[:,1])

[[1 2 3]
 [4 5 6]
 [7 8 9]]
first element in the array, e[0,0] =
1
first row of the array. e[0,:] =
[1 2 3]
second coulumn of the array. e[:,1] =
[2 5 8]


Now we'll make a simulation to compare between vectorised and non-vectorised code. This is just a simple matrix times vector computation.


In [8]:
#create some test data and simulate results
N=100000# Number of rows
K=100 # Number of columns
X = np.random.randn(N,K)
W = np.random.rand(K)

In [9]:
def matrix_times_vector_forloop(X,W):
    N,K=X.shape
    # Initialize theta
    forloop = []
    for i in range(N):
        hypo_i = 0
        for j in range(K):
            hypo_i += W[j]*X[i,j]
        forloop.append(hypo_i)
    return(forloop)

In [10]:
%%time
forloop=matrix_times_vector_forloop(X,W)

CPU times: user 4.33 s, sys: 45.7 ms, total: 4.38 s
Wall time: 4.41 s


In [11]:
%%time
# matrix format
vect = X@W

CPU times: user 12.2 ms, sys: 2.47 ms, total: 14.7 ms
Wall time: 9.36 ms


In [12]:
## Check the answers
print("for loop")
print(forloop[0:4])
print("vectorised")
print(vect[0:4])

for loop
[2.2873055232600876, -1.0296846002912856, 3.895888691012576, 4.7246531820039035]
vectorised
[ 2.28730552 -1.0296846   3.89588869  4.72465318]


CHALLENGE:  Make a plot of how the two approaches change in performance as a function of N (and/or K). What is the computational scaling?

See https://towardsdatascience.com/vectorization-implementation-in-machine-learning-ca652920c55d for an example that is much more extreme.

### Accumulate example

In [13]:
seq = np.random.randint(0, 100, size=1000000)

In [14]:
def cumsum_diff_with_accumulate(x):
     x = np.asarray(x)
     return np.max(x - np.minimum.accumulate(x))
def cumsum_diff(x):
     max_px = 0
     min_px = x[0]
     for px in x[1:]:
         min_px = min(min_px, px)
         max_px = max(px - min_px, max_px)
     return max_px

In [15]:
%%time
cumsum_diff(seq)

CPU times: user 588 ms, sys: 5.35 ms, total: 594 ms
Wall time: 597 ms


99

In [16]:
%%time
cumsum_diff_with_accumulate(seq)

CPU times: user 10.8 ms, sys: 8.69 ms, total: 19.5 ms
Wall time: 18 ms


99

### Part 2

Mapping python (and R) is straightfoward. 

http://chryswoods.com/parallel_python/map.html

In [18]:
import math

def calc_distance(point1, point2):
    """
    Function to calculate and return the distance between
    two points
    """
    
    dx2 = (point1[0] - point2[0]) ** 2
    dy2 = (point1[1] - point2[1]) ** 2
    dz2 = (point1[2] - point2[2]) ** 2
    return math.sqrt(dx2 + dy2 + dz2)

In [19]:
points1 = [(1.0,1.0,1.0), (2.0,2.0,2.0), (3.0,3.0,3.0)]
points2 = [(4.0,4.0,4.0), (5.0,5.0,5.0), (6.0,6.0,6.0)]

distances = map(calc_distance, points1, points2)

print(distances)

<map object at 0x10faa1748>


Python has not done the calculation! Instead it returns an object (an iterator) that would evaluate this computation. But it does so lazily.  To get the answer, we must use the result, for example, by coercing it to a list.

This behaviour is **standard** in parallel processing environments, in which the computation may be performed remotely and there may be additional remove computations to perform. By caching the computation, the software environment can sometimes obtain massive efficiency gains.

This is how we get the answer:

In [20]:
print(list(distances))

[5.196152422706632, 5.196152422706632, 5.196152422706632]


Another example, this time with multiple arguments.

In [21]:
def find_smallest(arg1, arg2, arg3):
    """
    Function used to return the smallest value out 
    of 'arg1', 'arg2' and 'arg3'
    """

    return min(arg1, min(arg2, arg3))

a = [1, 2, 3, 4, 5]
b = [5, 4, 3, 2, 1]
c = [1, 2, 1, 2, 1]

result = map(find_smallest, a, b, c)

In [22]:
list(result)

[1, 2, 1, 2, 1]

CHALLENGE: Generalise calc_distance so that it can accept points in any numbers of dimensions.  Generalise find_smallest so that it can accept any number of arguments.

## Part 3: Reducing

This is also straightfoward, and exists anagously in R.

See http://chryswoods.com/parallel_python/reduce.html

Lets start by defining a mapping problem

In [23]:
def add(x, y):
    """Function to return the sum of x and y"""
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

result = map(add, a, b)

print(list(result))

[7, 9, 11, 13, 15]


This is how we use reduce to automatically apply addition to the enture set of points.

In [24]:
from functools import reduce

result = map(add, a, b)

total = reduce(add, result)

print(total)

55


Reduce has an optional third argument which is the initial value that is used as the first value for the reduction.

In [25]:
result = map(add, a, b)
total = reduce(add, result, 10)
print(total)

65


The standard "reduce" function does *not* do anything clever with the computation tree. It simply evaluates the reduction using the sequential definition. That means that it does *not* assume communitivity, which means it can be used with other operations:

In [26]:
def join_strings(x, y):
    return "%s %s" % (x,y)
c = ["cat", "dog", "mouse", "fish"]

result = reduce(join_strings, c)
print(result)

cat dog mouse fish


### accumulate vs python

Python defines **reduce** to give only the final answer, whereas **accumulate** gives the running total as a list (via an iterator, like map).

This is not a universally recognised separation!

In [29]:
from itertools import accumulate

result = map(add, a, b)
total = accumulate(result, add)
print(list(total))

[7, 16, 27, 40, 55]


In [30]:
print(list(accumulate(c, join_strings)))

['cat', 'cat dog', 'cat dog mouse', 'cat dog mouse fish']


## Part 4: parallel implementation

multiprocessing python code has to be written into a text file and executed using the python interpreter. It is not recommended to try to run a multiprocessing python script interactively, e.g. via ipython or ipython notebook.

This is because the required resources (CPUs) have to be requested from the system and appropriately returned, and the libraries are not reliable across platforms.

(it seems to work on linux and mac, but not in windows https://stackoverflow.com/questions/37103243/multiprocessing-pool-in-jupyter-notebook-works-on-linux-but-not-windows)

See http://chryswoods.com/parallel_python/multiprocessing.html

Multiprocessing achieves parallelism by running multiple copies of your script, it forces you to write it in a particular way. All imports should be at the top of the script, followed by all function and class definitions. This is to ensure that all copies of the script have access to the same modules, functions and classes. Then, you should ensure that only the master copy of the script runs the code by protecting it behind an if __name__ == "__main__" statement.

In [1]:
import multiprocessing

In [2]:
print(multiprocessing.cpu_count())

12


If the following works you should be ok in the notebook; otherwise switch to the scripts.

In [3]:
from multiprocessing import Pool
def f(x):
    return x**2
pool = Pool(8)
for res in pool.map(f,range(10)):
    print(res)

0
1
4
9
16
25
36
49
64
81


The following script illustrates the key points:

* Distributing compute over cores
* Detecting the CPU architecture/count
* Ensuring the compute is parallelised
* Performing process-specific evaluation

(Copy into a script and run with python myscript.py if it doesn't run natively.)

In [9]:
from functools import reduce
from multiprocessing import Pool, cpu_count, current_process

def square(x):
    """Function to return the square of the argument"""
    print("Worker %s calculating square of %s" % (current_process().pid, x))
    return x * x

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())
    N=50
    # create a pool of workers
    # start all worker processes
    pool = Pool(processes= cpu_count())
    # create an array of 5000 integers, from 1 to 5000
    r = range(1, N+1)
    result = pool.map(square, r)

    total = reduce(lambda x, y: x + y, result)

    print("The sum of the square of the first %s integers is %s" % (N, total))



Number of cores available equals 12
Worker 43474 calculating square of 1
Worker 43481 calculating square of 15
Worker 43479 calculating square of 11
Worker 43475 calculating square of 3
Worker 43483 calculating square of 19
Worker 43476 calculating square of 5
Worker 43480 calculating square of 13
Worker 43478 calculating square of 9
Worker 43485 calculating square of 23
Worker 43484 calculating square of 21
Worker 43482 calculating square of 17
Worker 43477 calculating square of 7
Worker 43479 calculating square of 12
Worker 43481 calculating square of 16
Worker 43476 calculating square of 6
Worker 43482 calculating square of 18
Worker 43475 calculating square of 4
Worker 43477 calculating square of 8
Worker 43483 calculating square of 20
Worker 43474 calculating square of 2
Worker 43478 calculating square of 10
Worker 43480 calculating square of 14
Worker 43484 calculating square of 22
Worker 43479 calculating square of 25
Worker 43481 calculating square of 27
Worker 43476 calculatin

What do you notice about the order of computation and printing?

In general it is a really bad idea to assume that printing appears to the screen in the correct order!

(NB: Christopher's code used "with Pool(processes=nprocs) as pool" which didn't work for me due to python version issues. Multicore processing is still in active development....)

Christopher has an example in which we define a function inside the __main__ loop. This **hangs**  because the worker nodes can't see it!  **CAREFUL** with this sort of thing.

However, we can reuse our pool of processes in a straightforward way.

In [24]:
from functools import reduce
from multiprocessing import Pool, cpu_count, current_process

def square(x):
    """Function to return the square of the argument"""
    print("square"+" "+str(x))
    return x * x

def cube(x):
    """Function to return the cube of the argument"""
    return x * x * x

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())
    N=5
    # create a pool of workers
    # start all worker processes
    pool = Pool(processes= cpu_count()) ## THIS is where all of the memory state is 
    ## created and all of the processes "know about" everything above. So they "know" N
    ## and hence all compute their own version of r correctly.
    
    # create an array of 5000 integers, from 1 to N 
    r = range(1, N+1)
    squares = pool.map(square, r)
    totalsquares = reduce(lambda x, y: x + y, squares)
    print("The sum of the square of the first %s integers is %s" % (N, totalsquares))

    cubes = pool.map(cube, r)
    totalcubes = reduce(lambda x, y: x + y, cubes)
    print("The sum of the cube of the first %s integers is %s" % (N, totalcubes))



Number of cores available equals 12
square 3
square 2
square 1
square 4
square 5
The sum of the square of the first 5 integers is 55
The sum of the cube of the first 5 integers is 225


To use mapping on multiple inputs, we have to either:

* create a tuple of the arguments
* or pass it through using **zip** and switch from the **map** to **starmap**

In [27]:
from multiprocessing import Pool

def add(x, y):
    """Return the sum of the tuple of two arguments"""
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.starmap(add, zip(a,b))

    print(result)


[7, 9, 11, 13, 15]


There are other implementation niggles, such as support for lambda functions (which is missing) etc. These may be addressed in newer versions or alternative packages.



## Part 5: asynchronous functions

We saw above that the "print" statements were out of order. This is because the threads had to "race" to collect the next job, and also race to print to the screen. There is only one job queue and one screen.

The following script performs jobs asynchronously. You should see that there are 3 workers, which complete one task before taking the next.

In [12]:
import time
from multiprocessing import Pool, current_process

def slow_function(nsecs):
    """
    Function that sleeps for 'nsecs' seconds, returning
    the number of seconds that it slept
    """
    print("Process %s going to sleep for %s second(s)" % (current_process().pid, nsecs))
    # use the time.sleep function to sleep for nsecs seconds
    time.sleep(nsecs)
    print("Process %s waking up" % current_process().pid)
    return nsecs

if __name__ == "__main__":
    print("Master process is PID %s" % current_process().pid)

    with Pool(3) as pool:
        r = pool.map(slow_function, [1,2,3,4,5,7,8])

    print("Result is %s" % r)

Master process is PID 43358
Process 43511 going to sleep for 2 second(s)
Process 43510 going to sleep for 1 second(s)
Process 43512 going to sleep for 3 second(s)
Process 43510 waking up
Process 43510 going to sleep for 4 second(s)
Process 43511 waking up
Process 43511 going to sleep for 5 second(s)
Process 43512 waking up
Process 43512 going to sleep for 7 second(s)
Process 43510 waking up
Process 43510 going to sleep for 8 second(s)
Process 43511 waking up
Process 43512 waking up
Process 43510 waking up
Result is [1, 2, 3, 4, 5, 7, 8]


Sometimes we don't want to wait for a computation to be completed before distributing some other computation. In that case we can use "async" versions of map (or apply, or whatever).

In [30]:
import time
from multiprocessing import Pool, current_process

def slow_function(nsecs):
    """
    Function that sleeps for 'nsecs' seconds, returning
    the number of seconds that it slept
    """
    print("Process %s going to sleep for %s second(s)" % (current_process().pid, nsecs))
    # use the time.sleep function to sleep for nsecs seconds
    time.sleep(nsecs)
    print("Process %s waking up" % current_process().pid)
    return nsecs

if __name__ == "__main__":
    print("Master process is PID %s" % current_process().pid)

    with Pool(3) as pool:
        print("Starting r1 pool")
        r1 = pool.map_async(slow_function, [1,2,3,4,5])
        print("Starting r2 pool")
        r2 = pool.map_async(slow_function, [1,2,3,4,5])

        r1.wait()
        r2.wait()
        # if two wait in front of these two code it will be wait until getting all the ans 
        print("Result one is %s" % r1.get())
        r2.wait()
        print("Result two is %s" % r2.get())

Master process is PID 43358
Process 43713 going to sleep for 1 second(s)
Process 43714 going to sleep for 2 second(s)
Process 43715 going to sleep for 3 second(s)
Starting r1 pool
Starting r2 pool
Process 43713 waking up
Process 43713 going to sleep for 4 second(s)
Process 43714 waking up
Process 43714 going to sleep for 5 second(s)
Process 43715 waking up
Process 43715 going to sleep for 1 second(s)
Process 43715 waking up
Process 43715 going to sleep for 2 second(s)
Process 43713 waking up
Process 43713 going to sleep for 3 second(s)
Process 43715 waking up
Process 43715 going to sleep for 4 second(s)
Process 43714 waking up
Process 43714 going to sleep for 5 second(s)
Process 43713 waking up
Process 43715 waking up
Process 43714 waking up
Result one is [1, 2, 3, 4, 5]
Result two is [1, 2, 3, 4, 5]


#### Asynchronous computation 

Here we encountered "futures" which are computations that may not have completed and therefore may not be available.

Futures are a very common variable type in parallel programming across many languages. Futures provide several common functions;

* Block (wait) until the result is available. In multiprocessing, this is via the .wait() function, e.g. r1.wait() in the above script.
* Retrieve the result when it is available (blocking until it is available). This is the .get() function, e.g. r1.get().
* Test whether or not the result is available. This is the .ready() function, which returns True when the asynchronous function has finished and the result is available via .get().
* Test whether or not the function was a success, e.g. whether or not an exception was raised when running the function. This is the .successful() function, which returns True if the asynchronous function completed without raising an exception. Note that this function should only be called after the result is available (e.g. when .ready() returns True).

## Additional Information

There are additional ways to help parallelisation be efficient. One is the idea of "chunksize", or how many commands get sent to each worker; it is a parameter of starmap/map.

In [31]:
import numpy as np
