# Using Concurrent.Futures
- Two major ways: thread versus process
    - Process helps with information isolation
        - For Python 3.13, this could be subinterpreter independence
        - Python 3.14 provides an avenue for this: InterpreterPoolExecutor

- Executor: just like C++, it execute calls asynchronously
    - Dont call directly, but with context manager
    - submit(fn, /, *args, **kwargs): returns a Future object
    - map(fn, *iterables, timeout=None, chunksize=1)
        - Iterables are consumed immediately
        - Fxn is executed asynchronously

        - Important: when using ProcessPoolExecutor, this method chunks the iterables evenly to the pool

- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
    - initializer is an optional callable that is called at the start of each worker process; initargs is a tuple of arguments passed to the initializer
    - max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is None which means worker processes will live as long as the pool.

- InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)
    - A subclass of ThreadPoolExecutor
    - Runs asynchronously using a pool of at most max_workers threads
    - Each thread runs tasks in its own interpreter, with its own runtime state and GIL
    - [IMPORTANT] Means that code run with this executor has true multi-core parallelism
    - submit() and map() passes info to workers through pickle serialization
    - Exceptions may vary

- Future Objects
    - Created by Executor.submit() calls; Future instances are created by Executor.submit()
    - Methods: cancel() -> returns T/F, cancelled(), running(), done(), result(timeout=None) -> TimeoutError, add_done_callback(fn)

## Module function
    - concurrent.futures.as_completed(fs, timeout=None)
        - Same as the asyncio one
        
    - concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
        - Three options for return_when: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED

In [None]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

from math import pow

import os 

In [None]:
# Default in Python 3.13
cpus_available = min(32, (os.cpu_count() if os.cpu_count() else 1) + 4)

print(cpus_available)

In [None]:
bases = [i for i in range(100)]
exponents = [i for i in range(100)]

kwargs_list = [{'base': x, 'exp': y} for x, y in zip(bases, exponents)]

In [None]:
with ThreadPoolExecutor(max_workers=cpus_available) as executor:
    future1 = executor.submit(pow, 32, 125)

    future2 = executor.map(pow, bases, exponents)

    future3 = [executor.submit(pow, kwargs['base'], kwargs['exp']) for kwargs in kwargs_list]


    print(future1.result())
    print(future2, list(future2)[:5])
    print(type(future3), type(future3[0]), [f.result() for f in future3][:5])

In [None]:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

In [None]:
from sympy.ntheory.modular import crt  # For Chinese Remainder Theorem

def chinese_reminder(value, moduli):
    result, _ = crt(moduli, value % moduli)
    print(f"ChineseRemainderHandler: CRT({value} mod {moduli}) = {result}")
    return result

def power(base, exp):
    result = pow(base, exp)
    print(f"Computed {base}^{exp} = {result}")
    return result

def double(value):
    result = value * 2
    print(f"Doubled result: {result}")
    return result

def process_result(future, executor):
    result = future.result()
    print(f"Processing result: {result}")
    future2 = executor.submit(double, result)
    future2.add_done_callback(done_callback)  # Correctly register next callback

def done_callback(future):
    print(f"Result received: {future.result()}")

In [None]:
with ThreadPoolExecutor() as executor:
    future = executor.submit(power, 2, 3)
    future.add_done_callback(lambda f: process_result(f, executor))


In [None]:
from collections import deque

class BasicHandler:
    # Basic hain of Responsbility

    def __init__(self, strategy: str = 'FIFO', **kwargs):
        # **kwargs later
        self.strategy = strategy
        self.callbacks = deque()
        self.futures = {}
        self.counter = 0

    def add_fxn(self, fxn, *arg, **kwargs) -> bool | int:
        # *args and **kwargs are optionally loaded early
        try:
            if self.strategy == 'LIFO':
                self.callbacks.appendleft((fxn, args, kwargs))
            else:
                self.callbacks.append((fxn, args, kwargs))
        except:
            return False
        
        return True
    
    def submit(self, *args, **kwargs) -> int:
        ''' Run and store futures with idx, return idx
        
            Checks if self.futures idx in kwargs as 'FUTURE IDX': FUTURE as the order of invocation to be passed to the executor '''
        # Run the executor.submit(fxn, *args, **kwargs) with the option of passing future_idxs

        self.counter += 1
        return self.counter

    def get_result(self, all: bool = True, idx: int = None):
        return FUTURES

In [None]:
from collections import deque
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Future

