# "Python is slow"

In [1]:
def is_prime(number):
    """Return True if ``number`` is prime, using naive trial division.

    Every candidate divisor in ``[2, number)`` is tried. The search is kept
    linear (no square-root cutoff) so each call does a noticeable amount of
    CPU work for the timing experiments in this notebook.

    Args:
        number: integer to test.

    Returns:
        True for primes, False otherwise. Integers below 2 (0, 1 and
        negative numbers) are not prime.
    """
    # Without this guard the loop below is empty for number < 2 and the
    # function would wrongly report 0, 1 and negatives as prime.
    if number < 2:
        return False
    for divisor in range(2, number):
        if number % divisor == 0:
            return False
    return True

In [2]:
# Nine primes near 8000. For a prime, is_prime never exits early, so every
# call runs its full trial-division loop.
numbers = [
    7853, 7867, 7873, 7877, 7879,
    7883, 7901, 7907, 7919,
]

In [3]:
%%timeit
for number in numbers:
    is_prime(number)

3.85 ms ± 41.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [4]:
import threading

In [5]:
%%timeit
threads = []
for number in numbers:
    thread = threading.Thread(target=is_prime, args=(number,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

14.5 ms ± 148 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


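### **don't use threads for CPU-bound tasks**

In CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time. Splitting CPU-bound work across threads therefore adds thread start-up and switching overhead without any parallel speed-up.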

# Why do we even have threading in Python?

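Threads are useful for I/O work. Blocking calls such as `time.sleep`, or file and socket reads, release the GIL while they wait, so other threads can run in the meantime.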

In [6]:
import time

In [7]:
%%timeit
for _ in range(10):
    time.sleep(0.1)

1 s ± 121 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
%%timeit
threads = []
for _ in range(10):
    thread = threading.Thread(target=time.sleep, args=(0.1,))
    thread.start()
    threads.append(thread)
    
for thread in threads:
    thread.join()

104 ms ± 121 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


### **Use threads to speed up I/O tasks**

# More examples of I/O that is faster with threading

In [9]:
# to be continued... 

# I don't have to worry about race conditions if I have GIL, right?

Not quite

In [10]:
class Counter:  # Unsynchronized counter used to demonstrate a lost-update race.
    def __init__(self):
        self.count = 0  # shared value that every worker thread increments
        
    def increment(self):  # no lock on purpose -- compare LockingCounter below
        self.count += 1  # not atomic: load attribute, add 1, store attribute
# Documentation lives in trailing comments so no lines are inserted before the
# `+=`: the saved dis.dis output below reports it on cell line 6.

In [11]:
def worker(wid, how_many, counter):
    """Call ``counter.increment()`` exactly ``how_many`` times.

    Args:
        wid: worker identifier; not used by the loop.
        how_many: number of increments to perform.
        counter: any object with an ``increment()`` method.
    """
    bump = counter.increment
    for _ in range(how_many):
        bump()

In [12]:
# Increments performed by each thread, and the shared unsynchronized counter.
how_many = 100_000  # == 10**5
counter = Counter()

In [13]:
NUM_THREADS = 5
# Every thread shares the same Counter instance.
threads = [
    threading.Thread(target=worker, args=(worker_id, how_many, counter))
    for worker_id in range(NUM_THREADS)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

In [14]:
# Compare the expected total with what the unsynchronized Counter reached.
print(f'Counter should be {NUM_THREADS*how_many}, got {counter.count}')

Counter should be 500000, got 425917


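The issue is that, while `increment()` looks like an atomic operation, it consists of several bytecode instructions, and the interpreter can switch to another thread between any two of them. (The exact instructions, and how often a thread switch actually happens at these points, vary between CPython versions. On newer interpreters the lost updates may show up rarely, but the operation is still not atomic.)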

In [15]:
import dis

In [16]:
# Disassemble the method on the class rather than through the global
# `counter`: a later cell rebinds `counter` to a LockingCounter, so re-running
# this cell would otherwise show the locked version instead of the racy `+=`.
dis.dis(Counter.increment)

  6           0 LOAD_FAST                0 (self)
              2 DUP_TOP
              4 LOAD_ATTR                0 (count)
              6 LOAD_CONST               1 (1)
              8 INPLACE_ADD
             10 ROT_TWO
             12 STORE_ATTR               0 (count)
             14 LOAD_CONST               0 (None)
             16 RETURN_VALUE


Example from `Effective Python: 90 Specific Ways to Write Better Python` (Brett Slatkin)
```python
counter.count += 1
```

corresponds to
```python
value = getattr(counter, 'count')  
result = value + 1  
setattr(counter, 'count', result)
```

Example of such interruption:

Running in Thread A  
```python
value_a = getattr(counter, 'count')
```  
Context switch to Thread B  
```python
value_b = getattr(counter, 'count')  
result_b = value_b + 1  
setattr(counter, 'count', result_b)  
```
Context switch back to Thread A  
```python
result_a = value_a + 1  
setattr(counter, 'count', result_a)
```

To prevent data races like this one, we should use locks. A lock ensures that only one thread at a time runs the read-modify-write sequence.

In [17]:
from threading import Lock

class LockingCounter:
    """Counter that several threads can increment safely at the same time."""

    def __init__(self):
        self.lock = Lock()  # guards every read-modify-write of `count`
        self.count = 0

    def increment(self):
        """Add one to ``count`` while holding ``lock``.

        Holding the lock across the whole load/add/store prevents another
        thread from interleaving between the read and the write.
        """
        with self.lock:
            self.count += 1

In [18]:
counter = LockingCounter()
NUM_THREADS = 5
# Same experiment as before, now with the lock-protected counter.
threads = [
    threading.Thread(target=worker, args=(thread_id, how_many, counter))
    for thread_id in range(NUM_THREADS)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

In [19]:
# With the lock, no increments are lost.
print(f'Counter should be {NUM_THREADS*how_many}, got {counter.count}')

Counter should be 500000, got 500000


### **use locks to safely update data from multiple threads**

# Coordination between threads

## Coordination example - pipeline

In [20]:
import random

In [21]:
def init_number(number):
    """First pipeline stage: return a random float in [0.0, 1.0).

    ``number`` is ignored. The stage is fed ``None`` placeholders, so the
    argument only exists to match the one-argument stage signature.
    """
    value = random.random()
    return value

In [22]:
def multiply_by_2(number):
    """Second pipeline stage: return ``number`` doubled."""
    doubled = 2 * number
    return doubled

In [23]:
def add_1(number):
    """Final pipeline stage: return ``number`` plus one."""
    incremented = number + 1
    return incremented

To distribute work between pipeline phases we need a thread-safe queue, here a deque protected by a lock:

In [24]:
from collections import deque
from threading import Lock

class MyQueue:
    """Thread-safe FIFO: a deque whose operations are guarded by one Lock.

    ``get`` never blocks. On an empty queue it raises ``IndexError`` (from
    ``deque.popleft``), so consumers must catch it and retry.
    """

    def __init__(self):
        self.numbers = deque()  # pending items, oldest on the left
        self.lock = Lock()      # serializes access to `numbers`

    def put(self, number):
        """Add ``number`` at the right (newest) end."""
        with self.lock:
            self.numbers.append(number)

    def get(self):
        """Remove and return the oldest item; raise IndexError if empty."""
        with self.lock:
            oldest = self.numbers.popleft()
        return oldest

Each phase of the pipeline will be represented as a Python thread that takes work from one queue like this, runs a function on it, and puts the result on another queue.

In [25]:
from threading import Thread
import time

class Worker(Thread):
    """Polling pipeline stage: move items from ``in_queue`` to ``out_queue``.

    Each item taken from ``in_queue`` is passed through ``func`` and the
    result is put on ``out_queue``. An empty input queue is signalled by
    ``IndexError`` (as ``MyQueue.get`` raises); the worker then sleeps briefly
    and polls again. ``run`` never returns on its own.

    Attributes:
        polled_count: number of times the input queue was polled.
        work_done: number of items successfully processed.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func, self.in_queue, self.out_queue = func, in_queue, out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1  # statistics: every poll, successful or not
            try:
                item = self.in_queue.get()
            except IndexError:
                # Nothing to do yet; back off before polling again.
                time.sleep(0.01)
                continue
            self.out_queue.put(self.func(item))
            self.work_done += 1
                

In [26]:
# One MyQueue between each pair of stages, plus a final done_queue.
init_queue, multiply_queue, add_queue, done_queue = (MyQueue() for _ in range(4))

# Stage threads, in pipeline order.
threads = [
    Worker(init_number, init_queue, multiply_queue),
    Worker(multiply_by_2, multiply_queue, add_queue),
    Worker(add_1, add_queue, done_queue),
]

In [27]:
# These polling workers never return from run(). Mark them as daemon threads
# (before starting them) so they cannot keep the interpreter alive at shutdown.
for thread in threads:
    thread.daemon = True
    thread.start()

In [28]:
# Dummy work to start the pipeline: the first stage ignores its input, so
# 1000 None placeholders are enough. (A named loop variable avoids
# overwriting IPython's `_` output variable.)
for _seed in range(1000):
    init_queue.put(None)

In [29]:
# Busy-wait until the last stage has produced all 1000 results
# (the main thread could do other useful work here instead).
while True:
    if len(done_queue.numbers) >= 1000:
        break

In [30]:
processed = len(done_queue.numbers)
# Total polls across all stages, including the empty ones.
polled = 0
for worker_thread in threads:
    polled += worker_thread.polled_count
print(f'Processed {processed} items after polling {polled} times')

Processed 1000 items after polling 3080 times


Workers polled for work many times and wasted CPU doing nothing useful; they were constantly raising and catching `IndexError` exceptions.

## Problems with this model
1. Determining that all of the input work is complete requires yet another busy wait on the `done_queue`
2. In `Worker` the run method will execute forever in its busy loop
3. If the first phase makes rapid progress but the second phase makes slow progress, then the queue connecting the first phase to the second phase will constantly increase in size.

## Better approach - `Queue`

In [31]:
from queue import Queue
import threading
import time

In [32]:
my_queue = Queue(maxsize=0)  # maxsize=0 means unbounded

Queue eliminates the busy waiting in the worker by making the get method block until new data is available

In [33]:
def consumer():
    """Wait for one item on the global ``my_queue``, printing before and after."""
    print('Consumer waiting')
    my_queue.get()  # blocks until a producer puts something
    print('Consumer done')

In [34]:
# Create (but do not yet start) a thread that runs consumer().
thread = threading.Thread(target=consumer)

In [35]:
# consumer() prints 'Consumer waiting', then blocks in my_queue.get().
thread.start()

Consumer waiting


Even though the consumer thread starts first, it won't finish until an item is put on the `Queue` instance and the `get` method has something to return.

In [36]:
# Putting one item unblocks the consumer's pending get().
print('Producer putting')
my_queue.put(None)
print('Producer done')

Producer putting
Producer done
Consumer done


To solve the pipeline backup issue, the `Queue` class lets you specify the maximum amount of pending work to allow between two phases. This buffer size causes calls to `put` to block when the queue is already full. For example, here I define a thread that waits for a while before consuming a queue:

In [37]:
my_queue = Queue(maxsize=1)  # at most one pending item; a second put() blocks

In [38]:
def consumer():
    """Slowly take two items from the global ``my_queue``, reporting each one.

    The initial 0.1 s sleep lets the producer fill the one-slot queue first,
    so the producer's second ``put`` has to wait until this consumer frees
    space.
    """
    time.sleep(0.1)
    take = my_queue.get
    take()
    print('Consumer got 1')
    take()
    print('Consumer got 2')
    print('Consumer done')

In [39]:
# Thread for the slow, two-item consumer defined above.
thread = threading.Thread(target=consumer)

In [40]:
# The consumer sleeps 0.1 s before its first get().
thread.start()

In [41]:
# The first put fits in the one-slot queue; the second put blocks until the
# consumer has taken the first item.
my_queue.put(None)
print('Producer put 1')
my_queue.put(None)
print('Producer put 2')
print('Producer done')
thread.join()  # wait for the consumer to finish its second get()

Producer put 1
Consumer got 1
Producer put 2
Producer done
Consumer got 2
Consumer done


The `Queue` class can also track the progress of work using the `task_done` method. This lets you wait for a phase's input queue to drain and eliminates the need to poll the last phase of a pipeline (as with the `done_queue` above). For example, here I define a consumer thread that calls `task_done` when it finishes working on an item:

In [42]:
in_queue = Queue(maxsize=0)  # unbounded; progress tracked via task_done()/join()

In [43]:
def consumer():
    """Take one item from the global ``in_queue`` and acknowledge it.

    The ``task_done`` call is what lets ``in_queue.join()`` in the producer
    return once the item has been handled.
    """
    print('Consumer waiting')
    work = in_queue.get()
    try:
        print('Consumer working')
        # ... process `work` here ...
        print('Consumer done')
    finally:
        # Always acknowledge the item, even if processing raises; otherwise
        # in_queue.join() would wait forever.
        in_queue.task_done()

In [44]:
# Start a consumer that calls in_queue.task_done() after handling one item.
thread = threading.Thread(target=consumer)
thread.start()

Consumer waiting


In [45]:
print('Producer putting')
in_queue.put(None)
print('Producer waiting')
# join() blocks until task_done() has been called for every item put so far.
in_queue.join()
print('Producer done')
thread.join()

Producer putting
Producer waiting
Consumer working
Consumer done
Producer done


In [46]:
class ClosableQueue(Queue):
    """Queue that can be closed and consumed with a plain ``for`` loop.

    ``close`` enqueues a shared sentinel object. Iterating yields items until
    that sentinel is dequeued, calling ``task_done`` for every item taken
    (the sentinel included). ``join`` therefore returns once all items and
    the sentinel have been consumed.
    """

    SENTINEL = object()

    def close(self):
        """Tell one consumer to stop by enqueueing the sentinel."""
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is not self.SENTINEL:
                    yield item
                    continue
                return  # sentinel seen: end iteration so the worker thread exits
            finally:
                # Runs when the consumer asks for the next item (or the
                # generator ends), acknowledging `item` for join().
                self.task_done()

Now, I can redefine my worker thread to rely on the behaviour of the ClosableQueue class. The thread will exit when the for loop is exhausted.

In [47]:
class StoppableWorker(threading.Thread):
    """Pipeline stage thread that applies ``func`` to every item of ``in_queue``.

    ``run`` returns (and the thread exits) when iteration over ``in_queue``
    ends. For a ClosableQueue, that happens after ``close()``.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for result in map(self.func, self.in_queue):
            self.out_queue.put(result)

In [48]:
import random

In [49]:
def init_number(number):
    """First pipeline stage: return a random integer in ``range(number)``."""
    upper_bound = number
    return random.randrange(upper_bound)

In [50]:
def multiply_by_2(number):
    """Second pipeline stage: return ``number`` multiplied by two."""
    product = number * 2
    return product

In [51]:
def add_1(number):
    """Last pipeline stage: return ``number + 1``."""
    result = number + 1
    return result

In [52]:
# Queues connecting the three stages: init -> multiply -> add -> done.
init_queue, multiply_queue, add_queue, done_queue = (
    ClosableQueue() for _ in range(4)
)
threads = [
    StoppableWorker(stage_func, source, sink)
    for stage_func, source, sink in (
        (init_number, init_queue, multiply_queue),
        (multiply_by_2, multiply_queue, add_queue),
        (add_1, add_queue, done_queue),
    )
]

In [53]:
for worker_thread in threads:
    worker_thread.start()

# Feed the values 1..99 into the first stage.
for value in range(1, 100):
    init_queue.put(value)

In [54]:
# Shut the stages down in pipeline order. Closing a queue makes its worker
# see the sentinel; join() waits until every item in that queue is acknowledged.
for stage_queue in (init_queue, multiply_queue, add_queue):
    stage_queue.close()
    stage_queue.join()

for thread in threads:
    thread.join()

In [55]:
# All items fed into init_queue should now be waiting in done_queue.
print(done_queue.qsize(), 'items finished')

99 items finished


### **use `Queue` to create pipeline of tasks**

## Can we do even better with multiple workers per phase?

In [56]:
def start_threads(count, *args):
    """Create and start ``count`` StoppableWorker threads built from ``args``.

    Returns:
        The list of started threads, so the caller can stop them later.
    """
    threads = []
    for _ in range(count):
        worker_thread = StoppableWorker(*args)
        worker_thread.start()
        threads.append(worker_thread)
    return threads

In [57]:
def stop_threads(closable_queue, threads):
    """Shut down the workers consuming ``closable_queue`` and wait for them.

    One sentinel is enqueued per worker so each of them stops iterating.
    ``closable_queue.join()`` then waits until every item (and sentinel) has
    been acknowledged, and finally each thread is joined.
    """
    for _worker in threads:
        closable_queue.close()
    closable_queue.join()
    for worker_thread in threads:
        worker_thread.join()

In [58]:
# Fresh queues for the multi-worker pipeline: initial -> multiply -> add -> done.
initial_queue, multiply_queue, add_queue, done_queue = (
    ClosableQueue(), ClosableQueue(), ClosableQueue(), ClosableQueue()
)

In [59]:
# Three workers per phase, all pulling from the same input queue.
initial_threads = start_threads(3, init_number, initial_queue, multiply_queue)
multiply_threads = start_threads(3, multiply_by_2, multiply_queue, add_queue)
add_threads = start_threads(3, add_1, add_queue, done_queue)

# Feed 1..99 into the first phase.
for value in range(1, 100):
    initial_queue.put(value)

# Stop the phases in pipeline order so each one drains before the next closes.
for phase_queue, phase_threads in (
    (initial_queue, initial_threads),
    (multiply_queue, multiply_threads),
    (add_queue, add_threads),
):
    stop_threads(phase_queue, phase_threads)

In [60]:
# Every fed value should have passed through all three phases.
print(done_queue.qsize(), 'items finished')

99 items finished


### **you can use multiple threads for tasks in one `Queue`**