
# Multiprocessing in Python

When you work on a computer vision project, you probably need to preprocess a lot of image data. This is time-consuming, and it would be great if you could process multiple images in parallel. Multiprocessing is the ability of a system to use more than one processor (or processor core) at the same time. A computer with a single processor core has to switch rapidly between processes to keep all of them running. However, most computers today have a multi-core processor, which allows several processes to execute at once. Python’s multiprocessing module lets you speed up your scripts by distributing tasks across different processes.

1. [Benefits of multiprocessing](#a1)
2. [Basic multiprocessing](#a2)
3. [Multiprocessing for real use](#a3)
4. [Using joblib](#a4)

<a name="a1"></a>
## Benefits of Multiprocessing

You may ask, “Why Multiprocessing?” Multiprocessing can make a program substantially more efficient by running multiple tasks in parallel instead of sequentially. A related term is multithreading, but the two are not the same thing.

A process is a running instance of a program with its own memory space, which it does not share with other processes. A thread is an execution unit within a process. Multiple threads run in a process and share the process’s memory space with each other.
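The difference in memory sharing can be observed directly. The following is a minimal sketch, not part of the original notebook: the names state, increment, run_in_thread, and run_in_process are made up for this illustration. A thread changes the counter that the main program sees, while a child process changes only its own copy.

```python
import multiprocessing
import threading

# Module-level state used to observe who shares memory with whom
state = {"counter": 0}


def increment():
    """Add 1 to the counter in whichever process runs this function."""
    state["counter"] += 1


def run_in_thread():
    """Run increment() in a second thread of the current process."""
    worker = threading.Thread(target=increment)
    worker.start()
    worker.join()


def run_in_process():
    """Run increment() in a separate child process."""
    worker = multiprocessing.Process(target=increment)
    worker.start()
    worker.join()


if __name__ == "__main__":
    run_in_thread()
    # The thread shares memory with the main program, so the change is visible
    print("after thread: ", state["counter"])

    before = state["counter"]
    run_in_process()
    # The child incremented its own copy, so the parent's value is unchanged
    print("after process:", state["counter"])
    print("unchanged by child process:", state["counter"] == before)
```

To get data back from a child process, it has to be sent explicitly, which is what the process pools later in this notebook do for return values.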

In CPython, the Global Interpreter Lock (GIL) allows only one thread at a time to execute Python bytecode, which means you can’t enjoy the performance benefit of multithreading when the work is done inside the Python interpreter. This is what gives multiprocessing the upper hand over threading in Python. Multiple processes can run in parallel because each process has its own interpreter, with its own GIL, that executes the instructions allocated to it. Also, the OS sees your program as multiple processes and schedules them separately, i.e., your program gets a larger share of computer resources in total. So, multiprocessing is faster when the program is CPU-bound. When there is a lot of I/O in your program, threading may be more efficient: most of the time the program is waiting for the I/O to complete, the GIL is released during that wait, and threads are cheaper to create than processes.
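The I/O-bound case can be sketched with threads alone. In the example below, time.sleep() stands in for a real I/O wait (an assumption made to keep the example self-contained), and fake_io and timed are hypothetical helper names. Because a sleeping thread does not hold the GIL, four waits overlap instead of adding up.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(delay):
    """Stand-in for an I/O call: a sleeping thread does not hold the GIL."""
    time.sleep(delay)
    return delay


def timed(func):
    """Return (result, elapsed_seconds) for a zero-argument callable."""
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start


delays = [0.2] * 4

# One after another: the four waits add up
_, sequential = timed(lambda: [fake_io(d) for d in delays])

# Four threads: the waits overlap
with ThreadPoolExecutor(max_workers=4) as executor:
    _, threaded = timed(lambda: list(executor.map(fake_io, delays)))

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

If fake_io() were a pure Python computation instead of a wait, the threaded version would not be expected to show this gain, and that is the case where processes help.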

<a name="a2"></a>
## Basic multiprocessing

Let’s use Python’s multiprocessing module to write a basic program that demonstrates how to do concurrent programming.

Let’s look at this function, task(), that sleeps for 0.5 seconds and prints before and after the sleep:

```
import time

def task():
    print('Sleeping for 0.5 seconds')
    time.sleep(0.5)
    print('Finished sleeping')
```

To create a process, we simply say so using the multiprocessing module:

```
import multiprocessing
p1 = multiprocessing.Process(target=task)
p2 = multiprocessing.Process(target=task)
```

The target argument to Process() specifies the function that the process runs. However, these processes do not run until we start them:

```
p1.start()
p2.start()
```


In [1]:
import multiprocessing
import time

def task():
    print('Sleeping for 0.5 seconds')
    time.sleep(0.5)
    print('Finished sleeping')

if __name__ == "__main__":
    start_time = time.perf_counter()

    # Creates two processes
    p1 = multiprocessing.Process(target=task)
    p2 = multiprocessing.Process(target=task)

    # Starts both processes
    p1.start()
    p2.start()

    finish_time = time.perf_counter()

    print(f"Program finished in {finish_time-start_time} seconds")

Sleeping for 0.5 seconds
Program finished in 0.013153862000002903 seconds
Sleeping for 0.5 seconds


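We must put our main program under if __name__ == "__main__", otherwise the multiprocessing module may complain. With the spawn start method (the default on Windows and macOS), every child process imports the main module, and without the guard that import would run the process-creating code again. The guard ensures that this code runs only in the main process.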
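Which start method is in use can be checked at run time. The following is a small sketch for illustration; the function name child is made up. The top-level print runs every time the file is loaded, so under spawn it is expected to appear a second time from the child, which loads the file under the module name __mp_main__, while the guarded block runs only in the parent.

```python
import multiprocessing

# Runs on every load of this file, including the import done by a spawned child
print(f"module loaded as {__name__}")


def child():
    print("hello from the child process")


if __name__ == "__main__":
    # Only the main process reaches this block
    print("available start methods:", multiprocessing.get_all_start_methods())
    print("start method in use:", multiprocessing.get_start_method())
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()
```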

However, there is a problem with the code: as the output above shows, the elapsed time is printed before the processes we created have finished.

We need to call the join() function on the two processes so that the main process waits for them before the time prints. Three processes are involved: p1, p2, and the main process. The main process is the one that keeps track of the time and prints the time taken to execute. The line that sets finish_time must run only after p1 and p2 have finished, so we call join() on each process immediately after the start() calls. The same pattern works for any number of processes if we keep them in a list.
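For the two-process program, the change amounts to two extra lines. This is a sketch of the earlier cell with join() added. The final print of exitcode is an addition for illustration; an exit code of 0 means the child finished normally.

```python
import multiprocessing
import time


def task():
    print('Sleeping for 0.5 seconds')
    time.sleep(0.5)
    print('Finished sleeping')


if __name__ == "__main__":
    start_time = time.perf_counter()

    p1 = multiprocessing.Process(target=task)
    p2 = multiprocessing.Process(target=task)
    p1.start()
    p2.start()

    # Block the main process until each child has exited
    p1.join()
    p2.join()

    finish_time = time.perf_counter()
    print(f"Program finished in {finish_time-start_time} seconds")
    print("exit codes:", p1.exitcode, p2.exitcode)
```

With the joins in place, the reported time should be a little over 0.5 seconds rather than a few milliseconds, because the two sleeps overlap.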

In [4]:
import multiprocessing
import time

def task():
    print('Sleeping for 0.5 seconds')
    time.sleep(0.5)
    print('Finished sleeping')

if __name__ == "__main__": 
    start_time = time.perf_counter()
    processes = []

    # Creates 10 processes then starts them
    for i in range(10):
        p = multiprocessing.Process(target = task)
        p.start()
        processes.append(p)
    
    # Joins all the processes 
    for p in processes:
        p.join()

    finish_time = time.perf_counter()

    print(f"Program finished in {finish_time-start_time} seconds")

Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Sleeping for 0.5 seconds
Finished sleeping
Finished sleeping
Finished sleeping
Finished sleeping
Finished sleeping
Finished sleeping
Finished sleeping
Finished sleeping
Finished sleeping
Finished sleeping
Program finished in 0.6073943569999756 seconds


<a name="a3"></a>
## Multiprocessing for Real Use

Starting a new process and then joining it back to the main process is how multiprocessing works in Python (as in many other languages). We usually turn to multiprocessing to execute many tasks concurrently for speed. The task can be an image processing function that we need to apply to thousands of images, or a conversion of PDFs into plaintext for subsequent natural language processing that has to handle a thousand PDFs. Usually, we will create a function that takes an argument (e.g., filename) for such tasks.

Let’s consider a function:
```
def cube(x):
    return x**3
```

If we want to run it with arguments 1 to 999, we might try to create 999 processes and run them in parallel:


In [5]:
import multiprocessing

def cube(x):
    return x**3

if __name__ == "__main__":
    # This does not work: join() returns None, not the value returned by cube()
    processes = [multiprocessing.Process(target=cube, args=(x,)) for x in range(1,1000)]
    for p in processes:
        p.start()
    result = [p.join() for p in processes]
    print(result)

[None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, Non


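However, this does not work, for two reasons. First, join() only waits for a process to finish and always returns None, so the value returned by cube() in the child process is discarded. Second, you probably have only a handful of cores in your computer, so running 999 processes creates a lot of overhead, can overwhelm your OS, and may exhaust your memory. The better way is to run a process pool, which limits the number of processes that run at a time and passes the results back to the main process: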

In [6]:
import multiprocessing
import time

def cube(x):
    return x**3

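if __name__ == "__main__":
    # The with block shuts the pool down once the results have been collected
    with multiprocessing.Pool(3) as pool:
        start_time = time.perf_counter()
        # apply_async() returns a result handle immediately; get() waits for the value
        async_results = [pool.apply_async(cube, args=(x,)) for x in range(1,1000)]
        result = [r.get() for r in async_results]
        finish_time = time.perf_counter()
    print(f"Program finished in {finish_time-start_time} seconds")
    print(result)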

Program finished in 0.06902704499998435 seconds
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376

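We don’t call start() and join() here because the pool manages its worker processes for us. Each apply_async() call submits one task and immediately returns a result handle, and get() blocks until that result is ready. The pool also provides a map() function, which splits the iterable range(1,1000) into chunks and runs each chunk in the pool. The map function is a parallel version of the list comprehension: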

```
result = [cube(x) for x in range(1,1000)]
```
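The cell above submits its tasks one at a time with apply_async(). A minimal sketch of the same computation with pool.map() could look like the following. The chunksize of 100 is an arbitrary value chosen for this illustration; if it is omitted, the pool picks a chunk size itself.

```python
import multiprocessing


def cube(x):
    return x**3


if __name__ == "__main__":
    with multiprocessing.Pool(3) as pool:
        # map() blocks until every result is ready and keeps the input order
        result = pool.map(cube, range(1, 1000), chunksize=100)
    print(result[:5])
    # Same values as the sequential list comprehension
    print(result == [cube(x) for x in range(1, 1000)])
```

Larger chunks mean fewer messages between the main process and the workers, which tends to matter when each task is as cheap as cube().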

A more modern alternative is the map() method of ProcessPoolExecutor from concurrent.futures, as follows:

In [7]:
import concurrent.futures
import time

def cube(x):
    return x**3

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(3) as executor:
        start_time = time.perf_counter()
        result = list(executor.map(cube, range(1,1000)))
        finish_time = time.perf_counter()
    print(f"Program finished in {finish_time-start_time} seconds")
    print(result)

Program finished in 0.3692010569999411 seconds
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376,

This code runs the multiprocessing module under the hood. The beauty of this interface is that we can switch the program from multiprocessing to multithreading by simply replacing ProcessPoolExecutor with ThreadPoolExecutor. Of course, you have to consider whether the global interpreter lock is an issue for your code.
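Because the two executor classes share the same interface, the choice can even be passed in as a parameter. The sketch below assumes a helper named cubes_with, which is made up for this illustration, and runs the same mapping once with threads and once with processes.

```python
import concurrent.futures


def cube(x):
    return x**3


def cubes_with(executor_class, values, workers=3):
    """Map cube() over values using the given executor class."""
    with executor_class(workers) as executor:
        return list(executor.map(cube, values))


if __name__ == "__main__":
    values = range(1, 11)
    by_threads = cubes_with(concurrent.futures.ThreadPoolExecutor, values)
    by_processes = cubes_with(concurrent.futures.ProcessPoolExecutor, values)
    # Both executors return the results in input order
    print(by_threads == by_processes)
```

For a CPU-bound function such as cube(), the thread version is limited by the GIL, so the process version is the one that can use several cores.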

<a name="a4"></a>
## Using joblib

The package joblib is a set of tools to make parallel computing easier. It is a common third-party library for multiprocessing. It also provides caching and serialization functions.

In [9]:
import time
from joblib import Parallel, delayed

def cube(x):
    return x**3

start_time = time.perf_counter()
result = Parallel(n_jobs=3)(delayed(cube)(i) for i in range(1,1000))
finish_time = time.perf_counter()
print(f"Program finished in {finish_time-start_time} seconds")
print(result)

Program finished in 0.8924607999999807 seconds
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376,

The code is easy to read. The delayed() function wraps another function to make a “delayed” version of the function call, which means the function is not executed immediately when it is called.

Then we call the delayed function multiple times with different sets of arguments we want to pass to it. For example, when we give integer 1 to the delayed version of the function cube, instead of computing the result, we produce a tuple, (cube, (1,), {}) for the function object, the positional arguments, and keyword arguments, respectively.
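The idea of capturing a call without running it can be sketched with the standard library alone. The names toy_delayed and run_jobs below are made up for this illustration: they are simplified stand-ins, not joblib's implementation, and run_jobs executes the captured calls one by one rather than in parallel.

```python
import functools


def toy_delayed(function):
    """Simplified stand-in for delayed(): capture a call instead of running it."""
    @functools.wraps(function)
    def capture(*args, **kwargs):
        return function, args, kwargs
    return capture


def run_jobs(jobs):
    """Run captured (function, args, kwargs) tuples one by one, in order."""
    return [function(*args, **kwargs) for function, args, kwargs in jobs]


def cube(x):
    return x**3


job = toy_delayed(cube)(1)
print(job[0] is cube)      # True: the function object itself, not its result
print(job[1], job[2])      # (1,) {}
print(run_jobs(toy_delayed(cube)(i) for i in range(1, 5)))  # [1, 8, 27, 64]
```

Separating the description of a job from its execution is what lets an engine decide later where and when each job runs.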

We created the engine instance with Parallel(). When it is called like a function with the iterable of tuples as its argument, it executes the job specified by each tuple in parallel and collects the results into a list after all jobs have finished. Here we created the Parallel() instance with n_jobs=3, so up to three processes run in parallel.

We can also write the tuples directly. Hence the joblib call with delayed() shown earlier can be rewritten as:

In [12]:
import time
from joblib import Parallel, delayed

def cube(x):
    return x**3

start_time = time.perf_counter()
result = Parallel(n_jobs=3)((cube, (i,), {}) for i in range(1,1000))
finish_time = time.perf_counter()
print(f"Program finished in {finish_time-start_time} seconds")
print(result)



Program finished in 0.7879605149996678 seconds
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376,

Another benefit of using joblib is that we can run the same code with threads instead of processes by simply adding one argument:

In [13]:
import time
from joblib import Parallel, delayed

def cube(x):
    return x**3

start_time = time.perf_counter()
result = Parallel(n_jobs=3, prefer="threads")(delayed(cube)(i) for i in range(1,1000))
finish_time = time.perf_counter()
print(f"Program finished in {finish_time-start_time} seconds")
print(result)



Program finished in 0.10252619699986099 seconds
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376

This hides all the details of running functions in parallel: the syntax is not much different from a plain list comprehension.