class BasicHandler:
    """Basic Chain of Responsibility with ThreadPoolExecutor"""

    def __init__(self, strategy: str = 'FIFO', executor: str = 'ThreadPool', max_workers: int = None, **kwargs):
        self.strategy = strategy
        self.callbacks = deque()  # Stores function references and args
        self.futures = {}  # Maps indexes to Future objects
        self.counter = 0  # Unique ID for tracking futures

        max_workers = max_workers if max_workers else min(32, (os.cpu_count() if os.cpu_count() else 1) + 4)

        match executor:
            case 'ThreadPool':
                self.executor = ThreadPoolExecutor()  # Manages concurrency
            case _:
                self.executor = ProcessPoolExecutor(max_workers=max_workers)

    def add_fxn(self, fxn, *args, **kwargs) -> bool | int:
        """Adds a function to the queue with optional arguments."""
        try:
            task = (fxn, args, kwargs)
            if self.strategy == 'LIFO':
                self.callbacks.appendleft(task)
            else:
                self.callbacks.append(task)
            return True
        except Exception as e:
            print(f"Error adding function: {e}")
            return False

    def submit(self, *args, **kwargs) -> int:
        """Submits the next function in the queue to the executor.

        If kwargs contain {'FUTURE_IDX': idx}, it retrieves the result from `self.futures[idx]` as an argument.
        """
        if not self.callbacks:
            print("No functions to execute.")
            return -1

        # Retrieve function and arguments
        fxn, fxn_args, fxn_kwargs = self.callbacks.popleft()

        # Inject FUTURE result if index is provided
        future_idx = kwargs.pop('FUTURE_IDX', None)
        if future_idx is not None and future_idx in self.futures:
            fxn_args = (*fxn_args, self.futures[future_idx].result())  # Append previous future result

        # Submit the function to the executor
        future: Future = self.executor.submit(fxn, *fxn_args, **fxn_kwargs)
        self.counter += 1
        self.futures[self.counter] = future  # Store the future
        return self.counter  # Return the index of this future

    def get_result(self, all: bool = True, idx: int = None):
        """Fetches results of completed futures."""
        if all:
            return {key: future.result() for key, future in self.futures.items()}
        elif idx is not None and idx in self.futures:
            return self.futures[idx].result()
        else:
            return None

    def shutdown(self):
        """Shuts down the executor properly."""
        self.executor.shutdown(wait=True)

In [None]:
def power(base, exp):
    return pow(base, exp)

def double(value):
    return value * 2

# Initialize handler
handler = BasicHandler(strategy='FIFO')

# Add tasks
handler.add_fxn(power, 2, 3)
idx1 = handler.submit()  # Runs power(2,3)

handler.add_fxn(double)  # No args, waiting for result
idx2 = handler.submit(FUTURE_IDX=idx1)  # Uses result of power(2,3)

# Get results
print(handler.get_result(idx=idx1))  # 8
print(handler.get_result(idx=idx2))  # 16

# Cleanup
handler.shutdown()


### Proving Memory Isolation in ProcessPoolExecutor (LIFO)
- Processes copy the parent memory into their process space

In [None]:
var = 10

def power(base, exp):
    global var

    var = var ** 10
    print(f'{os.getpid()}: Power {var}')

    return pow(base, exp)

def double(value):
    global var

    var *= 2
    print(f'{os.getpid()}: Double {var}')

    return value * 2

# Initialize handler
handler = BasicHandler(strategy='LIFO', executor='ProcessPool')

# Add tasks
handler.add_fxn(double)  # No args, waiting for result
handler.add_fxn(power, 2, 3)

idx2 = handler.submit()  # Uses result of power(2,3)
idx1 = handler.submit(FUTURE_IDX=idx2)

# Get results
print('The following proves basic process isolation')
print(handler.get_result(idx=idx1))  # 8
print(handler.get_result(idx=idx2))  # 16

# Cleanup
handler.shutdown()

# Threading
- Locks, RLock, Condition, Semaphore, Event, Timer, Barrier

- threading.Lock
    - Methods
        1. acquire(blocking=True, timeout=-1): could block
        2. release(): if called on an unlocked lock, a RuntimeError is raised
        3. locked() -> returns boolean
    
- threading.RLock
    - A reentrant lock is a synchronization primitive that may be acquired multiple times by the same thread
    
    - Methods
        1. acquire(blocking=True, timeout=-1)
        2. release()

- threading.Condition(lock=None)
    - Always associated with a lock: either passed in or created by default (RLock)
    
    - The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. 
        - The wait() can return after an arbitrary long time, and the condition which prompted the notify() call may no longer hold true
        - The wait_for() can be used to automate the condition checking 
    
    - Methods
        1. acquire(*args): acquire the underlying lock
        2. release()
        3. wait(timeout=None)
            - This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires the lock and returns.
        4. wait_for(predicate, timeout=None): predicate should be a callable with a boolean result
        5. notify_all()
        6. notify(n=1)
            - If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.

- threading.Semaphore(value=1)
    - A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().
    
    - Methods
        1. acquire(blocking=True, timeout=None)
        2. release(n=1)
            - The order in which threads are awoken should not be relied on.
    
    - threading.BoundedSemaphore(value=1)
        - If the value exceeds the initial value, it raises a ValueError
    
- threading.Event
    - Manages a flag/boolean
    
    - Methods
        1. is_set() -> returns bool
        2. set()
        3. clear() -> resets the internal flag
        4. wait(timeout=None) -> blocks until internal flag is set or the timeout comes

