# Multithreading and Multiprocessing

In this module, you'll learn:

- How much (or little) concurrency you can get with Python threads
- How to use synchronization primitives with threading
- How to use the multiprocessing module to better utilize multicore machines


# Threading: the Global Interpreter Lock

- Only one thread can execute Python bytecode at a time (in standard CPython builds)
- C extensions can release the GIL while they work
  - I/O libraries, NumPy, etc.
- Python threads are real, preemptive OS threads


# Python Threads

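`threading.Thread(target=None, args=(), kwargs=None)`
- target - Python function to call
- args, kwargs - positional and keyword arguments for `target`
- pass these by keyword: the first positional parameter of `Thread` is `group`, not `target`
- can also subclass & override `run()`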
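A minimal sketch of the constructor in use, passing arguments through both `args` and `kwargs`. `greet` and `results` are hypothetical names chosen for illustration; the lock around `append` is belt-and-braces.

```python
import threading

results = []
results_lock = threading.Lock()

def greet(name, punctuation='.'):
    # hypothetical target function: records a greeting under a lock
    with results_lock:
        results.append(f'Hello, {name}{punctuation}')

threads = [
    # note the trailing comma: args must be a tuple
    threading.Thread(target=greet, args=('Ada',)),
    threading.Thread(target=greet, args=('Grace',), kwargs={'punctuation': '!'}),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for both threads before reading results

print(sorted(results))  # ['Hello, Ada.', 'Hello, Grace!']
```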


## Basic threading example

In [2]:
import threading

class MyThread(threading.Thread):
    
    def run(self):
        self.text = input('Enter something: ')
        
input('Press enter to begin: ')
thread = MyThread()
thread.start()

count, result = 1, 1

while thread.is_alive():
    result = count * count
    count += 1
        
# count was incremented once past the last number squared
print(f'calculated squares up to {count - 1} * {count - 1} = {result:,} while you typed {thread.text}')

Press enter to begin: 
Enter something: something
calculated squares up to 1637699 * 1637699 = 2,682,058,014,601 while you typed something


## Classless threading

In [3]:
def cabbage():
    global text
    text = input('Enter something: ')
    
input('Press enter to begin: ')
thread = threading.Thread(target=cabbage)
thread.start()

count, result = 1, 1

while thread.is_alive():
    result = count * count
    count += 1
    
# count was incremented once past the last number squared
print(f'calculated squares up to {count - 1} * {count - 1} = {result:,} while you typed {text}')

Press enter to begin: 
Enter something: carrots
calculated squares up to 1539489 * 1539489 = 2,370,026,381,121 while you typed carrots


# Threading for performance (?!)

In [4]:
import time
import json
from urllib.request import urlopen
from urllib.parse import urlencode

APPID = '10d4440bbaa8581bb8da9bd1fbea5617'
UNITS = 'metric'

cities = [
    'Boulder', 'Atlanta', 'San Francisco',
    'Reno', 'Honolulu', 'Zurich', 'Dubai',
    'Dublin','Stuttgart', 'Rome', 'Singapore', 
    'Bangalore', 'Hyderabad', 'Hong Kong',
]

def get_temp(city, temps, units=UNITS, appid=APPID):
    qs = urlencode({
        'q': city,
        'units': units,
        'appid': appid,
    })
    url = f'http://api.openweathermap.org/data/2.5/weather?{qs}'
    with urlopen(url) as resp:
        data = json.load(resp)
        temps[city] = data['main']['temp']
        
def run_serially():
    for city in cities:
        get_temp(city, temps)
        
def run_threaded():
    # Create the threads
    threads = [
        threading.Thread(target=get_temp, args=(c, temps)) 
        for c in cities
    ]

    # start all threads
    for thread in threads:
        thread.start() # not run()

    # wait for all threads to complete
    for thread in threads:
        thread.join()

In [5]:
%%time
temps = {}

run_serially()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}°C in {k}')

it is 15°C in Atlanta
it is 33°C in Bangalore
it is 3°C in Boulder
it is 31°C in Dubai
it is 7°C in Dublin
it is 20°C in Hong Kong
it is 21°C in Honolulu
it is 33°C in Hyderabad
it is -3°C in Reno
it is 15°C in Rome
it is 7°C in San Francisco
it is 31°C in Singapore
it is 14°C in Stuttgart
it is 13°C in Zurich
CPU times: user 21.2 ms, sys: 10.1 ms, total: 31.2 ms
Wall time: 1.02 s


In [6]:
%%time
temps = {}

run_threaded()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}°C in {k}')

it is 15°C in Atlanta
it is 33°C in Bangalore
it is 3°C in Boulder
it is 31°C in Dubai
it is 7°C in Dublin
it is 20°C in Hong Kong
it is 21°C in Honolulu
it is 33°C in Hyderabad
it is -3°C in Reno
it is 15°C in Rome
it is 7°C in San Francisco
it is 31°C in Singapore
it is 14°C in Stuttgart
it is 13°C in Zurich
CPU times: user 16.3 ms, sys: 9.12 ms, total: 25.4 ms
Wall time: 259 ms
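The same fan-out can be written with `concurrent.futures.ThreadPoolExecutor`, which creates, starts and joins the threads for you. This sketch assumes no network access: `fake_get_temp` is a hypothetical stand-in for `get_temp` that sleeps instead of calling the weather API and returns a placeholder value.

```python
import time
from concurrent.futures import ThreadPoolExecutor

CITIES = ['Boulder', 'Atlanta', 'Reno', 'Zurich']

def fake_get_temp(city):
    # hypothetical stand-in for get_temp(): no HTTP request is made
    time.sleep(0.2)  # time.sleep releases the GIL, like a blocking socket read
    return city, len(city)  # placeholder "temperature"

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(CITIES)) as executor:
    # executor.map() yields results in input order, like the builtin map()
    temps = dict(executor.map(fake_get_temp, CITIES))
elapsed = time.perf_counter() - start

print(temps)
print(f'{elapsed:.2f} s for {len(CITIES)} calls')
```

Run serially, the four sleeps would take about 0.8 s; with one thread per call they overlap, much like the 1.02 s versus 259 ms comparison above.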


# Synchronization primitives

Like other threading libraries, Python has support for `Lock`s, `Event`s, and `Semaphore`s:

In [7]:
lock = threading.Lock()
lock.acquire()
# critical work here
lock.release()

In [8]:
# Better
lock = threading.Lock()
with lock:
    print('Do things with lock locked, will be released after block')

Do things with lock locked, will be released after block
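Why a lock matters: `counter += 1` is a read-modify-write, so two threads can read the same old value and one update is lost. A minimal sketch in which the lock makes each increment atomic; `increment` and `counter` are illustrative names.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # only one thread at a time may read and write counter
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000
```

Without the lock, lost updates may show up often, rarely, or not at all depending on the Python version and timing, which is exactly why code should not rely on their absence.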


In [9]:
sem = threading.Semaphore(4)
sem.acquire()
sem.acquire()
sem.acquire()
sem.acquire()
# sem.acquire() would block
sem.release()

In [10]:
# Better
sem = threading.Semaphore(4)
with sem:
    with sem:
        with sem:
            with sem:
                print('Do things with semaphore acquired, will be released after block')

Do things with semaphore acquired, will be released after block
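The cells above acquire the semaphore from a single thread; its real job is bounding how many threads run a section at once. This sketch also shows an `Event` used as a start gate. `BoundedSemaphore` is the variant that raises `ValueError` if released more times than acquired; the worker count and sleep are arbitrary.

```python
import threading
import time

start_gate = threading.Event()          # workers wait here until set()
slots = threading.BoundedSemaphore(2)   # at most 2 workers inside at once
state_lock = threading.Lock()
active = 0
max_active = 0

def worker():
    global active, max_active
    start_gate.wait()  # block until the main thread calls start_gate.set()
    with slots:
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)  # simulate work while holding a slot
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()

time.sleep(0.05)  # let workers reach wait(); not required for correctness
start_gate.set()  # release every waiting worker at once
for t in threads:
    t.join()

print(max_active)
```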


### Thread-safe queue class

If you structure your threads to send/receive data rather than just _share_ data, you can use a `queue.Queue`:

In [11]:
import queue
import threading 

def worker(q):
    while True:
        value = q.get()
        print('I got a {}'.format(value))
        if value is None:
            break

q = queue.Queue()
thd = threading.Thread(target=worker, args=(q, ))
thd.start()        

In [12]:
q.put('Hello there')

I got a Hello there


In [13]:
q.put('General Kenobi')

I got a General Kenobi


In [14]:
q.put(None)

I got a None


In [16]:
thd.is_alive()

False
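A slightly fuller producer/consumer sketch with several consumers. It pairs every `get()` with `task_done()` so that `Queue.join()` can wait for the queued work, then sends one `None` sentinel per consumer; squaring the numbers is just placeholder work.

```python
import queue
import threading

tasks = queue.Queue()
results = []

def consumer():
    while True:
        item = tasks.get()
        try:
            if item is None:  # sentinel: stop this consumer
                return
            results.append(item * item)
        finally:
            tasks.task_done()  # one task_done() per get(), even for the sentinel

workers = [threading.Thread(target=consumer) for _ in range(3)]
for w in workers:
    w.start()

for n in range(10):
    tasks.put(n)
tasks.join()  # blocks until every queued item has been marked done

for _ in workers:
    tasks.put(None)  # one sentinel per consumer
for w in workers:
    w.join()

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```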

### Simple thread pool

(The standard library also has ready-made thread pools: `multiprocessing.pool.ThreadPool` and `concurrent.futures.ThreadPoolExecutor`.)

In [17]:
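class Pool:
    def __init__(self, count):
        self.count = count
        self.job = queue.Queue()
        self.result = queue.Queue()
        # daemon threads don't keep the interpreter alive at exit
        # (setDaemon() is deprecated since Python 3.10; pass daemon=True instead)
        self.threads = [
            threading.Thread(target=self.worker, daemon=True)
            for _ in range(count)
        ]
        for t in self.threads:
            t.start()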
            
    def worker(self):
        while True:
            job = self.job.get()
            try:
                result = job()
            except Exception as err:
                self.result.put(err)
            else:
                self.result.put(result)
                

In [18]:
pool = Pool(4)
import time

def job():
    print('Starting job', flush=True)
    time.sleep(3)
    print('exiting job', flush=True)

for i in range(10):
    pool.job.put(job)
print('Jobs created!', flush=True)

Jobs created!
Starting jobStarting jobStarting job


Starting job
exiting jobexiting job
exiting job
exiting job
Starting job
Starting job

Starting job
Starting job
exiting jobexiting jobexiting job

exiting job
Starting job
Starting job

exiting jobexiting job
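For comparison, a sketch of the ready-made `multiprocessing.pool.ThreadPool`. `slow_square` is a hypothetical stand-in for blocking work. Unlike the hand-rolled `Pool`, `map()` collects results in input order, and the `with` block cleans up the worker threads on exit.

```python
import time
from multiprocessing.pool import ThreadPool

def slow_square(n):
    time.sleep(0.1)  # stands in for blocking I/O
    return n * n

with ThreadPool(4) as tp:
    squares = tp.map(slow_square, range(8))  # blocks; results in input order

print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49]
```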



# Multiprocessing with Python

In this module, you'll learn:

- How to use the multiprocessing module 
- How to use multiprocessing's support for synchronization and communication

# Multiprocessing

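- API modeled on `threading` (`Process` mirrors `Thread`)
- Each process has its own interpreter and GIL; no shared memory without extra work
- Can run CPU-bound Python code on multiple cores in parallel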
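To make "no shared memory without extra work" concrete, a minimal sketch: a child that bumps an ordinary global only changes its own copy, while a `multiprocessing.Value` lives in shared memory and comes with its own lock. The function names are illustrative. The `if __name__ == '__main__'` guard keeps the sketch working as a script under the spawn start method.

```python
import multiprocessing

counter = 0  # ordinary module-level global

def bump_plain():
    global counter
    counter += 1  # changes the child's copy only

def bump_shared(shared):
    with shared.get_lock():  # Value is created with a lock by default
        shared.value += 1

def main():
    shared = multiprocessing.Value('i', 0)  # 'i' = C int in shared memory
    procs = [multiprocessing.Process(target=bump_plain) for _ in range(4)]
    procs += [multiprocessing.Process(target=bump_shared, args=(shared,))
              for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter, shared.value

if __name__ == '__main__':
    plain, shared_total = main()
    print(plain, shared_total)  # 0 4
```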

# Simple Multiprocessing example

In [21]:
from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        print(__name__)
        for i in range(20_000_000):
            result = i * i

if __name__ == '__main__':
    print('Running...')
    procs = [MuchCPU() for f in range(cpu_count())]
    # procs = [MuchCPU(), MuchCPU()]
    t = time.time()
    for p in procs:
        p.start()
        # p.join() here would wait for each process before starting the next
    
    for p in procs:
        p.join()
    
    print('work took {} seconds'.format(time.time() - t))

Running...
94122
__main__
94123
__main__
94125
94126
__main__
94127
__main__
94128
__main__
__main__
94129
__main__
94130
__main__
work took 3.9555437564849854 seconds


In [22]:
from multiprocessing import Process, cpu_count
import time
import os

def target():
    print(os.getpid())
    print(__name__)
    for i in range(20_000_000):
        result = i * i

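# the guard is required when child processes are spawned rather than forked
if __name__ == '__main__':
    print('Running...')
    procs = [Process(target=target) for _ in range(cpu_count())]
    t = time.time()
    for p in procs:
        p.start()
        # p.join() here would wait for each process before starting the next

    for p in procs:
        p.join()

    print('work took {} seconds'.format(time.time() - t))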

Running...
94135
__main__
94136
94137
__main__
94138
__main__
__main__
94139
94140
__main__
94141
__main__
__main__
94142
__main__
work took 4.530237913131714 seconds


# Pools

In [23]:
from multiprocessing import Pool

In [24]:
def much_cpu(n):
    for i in range(n):
        result = i * i    
    return result

In [25]:
args_list = [20_000_000] * 14
args_list

[20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000,
 20000000]

`multiprocessing.Pool` methods:

- `map(f, args)` => list (blocks until every result is ready)
- `imap(f, args)` => iterator, results in input order
- `imap_unordered(f, args)` => iterator, results in completion order
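A small sketch contrasting the three methods. `slow_identity` is a hypothetical function whose later inputs finish sooner, so completion order tends to differ from input order. The exact order `imap_unordered` yields is not guaranteed, so it is sorted before printing.

```python
import time
from multiprocessing import Pool

def slow_identity(n):
    # later inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (4 - n))
    return n

def main():
    inputs = [0, 1, 2, 3]
    with Pool(processes=4) as pool:
        mapped = pool.map(slow_identity, inputs)                      # list, input order
        imapped = list(pool.imap(slow_identity, inputs))              # iterator, input order
        unordered = list(pool.imap_unordered(slow_identity, inputs))  # completion order
    return mapped, imapped, unordered

if __name__ == '__main__':
    mapped, imapped, unordered = main()
    print(mapped, imapped, sorted(unordered))  # [0, 1, 2, 3] [0, 1, 2, 3] [0, 1, 2, 3]
```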

In [27]:
with Pool(processes=4) as p:
    for result in p.imap_unordered(much_cpu, args_list):
        print(result)

399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001
399999960000001


# Multiprocess synchronization and communication

- Lock, Condition, Semaphore, Event
- Queue & Pipe
- Shared Memory
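A minimal sketch of the two message-passing channels. A `Queue` carries items from a producer process with a `None` sentinel at the end, and a duplex `Pipe` carries one request/reply round trip. `producer` and `echo` are illustrative names. The producer is joined only after its queue is drained, because the multiprocessing docs warn that joining a process with unflushed queue items can deadlock.

```python
import multiprocessing

def producer(q):
    for n in range(5):
        q.put(n * n)
    q.put(None)  # sentinel: no more items

def echo(conn):
    msg = conn.recv()       # block until the parent sends something
    conn.send(msg.upper())  # reply on the same duplex connection
    conn.close()

def main():
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p1.start()
    squares = []
    while (item := q.get()) is not None:
        squares.append(item)
    p1.join()  # join after draining the queue

    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    p2 = multiprocessing.Process(target=echo, args=(child_conn,))
    p2.start()
    parent_conn.send('hello')
    reply = parent_conn.recv()
    p2.join()
    return squares, reply

if __name__ == '__main__':
    squares, reply = main()
    print(squares, reply)  # [0, 1, 4, 9, 16] HELLO
```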

### Multiprocessing basic shared memory

In [28]:
%%time
import random
import multiprocessing

ROWS = 100_000
COLS = 8

X = [random.random() for i in range(ROWS * COLS)]
Y = [random.random() for i in range(ROWS * COLS)]

CPU times: user 219 ms, sys: 30.4 ms, total: 249 ms
Wall time: 250 ms


In [29]:
def mularray(X, Y):
    return [x * y for x, y in zip(X, Y)]

Run #1: single-process

In [30]:
%%time
Z = mularray(X, Y)

CPU times: user 70.2 ms, sys: 18 ms, total: 88.1 ms
Wall time: 88.3 ms


In [31]:
Z[:10]

[0.36843399973405455,
 0.1605392364981037,
 0.16724758172989154,
 0.20537871617059872,
 0.031997214062215285,
 0.14615601862759336,
 0.3288515721557365,
 0.13964027822644307,
 0.3875046208441948,
 0.3690174313243539]

Run #2: multiprocess (pool)

In [32]:
offsets = [c * ROWS for c in range(COLS)]

offsets

[0, 100000, 200000, 300000, 400000, 500000, 600000, 700000]

In [33]:
%%time
#offsets = [c * ROWS for c in range(COLS)]

def pmul(offset):
    return mularray(
        X[offset:offset + ROWS], 
        Y[offset:offset + ROWS],
    )

Z = []
with multiprocessing.Pool() as pool:
    for Zpart in pool.imap(pmul, offsets):
        Z += Zpart
    

CPU times: user 70.2 ms, sys: 72.5 ms, total: 143 ms
Wall time: 276 ms


In [34]:
Z[:10]

[0.36843399973405455,
 0.1605392364981037,
 0.16724758172989154,
 0.20537871617059872,
 0.031997214062215285,
 0.14615601862759336,
 0.3288515721557365,
 0.13964027822644307,
 0.3875046208441948,
 0.3690174313243539]

Run #3: multiprocess (pool), shared memory

In [35]:
offsets

[0, 100000, 200000, 300000, 400000, 500000, 600000, 700000]

In [38]:
# 'd' (C double) matches Python's float; 'f' would round every value to 32 bits
sX = multiprocessing.Array('d', X)
sY = multiprocessing.Array('d', Y)
sZ = multiprocessing.Array('d', ROWS * COLS)

In [39]:
%%time
def pmul(offset):
    for i in range(offset, offset + ROWS):
        sZ[i] = sX[i] * sY[i]
        
with multiprocessing.Pool() as pool:
    pool.map(pmul, offsets)

CPU times: user 17.3 ms, sys: 27.6 ms, total: 44.9 ms
Wall time: 6.42 s


### Aside: if you _really_ want it fast, just use numpy (or better yet, numpy + multiprocessing)

In [40]:
!pip install numpy



In [41]:
import numpy as np

In [42]:
# note: 80,000 elements, one tenth of the ROWS * COLS = 800,000 used above
X = np.random.random(80000)
Y = np.random.random(80000)

In [43]:
%%time
Z = X * Y

CPU times: user 10.6 ms, sys: 8.05 ms, total: 18.7 ms
Wall time: 18.1 ms


# Lab

Open [concurrency lab][multithreading-multiprocessing-lab]

[multithreading-multiprocessing-lab]: ./multithreading-multiprocessing-lab.ipynb

In [47]:
import multiprocessing.pool
multiprocessing.pool.ThreadPool.mro()

[multiprocessing.pool.ThreadPool, multiprocessing.pool.Pool, object]