# Multithreading and Multiprocessing

In this module, you'll learn

- How much (or little) concurrency you can get with Python threads
- How to use synchronization primitives with threading
- How to use the multiprocessing module to better utilize multicore machines


# Threading: the Global Interpreter Lock

- Only one thread executes Python bytecode at a time
- C libraries can release the GIL
  - I/O libraries, NumPy, etc.
- Python threads are real, preemptive OS threads
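
A rough way to see the effect of the GIL is to time a pure-Python, CPU-bound function run serially and then across several threads. This is a minimal sketch: `sum_squares`, `timed`, and the constants are hypothetical names chosen for illustration, and the timings depend on the machine and interpreter build, so the code only prints them.

```python
import threading
import time

def sum_squares(n):
    """Pure-Python, CPU-bound work: the running thread holds the GIL."""
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(fn):
    """Return the wall-clock seconds fn() takes."""
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start

N, WORKERS = 200_000, 4

def serial():
    for _ in range(WORKERS):
        sum_squares(N)

def threaded():
    threads = [threading.Thread(target=sum_squares, args=(N,)) for _ in range(WORKERS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

serial_s = timed(serial)
threaded_s = timed(threaded)
print(f"serial: {serial_s:.3f}s, threaded: {threaded_s:.3f}s")
```

On a standard CPython build the two timings are usually close to each other, since the threads take turns holding the GIL.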


# Python Threads

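`threading.Thread(target=None, args=(), kwargs=None)` (pass these by keyword; the first positional parameter is `group`)
- target - Python function to call
- args, kwargs - positional and keyword arguments to pass to the function
- can also subclass & override run()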
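
As a small sketch of how `target`, `args`, and `kwargs` fit together, the example below runs a hypothetical `greet` function in a thread and collects its result in a list.

```python
import threading

results = []

def greet(greeting, name, punctuation="!"):
    results.append(f"{greeting}, {name}{punctuation}")

thread = threading.Thread(
    target=greet,
    args=("Hello", "world"),        # positional arguments for greet
    kwargs={"punctuation": "?"},    # keyword arguments for greet
)
thread.start()
thread.join()   # wait for greet() to finish before reading results
print(results)
```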


## Basic threading example

In [1]:
import threading

class MyThread(threading.Thread):
    
    def run(self):
        self.text = input('Enter something: ')
        
input('Press enter to begin: ')
thread = MyThread()
thread.start()

count, result = 1, 1

while thread.is_alive():
    result = count * count
    count += 1
        
# count was incremented after the last square was computed, so report count - 1
last = count - 1
print(f'calculated squares up to {last} * {last} = {result:,} while you typed {thread.text}')

Press enter to begin: 
Enter something: Salesƒorce
calculated squares up to 544664 * 544664 = 296,658,872,896 while you typed Salesƒorce


## Classless threading

In [2]:
def cabbage():
    global text
    text = input('Enter something: ')
    
input('Press enter to begin: ')
thread = threading.Thread(target=cabbage)
thread.start()

count, result = 1, 1

while thread.is_alive():
    result = count * count
    count += 1
    
# count was incremented after the last square was computed, so report count - 1
last = count - 1
print(f'calculated squares up to {last} * {last} = {result:,} while you typed {text}')

Press enter to begin: 
Enter something: asdf
calculated squares up to 94386 * 94386 = 8,908,716,996 while you typed asdf


# Threading for performance (?!)

In [3]:
import time
import json
from urllib.request import urlopen
from urllib.parse import urlencode

APPID = '10d4440bbaa8581bb8da9bd1fbea5617'
UNITS = 'imperial'

cities = [
    'Boulder', 'Atlanta', 'San Francisco',
    'Reno', 'Honolulu', 'Zurich', 'Dubai',
    'Dublin','Stuttgart', 'Rome', 'Singapore', 
    'Bangalore', 'Hyderabad', 'Hong Kong',
    'Durham', 'New Orleans',
]

def get_temp(city, temps, units=UNITS, appid=APPID):
    qs = urlencode({
        'q': city,
        'units': units,
        'appid': appid,
    })
    url = f'http://api.openweathermap.org/data/2.5/weather?{qs}'
    with urlopen(url) as resp:
        data = json.load(resp)
        temps[city] = data['main']['temp']
        
def run_serially():
    for city in cities:
        get_temp(city, temps)
        
def run_threaded():
    # Create the threads
    threads = [
        threading.Thread(target=get_temp, args=(c, temps)) 
        for c in cities
    ]

    # start all threads
    for thread in threads:
        thread.start() # not run()

    # wait for all threads to complete
    for thread in threads:
        thread.join()

In [4]:
%%time
temps = {}

run_serially()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}° in {k}')

it is 85° in Atlanta
it is 78° in Bangalore
it is 84° in Boulder
it is 95° in Dubai
it is 71° in Dublin
it is 87° in Durham
it is 82° in Hong Kong
it is 78° in Honolulu
it is 78° in Hyderabad
it is 88° in New Orleans
it is 78° in Reno
it is 84° in Rome
it is 58° in San Francisco
it is 79° in Singapore
it is 73° in Stuttgart
it is 71° in Zurich
CPU times: user 71.1 ms, sys: 37.8 ms, total: 109 ms
Wall time: 1.3 s


In [5]:
%%time
temps = {}

run_threaded()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}° in {k}')

it is 85° in Atlanta
it is 78° in Bangalore
it is 84° in Boulder
it is 95° in Dubai
it is 73° in Dublin
it is 87° in Durham
it is 82° in Hong Kong
it is 78° in Honolulu
it is 78° in Hyderabad
it is 88° in New Orleans
it is 78° in Reno
it is 84° in Rome
it is 57° in San Francisco
it is 79° in Singapore
it is 73° in Stuttgart
it is 71° in Zurich
CPU times: user 58.1 ms, sys: 33.6 ms, total: 91.8 ms
Wall time: 287 ms
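
The threaded run is faster because each thread spends most of its time waiting on the network, and the GIL is released while a thread waits. The sketch below reproduces the same start/join pattern without a network connection. `fake_fetch` is a hypothetical stand-in that sleeps instead of calling the weather API, and the values it stores are placeholders, not temperatures.

```python
import threading
import time

def fake_fetch(city, temps, delay=0.2):
    """Stand-in for a network call; time.sleep releases the GIL like blocking I/O."""
    time.sleep(delay)
    temps[city] = len(city)   # placeholder value, not a real temperature

cities = ["Boulder", "Atlanta", "Reno", "Rome"]
temps = {}

start = time.perf_counter()
threads = [threading.Thread(target=fake_fetch, args=(c, temps)) for c in cities]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The waits overlap, so the total is close to one delay rather than four
print(f"{len(temps)} results in {elapsed:.2f}s")
```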


# Synchronization primitives

Like other threading libraries, Python has support for `Lock`s, `Event`s, and `Semaphore`s:

In [6]:
lock = threading.Lock()
lock.acquire()
# critical work here 
lock.release()

In [7]:
# Better
lock = threading.Lock()
with lock:
    print('Do things with lock locked, will be released after block')

Do things with lock locked, will be released after block
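
To show what a lock protects, here is a minimal sketch in which four threads update a shared counter. The names `counter` and `add_many` are illustrative only. The `with lock:` block makes each read-modify-write step exclusive.

```python
import threading

counter = 0
lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        with lock:          # only one thread at a time updates counter
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)
```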


In [8]:
sem = threading.Semaphore(4)
sem.acquire()
# up to 4 threads could be running in here
sem.release()

In [9]:
# Better
sem = threading.Semaphore(4)
with sem:
    print('Running in up to 4 different threads')


Running in up to 4 different threads
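
`Event` is the third primitive named above. As a sketch, one thread blocks in `wait()` until another thread calls `set()`. The `waiter` function and the `log` list are hypothetical names used only for this example.

```python
import threading

ready = threading.Event()
log = []

def waiter():
    ready.wait()            # blocks until another thread calls ready.set()
    log.append("released")

thread = threading.Thread(target=waiter)
thread.start()
log.append("signalling")
ready.set()                 # wakes every thread waiting on the event
thread.join()
print(log)
```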


### Threadsafe queue class

If you structure your threads to send/receive data rather than just _share_ data, you can use a `queue.Queue`:

In [10]:
import queue
import threading 

def worker(q):
    while True:
        value = q.get()
        print('I got a {}'.format(value))
        if value is None:
            print('Got none, so exiting')
            break

q = queue.Queue()
thd = threading.Thread(target=worker, args=(q, ))
thd.start()        

In [11]:
q.put('Hello there')

I got a Hello there


In [12]:
q.put('General Kenobi')

I got a General Kenobi


In [13]:
thd.is_alive()

True

In [14]:
q.put(None)

I got a None
Got none, so exiting


In [15]:
thd.is_alive()

False
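
The same sentinel idea extends to several workers. The sketch below is one possible arrangement, not the only one: three threads read numbers from a `jobs` queue, write squares to a `results` queue, and each exits when it receives its own `None`.

```python
import queue
import threading

def worker(jobs, results):
    while True:
        n = jobs.get()
        if n is None:           # sentinel: no more work for this worker
            break
        results.put(n * n)

jobs, results = queue.Queue(), queue.Queue()
workers = [threading.Thread(target=worker, args=(jobs, results)) for _ in range(3)]
for w in workers:
    w.start()

for n in range(10):
    jobs.put(n)
for _ in workers:
    jobs.put(None)              # one sentinel per worker
for w in workers:
    w.join()

# Results arrive in completion order, so sort before printing
squares = sorted(results.get() for _ in range(10))
print(squares)
```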

### Simple thread pool

(There is also a thread pool in `multiprocessing.pool.ThreadPool`)

In [21]:
class Pool():
    def __init__(self, count):
        self.count = count
        self.job = queue.Queue()
        self.result = queue.Queue()
        self.threads = [
            # daemon=True replaces setDaemon(), which is deprecated since Python 3.10
            threading.Thread(target=self.worker, daemon=True)
            for i in range(count)
        ]
        for t in self.threads:
            t.start()
            
    def worker(self):
        while True:
            job = self.job.get()
            try:
                result = job()
            except Exception as err:
                self.result.put((False, err))
            else:
                self.result.put((True, result))
                

In [22]:
pool = Pool(4)
import time
import random

def job():
    print('Starting job', flush=True)
    time.sleep(3 + random.random())
    print('exiting job', flush=True)

for i in range(10):
    pool.job.put(job)
print('Jobs created!', flush=True)

Jobs created!Starting jobStarting jobStarting job

Starting job


exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
exiting job
exiting job
exiting job
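
For comparison, the `multiprocessing.pool.ThreadPool` mentioned above handles the job and result queues itself. A minimal sketch, where `slow_double` is a hypothetical function that sleeps in place of blocking I/O:

```python
from multiprocessing.pool import ThreadPool
import time

def slow_double(n):
    time.sleep(0.05)    # stand-in for blocking I/O
    return n * 2

with ThreadPool(processes=4) as pool:
    # map() blocks until every call finishes and keeps input order
    doubled = pool.map(slow_double, range(10))

print(doubled)
```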


# Multiprocessing with Python

In this module, you'll learn

- How to use the multiprocessing module 
- How to use multiprocessing's support for synchronization and communication

# Multiprocessing

- API modeled on the `threading` module
- Each process has its own interpreter and its own GIL; no shared memory without extra work
- Uses multicore well
- Much more memory intensive than threads
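
The "no shared memory" point can be sketched as follows: a child process changes a module-level dict, and the parent's copy stays as it was. The names `counter` and `increment` are illustrative. The `__main__` guard is included so the sketch also works when saved as a script under start methods that re-import the main module.

```python
import multiprocessing

counter = {"value": 0}

def increment():
    counter["value"] += 1
    return counter["value"]

if __name__ == "__main__":
    child = multiprocessing.Process(target=increment)
    child.start()
    child.join()
    # The child changed its own copy of counter; the parent's dict is untouched
    print("parent sees", counter["value"])
```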

# Simple Multiprocessing example

In [24]:
from multiprocessing import cpu_count
cpu_count()

8

In [26]:
from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        print(__name__)
        for i in range(20_000_000):
            result = i * i

if __name__ == '__main__':
    print('Running...')
    procs = [MuchCPU() for f in range(cpu_count())]
    # procs = [MuchCPU(), MuchCPU()]
    t = time.time()
    for p in procs:
        p.start()
        # don't join() here: that would run the processes one at a time
    
    for p in procs:
        p.join()
    
    print('work took {} seconds'.format(time.time() - t))

Running...
work took 1.040837287902832 seconds


In [27]:
from multiprocessing import Process, cpu_count
import time
import os

def target():
    print(os.getpid())
    print(__name__)
    for i in range(20_000_000):
        result = i * i

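if __name__ == '__main__':
    # the guard keeps child processes from re-running this block
    # when the start method re-imports the main module (e.g. spawn)
    print('Running...')
    procs = [Process(target=target) for f in range(cpu_count())]
    t = time.time()
    for p in procs:
        p.start()
        # don't join() here: that would run the processes one at a time

    for p in procs:
        p.join()

    print('work took {} seconds'.format(time.time() - t))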

Running...
work took 1.0674312114715576 seconds


# Pools

In [1]:
from multiprocessing import Pool

In [2]:
import time
import random

def much_cpu(n):
    # time.sleep(random.random())
    
    result = 0
    for i in range(n):
        result += i * i    
    return result

In [3]:
args_list = [200] * 14
args_list

[200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200]

`multiprocessing.Pool` methods:

- `map(f, args)` => list
- `imap(f, args)` => iterator, results in input order
- `imap_unordered(f, args)` => iterator, results in completion order
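
The three methods can be compared side by side. This sketch uses `multiprocessing.pool.ThreadPool`, which exposes the same methods as `Pool`, so it runs without a `__main__` guard. `square` and `numbers` are hypothetical names.

```python
from multiprocessing.pool import ThreadPool

def square(n):
    return n * n

numbers = [3, 1, 2]

with ThreadPool(processes=3) as pool:
    as_list = pool.map(square, numbers)                 # list, input order
    in_order = list(pool.imap(square, numbers))         # lazy iterator, input order
    # imap_unordered yields in completion order, so sort for a stable result
    any_order = sorted(pool.imap_unordered(square, numbers))

print(as_list, in_order, any_order)
```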

In [None]:
with Pool() as p:
    for result in p.imap(much_cpu, args_list):
        print(result)

In [None]:
with Pool() as p:
    print(p.apply_async(much_cpu, (200,)).get())  # submit one call, then block for its result
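
`apply_async` submits a single call and returns an `AsyncResult` right away. Calling `get()` on it blocks until the value is ready. The sketch below submits two calls before collecting either result. The variable names and the 30-second timeout are arbitrary choices for illustration.

```python
from multiprocessing import Pool

def much_cpu(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # Both calls are queued before we wait on either one
        pending = [pool.apply_async(much_cpu, (n,)) for n in (100, 200)]
        totals = [result.get(timeout=30) for result in pending]
    print(totals)
```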

In [44]:
from multiprocessing.pool import ThreadPool

In [48]:
with ThreadPool(processes=16) as p:
    for result in p.imap_unordered(much_cpu, args_list):
        print(result)

2646700
2646700
2646700
2646700
2646700
2646700
2646700
2646700
2646700
2646700
2646700
2646700
2646700
2646700


# Multiprocess synchronization and communication

- Lock, Condition, Semaphore, Event
- Queue & Pipe
- Shared Memory
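
As a sketch of the `Queue` item in the list above, a child process can send values back to its parent through a `multiprocessing.Queue`. The function `square_all` and the `None` sentinel are assumptions made for this example. The parent drains the queue before calling `join()`.

```python
import multiprocessing

def square_all(numbers, out_queue):
    for n in numbers:
        out_queue.put(n * n)
    out_queue.put(None)     # sentinel: nothing more to send

if __name__ == "__main__":
    q = multiprocessing.Queue()
    child = multiprocessing.Process(target=square_all, args=([1, 2, 3], q))
    child.start()
    received = []
    while (item := q.get()) is not None:    # read until the sentinel arrives
        received.append(item)
    child.join()
    print(received)
```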

### Multiprocessing basic shared memory

In [49]:
%%time
import random
import multiprocessing

ROWS = 100_000
COLS = 8

X = [random.random() for i in range(ROWS * COLS)]
Y = [random.random() for i in range(ROWS * COLS)]

CPU times: user 1.02 s, sys: 123 ms, total: 1.14 s
Wall time: 1.26 s


In [50]:
def mularray(X, Y):
    return [x * y for x, y in zip(X, Y)]

Run #1: single-process

In [51]:
%%time
Z = mularray(X, Y)

CPU times: user 254 ms, sys: 45.4 ms, total: 300 ms
Wall time: 344 ms


In [52]:
Z[:10]

[0.09042892364668567,
 0.41253470977579804,
 0.6290376423078893,
 0.8033305434242515,
 0.15525467125212294,
 0.08001527829349128,
 0.019202742449277835,
 0.01875598526078059,
 0.6903301780945288,
 0.2706754300776931]

Run #2: multiprocess (pool)

In [53]:
offsets = [c * ROWS for c in range(COLS)]

offsets

[0, 100000, 200000, 300000, 400000, 500000, 600000, 700000]

In [54]:
%%time
#offsets = [c * ROWS for c in range(COLS)]

def pmul(offset):
    return mularray(
        X[offset:offset + ROWS], 
        Y[offset:offset + ROWS],
    )

Z = []
with multiprocessing.Pool() as pool:
    for Zpart in pool.imap(pmul, offsets):
        Z += Zpart
    

KeyboardInterrupt: 

In [None]:
Z[:10]

Run #3: multiprocess (pool), shared memory

In [None]:
offsets

In [55]:
# typecode 'f' is a 32-bit C float, so values are rounded; 'd' would keep full double precision
sX = multiprocessing.Array('f', X, lock=False)
sY = multiprocessing.Array('f', Y, lock=False)
sZ = multiprocessing.Array('f', ROWS * COLS, lock=False)

In [56]:
len(sX)

800000

In [None]:
%%time
def pmul(offset):
    for i in range(offset, offset + ROWS):
        sZ[i] = sX[i] * sY[i]
        
with multiprocessing.Pool() as pool:
    for x in pool.imap_unordered(pmul, offsets):
        pass

### Aside: if you _really_ want it fast, just use NumPy (or better yet, NumPy + multiprocessing)

In [57]:
!pip install numpy



In [58]:
import numpy as np

In [59]:
X = np.random.random(800_000)
Y = np.random.random(800_000)

In [60]:
%%time
Z = X * Y

CPU times: user 13 ms, sys: 11.6 ms, total: 24.5 ms
Wall time: 66.8 ms


# Lab

Open [concurrency lab][multithreading-multiprocessing-lab]

[multithreading-multiprocessing-lab]: ./multithreading-multiprocessing-lab.ipynb