- threading.Barrier
    - Creates a wall for thread(s), release activates thread(s)

    - Methods/attributes
        1. wait(timeout=None) -> The return value is an integer in the range 0 to parties – 1, different for each thread.
        2. reset()
            - Return the barrier to the default, empty state. Any threads waiting on it will receive the BrokenBarrierError exception.
        3. parties -> The number of threads required to pass the barrier
        4. n_waiting -> The number of threads currently waiting in the barrier
        5. broken -> returns a boolean

- Concurrent: code that can be executed out of order
- Each process is in fact one instance of the Python interpreter that executes Python instructures (bytecode)

In [None]:
# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

########
## OR ##
########
# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

## Examples

### Future
    - Statistical simulation of atm queuing.

### Simple ATM

In [1]:
import time
import random
import threading
import re
from concurrent.futures import ThreadPoolExecutor

class Account:
    reentrant_lock = threading.RLock()
    
    @staticmethod
    def check_account_number(account_number) -> bool:
        return bool(re.fullmatch(r'\d{10}', account_number))
    
    def __init__(self, account_number: str, balance: int = 0):
        if self.check_account_number(account_number):
            self.__account_number = account_number
        else:
            raise ValueError(f'{account_number} is not formatted correctly')
        
        self.__balance = max(balance, 0)
        
    @property
    def account_number(self):
        return self.__account_number
        
    @property
    def balance(self):
        with Account.reentrant_lock:
            return self.__balance
        
    @balance.setter
    def balance(self, new_amount: int):
        with Account.reentrant_lock:
            if new_amount >= 0:
                self.__balance = new_amount
            else:
                raise ValueError("Balance cannot be negative")

class AtmBackend:
    locks = [threading.Lock() for _ in range(2)]
    account_numbers = {}
    
    @classmethod
    def add_account_number(cls, account_number: str):
        cls.account_numbers[account_number] = cls.account_numbers.get(account_number, 0) + 1
        
    @classmethod
    def acquire_lock(cls, timeout: float = 5.5):
        """Try acquiring any available lock, return the lock object if successful."""
        for lock in cls.locks:
            #acquired = lock.acquire(timeout=timeout)
            if lock.acquire(timeout=timeout):
                return lock
        return None  # No lock available
    
    def __init__(self, number: int):
        self.__number = number
        
    @property
    def number(self):
        return str(self.__number)

    def deposit(self, account: Account, amount: int, timeout: float = 2.5) -> bool:
        self.add_account_number(account.account_number)
        response = False

        if amount > 0 and (lock := self.acquire_lock(timeout=timeout)):
            try:
                with Account.reentrant_lock:  # Ensure proper locking order
                    if amount > 0:
                        account.balance += amount
                        response = True
            finally:
                lock.release()  # Ensure lock is always released

        return response
    
    def withdraw(self, account: Account, amount: int, timeout: float = 2.5) -> bool:
        self.add_account_number(account.account_number)
        response = False

        if amount > 0 and (lock := self.acquire_lock(timeout=timeout)):
            try:
                with Account.reentrant_lock:  # Ensure proper locking order
                    if account.balance >= amount:
                        account.balance -= amount
                        response = True
            finally:
                lock.release()  # Ensure lock is always released

        return response

# Thread Synchronization
atm_semaphore = threading.Semaphore(2)  # Two ATMs available
emergency_stop = threading.Event()

# Dynamic barrier to match the number of transactions
transaction_barrier = threading.Barrier(1) #transaction_count)  

def atm_session(transaction_type, amount, account, atm):
    actions = []

    with atm_semaphore:
        if transaction_type == "withdraw":
            success = atm.withdraw(account, amount)
        elif transaction_type == "deposit":
            success = atm.deposit(account, amount)
        else:
            success = False
        
        print(f"{transaction_type.capitalize()} {amount}: {'Success' if success else 'Failed'}")
        actions.append((threading.current_thread().name, atm.number, account.account_number, amount, transaction_type, success))
        # Synchronization barrier: ensures all transactions reach this point before proceeding
        try:
            transaction_barrier.wait()
        except threading.BrokenBarrierError:
            pass  # Handle barrier error safely

    return actions

# Sample Execution
if __name__ == "__main__":
    atm = AtmBackend(1)
    account = Account("1234567890", 1000)
    
    transactions = [
        ("withdraw", 200, account, atm),
        ("deposit", 500, account, atm),
        ("withdraw", 300, account, atm),
        ("withdraw", 100, account, atm),
        ("deposit", 400, account, atm),
    ]
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(atm_session, *args) for args in transactions]

    for f in futures:
        print(f.result())


Withdraw 200: Success
Deposit 500: Success
Withdraw 300: Success
Withdraw 100: Success
Deposit 400: Success
[('ThreadPoolExecutor-0_0', '1', '1234567890', 200, 'withdraw', True)]
[('ThreadPoolExecutor-0_1', '1', '1234567890', 500, 'deposit', True)]
[('ThreadPoolExecutor-0_1', '1', '1234567890', 300, 'withdraw', True)]
[('ThreadPoolExecutor-0_0', '1', '1234567890', 100, 'withdraw', True)]
[('ThreadPoolExecutor-0_0', '1', '1234567890', 400, 'deposit', True)]
