## Part 2.1: Multiprocessing

Python has many libraries available to help you parallelise your scripts across the cores of a single multicore computer. The established option is the multiprocessing library, which is part of the Python standard library. You can import multiprocessing by typing the following into ipython:

In [1]:
import multiprocessing

You can read the documentation for this module by typing:

In [2]:
help(multiprocessing)

Help on package multiprocessing:

NAME
    multiprocessing

MODULE REFERENCE
    https://docs.python.org/3.8/library/multiprocessing
    
    The following documentation is automatically generated from the Python
    source files.  It may be incomplete, incorrect or include features that
    are considered implementation detail and may vary between Python
    implementations.  When in doubt, consult the module reference at the
    location listed above.

DESCRIPTION
    # Package analogous to 'threading.py' but using processes
    #
    # multiprocessing/__init__.py
    #
    # This package is intended to duplicate the functionality (and much of
    # the API) of threading.py but uses processes instead of threads.  A
    # subpackage 'multiprocessing.dummy' has the same API but is a simple
    # wrapper for 'threading'.
    #
    # Copyright (c) 2006-2008, R Oudkerk
    # Licensed to PSF under a Contributor Agreement.
    #

PACKAGE CONTENTS
    connection
    context
    dummy (package)
    ...

One of the useful functions in multiprocessing is cpu_count(). This returns the number of CPUs (logical processor cores) in your computer, which on a typical desktop or laptop is the number available to a parallel program. Type the following into ipython to see how many cores you have:

In [3]:
print(multiprocessing.cpu_count())

12
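The number reported by cpu_count() is the total number of logical CPUs in the machine. On a shared system (for example inside a container or under a batch scheduler) your process may be restricted to fewer cores. A minimal sketch, assuming Linux for os.sched_getaffinity (the code falls back to cpu_count() on platforms that lack it):

```python
import multiprocessing
import os

# Total number of logical CPUs in the machine
# (raises NotImplementedError if it cannot be determined)
total = multiprocessing.cpu_count()

# os.sched_getaffinity only exists on some platforms (e.g. Linux). It returns
# the set of CPUs that this process (PID 0 means "the current process") may use.
if hasattr(os, "sched_getaffinity"):
    usable = len(os.sched_getaffinity(0))
else:
    usable = total

print("Logical CPUs in machine:", total)
print("CPUs usable by this process:", usable)
```

On Python 3.13 and later, os.process_cpu_count() reports the usable count directly.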

Nearly all modern computers have several processor cores, so you should see that you have at least 2, and perhaps as many as 40 available on your machine. Each of these cores is available to do work, in parallel, as part of your Python script. For example, if you have two cores in your computer, then your script should ideally be able to do two things at once. Equally, if you have forty cores available, then your script should ideally be able to do forty things at once.

Multiprocessing allows your script to do lots of things at once by running multiple copies of your script in parallel, normally with one copy per processor core. One of these copies is known as the master copy, and it controls all of the worker copies. Because of this, multiprocessing Python code has to be written into a text file and executed using the Python interpreter. It is not recommended to run a multiprocessing script interactively, e.g. via ipython or an ipython (Jupyter) notebook, because the worker copies may not be able to find functions that were defined interactively.

In addition, because multiprocessing achieves parallelism by running multiple copies of your script, it forces you to write your script in a particular way. All imports should be at the top of the script, followed by all function and class definitions. This ensures that all copies of the script have access to the same modules, functions and classes. Then, you should ensure that only the master copy runs the main body of your code by protecting it behind an <code>if __name__ == "__main__"</code> statement. This matters because on platforms that start workers by importing your script afresh (the "spawn" start method, which is the default on Windows and macOS), any unprotected code would be run again by every worker.

An example skeleton script (which does not yet do any parallel work) is shown below:

In [4]:
# all imports should be at the top of your script
import multiprocessing
import sys
import os

# all function and class definitions must be next
def add(x, y):
    """Function to return the sum of the two arguments"""
    return x + y

def product(x, y):
    """Function to return the product of the two arguments"""
    return x * y

if __name__ == "__main__":
    # You must now protect the code being run by
    # the master copy of the script by placing it
    # in this block

    a = [1, 2, 3, 4, 5]
    b = [6, 7, 8, 9, 10]

    # Now write your parallel code...
    # etc. etc.

(if you are interested, take a look [here](https://chryswoods.com/parallel_python/gil.html) for more information about why parallel Python is based on running multiple processes, rather than multiple threads)

## Part 2.2: Pool

One of the core multiprocessing features is multiprocessing.Pool. This provides a pool of workers that can be used to parallelise a map.

For example, create a new script called pool.py and type into it:

In [5]:
from functools import reduce
from multiprocessing import Pool, cpu_count

def square(x):
    """Function to return the square of the argument"""
    return x * x

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())

    # create a pool of workers
    with Pool() as pool:
        # create a range of 5000 integers, from 1 to 5000
        r = range(1, 5001)

        result = pool.map(square, r)

    total = reduce(lambda x, y: x + y, result)

    print("The sum of the square of the first 5000 integers is %s" % total)

Number of cores available equals 12
The sum of the square of the first 5000 integers is 41679167500


So how does this work? The line

<code>with Pool() as pool:</code>

has created a pool of worker copies of your script, with the number of workers equal to the number of cores reported by cpu_count(). You can control the number of workers by passing the processes argument to the Pool constructor, e.g.

<code>with Pool(processes=5) as pool:</code>

The line

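<code>r = range(1, 5001)</code>

is a quick way to create a sequence of the 5000 integers from 1 to 5000. (In Python 3, range returns a lazy range object rather than a list, but it can be passed to pool.map just like a list.) The parallel work is conducted on the line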

<code>result = pool.map(square, r)</code>

This performs a map of the function square over the items of r. The map is divided up over all of the workers in the pool: the items are split into chunks, and each worker processes one chunk at a time. This means that, if you have 10 workers (e.g. if you have 10 cores), then each worker will perform roughly one tenth of the work (e.g. calculating the square of about 500 numbers). If you have 2 workers, then each worker will perform roughly half of the work (e.g. calculating the square of about 2500 numbers).
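Pool.map accepts an optional chunksize argument that sets how many items are handed to a worker at a time; if you omit it, the pool picks a chunk size from the length of the input and the number of workers. A small sketch with an explicit chunk size (the values 4 workers and a chunk size of 100 are arbitrary illustrations, not recommendations):

```python
from multiprocessing import Pool

def square(x):
    """Return the square of the argument"""
    return x * x

if __name__ == "__main__":
    r = range(1, 5001)

    with Pool(processes=4) as pool:
        # Each task sent to a worker contains 100 consecutive items of r.
        # Larger chunks reduce communication overhead; smaller chunks
        # balance the load better when items take different amounts of time.
        result = pool.map(square, r, chunksize=100)

    print(sum(result))  # 41679167500
```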

The next line

<code>total = reduce(lambda x, y: x + y, result)</code>

is just a standard reduce used to sum together all of the results.
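The lambda in that reduce call simply adds two numbers, so operator.add does the same job, and for a plain total the built-in sum() is the simplest choice. A quick serial check on a short list of squares:

```python
import operator
from functools import reduce

result = [1, 4, 9, 16, 25]

# Three equivalent ways of summing the results of the map
total_lambda = reduce(lambda x, y: x + y, result)
total_operator = reduce(operator.add, result)
total_sum = sum(result)

print(total_lambda, total_operator, total_sum)  # 55 55 55
```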

You can verify that the calls to square are divided between your workers by calling multiprocessing.current_process().pid, which returns the process ID (PID) of the worker process. Edit your pool.py script and set its contents to:

In [6]:
from functools import reduce
from multiprocessing import Pool, current_process

def square(x):
    """Function to return the square of the argument"""
    # flush=True so the message is not lost in a buffer when the pool
    # terminates its workers at the end of the "with" block
    print("Worker %s calculating square of %s" % (current_process().pid, x), flush=True)
    return x * x

if __name__ == "__main__":
    nprocs = 2

    # print the number of workers
    print("Number of workers equals %d" % nprocs)

    # create a pool of workers
    with Pool(processes=nprocs) as pool:
        # create a range of 20 integers, from 1 to 20
        r = range(1, 21)

        result = pool.map(square, r)

    total = reduce(lambda x, y: x + y, result)

    print("The sum of the square of the first 20 integers is %s" % total)

Number of workers equals 2
Worker 3290 calculating square of 1
Worker 3291 calculating square of 4
Worker 3290 calculating square of 2
Worker 3291 calculating square of 5
Worker 3290 calculating square of 3
Worker 3291 calculating square of 6
Worker 3290 calculating square of 10
Worker 3291 calculating square of 7
Worker 3290 calculating square of 11
Worker 3291 calculating square of 8
Worker 3290 calculating square of 12
Worker 3291 calculating square of 9
Worker 3290 calculating square of 13
Worker 3291 calculating square of 16
Worker 3290 calculating square of 14
Worker 3291 calculating square of 17
Worker 3290 calculating square of 15
Worker 3291 calculating square of 18
Worker 3290 calculating square of 19
Worker 3290 calculating square of 20
The sum of the square of the first 20 integers is 2870


(the exact PIDs of the workers, and the order in which they print, will be different on your machine)

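You can see in the output that there are two workers, signified by the two different worker PIDs. The work has been divided between them in chunks of consecutive numbers, with each worker picking up a new chunk when it finishes the previous one, so the split is roughly, but not necessarily exactly, even (here one worker squared 11 numbers and the other 9). Edit pool.py and change the value of nprocs. How is the work divided as you change the number of workers?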
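To make that experiment easier, a sketch along the following lines (the function name square_with_pid is just for illustration) returns the worker's PID alongside each result and uses collections.Counter to count how many items each worker handled, for several pool sizes. The exact counts will vary from run to run:

```python
from collections import Counter
from multiprocessing import Pool, current_process

def square_with_pid(x):
    """Return the PID of the worker and the square of the argument"""
    return current_process().pid, x * x

if __name__ == "__main__":
    for nprocs in (1, 2, 4):
        with Pool(processes=nprocs) as pool:
            results = pool.map(square_with_pid, range(1, 21))

        # Count how many of the 20 items were processed by each worker PID
        counts = Counter(pid for pid, _ in results)
        total = sum(value for _, value in results)

        print("%d workers: items per worker = %s, total = %d"
              % (nprocs, sorted(counts.values(), reverse=True), total))
```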

### Using multiple pools in a single script

You can use more than one multiprocessing.Pool in your script, but you should use them one after another. With the "fork" start method (historically the default on Linux), multiprocessing.Pool works by forking your script into the team of workers when you create the Pool object. Each worker contains a complete copy of all of the functions and variables that exist at the time of the fork. This means that any changes made after the fork will not be seen by the workers. For example, open a new Python script called broken_pool.py and type:

In [7]:
from multiprocessing import Pool

def square(x):
    """Return the square of the argument"""
    return x * x

if __name__ == "__main__":

    r = [1, 2, 3, 4, 5]

    with Pool() as pool:
        result = pool.map(square, r)

        print("Square result: %s" % result)

        def cube(x):
            """Return the cube of the argument"""
            return x * x * x

        # This map fails, with an error such as
        # AttributeError: Can't get attribute 'cube' on <module '__main__' from 'broken_pool.py'>
        result = pool.map(cube, r)

        print("Cube result: %s" % result)

Square result: [1, 4, 9, 16, 25]


(you may also find that your Python script hangs and is hard to stop. To kill the script, press CTRL+Z to suspend the task, then type kill -9 %1 to kill the Python script)

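The problem is that pool was created before the cube function was defined. The worker copies of the script were thus created before cube existed, and so don’t contain a copy of this function. (On platforms that use the "spawn" start method, such as Windows and macOS, the workers re-import your script without running the <code>if __name__ == "__main__"</code> block, so they never see cube at all.) This is one of the reasons why you should always define your functions above the <code>if __name__ == "__main__"</code> block.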
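multiprocessing can create its workers in different ways, called start methods ("fork", "spawn" and "forkserver"). You can check which one your platform uses with multiprocessing.get_start_method(), and request a specific one with multiprocessing.get_context(). In this sketch cube is defined at the top level of the script, so the map works even when the "spawn" start method is forced:

```python
import multiprocessing

def cube(x):
    """Return the cube of the argument (defined at the top level)"""
    return x * x * x

if __name__ == "__main__":
    # Report which start method this platform uses by default
    print("Default start method:", multiprocessing.get_start_method())

    # Force the "spawn" start method: workers re-import this script instead
    # of being forked, so only top-level definitions are visible to them
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        print(pool.map(cube, [1, 2, 3, 4, 5]))  # [1, 8, 27, 64, 125]
```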

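Alternatively, if you have to define the function in the <code>__main__</code> block, then ensure that you create the pool after the definition. Note that this only works with the "fork" start method; with "spawn" (or "forkserver"), functions defined inside the <code>__main__</code> block are never visible to the workers. For example, one fix here is to create a second pool for the second map: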

In [8]:
from multiprocessing import Pool

def square(x):
    """Return the square of the argument"""
    return x * x

if __name__ == "__main__":

    r = [1, 2, 3, 4, 5]

    with Pool() as pool:
        result = pool.map(square, r)

        print("Square result: %s" % result)

    def cube(x):
        """Return the cube of the argument"""
        return x * x * x

    with Pool() as pool:
        result = pool.map(cube, r)

        print("Cube result: %s" % result)

Square result: [1, 4, 9, 16, 25]
Cube result: [1, 8, 27, 64, 125]


## Part 2.3: Parallel map/reduce

The multiprocessing.Pool provides an excellent mechanism for the parallelisation of map/reduce style calculations. However, there are a number of caveats that make it more difficult to use than the simple map/reduce that was introduced in [Part 1](https://chryswoods.com/parallel_python/map.html).

### Mapping functions with multiple arguments

The Pool.map function only supports mapping functions that take a single argument, so you can’t use it directly to map a function that expects multiple arguments. Instead, you can use Pool.starmap, which expects you to pass it an iterable (such as a list) of tuples; each tuple is unpacked and its items are passed to the function as separate arguments.

For example:

In [9]:
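from multiprocessing import Pool
# add(x, y) must be defined at the top level of the script, as in the earlier example
if __name__ == "__main__":
    args = [(1, 6), (2, 7), (3, 8)]
    with Pool() as pool:
        print(pool.starmap(add, args))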

[7, 9, 11]


will effectively return:

<code>[add(1, 6), add(2, 7), add(3, 8)]</code>
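The standard library's itertools.starmap performs the same unpacking serially, which can be a handy way to check what Pool.starmap will compute before running it in parallel:

```python
from itertools import starmap

def add(x, y):
    """Return the sum of the two arguments"""
    return x + y

args = [(1, 6), (2, 7), (3, 8)]

# starmap calls add(*t) for each tuple t in args; it returns a lazy iterator
print(list(starmap(add, args)))  # [7, 9, 11]
```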

The above trick allows you to use any multiple-argument function. However, it means that you now have to convert multiple lists of arguments into a single list of argument tuples. For example, we need to convert

In [10]:
def add(x, y):
    """Return the sum of the two arguments"""
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

result = map(add, a, b)
print(list(result))

[7, 9, 11, 13, 15]


to

In [11]:
from multiprocessing import Pool

def add(x, y):
    """Return the sum of the two arguments"""
    return x + y

a_b = [(1, 6), (2, 7), (3, 8), (4, 9), (5, 10)]

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.starmap(add, a_b)

    print(result)

[7, 9, 11, 13, 15]


Combining the lists of arguments into a single list of tuples by hand could be painful. Fortunately, Python provides the zip function, which zips up N lists into a sequence of tuples (each tuple containing N items). In Python 3, zip returns a lazy iterator, so wrap it in list() if you want to print the tuples. For example, type into ipython:

In [12]:
a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]
c = [11, 12, 13, 14, 15]

args = zip(a, b, c)
print(list(args))

[(1, 6, 11), (2, 7, 12), (3, 8, 13), (4, 9, 14), (5, 10, 15)]
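Two details of zip are worth knowing when building argument lists: it stops at the end of the shortest input, and the iterator it returns can only be consumed once. A short check:

```python
a = [1, 2, 3]
b = [10, 20, 30, 40]  # one item longer than a

pairs = zip(a, b)

first = list(pairs)   # consumes the iterator
second = list(pairs)  # nothing left to consume

print(first)   # [(1, 10), (2, 20), (3, 30)]
print(second)  # []
```

From Python 3.10, zip(a, b, strict=True) raises a ValueError instead of silently dropping the extra items when the lengths differ.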


You can therefore use zip to combine the arguments when you call Pool.starmap. For example, the script above can be written as:

In [13]:
from multiprocessing import Pool

def add(x, y):
    """Return the sum of the two arguments"""
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.starmap(add, zip(a, b))

    print(result)

[7, 9, 11, 13, 15]
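When one of the arguments is the same for every call, an alternative to building tuples is functools.partial, which fixes that argument in advance so that you can use the ordinary Pool.map. Unlike a lambda, a partial object wrapping a top-level function can be pickled and sent to the workers. A sketch, where the constant 100 is just an example value:

```python
from functools import partial
from multiprocessing import Pool

def add(x, y):
    """Return the sum of the two arguments"""
    return x + y

if __name__ == "__main__":
    a = [1, 2, 3, 4, 5]

    # add_100(x) behaves like add(x, y=100)
    add_100 = partial(add, y=100)

    with Pool() as pool:
        print(pool.map(add_100, a))  # [101, 102, 103, 104, 105]
```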


### Exercise 1

Edit the countlines.py script that you wrote for [Part 1](https://chryswoods.com/parallel_python/reduce_answer1.html) so that it uses multiprocessing to parallelise the counting of lines. Note that you will not be able to use a lambda as the function passed to pool.map, because lambda functions cannot be pickled and sent to the worker processes.
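multiprocessing sends the mapped function to the workers using the pickle module, and pickle stores a function as a reference to its name, so an anonymous lambda cannot be found again in the worker. A small sketch (is_picklable is a hypothetical helper, not part of multiprocessing) lets you test this directly:

```python
import pickle

def double(x):
    """A normal top-level function: pickled by reference to its name"""
    return 2 * x

def is_picklable(obj):
    """Hypothetical helper: return True if obj can be pickled"""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

print(is_picklable(double))           # True
print(is_picklable(lambda x: 2 * x))  # False
```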

If you get stuck or want some inspiration, a possible answer is given [here](https://chryswoods.com/parallel_python/mapreduce2_answer1.html).

### Exercise 2

Below are two functions. The first counts the number of times every word in a file appears in that file, returning the result as a dictionary (key is the word, the value is the number of times it appears). The second function combines (reduces) two dictionaries together.

In [14]:
import re


def count_words(filename):
    """
    Count the number of times every word in the file `filename`
    is contained in this file.

    Args:
        filename (str): the filename to count the words in

    Returns:
        dict: a mapping of word to count
    """

    all_words = {}

    with open(filename) as f:
        for line in f:
            words = line.split()

            for word in words:
                # lowercase the word and keep only the first run
                # of characters that are [a-z] or hyphen
                word = word.lower()
                match = re.search(r"([a-z\-]+)", word)

                if match:
                    word = match.groups()[0]

                    if word in all_words:
                        all_words[word] += 1
                    else:
                        all_words[word] = 1

    return all_words


def reduce_dicts(dict1, dict2):
    """
    Combine (reduce) the passed two dictionaries to return
    a dictionary that contains the keys of both, where the
    values are equal to the sum of values for each key
    """

    # explicitly copy the dictionary, as otherwise
    # we risk modifying 'dict1'
    combined = {}

    for key in dict1:
        combined[key] = dict1[key]

    for key in dict2:
        if key in combined:
            combined[key] += dict2[key]
        else:
            combined[key] = dict2[key]

    return combined
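If you want a comparison point for reduce_dicts, collections.Counter supports adding counts in the same way. This serial sketch uses made-up example dictionaries (not real Shakespeare counts) and is not a solution to the exercise:

```python
from collections import Counter
from functools import reduce

# Made-up word counts standing in for the results of count_words on three files
counts = [{"the": 3, "king": 1}, {"the": 2, "queen": 4}, {"king": 2}]

# Adding Counters sums the values for matching keys, as reduce_dicts does
combined = reduce(lambda c, d: c + Counter(d), counts, Counter())

for word in sorted(combined):
    print(word, combined[word])
# king 3
# queen 4
# the 5
```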

Use the above two functions to write a parallel Python script called countwords.py that counts how many times each word used by Shakespeare appears in all of his plays, e.g. by using the command line call

<code>python countwords.py shakespeare/*</code>

Have your script print out every word that appears more than 2000 times across all of the plays. The words should be printed out in alphabetical order, and printed together with the number of times that they are used.

If you get stuck or want some inspiration, a possible answer is given [here](https://chryswoods.com/parallel_python/mapreduce2_answer2.html).