# Asynchrony, Concurrency, and Parallelism

Concurrent Execution: https://docs.python.org/3.8/library/concurrency.html  

* Terminology
    - ***Global Interpreter Lock (GIL)*** prevents multiple threads from executing Python bytecode at the same time (CPython's memory management is not thread-safe).
    - ***Async coroutines*** overlap operations using cooperative multitasking within a single thread.
    - ***Threads*** achieve concurrency using shared memory within a single process (in CPython, the GIL prevents threads from running Python code in parallel).
    - ***Processes*** achieve parallelism using isolated process memory within a single machine.
    - ***Distributed computing*** achieves parallelism using a server cluster on a single network.
* Modules
    - Cooperative coroutines with the ```asyncio``` module (single thread in single process)
    - Preemptive multithreading with the ```threading``` module (multiple threads in single process)
    - Multiprocessing with the ```multiprocessing``` module (multiple threads in multiple processes)
* Classes
    - The ```threading.Thread``` class
    - The ```threading.Lock``` class
    - The ```threading.local``` class
    - The ```threading.Event``` class
    - The ```threading.Condition``` class
    - The ```multiprocessing.Process``` class
    - The ```multiprocessing.Pool``` class
    - The ```queue.Queue``` class (defined in the ```queue``` module) implements a multi-producer/multi-consumer queue. It allows multiple threads to safely exchange messages using locking semantics.

* Concepts
    - Concurrency vs asynchrony vs parallelism
    - Preemptive multitasking vs cooperative multitasking
    - Multi-core computing vs distributed computing
    - Coroutines and Tasks (async programming)
    - A ```Queue``` supports FIFO message-based communication (two types: ```queue.Queue``` for threads and ```multiprocessing.Queue``` for processes)
    - Distribute parallel computing load over multiple processor cores
    - Synchronous vs asynchronous (non-blocking) execution: the ```asyncio``` module
    - Daemon threads are automatically terminated when the process exits, which happens once the main thread and all other non-daemon threads have finished. Daemon threads cannot keep the process running on their own, so they suit background work that does not need to be shut down explicitly.
    - Embarrassingly parallel means a problem can be broken down into sub-units of work that run simultaneously and independently on multiple processors, without overhead from state sharing or blocking synchronization.

In [1]:
!python --version

Python 3.7.6


In [2]:
!jupyter --version

jupyter core     : 4.6.3
jupyter-notebook : 6.0.3
qtconsole        : 4.7.2
ipython          : 7.13.0
ipykernel        : 5.1.4
jupyter client   : 6.1.2
jupyter lab      : 1.2.6
nbconvert        : 5.6.1
ipywidgets       : 7.5.1
nbformat         : 5.0.4
traitlets        : 4.3.3


In [3]:
print(__name__)

__main__


# Global Interpreter Lock (GIL)

* Used by CPython interpreter to limit execution of Python bytecode to one thread at a time
* Only one thread at a time runs Python code while all other threads are sleeping or blocking
* Allows concurrent execution of I/O-bound code but not parallel execution of CPU-bound Python code
* Makes it easier for the interpreter to be thread-safe
* Makes it harder to leverage multi-processor machines
* Released by some extension modules (e.g. NumPy) while doing computationally intensive tasks
* Released while a thread is blocked on I/O (and during ```time.sleep()```)
* The ```multiprocessing``` package can run on multiple processors, bypassing GIL limitations
* NOTE: You can still write Python code that runs concurrently or in parallel
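A CPU-bound thread does not hold the GIL forever: CPython periodically asks it to release the lock so that another thread gets a turn. A minimal sketch, assuming CPython 3.2 or later (where ```sys.getswitchinterval()``` exists), shows how often that request is made:

```python
import sys

# How often (in seconds) CPython asks the thread holding the GIL to release it
interval = sys.getswitchinterval()
print(interval)   # 0.005 (5 ms) unless changed with sys.setswitchinterval()
```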

## The ```asyncio``` Module (```async``` and ```await```)

* Async coroutines overlap operations using cooperative multitasking within a single thread.
* The ```asyncio``` package provides an API for running and managing asynchronous coroutines
* The ```async``` and ```await``` keywords are used to define coroutines and to suspend them while they wait
* Coroutines are similar to generators: each one can pause at an ```await``` and resume later, which lets many of them overlap using cooperative multitasking
* Note that ```asyncio``` is not based on either multithreading or multiprocessing

In [1]:
import asyncio
import time

async def asyncio_with_blocking_wait(timeout):
    print(f'\nStarting asyncio_with_blocking_wait with timeout = {timeout} seconds', end='')
    start_time = time.perf_counter()
    await asyncio.sleep(timeout)   # suspends this coroutine only; the event loop keeps running the other tasks
    end_time = time.perf_counter()
    print(f'\nEnded asyncio_with_blocking_wait after {end_time - start_time} seconds', end='')

# NOTE: top-level await works here because Jupyter/IPython already runs an event loop;
# in a stand-alone .py file, move these lines into an async function and run it with asyncio.run()
start = time.perf_counter()
task1 = asyncio.create_task(asyncio_with_blocking_wait(5))
task2 = asyncio.create_task(asyncio_with_blocking_wait(5))
task3 = asyncio.create_task(asyncio_with_blocking_wait(5))
await task1
await task2
await task3
end = time.perf_counter()
print(f'\nTotal time taken: {round(end - start, 2)} second(s)', end='')


Starting asyncio_with_blocking_wait with timeout = 5 seconds
Starting asyncio_with_blocking_wait with timeout = 5 seconds
Starting asyncio_with_blocking_wait with timeout = 5 seconds
Ended asyncio_with_blocking_wait after 4.99961200000007 seconds
Ended asyncio_with_blocking_wait after 4.999661100000026 seconds
Ended asyncio_with_blocking_wait after 4.999636799999962 seconds
Total time taken: 5.0 second(s)
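In a stand-alone script there is no event loop already running, so top-level ```await``` is not available. A minimal sketch of the equivalent script (the coroutine name ```pause``` and the 0.5 second timeouts are illustrative) starts the loop with ```asyncio.run()``` and uses ```asyncio.gather()``` to run the coroutines concurrently:

```python
import asyncio
import time

async def pause(name, timeout):
    # asyncio.sleep() suspends only this coroutine; the event loop runs the others meanwhile
    await asyncio.sleep(timeout)
    return name

async def main():
    start = time.perf_counter()
    # gather() wraps each coroutine in a task and waits for all of them, preserving argument order
    results = await asyncio.gather(pause('a', 0.5), pause('b', 0.5), pause('c', 0.5))
    return results, time.perf_counter() - start

if __name__ == '__main__':
    results, elapsed = asyncio.run(main())   # creates, runs, and closes the event loop
    print(results, elapsed)                  # elapsed is close to 0.5 s, not 1.5 s
```

Note that ```asyncio.run()``` refuses to start while another loop is running in the same thread, which is why the Jupyter cell above awaits the tasks directly instead.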

## The ```threading``` Module

https://docs.python.org/3/library/threading.html

* Threads achieve concurrency using shared memory within a single process (in CPython, the GIL prevents parallel execution of Python code).
* The ```Thread``` class encapsulates a separate thread of execution within the process
* Threads share access to the same memory space within the process
* Daemon threads
* The ```Thread.join()``` method
* Race conditions
* The ```threading.local``` class supports thread-local data
* ```Lock``` objects are in one of two states, locked or unlocked, changed by ```acquire()``` and ```release()```
* Synchronization using locks
* Deadlock
* The ```queue.Queue``` class for passing messages between threads
* Other threading classes:
    - ```Condition``` objects ...
    - ```Semaphore``` objects ...
    - ```Event``` objects ...
    - ```Timer``` objects ...
    - ```Barrier``` objects ...
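Three of the classes listed above can be sketched together. This is an illustrative example only (the worker count, the limit of 2, and the 0.1 second delays are arbitrary choices): a ```Semaphore``` caps how many threads run a block at once, a ```Barrier``` holds threads until a fixed number of them have arrived, and a ```Timer``` runs a function once after a delay.

```python
import threading
import time

semaphore = threading.Semaphore(2)   # at most 2 threads may hold it at once
barrier = threading.Barrier(4)       # releases waiting threads only when 4 have called wait()
counter_lock = threading.Lock()      # protects the two counters below
active = 0
peak = 0

def worker():
    global active, peak
    with semaphore:
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.1)              # simulate work while holding a semaphore slot
        with counter_lock:
            active -= 1
    barrier.wait()                   # each worker blocks here until all 4 have finished their work

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('peak concurrency:', peak)     # never more than 2

fired = threading.Event()
timer = threading.Timer(0.1, fired.set)   # call fired.set() once, 0.1 s after start()
timer.start()
timer.join()                              # a Timer is a Thread, so join() waits for it
print('timer fired:', fired.is_set())     # True
```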

## The ```threading.Thread``` Class

In [4]:
import threading

def worker(n):
    # concurrent print() calls from several threads may get interleaved in the output
    print(threading.current_thread().name, n)

for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()

Thread-21Thread-22 1 
0
Thread-23 2
Thread-24Thread-25 4
 3


## Subclassing the ```threading.Thread``` class

In [7]:
import threading

class MyThread(threading.Thread):
    def __init__(self, n):
        super().__init__()   # the base class must be initialized before start() is called
        self.n = n
    def run(self):
        # run() executes in the new thread; concurrent output may get interleaved
        print(self.name, self.n)

for i in range(5):
    t = MyThread(i)
    t.start()

Thread-26 0
Thread-27 1
Thread-28Thread-29 3
 2
Thread-30 4


## The ```threading.Thread.daemon``` Property

* Boolean value set to True for a daemon thread or False for a non-daemon thread
* Must be set before ```start()``` is called or ```RuntimeError``` is raised
* Initial default value is inherited from the creating thread
* The main thread is not a daemon so threads created in main thread default to False
* The process continues to run as long as there are any non-daemon threads still running
* The process automatically terminates as soon as there are no non-daemon threads still running
* Daemon threads are not able to keep the process running on their own
* When the process ends then any running daemon threads are automatically terminated and cleaned up

In [10]:
# NOTE: this example does not properly demonstrate daemon threads in Jupyter Notebook
# To see it work as intended, copy/paste this cell into a .py file and run it stand-alone
import threading
import time

def daemon():
    print('Starting daemon')
    time.sleep(2)
    # following print is not displayed since process ends and this daemon was killed in its sleep
    print('Exiting daemon (Note: this will print in Jupyter cell, but not when run stand-alone)')

def non_daemon():
    print('Starting non_daemon')
    time.sleep(1)
    # following print is displayed since non-daemon thread keeps process running
    print('Exiting non_daemon')
    
d = threading.Thread(name='daemon', target=daemon)
d.daemon = True
d.start()

t = threading.Thread(name='non-daemon', target=non_daemon)
t.daemon = False # redundant here since it inherits the parent thread's value, which is False in this case
t.start()

Starting daemon
Starting non_daemon
Exiting non_daemon
Exiting daemon (Note: this will print in Jupyter cell, but not when run stand-alone)
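The rules about the ```daemon``` property listed above (the default is inherited from the creating thread, and it cannot be changed after ```start()```) can be checked directly. A minimal sketch, assuming it is run from the main thread:

```python
import threading
import time

t = threading.Thread(target=time.sleep, args=(0.1,))
default_daemon = t.daemon    # False: inherited from the non-daemon main thread
print('default daemon:', default_daemon)

t.start()
error = None
try:
    t.daemon = True          # not allowed once the thread has been started
except RuntimeError as exc:
    error = exc
    print('RuntimeError:', exc)
t.join()
```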


## The ```queue.Queue``` Class
https://docs.python.org/3/library/queue.html#module-queue

In [3]:
import threading
import queue
import time
import random

def worker():
    while True:
        time.sleep(random.randint(1, 10)/100)    # simulate variability in execution time
        queue_item = q.get()                     # receive a message (blocks until one is available)
        if queue_item is None:                   # None is a sentinel telling this worker to exit its loop
            q.task_done()
            break
        print(threading.get_ident(), queue_item) # display info on thread id and queue item
        q.task_done()                            # mark the formerly enqueued message as processed

print(threading.get_ident(), "(main thread)")
number_worker_threads = 5
q = queue.Queue()
threads = []
for i in range(number_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# send a bunch of messages to the queue
for item in ["Hello", "42", True, [7, 7, 7], (5,)]:
    q.put(item)

# block until every item put in the queue has been received and processed
q.join()

# stop the worker threads by sending one None sentinel per worker
for i in range(number_worker_threads):
    q.put(None)

# block until all worker threads are complete
for t in threads:
    t.join()

10424 (main thread)
5904 Hello
5904 42
10524 True
10524 [7, 7, 7]
10412 (5,)
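Beyond the blocking ```put()``` and ```get()``` calls used above, a ```Queue``` can be bounded with ```maxsize```, and the timeout and non-blocking variants raise ```queue.Full``` or ```queue.Empty``` instead of waiting forever. A short single-threaded sketch:

```python
import queue

q = queue.Queue(maxsize=2)       # bounded: put() blocks while 2 items are already waiting
q.put('a')
q.put('b')

was_full = False
try:
    q.put('c', timeout=0.1)      # still full after 0.1 s, so queue.Full is raised
except queue.Full:
    was_full = True

first, second = q.get(), q.get() # FIFO order: 'a' then 'b'

was_empty = False
try:
    q.get_nowait()               # same as get(block=False); nothing is left to take
except queue.Empty:
    was_empty = True

print(was_full, first, second, was_empty)   # True a b True
```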


## The ```threading.Lock``` Class and Race Conditions

* A race condition can happen when two or more threads read and write a shared data value or resource
* Race conditions can cause intermittent data corruption and non-deterministic behavior
* Race conditions can be very hard to detect, test, and debug
* Race conditions can be fixed using a synchronization object, such as a lock, at the expense of some performance

In [2]:
import time
import threading
import random

class DataObject:
    def __init__(self):
        self.x = 0
        self.y = 0
        self.z = self.x + self.y      # business rule at all times: z = x + y
        self._lock = threading.Lock()
    def update(self, x, y):
        # to see corrupted data, replace the following statement with "if True:" (keeping the indented block)
        with self._lock: # easier than self._lock.acquire() & self._lock.release()
            self.x = x
            time.sleep(random.randint(0, 100)/1000)
            self.y = y
            time.sleep(random.randint(0, 100)/1000)
            self.z = self.x + self.y
            time.sleep(random.randint(0, 100)/1000)
            if self.z == self.x + self.y:
                print('-', end=' ')     # good data
            else:
                print('*', end=' ')     # corrupted data

do = DataObject()

def foo():
    for i in range(0, 20):
        do.update(random.randint(0, 100), random.randint(0, 100))

# try commenting out the three lines below that start with t2 and verify no corrupted data results
t1 = threading.Thread(target=foo)   # a few seconds of mostly sleeping (not CPU bound)
t2 = threading.Thread(target=foo)   # a few seconds of mostly sleeping (not CPU bound)
t1.start()
t2.start()
t1.join()
t2.join()
print("\nDone")

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
Done


## Deadlock Conditions

* Deadlock can result when more than one thread attempts to acquire multiple locks at the same time.
* Each thread waits for a lock that is already held by another thread, so no thread can proceed.
* Can be prevented by enforcing an ordering rule, so that multiple locks are always acquired in the same (e.g. ascending) order.
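The lock-ordering rule can be sketched as follows. The ranking dictionary and the helper names (```LOCK_RANK```, ```acquire_in_order```, ```release_all```) are hypothetical, chosen for this illustration: two threads name the same two locks in opposite orders, which is the classic deadlock setup, but both acquire them in ascending rank, so neither can end up holding one lock while waiting forever for the other.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
LOCK_RANK = {lock_a: 0, lock_b: 1}   # hypothetical global ranking of every lock

def acquire_in_order(*locks):
    """Acquire the given locks in ascending rank and return them in that order."""
    ordered = sorted(locks, key=lambda lock: LOCK_RANK[lock])
    for lock in ordered:
        lock.acquire()
    return ordered

def release_all(ordered):
    for lock in reversed(ordered):   # release in the opposite order of acquisition
        lock.release()

transfers = 0

def transfer(first, second, n):
    global transfers
    for _ in range(n):
        held = acquire_in_order(first, second)   # same order no matter how the caller names them
        try:
            transfers += 1                       # protected: both locks are held here
        finally:
            release_all(held)

# the threads pass the locks in opposite orders, the classic recipe for deadlock
t1 = threading.Thread(target=transfer, args=(lock_a, lock_b, 10000))
t2 = threading.Thread(target=transfer, args=(lock_b, lock_a, 10000))
t1.start()
t2.start()
t1.join()
t2.join()
print(transfers)   # 20000, and the program finishes instead of hanging
```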

## The ```threading.local``` Class and Thread Local Storage

* Thread local data supports thread specific data (different value for each thread)
* Create thread local data as a ```threading.local``` instance (or subclass) and assign attributes to it

```python
import threading

thread_local_data = threading.local()
thread_local_data.x = 1
thread_local_data.y = 2
```

In [23]:
import threading
import random

tls = threading.local() # global object, but each thread sees its own independent attribute values

def display_tls_value(tls):
    print("id:", threading.get_ident(), ", tls:", tls.value)    # value depends on thread we are in

def thread_function(value):
    tls.value = value
    display_tls_value(tls)
    tls.value = random.randint(1, 1000)
    display_tls_value(tls)

t1 = threading.Thread(target=thread_function, args=(42,))
t1.start()
t2 = threading.Thread(target=thread_function, args=(43,))
t2.start()
t1.join()
t2.join()

id: 6840 , tls: 42
id: 6840 , tls: 358
id: 11196 , tls: 43
id: 11196 , tls: 801
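Each thread starts with an empty view of a ```threading.local``` object, so a value set in the main thread is invisible to a worker, and the worker's assignment does not leak back. A minimal sketch (the names ```seen_in_worker``` and ```worker``` are illustrative):

```python
import threading

tls = threading.local()
tls.value = 'main'                 # attribute set in the main thread only
seen_in_worker = []

def worker():
    # a new thread starts with an empty view of tls, so 'value' is not defined here
    seen_in_worker.append(hasattr(tls, 'value'))
    tls.value = 'worker'           # only changes this thread's copy

t = threading.Thread(target=worker)
t.start()
t.join()
print(seen_in_worker[0], tls.value)   # False main
```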


## The ```threading.Event``` Class

* Used for signaling between threads that an event has occurred 

In [12]:
import threading
import time

def wait_for_event(event):
    print('\nSecondary thread started', end='')
    event.wait()     # blocks until another thread calls event.set()
    print('\nSecondary thread wait ended (event was set in main thread)', end='')

print('\nMain thread creating event', end='')
event = threading.Event()
print('\nMain thread creating secondary thread', end='')
t = threading.Thread(name='wait_for_event', target=wait_for_event, args=(event,))
t.start()
time.sleep(3)
print('\nMain thread setting event', end='')
event.set()
print('\nMain thread done', end='')


Main thread creating event
Main thread creating secondary thread
Secondary thread started
Main thread setting event
Main thread done
Secondary thread wait ended (event was set in main thread)

## The ```threading.Condition``` Class

* A Condition object can be used to synchronize threads
* The ```threading.Condition.wait()``` method releases the underlying lock, then blocks until awakened by a ```notify()``` or ```notify_all()``` call for the same condition variable in another thread (or until an optional timeout expires)
* The ```threading.Condition.notify()``` method wakes up one thread waiting on this condition (if any)
* The ```threading.Condition.notify_all()``` method wakes up all threads waiting on this condition (if any); ```notifyAll()``` is an older alias for it

In [17]:
import threading
import time

def consumer(cond):
    name = threading.current_thread().name
    print('Starting', name)
    with cond:
        cond.wait()          # releases the lock and blocks until notified
        print(name, 'notified by producer')

def producer(cond):
    name = threading.current_thread().name
    print('Starting', name)
    with cond:
        print(name, 'notifying all consumers')
        cond.notify_all()    # wakes every thread currently waiting on cond

condition = threading.Condition()

consumer_thread_1 = threading.Thread(name='consumer_thread_1', target=consumer, args=(condition,))
consumer_thread_2 = threading.Thread(name='consumer_thread_2', target=consumer, args=(condition,))
producer_thread = threading.Thread(name='producer_thread', target=producer, args=(condition,))

# the sleeps give each consumer time to reach cond.wait(); a notification sent before a wait is lost
consumer_thread_1.start()
time.sleep(1)
consumer_thread_2.start()
time.sleep(1)
producer_thread.start()

Starting consumer_thread_1
Starting consumer_thread_2
Starting producer_thread
producer_thread notifying all consumers
consumer_thread_2 notified by producer
consumer_thread_1 notified by producer
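The example above relies on ```time.sleep()``` to make sure both consumers are already waiting, because a notification sent before a thread calls ```wait()``` is simply lost. A sketch of a more robust pattern, assuming a shared list as the guarded state (the names ```items``` and ```received``` are illustrative), pairs the condition with a predicate via ```Condition.wait_for()```, which checks the predicate before waiting and again after every wake-up:

```python
import threading

condition = threading.Condition()
items = []          # shared state guarded by condition's lock
received = []

def consumer():
    with condition:
        # returns immediately if an item is already there, so a missed notify() does no harm
        condition.wait_for(lambda: len(items) > 0)
        received.append(items.pop(0))

def producer():
    with condition:
        items.append('message')
        condition.notify()     # wake one waiting consumer, if any

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()          # the producer deliberately starts first
c.start()
p.join()
c.join()
print(received)    # ['message']
```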


## Thread Timing Comparisons
* Single thread with blocking wait
* Multi thread with blocking wait
* Single thread with busy wait
* Multi thread with busy wait

In [4]:
import time

def single_thread_with_blocking_wait(timeout):
    print(f'\nStarting single_thread_with_blocking_wait with timeout = {timeout} seconds', end='')
    start_time = time.perf_counter()
    time.sleep(timeout)
    end_time = time.perf_counter()
    print(f'\nEnded single_thread_with_blocking_wait after {end_time - start_time} seconds', end='')

start = time.perf_counter()
single_thread_with_blocking_wait(5)
single_thread_with_blocking_wait(5)
single_thread_with_blocking_wait(5)
end = time.perf_counter()
print(f'\nTotal time taken: {round(end - start, 2)} second(s)', end='')


Starting single_thread_with_blocking_wait with timeout = 5 seconds
Ended single_thread_with_blocking_wait after 5.012447100000001 seconds
Starting single_thread_with_blocking_wait with timeout = 5 seconds
Ended single_thread_with_blocking_wait after 5.0035498 seconds
Starting single_thread_with_blocking_wait with timeout = 5 seconds
Ended single_thread_with_blocking_wait after 5.0089393 seconds
Total time taken: 15.03 second(s)

In [5]:
import time
import threading

def multi_thread_with_blocking_wait(timeout):
    print(f'\nStarting multi_thread_with_blocking_wait with timeout = {timeout} seconds', end='')
    start_time = time.perf_counter()
    time.sleep(timeout)
    end_time = time.perf_counter()
    print(f'\nEnded multi_thread_with_blocking_wait after {end_time - start_time} seconds', end='')

start = time.perf_counter()
t1 = threading.Thread(target=multi_thread_with_blocking_wait, args=(5,))
t2 = threading.Thread(target=multi_thread_with_blocking_wait, args=(5,))
t3 = threading.Thread(target=multi_thread_with_blocking_wait, args=(5,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
end = time.perf_counter()
print(f'\nTotal time taken: {round(end - start, 2)} second(s)', end='')


Starting multi_thread_with_blocking_wait with timeout = 5 seconds
Starting multi_thread_with_blocking_wait with timeout = 5 seconds
Starting multi_thread_with_blocking_wait with timeout = 5 seconds
Ended multi_thread_with_blocking_wait after 5.0035834 seconds
Ended multi_thread_with_blocking_wait after 5.003803900000001 seconds
Ended multi_thread_with_blocking_wait after 5.008586399999999 seconds
Total time taken: 5.02 second(s)

In [6]:
import time

def single_thread_with_busy_wait(n):
    print(f'\nStarting single_thread_with_busy_wait with n = {n}', end='')
    start_time = time.perf_counter()
    count = 1
    for i in range(1, n):
        for j in range(1, n):
            count = i + j          # pointless busy work for CPU to chew on
    end_time = time.perf_counter()
    print(f'\nEnded single_thread_with_busy_wait after {end_time - start_time} seconds', end='')

start = time.perf_counter()
single_thread_with_busy_wait(10000) # few seconds CPU bound execution
single_thread_with_busy_wait(10000) # few seconds CPU bound execution
single_thread_with_busy_wait(10000) # few seconds CPU bound execution
end = time.perf_counter()
print(f'\nTotal time taken: {round(end - start, 2)} second(s)', end='')


Starting single_thread_with_busy_wait with n = 10000
Ended single_thread_with_busy_wait after 5.681020400000001 seconds
Starting single_thread_with_busy_wait with n = 10000
Ended single_thread_with_busy_wait after 5.800952900000002 seconds
Starting single_thread_with_busy_wait with n = 10000
Ended single_thread_with_busy_wait after 5.1858536000000015 seconds
Total time taken: 16.67 second(s)

In [7]:
import time
import threading

def multi_thread_with_busy_wait(n):
    print(f'\nStarting multi_thread_with_busy_wait with n = {n}', end='')
    start_time = time.perf_counter()
    count = 1
    for i in range(1, n):
        for j in range(1, n):
            count = i + j          # pointless busy work for CPU to chew on
    end_time = time.perf_counter()
    print(f'\nEnded multi_thread_with_busy_wait after {end_time - start_time} seconds', end='')

print('\n***Running one separate thread:', end='')
start = time.perf_counter()
t0 = threading.Thread(target=multi_thread_with_busy_wait, args=(10000,))  # few seconds CPU bound execution
t0.start()
t0.join()
end = time.perf_counter()
print(f'\nTotal time taken: {round(end - start, 2)} second(s)', end='')

print('\n***Running three separate threads at the same time:', end='')
start = time.perf_counter()
t1 = threading.Thread(target=multi_thread_with_busy_wait, args=(10000,))  # few seconds CPU bound execution
t2 = threading.Thread(target=multi_thread_with_busy_wait, args=(10000,))  # few seconds CPU bound execution
t3 = threading.Thread(target=multi_thread_with_busy_wait, args=(10000,))  # few seconds CPU bound execution
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
end = time.perf_counter()
print(f'\nTotal time taken: {round(end - start, 2)} second(s)', end='')


***Running one separate thread:
Starting multi_thread_with_busy_wait with n = 10000
Ended multi_thread_with_busy_wait after 5.704671859741211 seconds
Total time taken: 5.72 second(s)
***Running three separate threads at the same time:
Starting multi_thread_with_busy_wait with n = 10000
Starting multi_thread_with_busy_wait with n = 10000
Starting multi_thread_with_busy_wait with n = 10000
Ended multi_thread_with_busy_wait after 15.898375272750854 seconds
Ended multi_thread_with_busy_wait after 17.480390787124634 seconds
Ended multi_thread_with_busy_wait after 17.56799864768982 seconds
Total time taken: 17.64 second(s)

## The ```multiprocessing``` Module

* Processes achieve parallelism using isolated process memory within a single machine.
* Uses processes instead of threads, so it can take better advantage of multi-core computers
* Good for CPU-intensive parallelism (avoids GIL blocking) but adds multi-process overhead
* The ```Process``` class
* The ```Pool``` class

In [8]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

Number of processors:  8


## The ```multiprocessing.Process``` Class

NOTE: This example will not print to the Jupyter cell output because the output comes from another process. To see it work properly, create the files ```mymodule.py``` and ```main.py``` below, run ```main.py``` directly with the Python interpreter at the command prompt, and see the result in standard output.

```python
# mymodule.py
def say_hello(name):
    print('hello', name)
```

```python
# main.py
import multiprocessing
import mymodule

if __name__ == '__main__':
    p = multiprocessing.Process(target=mymodule.say_hello, args=('Sally',))
    p.start()
    p.join()
```


## The ```multiprocessing.Queue``` Class
* ```multiprocessing.Queue``` class supports multi-process queues rather than multi-thread queues

In [4]:
# toy example of multiprocessing.Queue that involves only one thread talking to itself in one process 
import multiprocessing as mp

queue = mp.Queue()
queue.put(1)
queue.put(2)
queue.put(3)
print(queue.get())
print(queue.get())
print(queue.get())
queue.close()

1
2
3


In [1]:
# More interesting example of multiprocessing.Queue that involves two threads in two processes talking
# NOTE:
# mymodule.py (has to be in a physical .py file rather than in a local Jupyter cell)
#class QueueMessageObject(object):
#    def __init__(self, value):
#        self.value = value
#    def display(self):
#        print(self.value)
#
#def worker_process(queue):
#    queue.put(QueueMessageObject('This is a message sent from worker_process.'))

import multiprocessing as mp
import mymodule as mm

# Create queue, start process with queue, read and display message from queue
queue = mp.Queue()
process = mp.Process(target=mm.worker_process, args=(queue,))
process.start()
queue.get().display()

queue.close()       # clean up queue resources
queue.join_thread() # clean up associated monitoring thread resources
process.join()      # wait for other process to terminate

This is a message sent from worker_process.


## Multiprocessing Timing Comparisons

In [9]:
# NOTE:
# mymodule.py (has to be in a physical .py file rather than in a local Jupyter cell)
#import time
#
#def multiprocessing_with_busy_wait(n, queue=None):
#    start_time = time.perf_counter()
#    count = 1
#    for i in range(1, n):
#        for j in range(1, n):
#            count = i + j          # pointless busy work for CPU to chew on
#    end_time = time.perf_counter()
#    if queue is not None:
#        ret = queue.get()
#        ret['delta_time'] = end_time - start_time
#        queue.put(ret)

import time
import multiprocessing as mp
import mymodule

print(f'\nStarting multiprocessing_with_busy_wait with n = {10000}', end='')
queue1 = mp.Queue()
queue1.put({'delta_time': 0})
p1 = mp.Process(target=mymodule.multiprocessing_with_busy_wait, args=(10000, queue1)) # few seconds CPU bound execution
print(f'\nStarting multiprocessing_with_busy_wait with n = {10000}', end='')
queue2 = mp.Queue()
queue2.put({'delta_time': 0})
p2 = mp.Process(target=mymodule.multiprocessing_with_busy_wait, args=(10000, queue2)) # few seconds CPU bound execution
print(f'\nStarting multiprocessing_with_busy_wait with n = {10000}', end='')
queue3 = mp.Queue()
queue3.put({'delta_time': 0})
p3 = mp.Process(target=mymodule.multiprocessing_with_busy_wait, args=(10000, queue3)) # few seconds CPU bound execution

start = time.perf_counter()

p1.start()
p2.start()
p3.start()

p1.join()
p2.join()
p3.join()

print(f'\nEnded multiprocessing_with_busy_wait after {queue1.get()["delta_time"]} seconds', end='')
print(f'\nEnded multiprocessing_with_busy_wait after {queue2.get()["delta_time"]} seconds', end='')
print(f'\nEnded multiprocessing_with_busy_wait after {queue3.get()["delta_time"]} seconds', end='')

end = time.perf_counter()
print(f'\nTotal time taken: {round(end - start, 2)} second(s)', end='')


Starting multiprocessing_with_busy_wait with n = 10000
Starting multiprocessing_with_busy_wait with n = 10000
Starting multiprocessing_with_busy_wait with n = 10000
Ended multiprocessing_with_busy_wait after 7.142707499999999 seconds
Ended multiprocessing_with_busy_wait after 7.0401701999999995 seconds
Ended multiprocessing_with_busy_wait after 7.1995857 seconds
Total time taken: 7.35 second(s)

## The ```multiprocessing.Pool``` Class
* ```Pool.map()```
* ```Pool.apply()```
* ```Pool.starmap()```
* ```Pool.map_async()```
* ```Pool.apply_async()```
* ```Pool.starmap_async()```

In [1]:
# Simple example using Pool.map() to distribute function over processes on multiple CPU cores

# NOTE:
# mymodule.py (has to be in a physical .py file rather than in a local Jupyter cell)
# def mysquare(x):
#    return x*x

from multiprocessing import Pool
import mymodule

with Pool(5) as p:
    print(p.map(mymodule.mysquare, [1, 2, 3, 4, 5]))

[1, 4, 9, 16, 25]


## Visualizing CPU Load in Performance Monitor: Threads vs Processes

In [12]:
# Sequential (no parallelization)
import multiprocessing as mp   # needed here only for mp.cpu_count()

def single_thread_with_busy_wait(n):
    count = 1
    for i in range(1, n):
        for j in range(1, n):
            count = i + j          # pointless busy work for CPU to chew on

for i in range(0, mp.cpu_count()):
    single_thread_with_busy_wait(6000) # few seconds CPU bound execution

print("Done")

Done


### Sequential (no parallelization)
![Sequential (no parallelization)](img/average_sequential.png)

In [13]:
# Parallelizing using Pool.map()

import multiprocessing as mp
import mymodule

# NOTE:
# mymodule.py (has to be in a physical .py file rather than in a local Jupyter cell)
#import time
#
#def multiprocessing_with_busy_wait(n, queue=None):   # queue defaults to None so Pool.map() can pass just n
#    start_time = time.perf_counter()
#    count = 1
#    for i in range(1, n):
#        for j in range(1, n):
#            count = i + j          # pointless busy work for CPU to chew on
#    end_time = time.perf_counter()
#    if queue is not None:
#        ret = queue.get()
#        ret['delta_time'] = end_time - start_time
#        queue.put(ret)

pool = mp.Pool(mp.cpu_count()) # Establish multiprocessing pool
inputs = []
for i in range(0, mp.cpu_count()):
    inputs.append(6000)
pool.map(mymodule.multiprocessing_with_busy_wait, inputs) # Map function on pool
pool.close() # Close pool: no more tasks will be submitted
pool.join()  # Wait for the worker processes to exit
print("Done")

Done


### Parallelizing with the Pool.map() Function
![Parallelizing with Pool.map()](img/average_pool_map.png)