Multithreading
Developers often consider threading to be a very complex topic. While this statement is totally true, Python provides high-level classes and functions that ease the usage of threading. Unfortunately, the CPython implementation of threads comes with some inconvenient details that make them less useful than in other languages. They are still completely fine for some sets of problems that you may want to solve, but not for as many as in C or Java.

In this section, we will discuss the limitations of multithreading in CPython, as well as the common concurrent problems for which Python threads are still a viable solution.

What is multithreading?
Thread is short for a thread of execution. A programmer can split their work into threads that run simultaneously and share the same memory context. Unless your code depends on third-party resources, multithreading will not speed it up on a single-core processor, and will even add some overhead for thread management. Multithreading benefits from multiprocessor or multicore machines, where each thread can be executed on a separate CPU core, thus making the program run faster. This is a general rule that should hold true for most programming languages. In Python, the performance benefit from multithreading on multicore CPUs has some limits, which we will discuss later. For the sake of simplicity, let's assume for now that this statement is also true for Python.

The fact that the same context is shared among threads means you must protect data from uncontrolled concurrent accesses. If two intertwined threads update the same data without any protection, there might be a situation where subtle timing variation in thread execution can alter the final result in an unexpected way. To better understand this problem, imagine that there are two threads that increment the value of a shared variable in a non-atomic sequence of steps, for example:

counter_value = shared_counter
shared_counter = counter_value + 1
Let's assume that the shared_counter variable has the initial value of 0, and imagine that two threads process the same code in parallel, as follows:

Thread 1                                Thread 2
counter_value = shared_counter          counter_value = shared_counter
# counter_value = 0                     # counter_value = 0
shared_counter = counter_value + 1      shared_counter = counter_value + 1
# shared_counter = 0 + 1                # shared_counter = 0 + 1
Depending on the exact timing and on how the processor switches context between the threads, the result of running two such threads will be either 1 or 2. Such a situation is called a race hazard or race condition, and is one of the most hated culprits of hard-to-debug software bugs.
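
The two possible outcomes can be reproduced deterministically without any real threads. The following is only a minimal sketch: the simulate() helper and the two schedules are hypothetical names introduced for illustration, and each schedule simply replays the read and write steps of the two threads in a fixed order:

```python
def simulate(schedule):
    """Replay the read/write steps of two threads in the given order."""
    shared_counter = 0
    counter_value = {}  # per-thread local copy of the counter

    for thread_id, step in schedule:
        if step == "read":
            # counter_value = shared_counter
            counter_value[thread_id] = shared_counter
        else:
            # shared_counter = counter_value + 1
            shared_counter = counter_value[thread_id] + 1

    return shared_counter


# Thread 1 finishes both steps before thread 2 starts.
sequential = [(1, "read"), (1, "write"), (2, "read"), (2, "write")]
# Both threads read the old value before either of them writes.
interleaved = [(1, "read"), (2, "read"), (1, "write"), (2, "write")]

print(simulate(sequential))   # both increments are kept
print(simulate(interleaved))  # one increment is lost
```

With the sequential schedule the counter ends at 2, while the interleaved schedule loses one update and ends at 1.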

Lock mechanisms help in protecting data, and thread programming has always been a matter of making sure that the resources are accessed by threads in a safe way. But unwary usage of locks can introduce a set of new issues on its own. The worst problem occurs when, due to a wrong code design, two threads lock a resource and try to obtain a lock on the other resource that the other thread has locked before. They will wait for each other forever. This situation is called a deadlock, and is similarly hard to debug. Reentrant locks help a bit in this by making sure a thread doesn't get locked by attempting to lock a resource twice.
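
As a rough illustration of both kinds of locks, the sketch below uses the Lock and RLock classes from the threading module. The function names (increment(), run_threads(), outer(), and inner()) are made up for this example. The Lock makes the read-and-write sequence on the shared counter atomic, and the RLock shows that the same thread can acquire a reentrant lock twice without blocking itself:

```python
import threading

shared_counter = 0
counter_lock = threading.Lock()
reentrant_lock = threading.RLock()


def increment(times):
    global shared_counter
    for _ in range(times):
        # Only one thread at a time can execute the two steps below.
        with counter_lock:
            counter_value = shared_counter
            shared_counter = counter_value + 1


def run_threads(thread_count, times):
    global shared_counter
    shared_counter = 0
    threads = [
        threading.Thread(target=increment, args=(times,))
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return shared_counter


def inner():
    # Acquired a second time by the same thread; an RLock allows this.
    with reentrant_lock:
        return "done"


def outer():
    with reentrant_lock:
        return inner()
```

Replacing reentrant_lock with a plain Lock would make outer() block forever, because the thread would wait for a lock that it already holds.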

Nevertheless, when threads are used for isolated needs with tools that were built for them, they might increase the speed of the program.

Multithreading is usually supported at the system kernel level. When the machine has a single processor with a single core, the system uses a time slicing mechanism. Here, the CPU switches from one thread to another so fast that there is an illusion of threads running simultaneously. This is done at the process level as well. Parallelism without multiple processing units is obviously virtual, and there is no performance gain from running multiple threads on such hardware. Still, it is sometimes useful to implement code with threads, even if it has to execute on a single core, and we will look at a possible use case later.

Everything changes when your execution environment has multiple processors or multiple CPU cores at its disposal. Even if time slicing is used, processes and threads are distributed among CPUs, providing the ability to run your program faster.

Let's take a look at how Python deals with threads.

How Python deals with threads
Unlike some other languages, Python uses multiple kernel-level threads that can each run any of the interpreter-level threads. But the standard implementation of the language, CPython, comes with a major limitation that renders threads less usable in many contexts. All threads accessing Python objects are serialized by one global lock. This is done because many of the interpreter's internal structures, as well as third-party C code, are not thread-safe and need to be protected.

This mechanism is called the Global Interpreter Lock (GIL), and its implementation details on the Python/C API level were already discussed in the Releasing GIL section of Chapter 9, Python Extensions in Other Languages. The removal of the GIL is a topic that occasionally appears on the python-dev mailing list and was postulated by developers multiple times. Sadly, at the time of writing, no one has ever managed to provide a reasonable and simple solution that would allow you to get rid of this limitation. It is highly improbable that we will see any progress in this area anytime soon. It is safer to assume that the GIL will stay in CPython, and so we need to learn how to live with it.

So, what is the point of multithreading in Python?

When threads contain only pure Python code, there is little point in using threads to speed up the program since the GIL will globally serialize the execution of all threads. But remember that GIL cares only about Python code. In practice, the global interpreter lock is released on a number of blocking system calls and can be released in sections of C extensions that do not use any of Python/C API functions. This means that multiple threads can do I/O operations or execute C code in certain third-party extensions in parallel.

Multithreading allows you to efficiently utilize time when a program is waiting for a third-party resource. This is because a sleeping thread that has explicitly released the GIL can stand by and wake up when the results are back. Last, whenever a program needs to provide a responsive interface, multithreading can be an answer, even in single-core environments where the operating system needs to use time slicing. With multithreading, the program can easily interact with the user while doing some heavy computing in the so-called background.
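
The effect of releasing the GIL while waiting can be observed with a small experiment. This is only a sketch: time.sleep() stands in for any blocking call that releases the GIL, and the wait_for_resource() and timed_run() names are hypothetical:

```python
import threading
import time


def wait_for_resource(delay):
    # time.sleep() releases the GIL, like a blocking I/O call would
    time.sleep(delay)


def timed_run(thread_count, delay):
    threads = [
        threading.Thread(target=wait_for_resource, args=(delay,))
        for _ in range(thread_count)
    ]
    started = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - started


if __name__ == "__main__":
    elapsed = timed_run(thread_count=4, delay=0.2)
    print(f"4 threads waiting 0.2s each took {elapsed:.2f}s in total")
```

Because the threads wait concurrently, the total time should stay close to a single delay rather than the sum of all four.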

Note that GIL does not exist in every implementation of the Python language. It is a limitation of CPython, Stackless Python, and PyPy, but does not exist in Jython and IronPython (see Chapter 1, Current Status of Python). There has been some development of the GIL-free version of PyPy, but at the time of writing this book, it is still at an experimental stage and the documentation is lacking. It is based on Software Transactional Memory and is called PyPy-STM. It is really hard to say when (or if) it will be officially released as a production ready interpreter. Everything seems to indicate that it won't happen soon.

In the next section, we will discuss when we should use threading.

When should we use threading?
Despite the GIL limitation, threads can be really useful in some of the following cases:

Building responsive interfaces
Delegating work
Building multiuser applications
Let's discuss the preceding cases in the next sections.

Building responsive interfaces
Let's say you ask your system to copy files from one folder to another through some program with a graphical user interface. The task will possibly be pushed into the background and the interface window will be constantly refreshed by the program. This way, you get live feedback on the progress of the whole process. You will also be able to cancel the operation. This is less irritating than a raw cp or copy shell command that does not provide any feedback until the whole work is finished.

A responsive interface also allows a user to work on several tasks at the same time. For instance, GIMP will let you play around with a picture while another one is being filtered, since the two tasks are independent.

When trying to achieve such responsive interfaces, a good approach is to try to push long-running tasks into the background, or at least try to provide constant feedback to the user. The easiest way to achieve that is to use threads. In such a scenario, threads are not intended to increase performance but only to make sure that the user can still operate the interface, even if it needs to process some data for a longer period of time.

If such background tasks perform a lot of I/O operations, you can still get some benefit from multicore CPUs. Then, it's a win-win situation.
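
The idea of a background task that reports progress and can be cancelled can be sketched without any GUI toolkit. In the following example, everything is an assumption made for illustration: copy_files() only pretends to copy files by sleeping, and the loop in run_with_feedback() plays the role of the interface's main loop:

```python
import queue
import threading
import time


def copy_files(file_names, progress, cancelled):
    for index, name in enumerate(file_names, start=1):
        if cancelled.is_set():
            break  # the user asked to stop
        time.sleep(0.01)  # stand-in for the actual copy operation
        progress.put((index, len(file_names)))


def run_with_feedback(file_names):
    progress = queue.Queue()
    cancelled = threading.Event()  # call cancelled.set() to cancel
    worker = threading.Thread(
        target=copy_files, args=(file_names, progress, cancelled)
    )
    worker.start()

    reported = []
    # The "interface" stays responsive: it only polls for updates.
    while worker.is_alive() or not progress.empty():
        try:
            done, total = progress.get(timeout=0.05)
        except queue.Empty:
            continue
        reported.append(done)
        print(f"copied {done}/{total}")

    worker.join()
    return reported


if __name__ == "__main__":
    run_with_feedback(["a.txt", "b.txt", "c.txt"])
```

A real application would hook the progress updates into its GUI event loop instead of printing them.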

Delegating work
If your application depends on many external resources, threads may really help in speeding it up.

Let's consider the case of a function that indexes files in a folder and pushes the built indexes into a database. Depending on the type of file, the function calls a different external program. For example, one is specialized in PDFs and another one in OpenOffice files.

Instead of processing all files in a sequence, by executing the right program and then storing the results into the database, your function can set up a single thread for each converter and push jobs to be done to each one of them through a queue. The overall time taken by the function will be closer to the processing time of the slowest converter than to the sum of all the work.

Note that such an approach is somewhat a hybrid between multithreading and multiprocessing. If you delegate the work to external processes (for example, using the run() function from the subprocess module), you are in fact doing work in multiple processes, so this has symptoms of multiprocessing. Still, in our scenario, we are mainly waiting for the processing of results being handled in separate threads, so it is still mostly multithreading from the perspective of the Python code.
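
A minimal sketch of this design could look as follows. It is not a real indexer: the "converter" is just a short Python one-liner started through subprocess.run() that upper-cases the file name, and the names converter_worker(), index_files(), and STOP are hypothetical. What it does show is one thread per converter type, each fed through its own queue:

```python
import subprocess
import sys
from queue import Queue
from threading import Thread

STOP = None  # sentinel telling a worker that no more jobs will come

# Stand-in for an external converter program (assumption for this sketch).
FAKE_CONVERTER = "import sys; print(sys.argv[1].upper())"


def converter_worker(kind, jobs, results):
    while True:
        file_name = jobs.get()
        if file_name is STOP:
            break
        completed = subprocess.run(
            [sys.executable, "-c", FAKE_CONVERTER, file_name],
            capture_output=True, text=True, check=True,
        )
        results.put((kind, completed.stdout.strip()))


def index_files(file_names):
    jobs = {"pdf": Queue(), "odt": Queue()}
    results = Queue()
    threads = [
        Thread(target=converter_worker, args=(kind, job_queue, results))
        for kind, job_queue in jobs.items()
    ]
    for thread in threads:
        thread.start()

    # Route every file to the queue of the matching converter.
    for name in file_names:
        jobs[name.rsplit(".", 1)[-1]].put(name)
    for job_queue in jobs.values():
        job_queue.put(STOP)

    for thread in threads:
        thread.join()

    collected = []
    while not results.empty():
        collected.append(results.get())
    return sorted(collected)
```

While one thread waits for its external process to finish, the GIL is released, so the other converter can run at the same time.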

The other common use case for threads is performing multiple HTTP requests to an external service. For instance, if you want to fetch multiple results from a remote web API, it could take a lot of time to do that synchronously, especially if the remote server is located in a distant location. If you wait for every previous response before making new requests, you will spend a lot of time just waiting for the external service to respond, and additional round-trip time delays will be added to every such request. If you are communicating with some efficient service (Google Maps API, for instance), it is highly probable that it can serve most of your requests concurrently without affecting the response times of separate requests. It is then reasonable to perform multiple queries in separate threads. Remember that when doing an HTTP request, most of the time is spent on reading from the TCP socket. This is a blocking I/O operation, so CPython will release the GIL when performing the recv() C function. This allows for great improvement of your application's performance.

Multiuser applications
Threading is also used as a concurrency base for multiuser applications. For instance, a web server will push a user request into a new thread and then will become idle, waiting for new requests. Having a thread dedicated to each request simplifies a lot of work, but requires the developer to take care of locking the shared resources. But this is not a problem when all the shared data is pushed into a relational database that takes care of the concurrency matters. So, threads in a multiuser application act almost like separate independent processes. They are under the same process only to ease their management at the application level.

For instance, a web server will be able to put all requests in a queue and wait for a thread to be available to send the work to it. Furthermore, it allows memory sharing that can speed up some work and reduce the memory load. The two very popular Python WSGI-compliant web servers, Gunicorn (refer to http://gunicorn.org/) and uWSGI (refer to https://uwsgi-docs.readthedocs.org), allow you to serve HTTP requests with threaded workers in a way that generally follows this principle.

Using multithreading to enable concurrency in multiuser applications is less expensive than using multiprocessing. Separate processes cost more resources since a new interpreter needs to be loaded for each one of them. On the other hand, having too many threads is expensive too. We know that the GIL isn't such a problem for I/O-intensive applications, but there is always a time when you will need to execute Python code. Since you cannot parallelize all of the application parts with bare threads, you will never be able to utilize all of the resources on machines with multicore CPUs and a single Python process. This is why the optimal solution is often a hybrid of multiprocessing and multithreading: multiple workers (processes) running with multiple threads. Fortunately, many of the WSGI-compliant web servers allow such a setup.
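
The thread-per-request model can be tried out with the standard library alone. The sketch below is not how Gunicorn or uWSGI are implemented. It only uses ThreadingHTTPServer from the http.server module, which handles every request in a new thread, and the HelloHandler and serve_once() names are made up for this example:

```python
import threading
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class HelloHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Each request is handled in its own thread.
        thread_name = threading.current_thread().name
        body = f"handled by {thread_name}".encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the example output quiet


def serve_once():
    # Port 0 lets the operating system pick any free port.
    server = ThreadingHTTPServer(("127.0.0.1", 0), HelloHandler)
    port = server.server_address[1]
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        connection = HTTPConnection("127.0.0.1", port, timeout=5)
        connection.request("GET", "/")
        body = connection.getresponse().read().decode()
        connection.close()
        return body
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(serve_once())
```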

In the next section, we will take a look at an example of a threaded application.

An example of a threaded application
To see how Python threading works in practice, let's construct an example application that can benefit from multithreading. We will discuss a simple problem that you may encounter from time to time in your professional practice: making multiple parallel HTTP queries. This problem was already mentioned as a common use case for multithreading.

Let's say we need to fetch data from some web service using multiple queries that cannot be batched into a single big HTTP request. As a realistic example, we will use foreign exchange reference rates endpoint from Foreign exchange rates API, available at https://exchangeratesapi.io/. The reasons for that choice are as follows:

This service is open and does not require any authentication keys.
The API of that service is very simple and can be easily queried using the popular requests library.
Code for this API is open sourced and written in Python. So, in case the official service goes down, you should be able to download its source code from the official repository on GitHub, available at https://github.com/exchangeratesapi/exchangeratesapi.
In our examples, we will try to obtain exchange rates for selected currencies using multiple currencies as reference points. We will then present results as an exchange rate currency matrix, similar to the following:

1 USD =     1.0 USD,   0.887 EUR,     3.8 PLN,    8.53 NOK,    22.7 CZK
1 EUR =    1.13 USD,     1.0 EUR,    4.29 PLN,    9.62 NOK,    25.6 CZK
1 PLN =   0.263 USD,   0.233 EUR,     1.0 PLN,    2.24 NOK,    5.98 CZK
1 NOK =   0.117 USD,   0.104 EUR,   0.446 PLN,     1.0 NOK,    2.66 CZK
1 CZK =   0.044 USD,   0.039 EUR,   0.167 PLN,   0.375 NOK,     1.0 CZK
The API we've chosen offers several ways to query for multiple data points within single requests, but unfortunately does not allow you to query for data using multiple base currencies at once. Obtaining the rate for a single base is as simple as doing the following:

>>> import requests
>>> response = requests.get("https://api.exchangeratesapi.io/latest?base=USD")
>>> response.json()
{'base': 'USD', 'rates': {'BGN': 1.7343265053, 'NZD': 1.4824864769, 'ILS': 3.5777245721, 'RUB': 64.7361000266, 'CAD': 1.3287221779, 'USD': 1.0, 'PHP': 52.0368892436, 'CHF': 0.9993792675, 'AUD': 1.3993970027, 'JPY': 111.2973308504, 'TRY': 5.6802341048, 'HKD': 7.8425113062, 'MYR': 4.0986077858, 'HRK': 6.5923561231, 'CZK': 22.7170346723, 'IDR': 14132.9963642813, 'DKK': 6.6196683515, 'NOK': 8.5297508203, 'HUF': 285.09355325, 'GBP': 0.7655848187, 'MXN': 18.930477964, 'THB': 31.7495787887, 'ISK': 118.6485767491, 'ZAR': 14.0298838344, 'BRL': 3.8548372794, 'SGD': 1.3527533919, 'PLN': 3.8015429636, 'INR': 69.3340427419, 'KRW': 1139.4519819101, 'RON': 4.221867518, 'CNY': 6.7117141084, 'SEK': 9.2444799149, 'EUR': 0.8867606633}, 'date': '2019-04-09'}
Since our goal is to show how a multithreaded solution of concurrent problems compares to a standard synchronous solution, we will start with an implementation that doesn't use threads at all. Here is the code of a program that loops over the list of base currencies, queries the foreign exchange rates API, and displays the results on standard output as a text-formatted table:

import time

import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def fetch_rates(base):
    response = requests.get(
        f"https://api.exchangeratesapi.io/latest?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.

    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def main():
    for base in BASES:
        fetch_rates(base)

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))
Around the execution of the main() function, we added a few statements that are intended to measure how much time it took to finish the job. On my computer, it sometimes takes even more than 1 second to complete the following task:

$ python3 synchronous.py
1 USD =     1.0 USD,   0.887 EUR,     3.8 PLN,    8.53 NOK,    22.7 CZK
1 EUR =    1.13 USD,     1.0 EUR,    4.29 PLN,    9.62 NOK,    25.6 CZK
1 PLN =   0.263 USD,   0.233 EUR,     1.0 PLN,    2.24 NOK,    5.98 CZK
1 NOK =   0.117 USD,   0.104 EUR,   0.446 PLN,     1.0 NOK,    2.66 CZK
1 CZK =   0.044 USD,   0.039 EUR,   0.167 PLN,   0.375 NOK,     1.0 CZK
time elapsed: 1.13s  
Every run of our script will take a different amount of time because it mostly depends on a remote service that's accessible through a network connection. So, there are lots of non-deterministic factors affecting the final result. The best approach would be to make longer tests, repeat them multiple times, and also calculate some average from the measurements. But for the sake of simplicity, we won't do that. You will see later that this simplified approach is just enough for illustration purposes.
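
If you wanted to take the more careful route, repeated measurements could be collected with a small helper like the one below. It is only a sketch that is not used in the rest of this section, and the measure() name and its repeats parameter are assumptions made for illustration:

```python
import statistics
import time


def measure(func, repeats=5):
    """Run func several times and return (mean, stdev) of run times."""
    timings = []
    for _ in range(repeats):
        started = time.perf_counter()
        func()
        timings.append(time.perf_counter() - started)
    return statistics.mean(timings), statistics.stdev(timings)


if __name__ == "__main__":
    # time.sleep() stands in for the main() function of our script
    mean, stdev = measure(lambda: time.sleep(0.05), repeats=5)
    print(f"mean: {mean:.3f}s, stdev: {stdev:.3f}s")
```

Note that statistics.stdev() needs at least two measurements, so repeats should never be lower than 2.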
In the next section, we will discuss the use of one thread per item.

Using one thread per item
Now, it is time for improvement. We don't do a lot of processing in Python, and long execution times are caused by communication with the external service. We send an HTTP request to the remote server, it calculates the answer, and then we wait until the response is transferred back. There is a lot of I/O involved, so multithreading seems like a viable option. We can start all the requests at once in separate threads and then just wait until we receive data from all of them. If the service that we are communicating with is able to process our requests concurrently, we should definitely see a performance improvement.

So, let's start with the easiest approach. Python provides a clean and easy-to-use abstraction over system threads with the threading module. The core of this standard library module is the Thread class, which represents a single thread instance. Here is a modified version of the main() function that creates and starts a new thread for every base currency and then waits until all the threads finish:

from threading import Thread

def main():
    threads = []
    for base in BASES:
        thread = Thread(target=fetch_rates, args=[base])
        thread.start()
        threads.append(thread)

    while threads:
        threads.pop().join()
It is a quick and dirty solution that approaches the problem in a bit of a frivolous way. And it is not a way to write reliable software that will serve thousands or millions of users. It has some serious issues that we will have to address later. But hey, it works, as we can see from the following output:

$ python3 threads_one_per_item.py
1 CZK =   0.044 USD,   0.039 EUR,   0.167 PLN,   0.375 NOK,     1.0 CZK
1 NOK =   0.117 USD,   0.104 EUR,   0.446 PLN,     1.0 NOK,    2.66 CZK
1 USD =     1.0 USD,   0.887 EUR,     3.8 PLN,    8.53 NOK,    22.7 CZK
1 EUR =    1.13 USD,     1.0 EUR,    4.29 PLN,    9.62 NOK,    25.6 CZK
1 PLN =   0.263 USD,   0.233 EUR,     1.0 PLN,    2.24 NOK,    5.98 CZK
time elapsed: 0.13s
And it is also considerably faster.

So, when we know that threads have a beneficial effect on our application, it is time to use them in a saner way. First, we need to identify the following issues in the preceding code:

We start a new thread for every parameter. Thread initialization also takes some time, but this minor overhead is not the only problem. Threads also consume other resources, like memory or file descriptors. Our example input has a strictly defined number of items, but what if it did not have a limit? You definitely don't want to run an unbounded number of threads that depends on the arbitrary size of data input.
The fetch_rates() function that's executed in threads calls the built-in print() function, and in practice it is very unlikely that you would want to do that outside of the main application thread. It is mainly due to the way the standard output is buffered in Python. You can experience malformed output when multiple calls to this function interleave between threads. Also, the print() function is considered slow. If used recklessly in multiple threads, it can lead to serialization that will waste all your benefits of multithreading.
Last but not least, by delegating every function call to a separate thread, we make it extremely hard to control the rate at which our input is processed. Yes, we want to do the job as fast as possible, but very often, external services enforce hard limits on the rate of requests from a single client that they can process. Sometimes, it is reasonable to design the program in a way that enables you to throttle the rate of processing, so your application won't be blacklisted by external APIs for abusing their usage limits.
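
To give an idea of what controlling the rate of processing might look like, here is one possible sketch of a thread-safe throttle. It is not taken from any library: the Throttle class and its wait() method are hypothetical, and the approach (handing out evenly spaced time slots under a lock) is just one of several ways to limit a request rate:

```python
import threading
import time


class Throttle:
    """Allow at most `rate` calls of wait() per second across threads."""

    def __init__(self, rate):
        self._interval = 1.0 / rate
        self._lock = threading.Lock()
        self._next_slot = time.monotonic()

    def wait(self):
        # Reserve the next free time slot under the lock...
        with self._lock:
            now = time.monotonic()
            slot = max(now, self._next_slot)
            self._next_slot = slot + self._interval
        # ...and sleep outside of it, so other threads are not blocked.
        delay = slot - now
        if delay > 0:
            time.sleep(delay)


if __name__ == "__main__":
    throttle = Throttle(rate=10)
    for number in range(5):
        throttle.wait()  # call this right before each request
        print(f"request {number} allowed")
```

Every thread would share one Throttle instance and call wait() right before making a request.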
In the next section, we will see how to use a thread pool.

Using a thread pool
The first issue we will try to solve is the unbounded number of threads that are run by our program. A good solution would be to build a pool of threaded workers with a strictly defined size that will handle all the parallel work, and to communicate with the workers through some thread-safe data structure. By using this thread pool approach, we will also make it easier to solve two other problems that we mentioned in the previous section.

So, the general idea is to start some predefined number of threads that will consume the work items from a queue until it becomes empty. When there is no other work to do, the threads will return and we will be able to exit from the program. A good candidate for the structure used to communicate with the workers is the Queue class from the queue module of the standard library. It is a First In First Out (FIFO) queue implementation that is very similar to the deque collection from the collections module, and was specifically designed to handle inter-thread communication. Here is a modified version of the main() function that starts only a limited number of worker threads with a new worker() function as a target and communicates with them using a thread-safe queue:

import time
from queue import Queue, Empty
from threading import Thread

import requests

THREAD_POOL_SIZE = 4

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def fetch_rates(base):
    response = requests.get(
        f"https://api.exchangeratesapi.io/latest?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]

    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            fetch_rates(item)
            work_queue.task_done()

def main():
    work_queue = Queue()

    for base in BASES:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))
The following result of running a modified version of our program is similar to the previous one:

$ python3 threads_thread_pool.py
1 EUR =    1.13 USD,     1.0 EUR,    4.29 PLN,    9.62 NOK,    25.6 CZK
1 NOK =   0.117 USD,   0.104 EUR,   0.446 PLN,     1.0 NOK,    2.66 CZK
1 USD =     1.0 USD,   0.887 EUR,     3.8 PLN,    8.53 NOK,    22.7 CZK
1 PLN =   0.263 USD,   0.233 EUR,     1.0 PLN,    2.24 NOK,    5.98 CZK
1 CZK =   0.044 USD,   0.039 EUR,   0.167 PLN,   0.375 NOK,     1.0 CZK

time elapsed: 0.17s
The overall runtime may be slower than in situations with one thread per argument, but at least now it is not possible to exhaust all the computing resources with an arbitrarily long input. Also, we can tweak the THREAD_POOL_SIZE parameter for better resource/time balance.
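
As a side note, the standard library also ships a ready-made thread pool in the concurrent.futures module. It is a different technique than the hand-written pool above, and the sketch below is only an illustration: fake_fetch() is a hypothetical stand-in that sleeps instead of calling the real API, so that the example does not depend on the network:

```python
import time
from concurrent.futures import ThreadPoolExecutor

THREAD_POOL_SIZE = 4
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')


def fake_fetch(base):
    # stand-in for fetch_rates(); sleeping simulates network latency
    time.sleep(0.05)
    return base, {base: 1.0}


def fetch_all(bases, pool_size=THREAD_POOL_SIZE):
    # The executor never runs more than pool_size threads at once.
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        # map() returns results in the order of the input items
        return list(executor.map(fake_fetch, bases))


if __name__ == "__main__":
    for base, rates in fetch_all(BASES):
        print(base, rates)
```

We will keep using the explicit queue-based pool in this section because it makes the communication between threads visible.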

We will look at how to use two-way queues in the next section.

Using two-way queues
The other issue that we are now able to solve is the potentially problematic printing of the output in threads. It would be much better to leave such a responsibility to the main thread that started the worker threads. We can handle that by providing another queue that will be responsible for collecting results from our workers. Here is the complete code that puts everything together, with the main changes highlighted:

import time
from queue import Queue, Empty
from threading import Thread

import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

THREAD_POOL_SIZE = 4

def fetch_rates(base):
    response = requests.get(
        f"https://api.exchangeratesapi.io/latest?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]

    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates

def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            results_queue.put(
                fetch_rates(item)
            )
            work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()

    for base in BASES:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        present_result(*results_queue.get())

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed)) 
This eliminates the risk of malformed output that we could experience if the present_result() function made more print() calls or performed some additional computation. We don't expect any performance improvement from this approach with small inputs, but we have also reduced the risk of thread serialization due to slow print() execution. Here is our final output:

$ python3 threads_two_way_queues.py
1 USD =     1.0 USD,   0.887 EUR,     3.8 PLN,    8.53 NOK,    22.7 CZK
1 PLN =   0.263 USD,   0.233 EUR,     1.0 PLN,    2.24 NOK,    5.98 CZK
1 EUR =    1.13 USD,     1.0 EUR,    4.29 PLN,    9.62 NOK,    25.6 CZK
1 NOK =   0.117 USD,   0.104 EUR,   0.446 PLN,     1.0 NOK,    2.66 CZK
1 CZK =   0.044 USD,   0.039 EUR,   0.167 PLN,   0.375 NOK,     1.0 CZK

time elapsed: 0.17s
Dealing with errors and rate limiting is explained in the next section.

Dealing with errors and rate limiting
The last of the issues mentioned earlier that you may experience when dealing with such types of problems are rate limits that have been imposed by external service providers. In the case of the foreign exchange rates API, the service maintainer did not inform us about any rate limits or throttling mechanisms. But many services (even paid ones) often do impose rate limits. Also, it isn't fair to abuse a service that is provided to users completely for free.

When using multiple threads, it is very easy to exhaust any rate limit or, if the service does not throttle incoming requests, to saturate the service to the level that it will not be able to respond to anyone (this is known as a denial of service attack). The problem is even more serious due to the fact that we did not cover any failure scenario yet, and dealing with exceptions in multithreaded Python code is a bit more complicated than usual.

The response.raise_for_status() method will raise an exception if the response has a status code indicating any type of error (for instance, rate limiting), and this is actually good news. This exception is raised in a separate thread and will not crash the entire program. The worker thread will, of course, exit immediately, but the main thread will wait for all tasks stored on work_queue to finish (with the work_queue.join() call). Without further improvement, we may end up in a situation where some of the worker threads crashed and the program will never exit. This means that our worker threads should gracefully handle possible exceptions and make sure that all items from the queue are processed.
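
The fact that an unhandled exception ends only the thread that raised it can be checked with a few lines of code. This is a minimal sketch with hypothetical names (failing_task() and run_failing_thread()), in which the exception is raised on purpose:

```python
import threading


def failing_task():
    raise RuntimeError("simulated failure in a worker thread")


def run_failing_thread():
    thread = threading.Thread(target=failing_task)
    thread.start()
    # join() returns normally; the exception is not re-raised here.
    thread.join()
    return thread.is_alive()


if __name__ == "__main__":
    still_alive = run_failing_thread()
    print("worker alive:", still_alive)
    print("main thread keeps running")
```

The traceback of the failed thread is printed to standard error, but the main thread continues as if nothing happened. This is exactly why a crashed worker can go unnoticed.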

Let's make some minor changes to our code in order to be prepared for any issues that may occur. In case of exceptions in the worker thread, we may put an error instance into the results_queue queue and mark the current task as done, the same as we would do if there was no error. That way, we make sure that the main thread won't lock indefinitely while waiting in work_queue.join(). The main thread might then inspect results and re-raise any of the exceptions found on the results queue. Here are the improved versions of the worker() and main() functions that can deal with exceptions in a safer way (the changes are highlighted):

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            try:
                result = fetch_rates(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()

    for base in BASES:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result

        present_result(*result)
Now that we are ready to handle exceptions, it is time to break our code. We don't want to abuse our free API and cause a denial of service. So, instead of putting a high load on that API, we will simulate a typical situation that results from many service throttling mechanisms. Many APIs return a 429 Too Many Requests HTTP response when the client exceeds the allowed rate limit. So, we will update the fetch_rates() function to override the status code of every few responses in a way that will cause an exception. The following is the updated version of the function that simulates an HTTP error every few requests:

def fetch_rates(base):
    response = requests.get(
        f"https://api.exchangeratesapi.io/latest?base={base}"
    )

    if random.randint(0, 5) < 1:
        # simulate error by overriding status code
        response.status_code = 429

    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates
If you use it in your code, you should get an error similar to the following:

$ python3 threads_exceptions_and_throttling.py
1 PLN =   0.263 USD,   0.233 EUR,     1.0 PLN,    2.24 NOK,    5.98 CZK
1 EUR =    1.13 USD,     1.0 EUR,    4.29 PLN,    9.62 NOK,    25.6 CZK
1 USD =     1.0 USD,   0.887 EUR,     3.8 PLN,    8.53 NOK,    22.7 CZK
Traceback (most recent call last):
  File "threads_exceptions_and_throttling.py", line 136, in <module>
    main()
  File "threads_exceptions_and_throttling.py", line 129, in main
    raise result
  File "threads_exceptions_and_throttling.py", line 96, in worker
    result = fetch_rates(item)
  File "threads_exceptions_and_throttling.py", line 70, in fetch_rates
    response.raise_for_status()
  File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 429 Client Error: OK for url: https://api.exchangeratesapi.io/latest?base=NOK
Let's forget about our simulated failure and pretend that the preceding exception is not a result of faulty code. In such a situation, our program would be simply a bit too fast for this free service. It makes too many concurrent requests, and, in order to work correctly, we need to have a way to limit the program's pace.

Limiting the pace of work is often called throttling. There are a few easy-to-use packages on PyPI that allow you to limit the rate of any kind of work. But we won't use any external code here. Throttling is a good opportunity to introduce some locking primitives for threading, so we will try to build a throttling solution from scratch.

The algorithm we will use is sometimes called token bucket and is very simple. It includes the following functionality:

There is a bucket with a predefined amount of tokens.
Each token corresponds to a single permission to process one item of work.
Each time the worker asks for a single or multiple tokens (permissions), we do the following:
We measure how much time has passed since the last time we refilled the bucket
If the time difference allows for that, we refill the bucket with the amount of tokens that corresponds to this time difference
If the amount of stored tokens is greater than or equal to the amount requested, we decrease the number of stored tokens and return the requested amount
If the amount of stored tokens is less than requested, we return zero
The two important things are to always initialize the token bucket with zero tokens and never allow it to overfill. If we don't follow these precautions, we can release the tokens in bursts that exceed the rate limit. Because, in our situation, the rate limit is expressed in requests per second, we don't need to deal with arbitrary quanta of time. We assume that the base for our measurement is one second, so we will never store more tokens than the number of requests allowed for that quantum of time. Here is an example implementation of the class that allows for throttling with the token bucket algorithm:

import time
from threading import Lock
 
class Throttle: 
    def __init__(self, rate): 
        self._consume_lock = Lock() 
        self.rate = rate 
        self.tokens = 0 
        self.last = 0 
 
    def consume(self, amount=1): 
        with self._consume_lock: 
            now = time.time() 
             
            # time measurement is initialized on first
            # token request to avoid initial bursts 
            if self.last == 0: 
                self.last = now 
 
            elapsed = now - self.last 
 
            # make sure that quantum of passed time is big
            # enough to add new tokens 
            if int(elapsed * self.rate): 
                self.tokens += int(elapsed * self.rate) 
                self.last = now 
 
            # never over-fill the bucket 
            self.tokens = ( 
                self.rate 
                if self.tokens > self.rate 
                else self.tokens 
            ) 
 
            # finally dispatch tokens if available 
            if self.tokens >= amount: 
                self.tokens -= amount 
            else: 
                amount = 0 
 
            return amount 
The usage of this class is very simple. Let's assume that we created only one instance of Throttle (for example, Throttle(10)) in the main thread and passed it to every worker thread as a positional argument. Using the same data structure in different threads is safe because we guarded the manipulation of its internal state with the instance of the Lock class from the threading module. We can now update the worker() function implementation to wait with every item until the throttle object releases a new token, as follows:

def worker(work_queue, results_queue, throttle): 
    while True: 
        try: 
            item = work_queue.get(block=False) 
        except Empty: 
            break 
        else: 
            while not throttle.consume():
                pass 
 
            try: 
                result = fetch_rates(item) 
            except Exception as err: 
                results_queue.put(err) 
            else: 
                results_queue.put(result) 
            finally: 
                work_queue.task_done()
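To see the throttle working end to end, here is a minimal, self-contained sketch. It is not the chapter's script: the Throttle class below is a compact re-implementation of the same token bucket idea, run_throttled() is a hypothetical helper, the work item simply records a timestamp instead of calling fetch_rates(), and the worker sleeps briefly between attempts (an assumption on our part) rather than spinning in a tight loop:

```python
import time
from queue import Empty, Queue
from threading import Lock, Thread


class Throttle:
    """Compact token bucket (same idea as the class above)."""

    def __init__(self, rate):
        self._lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = None

    def consume(self, amount=1):
        with self._lock:
            now = time.monotonic()
            if self.last is None:
                # start measuring on the first request
                self.last = now

            new_tokens = int((now - self.last) * self.rate)
            if new_tokens:
                # refill, but never over-fill the bucket
                self.tokens = min(self.rate, self.tokens + new_tokens)
                self.last = now

            if self.tokens >= amount:
                self.tokens -= amount
                return amount
            return 0


def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break

        # assumption: sleep a little between attempts instead of spinning
        while not throttle.consume():
            time.sleep(0.5 / throttle.rate)

        # stand-in for fetch_rates(item)
        results_queue.put((item, time.monotonic()))
        work_queue.task_done()


def run_throttled(items, rate, pool_size=4):
    """Hypothetical helper: process items at no more than `rate` per second."""
    work_queue, results_queue = Queue(), Queue()
    for item in items:
        work_queue.put(item)

    throttle = Throttle(rate)  # one instance shared by all workers
    threads = [
        Thread(target=worker, args=(work_queue, results_queue, throttle))
        for _ in range(pool_size)
    ]

    started = time.monotonic()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.monotonic() - started

    results = []
    while not results_queue.empty():
        results.append(results_queue.get())
    return results, elapsed
```

Calling run_throttled(range(6), rate=20) should take at least roughly 0.3 seconds, because the bucket starts empty and never hands out more than 20 tokens per second, no matter how many worker threads are waiting.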
Let's take a look at a different concurrency model, which is explained in the next section.

In [None]:
Multiprocessing
Let's be honest, multithreading is challenging—we have already seen that in the previous section. It's a fact that the simplest approach to the problem required only minimal effort. But dealing with threads in a sane and safe manner required a tremendous amount of code.

We had to set up a thread pool, communication queues, gracefully handle exceptions from threads, and also care about thread safety when trying to provide a rate limiting capability. Dozens of lines of code are needed just to execute one function from some external library in parallel! And we only assume that this is production ready because there is a promise from the external package creator that their library is thread-safe. Sounds like a high price for a solution that is practically applicable only for doing I/O bound tasks.

An alternative approach that allows you to achieve parallelism is multiprocessing. Separate Python processes that do not constrain each other with the GIL allow for better resource utilization. This is especially important for applications running on multicore processors that are performing really CPU-intensive tasks. Right now, this is the only built-in concurrent solution available for Python developers (using the CPython interpreter) that allows you to benefit from multiple processor cores in every situation.

The other advantage of using multiple processes is the fact that they do not share memory context. So, it is harder to corrupt data and introduce deadlocks in your application. Not sharing the memory context means that you need some additional effort to pass the data between separate processes, but fortunately there are many good ways to implement reliable interprocess communication. In fact, Python provides some primitives that make communication between processes almost as easy as it is possible between threads.

The most basic way to start new processes in any programming language is usually by forking the program at some point. On POSIX systems (UNIX, macOS, and Linux), a fork is a system call that's exposed in Python through the os.fork() function, which will create a new child process. Both processes then continue the program on their own, right after the fork. Here is an example script that forks itself exactly once:

import os 
 
pid_list = [] 
 
 
def main(): 
    pid_list.append(os.getpid()) 
    child_pid = os.fork() 
 
    if child_pid == 0: 
        pid_list.append(os.getpid()) 
        print() 
        print("CHLD: hey, I am the child process") 
        print("CHLD: all the pids i know %s" % pid_list) 
 
    else: 
        pid_list.append(os.getpid()) 
        print() 
        print("PRNT: hey, I am the parent") 
        print("PRNT: the child is pid %d" % child_pid) 
        print("PRNT: all the pids i know %s" % pid_list) 
 
 
if __name__ == "__main__": 
    main() 
And here is an example of running it in a Terminal:

$ python3 forks.py
    
PRNT: hey, I am the parent
PRNT: the child is pid 21916
PRNT: all the pids i know [21915, 21915]    
CHLD: hey, I am the child process
CHLD: all the pids i know [21915, 21916] 
Notice how both processes have exactly the same initial state of their data before the os.fork() call. They both have the same PID number (process identifier) as a first value of the pid_list collection. Later, both states diverge, and we can see that the child process added the 21916 value while the parent duplicated its 21915 PID. This is because the memory contexts of these two processes are not shared. They have the same initial conditions but cannot affect each other after the os.fork() call.

After the fork, the memory context is copied to the child and each process deals with its own address space. To communicate, processes need to work with system-wide resources or use low-level tools like signals.
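As one illustration of such a system-wide resource, the following sketch (POSIX only, like os.fork() itself) passes a message from the child to the parent through an anonymous pipe created with os.pipe(). The function name ask_child() and the message text are made up for this example:

```python
import os


def ask_child():
    """Fork once and return the message the child wrote to a pipe."""
    read_fd, write_fd = os.pipe()
    child_pid = os.fork()

    if child_pid == 0:
        # child: write the message and exit without running
        # any of the parent's remaining code
        try:
            os.close(read_fd)
            os.write(write_fd, b"hello from child")
            os.close(write_fd)
        finally:
            os._exit(0)

    # parent: close the unused write end so that read() sees
    # end-of-file once the child has closed its copy
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        data = reader.read()

    os.waitpid(child_pid, 0)
    return data.decode()
```

The pipe has to be created before the fork so that both processes inherit its file descriptors. Anything the child appends to its own Python objects stays invisible to the parent; only the bytes written to the pipe cross the process boundary.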

Unfortunately, os.fork() is not available under Windows, where a new interpreter needs to be spawned in order to mimic the fork feature. Therefore, the way processes are created needs to differ depending on the platform. The os module also exposes functions that allow you to spawn new processes under Windows, but in practice you will rarely use them. This is also true for os.fork(). Python provides the great multiprocessing module, which creates a high-level interface for multiprocessing.

The great advantage of this module is that it provides some of the abstractions that we had to code from scratch in the An example of threaded application section. It allows you to limit the amount of boilerplate code, so it improves application maintainability and reduces its complexity. Surprisingly, despite its name, the multiprocessing module exposes a similar interface for threads, so you will probably want to use the same interface for both approaches.

Let's take a look at the built-in multiprocessing module in the next section.

In [None]:
The built-in multiprocessing module
multiprocessing provides a portable way to work with processes as if they were threads.

This module contains a Process class that is very similar to the Thread class, and can be used on any platform, as follows:

from multiprocessing import Process 
import os 
 
 
def work(identifier): 
    print( 
        'hey, i am a process {}, pid: {}' 
        ''.format(identifier, os.getpid()) 
    ) 
 
 
def main(): 
    processes = [ 
        Process(target=work, args=(number,)) 
        for number in range(5) 
    ] 
    for process in processes: 
        process.start() 
     
    while processes: 
        processes.pop().join() 
 
 
if __name__ == "__main__": 
    main()
The preceding script, when executed, gives the following result:

$ python3 processing.py
hey, i am a process 1, pid: 9196
hey, i am a process 0, pid: 8356
hey, i am a process 3, pid: 9524
hey, i am a process 2, pid: 3456
hey, i am a process 4, pid: 6576
When processes are created, the memory is forked (on POSIX systems). The most efficient usage of processes is to let them work on their own after they have been created to avoid overhead, and check on their states from the parent process. Besides the memory state that is copied, the Process class also provides an extra args argument in its constructor so that data can be passed along.

Communication between processes requires some additional work because their local memory is not shared by default. To ease this, the multiprocessing module provides the following few ways of communicating between processes:

Using the multiprocessing.Queue class, which is a functional clone of queue.Queue that was used earlier for the communication between threads
Using multiprocessing.Pipe, which is a socket-like two-way communication channel
Using the multiprocessing.sharedctypes module, which allows you to create arbitrary C types (from the ctypes module) in a dedicated pool of memory that is shared between processes
The multiprocessing.Queue and queue.Queue classes have the same interface. The only difference is that the first is designed for usage in multiple process environments, rather than with multiple threads, so it uses different internal transports and locking primitives. We already saw how to use Queue with multithreading in the An example of threaded application section, so we won't do the same for multiprocessing. The usage stays exactly the same, so such an example would not bring anything new.

A more interesting communication pattern is provided by the Pipe class. It is a duplex (two-way) communication channel that is very similar in concept to UNIX pipes. The interface of Pipe is very similar to a simple socket from the built-in socket module. The difference from raw system pipes and sockets is that Pipe allows you to send any picklable object (using the pickle module) instead of just raw bytes.

This allows for a lot easier communication between processes because you can send almost any basic Python type, as follows:

from multiprocessing import Process, Pipe 
 
 
class CustomClass: 
    pass 
 
 
def work(connection): 
    while True: 
        instance = connection.recv() 
 
        if instance: 
            print("CHLD: recv: {}".format(instance))
 
        else: 
            return 
 
 
def main(): 
    parent_conn, child_conn = Pipe() 
 
    child = Process(target=work, args=(child_conn,)) 
 
    for item in ( 
        42, 
        'some string', 
        {'one': 1}, 
        CustomClass(), 
        None, 
    ): 
        print("PRNT: send: {}".format(item))
        parent_conn.send(item) 
         
    child.start() 
    child.join() 
 
 
if __name__ == "__main__": 
    main()
When looking at the following example output of the preceding script, you will see that you can easily pass custom class instances and that they have different addresses, depending on the process:

PRNT: send: 42
PRNT: send: some string
PRNT: send: {'one': 1}
PRNT: send: <__main__.CustomClass object at 0x101cb5b00>
PRNT: send: None
CHLD: recv: 42
CHLD: recv: some string
CHLD: recv: {'one': 1}
CHLD: recv: <__main__.CustomClass object at 0x101cba400>
The other way to share a state between processes is to use raw types in a shared memory pool with classes provided in multiprocessing.sharedctypes. The most basic ones are Value and Array. Here is some example code from the official documentation of the multiprocessing module:

from multiprocessing import Process, Value, Array 
 
 
def f(n, a): 
    n.value = 3.1415927 
    for i in range(len(a)): 
        a[i] = -a[i] 
 
 
if __name__ == '__main__': 
    num = Value('d', 0.0) 
    arr = Array('i', range(10)) 
 
    p = Process(target=f, args=(num, arr)) 
    p.start() 
    p.join() 
 
    print(num.value) 
    print(arr[:]) 
And this example will print the following output:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
When working with multiprocessing.sharedctypes, you need to remember that you are dealing with shared memory, so to avoid the risk of data corruption, you need to use locking primitives. Multiprocessing provides some of the classes similar to those available in the threading module, such as Lock, RLock, and Semaphore. The downside of classes from sharedctypes is that they allow you only to share the basic C types from the ctypes module. If you need to pass more complex structures or class instances, you need to use Queue, Pipe, or other inter-process communication channels instead. In most cases, it is reasonable to avoid types from sharedctypes because they increase code complexity and bring all the dangers known from multithreading.
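To make the point about locking concrete, here is a small sketch in which several processes increment one shared Value. It assumes a POSIX system, because it explicitly requests the fork start method to keep the example short; the names add_many() and count_in_processes() are hypothetical. A Value created with the default settings carries its own lock, available through get_lock():

```python
import multiprocessing


def add_many(counter, times):
    for _ in range(times):
        # counter.value += 1 is a read-modify-write sequence,
        # so it has to be guarded by the lock
        with counter.get_lock():
            counter.value += 1


def count_in_processes(workers=4, times=1000):
    # assumption: a POSIX system, so the "fork" start method is available
    ctx = multiprocessing.get_context("fork")
    counter = ctx.Value("i", 0)

    processes = [
        ctx.Process(target=add_many, args=(counter, times))
        for _ in range(workers)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

    return counter.value
```

With the lock in place, count_in_processes(4, 500) should always return 2000. Without the with counter.get_lock() line, some increments could be lost, which is the same kind of race condition we discussed for threads.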

Let's take a look at how to use process pools in the next section.

In [None]:
Using process pools
Using multiple processes instead of threads adds some substantial overhead. Mostly, it increases the memory footprint because each process has its own independent memory context. This means that allowing an unbounded number of child processes is even more problematic than it is in multithreaded applications.

The best pattern to control resource usage in applications that rely on multiprocessing is to build a process pool in a similar way to what we described for threads in the Using thread pool section.

And the best thing about the multiprocessing module is that it provides a ready to use Pool class that handles all the complexity of managing multiple process workers for you. This pool implementation greatly reduces the amount of required boilerplate and the number of issues related to two-way communication. You also don't have to use the join() method manually, because Pool can be used as a context manager (using the with statement). Here is one of our previous threading examples, rewritten to use the Pool class from the multiprocessing module:

import time
from multiprocessing import Pool

import requests


SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

POOL_SIZE = 4


def fetch_rates(base):
    response = requests.get(
        f"https://api.exchangeratesapi.io/latest?base={base}"
    )

    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates


def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")


def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_rates, BASES)

    for result in results:
        present_result(*result)


if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))
As you can see, the code is now a lot shorter. This means that it is now easier to maintain and debug in case of issues. Actually, there are now only two lines of code that explicitly deal with multiprocessing. This is a great improvement over the situation where we had to build the processing pool from scratch. Now, we don't even need to care about communication channels because they are created implicitly inside of the Pool class implementation.

Let's take a look at how to use multiprocessing.dummy as a multithreading interface in the next section.

In [None]:
Using multiprocessing.dummy as the multithreading interface
The high-level abstractions from the multiprocessing module, such as the Pool class, are great advantages over the simple tools provided in the threading module. But it does not mean that multiprocessing is always better than multithreading. There are a lot of use cases where threads may be a better solution than processes. This is especially true for situations where low latency and/or high resource efficiency is required.

Still, it does not mean that you need to sacrifice all the useful abstractions from the multiprocessing module whenever you want to use threads instead of processes. There is the multiprocessing.dummy module that replicates the multiprocessing API, but uses multiple threads instead of forking/spawning new processes.

This allows you to reduce the amount of boilerplate in your code and also have a more pluggable code structure. For instance, let's take yet another look at our main() function from the previous examples. We could give the user control over which processing backend to use (processes or threads). We could do that simply by replacing the pool object constructor class, as follows:

from multiprocessing import Pool as ProcessPool 
from multiprocessing.dummy import Pool as ThreadPool 
 
 
def main(use_threads=False): 
    if use_threads: 
        pool_cls = ThreadPool 
    else: 
        pool_cls = ProcessPool 
 
    with pool_cls(POOL_SIZE) as pool: 
        results = pool.map(fetch_rates, BASES) 
 
    for result in results: 
        present_result(*result) 
Let's take a look at asynchronous programming in the next section.

In [None]:
Asynchronous programming
Asynchronous programming has gained a lot of traction in the last few years. In Python 3.5, it finally got some syntax features that solidify the concepts of asynchronous execution. But this does not mean that asynchronous programming has been possible only since Python 3.5. A lot of libraries and frameworks were provided a lot earlier, and most of them have origins in the old versions of Python 2. There is even a whole alternate implementation of Python called Stackless (see Chapter 1, Current Status of Python) that concentrated on this single programming approach. Some of these solutions, such as Twisted, Tornado, and Eventlet, still have huge and active communities and are really worth knowing. Anyway, starting from Python 3.5, asynchronous programming is easier than ever before. Therefore, it is expected that the built-in asynchronous features will replace the bigger part of the older tools, or that external projects will gradually transform into high-level frameworks based on Python built-ins.

When trying to explain what asynchronous programming is, the easiest way is to think about this approach as something similar to threads, but without system scheduling involved. This means that an asynchronous program can concurrently process problems, but its context is switched internally and not by the system scheduler.

But, of course, we don't use threads to concurrently handle the work in an asynchronous program. Most of the solutions use a different kind of concept and, depending on the implementation, it is named differently. The following are some example names that are used to describe such concurrent program entities:

Green threads or greenlets (greenlet, gevent, or eventlet projects)
Coroutines (Python 3.5 native asynchronous programming)
Tasklets (Stackless Python)
These are mainly the same concepts, but often implemented in a slightly different way.

For obvious reasons, in this section, we will concentrate only on coroutines that are natively supported by Python, starting from version 3.5.

In [None]:
Cooperative multitasking and asynchronous I/O
Cooperative multitasking is the core of asynchronous programming. In this style of computer multitasking, it's not a responsibility of the operating system to initiate a context switch (to another process or thread), but instead every process voluntarily releases the control when it is idle to enable simultaneous execution of multiple programs. This is why it is called cooperative. All processes need to cooperate in order to multitask smoothly.

This model of multitasking was sometimes employed in operating systems, but now it is hardly found as a system-level solution. This is because there is a hazard that one poorly designed service can easily break the whole system's stability. Thread and process scheduling with context switches managed directly by the operating system is now the dominant approach for concurrency on the system level. But cooperative multitasking is still a great concurrency tool on the application level.

When doing cooperative multitasking on the application level, we do not deal with threads or processes that need to release control because all the execution is contained within a single process and thread. Instead, we have multiple tasks (coroutines, tasklets, or green threads) that release the control to the single function that handles the coordination of tasks. This function is usually some kind of event loop.

To avoid confusion later (due to Python terminology), from now on, we will refer to such concurrent tasks as coroutines. The most important problem in cooperative multitasking is when to release control. In most asynchronous applications, the control is released to the scheduler or event loop on I/O operations. It doesn't matter if the program reads data from the filesystem or communicates through a socket, because such an I/O operation always involves some waiting time during which the process is idle. The waiting time depends on the external resource, so it is a good opportunity to release the control so that other coroutines can do their work until they, too, need to wait.

This makes such an approach somewhat similar in behavior to how multithreading is implemented in Python. We know that the GIL serializes Python threads, but it is also released on every I/O operation. The main difference is that threads in Python are implemented as system-level threads, so the operating system can preempt the currently running thread and give control to another one at any point in time. In asynchronous programming, tasks are never preempted by the main event loop. This is why this style of multitasking is also called non-preemptive multitasking.

Of course, every Python application runs on some operating system where there are other processes competing for resources. This means that the operating system always has the right to preempt the whole process and give control to another one. But when our asynchronous application resumes, it continues from the same place where it was paused when the system scheduler stepped in. This is why coroutines are still considered non-preemptive.
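The idea of tasks voluntarily handing control back to a coordinating loop can be sketched without any asynchronous library at all. The following toy scheduler is only an illustration and not how asyncio is implemented: it uses plain generators as tasks, and the names countdown() and run_all() are invented for this example. Every bare yield is a point where a task releases control:

```python
from collections import deque


def countdown(name, steps, log):
    for step in range(steps):
        log.append(f"{name}:{step}")
        # voluntarily hand control back to the loop
        yield


def run_all(tasks):
    """Toy event loop: resume each task in turn until all are finished."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)  # run the task until its next yield
        except StopIteration:
            continue  # the task is finished, so drop it
        ready.append(current)  # otherwise put it back in line


log = []
run_all([countdown("a", 2, log), countdown("b", 3, log)])
print(log)
# prints ['a:0', 'b:0', 'a:1', 'b:1', 'b:2']
```

Note that the loop never interrupts a task. If countdown() performed a long computation between two yield statements, every other task would simply have to wait, which is exactly the hazard of cooperative multitasking described earlier.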

In the next section, we will take a look at the async and await keywords.

In [None]:
Python async and await keywords
The async and await keywords are the main building blocks in Python asynchronous programming.

The async keyword, when used before the def statement, defines a new coroutine. The execution of the coroutine function may be suspended and resumed in strictly defined circumstances. Its syntax and behavior are very similar to generators (refer to Chapter 3, Modern Syntax Elements - Below the Class Level). In fact, generators need to be used in the older versions of Python whenever you want to implement coroutines. Here is an example of function declaration that uses the async keyword:

async def async_hello(): 
    print("hello, world!") 
Functions defined with the async keyword are special. When called, they do not execute the code inside, but instead return a coroutine object, for example:

>>> async def async_hello():
...     print("hello, world!")
... 
>>> async_hello()
<coroutine object async_hello at 0x1014129e8>
  
The coroutine object does not do anything until its execution is scheduled in the event loop. The asyncio module is available in order to provide the basic event loop implementation, as well as a lot of other asynchronous utilities, as follows:

>>> import asyncio
>>> async def async_hello():
...     print("hello, world!")
... 
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(async_hello())
hello, world!
>>> loop.close()
Obviously, since we have created only one simple coroutine, there is no concurrency involved in our program. In order to see something really concurrent, we need to create more tasks that will be executed by the event loop.

New tasks can be added to the loop by calling the loop.create_task() method or by providing another object to wait for using the asyncio.wait() function. We will use the latter approach and try to asynchronously print a sequence of numbers that's been generated with the range() function, as follows:

import asyncio 
 
async def print_number(number): 
    print(number) 
 
 
if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
 
    loop.run_until_complete( 
        asyncio.wait([ 
            print_number(number) 
            for number in range(10) 
        ]) 
    ) 
    loop.close() 
The asyncio.wait() function accepts a list of coroutine objects and returns immediately. The result is a generator that yields objects representing future results (so-called futures). As the name suggests, it is used to wait until all of the provided coroutines complete. The reason why it returns a generator instead of a coroutine object is backwards compatibility with previous versions of Python, which will be explained later in the asyncio in the older version of Python section. The result of running this script may be as follows:

$ python asyncprint.py 
0
7
8
3
9
4
1
5
2
6
As we can see, the numbers are not printed in the same order in which we created our coroutines. But this is exactly what we wanted to achieve.
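Note that on Python 3.11 and newer, asyncio.wait() no longer accepts bare coroutine objects, so the preceding script needs a small adaptation there. The following sketch shows one possible way to do it: each coroutine is wrapped in a task with asyncio.create_task(), and asyncio.run() (available since Python 3.7) takes care of creating and closing the event loop. The seen list and the run() helper are additions made only to make the result easy to inspect:

```python
import asyncio


async def print_number(number, seen):
    print(number)
    seen.append(number)


async def main(seen):
    # wrap every coroutine in a task so that it is scheduled on the loop
    tasks = [
        asyncio.create_task(print_number(number, seen))
        for number in range(10)
    ]
    # wait until all of the tasks are done
    await asyncio.wait(tasks)


def run():
    seen = []
    asyncio.run(main(seen))
    return seen


if __name__ == "__main__":
    run()
```

The order of the printed numbers depends on how the event loop schedules the tasks, so this version should not be expected to reproduce the shuffled output shown previously.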

The second important keyword that was added in Python 3.5 was await. It is used to wait for results of coroutines or a future (explained later), and release the control over execution to the event loop. To better understand how it works, we need to review a more complex example of code.

Let's say we want to create the following two coroutines that will perform some simple task in a loop:

Wait a random number of seconds
Print some text provided as an argument and the amount of time spent in sleep
Let's start with the following simple implementation that has some concurrency issues that we will later try to improve with the additional await usage:

import time 
import random 
import asyncio 
 
 
async def waiter(name): 
    for _ in range(4): 
        time_to_sleep = random.randint(1, 3) / 4 
        time.sleep(time_to_sleep) 
        print( 
            "{} waited {} seconds" 
            "".format(name, time_to_sleep) 
        ) 
 
 
async def main(): 
    await asyncio.wait([waiter("first"), waiter("second")]) 
 
 
if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 
    loop.close() 
When executed in the Terminal (with time command to measure time), it might give the following output:

$ time python corowait.py 
second waited 0.25 seconds
second waited 0.25 seconds
second waited 0.5 seconds
second waited 0.5 seconds
first waited 0.75 seconds
first waited 0.75 seconds
first waited 0.25 seconds
first waited 0.25 seconds
    
real  0m3.734s
user  0m0.153s
sys   0m0.028s
As we can see, both coroutines completed their execution, but not in an asynchronous manner. The reason is that they both use the time.sleep() function, which blocks without releasing control to the event loop. This would work better in a multithreaded setup, but we don't want to use threads now. So, how can we fix this?

The answer is to use asyncio.sleep(), which is the asynchronous version of time.sleep(), and await its result using the await keyword. We have already used this statement in the first version of the main() function, but it was only to improve the clarity of the code. It clearly did not make our implementation more concurrent. Let's see the following improved version of the waiter() coroutine that uses await asyncio.sleep():

async def waiter(name): 
    for _ in range(4): 
        time_to_sleep = random.randint(1, 3) / 4 
        await asyncio.sleep(time_to_sleep) 
        print( 
            "{} waited {} seconds" 
            "".format(name, time_to_sleep) 
        ) 
If we run the updated script, we can see how the output of two functions interleave with each other:

$ time python corowait_improved.py 
second waited 0.25 seconds
first waited 0.25 seconds
second waited 0.25 seconds
first waited 0.5 seconds
first waited 0.25 seconds
second waited 0.75 seconds
first waited 0.25 seconds
second waited 0.5 seconds
    
real  0m1.953s
user  0m0.149s
sys   0m0.026s
The additional advantage of this simple improvement is that the code ran faster. The overall execution time was less than the sum of all sleeping times because coroutines were cooperatively releasing the control.
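The timing claim is easy to verify in code. The following sketch is a variation of the preceding example rather than the original script: it uses fixed delays instead of random ones so that the numbers are predictable, collects results in a list, and relies on asyncio.run() and asyncio.gather() (an alternative to asyncio.wait() that accepts coroutine objects directly). The timed_run() helper is hypothetical:

```python
import asyncio
import time


async def waiter(name, delays, log):
    for delay in delays:
        # release control to the event loop while "sleeping"
        await asyncio.sleep(delay)
        log.append((name, delay))


async def main(log):
    # run both coroutines concurrently and wait for both to finish
    await asyncio.gather(
        waiter("first", [0.2, 0.2], log),
        waiter("second", [0.2, 0.2], log),
    )


def timed_run():
    log = []
    started = time.perf_counter()
    asyncio.run(main(log))
    return log, time.perf_counter() - started


if __name__ == "__main__":
    log, elapsed = timed_run()
    print(log)
    print("time elapsed: {:.2f}s".format(elapsed))
```

The delays add up to 0.8 seconds in total, but because the two coroutines sleep at the same time, the whole run should take only slightly more than 0.4 seconds.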

In the next section, we will take a look at asyncio in older versions of Python.