# Agenda

1. Concurrency and parallelism in programming in general, and Python in particular
2. Basic threads
3. Joining threads
4. Switching threads and the GIL
5. Sharing data (and other resources)
6. Producer-consumer 
7. Events and timers
8. Multiprocessing
9. `concurrent.futures` and making life easier for ourselves

# Concurrency and parallelism in Python

- Concurrency means: I have several things that I want to be tracking at once, even if they're not necessarily executing at once.
- Parallelism means: I have several things that I want to be tracking at once, *AND* they should also be executing at once.

If you want true parallel execution on a computer, then you need multiple cores (processors).  But you'll probably have more processes running than cores anyway, which means that the operating system needs to keep track of each process, switching it on and off the available cores.

How can we have multiple things happen in our program, so that we can break a problem apart and deal with it using concurrency?
- The oldest, and most traditional, way is to use *processes*.  The good news is that each process runs separately, with its own memory, and is independent of other processes.  This means that the computer can decide which core runs which process, and when.  The problem is that there's a lot of overhead to that: each process takes more memory, and switching between processes requires more time and resources.
- A newer way to do things is *threads*.  A single process can contain multiple threads, each of which is a separate line of execution, and the operating system decides which thread gets to run, and when.  The advantage of threads is that they're much lighter weight, and thus it's easier to switch between them.  Plus, because they are in the same process, they share memory.
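To make "they share memory" concrete, here is a minimal sketch (the names `shared_results` and `record` are just for illustration): three threads each append to one global list, and the main thread sees all of their changes.  It uses `join`, covered later in these notes, to wait for the threads to finish.

```python
import threading

shared_results = []   # one global list, visible to every thread in this process

def record(n):
    # runs in a new thread, but writes to the same list the main thread uses
    shared_results.append(n * n)

threads = [threading.Thread(target=record, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()          # wait for each thread to finish

print(sorted(shared_results))   # [0, 1, 4]
```

Separate processes would each get their own copy of `shared_results`, so the main process would not see their changes this way.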

Threads weren't ever popular in the Unix world.  But they became super popular among Windows programmers and in the Java world.  The combination forced Unix people to admit that maybe threads aren't that bad.

# Simple example of threads

To use threads in Python, we need:

- the `threading` module
- a function we want to run in a thread (i.e., not serially, but concurrently with the "main thread")


In [3]:
# let's run the function serially -- meaning, our Python interpreter will consist of 
# one process and one thread.  It'll run our function 5 times.

def hello():
    print('Hello!')
    
for i in range(5):
    hello()

Hello!
Hello!
Hello!
Hello!
Hello!


In [5]:
# now let's run our function 5 times, but each time we do that, we're going to 
# do so inside of a new thread.

# Meaning: We're not going to run the function ourselves, directly.  We're going to
# create a new Thread object, and hand it the function we want to run.  The 
# Thread object will run the function on our behalf inside of a new thread


In [6]:
import threading     # the module we need to work with threads

def hello():
    print('Hello!')

t = threading.Thread(target=hello)    # the function "hello" is the argument we pass to "target"
t.start()                             # ask t to run our function in a new thread

Hello!


In [12]:
# let's run our function 5 times, as before, each time in its own thread

def hello():
    print('Hello!\n', end='')     # put \n in the string, and tell print not to add its own
    
for i in range(5):
    t = threading.Thread(target=hello)
    t.start()   

Hello!
Hello!
Hello!
Hello!
Hello!


In [15]:
# let's prove that we are running concurrently
# how? We'll add time.sleep to our function call
# then, we'll see if the functions run in order
# we'll also add a number to the function call, so we can identify the threads

import time
import random

def hello(n):
    time.sleep(random.randint(0, 3))   # sleep 0-3 seconds
    print(f'{n} Hello!\n', end='')     # put \n in the string, and tell print not to add its own
    
for i in range(5):
    t = threading.Thread(target=hello, args=(i,))   # run the function with the argument of i
    t.start()   

2 Hello!
3 Hello!
0 Hello!
1 Hello!
4 Hello!


# Things to know

1. We can name our threads, which makes them easier to identify: just pass `name=` when we create a new thread object.
2. We can always get the currently running thread object from `threading.current_thread()`.  I can get the name of the current thread with `threading.current_thread().name`
3. Because we're not running our function directly, but are rather outsourcing it to the threading system, we never get our function's return value back.  Any returned value is ignored.
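Here's a minimal sketch that puts all three points together.  The `results` list and the `square` function are hypothetical: since the return value is thrown away, the thread appends what it wants to report (including its own name) to a shared list instead.

```python
import threading

results = []   # hypothetical place to collect values, since return values are ignored

def square(n):
    name = threading.current_thread().name   # the name we passed with name=
    results.append((name, n * n))
    return n * n      # this return value is discarded by the threading system

t = threading.Thread(target=square, args=(4,), name='square-4')
t.start()
t.join()              # wait for the thread to finish before reading results

print(results)        # [('square-4', 16)]
```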

# Exercise: Hello and goodbye

1. Write two functions, `hello` and `goodbye`, similar to my `hello` here -- it'll take an integer as an ID number, and it'll `time.sleep` for a random number of seconds.  (Keep it small!)
2. Launch 5 threads for each of these functions.
3. You should have a total of 10 lines printed out, 5 from `hello` and 5 from `goodbye`.

In [17]:
import threading
import time
import random

def hello(n):
    time.sleep(random.randint(0, 3))
    print(f'{n} Hello!\n', end='')     # the whole line, newline included, in one write
    
def goodbye(n):
    time.sleep(random.randint(0, 3))
    print(f'{n} Goodbye!\n', end='')
    
for i in range(5):
    t = threading.Thread(target=hello, args=(i,), name=f'hello-{i}')
    t.start()
    
for i in range(5):
    t = threading.Thread(target=goodbye, args=(i,), name=f'goodbye-{i}')
    t.start() 


0 Goodbye!
3 Goodbye!
2 Goodbye!
1 Goodbye!
4 Hello!
4 Goodbye!
0 Hello!
1 Hello!
2 Hello!
3 Hello!


main thread
hello-0
hello-1
hello-2
hello-3
hello-4
goodbye-0
goodbye-1
goodbye-2
goodbye-3
goodbye-4


# When do threads give up the CPU?

1. When about 5 ms pass.  Python will allow a thread to run as many bytecodes as it can within that time slice. But as soon as the time is up, the current thread finishes the current bytecode, and then it gives up the CPU.
2. Every time a thread handles I/O (input/output), it gives up control of the CPU to another thread. That's because I/O (disk/network/screen) takes so long compared with everything else, it's not worth keeping the CPU when we'll be waiting.

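When we say `print('a')` in Python, we're basically saying: (1) Print the string `a`, then (2) print the `'\n'`.  Because these are two separate outputs to I/O, the thread often (not always, but often) gives up control between them, and another thread's output can sneak in before our newline.  That's why we put the `\n` inside the string and pass `end=''`: the whole line goes out in one piece.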
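Here's a minimal sketch that makes those separate outputs visible.  `WriteCounter` is a hypothetical stand-in for the screen: any object with a `write` method can be passed to `print` via `file=`, and this one records every non-empty string it receives (it skips empty strings, since `print` may call `write('')` when `end=''`).

```python
class WriteCounter:
    """Hypothetical file-like object that records each non-empty write() call."""

    def __init__(self):
        self.writes = []

    def write(self, s):
        if s:                     # ignore empty writes
            self.writes.append(s)
        return len(s)

default = WriteCounter()
print('a', file=default)          # the text, then the newline, as separate writes
print(default.writes)             # ['a', '\n']

single = WriteCounter()
print('a\n', end='', file=single) # newline inside the string, no extra end
print(single.writes)              # ['a\n']
```

With one `write` per line, another thread's output is much less likely to land between our text and its newline.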

In [18]:
import dis  # disassembly module in Python 

dis.dis(hello)   # show me the bytecodes for the "hello" function

  6           0 LOAD_GLOBAL              0 (time)
              2 LOAD_METHOD              1 (sleep)
              4 LOAD_GLOBAL              2 (random)
              6 LOAD_METHOD              3 (randint)
              8 LOAD_CONST               1 (0)
             10 LOAD_CONST               2 (3)
             12 CALL_METHOD              2
             14 CALL_METHOD              1
             16 POP_TOP

  7          18 LOAD_GLOBAL              4 (print)
             20 LOAD_FAST                0 (n)
             22 FORMAT_VALUE             0
             24 LOAD_CONST               3 (' Hello!\n')
             26 BUILD_STRING             2
             28 LOAD_CONST               4 ('')
             30 LOAD_CONST               5 (('end',))
             32 CALL_FUNCTION_KW         2
             34 POP_TOP
             36 LOAD_CONST               0 (None)
             38 RETURN_VALUE


In [None]:
# for i in range(5):
#     t1 = threading.Thread(target=hello, args=(i,))  # run the function with the argument of i
#     t2 = threading.Thread(target=goodbye, args=(i,))   # run the function with the argument of i
#     t1.start()
#     t2.start()

In [20]:
# let's print something to the user when we're done!

import threading
import time
import random

def hello(n):
    time.sleep(random.randint(0, 3))   # sleep 0-3 seconds
    print(f'{n} Hello!\n', end='')     # put \n in the string, and tell print not to add its own
    
for i in range(5):
    t = threading.Thread(target=hello, args=(i,))   # run the function with the argument of i
    t.start()   
    
time.sleep(7)   # don't do this! we're just guessing how long the threads will take
print('*** DONE! ***')    

1 Hello!
3 Hello!
0 Hello!
4 Hello!
2 Hello!
*** DONE! ***


In [22]:
# what do I do, in order to wait for all of the threads to complete?
# I can use "join"
# "join" is a method we can run on a thread object
# it means: I'll wait for you to finish

import threading
import time
import random

def hello(n):
    time.sleep(random.randint(0, 3))   # sleep 0-3 seconds
    print(f'{n} Hello!\n', end='')     # put \n in the string, and tell print not to add its own
    
for i in range(5):
    t = threading.Thread(target=hello, args=(i,))   # run the function with the argument of i
    t.start()   
    t.join()   # wait for this thread to finish before creating the next one -- so the threads no longer run concurrently
    
print('*** DONE! ***')    

0 Hello!
1 Hello!
2 Hello!
3 Hello!
4 Hello!
*** DONE! ***


In [25]:
# what do I do, in order to wait for all of the threads to complete?
# I can use "join"
# "join" is a method we can run on a thread object
# it means: I'll wait for you to finish

# I'm going to put all threads in a list
# then I'll iterate over that list, joining each thread (meaning: wait for the thread to finish)
# when I'm done joining each thread, I know that they're all done

import threading
import time
import random

def hello(n):
    time.sleep(random.randint(0, 3))   # sleep 0-3 seconds
    print(f'{n} Hello!\n', end='')     # put \n in the string, and tell print not to add its own
    
all_threads = []
for i in range(5):
    t = threading.Thread(target=hello, args=(i,), name=f'hello-{i}')   # run the function with the argument of i
    t.start()   
    all_threads.append(t)
    
# Now go through each thread, and join it (i.e., wait for it)
for one_thread in all_threads:
    print(f'\tNow joining {one_thread.name}')
    one_thread.join()    # join blocks -- it hangs, waiting for the thread to finish
    
# by the time I reach this line, I'm guaranteed that all of the threads are done
print('*** DONE! ***')    

0 Hello!
2 Hello!
3 Hello!
	Now joining hello-0
	Now joining hello-1
4 Hello!
1 Hello!
	Now joining hello-2
	Now joining hello-3
	Now joining hello-4
*** DONE! ***


In [29]:
# as we create each thread (and launch our function in it), we store the thread in all_threads
# then, after our threads have launched, we repeatedly iterate through all_threads
# with each iteration over all_threads, we give one_thread the chance to say, "Yes, I'm done!"
#     we do that with "join"
# but we don't want to give the thread forever to tell us it's done
# so we give it 0.1 seconds to tell us that.  
#     if the thread is done, then we remove it from all_threads
#     if the thread is NOT done, we go onto the next one
# when all_threads is empty, we stop iterating over it

import threading
import time
import random

# define our function
def hello(n):
    time.sleep(random.randint(0, 3))   # sleep 0-3 seconds
    print(f'{n} Hello!\n', end='')     # put \n in the string, and tell print not to add its own
    
# launch our threads
all_threads = []
for i in range(5):
    t = threading.Thread(target=hello, args=(i,), name=f'hello-{i}')   # run the function with the argument of i
    t.start()   
    all_threads.append(t)
    
# wait for each thread to finish
while all_threads:   # meaning: so long as the list is non-empty
    for one_thread in all_threads[:]:  # iterate over a copy, since we remove items from all_threads inside the loop
        one_thread.join(0.1)           # wait up to 0.1 seconds for the thread to finish
        if not one_thread.is_alive():  # if the thread has finished, then let's remove it from the list
            print(f'\tRemoved {one_thread.name}')
            all_threads.remove(one_thread)
    
# BELOW HERE, we *know* that all threads have ended
print('*** DONE! ***')    

0 Hello!
2 Hello!
	Removed hello-0
	Removed hello-2
3 Hello!
1 Hello!
4 Hello!
	Removed hello-4
	Removed hello-1
	Removed hello-3
*** DONE! ***


# Exercise: Vowel counts 

1. Write a function that takes a filename (a string) as an argument. In the function, create a dictionary with a key for each of the vowels in English (a, e, i, o, and u), each with a value of 0. The function should then iterate over every character in the file, and count how many of each vowel there are.
2. Get five filenames for text files on your system, and put them in a list.
3. Launch a new thread for each of the files in the list of filenames.
4. Join each of the threads (i.e., wait for it to finish), and then indicate that everything is done.

In [31]:
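def vowel_counts(filename):
    counts = dict.fromkeys('aeiou', 0)       # {'a': 0, 'e': 0, 'i': 0, 'o': 0, 'u': 0}
    with open(filename) as f:                # "with" closes the file when we're done
        for one_line in f:
            for one_character in one_line.lower():
                if one_character in counts:
                    counts[one_character] += 1
    print(f'{filename}: {counts}')           # print, since a thread's return value is ignored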

In [32]:
vowel_counts('/etc/passwd')

/etc/passwd: {'a': 481, 'e': 593, 'i': 333, 'o': 254, 'u': 184}


In [34]:
import threading

filenames = ['/etc/passwd', '/Users/reuven/.zshrc']

# launching threads for our files
all_threads = []
for one_filename in filenames:
    t = threading.Thread(target=vowel_counts, args=(one_filename,), name=f'vowel_counts-{one_filename}')
    all_threads.append(t)
    t.start()   
    
# join the threads -- meaning, wait for them to finish
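while all_threads:
    for one_thread in all_threads[:]:        # iterate over a copy, since we remove from all_threads
        one_thread.join(0.01)
        if not one_thread.is_alive():
            print(f'Killing {one_thread.name}')   # the thread already ended; we just stop tracking it
            all_threads.remove(one_thread)
            
print('Done!')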

/etc/passwd: {'a': 481, 'e': 593, 'i': 333, 'o': 254, 'u': 184}
Killing vowel_counts-/etc/passwd
/Users/reuven/.zshrc: {'a': 60, 'e': 102, 'i': 63, 'o': 79, 'u': 21}
Killing vowel_counts-/Users/reuven/.zshrc
Done!


In [None]:
one_thread.join()     # no timeout -- we will wait, possibly forever, for one_thread to end
one_thread.join(0.1)  # we will wait up to 0.1s for one_thread to end; join always returns None,
                      #   so to find out if one_thread is dead, we need to ask if it .is_alive()
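A minimal sketch of the difference, assuming a hypothetical `slow` function that just sleeps for half a second: the timed `join` gives up long before the thread is done, and only `is_alive()` tells us so.

```python
import threading
import time

def slow():
    time.sleep(0.5)   # a hypothetical task that takes half a second

t = threading.Thread(target=slow)
t.start()

returned = t.join(0.01)          # wait at most 0.01s; join returns None either way
still_running = t.is_alive()     # so we ask the thread directly
print(returned, still_running)   # None True

t.join()                         # no timeout: block until the thread ends
finished = not t.is_alive()
print(finished)                  # True
```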

In [40]:
# another way to iterate over our threads

import threading

filenames = ['/etc/passwd', '/Users/reuven/.zshrc']

pre_run_thread_count = threading.active_count()

# launching threads for our files
for one_filename in filenames:
    t = threading.Thread(target=vowel_counts, args=(one_filename,), name=f'vowel_counts-{one_filename}')
    t.start()       

while threading.active_count() > pre_run_thread_count:  # so long as any of our new threads is still running
    for one_thread in threading.enumerate():

        # a thread cannot join itself (that raises RuntimeError), so skip the current (main) thread
        if one_thread == threading.current_thread():
            continue

        one_thread.join(0.01)
            
print('Done!')

/etc/passwd: {'a': 481, 'e': 593, 'i': 333, 'o': 254, 'u': 184}
/Users/reuven/.zshrc: {'a': 60, 'e': 102, 'i': 63, 'o': 79, 'u': 21}
Done!


# Switching and the GIL

Python gives each running thread a time slice of about 0.005 seconds (5 ms). This means that however many bytecodes our thread can get through during that time, it will.  (This is approximate; Python might give slightly longer than that to a thread.)

You can find out how long each thread's time slice is by calling `sys.getswitchinterval()`.  You can even set it to something different (if you're really daring!) by passing a new number of seconds to `sys.setswitchinterval()`.  Also, whenever a thread waits for I/O, Python automatically gives other threads a chance to run.
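A minimal sketch of reading, changing, and restoring the switch interval.  The value 0.01 is an arbitrary choice for illustration; in practice you'll rarely want to touch this.

```python
import sys

original = sys.getswitchinterval()   # in seconds; CPython's default is 0.005
print(original)

sys.setswitchinterval(0.01)          # ask for ~10 ms time slices instead
changed = sys.getswitchinterval()
print(changed)

sys.setswitchinterval(original)      # put things back the way we found them
restored = sys.getswitchinterval()
```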

This means that your thread might lose control of the CPU between any two statements you wrote.  Or, sometimes, inside of one statement, if that statement includes more than one bytecode.
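Here's a small illustration of one statement spanning several bytecodes.  The names `counter` and `increment` are hypothetical.  The exact opcode names differ between Python versions (for example, `INPLACE_ADD` in 3.9 vs. `BINARY_OP` in 3.11 and later), but you should always see a load of the global, an add, and a separate store.

```python
import dis

counter = 0   # a hypothetical shared global

def increment():
    global counter
    counter += 1      # looks like one step...

# ...but it compiles to separate load, add, and store bytecodes
ops = [instruction.opname for instruction in dis.get_instructions(increment)]
print(ops)
```

If a thread is switched out after `LOAD_GLOBAL` but before `STORE_GLOBAL`, another thread can read the same old value, and one of the two increments is lost.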

The other (and famous/infamous) issue with Python threads is the GIL (the global interpreter lock).  The GIL ensures that only one thread can be executing Python bytecode at any given time. That's right: Even if you have many cores and many threads, and even if each thread is assigned to a separate core, *ONLY ONE THREAD IS RUNNING AT A TIME*.

Why do this? Because at the core of Python are data structures that must be synchronized across threads -- especially object reference counts.

This horrifies many people with threading experience in other languages.

If your threaded program is I/O-bound -- meaning, it is spending lots of time waiting for I/O (e.g., screen, network, or filesystem) -- then the GIL isn't much of a problem, because a thread waiting for I/O lets other threads run in the meantime.

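But if your threaded program is CPU-bound -- meaning, it's trying to crunch lots of data, and calculate lots of things, such that CPU execution speed is the reason why it's fast or slow -- then threads are almost certainly a TERRIBLE idea.  Only one thread can run Python bytecode at a time, so adding threads won't make the calculations any faster, and switching between them adds overhead.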
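A minimal sketch of the I/O-bound case, assuming `time.sleep` as a stand-in for waiting on disk or network (like real blocking I/O, sleeping lets other threads run).  The `fake_io` function and the 0.3-second delay are made up for illustration.

```python
import threading
import time

def fake_io(seconds):
    time.sleep(seconds)   # stands in for waiting on disk/network

start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.3,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# the waits overlap, so this should be close to 0.3s rather than 5 x 0.3s = 1.5s
print(f'5 threads x 0.3s of waiting took {elapsed:.2f}s')
```

Replacing the sleep with pure-Python number crunching would not show the same speedup, since the threads would take turns holding the GIL.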

# What are the real problems with threads, then?  Shared data

Remember: Threads share global variables. That makes them lightweight and efficient. But you have to be really, really careful when you're working with shared, mutable data.

In [44]:
from collections import Counter

element_count = 10              # how many elements are in our list?
numbers = [0] * element_count   # create a list of all 0s, with element_count items
index = 0                       # start our index at 0

def increment_all_elements():
    global index    # I want to be able to modify index from within my function
    while True:
        if index >= element_count:
            break
            
        numbers[index] += 1     # add 1 to the element at index
        index += 1          # increment the index
        
increment_all_elements()

# how many times does each number appear in our list, numbers?

for key, value in Counter(numbers).items():
    print(f'count {key} appeared {value} times')

count 1 appeared 10 times


In [47]:
# why do this serially?
# I can fire up a few threads
# each thread can increment a few of the elements in the list

import threading
from collections import Counter

element_count = 10_000_000      # how many elements are in our list?
thread_count = 2                # how many threads do I want to share the load?
numbers = [0] * element_count   # create a list of all 0s, with element_count items
index = 0                       # start our index at 0

def increment_all_elements():
    global index    # I want to be able to modify index from within my function
    while True:
        if index >= element_count:
            break
            
        numbers[index] += 1     # add 1 to the element at index
        index += 1              # increment the index
        
all_threads = []
for i in range(thread_count):
    t = threading.Thread(target=increment_all_elements)
    all_threads.append(t)
    t.start()

for one_thread in all_threads:
    one_thread.join()

# how many times does each number appear in our list, numbers?
for key, value in Counter(numbers).items():
    print(f'count {key} appeared {value} times')

Exception in thread Thread-85:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.7/Frameworks/Python.framework/Versions/3.9/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python@3.9/3.9.7/Frameworks/Python.framework/Versions/3.9/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-47-eadc512ed50b>", line 19, in increment_all_elements
IndexError: list index out of range


count 1 appeared 7980898 times
count 2 appeared 1982903 times
count 0 appeared 33 times
count 3 appeared 36166 times


# What can we do?

One answer: Lock a region of our code.  Make sure that only one thread can execute within part of our code. Even if the currently executing (locking) thread loses the CPU, other threads won't be allowed to execute that code.  This will solve the problem... and lead to others.

In [49]:
# we can use threading.Lock -- a class whose instances ensure this kind of locking

import threading
from collections import Counter

increment_lock = threading.Lock()
element_count = 10_000_000      # how many elements are in our list?
thread_count = 2                # how many threads do I want to share the load?
numbers = [0] * element_count   # create a list of all 0s, with element_count items
index = 0                       # start our index at 0

def increment_all_elements():
    global index    # I want to be able to modify index from within my function
    while True:
        with increment_lock:   # only one thread at a time can execute the indented block below
            if index >= element_count:
                break

            numbers[index] += 1     # add 1 to the element at index
            index += 1              # increment the index
        
all_threads = []
for i in range(thread_count):
    t = threading.Thread(target=increment_all_elements)
    all_threads.append(t)
    t.start()

for one_thread in all_threads:
    one_thread.join()

# how many times does each number appear in our list, numbers?
for key, value in Counter(numbers).items():
    print(f'count {key} appeared {value} times')

count 1 appeared 10000000 times
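`with increment_lock:` acquires the lock on the way in and releases it on the way out, even if an exception is raised.  As a minimal sketch (the names `counter`, `counter_lock`, and `add_many` are hypothetical), here's roughly what that looks like written out with explicit `acquire()` and `release()` calls:

```python
import threading

counter = 0
counter_lock = threading.Lock()

def add_many(times):
    global counter
    for _ in range(times):
        counter_lock.acquire()       # like entering "with counter_lock:"
        try:
            counter += 1             # only the thread holding the lock runs this line
        finally:
            counter_lock.release()   # like leaving the "with" block, even on error

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)   # 40000
```

The `with` form is preferred, since forgetting the `release()` leaves every other thread waiting forever.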


# Next up

1. queues (and how they solve two problems, sometimes)
2. producer-consumer systems
3. events and timers

# Queues

The standard library's go-to thread-safe data structure is the `Queue` class, which lives in the `queue` module.  Yes, it's `queue.Queue` (notice the capitalization).    A queue is also known as a FIFO, short for "first in, first out."

A `Queue` object has a few methods:
- `put` adds a new object to its end
- `get` removes and returns the object at its front (i.e., the oldest one); if the queue is empty, `get` waits until something is available

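A minimal sketch of the first-in, first-out behavior, using plain strings as placeholder items:

```python
import queue

q = queue.Queue()
for item in ['a', 'b', 'c']:
    q.put(item)                       # each put adds to the end

taken = [q.get() for _ in range(3)]   # each get removes from the front
print(taken)                          # ['a', 'b', 'c']
print(q.empty())                      # True
```

A fourth `q.get()` here would block forever; `q.get(timeout=1)` would instead raise `queue.Empty` after a second.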
In a threaded program, you can create a global `Queue` object that is shared across all threads. Each thread, instead of printing to the screen or returning a value (which would be ignored), can `put` its return value onto a `Queue`.

Then, when our threads are all done with their work, we can join them and retrieve items from the queue.
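Here's a minimal sketch of that pattern.  The `square` function and the `results` queue are hypothetical: each thread puts a tuple on the shared queue, and once every thread has been joined, the main thread drains it.

```python
import queue
import threading

results = queue.Queue()   # shared by all threads

def square(n):
    results.put((n, n * n))    # instead of returning (ignored) or printing

threads = [threading.Thread(target=square, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# all threads are done, so nothing else is putting onto the queue; safe to drain it
collected = []
while not results.empty():
    collected.append(results.get())

print(sorted(collected))   # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```

The items arrive in whatever order the threads finished, which is why we sort before printing.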

