In [1]:
import asyncio

### 1.0 When to use Asyncio?

**Asyncio**  
It's for when you need to run many tasks concurrently that spend most of their time waiting and need little CPU power. For example:
- Network Requests
- Reading Files

**Threads**  
Threads are suited to tasks that may need to wait but also share data. These are for tasks that:
- Are I/O bound (and not CPU intensive)
- Run alongside each other (in CPython, the GIL means threads mainly overlap while waiting, not while computing)

**Processes**  
These are for CPU-heavy tasks, i.e. tasks with heavy computations. Each task operates independently and maximizes CPU usage by running in parallel across multiple cores.
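As a rough sketch of how threads and asyncio can be combined, the snippet below pushes a blocking call onto worker threads with `asyncio.to_thread()` (Python 3.9+). The names `blocking_io` and `run_blocking_calls` are made up for illustration, and `time.sleep()` stands in for a blocking library call that has no async equivalent.

```python
import asyncio
import time


def blocking_io(delay):
    # Hypothetical blocking call (e.g. a library without async support)
    time.sleep(delay)
    return delay


async def run_blocking_calls():
    start = time.perf_counter()
    # Each call runs in its own worker thread, so the two waits overlap
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.3),
        asyncio.to_thread(blocking_io, 0.3),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

# Notebook: results, elapsed = await run_blocking_calls()
# Script:   results, elapsed = asyncio.run(run_blocking_calls())
```

The two 0.3-second waits should take roughly 0.3 seconds together rather than 0.6, because the event loop stays free while the threads block.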

### 2.0 The Event Loop

The event loop is the core that manages and distributes tasks. A task is either executed immediately or paused if it's waiting for something, like data from the internet. When a task *awaits* an operation, it steps aside and makes room for another task to run, ensuring the loop is always efficiently utilized. Once the awaited operation is complete, the task resumes.
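To make the "stepping aside" concrete, here is a minimal sketch in which two workers take turns on the loop. The names `worker` and `interleave` are illustrative only; `await asyncio.sleep(0)` is used purely as a way to hand control back to the event loop.

```python
import asyncio


async def worker(name, log):
    for step in range(2):
        log.append(f"{name}{step}")
        # Yield to the event loop so another task can run
        await asyncio.sleep(0)


async def interleave():
    log = []
    await asyncio.gather(worker("A", log), worker("B", log))
    return log

# Notebook: await interleave()
# Script:   asyncio.run(interleave())
# -> ['A0', 'B0', 'A1', 'B1']
```

Neither worker runs to completion in one go: each time one awaits, the loop resumes the other.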

### 3.0 Coroutines

**Subroutines** are blocks of code in Python designed to perform specific tasks. They are categorized into two main types: **functions** and **methods**. Functions are independent, reusable blocks of code defined using the `def` keyword. Methods are functions associated with objects or classes. They operate on the attributes and data within a class and are called on instances using dot notation.

**Coroutines** are a more generalized form of subroutines. Subroutines are entered at one point and exited at another point. Coroutines can be entered, exited, and resumed at many different points. They **allow for cooperative multitasking and non-blocking I/O operations** and are managed by an event loop. They can be implemented with the `async def` statement.
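As a small illustration of the difference between a coroutine function and the coroutine object it returns, the sketch below drives a coroutine by hand with `send(None)`. This is roughly what an event loop does on our behalf; the name `greet` is made up, and in real code we would `await` the coroutine instead.

```python
import inspect


async def greet(name):
    return f"Hello, {name}"


# Calling the coroutine function does not run its body yet
coro = greet("asyncio")
is_function = inspect.iscoroutinefunction(greet)
is_object = inspect.iscoroutine(coro)

# Drive the coroutine manually: with no awaits inside, it finishes
# on the first send() and hands back its value via StopIteration
try:
    coro.send(None)
except StopIteration as stop:
    result = stop.value
```

Here `result` ends up as `'Hello, asyncio'`, and both `is_function` and `is_object` are `True`.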

In [15]:
# coroutine function
async def main():
    print("Start of the main coroutine")

In [16]:
main()

<coroutine object main at 0x7f01ba1d13c0>

To **actually run** the coroutine and access its result, we need to put the `await` keyword in front of the call. This works at the top level of a Jupyter notebook because the notebook already has an event loop running.

In [None]:
await main() # Jupyter Notebooks

Start of the main coroutine


In a regular Python script, we would normally use `asyncio.run()` as shown below. However, as explained in this [StackOverflow thread](https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop-when-using-jupyter-no), we can't do that inside a Jupyter notebook because an event loop is already running.

In [18]:
asyncio.run(main()) # Python

RuntimeError: asyncio.run() cannot be called from a running event loop
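One way to see why the error occurs is to check whether a loop is already running. The helper below is a hypothetical sketch (the names `has_running_loop` and `check_inside` are not part of asyncio); it relies on `asyncio.get_running_loop()` raising `RuntimeError` when no loop is running.

```python
import asyncio


def has_running_loop() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_inside() -> bool:
    # Inside a coroutine there is always a running loop
    return has_running_loop()

# Plain script: no loop yet, so asyncio.run(main()) is safe.
# Notebook cell: the loop is already running, so we write `await main()`.
```

In a script, `has_running_loop()` returns `False` at module level, which is exactly the situation `asyncio.run()` expects.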

In [None]:
async def fetch_data(id, sleep_time):
    print(f"Coroutine {id} starting to fetch data.")
    await asyncio.sleep(sleep_time) # Simulate a network request or I/O operation
    
    # Return some data as a result
    return {"id": id, "data": f"Sample data from coroutine {id}"}

A coroutine object only starts running when we `await` it (or wrap it in a task)! Crucially, `main()` below moves on to `task2` only once `result1` is available, so the two 2-second waits run back to back. Hence, there's really no performance benefit here.

In [22]:
async def main():
    task1 = fetch_data(1, 2)
    task2 = fetch_data(2, 2)

    result1 = await task1
    print(f"Received result: {result1}")

    result2 = await task2
    print(f"Received result: {result2}")

In [None]:
await main() # ran in 4 seconds

Coroutine 1 starting to fetch data.
Received result: {'id': 1, 'data': 'Sample data from coroutine 1'}
Coroutine 2 starting to fetch data.
Received result: {'id': 2, 'data': 'Sample data from coroutine 2'}


To **ensure our tasks run concurrently**, we need to use the `asyncio.create_task()` function, which schedules the coroutine on the event loop right away.

In [None]:
async def main1():
    # Create tasks for running coroutines concurrently
    task1 = asyncio.create_task(fetch_data(1, 2))
    task2 = asyncio.create_task(fetch_data(2, 3))
    task3 = asyncio.create_task(fetch_data(3, 1))

    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(result1, result2, result3)

In [None]:
await main1() # ran in 3 seconds

Coroutine 1 starting to fetch data.
Coroutine 2 starting to fetch data.
Coroutine 3 starting to fetch data.
{'id': 1, 'data': 'Sample data from coroutine 1'} {'id': 2, 'data': 'Sample data from coroutine 2'} {'id': 3, 'data': 'Sample data from coroutine 3'}
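To check the timings ourselves rather than eyeballing them, we could measure both approaches with `time.perf_counter()`. This is a sketch with shortened delays; `fake_request`, `sequential`, and `concurrent` are illustrative names, not part of the notebook above.

```python
import asyncio
import time


async def fake_request(delay):
    await asyncio.sleep(delay)  # Stand-in for a network request
    return delay


async def sequential():
    start = time.perf_counter()
    await fake_request(0.2)
    await fake_request(0.2)  # Starts only after the first one finishes
    return time.perf_counter() - start


async def concurrent():
    start = time.perf_counter()
    task1 = asyncio.create_task(fake_request(0.2))
    task2 = asyncio.create_task(fake_request(0.2))  # Both are now scheduled
    await task1
    await task2
    return time.perf_counter() - start

# Notebook: await sequential(), await concurrent()
# Script:   asyncio.run(sequential()), asyncio.run(concurrent())
```

The sequential version should take about the sum of the delays, while the concurrent one should take about as long as the longest single delay.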


In [6]:
async def main2():
    # Create tasks for running coroutines concurrently
    task1 = asyncio.create_task(fetch_data(1, 2))
    task2 = asyncio.create_task(fetch_data(2, 3))

    result1 = await task1
    result2 = await task2

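    # task3 is only created after task1 and task2 have finished (~3 s),
    # so its 1-second wait is added on top: ~4 s in total
    task3 = asyncio.create_task(fetch_data(3, 1))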
    
    result3 = await task3

    print(result1, result2, result3)

In [None]:
await main2() # ran in 4 seconds

Coroutine 1 starting to fetch data.
Coroutine 2 starting to fetch data.
Coroutine 3 starting to fetch data.
{'id': 1, 'data': 'Sample data from coroutine 1'} {'id': 2, 'data': 'Sample data from coroutine 2'} {'id': 3, 'data': 'Sample data from coroutine 3'}


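To run multiple coroutines concurrently and collect their results in order, we can use the `asyncio.gather()` function. However, a limitation of this function is that **it's not great at error handling: by default, if one task fails, the exception is raised to the caller but the other tasks are not cancelled and keep running.**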

In [8]:
async def main3():
    # Run coroutines concurrently and gather their return values
    results = await asyncio.gather(
        fetch_data(1, 2),
        fetch_data(2, 1),
        fetch_data(3, 3)
    )

    # Process the results
    for result in results:
        print(f"Received result: {result}")


In [None]:
await main3() # ran in 3 seconds

Coroutine 1 starting to fetch data.
Coroutine 2 starting to fetch data.
Coroutine 3 starting to fetch data.
Received result: {'id': 1, 'data': 'Sample data from coroutine 1'}
Received result: {'id': 2, 'data': 'Sample data from coroutine 2'}
Received result: {'id': 3, 'data': 'Sample data from coroutine 3'}
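If we want to stay with `asyncio.gather()` but still see which calls failed, one option is its `return_exceptions=True` parameter, which returns exceptions as ordinary items in the results list instead of raising the first one. The coroutines `ok` and `broken` below are hypothetical stand-ins.

```python
import asyncio


async def ok(value):
    await asyncio.sleep(0.01)
    return value


async def broken():
    await asyncio.sleep(0.01)
    raise ValueError("request failed")


async def gather_with_errors():
    # Exceptions are collected in place rather than propagated
    return await asyncio.gather(ok(1), broken(), ok(3), return_exceptions=True)

# Notebook: results = await gather_with_errors()
# Script:   results = asyncio.run(gather_with_errors())
```

The results keep the order of the arguments, so the second item is the `ValueError` instance and the other two are the normal return values. The caller then has to check each item's type.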


To overcome the above, we use the `asyncio.TaskGroup` context manager (available since Python 3.11) instead. If one task fails, it automatically cancels the other tasks too.

In [None]:
async def main4():
    tasks = []
    async with asyncio.TaskGroup() as tg: # context manager
        for i, sleep_time in enumerate([2, 1, 3], start=1):
            task = tg.create_task(fetch_data(i, sleep_time))
            tasks.append(task)

    # After the Task Group block (above), all tasks have completed
    results = [task.result() for task in tasks]

    for result in results:
        print(f"Received result: {result}")

In [None]:
await main4() # ran in 3 seconds

Coroutine 1 starting to fetch data.
Coroutine 2 starting to fetch data.
Coroutine 3 starting to fetch data.
Received result: {'id': 1, 'data': 'Sample data from coroutine 1'}
Received result: {'id': 2, 'data': 'Sample data from coroutine 2'}
Received result: {'id': 3, 'data': 'Sample data from coroutine 3'}


### 4.0 Futures

In comparison to tasks, a Future is a lower-level object: when we await a Future, we're waiting for some value to become available rather than for an entire coroutine to finish. It's just a promise of an eventual result, which some other piece of code fulfils by calling `set_result()`.

In [12]:
async def set_future_result(future, value):
    await asyncio.sleep(2)

    # Set the result of the future
    future.set_result(value)
    print(f"Set the future's result to: {value}")

In [13]:
async def main5():
    # Create a future object
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Schedule setting the future's result (keep a reference so the task isn't garbage-collected)
    setter_task = asyncio.create_task(set_future_result(future, "Future result is ready."))

    # Wait for the future's result
    result = await future
    print(f"Received the future's result: {result}")

In [14]:
await main5()

Set the future's result to: Future result is ready.
Received the future's result: Future result is ready.
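A Future can also be inspected before and after it is resolved. The sketch below uses `loop.call_later()` to set the result from a plain callback instead of a coroutine; `future_states` is an illustrative name and `42` an arbitrary value.

```python
import asyncio


async def future_states():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    done_before = future.done()  # Nothing has set a result yet

    # Ask the loop to fulfil the future shortly, without a coroutine
    loop.call_later(0.01, future.set_result, 42)

    value = await future
    return done_before, future.done(), value

# Notebook: await future_states()
# Script:   asyncio.run(future_states())
# -> (False, True, 42)
```

This shows that a Future is just a placeholder: whoever holds a reference to it can fulfil it, and anyone awaiting it is resumed at that point.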


### 5.0 Synchronization

These are tools that allow us to synchronize the execution of various coroutines, especially when we have larger and more complicated programs.

Say we have a shared resource (e.g., database connections, tables, files) where each operation takes some time to finish, and **we don't want 2 coroutines to work on the shared resource at the same time.**

In [24]:
# A shared resource e.g., database connections, tables, files, etc.
shared_resource = 0

In [26]:
# An asyncio Lock
lock = asyncio.Lock()

The `async with lock` context manager first checks whether any other coroutine is holding the lock. If one is, it waits until that coroutine has released it. Only then does it acquire the lock and enter the critical section (the code between `Critical section starts` and `Critical section ends`), and the lock is released once the block exits.

In [28]:
async def modify_shared_resource():
    global shared_resource
    async with lock:
        # Critical section starts
        print(f"Resource before modification: {shared_resource}")
        shared_resource += 1 # Modify the shared resource
        await asyncio.sleep(1) # Simulate an IO operation
        print(f"Resource after modification: {shared_resource}")
        # Critical section ends

In [29]:
async def main():
    await asyncio.gather(*(modify_shared_resource() for _ in range(5)))

In [30]:
await main()

Resource before modification: 0
Resource after modification: 1
Resource before modification: 1
Resource after modification: 2
Resource before modification: 2
Resource after modification: 3
Resource before modification: 3
Resource after modification: 4
Resource before modification: 4
Resource after modification: 5
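To see what the lock protects against, we can compare a read-modify-write with and without it. In this sketch the update is deliberately split around an `await`, so other coroutines can sneak in between the read and the write; `increment` and `run_counter` are hypothetical names.

```python
import asyncio


async def increment(counter, lock=None):
    async def read_modify_write():
        current = counter["value"]
        await asyncio.sleep(0)  # Other coroutines may run here
        counter["value"] = current + 1

    if lock is None:
        await read_modify_write()
    else:
        async with lock:
            await read_modify_write()


async def run_counter(use_lock):
    counter = {"value": 0}
    lock = asyncio.Lock() if use_lock else None
    await asyncio.gather(*(increment(counter, lock) for _ in range(5)))
    return counter["value"]

# Notebook: await run_counter(False), await run_counter(True)
# Script:   asyncio.run(run_counter(False)), asyncio.run(run_counter(True))
# -> 1 without the lock (updates are lost), 5 with the lock
```

Without the lock, all five coroutines read `0` before any of them writes, so four updates are lost. With the lock, each read-modify-write completes before the next one starts.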


In contrast, **semaphores** allow a limited number of coroutines to access the same resource at the same time. This lets us throttle our code intentionally, for example to send a maximum of 5 requests at a time.

In [33]:
async def access_resource(semaphore, resource_id):
    async with semaphore:
        # Simulate accessing a limited resource
        print(f"Accessing resource {resource_id}")
        await asyncio.sleep(1) # Simulate work with the resource
        print(f"Releasing resource {resource_id}")

In [34]:
async def main():
    semaphore = asyncio.Semaphore(2) # allow 2 concurrent accesses
    await asyncio.gather(*(access_resource(semaphore, i) for i in range(5)))

In [35]:
await main()

Accessing resource 0
Accessing resource 1
Releasing resource 0
Releasing resource 1
Accessing resource 2
Accessing resource 3
Releasing resource 2
Releasing resource 3
Accessing resource 4
Releasing resource 4
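Rather than reading the limit off the printed output, we could track how many coroutines are inside the semaphore at once. The sketch below records the peak; `measure_peak` and its parameters are illustrative names.

```python
import asyncio


async def measure_peak(limit=2, workers=5):
    semaphore = asyncio.Semaphore(limit)
    active = 0  # Coroutines currently inside the semaphore
    peak = 0    # Highest value `active` ever reached

    async def worker():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # Simulate work with the resource
            active -= 1

    await asyncio.gather(*(worker() for _ in range(workers)))
    return peak

# Notebook: await measure_peak()
# Script:   asyncio.run(measure_peak())
# -> 2
```

Even with five workers started together, the peak never exceeds the semaphore's limit.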


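Finally, we look at **Events**. An `asyncio.Event` lets one coroutine signal one or more others that something has happened: waiters pause on `event.wait()` until another coroutine calls `event.set()`.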

In [36]:
async def waiter(event):
    print("waiting for the event to be set")
    await event.wait()
    print("event has been set, continuing execution")

In [38]:
async def setter(event):
    await asyncio.sleep(2) # Simulate doing some work
    event.set()
    print("event has been set!")

In [39]:
async def main():
    event = asyncio.Event()
    await asyncio.gather(waiter(event), setter(event))

In [40]:
await main()

waiting for the event to be set
event has been set!
event has been set, continuing execution
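A single `event.set()` releases every coroutine that is waiting on the event, not just one. The sketch below, with made-up names `broadcast` and `waiter`, starts three waiters and wakes them all at once.

```python
import asyncio


async def broadcast():
    event = asyncio.Event()
    woken = []

    async def waiter(name):
        await event.wait()
        woken.append(name)

    tasks = [asyncio.create_task(waiter(name)) for name in ("a", "b", "c")]

    await asyncio.sleep(0)       # Let the waiters start and block on wait()
    woken_before = list(woken)   # Still empty: nobody has been released

    event.set()                  # Wake every waiter at once
    await asyncio.gather(*tasks)
    return woken_before, sorted(woken)

# Notebook: await broadcast()
# Script:   asyncio.run(broadcast())
# -> ([], ['a', 'b', 'c'])
```

This is the main difference from a lock or semaphore: an Event does not guard a resource, it simply broadcasts that a condition is now true.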
