# Concurrency

Definitions of concurrency:

* Concurrency is not the same as parallelism. 

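* Concurrency is not a matter of application implementation; it is a property of the program, algorithm, or problem, and parallelism is only one possible approach to problems that are concurrent.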

* Concurrency can be defined as follows: "Two events are concurrent if neither can causally affect the other."

* In other words, something is concurrent if it can be fully or partially decomposed into components (units) that are order-independent.

Common application scenarios where concurrent processing is a viable approach:

* Processing distribution: The scale of the problem is so big that the only way to process it in an acceptable time frame (with constrained resources) is to distribute execution on multiple processing units that can handle the work in parallel.

* Application responsiveness: Your application needs to maintain responsiveness (accept new inputs), even if it did not finish processing previous inputs.

* Background processing: Not every task needs to be performed in a synchronous way. If there is no need to immediately access the results of a specific action, it may be reasonable to defer execution in time.

Python's tools to deal with concurrency:

* Multithreading: 
    * Running multiple threads of execution that share the memory context of the parent process. 
    * The execution of threads is coordinated by the OS kernel.
    * It works best in applications that do a lot of I/O operations or need to maintain UI responsiveness. 
    * Very lightweight, but comes with caveats and memory safety risks.

* Multiprocessing: 
    * Running multiple independent processes to perform work in a distributed manner. 
    * Similar to threads in operation, but there's no shared memory context. 
    * Because each process runs its own interpreter with its own GIL, it's better suited for CPU-intensive applications.
    * More heavyweight than multithreading and requires implementing inter-process communication patterns to orchestrate work between processes.

* Asynchronous programming: 
    * Running multiple cooperative tasks within a single application process. 
    * Cooperative tasks work like threads, but switching between them is done by the application, not the OS kernel. 
    * Well suited to I/O-bound applications, especially for programs that need to handle multiple simultaneous network connections. 
    * The downside of asynchronous programming is the need to use dedicated asynchronous libraries.



### Multithreading

To start a new thread in Python, use the `threading.Thread()` class, as follows:


```
$ python _01_multithreading/start_new_thread.py
Starting thread...
printing from thread
Ending thread...
```


From the last code example:

* To start a new thread, we call the `start()` method.

* Once the new thread is started, it'll run next to the main thread until the target function finishes. 

* To wait for the thread to finish, we call its `join()` method, which blocks the calling thread until the target function returns.
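The script itself isn't shown in these notes. The following is a minimal sketch of what it might contain. The names `my_function()` and `results` are assumptions, and the `results` list exists only so that the effect of `join()` can be checked:

```python
from threading import Thread


def my_function(results):
    # Runs in the new thread, next to the main thread
    print("printing from thread")
    results.append("done")


def main():
    results = []
    thread = Thread(target=my_function, args=(results,))
    print("Starting thread...")
    thread.start()  # begin executing my_function() in a new thread
    thread.join()   # block the main thread until my_function() returns
    print("Ending thread...")
    return results


if __name__ == "__main__":
    main()
```

Because `join()` blocks until the target returns, whatever the thread appended to `results` is visible by the time `main()` returns.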


With a small modification, we can also start and join multiple threads in bulk, as follows:


```
$ python _01_multithreading/start_new_threads.py
Starting threads...

printing from thread
printing from threadprinting from thread

printing from thread
printing from thread
printing from thread
printing from thread
printing from thread
printing from threadprinting from thread

Ending threads...
```
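Here is a sketch of the bulk variant, under the same assumptions (hypothetical `my_function()` and `main()` names). All threads are started before any of them is joined, so they run concurrently. The garbled lines in the output above, such as `printing from threadprinting from thread`, are the kind of result that concurrent `print()` calls can produce:

```python
from threading import Thread


def my_function(results, index):
    print("printing from thread")
    # Each thread writes only to its own slot, so no locking is needed here
    results[index] = index * index


def main(thread_count=10):
    results = [None] * thread_count
    threads = [
        Thread(target=my_function, args=(results, index))
        for index in range(thread_count)
    ]
    print("Starting threads...")
    for thread in threads:
        thread.start()
    # Join only after every thread has been started, so they overlap
    for thread in threads:
        thread.join()
    print("Ending threads...")
    return results


if __name__ == "__main__":
    main()
```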


Thread safety:

* All threads share the same memory context, so we must be careful when many threads access the same data structures. 

* If 2 parallel threads update the same variable without any protection, there might be a situation where a subtle timing variation in thread execution can alter the final result in an unexpected way. 

* Such situations are called `race conditions`, and they're very hard to debug. In those cases, the code is not 'thread-safe'.

For example, the following program is not thread-safe, and it returns a different value in every execution:


```
# The correct output is 100 threads * 100,000 iterations = 10,000,000
$ python _01_multithreading/thread_visits.py
thread_count=100, thread_visits=9594669
```
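The following is a minimal sketch of an unprotected counter like the one the script describes. The function names and the `iterations` parameter are assumptions. The read and the write are deliberately split into two statements, which leaves a window in which another thread can run:

```python
from threading import Thread

thread_visits = 0


def visit_counter(iterations):
    global thread_visits
    for _ in range(iterations):
        # Read-modify-write without a lock: another thread may update
        # thread_visits between these two lines, and that update is lost
        value = thread_visits
        thread_visits = value + 1


def count_visits(thread_count=100, iterations=100_000):
    global thread_visits
    thread_visits = 0
    threads = [
        Thread(target=visit_counter, args=(iterations,))
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return thread_visits
```

Calling `count_visits()` with its defaults matches the 100 threads × 100,000 iterations setup. The result can be lower than 10,000,000 but never higher, because a lost update only ever discards increments.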


To avoid race conditions, we need to use thread locking primitives. Python provides the `threading.Lock` class, which is a simple implementation of a thread lock.

The following is an example of a thread-safe variant of the last code:



```
$ python _01_multithreading/thread_safe_visits.py
thread_count=100, thread_visits=10000000
```


In the last code example, the thread visits with locks are counted properly, but at the expense of lower performance: 

* The lock makes sure that only one thread at a time can execute the protected block of code, so that block cannot run in parallel.

* Also, acquiring and releasing locks are operations that add overhead.
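The same hypothetical counter protected with `threading.Lock` might look like the sketch below. The `with` statement acquires the lock on entry and releases it on exit, even if the block raises:

```python
from threading import Lock, Thread

thread_visits = 0
thread_visits_lock = Lock()


def visit_counter(iterations):
    global thread_visits
    for _ in range(iterations):
        # Only one thread at a time can run this block, so the
        # read-modify-write on thread_visits cannot be interleaved
        with thread_visits_lock:
            thread_visits += 1


def count_visits(thread_count=100, iterations=100_000):
    global thread_visits
    thread_visits = 0
    threads = [
        Thread(target=visit_counter, args=(iterations,))
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return thread_visits
```

In this sketch, the lock is acquired and released on every iteration. That is exactly where the overhead described above comes from.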


#### Python's limitation with threads:

The standard implementation of Python (the CPython interpreter) comes with a major limitation that renders threads less useful in many contexts: 

* All operations accessing Python objects are serialized by the `Global Interpreter Lock (GIL)`.

* This is done because many of the interpreter's internal structures are not thread-safe and need to be protected. 

* Not every operation requires locking, and there are certain situations when threads release the lock:
    * In blocking system calls like socket calls.     
    * In sections of C extensions that do not use any Python/C API functions.

* So multiple threads can do I/O operations or execute some C extension code completely in parallel.
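The following is a rough way to observe that blocking calls release the GIL. It uses `time.sleep()` as a stand-in for a blocking system call such as a socket read. `blocking_call()` and `run_in_threads()` are hypothetical helpers:

```python
import time
from threading import Thread


def blocking_call(delay):
    # time.sleep() releases the GIL while waiting, like a blocking socket call
    time.sleep(delay)


def run_in_threads(thread_count, delay):
    threads = [
        Thread(target=blocking_call, args=(delay,))
        for _ in range(thread_count)
    ]
    started = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - started
```

With five threads each sleeping 0.2 s, the elapsed time should be close to 0.2 s rather than 1 s, because the waits overlap. Replacing the sleep with a pure-Python CPU-bound loop would remove most of that overlap on a standard, GIL-enabled CPython build.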


### A simple multithreaded application

Consider a program that fetches foreign exchange rates from an external source:

* The exchange rates are available from a free API at `https://www.vatcomply.com`. 

* We want to obtain exchange rates for multiple currencies and present the results as an exchange rate currency matrix.

* But the API doesn't allow us to query for data using multiple base currencies at once, so we need to fetch them one by one.

The following is a naive synchronous solution that doesn't use threads at all:


```
$ python _02_example_threaded_application/synchronous.py
1 USD =     1.0 USD,   0.912 EUR,    3.96 PLN,    10.4 NOK,    22.4 CZK
1 EUR =     1.1 USD,     1.0 EUR,    4.34 PLN,    11.4 NOK,    24.5 CZK
1 PLN =   0.253 USD,   0.231 EUR,     1.0 PLN,    2.62 NOK,    5.65 CZK
1 NOK =  0.0964 USD,   0.088 EUR,   0.382 PLN,     1.0 NOK,    2.16 CZK
1 CZK =  0.0447 USD,  0.0408 EUR,   0.177 PLN,   0.463 NOK,     1.0 CZK

time elapsed: 4.82s
```
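The structure of the synchronous version can still be sketched without network access. Here `fetch_rates()` is a hypothetical stand-in: it sleeps to imitate request latency and returns only the base currency instead of real rates. The actual script calls the vatcomply API, whose request details aren't reproduced here:

```python
import time

SYMBOLS = ("USD", "EUR", "PLN", "NOK", "CZK")


def fetch_rates(base, latency=0.1):
    # Hypothetical stand-in for an HTTP request: wait, then return the base
    time.sleep(latency)
    return base


def main(latency=0.1):
    started = time.perf_counter()
    # One request at a time: the total time is roughly the sum of all waits
    results = [fetch_rates(base, latency) for base in SYMBOLS]
    elapsed = time.perf_counter() - started
    print(f"time elapsed: {elapsed:.2f}s")
    return results, elapsed
```

Each call waits for the previous one to finish, so an I/O-bound program like this spends most of its run time idle.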


The last code example is mostly I/O-bound, so we can improve it with multithreading. The simplest approach is to use one thread per parameter value (defined in `SYMBOLS`), as follows:


```
$ python _03_one_thread_per_item/one_thread_per_item.py
1 NOK =  0.0964 USD,   0.088 EUR,   0.382 PLN,     1.0 NOK,    2.16 CZK
1 USD =     1.0 USD,   0.912 EUR,    3.96 PLN,    10.4 NOK,    22.4 CZK
1 CZK =  0.0447 USD,  0.0408 EUR,   0.177 PLN,   0.463 NOK,     1.0 CZK
1 EUR =     1.1 USD,     1.0 EUR,    4.34 PLN,    11.4 NOK,    24.5 CZK
1 PLN =   0.253 USD,   0.231 EUR,     1.0 PLN,    2.62 NOK,    5.65 CZK

time elapsed: 1.42s
```
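The following sketches the one-thread-per-item approach with a hypothetical sleeping `fetch_rates()` stub. In this version, the stub appends to a shared `results` list instead of printing:

```python
import time
from threading import Thread

SYMBOLS = ("USD", "EUR", "PLN", "NOK", "CZK")


def fetch_rates(base, results, latency=0.1):
    # Hypothetical stand-in for an HTTP request
    time.sleep(latency)
    results.append(base)  # list.append() is thread-safe in CPython


def main(latency=0.1):
    results = []
    started = time.perf_counter()
    # One thread per item: the thread count grows with the input size
    threads = [
        Thread(target=fetch_rates, args=(base, results, latency))
        for base in SYMBOLS
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.perf_counter() - started
    return results, elapsed
```

The waits now overlap, so the total time is close to a single latency. However, the number of threads grows with the length of `SYMBOLS`.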


The last code example shows a substantial improvement, but it has some issues:

* We start a new thread for every parameter:

    * Thread initialization takes time and consumes resources like memory or file descriptors.
    * Our example input has a small number of items, but it's not a good idea to run an unbounded number of threads that depends on the size of the input.

* The `fetch_rates()` function calls the built-in `print()` function:

    * `print()` has issues due to the way standard output is buffered in Python, which may cause malformed output when calls from multiple threads interleave.
    * `print()` is slow, so if it's used in many threads, it can slow down function execution enough to erase any gains from multithreading.

* By delegating every function call to a separate thread, it's hard to control the rate of network calls:

    * External services enforce limits on the rate of requests from a single client. 
    * So it's a good idea to apply throttling to the rate of processing, to avoid being blacklisted by APIs for abusing their usage limits.

We can fix the 1st and 3rd issues with thread pools:

* Thread pools start a predefined number of threads that will consume the work items from a queue until it becomes empty. 

* When there is no more work to do, the threads will quit, so we'll be able to exit from the program. 

* The `Queue` class from the `queue` module, a thread-safe FIFO queue implementation, can be used to hold the work items.

The following is a modified version of the last code that uses a thread pool:



```
$ python _04_Using_thread_pool/thread_pool.py
1 EUR =     1.1 USD,     1.0 EUR,    4.34 PLN,    11.4 NOK,    24.5 CZK
1 PLN =   0.253 USD,   0.231 EUR,     1.0 PLN,    2.62 NOK,    5.65 CZK1 USD =     1.0 USD,   0.912 EUR,    3.96 PLN,    10.4 NOK,    22.4 CZK

1 CZK =  0.0447 USD,  0.0408 EUR,   0.177 PLN,   0.463 NOK,     1.0 CZK
1 NOK =  0.0964 USD,   0.088 EUR,   0.382 PLN,     1.0 NOK,    2.16 CZK

time elapsed: 1.15s
```
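The following sketches such a thread pool. It assumes a pool size of 4 (`THREAD_POOL_SIZE` is a hypothetical value) and the same kind of sleeping `fetch_rates()` stub. Each worker keeps taking items with `get_nowait()` until the queue raises `Empty`, and `task_done()` tells `work_queue.join()` that an item is finished:

```python
import time
from queue import Empty, Queue
from threading import Thread

SYMBOLS = ("USD", "EUR", "PLN", "NOK", "CZK")
THREAD_POOL_SIZE = 4  # assumed pool size, smaller than the number of items


def fetch_rates(base, latency=0.05):
    # Hypothetical stand-in for an HTTP request
    time.sleep(latency)
    return base


def worker(work_queue, processed):
    # Keep taking items until the queue is empty, then let the thread exit
    while True:
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        processed.append(fetch_rates(item))
        work_queue.task_done()


def main():
    work_queue = Queue()
    processed = []
    for base in SYMBOLS:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue, processed))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()

    work_queue.join()  # wait until every item has been marked as done
    for thread in threads:
        thread.join()
    return processed
```

The number of threads is now fixed regardless of the input size.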


The modified code still leaves the 2nd issue unsolved: malformed output when two threads attempt to print results at the same time. That can be fixed by using 2-way queues:

* We use another queue responsible for collecting results from our workers. 

* Then the main thread prints the results from the 2nd queue.

The following is the modified code that uses 2-way queues:


```
$ python _05_Using_2way_queues/two_way_queues.py
1 PLN =   0.253 USD,   0.231 EUR,     1.0 PLN,    2.62 NOK,    5.65 CZK
1 EUR =     1.1 USD,     1.0 EUR,    4.34 PLN,    11.4 NOK,    24.5 CZK
1 USD =     1.0 USD,   0.912 EUR,    3.96 PLN,    10.4 NOK,    22.4 CZK
1 NOK =  0.0964 USD,   0.088 EUR,   0.382 PLN,     1.0 NOK,    2.16 CZK
1 CZK =  0.0447 USD,  0.0408 EUR,   0.177 PLN,   0.463 NOK,     1.0 CZK

time elapsed: 1.08s
```
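The following sketches the two-way version under the same assumptions (a hypothetical stub and pool size). Workers put results on `results_queue`, and only the main thread prints them after `work_queue.join()` returns:

```python
import time
from queue import Empty, Queue
from threading import Thread

SYMBOLS = ("USD", "EUR", "PLN", "NOK", "CZK")
THREAD_POOL_SIZE = 4  # assumed pool size


def fetch_rates(base, latency=0.05):
    # Hypothetical stand-in for an HTTP request
    time.sleep(latency)
    return base


def worker(work_queue, results_queue):
    while True:
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        # Send the result back instead of printing it from the worker
        results_queue.put(fetch_rates(item))
        work_queue.task_done()


def main():
    work_queue = Queue()
    results_queue = Queue()
    for base in SYMBOLS:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()

    # Only the main thread prints, so output lines cannot interleave
    collected = []
    while not results_queue.empty():
        result = results_queue.get()
        print(result)
        collected.append(result)

    for thread in threads:
        thread.join()
    return collected
```

Each worker puts its result before calling `task_done()`, so by the time `work_queue.join()` returns, every result is already on `results_queue`.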


Unhandled exceptions in threads:

* If there's an exception in the thread making the request, that thread will exit immediately but won't crash the entire program.

* But the main thread waits for all tasks to be marked as done, so if some worker threads crash, their tasks are never completed and the program never exits.

* To avoid this, the worker threads should handle possible exceptions and make sure that all items from the queue are processed.

So in case there are exceptions in a worker thread: 

* We put an exception instance on the `results_queue` queue. 

* We mark the task as done, even if there was an error, so the main thread won't block indefinitely.

* The main thread inspects the results and re-raises any exceptions found on the results queue.

The following is the improved version of the app, with the changes described above. To simulate errors, it randomly changes the status code of a response to 500 (Internal Server Error):


```
$ python _06_errors_in_threads/error_handling.py
1 NOK =  0.0964 USD,   0.088 EUR,   0.382 PLN,     1.0 NOK,    2.16 CZK
Traceback (most recent call last):
  File "/home/work/Documents/Github/will-i-amv-books/Expert-Python-Programming-Fourth-Edition/Chapter_6/_06_errors_in_threads/error_handling.py", line 81, in <module>
    main()
  File "/home/work/Documents/Github/will-i-amv-books/Expert-Python-Programming-Fourth-Edition/Chapter_6/_06_errors_in_threads/error_handling.py", line 75, in main
    raise result
  File "/home/work/Documents/Github/will-i-amv-books/Expert-Python-Programming-Fourth-Edition/Chapter_6/_06_errors_in_threads/error_handling.py", line 49, in worker
    result = fetch_rates(item)
  File "/home/work/Documents/Github/will-i-amv-books/Expert-Python-Programming-Fourth-Edition/Chapter_6/_06_errors_in_threads/error_handling.py", line 27, in fetch_rates
    response.raise_for_status()
  File "/home/work/.cache/pypoetry/virtualenvs/expert-python-programming-fourth-edition-em7utuy--py3.10/lib/python3.10/site-packages/requ
```
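The following sketches the error-handling pattern. It assumes a hypothetical `fetch_rates()` stub that raises `RuntimeError` with probability `fail_rate`, in place of the simulated 500 responses:

```python
import random
import time
from queue import Empty, Queue
from threading import Thread

SYMBOLS = ("USD", "EUR", "PLN", "NOK", "CZK")
THREAD_POOL_SIZE = 4  # assumed pool size


def fetch_rates(base, fail_rate=0.0):
    # Hypothetical stand-in for an HTTP request that sometimes fails
    time.sleep(0.01)
    if random.random() < fail_rate:
        raise RuntimeError(f"simulated server error for {base}")
    return base


def worker(work_queue, results_queue, fail_rate):
    while True:
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        try:
            result = fetch_rates(item, fail_rate)
        except Exception as err:
            results_queue.put(err)  # hand the error over to the main thread
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()  # always mark done, so join() can return


def main(fail_rate=0.0):
    work_queue = Queue()
    results_queue = Queue()
    for base in SYMBOLS:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue, fail_rate))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()
    for thread in threads:
        thread.join()

    collected = []
    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result  # re-raise the worker's error in the main thread
        collected.append(result)
    return collected
```

The `finally` clause is what prevents the hang described earlier. Even a failed item is marked as done, so `work_queue.join()` returns and the main thread can re-raise the error.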

The last modified code still can't handle potential rate limits imposed by external service providers:

* Many services (even paid ones) often do impose rate limits.

* When a service has rate limits, it starts returning error responses once the number of requests exceeds the allocated quota.

* Exception handling is not enough to properly handle rate limits, because services often count requests made beyond the limit, and if you go beyond the limit consistently, you never get back to the allocated quota.

So we'll apply throttling to our thread pool. The algorithm we'll use is a `token bucket`:

* There is a bucket with a predefined number of tokens.

* Each token corresponds to a single permission to process one item of work.

* Each time the worker asks for one or more tokens, we do the following:
    1. We check how much time has passed since the last time we refilled the bucket.
    2. If the time difference allows for it, we refill the bucket with the number of tokens that corresponds to the time difference.
    3. If the number of stored tokens is greater than or equal to the amount requested, we decrease the number of stored tokens by that amount and return the amount granted.
    4. If the number of stored tokens is less than requested, we return zero.

The following is a modified implementation of the last code example that allows for throttling with the token bucket algorithm:


```
$ python _07_Throttling/throttling.py
1 USD =     1.0 USD,   0.912 EUR,    3.96 PLN,    10.4 NOK,    22.4 CZK
1 EUR =     1.1 USD,     1.0 EUR,    4.34 PLN,    11.4 NOK,    24.5 CZK
1 PLN =   0.253 USD,   0.231 EUR,     1.0 PLN,    2.62 NOK,    5.65 CZK
1 NOK =  0.0964 USD,   0.088 EUR,   0.382 PLN,     1.0 NOK,    2.16 CZK
1 CZK =  0.0447 USD,  0.0408 EUR,   0.177 PLN,   0.463 NOK,     1.0 CZK

time elapsed: 0.97s
```


From the last example:

* The `main()` function uses a single instance of the `Throttle` class, which can be shared across threads because it's thread-safe.

* The `worker()` function receives the throttle instance as an argument and, for every item, waits until the throttle releases a new token.
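The following sketches a thread-safe token bucket and a worker that waits on it, following the token bucket steps described in this section. It is not the book's exact code. The refill rule, the `capacity` cap, and the 10 ms sleep between attempts are assumptions, and the worker only echoes each item where the real one would fetch rates:

```python
import time
from queue import Empty, Queue
from threading import Lock, Thread

SYMBOLS = ("USD", "EUR", "PLN", "NOK", "CZK")


class Throttle:
    """Token bucket that can be shared between threads (a sketch)."""

    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate                # tokens added per second
        self.capacity = max(1.0, rate)  # assumed cap: about one second of tokens
        self.tokens = 0.0
        self.last = None                # time of the last refill

    def consume(self, amount=1):
        with self._consume_lock:
            now = time.monotonic()
            if self.last is None:
                # Start the clock on the first request, with an empty bucket
                self.last = now
            # 1. Time passed since the last refill
            elapsed = now - self.last
            # 2. Refill only once at least one whole token has accrued
            if elapsed * self.rate >= 1:
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.last = now
            # 3. Enough tokens stored: take them and grant the request
            if self.tokens >= amount:
                self.tokens -= amount
                return amount
            # 4. Not enough tokens: grant nothing
            return 0


def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        # Wait for a token before each item; the short sleep avoids spinning
        while not throttle.consume():
            time.sleep(0.01)
        # Hypothetical: the real worker would call fetch_rates(item) here
        results_queue.put(item)
        work_queue.task_done()


def main(rate=20, thread_pool_size=4):
    throttle = Throttle(rate)  # one instance shared by all workers
    work_queue = Queue()
    results_queue = Queue()
    for base in SYMBOLS:
        work_queue.put(base)
    threads = [
        Thread(target=worker, args=(work_queue, results_queue, throttle))
        for _ in range(thread_pool_size)
    ]
    started = time.monotonic()
    for thread in threads:
        thread.start()
    work_queue.join()
    for thread in threads:
        thread.join()
    elapsed = time.monotonic() - started
    results = [results_queue.get() for _ in range(results_queue.qsize())]
    return results, elapsed
```

The bucket starts empty. With `rate=20`, the five items are therefore released roughly 50 ms apart rather than all at once.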


### Multiprocessing

Let's be honest, multithreading is challenging. Dealing with threads in a sane
and safe manner required a tremendous amount of code when compared to the
synchronous approach. We had to set up a thread pool and communication queues,
gracefully handle exceptions from threads, and also worry about thread safety when
trying to provide a rate limiting capability. Dozens of lines of code are needed just
to execute one function from some external library in parallel! And we rely on the
promise from the external package creator that their library is thread-safe. Sounds
like a high price for a solution that is practically applicable only for doing I/O-bound
tasks.

An alternative approach that allows you to achieve parallelism is multiprocessing.
Separate Python processes that do not constrain each other with the GIL allow for
better resource utilization. This is especially important for applications running on
multicore processors that are performing really CPU-intensive tasks. Right now,
this is the only built-in concurrent solution available for Python developers (using
the CPython interpreter) that allows you to take advantage of multiple processor cores
in every situation.

The other advantage of using multiple processes over threads is the fact that they
do not share a memory context. Thus, it is harder to corrupt data and introduce
deadlocks or race conditions in your application. Not sharing the memory context
means that you need some additional effort to pass the data between separate
processes, but fortunately there are many good ways to implement reliable
inter-process communication. In fact, Python provides some primitives that make
communication between processes almost as easy as it is between threads.

The most basic way to start new processes in any programming language is usually
by forking the program at some point. On POSIX and POSIX-like systems (like
UNIX, macOS, and Linux), a fork is a system call that will create a new child process.
In Python, it is exposed through the `os.fork()` function. The two processes continue
the program in their own right after the forking. Here is an example script that forks
itself exactly once: