In [1]:
# 1. **Thread-safe counter**

import threading


class SafeCounter:
    """Increment from multiple threads safely using a Lock."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def inc(self, n=1):
        with self._lock:
            self._value += n

    @property
    def value(self):
        with self._lock:
            return self._value


# Usage and test
cnt = SafeCounter()


def worker():
    for _ in range(1000):
        cnt.inc()


threads = [threading.Thread(target=worker) for _ in range(5)]
# Plain loops: the calls are made for their side effects, not their return values
for t in threads:
    t.start()
for t in threads:
    t.join()

assert cnt.value == 5000
print("Test passed: counter =", cnt.value)

Test passed: counter = 5000


In [2]:
# 2. **Order-preserving ThreadPool**
from concurrent.futures import ThreadPoolExecutor


def map_threaded(fn, xs, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks, collect futures in order
        futures = [executor.submit(fn, x) for x in xs]
        # Wait and get results in original order
        return [f.result() for f in futures]


# Test
def square(x):
    return x * x


assert map_threaded(square, [1, 2, 3]) == [1, 4, 9]

3. **Producer/consumer with Queue**: see `producer_consumer_queue.py`.
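
For orientation, here is a minimal sketch of the producer/consumer pattern using `queue.Queue`. It is not the contents of `producer_consumer_queue.py`; the names `producer`, `consumer`, `run_pipeline`, and `_SENTINEL`, and the "double each item" work step, are hypothetical choices for illustration.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical marker that tells the consumer to stop


def producer(q, items):
    """Put every item on the queue, then signal that no more will come."""
    for item in items:
        q.put(item)
    q.put(_SENTINEL)


def consumer(q, results):
    """Take items off the queue until the sentinel arrives."""
    while True:
        item = q.get()
        if item is _SENTINEL:
            break
        results.append(item * 2)  # placeholder for real work


def run_pipeline(items):
    # A bounded queue makes a fast producer block instead of piling up work
    q = queue.Queue(maxsize=8)
    results = []
    threads = [
        threading.Thread(target=producer, args=(q, items)),
        threading.Thread(target=consumer, args=(q, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print(run_pipeline(range(5)))  # prints [0, 2, 4, 6, 8]
```

With a single consumer the results come out in production order; with several consumers you would need one sentinel per consumer, and the order would no longer be guaranteed.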


4. **Process pool for CPU-bound work**: see `process_pool.py`.

5. **Async I/O with semaphore (limit concurrency)**: see `semaphore.py`.
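
A minimal sketch of the idea, assuming `asyncio.Semaphore` is the limiting mechanism. It is not the contents of `semaphore.py`: `fetch`, `fetch_all`, and the `state` dictionary are hypothetical, and `asyncio.sleep` stands in for a real I/O call.

```python
import asyncio


async def fetch(i, sem, state):
    """Hypothetical stand-in for an I/O call; sleeps instead of using a network."""
    async with sem:  # at most `limit` coroutines are inside this block at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)
        state["active"] -= 1
        return i * 2


async def fetch_all(n, limit=3):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}  # tracks how many fetches overlap
    results = await asyncio.gather(*(fetch(i, sem, state) for i in range(n)))
    return results, state["peak"]


results, peak = asyncio.run(fetch_all(10, limit=3))
print(results, "peak concurrency:", peak)
```

All ten coroutines are scheduled immediately, but the semaphore lets only three of them run the guarded section at a time; `gather` still returns results in submission order.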


6. **Cancellation & cleanup**: see `asyncio_cancellation.py`.

This example handles task cancellation safely. When a task is cancelled, `asyncio.CancelledError` is raised inside the coroutine at the `await` where it is suspended, and any necessary cleanup (closing files, releasing resources) must still run, typically from a `finally` block. This prevents resource leaks and keeps the program stable and responsive.
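
The behaviour described above can be sketched as follows. This is an illustration under assumptions, not the contents of `asyncio_cancellation.py`: `worker`, `main`, and the `log` list are hypothetical names, and `asyncio.sleep(10)` stands in for real work.

```python
import asyncio


async def worker(log):
    """Hypothetical long-running task that records its lifecycle in `log`."""
    log.append("started")
    try:
        await asyncio.sleep(10)  # stands in for real work
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # re-raise so the task is actually marked as cancelled
    finally:
        log.append("cleanup")  # runs on success, failure, or cancellation


async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.01)  # give the worker a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: we cancelled the task ourselves
    return log, task.cancelled()


log, cancelled = asyncio.run(main())
print(log, cancelled)  # prints ['started', 'cancelled', 'cleanup'] True
```

Re-raising `CancelledError` matters: swallowing it would make the task look as if it had finished normally.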

7. **`gather` with `return_exceptions`**: see `asyncio_gather.py`.

This example shows how `asyncio.gather(..., return_exceptions=True)` behaves: it returns a list of results in which exceptions appear as exception instances instead of being raised.

Expected result: `[1, RuntimeError('x')]`
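
A minimal sketch that produces the result shown above. The coroutine names `ok` and `boom` are hypothetical and not taken from `asyncio_gather.py`.

```python
import asyncio


async def ok():
    return 1


async def boom():
    raise RuntimeError("x")


async def main():
    # With return_exceptions=True the RuntimeError is returned, not raised
    return await asyncio.gather(ok(), boom(), return_exceptions=True)


results = asyncio.run(main())
print(results)  # prints [1, RuntimeError('x')]

# Callers are expected to inspect each slot themselves
failures = [r for r in results if isinstance(r, Exception)]
```

Without `return_exceptions=True`, the first exception would propagate out of the `await asyncio.gather(...)` call instead.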




8. **Timeouts**: see `timeouts.py`.

The asynchronous function `maybe_slow(dt)` sleeps for `dt` seconds and then returns `dt`.

The `with_timeout(dt, timeout)` function runs `maybe_slow(dt)` under a timeout: if `maybe_slow` takes longer than `timeout` seconds, `asyncio.TimeoutError` is raised.

This limits how long an async operation can take, which helps avoid hanging on slow tasks.
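
A sketch of the two functions as described, assuming the timeout is enforced with `asyncio.wait_for`. The actual `timeouts.py` may use a different mechanism, and the `main` demo is hypothetical.

```python
import asyncio


async def maybe_slow(dt):
    """Sleep for `dt` seconds, then return `dt`."""
    await asyncio.sleep(dt)
    return dt


async def with_timeout(dt, timeout):
    # Assumption: wait_for cancels maybe_slow and raises a timeout error
    # if it has not finished within `timeout` seconds
    return await asyncio.wait_for(maybe_slow(dt), timeout=timeout)


async def main():
    print(await with_timeout(0.01, timeout=1.0))  # finishes in time
    try:
        await with_timeout(1.0, timeout=0.01)
    except asyncio.TimeoutError:
        print("timed out")


asyncio.run(main())
```

Since Python 3.11, `asyncio.TimeoutError` is an alias of the built-in `TimeoutError`, so catching either one works there.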


9. **Mixing threads with async (offload CPU)**: see `asyncio_threads.py`.

* The function `cpu_heavy(n)` calculates the sum of squares from 0 up to `n - 1`.

* The async function `compute_off_thread(n)` runs this CPU-heavy calculation in a separate thread using `asyncio.to_thread`, so it doesn't block the event loop.

* This allows other async tasks to run concurrently without waiting for the heavy computation to finish.

* When you run `asyncio.run(compute_off_thread(10_000))`, it returns the sum of squares from 0 to 9999.

* This value matches `sum(i*i for i in range(10_000))`, which is 333283335000.
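
The bullets above can be sketched as follows. The function names come from the description, but the bodies are a reconstruction, not the contents of `asyncio_threads.py`. `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio


def cpu_heavy(n):
    """Sum of squares from 0 up to n - 1 (ordinary blocking code)."""
    return sum(i * i for i in range(n))


async def compute_off_thread(n):
    # Run the blocking function in a worker thread so the event loop stays free
    return await asyncio.to_thread(cpu_heavy, n)


result = asyncio.run(compute_off_thread(10_000))
assert result == sum(i * i for i in range(10_000))
print(result)  # prints 333283335000
```

On standard CPython with the GIL, this keeps the event loop responsive but does not make pure-Python computation run in parallel; a process pool is the usual choice when real CPU parallelism is needed.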


10. **Ordered concurrent map with configurable backend**: see `concurrent_map.py`.


11. **Async rate-limited fetcher**: see `asyncio_rate_limited.py`.

* Rate limiting: When interacting with APIs or services that limit the number of requests per second, this function helps avoid exceeding those limits.

* Prevent overload: It controls the speed of processing to prevent overwhelming downstream systems or resources.

* Smooth pacing: Instead of bursting many operations at once, it spaces them evenly over time, improving stability and reliability in asynchronous workflows.
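
One way to get the even pacing described above is to hand each caller the next free time slot. The sketch below is an assumption about how such a limiter could work, not the contents of `asyncio_rate_limited.py`; `RateLimiter`, `fetch`, and `fetch_all` are hypothetical names, and no real network call is made.

```python
import asyncio
import time


class RateLimiter:
    """Hypothetical helper: at most `rate` operations per second, evenly spaced."""

    def __init__(self, rate):
        self._interval = 1.0 / rate
        self._next_slot = 0.0  # monotonic time of the next free slot

    async def wait(self):
        now = time.monotonic()
        slot = max(now, self._next_slot)
        # No await between reading and writing _next_slot, so no lock is needed
        self._next_slot = slot + self._interval
        await asyncio.sleep(slot - now)


async def fetch(i, limiter):
    await limiter.wait()
    return i, time.monotonic()  # placeholder for a real request


async def fetch_all(n, rate):
    limiter = RateLimiter(rate)
    return await asyncio.gather(*(fetch(i, limiter) for i in range(n)))


out = asyncio.run(fetch_all(5, rate=50))  # slots roughly 20 ms apart
print([i for i, _ in out])  # prints [0, 1, 2, 3, 4]
```

This design spaces operations rather than allowing bursts; a token bucket would be the usual alternative if short bursts up to some capacity are acceptable.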


12. **Async pipeline: parse → transform → aggregate**: see `asyncio_pipeline.py`.
