# Concurrency in Python

In computer science, concurrency is about managing multiple tasks in a program, while parallelism is about actually executing them simultaneously.

Here's a breakdown of concurrency:

- Multiple tasks: A program is broken down into smaller tasks that can be run independently.
- Out-of-order execution: The order in which these tasks are completed isn't necessarily important. As long as the final outcome is correct, it doesn't matter if task B finishes before task A.
- Partial order: There might be some dependencies between tasks, requiring a certain level of order. Task B might need the results of task A before it can start.

There are several benefits to using concurrency:

- Improved responsiveness: If a program encounters a long-running task, concurrency allows it to continue working on other tasks while waiting. This makes the program feel more responsive to the user.
- Efficient resource utilization: By keeping the processor busy with multiple tasks, concurrency can improve overall performance.

However, concurrency also introduces challenges:

- Coordination: When multiple tasks are accessing shared resources, there's a risk of conflicts. Careful coordination is needed to ensure data consistency and prevent issues like deadlocks.
- Complexity: Reasoning about concurrent programs can be difficult. The out-of-order execution can lead to unexpected behavior if not managed properly.
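
The coordination challenge can be made concrete with a minimal sketch. It assumes a hypothetical shared tally updated by four threads; the names `totals`, `totals_lock`, and `add_many` are illustrative only. Guarding each update with a `threading.Lock` keeps the read-modify-write step of one thread from interleaving with another's.

```python
import threading

totals = {"count": 0}            # shared state touched by every thread
totals_lock = threading.Lock()   # guards every update to totals["count"]

def add_many(n):
    """Increment the shared tally n times, one locked update at a time."""
    for _ in range(n):
        with totals_lock:        # only one thread may update at a time
            totals["count"] += 1

threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(totals["count"])  # 40000: four threads, 10000 increments each
```

Without the lock, two threads could read the same old value and one increment would be lost, depending on timing and Python version.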


![Deadlock 1](../data/images/deadlock.jpeg)

![Deadlock 2](../data/images/deadlock_2.png)

# Threading

Threads are very useful for maintaining multiple program flows running (quasi-)simultaneously. 
In Python, threads are real system threads and are managed by the operating system.

But CPython, the standard Python implementation, has memory management that is not thread-safe, so the [Global Interpreter Lock (**GIL**)](http://www.dabeaz.com/GIL) allows only one thread to execute Python code at any given time. Therefore, the main benefit of threading is that one waiting job (I/O, sleep, waiting for a user event) doesn't block other jobs from running. Or as the saying goes, threads are good for doing nothing: mostly waiting.


# Understanding the GIL for Biology Students

Imagine you have a lab with a single, very expensive microscope that everyone needs to use. The Global Interpreter Lock (GIL) in Python works similarly to how you might manage this shared resource.

## The Single-Microscope Lab Analogy

In your biology lab:

1. **The Microscope = Your Computer's Processor**: Just as there's only one high-powered microscope, Python's GIL only allows one thread to execute Python code at a time.

2. **Lab Researchers = Python Threads**: Multiple researchers (threads) want to use the microscope (processor) simultaneously to work on different samples.

3. **Lab Protocol = GIL Management**: You implement a rule where each researcher can only use the microscope for 5 minutes before they must let someone else have a turn, even if they're not finished with their work.

## How It Works in Practice

When a Python program with multiple threads runs:

1. **Taking Turns**: Just like biologists taking turns with the microscope, Python threads take turns executing code by acquiring and releasing the GIL.

2. **Waiting Room**: When one thread is "looking through the microscope" (holding the GIL), the other threads wait in line.

3. **Time-Sharing**: After a brief period, Python forces the active thread to release the GIL so another thread can have a turn, even if the first thread isn't finished.
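
The "brief period" can be inspected from Python itself. As a small sketch, `sys.getswitchinterval()` reports how long CPython lets a thread run before asking it to release the GIL. The default is 5 milliseconds, not the 5 minutes of the lab protocol.

```python
import sys

# How long a thread may hold the GIL before CPython asks it to hand over.
interval = sys.getswitchinterval()
print("Switch interval: {} seconds".format(interval))
```

The value can be changed with `sys.setswitchinterval`, although the default is rarely worth touching.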

## Two Types of Lab Work

In your biology lab, researchers do two types of work:

1. **Sample Analysis (CPU-bound)**: Looking at slides under the microscope, which requires active use of the microscope the entire time.

2. **Sample Preparation (I/O-bound)**: Preparing slides, retrieving samples from storage, or waiting for chemical reactions - tasks that don't require the microscope.

Similarly, Python tasks fall into two categories:

1. **CPU-bound tasks**: Calculations, data processing, and analysis that need active processor time.

2. **I/O-bound tasks**: Reading/writing files, network operations, or waiting for user input - tasks that involve waiting.

## Why Threading Still Helps Sometimes

Imagine a researcher who needs to look at a slide for 10 seconds, then wait 50 seconds for a chemical reaction, then look at it again. While they're waiting for the reaction, they don't need the microscope!

In Python:
- When a thread is waiting for I/O (like downloading data), it voluntarily releases the GIL
- Other threads can use the processor during this waiting time
- This is why threading helps with I/O-bound tasks, even with the GIL
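
The points above can be checked with a rough timing sketch. It assumes `time.sleep` as a stand-in for the chemical reaction (or any I/O wait), and `wait_for_reaction` is a hypothetical name. Four threads each wait 0.2 seconds, and because a sleeping thread does not hold the GIL, the waits overlap.

```python
import threading
import time

def wait_for_reaction(seconds):
    """Stand-in for an I/O wait: the sleeping thread does not hold the GIL."""
    time.sleep(seconds)

start = time.perf_counter()
waiters = [threading.Thread(target=wait_for_reaction, args=(0.2,)) for _ in range(4)]
for t in waiters:
    t.start()
for t in waiters:
    t.join()
elapsed = time.perf_counter() - start

# The four waits overlap, so this should be close to 0.2 seconds, not 0.8.
print("Elapsed time: {:.2f} seconds".format(elapsed))
```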


## Simple example

A worker thread counts from 1 to 9, waiting one second between numbers, without blocking the main thread, which counts from 11 to 19 (also waiting).

We use the [threading](https://docs.python.org/3.5/library/threading.html) module from the standard library.

In [None]:
import threading
import time

def task(start, end):
    for i in range(start, end):
        print(" {} ".format(i), end="", flush=True)
        time.sleep(1)

worker = threading.Thread(target=task, args=(1, 10))
worker.start()
task(11, 20)
worker.join()

### I/O-bound programs

When we do any kind of I/O, the GIL is released as soon as control is given to the OS or to lower-level C code. So threads are great for concurrency in I/O-bound programs: while one thread waits for I/O, the GIL is free and other threads can go on doing their jobs. This holds as long as the I/O is not very quick and there are not too many concurrent jobs; many concurrent short jobs will start a [GIL war](http://www.dabeaz.com/GIL/), which is bad for performance.

Let's start with a synchronous program that reads books from the Gutenberg project and finds the most common word. Finding the most common word takes a while, but a lot less than reading the data from the web, so this is definitely an I/O-bound program.

In [None]:
from collections import Counter
import urllib.parse    # used below for urllib.parse.quote
import urllib.request
import time

We download and parse a [set of stop words](https://github.com/Alir3z4/stop-words/raw/25c6a0aea665871e887f155b883e950c3743ce50/english.txt) that will be excluded from the analysis.

For this we use `urlopen`, which opens a remote URL as if it were a file, allowing us to read it line by line. We then `decode` each line, since `urlopen` reads data as `bytes` rather than `str`, and `decode` converts those bytes to a string.
We then use `strip` to remove whitespace. `words` is therefore a generator expression over the lines in the URL, each a single word; we consume the generator with the constructor of `frozenset`, an immutable set.

In [None]:
stop_words_url = 'https://github.com/Alir3z4/stop-words/raw/25c6a0aea665871e887f155b883e950c3743ce50/english.txt'

with urllib.request.urlopen(stop_words_url) as f:
    words = (line.decode().strip() for line in f)
    stop_words = frozenset(words)

print(list(stop_words)[:5])

We will read a bunch of books (see `names`) and parse them for the most common word (`most_common_word`).

First, create a tuple of the book names, and a dictionary that maps book names to book URLs.

In [None]:
names = (
    'Gulliver',
    'Alice in Wonderland',
    'Pride and prejudice',
    'Yellow wallpaper',
    'Metamorphosis',
    'A Tale of Two Cities',
    'The Importance of Being Earnest',
    'Frankenstein'
)
url_template = 'https://raw.githubusercontent.com/yoavram/Py4Eng/master/data/{}.txt'
urls = {
    name: urllib.parse.quote(url_template.format(name), safe=":/") 
    for name in names
}
print('Gulliver:', urls['Gulliver'])

The `most_common_word` function accepts a `book_item`, which is a tuple of `(name, url)`, opens the URL for reading, and reads the "file" line by line; each line is decoded, stripped, lowercased, and split. Therefore, each line is converted to a list of lowercase words.
These words are then added to a `Counter` object, which is similar to a `set`, only it remembers **how many times** each element was added, and it lets us query the number of occurrences and the most common elements.

The function returns the name of the book, the most common word, and the count for the most common word, after zeroing the count for stop-words.
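
Before the full function, here is a toy sketch of the `Counter` operations it relies on. The sentences and the names `toy_counter` and `toy_stop_words` are made up for illustration.

```python
from collections import Counter

toy_counter = Counter()
# update() adds one count per element of the iterable
toy_counter.update("the cat saw the other cat".split())
toy_counter.update("the end".split())
print(toy_counter["the"])  # 3

# Zero the stop words so they cannot be reported as most common
toy_stop_words = frozenset(["the"])
for stop_word in toy_stop_words:
    toy_counter[stop_word] = 0

# most_common(1) returns a list holding one (element, count) pair
top_word, top_count = toy_counter.most_common(1)[0]
print(top_word, top_count)  # cat 2
```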

In [None]:
def most_common_word(book_item):
    name, url = book_item
    counter = Counter()
    with urllib.request.urlopen(url) as f:        
        for line in f:
            if not line:
                break
            line = line.decode().strip().lower().split()
            counter.update(line)
    for word in stop_words:
        counter[word] = 0
    word, count = counter.most_common(1)[0] # [0] gives only the first most common word
    return name, word, count

In [None]:
most_common_word(list(urls.items())[0])

Here is a simple [context manager](https://docs.python.org/3.5/library/contextlib.html) for measuring time (`%timeit` is less useful for concurrency):

In [None]:
from contextlib import contextmanager

@contextmanager
def tictoc():
    tic = time.time()
    yield
    toc = time.time()
    print("Elapsed time: {:.2f} seconds".format(toc - tic))

In [None]:
with tictoc():
    print("hey")

### Sequential run

We start by running the analysis in sequence using a single thread to get a baseline.

Note that `map` applies a function to the elements of an iterable using lazy evaluation:

```python
results = map(most_common_word, urls.items())
```

This is very similar to the generator expression:

```python
results = (most_common_word(item) for item in urls.items())
```

In [None]:
cube_map = map(lambda x: x**3, [1,2,3])
print(list(cube_map))

In [None]:
with tictoc():
    results = map(most_common_word, urls.items())
    for name, word, count in results:
        print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

### Multi-threading

To run a multi-threaded version of the above, we could use `threading` and create the threads ourselves, but that takes a lot of boilerplate. This boilerplate can be handled by a thread pool from the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) module.
The thread pool executor is created using a context manager, so that all the threads in the pool will be closed when we are done.
Using the executor is really easy if we already used the `map` pattern.

In [None]:
import concurrent.futures
n_workers = len(urls)

with tictoc():
    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        results = executor.map(most_common_word, urls.items())
        for name, word, count in results:
            print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Much better, almost 5-fold improvement in running time.

### What are the futures?

The module is called `concurrent.futures`, and in the documentation you can read that you are actually creating `Future` objects. These are like promises - they represent computational tasks that will be completed; therefore, they allow for an asynchronous style of programming, as we can start a task, go on to do something else, and then either check if it finished, wait for it to finish, or assign a callback to be called when it is finished.

In the above, the `Future`s were handled by the executor `map` function, which creates `Future`s and waits for them to finish working. Now we will use them directly:

In [None]:
with tictoc():
    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        futures = [
            executor.submit(most_common_word, item) 
            for item in urls.items()
        ]
        for future in concurrent.futures.as_completed(futures):
            name, word, count = future.result()
            print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Here, `as_completed` allows us to iterate over the futures as they are completed, i.e. in roughly the order they finished their tasks, rather than the order they were created (which is the case in the previous example).
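
The other ways of working with a `Future` mentioned earlier, checking whether it is done and attaching a callback, can be sketched as follows. `slow_square` and `on_done` are hypothetical helpers used only for this illustration.

```python
import concurrent.futures
import time

def slow_square(x):
    """Stand-in for a slow task."""
    time.sleep(0.1)
    return x * x

finished = []

def on_done(future):
    # Called with the finished future once its result is ready
    finished.append(future.result())

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7)
    future.add_done_callback(on_done)
    print("Done right after submit?", future.done())
    result = future.result()  # blocks until the task has finished
    print("Result:", result)

# Leaving the with-block waits for the worker, so the callback has run by now
print("Collected by callback:", finished)
```

Note that the callback runs in the worker thread (or immediately in the calling thread if the future has already finished), so it should be short and thread-safe.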

### CPU-bound program

In some cases running multiple threads actually helps even if we are in a CPU-bound scenario, because the OS may run these threads on separate cores, and **if the code that we use releases the GIL** in some way, then we can achieve "true multi-threading". Note: if the code doesn't release the GIL, we will get into a [GIL war](http://www.dabeaz.com/GIL/) and performance will suffer compared to a single-core single-thread program!

In the following example we calculate a hash of our books using the *very slow* function, `pbkdf2_hmac`. The [`hashlib` docs](https://docs.python.org/3/library/hashlib.html) specify that if the data is larger than 2047 bytes, **the GIL is released** (the computation is done in C, so the GIL can be explicitly released) and therefore if we use threads we will see an improvement on multi-core machines.

In [None]:
import hashlib
import binascii
import concurrent.futures
import time
import multiprocessing
multiprocessing.cpu_count()

Of course, if the number of CPUs is 1 (see last command in previous cell), we won't get any benefit from a multi-threading approach - on the contrary.

We start by reading books to memory so that I/O won't be an issue. 

In [None]:
names = (
    'Gulliver',
    'Alice in Wonderland',
    'Pride and prejudice',
    'Yellow wallpaper',
    'Metamorphosis',
    'The Importance of Being Earnest'
)
filenames = {
    name: '../data/books/{}.txt'.format(name) 
    for name in names
}
print('Gulliver:', filenames['Gulliver'])

def read_book(item):
    name, filename = item    
    with open(filename) as f:
        data = f.read()        
    return name, data

books = dict(read_book(item) for item in filenames.items())

`hash_book` is a **slow** function that takes an entire book and performs a specific [hash function](https://docs.python.org/3/library/hashlib.html#hashlib.pbkdf2_hmac) on it with multiple iterations.

In [None]:
def hash_book(item):
    name, data = item    
    # very slow function
    fingerprint = hashlib.pbkdf2_hmac('sha512', data.encode('utf8'), b'salt', 1000000)
    return name, binascii.hexlify(fingerprint).decode()

%timeit -n 3 hash_book(('Gulliver', books['Gulliver']))

### Sequential

Running in a single-thread mode - open your process monitor to see that only one core is used:

In [None]:
with tictoc():
    results = map(hash_book, books.items())
    for name, fingerprint in results:
        print('Fingerprint for {} is "{}"'.format(name, fingerprint))

### Multi-threaded

In multi-thread mode, you'll see that all the cores are used, at least on some OS (it is OS-dependent, and requires multiple cores):

In [None]:
with tictoc():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(
            hash_book, books.items()
        )
        for name, fingerprint in results:
            print('Fingerprint for {} is "{}"'.format(name, fingerprint))

The `concurrent.futures` module has another pool executor - a `ProcessPoolExecutor` that uses processes for the jobs. It's as easy to use as the `ThreadPoolExecutor`, but in this case no further improvement can be had by replacing `ThreadPoolExecutor` with `ProcessPoolExecutor`, at least on my machine.

## Multi-processing

From the [threading](https://docs.python.org/3/library/threading.html) module:
> CPython implementation detail: In CPython, due to the Global Interpreter Lock, **only one thread can execute Python code at once**... If you want your application to make better use of the **computational resources of multi-core machines**, you are advised to use `multiprocessing` or `concurrent.futures.ProcessPoolExecutor`.

The standard library module, [multiprocessing](https://docs.python.org/3/library/multiprocessing.html), provides low-level interfaces for the use of multiple processes. 

We will use [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) which provides a high-level API - a process pool. If you get a persistent error about broken processes, try to restart the kernel and possibly the notebook server, then debug without the executor (non-parallel) and when it works, re-insert the executor.

In [None]:
import concurrent.futures
import math

primes = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
    3399726899288419,
    1125828054422712,
    237397848077029,
]

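def is_prime(n):
    if n < 2:
        return False  # 0, 1 and negative numbers are not prime
    if n == 2:
        return True   # 2 is the only even prime
    if n % 2 == 0:
        return False

    # Trial division by odd numbers up to the square root of n
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True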

In [None]:
with tictoc():
    results = map(
        is_prime, primes
    )
    for n, p in zip(primes, results):
        print('{} is prime: {}'.format(n, p))

In [None]:
with tictoc():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(
            is_prime, primes
        )
        for n, p in zip(primes, results):
            print('{} is prime: {}'.format(n, p))

In [None]:
from multiprocessing import Pool
with tictoc():
    with Pool() as p:
        results = p.map(is_prime, primes)
    print(results)

## The "Multiprocessing" Solution

If your lab really needs multiple microscopes running simultaneously, you'd buy more microscopes.

Similarly, Python's multiprocessing module creates multiple separate Python processes, each with its own GIL. This is like having multiple complete labs, each with its own microscope. It requires more resources but allows true parallel execution for CPU-intensive tasks.

## Why Does the GIL Exist?

The GIL exists primarily for memory safety. Just as lab protocols prevent sample contamination, the GIL prevents memory corruption in CPython's memory management system, making the code safer and more reliable.
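
One concrete piece of that memory management is reference counting: CPython tracks how many references point to each object and frees the object when the count drops to zero. The sketch below is only an illustration, with made-up names `sample` and `alias`. It uses `sys.getrefcount` to watch a count change; counters like this are the kind of shared state that unsynchronized threads could corrupt.

```python
import sys

sample = []                        # a fresh object
before = sys.getrefcount(sample)   # count includes temporary references
alias = sample                     # a second name for the same object
after = sys.getrefcount(sample)

# Binding another name adds exactly one reference
print("References added by the alias:", after - before)
```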

## Practical Implications

When deciding between threading and multiprocessing in Python, ask yourself:

- **Am I mostly waiting?** (I/O-bound) → Use threading
- **Am I mostly calculating?** (CPU-bound) → Use multiprocessing

Or to continue our analogy:

- "Am I spending more time actively using the microscope, or am I spending more time waiting for other things to happen?"

![thread vs process](https://i.ytimg.com/vi/4rLW7zg21gI/maxresdefault.jpg)

## 🪐 Zorgian Research Lab I/O vs CPU Exercise
**Time**: ~5 minutes

**Background**:  
Zorgian scientists are optimizing their research processes using Earth's Python programming. They need to understand how different types of tasks affect concurrent processing.

**Task**:  
Implement and compare the performance of I/O-bound tasks (like reading research files) versus CPU-bound tasks (like analyzing genetic sequences) using both sequential and threaded approaches.



In [None]:
import time
import threading

def io_bound_task():
    # Simulating reading from a Zorgian data repository
    print("Reading Zorgian specimen data...")
    time.sleep(1)  # Simulates I/O wait time
    print("I/O task completed")

def cpu_bound_task():
    # Simulating Zorgian genetic sequence analysis
    print("Analyzing Zorgian DNA sequence...")
    result = 0
    for i in range(1000000):  # Computationally intensive task
        result += i
    print("CPU task completed")

# Sequential execution
def run_sequential():
    start = time.time()
    for _ in range(4):
        io_bound_task()
    for _ in range(4):
        cpu_bound_task()
    end = time.time()
    return end - start

# Threaded execution
def run_threaded():
    start = time.time()
    threads = []
    for _ in range(4):
        t = threading.Thread(target=io_bound_task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
        
    threads = []
    for _ in range(4):
        t = threading.Thread(target=cpu_bound_task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    end = time.time()
    return end - start

# Compare execution times
sequential_time = run_sequential()
print(f"Sequential execution: {sequential_time:.2f} seconds")

threaded_time = run_threaded()
print(f"Threaded execution: {threaded_time:.2f} seconds")

print("Speedup for I/O: Significant")
print("Speedup for CPU: Minimal due to GIL")



## 🪐 Zorgian Biological Sample Analysis
**Time**: ~5 minutes

**Background**:  
Zorgian biologists collected samples from various ecosystems and need to process them quickly to prevent degradation.

**Task**:  
Process multiple biological samples concurrently using thread pooling and compare with sequential processing.



In [None]:
import concurrent.futures
import time
import random

def process_zorgian_sample(sample_id):
    """Process a Zorgian biological sample"""
    print(f"Processing sample {sample_id}...")
    # Simulate varying processing times based on sample complexity
    processing_time = random.uniform(0.5, 1.5)
    time.sleep(processing_time)
    microbe_count = random.randint(1, 100)
    return f"Sample {sample_id}: {microbe_count} zorgian microbes detected"

# Samples collected from various Zorgian ecosystems
zorgian_samples = list(range(1, 11))  # 10 samples to process

# Sequential processing
def process_sequentially():
    start = time.time()
    results = []
    for sample in zorgian_samples:
        results.append(process_zorgian_sample(sample))
    end = time.time()
    return results, end - start

# Concurrent processing
def process_concurrently():
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(process_zorgian_sample, zorgian_samples))
    end = time.time()
    return results, end - start

# Compare methods
seq_results, seq_time = process_sequentially()
conc_results, conc_time = process_concurrently()

print(f"Sequential processing: {seq_time:.2f} seconds")
print(f"Concurrent processing: {conc_time:.2f} seconds")
print(f"Speedup factor: {seq_time/conc_time:.2f}x")

# Print a few results
for i in range(3):
    print(seq_results[i])



## 🪐 Zorgian Laboratory Deadlock Detector
**Time**: ~5 minutes

**Background**:  
Two Zorgian researchers are working in the same laboratory and need to share limited equipment. If they don't coordinate properly, they might create a deadlock.

**Task**:  
Identify and fix the deadlock in the shared laboratory equipment access code.



In [None]:
import threading
import time

# Shared laboratory equipment
microscope = threading.Lock()
centrifuge = threading.Lock()

def zorgian_researcher_1():
    """First researcher's procedure"""
    print("Researcher 1 starting experiment...")
    with microscope:
        print("Researcher 1 acquired the microscope")
        time.sleep(0.1)  # Preparing sample
        print("Researcher 1 waiting for centrifuge")
        with centrifuge:
            print("Researcher 1 acquired the centrifuge")
            print("Researcher 1 completed their experiment")

def zorgian_researcher_2():
    """Second researcher's procedure"""
    print("Researcher 2 starting experiment...")
    with centrifuge:
        print("Researcher 2 acquired the centrifuge")
        time.sleep(0.1)  # Preparing sample
        print("Researcher 2 waiting for microscope")
        with microscope:
            print("Researcher 2 acquired the microscope")
            print("Researcher 2 completed their experiment")

# This will cause a deadlock
def run_with_deadlock():
    t1 = threading.Thread(target=zorgian_researcher_1)
    t2 = threading.Thread(target=zorgian_researcher_2)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()

# Solution: Fix the order of resource acquisition
def zorgian_researcher_2_fixed():
    """Second researcher's procedure with fixed resource order"""
    print("Researcher 2 starting experiment (fixed)...")
    with microscope:  # Acquire resources in the same order as researcher 1
        print("Researcher 2 acquired the microscope")
        time.sleep(0.1)  # Preparing sample
        with centrifuge:
            print("Researcher 2 acquired the centrifuge")
            print("Researcher 2 completed their experiment")

def run_fixed():
    t1 = threading.Thread(target=zorgian_researcher_1)
    t2 = threading.Thread(target=zorgian_researcher_2_fixed)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()

# Uncomment to demonstrate deadlock (caution: will hang)
# run_with_deadlock()

# Run the fixed version
print("Running with fixed resource ordering:")
run_fixed()
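
Consistent lock ordering is the fix used above. Another common option, sketched here under the assumption that a researcher may simply give up and retry later, is to acquire the second lock with a timeout. The names `use_both`, `scope`, and `spinner` are hypothetical and separate from the exercise code.

```python
import threading

def use_both(first, second, timeout=0.1):
    """Hold `first`, then try `second`; give up instead of waiting forever."""
    with first:
        if not second.acquire(timeout=timeout):
            return False   # back off; leaving the with-block releases `first`
        try:
            return True    # both pieces of equipment are held here
        finally:
            second.release()

scope = threading.Lock()
spinner = threading.Lock()

spinner.acquire()                 # simulate another researcher holding the centrifuge
print(use_both(scope, spinner))   # False: timed out and backed off
spinner.release()
print(use_both(scope, spinner))   # True: both locks were acquired
```

A caller that gets `False` can wait a little and try again, which avoids the permanent hang at the cost of possible retries.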



## 🪐 Zorgian Calendar API Simulation
**Time**: ~5 minutes

**Background**:  
The Zorgian Calendar Authority needs to process multiple calendar conversion requests simultaneously for interplanetary communications.

**Task**:  
Create a simulated Zorgian Calendar API that can handle multiple concurrent requests using threads.



In [None]:
import threading
import time
import random
import queue
from datetime import datetime

# Simulated request queue
request_queue = queue.Queue()

def zorgian_calendar_conversion(earth_date):
    """Convert Earth date to Zorgian date format"""
    # Simulating complex calculation
    time.sleep(random.uniform(0.2, 0.5))
    
    # Simple conversion algorithm (for demonstration)
    year = earth_date.year + 3761  # Base Zorgian year offset
    moon_cycle = (earth_date.month * 29 + earth_date.day) % 30
    zorg_time = earth_date.hour
    
    return f"Year {year}, Moon Cycle {moon_cycle}, Zorg {zorg_time}"

def process_requests(worker_id):
    """Worker thread to process calendar conversion requests"""
    while True:
        try:
            # Get a request with a timeout
            earth_date, request_id = request_queue.get(timeout=2)
            
            print(f"Worker {worker_id} processing request {request_id}...")
            zorgian_date = zorgian_calendar_conversion(earth_date)
            
            print(f"Worker {worker_id} completed request {request_id}: {earth_date} → {zorgian_date}")
            request_queue.task_done()
            
        except queue.Empty:
            print(f"Worker {worker_id} shutting down - no more requests")
            break

# Generate some sample requests
for i in range(10):
    # Generate random Earth dates
    year = random.randint(2000, 2025)
    month = random.randint(1, 12)
    day = random.randint(1, 28)
    hour = random.randint(0, 23)
    
    earth_date = datetime(year, month, day, hour)
    request_queue.put((earth_date, i+1))

# Create worker threads
workers = []
for i in range(4):  # 4 worker threads
    t = threading.Thread(target=process_requests, args=(i+1,))
    t.daemon = True  # Allow the program to exit even if threads are running
    workers.append(t)
    t.start()

# Wait for all requests to be processed
request_queue.join()
print("All calendar conversion requests completed!")
