## Thinking about Concurrency, Raymond Hettinger

Markos Flavio B. G. O.

This notebook highlights the key points of Raymond Hettinger's talk 'Thinking about Concurrency' (PyCon 2016) and walks through its examples.

The original content is available at:
    - https://www.youtube.com/watch?v=Bv25Dwe84g0
    - https://pybay.com/site_media/slides/raymond2017-keynote/index.html
    - https://www.youtube.com/watch?v=9zinZmE3Ogk&t=122s

### Introduction

We'll walk through two examples of threading and multiprocessing to illustrate rules and best practices for taking advantage of concurrency.

### Why Concurrency?

Concurrency is about taking advantage of the computing power available to you.

1.	Improve perceived responsiveness (shorter wait times for different requests).
2.	Improve speed: mainly when we use multiple processes (multiple cores), you throw more clock cycles at the problem. Remember, though, that concurrency can sometimes make your code slower.
3.	Because that is how the real world works. Pretty much all of the concurrency primitives that we use in a computer are also used in project management. For example, let’s say two independent teams are building parts of a satellite. Both teams need to finish their own jobs before the two parts can be assembled; this is a simple thread join. During construction, the satellite needs to be put in a “shake room” that shakes it to see whether it falls apart before launch. You can't have two satellites in the shake room at the same time: you need mutual exclusion, you need locks.
In this context, when we want computer systems to model the real world, it makes sense to think about concurrency as the standard way of approaching a problem, instead of viewing it as a last resort.

### Martelli Model of Scalability

There are three kinds of programs:

- 1 core: Single thread and single process: they take advantage of one core.
- 2-8 cores: Multiple threads and multiple processes.
- 9+ cores: Distributed processing.

Martelli’s observation: as time goes on, the second category becomes less common and less relevant. Single cores keep getting more powerful (current cores are thousands of times faster than cores from the 1980s), so the range of problems that can be solved on one core is much larger than before. For example, we can use TensorFlow to train a very good handwriting or voice recognition model in a feasible amount of time on a single core. However, in a big data context, where datasets grow ever larger (if the data you’re talking about fits on your machine, that’s not big data), we must rely on distributed processing.
The consequence is that the second category (single machines with multiple cores) becomes less relevant over time, because the other two paradigms fit current problems better. However, current machines still have more than one core, and it’s really unpleasant for a program to use only 1/8th of the power of the machine it runs on. Now, suppose we're dealing with a problem on an 8-core machine that requires seven cores. We can attack it using multiple threads and multiple processes. However, the problem is so close to the computational limit of the machine that if it grows by 20%, the current setup will no longer be able to solve it. Thus, if you have a problem in this range (solvable with 2 to 8 cores), you just happen to be lucky at that particular point in time, and it isn't worth much effort to work in this space, because you are only temporarily in it. **However, even in the distributed processing scenario, you want to take advantage of all cores of all the machines; thus the second category, whose approach relies on multithreading and multiprocessing, is still an area of interest even when the problem is bigger than one machine.**

### Global Interpreter Lock

CPython has a single lock for its internal shared global state, the Global Interpreter Lock (GIL). The payoff of the GIL is that you don't pay the performance cost of the many fine-grained locks that would otherwise be needed to make Python work correctly. The unfortunate effect of the GIL is that no more than one thread can execute Python bytecode at a time. However, there are ways around this: if you can't get free threading in one Python process, why not run eight Python processes in parallel, each with its own thread, to take advantage of all cores? Or we can combine multithreading and multiprocessing (there are a number of ways to sidestep the GIL).

For I/O-bound applications, the GIL doesn’t present much of an issue (for web servers and the like, multithreading works fine). However, for CPU-bound applications, threading can make the application slower. That drives us to multiprocessing to gain more CPU cycles.
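A minimal sketch of why threads still help I/O-bound work despite the GIL. Here `time.sleep` stands in for a blocking I/O call (an assumption for illustration, not real network code); a thread that is blocked waiting releases the GIL, so five 0.2-second waits overlap instead of adding up.

```python
import threading
import time

def fake_io_request(results, index):
    # time.sleep stands in for blocking I/O (e.g. a network call);
    # while a thread sleeps it releases the GIL so other threads can run.
    time.sleep(0.2)
    results[index] = f"response {index}"   # each thread writes its own slot

results = [None] * 5
threads = [threading.Thread(target=fake_io_request, args=(results, i))
           for i in range(5)]

start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")  # close to 0.2s rather than 5 * 0.2 = 1.0s
```

A CPU-bound function (a tight arithmetic loop, say) would not overlap this way, because it never releases the GIL while it computes.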

### Threads vs Processes

“Your weakness is your strength and your strength is your weakness”.

The strength of threads is shared state (communication is fast). The weakness of threads is also shared state: every multithreaded program has to manage race conditions (if the threads didn't share anything, you wouldn't really have needed multithreading), so you have to add locks to keep the threads from stepping on each other.

The strength of processes is their independence from one another (they don’t share state). The weakness of processes is the lack of communication: objects must be moved between processes with IPC, which adds overhead. The multiprocessing module abstracts some of these issues away and hides them from the user; however, if you’re moving a lot of data, it’s very important to be aware that the data is pickled and sent through IPC, which can add a tremendous amount of overhead. And if you use the multiprocessing module's thread pool (`multiprocessing.pool.ThreadPool`), be aware that you get the good news, shared state, together with the bad news: the potential for race conditions and a GIL that keeps you from using multiple cores.
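The contrast can be sketched without starting a process. A thread mutates the very object the main thread holds, while anything sent to another process through `multiprocessing` queues or pipes is pickled, so the receiver works on a copy. In this sketch `pickle.loads(pickle.dumps(...))` stands in for that IPC round trip (a simplification: real IPC also transfers the bytes between processes).

```python
import pickle
import threading

shared = {"hits": 0}

def bump(state):
    # A thread receives a reference to the same dict, so its change is visible.
    state["hits"] += 1

t = threading.Thread(target=bump, args=(shared,))
t.start()
t.join()
print(shared)  # {'hits': 1}: the main thread sees the thread's update

# A process would instead receive a pickled copy of the object.
payload = pickle.dumps(shared)
copy_in_other_process = pickle.loads(payload)
copy_in_other_process["hits"] += 100

print(shared)                 # {'hits': 1}: the original is untouched
print(copy_in_other_process)  # {'hits': 101}
print(len(payload), "bytes pickled for one small dict")
```

For a small dict the cost is negligible; for large arrays or long lists, that serialization and copying is the overhead the paragraph above warns about.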

### Threads vs Async

#### Threads
Threads switch preemptively: the thread manager, not the current thread itself, decides when to switch tasks. This is convenient because you don’t need to add explicit code to cause a task switch.

The cost of this convenience is that you have to assume a switch can happen at any time. For example, let's say two variables must be consistent with each other, always having equal values. If you update one and get preempted before updating the other, you leave the system in an incoherent state. In fact, that's one reason the GIL exists: as your Python program executes, the interpreter's global state (which task is running, which line number it is on, etc.) is constantly being updated.

Accordingly, critical sections (code sections that must not be interrupted) have to be guarded with locks, queues, or other synchronization tools. The idea is that if two things must happen together, I acquire a lock, which says "nobody else should be running right now", run the critical section, and then release the lock to let other tasks run. Reasoning about critical sections and using locks correctly in large multithreaded programs is insanely difficult, even though it's possible to get it right. A lock is essentially a flag: by itself it doesn't lock anything, it only signals that something is locked. A thread that checks the flag and finds it set will not touch the protected resource, but only if it checks; a thread that forgets to acquire the lock can still touch the resource. Thus, even if a large multithreaded program is correct, it won't necessarily stay correct over time, because tiny adjustments to the code can make it incorrect in ways that are hard to see during code reviews.
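A minimal sketch of the invariant described above, using a hypothetical `state` dict whose keys `a` and `b` must always be equal. The writers update both inside a `threading.Lock`, and the reader checks them under the same lock, so it never observes a half-finished update. Drop the `with lock:` from the reader and the guarantee is gone, which is exactly the fragility described above: the lock only protects code that remembers to take it.

```python
import threading

lock = threading.Lock()
state = {"a": 0, "b": 0}        # invariant: state["a"] == state["b"]
inconsistent_reads = 0

def writer(n):
    for _ in range(n):
        with lock:              # critical section: both updates happen together
            state["a"] += 1
            state["b"] += 1

def reader(n):
    global inconsistent_reads
    for _ in range(n):
        with lock:              # readers must take the same lock to be safe
            if state["a"] != state["b"]:
                inconsistent_reads += 1

threads = [threading.Thread(target=writer, args=(10_000,)),
           threading.Thread(target=writer, args=(10_000,)),
           threading.Thread(target=reader, args=(10_000,))]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(state, inconsistent_reads)  # {'a': 20000, 'b': 20000} 0
```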

The limit on threads is total CPU power (the number of CPU cycles) minus the cost of task switches and synchronization overhead. Every task switch and every lock acquire and release eats some CPU cycles. That's why one single lock (the GIL) is cheaper than many. Thus, threading never adds power to the system; in fact, it consumes some. Threading is useful when you have CPU cycles to spare for that overhead; if you can't afford to spend them, you need a different approach: asynchronous programming (async).

#### Async
Async switches cooperatively: you don't get interrupted at arbitrary times. When you reach a good stopping point where your state is consistent, you hand control back to the async manager and let other tasks run. So you do need to add explicit code (“yield” or “await”) to cause a task switch, a small disadvantage that is worth accepting given the advantages.

Now you control when task switches occur, so locks and other synchronization are no longer needed. 
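A small `asyncio` sketch of cooperative switching (the function and task names are illustrative). Each task updates the shared counter with no lock, because control can only change hands at an `await`; the read-modify-write sits between two `await` points, so no other task can interleave with it.

```python
import asyncio

counter = 0

async def worker(name, steps, log):
    global counter
    for _ in range(steps):
        # No await between the read and the write: this block cannot be
        # interrupted by another task, so no lock is needed.
        old = counter
        counter = old + 1
        log.append((name, counter))
        await asyncio.sleep(0)   # explicit, voluntary switch point

async def main():
    log = []
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log

log = asyncio.run(main())
print(counter)  # 6
print(log)
```

Compare this with the threaded counter later in the notebook, where the same read-modify-write needs fuzzing to expose its race and a lock or queue to fix it.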

Also, the cost of task switches is very low. Calling a pure Python function has more overhead than restarting a generator or awaitable; that is, calling a function in Python is more expensive than a task switch in async. This means that async is very cheap. Async uses generators under the hood, which store all of their state (the stack frame), so when we resume a generator, it has all the information it needs to continue working. In contrast, calling a function builds up that state from scratch: a new stack frame on every call.
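A plain generator illustrates the mechanism: its frame, with its local variables and current position, is kept between resumptions, so resuming it continues where it left off instead of building a new frame. This is only an illustration of the idea (asyncio coroutines are built on the same suspend-and-resume machinery as generators); the relative timing claims above are Hettinger's and are not measured here.

```python
def running_total():
    # Local state (total) survives across suspensions in the generator frame.
    total = 0
    while True:
        value = yield total   # suspend here; resume when a value is sent
        total += value

acc = running_total()
next(acc)                     # advance to the first yield
print(acc.send(5))            # 5
print(acc.send(10))           # 15
print(acc.send(-3))           # 12
```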

Because async is far cheaper than threading, it leaves more CPU cycles for useful work, cycles that threading would lose to task switches and synchronization overhead. Note that async tasks still run in a single thread, so async does not add CPU power for CPU-bound computations; its advantage shows when juggling many concurrent tasks. While a program can reasonably run hundreds of threads, it can run thousands or tens of thousands of async tasks.

Also, because async doesn't need locks, it's a lot easier to get the code correct. You only switch tasks when the state is consistent, and you don't have to worry about arbitrary interruptions.

In return, you’ll need a non-blocking version of just about everything you do; you can't just read from a file using a standard Python function anymore, you need an async-compatible function. Accordingly, the async world has a huge ecosystem of support tools. This makes its learning curve even steeper than that of threading.
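A minimal sketch of why blocking calls are a problem inside async code. Here `time.sleep` stands in for any blocking call: it freezes the whole event loop, so three tasks run one after another. `asyncio.sleep` is the non-blocking equivalent and lets the tasks wait concurrently. For blocking calls with no async version, `asyncio.to_thread` (Python 3.9+) is one escape hatch, since it runs the call in a worker thread.

```python
import asyncio
import time

async def blocking_task():
    time.sleep(0.2)            # blocks the event loop: nothing else can run

async def non_blocking_task():
    await asyncio.sleep(0.2)   # yields to the event loop while waiting

async def run_three(task_factory):
    start = time.perf_counter()
    await asyncio.gather(*(task_factory() for _ in range(3)))
    return time.perf_counter() - start

blocking_elapsed = asyncio.run(run_three(blocking_task))
non_blocking_elapsed = asyncio.run(run_three(non_blocking_task))
print(f"blocking:     {blocking_elapsed:.2f}s")      # roughly 0.6s
print(f"non-blocking: {non_blocking_elapsed:.2f}s")  # roughly 0.2s
```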

#### Comparison
- Async maximizes CPU utilization because it has less overhead than threads.
- Threading typically works with existing code and tools, as long as locks are added around critical sections. Thus, if your project uses a lot of existing code and libraries and you want to take advantage of concurrency, threading is usually the better choice, because rewriting all that code for async may be infeasible.
- For complex systems, async is much easier to get right than threads with locks.
- Threads require very little tooling (locks and queues).
- Async needs a great deal of tooling (futures, event loops, and non-blocking versions of just about everything). "We only know half of Python; the other half is async." Async keeps growing toward covering the entire language: anything in standard Python that doesn't fit with async tends to get its own asynchronous version.
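As a sketch of the point that threading works with existing code: an ordinary blocking function can be reused unchanged with `concurrent.futures.ThreadPoolExecutor`. The function `legacy_fetch` is hypothetical and uses `time.sleep` in place of real I/O.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def legacy_fetch(item):
    # Hypothetical existing blocking function, reused without modification.
    time.sleep(0.1)            # stands in for a network or disk call
    return item * 2

with ThreadPoolExecutor(max_workers=4) as pool:
    # map() returns results in input order, regardless of completion order.
    results = list(pool.map(legacy_fetch, range(8)))

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]
```

The async equivalent would require an `async def` version of `legacy_fetch` built on non-blocking libraries, which is the tooling cost described in the list above.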

Raymond Hettinger says here that even though async will roughly double Python's size, async is the future of concurrency in the Python community. That's because threads are hard to get right and are very expensive. Also, as the async ecosystem gets bigger (which it is doing), async becomes easier to use.

### Threading Example
#### Scripting style

Start with working code that is clear, simple, and runs top to bottom (using global variables). This is easy to develop and test incrementally (type a few lines, run it, type a few lines, run it...). This style is very quick: it lets you see your results concretely, it lets you test as you go, and you can reliably knock out code even with little programming experience.

**An important note about threading: get your app tested and debugged in single-threaded mode before you start threading. Threading NEVER makes debugging easier (it always adds another layer of complexity).**

Updating and printing a counter:

In [6]:
counter = 0 # global variable

print('Starting up')
for i in range(10):
    counter += 1
    print('The count is %d' % counter)
    print('---------------')
print('Finishing up')

Starting up
The count is 1
---------------
The count is 2
---------------
The count is 3
---------------
The count is 4
---------------
The count is 5
---------------
The count is 6
---------------
The count is 7
---------------
The count is 8
---------------
The count is 9
---------------
The count is 10
---------------
Finishing up


#### Function style
A next step in development is to factor re-usable code into functions. Below, the reusable component is the *worker* function.

In [8]:
counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter

    counter += 1
    print('The count is %d' % counter)
    print('---------------')

print('Starting up')
for i in range(10):
    worker()
print('Finishing up')

Starting up
The count is 1
---------------
The count is 2
---------------
The count is 3
---------------
The count is 4
---------------
The count is 5
---------------
The count is 6
---------------
The count is 7
---------------
The count is 8
---------------
The count is 9
---------------
The count is 10
---------------
Finishing up


#### Multi-threading is easy!
It is just a matter of launching a few worker threads. Note below that instead of calling the *worker* function directly, we launch it in a thread, so that we have the main thread running (the *for* loop) and the *worker* threads.

This code is very similar to the original function-style version.

In [13]:
import threading

counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter

    counter += 1
    print('The count is %d' % counter)
    print('---------------')

print('Starting up')
for i in range(10):
    threading.Thread(target=worker).start()
print('Finishing up')

Starting up
The count is 1
---------------
The count is 2
---------------
The count is 3
---------------
The count is 4
---------------
The count is 5
---------------
The count is 6
---------------
The count is 7
---------------
The count is 8
---------------
The count is 9
---------------
The count is 10
---------------
Finishing up


The code above is broken (we'll see why later), yet it passed the test of running it: no errors were raised and the output looks right.

#### Can you spot the race conditions?

*A race condition occurs when two or more threads can access shared data and they try to change it at the same time. Because the thread scheduling algorithm can swap between threads at any time, you don't know the order in which the threads will attempt to access the shared data.*

Most people spot the “counter increment” race condition: one thread reads the value of *counter*, another thread reads the same value, both increment the same base value, and both write out the same incremented value. Two workers ran, but *counter* was incremented only once.

Even so, although this race condition exists, the read and the write happen so quickly that a task switch is unlikely to occur between them inside the worker function; you could possibly run this code billions of times and never see the problem, even after deployment.

There's also another, more hidden race condition here: the “print function” race condition. The main thread prints the 'Finishing up' string while the other threads print the counts, so 'Finishing up' may be printed before some of the counts.

Why didn’t testing reveal the flaws?

**Note: testing cannot prove the absence of errors. It is still useful, but don't rely on it in the context of multithreading. Many interesting race conditions don't reveal themselves in test environments: they manifest themselves under load or abnormal conditions, in ways that are really hard to reproduce, creating Heisenbugs (named after Heisenberg's uncertainty principle): the act of looking at the bug changes the program's behavior so that the bug no longer appears. In particular, if we run code like this through a debugger, we'll never see the race condition, because the debugger interferes with all the races.**

Nevertheless, there's a technique that can be used to amplify existing race conditions.

#### Fuzzing
Fuzzing is a technique for amplifying race conditions so that they become visible. The program sleeps for a random period of time between each step of the operation; we put a sleep between every step because a thread switch can happen at any time. It isn't a perfect technique, but it's a decent way of detecting race conditions.

In [2]:
import threading, time, random

##########################################################################################
# Fuzzing is a technique for amplifying race condition
# errors to make them more visible

FUZZ = True

def fuzz():
    if FUZZ:
        # random sleep between 0 and 1 second
        time.sleep(random.random()) 

###########################################################################################

counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter

    fuzz()
    oldcnt = counter
    fuzz()
    counter = oldcnt + 1
    fuzz()
    print('The count is %d' % counter, end='')
    fuzz()
    print() # prints a new line
    fuzz()
    print('---------------', end='')
    fuzz()
    print() # prints a new line
    fuzz()

print('Starting up')
fuzz()
for i in range(10):
    threading.Thread(target=worker).start()
    fuzz()
print('Finishing up')
fuzz()

Starting up
The count is 2
---------------
The count is 2The count is 2

---------------
---------------The count is 3The count is 3


---------------
---------------
The count is 4The count is 5The count is 5
---------------Finishing up
The count is 6

The count is 6


---------------
------------------------------
---------------



This kind of output could have happened in production runs of the original code, under load or other unreproducible conditions.

This technique is limited to relatively small blocks of code and is imperfect in that it can’t prove the absence of errors. Still, fuzzed tests do reveal the presence of errors (like above).

#### 1st Approach: More Careful Threading with Queues
Interestingly, the rules for threading aren't just for computing and programming. The physical world is full of concurrency as well, and many of these techniques have physical analogs that are useful for managing people and projects.

One careful way of dealing with multithreading is to use atomic message queues instead of locks. Locks are great if you're developing an operating system; however, for anything higher-level than that, i.e. real applications, people don't naturally think in terms of locks. Thus, we need higher-level primitives, something that mirrors how we coordinate in the real world: atomic message queues.

We use atomic message queues because Python has a queue object (the `queue` module), but anything that lets you communicate atomically would work, such as email accounts.

#### Raymond Rules


**RR 1000**

ALL shared resources SHALL be run in EXACTLY ONE thread. ALL communication with that thread SHALL be done using an atomic message queue: typically the `queue` module, email, or message queues like RabbitMQ or ZeroMQ; interestingly, you can communicate via a database as well.

Resources that need this technique: global variables, user input, output devices, files, sockets, etc.

Some resources already have locks inside (they are thread-safe): the logging module, the decimal module (it uses thread-local variables), databases (reader locks and writer locks), and email (an atomic message queue). Pretty much everything else should be presumed to be non-atomic and should be wrapped in *its own* thread.
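A minimal sketch of RR 1000 for a resource other than `print`. A hypothetical `log_lines` list stands in for a file that only the owner thread touches; every other thread sends messages through a `queue.Queue`. Unlike the daemon threads used later in the notebook, this owner is stopped with a `None` sentinel (a choice made for this sketch), so it can be joined like any non-daemon thread.

```python
import queue
import threading

messages = queue.Queue()
log_lines = []                 # the shared resource; only the owner touches it

def resource_owner():
    while True:
        msg = messages.get()
        if msg is None:        # sentinel: no more messages
            break
        log_lines.append(msg)

def producer(n):
    for i in range(3):
        messages.put(f"producer {n}: message {i}")

owner = threading.Thread(target=resource_owner)
owner.start()
producers = [threading.Thread(target=producer, args=(n,)) for n in range(3)]
for p in producers:
    p.start()
for p in producers:
    p.join()
messages.put(None)             # all producers are done, so stop the owner
owner.join()
print(len(log_lines))  # 9
```

Because the sentinel is queued only after every producer has been joined, and the queue is FIFO, the owner is guaranteed to process all nine messages before it stops.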

**RR 1001**

One category of sequencing problems is to make sure that step A and step B happen sequentially. The solution is to put both in the same thread where all actions proceed sequentially.

**RR 1002**

To implement a “barrier” that waits for parallel threads to complete, just join() all of the threads. Remember: a terrible time to use join() (or to wait for some task to finish) is when we're not sure the thread is going to finish at all, for example when it runs an infinite loop. Such threads are typically marked as *daemon* threads. *Daemonizing a thread marks it as never going to finish, so don't wait on it!* (The interpreter itself does not wait for daemon threads when it exits.)
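A minimal sketch of the join-as-barrier rule (the worker function `square_into` is hypothetical). The main thread reads the results only after every thread has been joined, so all the writes are guaranteed to be finished.

```python
import threading

def square_into(results, i):
    results[i] = i * i         # each thread writes its own slot

results = [None] * 5
threads = [threading.Thread(target=square_into, args=(results, i))
           for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()                   # barrier: wait for every (non-daemon) thread

print(results)  # [0, 1, 4, 9, 16]
```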

**RR 1003**

You can’t wait on daemon threads to complete (they are infinite loops). Instead, you join() the message queue rather than the thread itself: that waits until all the requested tasks are marked as done.

Let's see an example where workers (threads) are people and the communication links are email accounts. You send an email to a worker saying "I'd like you to do task A". You should not just wait for the worker to read all of the emails, because that doesn't mean the worker finished task A; it only means the worker received it and started on it. Thus, it's insufficient to wait for the message queue to be empty. Instead, I need to send my worker this email: "Do task A and tell me when you're done". The traditional way to do that in Python is to use two message queues (one for requests and one for replies). Python's `queue.Queue` also has this built in: the *task_done()* method. The worker retrieves a message, does the work, and marks the task as done, so that someone can join, not the thread, but the email (message) queue. Thus:
- when you have non-daemon threads, you join the threads.
- when you have daemon threads, you join the (email) queue used to talk to them.
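A minimal sketch of RR 1003 with `queue.Queue`. The worker loops forever in a daemon thread and calls `task_done()` after finishing each item; the main thread waits with `Queue.join()`, which returns only once every item put on the queue has been marked done, not merely removed.

```python
import queue
import threading

tasks = queue.Queue()
processed = []

def daemon_worker():
    while True:                # infinite loop: never joined directly
        item = tasks.get()
        processed.append(item.upper())
        tasks.task_done()      # report "done", not just "received"

threading.Thread(target=daemon_worker, daemon=True).start()

for word in ["alpha", "beta", "gamma"]:
    tasks.put(word)

tasks.join()                   # join the queue, not the daemon thread
print(processed)  # ['ALPHA', 'BETA', 'GAMMA']
```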

**RR 1004**

Sometimes you need a global variable to communicate between functions. Global variables work great for this purpose in a single-threaded program. In multithreaded code, mutable global state is a disaster. The better solution is to use threading.local(), which is global WITHIN a thread but not across threads; that means each thread has its own copy of that global variable.
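A minimal sketch of `threading.local()`. The attribute `request_id` (a hypothetical name) acts as a global for the functions running inside one thread, yet each thread sees only its own value, and the main thread, which never set it, sees nothing.

```python
import threading

context = threading.local()    # one independent namespace per thread
seen = {}
seen_lock = threading.Lock()

def helper():
    # Reads the thread's "global" without receiving it as a parameter.
    return context.request_id

def handle(request_id):
    context.request_id = request_id
    value = helper()
    with seen_lock:            # protect the shared results dict
        seen[request_id] = value

threads = [threading.Thread(target=handle, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)                            # each key maps to itself (insertion order may vary)
print(hasattr(context, "request_id"))  # False: the main thread never set it
```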

**RR 1005**

Never try to kill a thread from something external to that thread. You never know whether that thread is holding a lock; if it is, every other thread that waits on that lock instantly becomes deadlocked. Python doesn’t provide a direct mechanism (API) for killing threads externally (because you shouldn't do it); you can still do it using ctypes, but that is a recipe for a deadlock.

Killing a thread is a conceptually terrible idea (not an implementation problem), because if we kill a thread that's holding a lock, our program will be in a deadlock state.

*Note: a deadlock is a situation in which two computer programs sharing the same resource are effectively preventing each other from accessing the resource, resulting in both programs ceasing to function.*

We can still kill a thread in Python by calling the operating system and killing it manually, or by using the *ctypes* module to reach into the thread and kill it. *However, just because a language allows you to do something, it doesn't mean you should do it.*
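Instead of killing a thread from outside, a common alternative (not taken from the talk) is to ask it to stop. In this sketch the thread checks a `threading.Event` at safe points, where it holds no locks, and exits on its own; `background_job` and `ticks` are hypothetical names.

```python
import threading
import time

stop_requested = threading.Event()
ticks = []

def background_job():
    # Check the flag only at safe points, where no lock is held.
    while not stop_requested.is_set():
        ticks.append(time.perf_counter())
        stop_requested.wait(0.05)   # sleep, but wake early if asked to stop

t = threading.Thread(target=background_job)
t.start()
time.sleep(0.2)
stop_requested.set()                # a request, not a kill
t.join(timeout=1)
print(t.is_alive(), len(ticks) > 0)  # False True
```

The thread decides when it is safe to exit, so it can release any resources it holds first; that is precisely what an external kill cannot guarantee.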

#### Applying all the rules

Note below that:
- The counter and print (the two apparent sources of race conditions) were isolated in their own threads.
- The counter and print threads are both daemon threads: they both have infinite loops inside them (*while True: ...*), and they are marked as such.
- Because both threads are daemon threads, we join their message queues instead of the threads themselves.
- The counter thread, which has exclusive rights to update the *counter* variable, performs these steps:
    1. It waits for a new message with the *get()* method.
    2. It increments the counter.
    3. It sends an atomic message to the print queue saying "print this message". The print thread may run at its own speed, but it also has an atomic message queue that sequences all of the incoming actions.
    4. When done, it calls the *task_done()* method, so that later we can wait on the queue itself to see whether all the work is done.
- The print thread, which has exclusive rights to call the print function, performs these steps:
    1. It waits for a new message with the *get()* method, which blocks until one arrives.
    2. It prints every line of the message. The print calls could in principle race with other output, but because this thread has exclusive rights to print, it has no competitor and always wins.
    3. When done, it calls the *task_done()* method.
- The worker's job is to send a message to the counter queue. Remember, it cannot increment the counter directly; it can only communicate through queues.
- In the main loop we start the worker threads; because they are not daemon threads, we join them afterwards. That doesn't mean the increments and printing are done, because each worker's job is only to send an email (message); our guarantee is that all ten messages have been sent.
- After joining the workers, we join the counter and print queues. We don't wait until these queues are empty; because we're dealing with daemon threads, we wait on their queues until both of them say "for every message I've received, I have a 'task done'".
- When the final "Finishing up" message is put on the print queue, we're not guaranteed that all 10 count messages have already been printed. However, we are guaranteed that the queue has already received the 11 previous messages, starting with "Starting up". Because the queue is FIFO, "Finishing up" is guaranteed to be printed last.

That's the smallest possible solution that handles this problem correctly with multithreading. Look how much the complexity of a simple problem increases in a multithreaded context: we must fear and respect multithreading. However, if we apply the rules above, we can systematically work through any problem and make sure that the code is correct.

In [6]:
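import threading, time, random  # also imported in earlier cells; repeated so this cell runs on its own
import queue  # queue.Queue is FIFO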
##########################################################################################
# Fuzzing is a technique for amplifying race condition
#errors to make them more visible

FUZZ = True # we set this to true for debugging purposes

def fuzz():
    if FUZZ:
        time.sleep(random.random())

###########################################################################################

counter = 0

counter_queue = queue.Queue()

def counter_manager():
    'I have EXCLUSIVE rights to update the counter variable'
    global counter

    while True:
        increment = counter_queue.get()
        fuzz()
        oldcnt = counter
        fuzz()
        counter = oldcnt + increment
        fuzz()
        print_queue.put([
            'The count is %d' % counter,
            '---------------'])
        fuzz()
        counter_queue.task_done()

t = threading.Thread(target=counter_manager)
t.daemon = True
t.start()
del t

###########################################################################################

print_queue = queue.Queue()

def print_manager():
    'I have EXCLUSIVE rights to call the "print" function'
    while True:
        job = print_queue.get()
        fuzz()
        for line in job:
            print(line, end='')
            fuzz()
            print()
            fuzz()
        print_queue.task_done()
        fuzz()

t = threading.Thread(target=print_manager)
t.daemon = True
t.start()
del t

###########################################################################################

def worker():
    'My job is to increment the counter and print the current count'
    counter_queue.put(1)
    fuzz()

print_queue.put(['Starting up'])
fuzz()

# main loop:
worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
    fuzz()
for t in worker_threads:
    fuzz()
    t.join()

counter_queue.join()
fuzz()
print_queue.put(['Finishing up'])
fuzz()
print_queue.join()
fuzz()

Starting up
The count is 1
---------------
The count is 2
---------------
The count is 3
---------------
The count is 4
---------------
The count is 5
---------------
The count is 6
---------------
The count is 7
---------------
The count is 8
---------------
The count is 9
---------------
The count is 10
---------------
Finishing up


#### Cleaned-up code without fuzzing
For production.

In [8]:
import threading, queue

###########################################################################################

counter = 0

counter_queue = queue.Queue()

def counter_manager():
    'I have EXCLUSIVE rights to update the counter variable'
    global counter

    while True:
        increment = counter_queue.get()
        # RR 1001: if you want things to happen sequentially, put them in
        # the same thread. Below, after the increment we request a print
        # (by putting a new message in the print queue). That guarantees that
        # updating and printing happen in sequence.
        
        counter += increment
        print_queue.put([
            'The count is %d' % counter,
            '---------------'])
        counter_queue.task_done()

# The counter is isolated in its own daemon thread: counter_manager.
# Other threads never update the counter; they just send a
# message to the counter thread saying an update must be done. Then the updates
# contained in the atomic message queue are performed sequentially, one
# at a time. We're isolating resources: RR 1000.
t = threading.Thread(target=counter_manager)
t.daemon = True
t.start()
del t # The del keyword is used to delete objects.
# In Python everything is an object, so the del keyword
# can also be used to delete variables, lists, or parts of a list etc.

###########################################################################################

print_queue = queue.Queue()

# The print function has its own daemon thread and we communicate with it through
# its message queue.
def print_manager():
    'I have EXCLUSIVE rights to call the "print" function'
    while True:
        job = print_queue.get()
        for line in job:
            print(line)
        print_queue.task_done()

t = threading.Thread(target=print_manager)
t.daemon = True
t.start()
del t # The del keyword is used to delete objects.
# In Python everything is an object, so the del keyword
# can also be used to delete variables, lists, or parts of a list etc.

###########################################################################################

def worker():
    'My job is to increment the counter and print the current count'
    counter_queue.put(1)

print_queue.put(['Starting up'])
# launching the workers
worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
# waiting for the workers to be done (RR 1002)
for t in worker_threads:
    t.join()
# waiting for the daemon threads to finish their work (joining the queue
# instead of the thread, RR 1003)
counter_queue.join()
print_queue.put(['Finishing up'])
print_queue.join()

Starting up
The count is 1
---------------
The count is 2
---------------
The count is 3
---------------
The count is 4
---------------
The count is 5
---------------
The count is 6
---------------
The count is 7
---------------
The count is 8
---------------
The count is 9
---------------
The count is 10
---------------
Finishing up


The previous code, which had just seven lines, has grown to about 60 lines. Indeed, it takes a great effort to get multi-threading correct: **It's far more complicated and time-consuming than creating the application itself**.

#### 2nd Approach: Careful Threading with locks
This isn't a recommended way of handling multi-threading, because locks are a low-level primitive that is hard to reason about, even though the resulting code can sometimes look simpler.

In [9]:
import threading, time, random

##########################################################################################
# Fuzzing is a technique for amplifying race condition
# errors to make them more visible

FUZZ = True

def fuzz():
    if FUZZ:
        time.sleep(random.random())

###########################################################################################

counter_lock = threading.Lock()
printer_lock = threading.Lock()

counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter
    with counter_lock:
        oldcnt = counter
        fuzz()
        counter = oldcnt + 1
        fuzz()
        with printer_lock:
            print('The count is %d' % counter, end='')
            fuzz()
            print()
            fuzz()
            print('---------------', end='')
            fuzz()
            print()
        fuzz()

with printer_lock:
    print('Starting up', end='')
    fuzz()
    print()
fuzz()

worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
    fuzz()
for t in worker_threads:
    t.join()
    fuzz()

with printer_lock:
    print('Finishing up', end='')
    fuzz()
    print()

fuzz()

Starting up
The count is 1
---------------
The count is 2
---------------
The count is 3
---------------
The count is 4
---------------
The count is 5
---------------
The count is 6
---------------
The count is 7
---------------
The count is 8
---------------
The count is 9
---------------
The count is 10
---------------
Finishing up


Without fuzzing:

In [10]:

counter_lock = threading.Lock()
printer_lock = threading.Lock()

counter = 0

def worker():
    'My job is to increment the counter and print the current count'
    global counter
    with counter_lock:
        counter += 1
        with printer_lock:
            print('The count is %d' % counter)
            print('---------------')

with printer_lock:
    print('Starting up')

worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
for t in worker_threads:
    t.join()

with printer_lock:
    print('Finishing up')

Starting up
The count is 1
---------------
The count is 2
---------------
The count is 3
---------------
The count is 4
---------------
The count is 5
---------------
The count is 6
---------------
The count is 7
---------------
The count is 8
---------------
The count is 9
---------------
The count is 10
---------------
Finishing up


Lock approach (preliminary) results:

- It is perfect!
- It is beautiful.
- It is simpler than using queues.

However...

#### Notes on Locks

**RR 1005**

Locks don’t lock anything. They are just flags and can be ignored. It is a cooperative tool, not an enforced tool.

Look at the *print* function. Is it locked? Anywhere else in the code, I can add a *print* call without acquiring the lock. Locks don't really lock anything in multi-threaded code: any thread can still access a global variable from anywhere.
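A minimal sketch of this point, using only the standard `threading` module: the main thread holds a lock, yet a thread that never asks for it (`rude_writer`, a hypothetical name) mutates the shared list anyway, while the cooperating thread (`polite_writer`) has to wait. The names and the 0.1-second timeout are illustrative assumptions.

```python
import threading

lock = threading.Lock()
shared = []

def polite_writer():
    with lock:                       # cooperates: waits until the lock is free
        shared.append('polite')

def rude_writer():
    shared.append('rude')            # ignores the lock; nothing stops it

lock.acquire()                       # the main thread "holds" the lock

rude = threading.Thread(target=rude_writer)
rude.start()
rude.join()                          # finishes even though the lock is held

polite = threading.Thread(target=polite_writer)
polite.start()
polite.join(timeout=0.1)             # stop waiting for it after 0.1 s
polite_was_blocked = polite.is_alive()   # still waiting for the lock

lock.release()                       # now the polite writer can proceed
polite.join()
print(shared, polite_was_blocked)    # ['rude', 'polite'] True
```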

**RR 1006**

In general, locks should be considered a low-level primitive that is difficult to reason about in non-trivial examples. For more complex applications, you’re almost always better off using atomic message queues.

**RR 1007**

The more locks you acquire at one time, the more you lose the advantages of concurrency.

The code above is correct, but it's slower than the original and takes no advantage of concurrency at all: it's fully sequential and therefore deterministic (it runs the same way every time). Putting too many locks on a problem can turn the solution into a sequential one, undoing all the effects (positive and negative) of multi-threading. This version behaves exactly like the original seven-line version.

If a multi-threaded program that relies on locks to avoid race conditions runs deterministically across many tests, that usually means there's no real multi-threading left: the locks force the code to run sequentially.
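To see how over-locking serializes the work, here is a small timing sketch. The helper name `run_workers` and the 0.05-second sleep standing in for slow work are assumptions. Holding the lock for the whole job forces the five sleeps to happen one after another; locking only the shared update lets them overlap.

```python
import threading
import time

def run_workers(n, hold_lock_for_whole_job):
    'Start n workers; return (final count, elapsed seconds).'
    lock = threading.Lock()
    counter = [0]                         # mutable cell shared by the workers

    def worker():
        if hold_lock_for_whole_job:
            with lock:                    # the entire job is one critical section
                time.sleep(0.05)          # simulated slow work, now serialized
                counter[0] += 1
        else:
            time.sleep(0.05)              # slow work overlaps across threads
            with lock:                    # only the shared update is protected
                counter[0] += 1

    threads = [threading.Thread(target=worker) for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter[0], time.perf_counter() - start

print(run_workers(5, hold_lock_for_whole_job=True))    # at least 5 * 0.05 s
print(run_workers(5, hold_lock_for_whole_job=False))   # about 0.05 s plus overhead
```

Both variants are correct (the count is always 5); only the second one still benefits from running in threads.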

Also, if you write a correct program using locks that is well designed and not actually sequential, you're doing something almost isomorphic to using atomic message queues. In fact, the *queue* module is a very thin layer built on top of a deque protected by locks, the same locks you would have written anyway. However, whether we use message queues or locks, that doesn't change the fact that CPython has a GIL, so threading does nothing to spread CPU-bound Python code across cores. To take advantage of multiple cores, you would switch to the *multiprocessing* module, which has all the same concepts, including atomic message queues.

Since the queue module already has locks in it, we don't need to work with the low-level tool and handle the synchronization ourselves.

#### Dining Philosophers

*In computer science, the dining philosophers problem is an example problem often used in concurrent algorithm design to illustrate synchronization issues and techniques for resolving them.*

*Five silent philosophers sit at a round table with bowls of spaghetti. Forks are placed between each pair of adjacent philosophers.*

*Each philosopher must alternately think and eat. However, a philosopher can only eat spaghetti when they have both left and right forks. Each fork can be held by only one philosopher and so a philosopher can use the fork only if it is not being used by another philosopher. After an individual philosopher finishes eating, they need to put down both forks so that the forks become available to others. A philosopher can take the fork on their right or the one on their left as they become available, but cannot start eating before getting both forks.*

*Eating is not limited by the remaining amounts of spaghetti or stomach space; an infinite supply and an infinite demand are assumed.*

*The problem is how to design a discipline of behavior (a concurrent algorithm) such that no philosopher will starve; i.e., each can forever continue to alternate between eating and thinking, assuming that no philosopher can know when others may want to eat or think.*

The rules given above help you reliably create multi-threaded code when the underlying data flow is a DAG (directed acyclic graph).

When the control flow or data flow is circular, the problem can be much harder (but still solvable). At that point, more formal design and verification techniques are warranted. Otherwise, in complex applications it can be quite difficult to avoid deadlock, thread starvation, or unfair solutions.
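One classic way to rule out deadlock in the dining philosophers problem is resource ordering (also called a resource hierarchy): number the forks and have every philosopher pick up the lower-numbered fork first, which makes a circular wait impossible. This is a sketch with a fixed, hypothetical number of meals; it prevents deadlock but does not by itself guarantee fairness.

```python
import threading

def dine(n_philosophers=5, meals=100):
    'Run the philosophers to completion; return how many meals each one ate.'
    if n_philosophers < 2:
        raise ValueError('need at least two philosophers (and two forks)')
    forks = [threading.Lock() for _ in range(n_philosophers)]
    eaten = [0] * n_philosophers

    def philosopher(i):
        left, right = i, (i + 1) % n_philosophers
        # Resource ordering: always pick up the lower-numbered fork first.
        # The last philosopher therefore reaches "backwards", which breaks
        # the circular wait that a left-then-right rule would allow.
        first, second = sorted((left, right))
        for _ in range(meals):
            with forks[first]:
                with forks[second]:
                    eaten[i] += 1         # eat while holding both forks
            # think: nothing to do in this sketch

    threads = [threading.Thread(target=philosopher, args=(i,))
               for i in range(n_philosophers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return eaten

print(dine())   # [100, 100, 100, 100, 100]
```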

### Multi-processing Example

#### Scripting style
Displaying the homepage sizes for multiple websites

In [12]:
import urllib.request

sites = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    #'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]

for url in sites:
    with urllib.request.urlopen(url) as u:
        page = u.read()
        print(url, len(page))

https://www.yahoo.com/ 439618
http://www.cnn.com 1133779
http://www.python.org 49694
http://www.jython.org 10286
http://www.pypy.org 5539
http://www.perl.org 12397
http://www.cisco.com 93949
http://www.facebook.com 127619
http://www.twitter.com 385266
http://arstechnica.com/ 76107
http://www.reuters.com/ 182283
http://abcnews.go.com/ 199595
http://www.cnbc.com/ 1678958


#### Function style

In [13]:

sites = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    #'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]

def sitesize(url):
    ''' Determine the size of a website '''
    with urllib.request.urlopen(url) as u:
        page = u.read()
        return url, len(page)

for result in map(sitesize, sites):
    print(result)

('https://www.yahoo.com/', 438839)
('http://www.cnn.com', 1133779)
('http://www.python.org', 49694)
('http://www.jython.org', 10286)
('http://www.pypy.org', 5539)
('http://www.perl.org', 12397)
('http://www.cisco.com', 93949)
('http://www.facebook.com', 127767)
('http://www.twitter.com', 385265)
('http://arstechnica.com/', 76107)
('http://www.reuters.com/', 182298)
('http://abcnews.go.com/', 199595)
('http://www.cnbc.com/', 1678949)


Make sure your code runs correctly in a single thread before putting it to work with concurrency. In this case, the easiest way is to use the *map* function, which is sequential. Conveniently, *map* also makes the transition to the multi-processing approach easier:
**A good development strategy is to use map to test the code in single-process, single-thread mode before switching to multi-processing.**
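A minimal sketch of that strategy, with a hypothetical pure function (`word_length`) standing in for `sitesize` so it runs without network access: first check the result with the built-in `map`, then swap in a pool's `map`, which also returns results in input order.

```python
from multiprocessing.pool import ThreadPool

def word_length(word):
    'Hypothetical stand-in for sitesize: returns its input with its result.'
    return word, len(word)

words = ['lawn', 'mowing', 'baby', 'making']

# Step 1: get it right sequentially with the built-in map.
sequential = list(map(word_length, words))

# Step 2: swap in a pool's map (same call shape, results in input order).
with ThreadPool(4) as pool:
    concurrent = pool.map(word_length, words)

print(sequential == concurrent)   # True
```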

#### What is parallelizable?
A key pattern of thinking is to divide the world into “lawn mowing” (the total time is roughly the single-worker time divided by the number of workers, plus a little overhead for coordinating the workers) versus “baby making” (we have to wait 9 months for the work to be done, regardless of the number of workers); that is, identifying tasks that are significantly parallelizable versus those that are intrinsically serial. Most problems fall somewhere on the scale between these two extremes, and where they fall is quantified by Amdahl’s Law.

Amdahl’s Law (according to wikipedia):
*Amdahl’s law is often used in parallel computing to predict the theoretical speedup when using multiple processors. For example, if a program needs 20 hours using a single processor core, and a particular part of the program which takes one hour to execute cannot be parallelized, while the remaining 19 hours (p = 0.95) of execution time can be parallelized, then regardless of how many processors are devoted to a parallelized execution of this program, the minimum execution time cannot be less than that critical one hour. Hence, the theoretical speedup is limited to at most 20 times (1/(1 − p) = 20). For this reason parallel computing is relevant only for a low number of processors and very parallelizable programs.*
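The law quoted above can be written as speedup(n) = 1 / ((1 − p) + p/n), where p is the parallelizable fraction of the single-processor run time and n is the number of processors. A few lines of Python make the ceiling visible for the 20-hour example (p = 0.95): the speedup climbs quickly at first and then flattens out just below 20.

```python
def amdahl_speedup(p, n):
    '''Theoretical speedup on n processors when a fraction p of the
    single-processor run time can be parallelized.'''
    return 1 / ((1 - p) + p / n)

# The 20-hour example: 19 of the 20 hours parallelize, so p = 0.95.
for n in (1, 2, 10, 100, 1000):
    print(n, round(amdahl_speedup(0.95, n), 2))

# However many processors we add, the speedup stays below 1 / (1 - p) = 20.
```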

Detailed example:

In the parallelizable approach (see the docstring below), the two steps marked 1) can be done together. However, the first one is expensive, measured in milliseconds, while the second one is cheap, measured in microseconds. The following steps can't start until both of those steps are done. So in this case, there's really no value in parallelizing them.

Now, the HTTP request (step 2) is interesting. When requesting from home, you typically have only one connection out to the internet. But when requesting from work, you may have a bundle of fiber. In this scenario, you can send several HTTP range requests (each covering a portion of the whole page), receive the data in parallel, and then reassemble it. Here, there's a great deal of potential to speed up the code through parallelization. This approach is called channel bonding (an example of an HTTP range request session is provided here: http://stackoverflow.com/questions/8293687/sample-http-range-request-session).
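As a rough sketch of how the page might be split up for range requests, the hypothetical helper below divides a known content length into contiguous `Range` header values (byte positions in `bytes=first-last` are inclusive). It assumes the total length is already known, for example from a `Content-Length` header, and that the server honors range requests.

```python
def byte_ranges(content_length, parts):
    '''Split bytes [0, content_length) into at most `parts` contiguous HTTP
    Range header values; positions in "bytes=first-last" are inclusive.'''
    size, extra = divmod(content_length, parts)
    ranges, start = [], 0
    for i in range(parts):
        end = start + size + (1 if i < extra else 0)   # exclusive end
        if end > start:                                 # skip empty parts
            ranges.append('bytes=%d-%d' % (start, end - 1))
        start = end
    return ranges

print(byte_ranges(10, 3))   # ['bytes=0-3', 'bytes=4-6', 'bytes=7-9']
```

Each value could then be sent with `urllib.request.Request(url, headers={'Range': value})`, one request per worker, and the returned blocks reassembled in order.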

Also, we don't have to wait for the whole page to arrive before counting its characters; we can count one packet at a time.
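Counting as the data arrives can be sketched by reading the response in fixed-size chunks and adding up their lengths, instead of calling `read()` once for the whole page. The helper name and the 8192-byte chunk size are assumptions; `io.BytesIO` stands in for a network stream here, and the response object returned by `urllib.request.urlopen` also supports `read(n)`.

```python
import io

def count_streamed(stream, chunk_size=8192):
    'Count bytes chunk by chunk as they are read, instead of all at once.'
    total = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:              # b'' signals the end of the stream
            break
        total += len(chunk)        # this chunk is counted immediately
    return total

print(count_streamed(io.BytesIO(b'x' * 20000)))   # 20000
```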

In other words, the three statements in the function below are indeed parallelizable. However, it probably isn't worth the effort unless a lot of data comes back from the requests. A better approach for this problem is to treat these three statements as one single step and process many different URLs in parallel, which is much easier (check the code under the 'Pools of processes' section below). **Remember: when you do multi-processing, do it at the highest level possible so that you get the maximum pay-off.**

In [14]:
def sitesize(url):
    ''' Determine the size of a website

    This is non-parallelizable (the following steps are all sequential actions):
    * UDP DNS request for the url
    * UDP DNS response
    * Acquire socket from the OS
    * TCP Connection (three-way handshake): SYN, SYN/ACK, ACK
    * Send HTTP Request for the root resource
    * Wait for the TCP response which is broken-up
      into packets (we must join them!).
    * Count the characters of the webpage

    However, some steps can be parallelized. This problem would be classified
    as 95% "baby making" (intrinsically serial) and 5% "lawn mowing". Thus, it
    isn't worth the trouble to parallelize this function.

    For example, steps marked as 1) can be done together.
    Do ten times in parallel:
        1) DNS lookup (UDP request and resp)
        1) Acquire the socket
        2) Send HTTP range requests
        3) The sections come back in parallel
           across different pieces of fiber.
        4) Count the characters for a single
           block as received.
    Add up the 10 results!

    '''
    with urllib.request.urlopen(url) as u:
        page = u.read()
        return url, len(page)

#### Pools of processes

We can't get one baby in less than 9 months, but we can get n babies in less than n*9 months by using a pool of processes. The total time is then roughly the time it takes to make the slowest baby.

Note below the multi-processing version of the single-threaded map version. Essentially one thing changed: we create a pool and replace the call to the built-in *map* function with the pool's *imap_unordered* method. (The cell imports *ThreadPool* under the name *Pool*; the commented-out import switches to a process-based *Pool* without any other change.)

Note that the mapping function returns its input (url) together with its output, which may appear unnecessary. However, if you design your mapping functions that way, you can use *imap_unordered* to improve responsiveness without having to care about the order of the results.

**The use of imap_unordered is made possible by designing the function to return both its argument and its result as a tuple.**
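The sketch below shows why returning `(argument, result)` pairs matters: with `imap_unordered`, results arrive in completion order, but each one still carries its input, so the full mapping can be rebuilt. `slow_square` is a hypothetical task with random jitter, and `ThreadPool` is used so the example runs anywhere.

```python
import random
import time
from multiprocessing.pool import ThreadPool

def slow_square(n):
    'Hypothetical task: returns its argument together with its result.'
    time.sleep(random.random() / 100)   # jitter so completion order varies
    return n, n * n

with ThreadPool(4) as pool:
    # Results arrive in whatever order the tasks finish...
    results = list(pool.imap_unordered(slow_square, range(10)))

# ...but each result carries its argument, so nothing is lost.
squares = dict(results)
print(squares[7])   # 49
```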

In [17]:
import urllib.request
from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing.pool import Pool

sites = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    #'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
    'http://www.cnbc.com/',
]

def sitesize(url):
    ''' Determine the size of a website '''
    with urllib.request.urlopen(url) as u:
        page = u.read()
        return url, len(page)

pool = Pool(10)
for result in pool.imap_unordered(sitesize, sites):
    print(result)

('http://www.perl.org', 12397)
('http://www.jython.org', 10286)
('http://www.cisco.com', 93949)
('http://www.reuters.com/', 182219)
('http://www.python.org', 49694)
('http://www.cnn.com', 1133779)
('http://www.pypy.org', 5539)
('http://www.twitter.com', 385266)
('http://www.facebook.com', 127925)
('http://www.cnbc.com/', 1679107)
('http://www.cnbc.com/', 1679107)
('http://abcnews.go.com/', 199946)
('http://arstechnica.com/', 76194)
('https://www.yahoo.com/', 439968)


#### Hazards of thin channel communication


Consider a traveler from the US who travels to Paris for lunch, then to Rome for dinner, and then to Porto to sleep. What's wrong with this plan? There are too many trips back and forth for the little that gets done at each stop. This example illustrates three useful rules for multi-processing; when they are violated, the result is often a poor solution:
1. Don’t make too many trips back and forth.
2. Do significant work on each trip.
3. Don’t send or receive a lot of data (send out summary queries and bring back summary results).

Below are SQL examples that violate these rules, in a context where we want to compute the total salary of all employees in each department.

In [22]:
# These snippets assume `c` is a sqlite3 cursor on a database with an
# Employee(employee, dept, salary) table.
import collections

###########################################################################################
# Too many trips back and forth
# Running one query per department.
summary = dict()
depts = [dept for (dept,) in c.execute('SELECT DISTINCT dept FROM Employee')]
for dept in depts:
    c.execute('SELECT SUM(salary) FROM Employee WHERE dept = ?', (dept,))
    summary[dept] = c.fetchone()[0]

###########################################################################################
# Bringing too much back and not doing enough work while you are there
# Below, we load the entire Employee table and compute the summary locally
summary = collections.Counter()
for employee, dept, salary in c.execute('SELECT employee, dept, salary FROM Employee'):
    summary[dept] += salary

###########################################################################################
# Right way is one trip where a lot of work gets done and only a summary result is returned

summary = dict(c.execute('SELECT dept, SUM(salary) FROM Employee GROUP BY dept'))

If you try either of the first two approaches, performance will be poor. Even though this is obvious for databases, people tend to forget it when working with multi-processing and make these same mistakes.
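For a runnable comparison, here is a sketch of the three approaches against a small in-memory SQLite table (the table contents are made-up sample data). All three produce the same totals; they differ in how many round trips they make and how much data comes back.

```python
import collections
import sqlite3

# Hypothetical sample data, for illustration only.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE Employee (employee TEXT, dept TEXT, salary INTEGER)')
conn.executemany('INSERT INTO Employee VALUES (?, ?, ?)', [
    ('ann', 'eng', 100), ('bob', 'eng', 120),
    ('cat', 'ops', 90), ('dan', 'ops', 80), ('eve', 'hr', 70),
])

# 1) Too many trips: one extra query per department.
depts = [dept for (dept,) in conn.execute('SELECT DISTINCT dept FROM Employee')]
many_trips = {
    dept: conn.execute('SELECT SUM(salary) FROM Employee WHERE dept = ?',
                       (dept,)).fetchone()[0]
    for dept in depts
}

# 2) Too much data: every row is shipped back and summed on the client side.
too_much_data = collections.Counter()
for employee, dept, salary in conn.execute('SELECT employee, dept, salary FROM Employee'):
    too_much_data[dept] += salary

# 3) One trip: the work is done where the data lives; only the summary returns.
one_trip = dict(conn.execute('SELECT dept, SUM(salary) FROM Employee GROUP BY dept'))

print(one_trip == many_trips == dict(too_much_data))   # True
```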

- In the first case below, we're using range requests and returning only summary data (the length of each block), but we're making far too many trips.
- In the second case, we're returning one line at a time (too many trips back and forth, with too little work done on each).
- In the third case, we're returning the entire webpage instead of its length (too much data).

In [None]:
# Too many trips back and forth
# If the input iterable to map is very large, it suggests you're making too many trips
import urllib.request

def sitesize(url, start):
    'Fetch one 1000-byte block of the page; every call is a separate round trip.'
    req = urllib.request.Request(url)
    req.add_header('Range', 'bytes=%d-%d' % (start, start + 999))
    with urllib.request.urlopen(req) as u:
        block = u.read()
    return url, len(block)


###########################################################################################
# Not doing enough work relative to the travel time
# Once you get to a process, be sure to do enough work to make the trip worthwhile

def sitesize(url, results):
    'Send back one tiny message per line of the page.'
    with urllib.request.urlopen(url) as u:
        for line in u:
            results.put((url, len(line)))


###########################################################################################
# Taking too much with you or bringing too much back
def sitesize(url):
    'Send back the entire page when only its length is needed.'
    with urllib.request.urlopen(url) as u:
        page = u.read()
    return url, page
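The remedy for the first two hazards is to batch the work so that each trip carries a meaningful amount of it and brings back only a summary. A minimal sketch with hypothetical payloads and helper names; it uses `ThreadPool` so it runs anywhere, and the same `map` call works with a process-based `multiprocessing.Pool`, whose `map` also accepts a `chunksize` argument that batches items in a similar spirit.

```python
from multiprocessing.pool import ThreadPool

def summarize_batch(batch):
    'One trip: do a whole batch of work and send back only a small summary.'
    return sum(len(page) for page in batch)

def batched(items, size):
    'Split a list into consecutive batches of at most `size` items.'
    return [items[i:i + size] for i in range(0, len(items), size)]

pages = ['<html>%d</html>' % i for i in range(1000)]   # hypothetical payloads

with ThreadPool(4) as pool:
    # 10 tasks of 100 pages each, instead of 1000 tiny tasks
    total = sum(pool.map(summarize_batch, batched(pages, 100)))

print(total == sum(len(page) for page in pages))   # True
```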

#### Other Multi-processing notes

Never run a multi-processing example from within an IDE that runs in the same process as the code you are developing. Otherwise, the forking step will fork the IDE itself as well as your code.

When partitioning into subtasks, a common challenge is how to handle data at the boundaries of the partition.
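One common way to handle the boundaries, sketched here for text (the helper is hypothetical): pick roughly even cut points, then nudge each one forward to the next whitespace character so that no word is split between two subtasks.

```python
def split_on_whitespace(text, parts):
    '''Cut `text` into `parts` chunks, moving each cut forward to the next
    whitespace character so no word straddles two chunks.'''
    cuts = [0]
    for i in range(1, parts):
        pos = max(len(text) * i // parts, cuts[-1])   # even cut, never backwards
        while pos < len(text) and not text[pos].isspace():
            pos += 1                                  # slide to a word boundary
        cuts.append(pos)
    cuts.append(len(text))
    return [text[a:b] for a, b in zip(cuts, cuts[1:])]

chunks = split_on_whitespace('the quick brown fox jumps over the lazy dog', 4)
print(chunks)   # ['the quick brown', ' fox jumps', ' over the', ' lazy dog']
print(sum(len(chunk.split()) for chunk in chunks))   # 9, same as the whole text
```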

Setting the number of processes is a bit of an art. If the code is CPU bound, the number of cores times two is a reasonable starting point. If the code is IO bound, the number of cores can be much higher. Experimentation is the key.
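Since experimentation is the key, a tiny harness that times the same workload at several pool sizes can help. This sketch uses a simulated IO-bound task (`time.sleep`) and `ThreadPool`; the task, the item count, and the multipliers in `candidates` are assumptions to adjust for a real workload.

```python
import os
import time
from multiprocessing.pool import ThreadPool

def io_task(_):
    'Simulated IO-bound work: mostly waiting, very little CPU.'
    time.sleep(0.01)

def time_pool_sizes(sizes, task, items):
    'Return {pool_size: elapsed_seconds} for running task over items.'
    timings = {}
    for size in sizes:
        start = time.perf_counter()
        with ThreadPool(size) as pool:
            pool.map(task, items)
        timings[size] = time.perf_counter() - start
    return timings

cores = os.cpu_count() or 1                 # cpu_count() may return None
candidates = [cores, cores * 2, cores * 8]  # assumed starting points to try
for size, seconds in sorted(time_pool_sizes(candidates, io_task, range(64)).items()):
    print('%3d workers: %.3f s' % (size, seconds))
```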

### Combining Threading and Forking

Never combine threading and forking (fork is an operation whereby a process creates a copy of itself); the combination is prone to deadlocks.

If you do combine threading and forking (which you shouldn't), the general rule is “thread after you fork, not before”. Otherwise, the locks used by the thread executor will get duplicated across processes. If one of those processes dies while it holds the lock, all of the other processes using that lock will deadlock.

In other words: if you create the threads first, these threads will inevitably create locks. As soon as you fork, these locks will be shared across different processes. If you kill one of the processes that holds a lock, all of the other processes will deadlock.
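A minimal sketch of “thread after you fork”: the parent starts its worker processes first, and a thread pool is created only inside each child. It explicitly requests the `fork` start method through `multiprocessing.get_context`, an assumption that limits it to POSIX systems; the squaring task and the helper names are hypothetical.

```python
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

def process_main(chunk, results):
    'Runs in a child process; its thread pool is created only after the fork.'
    with ThreadPoolExecutor(max_workers=4) as pool:
        results.put(sum(pool.map(square, chunk)))

def fork_then_thread(data, nprocs=2):
    ctx = multiprocessing.get_context('fork')   # assumption: POSIX-only start method
    results = ctx.Queue()
    chunks = [data[i::nprocs] for i in range(nprocs)]
    procs = [ctx.Process(target=process_main, args=(chunk, results))
             for chunk in chunks]
    for p in procs:
        p.start()              # fork first: the parent has no thread pool yet
    # Drain the queue before joining, as the multiprocessing guidelines advise.
    total = sum(results.get(timeout=30) for _ in procs)
    for p in procs:
        p.join()
    return total

if __name__ == '__main__':
    print(fork_then_thread(list(range(10))))   # 285
```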

In [23]:
#!/usr/bin/env python3
# coding:utf8

import sys
import multiprocessing
import subprocess
from concurrent.futures import ThreadPoolExecutor

def run(arg):
    print("starting %s" % arg)
    p = multiprocessing.Process(target=print, args=("running", arg))
    p.start()
    p.join()
    print("finished %s" % arg)


if __name__ == "__main__":
    n = 16
    tests = range(n)
    with ThreadPoolExecutor(n) as pool:
        for r in pool.map(run, tests):
            pass

starting 0
starting 1
starting 2
starting 3
starting 4starting 5

starting 6
starting 7
starting 8
starting 9starting 10

starting 11
starting 12
starting 13
starting 14
starting 15
finished 8
finished 6
finished 4
finished 10
finished 12
finished 14
finished 15
finished 7
finished 1
finished 2
finished 9
finished 0
finished 5
finished 3
finished 13
finished 11


### Async Example
The following example covers how to build a performant non-blocking server from scratch, how to isolate the user’s business logic in callbacks, how to write the callback logic in-line with generators, and how to schedule timed events.

In [1]:
import socket, time, types, select
from collections import namedtuple
from heapq import heappush, heappop

######### Reactor ####################################################################

ScheduledEvent = namedtuple('ScheduledEvent', ['event_time', 'task'])
Session = namedtuple('Session', ['address', 'file'])

events = []                   # heap with events prioritized by earliest time
sessions = {}                 # { csocket : Session(address, file)}
callback = {}                 # { csocket : callback(client, line) }
generators = {}               # { csocket : inline callback generator}

def reactor(host='localhost', port=9600):
    'Main event loop that triggers the appropriate business logic callbacks'
    s = socket.socket()
    s.bind((host, port))
    s.listen(5)
    s.setblocking(0)          # Make asynchronous.  Never wait on a client socket.
    sessions[s] = None
    print('Server up, running, and waiting for call on %s %s' % (host, port))
    try:
        while True:
            # Serve existing clients BUT only if they already have data ready
            ready_to_read, _, _ = select.select(sessions, [], [], 0.1)
            for c in ready_to_read:
                if c is s:
                    c, a = c.accept()
                    connect(c, a)
                    continue
                line = sessions[c].file.readline()
                if line:
                    callback[c](c, line.rstrip())
                else:
                    disconnect(c)

            # Run events scheduled at the appropriate event time
            while events and events[0].event_time <= time.monotonic():
                event = heappop(events)
                event.task()
    finally:
        s.close()

def connect(c, a):
    'Reactor logic for new connections'
    sessions[c] = Session(a, c.makefile())
    on_connect(c)                            # call into user's business logic

def disconnect(c):
    'Reactor logic to end sessions'
    on_disconnect(c)                         # call into user's business logic
    sessions[c].file.close()
    c.close()
    del sessions[c]
    del callback[c]

def add_task(event_time, task):
    'Helper function to schedule one-time tasks at specific time'
    heappush(events, ScheduledEvent(event_time, task))

def call_later(delay, task):
    'Helper function to schedule one-time tasks after a given delay'
    add_task(time.monotonic() + delay, task)   # same clock the reactor loop checks

def call_periodic(delay, interval, task):
    'Helper function to schedule recurring tasks'
    def inner():
        task()
        call_later(interval, inner)
    call_later(delay, inner)


def on_connect(c):
    g = nbcaser(c)              # 'g' is a coroutine object
    generators[c] = g           # remember this client's coroutine
    callback[c] = g.send(None)  # advance `nbcaser` until it awaits `readline`,
                                # which yields its `inner` callback and suspends
                                # on its 'yield' expression

def on_disconnect(c):
    g = generators.pop(c)
    g.close()

@types.coroutine
def readline(c):
    'A non-blocking readline to use with two-way generators'
    def inner(c, line):
        g = generators[c]
        try:
            callback[c] = g.send(line)  # `g.send(line)` will resume the `yield inner` point
        except StopIteration:
            disconnect(c)
    line = yield inner
    return line

def sleep(c, delay):
    'A non-blocking sleep to use with two-way generators'
    def inner():
        g = generators[c]
        callback[c] = next(g)
    call_later(delay, inner)
    return lambda *args: callback[c]


######### User's Business Logic ######################################################

def announcement():
    print('The event loop is still running at:', time.ctime())

call_periodic(delay=1, interval=15, task=announcement)

async def nbcaser(c):
    upper, title = 'upper', 'title'
    mode = upper
    print("Received connection from", sessions[c].address)
    try:
        c.sendall(b'<welcome: starting in upper case mode>\n')
        while 1:
            line = await readline(c)
            if line == 'quit':
                c.sendall(b'quit\r\n')
                return
            if mode is upper and line == 'title':
                c.sendall(b'<switching to title case mode>\r\n')
                mode = title
                continue
            if mode is title and line == 'upper':
                c.sendall(b'<switching to upper case mode>\r\n')
                mode = upper
                continue
            print(sessions[c].address, '-->', line)
            if mode is upper:
                c.sendall(b'Upper-cased: %a\r\n' % line.upper())
            else:
                c.sendall(b'Title-cased: %a\r\n' % line.title())
    finally:
        print(sessions[c].address, 'quit')


if __name__ == '__main__':
    reactor('localhost', 9600)

Server up, running, and waiting for call on localhost 9600


KeyboardInterrupt: 
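The reactor's scheduling helpers can be exercised on their own. This sketch keeps the same idea (a heap ordered by event time) but adds two assumptions: a tie-breaking counter, so two tasks due at the same instant are never compared, and an injectable clock, so the behavior can be tested without waiting. Scheduling and checking must use the same clock, which is why both go through `self.clock`.

```python
import heapq
import itertools
import time

class Scheduler:
    'Minimal timed-event heap in the spirit of add_task/call_later above.'

    def __init__(self, clock=time.monotonic):
        self.clock = clock                # one clock for scheduling AND checking
        self.events = []                  # heap of (event_time, seq, task)
        self.seq = itertools.count()      # tie-breaker: tasks are never compared

    def call_later(self, delay, task):
        heapq.heappush(self.events, (self.clock() + delay, next(self.seq), task))

    def call_periodic(self, delay, interval, task):
        def inner():
            task()
            self.call_later(interval, inner)   # re-arm after each run
        self.call_later(delay, inner)

    def run_due(self):
        'Run every event whose time has come (the body of the reactor loop).'
        while self.events and self.events[0][0] <= self.clock():
            _, _, task = heapq.heappop(self.events)
            task()

# Usage with a fake clock, so nothing actually has to wait:
now = [0.0]
sched = Scheduler(clock=lambda: now[0])
log = []
sched.call_periodic(delay=1, interval=15, task=lambda: log.append(now[0]))
for t in (0.5, 1, 10, 16, 31):
    now[0] = t
    sched.run_due()
print(log)   # [1, 16, 31]
```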

Let's focus on code under 'User's Business Logic'.
Inside it, we have a function 'announcement' (that prints some information) that needs to be called every 15 seconds.

Also, we're running a server that asynchronously accepts connections from multiple sources (users), receives lines of text, and sends them back in either upper-case or title-case mode (in title case, the first letter of each word is upper case). A user can type 'title' or 'upper' to change their mode.

What makes this asynchronous?

1. The 'async def' at the beginning of the function.
2. Everywhere we would have blocked, we use a non-blocking version instead. For example, we use a non-blocking version of readline and call it with 'await'.

Otherwise, this code looks almost exactly like the single-process version.

After 1:11:16 in the Keynote on Concurrency video, Raymond tests the server with two different users connected at the same time. One user switches modes, while the other, who hasn't changed modes, keeps receiving responses in the default mode (upper case in the code above). Thus, every user has their own individual state, and that was as easy to write as the single-process version.
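The mechanics behind `await readline(c)` and the per-user state can be tried without sockets. In this sketch, a `types.coroutine` generator plays the role of the non-blocking readline, a plain function plays the reactor by calling `send()`, and two sessions run interleaved, each keeping its own `mode` in a local variable. `caser`, `start`, and `feed` are hypothetical simplifications of `nbcaser` and the reactor callbacks.

```python
import types

@types.coroutine
def wait_for_line():
    # Generator-based awaitable: the bare `yield` suspends the whole coroutine
    # and returns control to the driver; whatever the driver later passes to
    # send() becomes the result of `await wait_for_line()`.
    line = yield
    return line

async def caser(out):
    'Hypothetical, socket-free stand-in for nbcaser; replies go to `out`.'
    mode = 'upper'                      # per-session state lives in a local
    while True:
        line = await wait_for_line()
        if line == 'quit':
            return
        if line in ('upper', 'title'):
            mode = line
            continue
        out.append(line.upper() if mode == 'upper' else line.title())

def start(out):
    coro = caser(out)
    coro.send(None)                     # run up to the first await
    return coro

def feed(coro, line):
    'Resume a suspended session with one line, as the reactor callback does.'
    try:
        coro.send(line)
    except StopIteration:               # the session returned (user typed quit)
        pass

a_out, b_out = [], []
a, b = start(a_out), start(b_out)
feed(a, 'title')                        # only session A changes mode
feed(a, 'hello world')
feed(b, 'hello world')
feed(a, 'quit')
feed(b, 'quit')
print(a_out, b_out)   # ['Hello World'] ['HELLO WORLD']
```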

...