In [1]:
%load_ext autoreload
%autoreload 2

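Import `nest_asyncio` so that asyncio event loops can be nested inside Jupyter notebooks.
> Jupyter notebooks are asynchronous by nature: the kernel already runs its own event loop, which is why top-level `await` works in a cell.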

In [2]:
import nest_asyncio
nest_asyncio.apply()

## 1. The difference between a synchronous and asynchronous function
Synchronous functions are "thread-blocking". This means that no other task on that thread can start until the current one has finished!

In contrast, asynchronous functions are non-blocking: while one task waits, other tasks can make progress concurrently.

Let's mock up a list of "documents" for processing to simulate an I/O intensive task.

In [3]:
docs = [f"doc {i}" for i in range(5)]

### The "normal" synchronous method

In [4]:
import time
from tqdm.notebook import tqdm

start_time = time.time()

def process_document(document_name: str) -> str:
    time.sleep(5) # simulating a blocking task    
    return f"{document_name} processed."

for doc in tqdm(docs):
    print(process_document(doc))

end_time = time.time()
print(f"Total time taken: {end_time - start_time} seconds")

  0%|          | 0/5 [00:00<?, ?it/s]

doc 0 processed.
doc 1 processed.
doc 2 processed.
doc 3 processed.
doc 4 processed.
Total time taken: 25.035959005355835 seconds


### The asynchronous approach

#### Step 1: Define the method to be an asynchronous function

In [5]:
import asyncio

async def aprocess_document(document_name: str) -> str:
    await asyncio.sleep(5) # simulating a non-blocking wait (e.g. I/O)
    return f"{document_name} processed."

#### Step 2: Invoking the asynchronous function

Do not do this!

In [6]:
start_time = time.time()

for doc in tqdm(docs):
    result = await aprocess_document(doc)

end_time = time.time()
print(f"Total time taken: {end_time - start_time} seconds")

  0%|          | 0/5 [00:00<?, ?it/s]

Total time taken: 25.01995015144348 seconds


The above code block still runs the calls sequentially (not concurrently), which is why there's no speed improvement: each `await` waits for one call to finish before the loop starts the next. The calls are non-blocking, so other tasks on the event loop could run in the meantime, but nothing else was scheduled here.

Instead we invoke this way:

In [7]:
start_time = time.time()
results = await asyncio.gather(
    *[aprocess_document(doc) for doc in docs]
)
end_time = time.time()
print(f"Total time taken: {end_time - start_time} seconds")

Total time taken: 5.002790212631226 seconds


In [8]:
results

['doc 0 processed.',
 'doc 1 processed.',
 'doc 2 processed.',
 'doc 3 processed.',
 'doc 4 processed.']

Wow, what a bump! But why?

`asyncio.gather` *starts* all the tasks at the same time, then collects the results once every task completes. Since all the waits overlap, the total time is roughly that of the slowest task rather than the sum of all of them.
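
To see the overlap directly, a minimal sketch can record when each task starts and finishes. The `worker` helper and the `events` list are hypothetical names, not part of the notebook. It uses `asyncio.run` so that it also works as a standalone script; in a notebook cell you could `await main()` instead.

```python
import asyncio


async def worker(name: str, delay: float, events: list[str]) -> str:
    events.append(f"{name} started")
    await asyncio.sleep(delay)  # hands control back to the event loop while waiting
    events.append(f"{name} finished")
    return name


async def main() -> list[str]:
    events: list[str] = []
    # All three coroutines are scheduled together; none waits for another to finish.
    await asyncio.gather(
        worker("A", 0.15, events),
        worker("B", 0.05, events),
        worker("C", 0.10, events),
    )
    return events


events = asyncio.run(main())
for event in events:
    print(event)
```

All three "started" lines appear before any "finished" line, and the tasks finish in order of their delay rather than in the order they were listed.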

### So why don't we make everything `async`?
Why not just never block the thread?

#### 1. Easier to debug tracebacks of synchronous functions

Consider the function below. We expect the function to always blow up when n = 2 regardless of whether the call is synchronous or asynchronous:

In [None]:
def fetch_doc(n):
    time.sleep(0.1 * n) # simulate I/O
    if n == 2:
        raise ValueError(f"Bad doc {n}")
    return f"doc {n}"

for i in range(5):
    print(fetch_doc(i))

doc 0
doc 1


ValueError: Bad doc 2

This traceback is very simple to debug. Now consider the next one:

In [10]:
async def afetch_doc(n):
    await asyncio.sleep(0.1 * n) # simulate I/O
    if n == 2:
        raise ValueError(f"Bad doc {n}")
    return f"doc {n}"

tasks = [asyncio.create_task(afetch_doc(i)) for i in range(5)]
results = await asyncio.gather(*tasks)

ValueError: Bad doc 2

The traceback is much longer even for such a simple function! Imagine a very complicated function with many different await calls!

##### Why do async functions have complex tracebacks then?
Because multiple functions are running concurrently. The traceback *splits* because the event loop resumes tasks at different points!
> The event loop is like a "traffic controller". It manages all async tasks.
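
One way to keep failures manageable, sketched here as an option rather than as the notebook's own approach, is to pass `return_exceptions=True` to `asyncio.gather`. Exceptions then come back as ordinary results, so you can see which task failed. The delays are shortened and `fetch_all` is a hypothetical helper name.

```python
import asyncio


async def afetch_doc(n: int) -> str:
    await asyncio.sleep(0.01 * n)  # simulate I/O (shortened delay)
    if n == 2:
        raise ValueError(f"Bad doc {n}")
    return f"doc {n}"


async def fetch_all() -> list:
    # With return_exceptions=True, a raised exception is returned in the
    # result list at the position of the task that raised it.
    return await asyncio.gather(
        *(afetch_doc(i) for i in range(5)),
        return_exceptions=True,
    )


results = asyncio.run(fetch_all())
for i, outcome in enumerate(results):
    status = "failed" if isinstance(outcome, Exception) else "ok"
    print(i, status, repr(outcome))
```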

#### 2. Coding overhead
You need to remember to `await` your async functions, and you need a clear mental model of what is happening, because things get messy quickly with concurrency. Compare the following two code blocks:

In [11]:
def check_if_doc_exists(doc_name: str) -> bool:
    time.sleep(5) # simulating a DB call
    return doc_name in docs

def read_document(doc_name: str) -> str:
    if check_if_doc_exists(doc_name):
        time.sleep(5)
        return f"{doc_name} exists. Content: Lorem ipsum..."

print(read_document("doc 1"))

doc 1 exists. Content: Lorem ipsum...


In [12]:
async def acheck_if_doc_exists(doc_name: str) -> bool:
    await asyncio.sleep(5) # simulating a DB call
    return doc_name in docs

async def aread_document(doc_name: str) -> str:
    if await acheck_if_doc_exists(doc_name): # notice the await here
        await asyncio.sleep(5)
        return f"{doc_name} exists. Content: Lorem ipsum..."

print(await aread_document("doc 1"))

doc 1 exists. Content: Lorem ipsum...


By the way...why "await"?

In [13]:
print(aread_document("doc 1"))

<coroutine object aread_document at 0x119720700>




Calling an asynchronous function does not run it. It returns a coroutine object, which is a task that has not started yet. The coroutine only runs when you `await` it (or schedule it on the event loop), i.e. adding `await` means we wait until it completes and get its result.
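
A minimal sketch of this behaviour, using a hypothetical `greet` coroutine and a `calls` list to record whether the body has run:

```python
import asyncio
import inspect

calls: list[str] = []


async def greet() -> str:
    calls.append("ran")
    return "hello"


coro = greet()                      # creates the coroutine object, runs nothing
is_coroutine = inspect.iscoroutine(coro)
calls_before = list(calls)          # still empty: the body has not executed

result = asyncio.run(coro)          # in a notebook cell: result = await coro

print(is_coroutine, calls_before, result, calls)
```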

## 2. The difference between threading and concurrency
So far we've made everything work concurrently. But this is actually all happening within the same thread! Simply put, a thread in computer science is exactly like it sounds in social media. 

On Twitter, you have a thread that talks about subject A and another about B. In computer science, you have a thread working on subject A and can have another thread working on any other subject (including subject A as well). Imagine having multiple threads that are working on concurrent tasks!

And Python has a library to do this!

In [14]:
import threading

In [15]:
def process_info_in_threads(document_name: str, thread_name: str):
    thread_id = threading.current_thread().ident
    print(f"\n{thread_name} running on thread ID: {thread_id}")
    ## Process document ##
    result = process_document(document_name)
    print(f"\n{thread_name} finished on thread ID: {thread_id}")
    return result
    
start_time = time.time()

threads = []
for i, doc in tqdm(enumerate(doc)):
    working_thread = threading.Thread(target=process_info_in_threads, args=(doc,f"Worker {i}",))
    threads.append(working_thread)
    working_thread.start()

# Wait for all threads
for t in threads:
    t.join()

end_time = time.time()
print(f"Total time taken: {end_time - start_time} seconds")

0it [00:00, ?it/s]


Worker 0 running on thread ID: 6262042624
Worker 1 running on thread ID: 6278868992


Worker 2 running on thread ID: 6295695360

Worker 3 running on thread ID: 13035925504

Worker 4 running on thread ID: 13052751872

Worker 0 finished on thread ID: 6262042624
Worker 2 finished on thread ID: 6295695360

Worker 1 finished on thread ID: 6278868992

Worker 3 finished on thread ID: 13035925504

Worker 4 finished on thread ID: 13052751872

Total time taken: 5.010778903961182 seconds


In [16]:
threads

[<Thread(Thread-5 (process_info_in_threads), stopped 6262042624)>,
 <Thread(Thread-6 (process_info_in_threads), stopped 6278868992)>,
 <Thread(Thread-7 (process_info_in_threads), stopped 6295695360)>,
 <Thread(Thread-8 (process_info_in_threads), stopped 13035925504)>,
 <Thread(Thread-9 (process_info_in_threads), stopped 13052751872)>]

But this reads exactly like the asyncio code doesn't it? The timing is essentially the same!
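
One difference from the `asyncio.gather` version is that `threading.Thread` discards the target's return value, so the `result` returned above is never collected. A minimal sketch of one way to get the results back, using the standard library's `ThreadPoolExecutor` (with a shortened sleep so it runs quickly):

```python
import time
from concurrent.futures import ThreadPoolExecutor

docs = [f"doc {i}" for i in range(5)]


def process_document(document_name: str) -> str:
    time.sleep(0.1)  # simulating a blocking task (shortened)
    return f"{document_name} processed."


# One worker per document so that every call can wait at the same time.
with ThreadPoolExecutor(max_workers=len(docs)) as executor:
    # map() returns results in the same order as the inputs.
    results = list(executor.map(process_document, docs))

print(results)
```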

##### Now let's see something dangerous
Let's run the exact same block of code again

In [17]:
start_time = time.time()

threads = []
for i, doc in tqdm(enumerate(doc)):
    working_thread = threading.Thread(target=process_info_in_threads, args=(doc,f"Worker {i}",))
    threads.append(working_thread)
    working_thread.start()

# Wait for all threads
for t in threads:
    t.join()

end_time = time.time()
print(f"Total time taken: {end_time - start_time} seconds")

0it [00:00, ?it/s]


Worker 0 running on thread ID: 6262042624

Worker 0 finished on thread ID: 6262042624
Total time taken: 5.0101158618927 seconds


Wait what...why is there just 1 worker now?!

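This is not actually a race condition. It is a plain bug, made worse by the notebook's shared global state. Look closely at the loop: it iterates over `enumerate(doc)` (a string), not `enumerate(docs)` (the list). On the first run, `doc` still held `"doc 4"` from an earlier loop, so iterating over its five characters happened to start five threads. The loop also rebinds `doc`, which ends up as `"4"`, so the second run iterates over a single character and starts only one worker.

What threads do affect is the output itself: `print()` is not atomic, so text written by several threads at the same time can interleave, which is why the blank lines in the first run look scrambled. The output does not get lost, though.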
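
The change in worker count can be reproduced without any threads. A minimal sketch, assuming `doc` still holds `"doc 4"` from the earlier `for doc in tqdm(docs)` loop:

```python
doc = "doc 4"  # value left behind by an earlier `for doc in docs` loop

first_run = []
for i, doc in enumerate(doc):   # iterates over the characters of "doc 4"
    first_run.append(doc)       # the loop rebinds `doc` on every iteration

second_run = []
for i, doc in enumerate(doc):   # `doc` is now the last character, "4"
    second_run.append(doc)

print(first_run)   # ['d', 'o', 'c', ' ', '4']
print(second_run)  # ['4']
```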

Let's fix it properly: iterate over `docs`, and guard `print()` with a lock.

In [18]:
print_lock = threading.Lock()

def fix_process_info_in_threads(document_name: str, thread_name: str):
    thread_id = threading.current_thread().ident
    
    # Thread-safe printing
    with print_lock:
        print(f"{thread_name} running on thread ID: {thread_id}")
    
    # Process document
    result = process_document(document_name)
    
    # Thread-safe printing
    with print_lock:
        print(f"{thread_name} finished on thread ID: {thread_id}")
    
    return result

In [19]:
start_time = time.time()

threads = []
for i, doc in enumerate(docs):  # Fixed: was tqdm(enumerate(doc))
    working_thread = threading.Thread(
        target=fix_process_info_in_threads, 
        args=(doc, f"Worker {i}")
    )
    threads.append(working_thread)
    working_thread.start()

# Wait for all threads
for t in threads:
    t.join()

end_time = time.time()
print(f"Total time taken: {end_time - start_time:.2f} seconds")

Worker 0 running on thread ID: 6262042624
Worker 1 running on thread ID: 6278868992
Worker 2 running on thread ID: 6295695360
Worker 3 running on thread ID: 13035925504
Worker 4 running on thread ID: 13052751872
Worker 0 finished on thread ID: 6262042624
Worker 1 finished on thread ID: 6278868992
Worker 2 finished on thread ID: 6295695360
Worker 3 finished on thread ID: 13035925504
Worker 4 finished on thread ID: 13052751872
Total time taken: 5.01 seconds


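Race conditions happen in `asyncio` too! Tasks only switch at `await` points, but if a task reads shared state, awaits, and then writes it back, another task can change that state in between. That is why asyncio has its own `asyncio.Lock`.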
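
A minimal sketch of such a race and its fix. The counter, the helper names, and the `asyncio.sleep(0)` call (which stands in for real I/O in the middle of an update) are all illustrative assumptions:

```python
import asyncio


async def unsafe_increment(state: dict) -> None:
    current = state["count"]
    await asyncio.sleep(0)            # other tasks run here and read the same value
    state["count"] = current + 1


async def safe_increment(state: dict, lock: asyncio.Lock) -> None:
    async with lock:                  # only one task at a time may read-modify-write
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1


async def main() -> tuple[int, int]:
    unsafe = {"count": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(100)))

    safe = {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(100)))
    return unsafe["count"], safe["count"]


unsafe_total, safe_total = asyncio.run(main())
print(f"without lock: {unsafe_total}, with lock: {safe_total}")
```

Without the lock, updates overwrite each other and the total falls short of 100. With the lock, every increment is kept.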

## 3. The difference between concurrency, threading and parallelism

I hate to burst your bubble, but in standard CPython, threads are not truly parallel: they are all *started*, but they don't execute Python code at the exact same time. This is by design. Python's Global Interpreter Lock (GIL) means that only a single thread can use the Python interpreter at any point in time!

### The Global Interpreter Lock (GIL)
Python's GIL is a mutex that prevents multiple native threads from executing Python bytecode simultaneously. This means:

- Only one thread can execute Python code at a time
- Threads take turns getting access to the Python interpreter
- For CPU-intensive tasks, threading often provides no performance benefit and can even be slower due to context switching overhead

> When you're waiting for something external (network, disk, database), your CPU is just sitting idle. Asyncio lets your single thread work on other tasks while waiting.

So how do we get true parallelism where everything is happening at the same time? Use a `ProcessPoolExecutor`.

### Where Threading and Multiprocessing Fit In
1. asyncio (Single Thread)

- Best for: I/O-bound tasks (web requests, file operations, database calls)
- How it works: Cooperative multitasking - tasks voluntarily yield control
- GIL impact: No impact because it's single-threaded anyway

2. threading (Multiple Threads, GIL Limited)

- Best for: I/O-bound tasks when you can't use asyncio
- How it works: Preemptive multitasking - OS switches between threads
- GIL impact: Only one thread executes Python code at a time, but threads release GIL during I/O waits

3. multiprocessing (Multiple Processes)

- Best for: CPU-bound tasks (heavy calculations, data processing)
- How it works: Separate Python interpreters, each with its own GIL
- GIL impact: None - each process has its own GIL

> In general, prefer `asyncio` over `threading` for I/O-bound work: because of the GIL, both give you concurrency rather than parallelism for Python code. Reach for threads when the library you depend on has no async/await API (e.g. fully synchronous libraries like numpy, pandas, etc.).
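
When a library only offers blocking calls, one option (a sketch, not something the notebook itself uses) is `asyncio.to_thread`, available since Python 3.9, which runs a synchronous function in a worker thread and lets you await it. `blocking_read` is a hypothetical stand-in for such a library call.

```python
import asyncio
import time


def blocking_read(name: str) -> str:
    time.sleep(0.2)  # stand-in for a synchronous library call
    return f"{name} processed."


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each blocking call runs in its own worker thread; gather awaits them all.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_read, f"doc {i}") for i in range(5))
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)
print(f"Total time taken: {elapsed:.2f} seconds")
```

Run one after another, the five calls would take about 1 second; here they overlap, so the total stays close to the duration of a single call.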

But there's a slight problem with parallelism in Jupyter Notebooks:

In [20]:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_intensive_work(n):
    """Heavy computation"""
    total = 0
    for i in range(n * 1_000_000):
        total += i ** 2
    return total

def io_intensive_work(delay):
    """I/O simulation"""
    time.sleep(delay)
    return f"Slept for {delay} seconds"

# Test data
cpu_tasks = [100, 200, 300, 400]
io_tasks = [1, 1, 1, 1]

# CPU-bound work comparison:
start = time.time()
with ThreadPoolExecutor() as executor:
    results = list(executor.map(cpu_intensive_work, cpu_tasks))
print(f"ThreadPoolExecutor (CPU): {time.time() - start:.2f}s")  # about the same as running sequentially (no speedup)

start = time.time()  
with ProcessPoolExecutor() as executor:
    results = list(executor.map(cpu_intensive_work, cpu_tasks))
print(f"ProcessPoolExecutor (CPU): {time.time() - start:.2f}s")  # up to ~2.5x faster: the largest task (400) bounds the runtime

# I/O-bound work comparison:
start = time.time()
with ThreadPoolExecutor() as executor:
    results = list(executor.map(io_intensive_work, io_tasks))
print(f"ThreadPoolExecutor (I/O): {time.time() - start:.2f}s")  # ~1s (good speedup)

start = time.time()
with ProcessPoolExecutor() as executor:
    results = list(executor.map(io_intensive_work, io_tasks))
print(f"ProcessPoolExecutor (I/O): {time.time() - start:.2f}s")  # ~1s (same, but more overhead)

ThreadPoolExecutor (CPU): 47.65s


Process SpawnProcess-1:
Traceback (most recent call last):
Process SpawnProcess-2:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/py313/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/opt/anaconda3/envs/py313/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/py313/lib/python3.13/concurrent/futures/process.py", line 242, in _process_worker
    call_item = call_queue.get(block=True)
  File "/opt/anaconda3/envs/py313/lib/python3.13/multiprocessing/queues.py", line 120, in get
    return _ForkingPickler.loads(res)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get attribute 'cpu_intensive_work' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Process SpawnProcess-3:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/py313/lib/python3.13/multip

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

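Multiprocessing needs to pickle (serialize) a reference to your function to send it to other processes. Jupyter notebooks have issues with this:

- Jupyter defines your function in the notebook's `__main__` module
- But newly spawned worker processes start with a fresh `__main__`, which does not contain anything defined in a notebook cell, so the lookup fails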
```
def cpu_intensive_work(n):  # This can't be pickled properly
    # ...
```
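
A small sketch of why this fails, using a standard-library function as a stand-in: `pickle` stores a function as a reference (module name plus function name), not as its code. The receiving process has to import that module and look the name up, which is the step that breaks for functions defined in a notebook cell.

```python
import json
import pickle

# Pickling a function stores only where to find it, not its bytecode.
payload = pickle.dumps(json.dumps)
has_module_name = b"json" in payload
has_function_name = b"dumps" in payload

# Unpickling imports the module and looks the name up again.
restored = pickle.loads(payload)

print(has_module_name, has_function_name)  # True True
print(restored is json.dumps)              # True
```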

So let's just save the functions in a .py file (here `worker_functions.py`) and import them:
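
The notebook does not show `worker_functions.py`. A minimal sketch, assuming it simply contains the two functions from the earlier cell unchanged, would be:

```python
# worker_functions.py (assumed contents, mirroring the earlier notebook cell)
import time


def cpu_intensive_work(n: int) -> int:
    """Heavy computation: sum of squares over n million integers."""
    total = 0
    for i in range(n * 1_000_000):
        total += i ** 2
    return total


def io_intensive_work(delay: float) -> str:
    """I/O simulation: sleep instead of waiting on a real resource."""
    time.sleep(delay)
    return f"Slept for {delay} seconds"
```

Because the functions now live in an importable module, worker processes can find them by name when they unpickle the task.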

In [None]:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from worker_functions import cpu_intensive_work, io_intensive_work

# --------------------------
# CPU-bound benchmarks
# --------------------------

def test_threadpool_cpu():
    cpu_tasks = [100, 200, 300, 400]
    start = time.time()
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(cpu_intensive_work, cpu_tasks))
    print(f"ThreadPoolExecutor (CPU): {time.time() - start:.2f}s")


def test_processpool_cpu():
    cpu_tasks = [100, 200, 300, 400]
    start = time.time()
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_intensive_work, cpu_tasks))
    print(f"ProcessPoolExecutor (CPU): {time.time() - start:.2f}s")


async def async_cpu_test():
    cpu_tasks = [100, 200, 300, 400]
    loop = asyncio.get_running_loop()
    start = time.time()
    with ProcessPoolExecutor() as executor:
        results = await asyncio.gather(*[
            loop.run_in_executor(executor, cpu_intensive_work, task)
            for task in cpu_tasks
        ])
    print(f"Asyncio + ProcessPool (CPU): {time.time() - start:.2f}s")


# --------------------------
# I/O-bound benchmarks
# --------------------------

def test_threadpool_io(n_tasks=10, delay=1, max_workers=10):
    start = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(io_intensive_work, [delay] * n_tasks))
    print(f"ThreadPool (I/O, {n_tasks} tasks, {max_workers} workers): {time.time() - start:.2f}s")

async def test_asyncio_io(n_tasks=10, delay=1):
    start = time.time()
    results = await asyncio.gather(*[asyncio.sleep(delay) for _ in range(n_tasks)])
    print(f"Asyncio (I/O, {n_tasks} tasks): {time.time() - start:.2f}s")

def test_processpool_io(n_tasks=10, delay=1, max_workers=10):
    start = time.time()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(io_intensive_work, [delay] * n_tasks))
    print(f"ProcessPool (I/O, {n_tasks} tasks, {max_workers} workers): {time.time() - start:.2f}s")

In [34]:
print("=== CPU-bound ===")
test_threadpool_cpu()
test_processpool_cpu()
await async_cpu_test()

=== CPU-bound ===
ThreadPoolExecutor (CPU): 46.35s
ProcessPoolExecutor (CPU): 20.14s
Asyncio + ProcessPool (CPU): 19.89s


In [None]:
print("\n=== I/O-bound ===")
test_threadpool_io(n_tasks=10, delay=1, max_workers=5)
test_threadpool_io(n_tasks=10, delay=1, max_workers=20)
await test_asyncio_io(n_tasks=10, delay=1)
test_processpool_io(n_tasks=10, delay=1, max_workers=20)


=== I/O-bound ===
ThreadPool (I/O, 10 tasks, 5 workers): 2.01s
ThreadPool (I/O, 10 tasks, 20 workers): 1.01s
Asyncio (I/O, 10 tasks): 1.00s
ProcessPool (I/O, 10 tasks, 20 workers): 1.10s


In [None]:
print("\n=== I/O-bound (scaled) ===")
test_threadpool_io(n_tasks=100, delay=1, max_workers=10)
test_threadpool_io(n_tasks=100, delay=1, max_workers=100)
test_processpool_io(n_tasks=100, delay=1, max_workers=100)
await test_asyncio_io(n_tasks=100, delay=1)


=== I/O-bound (scaled) ===
ThreadPool (I/O, 100 tasks, 10 workers): 10.04s
ThreadPool (I/O, 100 tasks, 100 workers): 1.02s
ProcessPool (I/O, 100 tasks, 100 workers): 1.45s
Asyncio (I/O, 100 tasks): 1.00s


# TLDR

##### I/O-bound tasks
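- With enough workers, it doesn’t matter whether you use threads, processes, or asyncio: performance is basically the same.
- The “win” comes from concurrency: letting the program do something else while waiting on I/O.
- That’s why ThreadPool, ProcessPool, and asyncio all landed around ~1 second once every task had its own worker. With too few workers the tasks queue up (10 tasks on 5 threads took ~2s; 100 tasks on 10 threads took ~10s).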
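
The measured thread-pool timings are consistent with a simple estimate: tasks are processed in waves of `max_workers`, and each wave takes one `delay`. A rough sketch (the `estimated_io_time` helper is hypothetical and ignores pool startup overhead):

```python
import math


def estimated_io_time(n_tasks: int, delay: float, max_workers: int) -> float:
    """Number of waves needed, times the delay per wave."""
    return math.ceil(n_tasks / max_workers) * delay


# (n_tasks, delay, max_workers) for the thread-pool runs measured above
for config in [(10, 1, 5), (10, 1, 20), (100, 1, 10), (100, 1, 100)]:
    print(config, "->", estimated_io_time(*config), "s")
```

The estimates of 2, 1, 10, and 1 seconds line up with the measured 2.01s, 1.01s, 10.04s, and 1.02s.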

##### CPU-bound tasks
- Threads don’t help, because of the Global Interpreter Lock (GIL): they still run one at a time, so performance was ~46s.
- Processes (ProcessPoolExecutor or asyncio+ProcessPool) bypass the GIL by running in separate Python interpreters, so they run in parallel across CPU cores, giving you ~20s.
- Asyncio itself doesn’t provide a speedup here; it just gives you nicer orchestration around using a ProcessPool.
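
A rough back-of-the-envelope check on these numbers, assuming the runtime of `cpu_intensive_work(n)` is proportional to `n` and that each of the four tasks gets its own core:

```python
cpu_tasks = [100, 200, 300, 400]

sequential_units = sum(cpu_tasks)   # work done one task after another
parallel_units = max(cpu_tasks)     # in parallel, the largest task sets the pace
ideal_speedup = sequential_units / parallel_units

# Measured timings from the CPU-bound benchmark above
measured_speedup = 46.35 / 20.14

print(f"ideal: {ideal_speedup:.2f}x, measured: {measured_speedup:.2f}x")
```

Under those assumptions the best possible speedup for this workload is 2.5x, so the measured ~2.3x is close to the limit. A 4x speedup would need four equally sized tasks.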


## An analogy

### Asynchronous
- Definition: A programming style where tasks can be paused and resumed, so one task doesn’t block others from starting.
- Analogy: Ordering food at a restaurant without waiting for the guy in front of you to get his food. 

### Concurrency
- Definition: The ability to deal with many tasks at once by interleaving progress. Doesn’t necessarily mean they’re literally executing at the same instant.
- Analogy: I’m breathing, blinking, thinking and coding at work. But I’m still one dude.

### Parallelism
- Definition: Tasks literally run at the same time, usually on multiple CPU cores or machines.
- Analogy: My team and I are working at the same time (during office hours), each building a different feature for the same app.