# Async programming in Python with asyncio

## Table of Contents
- Threading Vs Multiprocessing Vs Asyncio
- Understanding `async` and `await`
- Profiling
- Asyncio patterns
  - `asyncio.gather` is enough for most cases
  - get response as soon as they complete
  - cancel tasks while executing
  - prevent bubbling up async/await to parent functions
  - asyncio cheat sheet
- Exercises
  - Optimize the following code snippets
    - Executing Steps and logging to server
    - Performing multi stage async calls
    - Concurrency with caching
- Side effects
  - facing rate limit errors from recipient
    - to avoid
      - use semaphores to limit requests
      - use batch requests to reduce request count
      - use debouncing and throttling
    - to handle
      - use a retry mechanism
- resources

# Threading vs Multiprocessing vs Asyncio 

[Understanding Process, Threads and Context Switching](https://dev.to/coderatul/threading-vs-asyncio-vs-multiprocessing-10ed)

- CPU Bound (parallelism)
  - multiprocessing
    - distributes tasks to cores
    - starting and stopping process is expensive

- IO Bound (pseudo parallelism)
  - threading
    - starting and stopping threads is expensive
    - low control over context switching
    - [prone to race conditions](https://dev.to/coderatul/race-condition-in-pythons-threading-3o4g)
  - asyncio
    - high control over context switching
  
- [Example Code](https://stackoverflow.com/a/74327302)

> **Tip**: For IO-bound tasks in Python, prefer asyncio over threading when the libraries you depend on offer async APIs
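
To make the comparison above concrete, here is a minimal sketch that runs the same IO-bound job sequentially, with threads, and with asyncio. `fake_io` and `afake_io` are hypothetical stand-ins that only sleep, so the timings are illustrative and will differ for real workloads.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.05  # pretend each IO call takes 50 ms

def fake_io(i: int) -> int:
    """Blocking stand-in for a network or disk call."""
    time.sleep(DELAY)
    return i * 2

async def afake_io(i: int) -> int:
    """Non-blocking stand-in for the same call."""
    await asyncio.sleep(DELAY)
    return i * 2

def run_sequential(n: int) -> list[int]:
    return [fake_io(i) for i in range(n)]

def run_threaded(n: int) -> list[int]:
    # The OS decides when to switch between threads
    with ThreadPoolExecutor(max_workers=n) as pool:
        return list(pool.map(fake_io, range(n)))

async def run_async(n: int) -> list[int]:
    # Switches happen only at `await` points
    return await asyncio.gather(*(afake_io(i) for i in range(n)))

def timed(label, fn):
    start = time.perf_counter()
    result = fn()
    print(f"{label:<10} {time.perf_counter() - start:.2f}s")
    return result

sequential_result = timed("sequential", lambda: run_sequential(5))
threaded_result = timed("threading", lambda: run_threaded(5))
async_result = timed("asyncio", lambda: asyncio.run(run_async(5)))
```

All three produce the same results. The threaded and asyncio versions should finish in roughly the time of one call, while the sequential version takes about five times as long.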

# Understanding async and await

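`async def` marks a function as a coroutine function. Calling it returns a coroutine object instead of running the body.

`await` runs a coroutine and suspends the caller until it finishes. While the caller is suspended, the event loop can switch to other tasks.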


In [8]:
import asyncio

async def aget_data()->str:
    print("Fetching data...")
    await asyncio.sleep(1)
    print("Fetched data")
    return "data"

aget_data() # gives back coroutine instead of executing
# await aget_data()

<coroutine object aget_data at 0x00000188250F7C40>
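
Top-level `await` works in a notebook because Jupyter already runs an event loop. In a plain script there is no running loop, so the coroutine has to be handed to `asyncio.run`. A minimal sketch, with the sleep shortened:

```python
import asyncio

async def aget_data() -> str:
    await asyncio.sleep(0.01)
    return "data"

coro = aget_data()                        # nothing has run yet
is_coroutine = asyncio.iscoroutine(coro)  # it is only a coroutine object
result = asyncio.run(coro)                # the event loop drives it to completion
print(is_coroutine, result)
```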

# Profiling

Premature optimization is the root of all evil. If your application feels slow, picking a random module and optimizing it is rarely the best idea.

You need to know which part of your code accounts for most of the time, and fix that.
Profiling helps you find it.

In Python, profiling comes for free with the standard library
- `cProfile` and `pstats` will help profile your code


In [51]:
from cProfile import Profile
from pstats import Stats
import time

def util():
    time.sleep(0.1)

def my_expensive_code():
    for _ in range(30):
        util()

with Profile() as profile:
    my_expensive_code()
stats = Stats(profile)

In [52]:
stats.sort_stats("cumulative").print_stats()

         63 function calls in 3.019 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    3.019    3.019 C:\Users\vignesh.arivazhagan\AppData\Local\Temp\ipykernel_5368\3965047745.py:8(my_expensive_code)
       30    0.000    0.000    3.019    0.101 C:\Users\vignesh.arivazhagan\AppData\Local\Temp\ipykernel_5368\3965047745.py:5(util)
       30    3.018    0.101    3.018    0.101 {built-in method time.sleep}
        1    0.000    0.000    0.000    0.000 C:\Users\vignesh.arivazhagan\AppData\Local\Programs\Python\Python311\Lib\cProfile.py:117(__exit__)
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}




<pstats.Stats at 0x188262d0090>
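
The full report above gets long on real code. As a sketch of how to trim it, `pstats` can strip directory prefixes, limit the number of rows, and write into a buffer instead of stdout. The function names below mirror the cell above, with shorter sleeps.

```python
import io
import time
from cProfile import Profile
from pstats import SortKey, Stats

def util():
    time.sleep(0.01)

def my_expensive_code():
    for _ in range(5):
        util()

with Profile() as profile:
    my_expensive_code()

buffer = io.StringIO()
stats = Stats(profile, stream=buffer)  # send the report to the buffer
# Shorten file paths, sort by cumulative time, keep only the top 3 rows
stats.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(3)
report = buffer.getvalue()
print(report)
```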

### Making an async request
`requests` is a popular third-party library for synchronous HTTP requests

`aiohttp` (also third-party, `pip install aiohttp`) can be used as an async replacement

In [None]:
import aiohttp

async def afetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
        
url = "https://api.sampleapis.com/codingresources/codingResources"

await afetch(url)

# Asyncio Patterns
A list of common concurrency use cases with `asyncio`

### `asyncio.gather` is enough for most cases
A common use case is making a bunch of requests concurrently. `asyncio.gather` does this and returns the results in the same order as the inputs

In [16]:
# Prefixing async function names with 'a' helps readability
async def aget_data(param)->str:
    print(f"Fetching data for {param = }...")
    await asyncio.sleep(param)
    print(f"Fetched data for {param = }")
    return f"Data for {param = }"

request_params = [3,1,2]
tasks = [aget_data(param) for param in request_params]

await asyncio.gather(*tasks) # Order is preserved

Fetching data for param = 3...
Fetching data for param = 1...
Fetching data for param = 2...
Fetched data for param = 1
Fetched data for param = 2
Fetched data for param = 3


['Data for param = 3', 'Data for param = 1', 'Data for param = 2']
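
By default, `asyncio.gather` raises the first exception it sees. When one failed request should not discard the others, `return_exceptions=True` returns exceptions in place of results. A minimal sketch, where the negative-param failure is an assumption made up for illustration:

```python
import asyncio

async def aget_data(param: int) -> str:
    await asyncio.sleep(0.01)
    if param < 0:
        raise ValueError(f"invalid {param = }")  # simulated failure
    return f"Data for {param = }"

async def main():
    # Failures come back as exception objects, in the same position as their input
    return await asyncio.gather(
        *(aget_data(p) for p in [3, -1, 2]), return_exceptions=True
    )

results = asyncio.run(main())
for item in results:
    print("failed:" if isinstance(item, Exception) else "ok:", item)
```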

### Get response as soon as they complete
`asyncio.as_completed` yields results in completion order, which is useful for progress indication

In [26]:
request_params = [3,1,2]
tasks = [aget_data(param) for param in request_params]

for task in asyncio.as_completed(tasks):
    result = await task
    print(result)



Fetching data for param = 2...
Fetching data for param = 1...
Fetching data for param = 3...
Fetched data for param = 1
Data for param = 1
Fetched data for param = 2
Data for param = 2
Fetched data for param = 3
Data for param = 3
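
As a sketch of the progress-indication idea, the loop below counts finished tasks as they arrive. The delays are shortened, and the `n/total finished` message format is only an illustration.

```python
import asyncio

async def aget_data(param: float) -> float:
    await asyncio.sleep(param)
    return param

async def main():
    delays = [0.15, 0.05, 0.1]
    tasks = [asyncio.create_task(aget_data(d)) for d in delays]
    completed = []
    # as_completed yields awaitables in the order the tasks finish
    for done_count, next_done in enumerate(asyncio.as_completed(tasks), start=1):
        completed.append(await next_done)
        print(f"{done_count}/{len(tasks)} finished")
    return completed

completion_order = asyncio.run(main())
print(completion_order)
```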


### Cancel tasks while executing
Suppose you want to cancel requests that are already in flight when the user clicks a cancel button

In [23]:
request_params = [3,1,2]
tasks = [asyncio.create_task(aget_data(param)) for param in request_params]

def is_cancel_button_pressed()->bool:
    return True

try:
    for task in asyncio.as_completed(tasks):
        result = await task
        
        if is_cancel_button_pressed():
            for task in tasks:
                if not task.done():
                    task.cancel()
                    
except asyncio.CancelledError:
    pass


Fetching data for param = 3...
Fetching data for param = 1...
Fetching data for param = 2...
Fetched data for param = 1
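
A variation on the cell above, as a sketch: `asyncio.wait` with `FIRST_COMPLETED` separates finished tasks from pending ones, and a final `gather` lets the cancelled tasks unwind before moving on. Treating the first completion as the moment the button is pressed is an assumption for the demo.

```python
import asyncio

async def aget_data(param: float) -> float:
    await asyncio.sleep(param)
    return param

async def main():
    tasks = [asyncio.create_task(aget_data(d)) for d in [0.3, 0.05, 0.2]]
    # Wait for the first result, then act as if "cancel" was pressed
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let cancelled tasks finish unwinding; CancelledError is returned, not raised
    await asyncio.gather(*pending, return_exceptions=True)
    return [t.result() for t in done], [t.cancelled() for t in tasks]

finished, cancelled_flags = asyncio.run(main())
print(finished, cancelled_flags)
```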


### Prevent bubbling up async/await to parent functions
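Suppose you make a deeply nested function async. Every parent function in the call stack then has to become async too, with `await` added at every call site. To avoid this, the nested function can stay synchronous and use `asyncio.run` to run the async work to completion. `asyncio.run` cannot normally be called while an event loop is already running, which is why the notebook cell below needs `nest_asyncio`.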

In [57]:
import nest_asyncio
nest_asyncio.apply() # Jupyter notebook is already in event loop

async def autil(i):
    await asyncio.sleep(1)
    return f"result for {i}"

def deeply_nested_function():
    tasks = [autil(i) for i in range(3)]
    return asyncio.run(asyncio.gather(*tasks))

# can be executed without await
deeply_nested_function()

['result for 0', 'result for 1', 'result for 2']
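
In a plain script there is no running loop, so `nest_asyncio` is not needed. One difference to note: the unpatched `asyncio.run` expects a coroutine, so the sketch below wraps `gather` in a small helper (`_arun_all` and `parent_function` are hypothetical names) instead of passing the `gather` result directly as the notebook cell does.

```python
import asyncio

async def autil(i: int) -> str:
    await asyncio.sleep(0.01)
    return f"result for {i}"

async def _arun_all() -> list[str]:
    # Plain asyncio.run expects a coroutine, so gather is wrapped in one
    return await asyncio.gather(*(autil(i) for i in range(3)))

def deeply_nested_function() -> list[str]:
    # Starts a fresh event loop, runs to completion, then closes the loop
    return asyncio.run(_arun_all())

def parent_function() -> int:
    # Stays synchronous: no async/await needed up the call stack
    return len(deeply_nested_function())

results = deeply_nested_function()
count = parent_function()
print(results, count)
```

The trade-off is that each call blocks its caller until all the async work is done, so concurrency only exists inside `deeply_nested_function`.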

### Asyncio cheat sheet
The techniques above are some of the most common usage patterns.

`asyncio` can do a lot more; see the [asyncio cheatsheet](https://marvelous-writer-6152.kit.com/d29b7d8dfb) for more information

# Exercises
Now that you know the techniques, let's test our understanding with a few problem statements

### Identify a potential optimization in the following code snippets

#### 1. Executing steps and storing logs

In [32]:
async def alog_to_cloud(data:str)->None:
    # Make request to logging server
    await asyncio.sleep(1)

async def execute_step(param)->str:
    await alog_to_cloud(f"Executing Step with {param = }...")
    result = await aget_data(param)
    await alog_to_cloud(f"Executed Step with {param = }")
    return result

# Main program

steps = [3,1,2]
tasks = [execute_step(param) for param in steps]
await asyncio.gather(*tasks)


Fetching data for param = 3...
Fetching data for param = 1...
Fetching data for param = 2...
Fetched data for param = 1
Fetched data for param = 2
Fetched data for param = 3


['Data for param = 3', 'Data for param = 1', 'Data for param = 2']

#### Solution: Don't wait for tasks you don't need to
We never use the response of the logging request, so there is no need to wait for it

In [34]:

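background_tasks = set() # keep references so pending tasks are not garbage collected

async def alog_to_cloud(data:str)->None:
    # Schedule the request to the logging server without waiting for it
    task = asyncio.create_task(asyncio.sleep(1)) # stands in for the real request
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)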

async def execute_step(param)->str:
    await alog_to_cloud(f"Executing Step with {param = }...")
    result = await aget_data(param)
    await alog_to_cloud(f"Executed Step with {param = }")
    return result

# Main program

steps = [3,1,2]
tasks = [execute_step(param) for param in steps]
await asyncio.gather(*tasks)

Fetching data for param = 3...
Fetching data for param = 1...
Fetching data for param = 2...
Fetched data for param = 1
Fetched data for param = 2
Fetched data for param = 3


['Data for param = 3', 'Data for param = 1', 'Data for param = 2']
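
Fire-and-forget has two pitfalls: the event loop keeps only weak references to tasks, and a script can exit before pending tasks finish. The sketch below is one way to handle both, under the assumption that logs should still be delivered before shutdown. `_asend_log`, `sent_logs` and the synchronous `log_to_cloud` are hypothetical names for illustration.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()
sent_logs: list[str] = []

async def _asend_log(data: str) -> None:
    await asyncio.sleep(0.05)  # stands in for the request to the logging server
    sent_logs.append(data)

def log_to_cloud(data: str) -> None:
    # Schedule the request and return immediately
    task = asyncio.create_task(_asend_log(data))
    background_tasks.add(task)  # keep a strong reference until it finishes
    task.add_done_callback(background_tasks.discard)

async def execute_step(param: int) -> str:
    log_to_cloud(f"Executing step with {param = }...")
    await asyncio.sleep(0.01)  # stands in for aget_data
    log_to_cloud(f"Executed step with {param = }")
    return f"Data for {param = }"

async def main() -> list[str]:
    results = await asyncio.gather(*(execute_step(p) for p in [3, 1, 2]))
    # Drain pending log requests before the loop shuts down
    await asyncio.gather(*background_tasks)
    return results

results = asyncio.run(main())
print(results, len(sent_logs))
```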

#### 2. Performing multi stage async calls
While developing a solution, not all requests can be made concurrently, because the response of one can be the input of another

Try to come up with a more efficient version of the code below

In [35]:
async def aprocess_data(execution_time,data):
    print(f"Processing {data}...")
    await asyncio.sleep(execution_time)
    print(f"Processed {data}")
    return f"Processed {data}"

params = [3,1,2]
processing_time = [1,1,3]

data_fetching_tasks = [aget_data(param) for param in params]
results = await asyncio.gather(*data_fetching_tasks)

data_processing_tasks = [aprocess_data(exec_time,data) for exec_time,data in zip(processing_time,results)]
processed_results = await asyncio.gather(*data_processing_tasks)
processed_results

Fetching data for param = 3...
Fetching data for param = 1...
Fetching data for param = 2...
Fetched data for param = 1
Fetched data for param = 2
Fetched data for param = 3
Processing Data for param = 3...
Processing Data for param = 1...
Processing Data for param = 2...
Processed Data for param = 3
Processed Data for param = 1
Processed Data for param = 2


['Processed Data for param = 3',
 'Processed Data for param = 1',
 'Processed Data for param = 2']

#### Solution : Again 🤦‍♂️, Don't wait when you don't need to
The 2nd task, with a fetch time of 1, doesn't need to wait for all the other fetches to complete before it gets processed

In [37]:
async def aget_processed_data(execution_time,param):
    result = await aget_data(param)
    return await aprocess_data(execution_time,result)

params = [3,1,2]
processing_time = [1,1,3]

tasks = [aget_processed_data(exec_time,param) for exec_time,param in zip(processing_time,params)]
await asyncio.gather(*tasks)

Fetching data for param = 3...
Fetching data for param = 1...
Fetching data for param = 2...
Fetched data for param = 1
Processing Data for param = 1...
Fetched data for param = 2
Processing Data for param = 2...
Processed Data for param = 1
Fetched data for param = 3
Processing Data for param = 3...
Processed Data for param = 3
Processed Data for param = 2


['Processed Data for param = 3',
 'Processed Data for param = 1',
 'Processed Data for param = 2']
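
The gain can be worked out from the numbers in the two cells above, ignoring scheduling overhead. The staged version pays for the slowest fetch plus the slowest processing step, while the pipelined version pays only for the slowest fetch-and-process pair:

```python
fetch_times = [3, 1, 2]
processing_times = [1, 1, 3]

# Staged: every fetch must finish before any processing starts
staged_total = max(fetch_times) + max(processing_times)

# Pipelined: each item is processed as soon as its own fetch finishes
pipelined_total = max(f + p for f, p in zip(fetch_times, processing_times))

print(f"{staged_total = }, {pipelined_total = }")
# prints: staged_total = 6, pipelined_total = 5
```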

#### 3. Concurrency with caching on pure APIs
Pure functions always return the same output for the same inputs

When executing sequentially, we can avoid recomputation by storing inputs and outputs in a hashmap

But for pure APIs (APIs that return the same result for the same params), how can we balance concurrency and caching?

Can you bring down the compute cost in the following code without increasing the total time?

In [40]:
compute_cost = 0

async def aget_result(num:int)->int:
    global compute_cost
    compute_cost+=1
    await asyncio.sleep(1)
    return num*num

nums = [3,2,1,3,5,2,1,1]

tasks = [aget_result(num) for num in nums]

results = await asyncio.gather(*tasks)
print(f"{results = }, {compute_cost = }")

results = [9, 4, 1, 9, 25, 4, 1, 1], compute_cost = 8


In [41]:
# Sequential with a cache: lower compute_cost, but takes longer

compute_cost = 0
cache = {}
results = []

for num in nums:
    if num not in cache:
        result = await aget_result(num)
        cache[num] = result
    else:
        result = cache[num]
    results.append(result)

print(f"{results = }, {compute_cost = }")

results = [9, 4, 1, 9, 25, 4, 1, 1], compute_cost = 4


#### Solution: Collect unique tasks, compute and map to input order
We can maintain a hashmap with each unique num as the key and its task as the value,
then iterate through nums and look up each result in the map to build the results list

In [43]:
compute_cost = 0

unique_nums = list(set(nums))
unique_task_map = {num: asyncio.create_task(aget_result(num)) for num in unique_nums}

await asyncio.gather(*unique_task_map.values())

# Map results back to the original input order
results = [unique_task_map[num].result() for num in nums]
print(f"{results = }, {compute_cost = }")

results = [9, 4, 1, 9, 25, 4, 1, 1], compute_cost = 4
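
The same idea can be packaged as a wrapper, so callers do not have to deduplicate inputs themselves. This is a sketch rather than a full cache: `_inflight` and `aget_result_cached` are hypothetical names, entries are never evicted, and a failed task would stay cached.

```python
import asyncio

compute_cost = 0
_inflight: dict[int, asyncio.Task] = {}

async def aget_result(num: int) -> int:
    global compute_cost
    compute_cost += 1
    await asyncio.sleep(0.01)
    return num * num

async def aget_result_cached(num: int) -> int:
    # The first caller creates the task; later callers await the same one
    if num not in _inflight:
        _inflight[num] = asyncio.create_task(aget_result(num))
    return await _inflight[num]

async def main(nums: list[int]) -> list[int]:
    return await asyncio.gather(*(aget_result_cached(n) for n in nums))

results = asyncio.run(main([3, 2, 1, 3, 5, 2, 1, 1]))
print(f"{results = }, {compute_cost = }")
```

Storing the task rather than the finished value is what lets duplicate calls share work that is still in progress.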


# Side Effects
Now you know how to improve performance by making concurrent requests. In an ideal scenario this works great, but not every API we interact with can handle such demand (for example, thousands of concurrent requests). Let's discuss what to do when you are limited by an API's capacity

## Facing Rate limit errors from recipient
To protect against abuse such as DDoS attacks, servers that expose APIs usually return a rate limit error when they receive too many requests from a single source in a short span of time

There are a few ways to avoid this, and to handle it when it does happen
### To avoid
#### 1. Use semaphores to limit requests

In [67]:
active_requests = 0
semaphore = asyncio.Semaphore(2)

async def api_with_less_capacity(param):
    global active_requests
    active_requests+=1
    if active_requests>2:
        raise Exception("Can't process more than 2 active requests")
    await asyncio.sleep(param)
    active_requests-=1
    return param

async def acall_api(param):
    async with semaphore:
        return await api_with_less_capacity(param)
    
params = [3,1,2,2,1,2]

tasks = [acall_api(param) for param in params]

await asyncio.gather(*tasks)

[3, 1, 2, 2, 1, 2]
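
The semaphore pattern can also be wrapped in a reusable helper. In the sketch below, `agather_with_limit` is a hypothetical name, and the `peak` counter exists only to show that the limit holds. The semaphore is created inside the coroutine so that it belongs to the running loop, which can matter on older Python versions.

```python
import asyncio

active = 0
peak = 0

async def api_with_less_capacity(param: int) -> int:
    global active, peak
    active += 1
    peak = max(peak, active)  # record the highest concurrency seen
    try:
        await asyncio.sleep(0.01)
        return param
    finally:
        active -= 1  # released even if the call fails

async def agather_with_limit(limit: int, coros):
    semaphore = asyncio.Semaphore(limit)

    async def _limited(coro):
        async with semaphore:  # at most `limit` coroutines run at once
            return await coro

    return await asyncio.gather(*(_limited(c) for c in coros))

async def main():
    coros = [api_with_less_capacity(p) for p in [3, 1, 2, 2, 1, 2]]
    return await agather_with_limit(2, coros)

results = asyncio.run(main())
print(f"{results = }, {peak = }")
```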

#### 2. Use batch requests
If the recipient accepts a batch of inputs, prefer one batched request over many individual concurrent requests
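
A minimal sketch of batching, assuming a hypothetical endpoint `abatch_api` that accepts a list of inputs and returns results in the same order. The batch size of 4 is arbitrary.

```python
import asyncio

request_count = 0

async def abatch_api(params: list[int]) -> list[int]:
    """Hypothetical endpoint that accepts many inputs in one request."""
    global request_count
    request_count += 1
    await asyncio.sleep(0.01)
    return [p * p for p in params]

def chunked(items: list[int], size: int) -> list[list[int]]:
    # Split the inputs into consecutive batches of at most `size` items
    return [items[i:i + size] for i in range(0, len(items), size)]

async def main(params: list[int], batch_size: int) -> list[int]:
    batches = await asyncio.gather(
        *(abatch_api(chunk) for chunk in chunked(params, batch_size))
    )
    # Flatten while keeping the original input order
    return [result for batch in batches for result in batch]

results = asyncio.run(main(list(range(10)), batch_size=4))
print(f"{results = }, {request_count = }")
```

Ten inputs turn into three requests here instead of ten.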

#### 3. Use debouncing and throttling
- debouncing: defer each request, and cancel the deferred one if a new request arrives within the timeframe
- throttling: drop further requests until a minimum interval has passed since the last accepted request

[visualization](https://web.archive.org/web/20220117092326/http://demo.nimius.net/debounce_throttle/)


In [95]:
import functools
import time

def debounce(timeout: float):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if wrapper.task:
                wrapper.task.cancel()
            wrapper.task = asyncio.create_task(wrapper._debounced_call(func, *args, **kwargs))
            
        async def _debounced_call(func, *args, **kwargs):
            await asyncio.sleep(timeout)
            return await func(*args, **kwargs)
        
        wrapper.task = None
        wrapper._debounced_call = _debounced_call
        return wrapper
    return decorator

def throttle(wait_time):
    def decorator(function):
        @functools.wraps(function)
        async def throttled(*args, **kwargs):
            if time.time() - throttled._last_time_called >= wait_time:
                throttled._last_time_called = time.time()
                return await function(*args, **kwargs)

        throttled._last_time_called = 0
        return throttled

    return decorator

@throttle(0.9)
async def high_frequency_function(index,param):
    print(f"Function called with {index = } {param = }")
    return param

async def wait_and_call(index,param):
    await asyncio.sleep(param)
    return await high_frequency_function(index,param)

params  = [1,1,3,4,5]
tasks = [wait_and_call(i,param) for i,param in enumerate(params)]
await asyncio.gather(*tasks)

Function called with index = 0 param = 1
Function called with index = 2 param = 3
Function called with index = 3 param = 4
Function called with index = 4 param = 5


[1, None, 3, 4, 5]
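
The cell above exercises only `throttle`. As a sketch of the debouncing side, the example below uses a slightly simplified version of the `debounce` decorator and a hypothetical `asave_draft` function that is called three times in quick succession. Only the last call should run.

```python
import asyncio
import functools

def debounce(timeout: float):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if wrapper.task is not None:
                wrapper.task.cancel()  # drop the deferred call that is still waiting
            wrapper.task = asyncio.create_task(_debounced_call(*args, **kwargs))

        async def _debounced_call(*args, **kwargs):
            await asyncio.sleep(timeout)  # cancelled here if a newer call arrives
            return await func(*args, **kwargs)

        wrapper.task = None
        return wrapper
    return decorator

executed = []

@debounce(0.1)
async def asave_draft(text: str) -> None:
    executed.append(text)

async def main():
    for text in ["h", "he", "hel"]:
        await asave_draft(text)    # schedules the call; does not wait for it
        await asyncio.sleep(0.01)  # calls arrive faster than the timeout
    await asyncio.sleep(0.25)      # quiet period: the last call fires

asyncio.run(main())
print(executed)
```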

### To handle
Retry after a delay, but remember to set `max_retries` so that you don't retry forever


In [102]:
import random

def retry_on_exception(max_retries: int, delay: float):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt < max_retries - 1:
                        print(f"Retrying after {delay} seconds...")
                        await asyncio.sleep(delay)
                    else:
                        raise e
        return wrapper
    return decorator

@retry_on_exception(max_retries=3,delay=1)
async def api_call(param):
    if random.random() < 0.2:
        raise Exception("Can't process")
    await asyncio.sleep(1)
    return param

params  = [1,1,3,4,5]
tasks = [api_call(param) for param in params]
await asyncio.gather(*tasks)

Retrying after 1 seconds...
Retrying after 1 seconds...


[1, 1, 3, 4, 5]
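
A common refinement of the fixed delay is exponential backoff with jitter, so that many clients retrying at once do not hit the server in lockstep. The sketch below is one possible variant, not the only one. `retry_with_backoff`, its `retry_on` parameter and `RateLimitError` are hypothetical names, and the failure pattern is scripted so the run is deterministic.

```python
import asyncio
import functools
import random

def retry_with_backoff(max_retries: int, base_delay: float, retry_on=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries - 1:
                        raise  # out of attempts: re-raise with the original traceback
                    # Delay doubles on each attempt; jitter spreads out simultaneous retries
                    delay = base_delay * 2 ** attempt
                    await asyncio.sleep(delay + random.uniform(0, delay))
        return wrapper
    return decorator

class RateLimitError(Exception):
    """Hypothetical error type standing in for a rate limit response."""

attempts = 0

@retry_with_backoff(max_retries=4, base_delay=0.01, retry_on=(RateLimitError,))
async def api_call(param: int) -> int:
    global attempts
    attempts += 1
    if attempts < 3:  # fail the first two attempts
        raise RateLimitError("Too many requests")
    return param

result = asyncio.run(api_call(7))
print(f"{result = }, {attempts = }")
```

Restricting `retry_on` to specific error types avoids retrying failures that will never succeed, such as invalid input.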

# Resources
- [SuperFastPython](https://superfastpython.com/)
- [PythonSpeed](https://pythonspeed.com/)