**# Asyncio for Agentic AI in Python**
=====================================

----
**Asyncio is one way to achieve [concurrent programming in Python](https://www.geeksforgeeks.org/python-program-with-concurrency/).** You need async when you need massive scaling - tens of thousands of processes.

----

<font color="#a9a56c" size=2> **@[Learn Core Asyncio Concepts and Python from Offical Panaversity Free Modern Python for Agent AI Notebooks](https://github.com/panaversity/learn-modern-ai-python/tree/main/00_python_colab/16_asyncio) Authored by Arif Kasim Rozani - (Team Operation Badar)** </font>

[*Asynchronous Python for the Complete Beginner PyCon 2017*](https://www.youtube.com/watch?v=iG6fr81xHKA)

## "Processes vs. Threads vs. Async"

You can play in different ways: with **Processes**, **Threads**, or **Async**. Let’s think of these like different ways to have fun!

1. **Optimize waiting (Can you take breaks smartly?)**
   - All three—Processes, Threads, and Async—say "Yes!" It’s like if you’re waiting for your friend to bring a toy, you can play with another toy instead of just sitting there. They all know how to keep playing without wasting time.

2. **Use all CPU cores (Can you use all your toy helpers?)**
   - Processes say "Yes!" It’s like having lots of friends helping you play with toys at the same time, each using their own space.
   - Threads and Async say "No." They’re more like one friend who’s really fast but can only use one space at a time.

3. **Scalability (How many toys can you play with?)**
   - Processes can handle a few toys (ones or tens), like playing with 5 or 10 toys.
   - Threads can handle more toys (hundreds), like 100 toys!
   - Async is the best—it can handle thousands of toys or more! It’s like juggling lots and lots of toys without dropping any.

4. **Use blocking std library functions (Do you have to wait for some toys?)**
   - Processes and Threads say "Yes." It’s like if you’re playing with a toy that takes a long time, you have to wait before grabbing the next one.
   - Async says "No!" It’s like playing with a toy but still being able to grab other toys without waiting.

5. **GIL interference (Does a big rule stop you?)**
   - Processes say "No." They don’t have any big rules stopping them.
   - Threads say "Some." They have a rule (called GIL) that sometimes slows them down, like if you can only play with one toy at a time even if you have two hands.
   - Async says "No." It doesn’t care about that rule and can keep playing freely.

---

### **Processes vs. Threads vs. Async**

|                  | Processes      | Threads      | Async         |
|------------------|----------------|--------------|---------------|
| Optimize waiting | Yes (preemptive) | Yes (preemptive) | Yes (cooperative) |
| Use all CPU cores | Yes            | No           | No            |
| Scalability      | Low (ones/tens) | Medium (hundreds) | High (thousands+) |
| Use blocking std library functions | Yes  | Yes          | No            |
| GIL interference | No             | Some         | No            |

---

**[Asyncio](https://docs.python.org/3/library/asyncio.html) Key Concepts**
----------------

Asyncio is a library in Python that allows you to write single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, and implementing network clients and servers.

*   **`Coroutines`**: A coroutine is a special type of function that can suspend and resume its execution at specific points. Think of it as a task that can be paused and resumed.
*   **`Event Loop`**: The event loop is the central component of Asyncio. It manages the execution of coroutines, handling I/O operations and scheduling tasks.
*   **`Tasks`**: A task is a coroutine that's scheduled to run concurrently with other tasks.
*   **`Futures`**: A future represents the result of a task. You can wait for a future to complete, allowing you to retrieve the result of a task.



<font color=orange> By design asyncio does not allow its event loop to be nested. This presents a practical problem: When in an environment where the event loop is already running it’s impossible to run tasks and wait for the result. Trying to do so will give the error “RuntimeError: This event loop is already running”.

<font color=orange>The issue pops up in various environments, such as web servers, GUI applications and in **`Jupyter notebooks`**. </font>

This module patches asyncio to allow nested use of asyncio.run and loop.run_until_complete. </font>

[nest-asyncio 1.6.0](https://pypi.org/project/nest-asyncio/)

In [None]:
import nest_asyncio
nest_asyncio.apply()

print("\033[94m Successfully enable Colab/Jupyter Notebook to handle nested event loop \033[0m")

[94m Successfully enable Colab/Jupyter Notebook to handle nested event loop [0m


**Example: Making Coffee and Pastry**
------------------------------------

Let's create an example that demonstrates the synchronous and asynchronous approaches:

<font color=gold>In this example just focus on out put to get the feel how **`synchronous`** and **`asynchronous`** function work.</font>

In [None]:
import asyncio
import time
import threading
from rich import print

# Synchronous approach
def make_coffee_sync():
    print(f"\tMaking coffee... : Thread Name: {threading.current_thread().name}")
    time.sleep(2)  # Simulate coffee preparation time
    print("\tCoffee is ready!")

def make_pastry_sync():
    print(f"\tMaking pastry... : Thread Name: {threading.current_thread().name}")
    time.sleep(3)  # Simulate pastry preparation time
    print("\tPastry is ready!")

def order_sync():
    start_time = time.time()
    make_coffee_sync()
    make_pastry_sync()
    end_time = time.time()
    print(f"Total time: {end_time - start_time} seconds")

# Asynchronous approach using Asyncio
async def make_coffee_async():
    print(f"\tMaking coffee... : Thread Name: {threading.current_thread().name}")
    await asyncio.sleep(2)  # Simulate coffee preparation time
    print("\tCoffee is ready!")

async def make_pastry_async():
    print(f"\tMaking pastry... : Thread Name: {threading.current_thread().name}")
    await asyncio.sleep(3)  # Simulate pastry preparation time
    print("\tPastry is ready!")

async def order_async():
    start_time = time.time()
    tasks = [
        asyncio.create_task(make_coffee_async()),
        asyncio.create_task(make_pastry_async())
    ]
    await asyncio.gather(*tasks)
    end_time = time.time()
    print(f"Total time: {end_time - start_time} seconds")

# Run the synchronous example
print("[red]Synchronous approach: [/red]")
start_time = time.time()

print("Main thread entring order_sync()")
order_sync()
print("Main thread exiting order_sync()")

print( f"[magenta]Total time: {time.time() - start_time} seconds[/magenta]")

# Run the asynchronous example
print("\n\n[red]Asynchronous approach: [/red]")

start_time = time.time()



print("Main thread entring order_async()")
asyncio.run(order_async())
print("Main thread exiting order_async()")

print(f"[magenta]Total time: {time.time() - start_time} seconds[/magenta]")

In the synchronous example, the `make_coffee_sync` and `make_pastry_sync` functions are called sequentially, resulting in a total execution time of `5 seconds`.

In the asynchronous example, the `make_coffee_async` and `make_pastry_async` coroutines are run concurrently using `asyncio.create_task` and `asyncio.gather`. This reduces the total execution time to approximately `3 seconds`, as the coffee and pastry are prepared simultaneously.

**Best Practices**
------------------

*   **Use `asyncio.run()`**: Run your asynchronous code using `asyncio.run()` to ensure proper event loop setup and teardown.
*   **Await Coroutines**: Use the `await` keyword to wait for the completion of coroutines.
*   **Use `asyncio.gather()`**: Run multiple coroutines concurrently using `asyncio.gather()`.

**Common Asyncio Functions**
---------------------------

*   `asyncio.sleep()`: Suspends the execution of the current coroutine for a specified amount of time.
*   `asyncio.create_task()`: Creates a new task from a coroutine, allowing it to run concurrently.
*   `asyncio.gather()`: Runs multiple coroutines concurrently and returns their results as a list.

By following these guidelines and using Asyncio effectively, you can write efficient and responsive asynchronous code in Python.

## **Understanding `asyncio` and `await`**

`asyncio` provides the event loop and tools to manage async operations, while `await` is the keyword that allows functions to yield control at specific points, enabling non-blocking waits. Using them together allows writing efficient `single-threaded concurrent code`, especially for I/O-bound tasks.

To address the question of understanding `asyncio` and `await` in Python, let's break down the concepts and their usage:

### Understanding `asyncio` and `await`

1. **`Asyncio`**:

   - **`Definition`**: Asyncio is a Python library that facilitates writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets, and other resources. It's ideal for handling I/O-bound tasks, such as network requests or database queries.

   - **`Key Components`**:
     - **`Event Loop`**: Manages the scheduling and switching between tasks.

     - **`Coroutines`**: Functions defined with `async def` that can suspend and resume execution.
     - **`Tasks`**: Used to run coroutines concurrently.
     - **`Futures`**: Represent the result of a task that may not have completed yet.(we will discuss `futures` later on)

2. **`Await`**:

   - **`Definition`**: `await` is a keyword used within async functions to suspend the execution of the coroutine until the awaited task is complete. It allows the event loop to handle other tasks while waiting.

   - **`Usage`**: It is used to wait for coroutines, tasks, or futures, enabling non-blocking operations.

### Difference Between `asyncio` and `await`

- **`Asyncio`**: The library provides the framework for asynchronous operations, including the event loop and tools for creating and managing tasks.
- **`Await`**: The keyword used within async functions to create points where execution can yield control back to the event loop, allowing other tasks to run.

### Using Them Together

1. **Basic Structure**:
   - Define an async function using `async def`.
   - Use `await` within the function to wait for coroutines.
   - Run the top-level async function with `asyncio.run()`.
   

2. **Example**:

In [None]:
import asyncio

async def my_function():
  await asyncio.sleep(4)
  print("Hello! from my_function()")

async def main():
  await my_function()

asyncio.run(main())


## **Awaitable**

This function checks if an object is `awaitable`, meaning it supports `await`.

In [None]:
import inspect

print("isawaitable = ", inspect.isawaitable(my_function()))

  print("isawaitable = ", inspect.isawaitable(my_function()))


## **Custom Awaitables (Implementing `__await__()`)**

-   Any class can be made **awaitable** by defining `__await__()`.
    

### **Example**

In [None]:
class CustomAwaitable:
    def __await__(self):
        yield from asyncio.sleep(2).__await__()  # Delegate to asyncio
        return "Custom Awaitable Done"

async def main():
    result = await CustomAwaitable()
    print(result)

asyncio.run(main())

In [None]:
import asyncio

async def task1():
       await asyncio.sleep(4)
       print("\tTask 1 done")

async def task2():
       await asyncio.sleep(1)
       print("\tTask 2 done")

async def main():
       t1 = asyncio.create_task(task1())
       t2 = asyncio.create_task(task2())
       print("Start main()...")
       await asyncio.gather(t1, t2)
       print("End main()...")

asyncio.run(main())

### Implications

- **`Efficiency`**: Ideal for I/O-bound tasks, allowing efficient handling of multiple operations without blocking.

- **`Single-Threaded`**: Not suitable for CPU-bound tasks, which may require multiprocessing.
- **`Correct Usage`**: Ensure all coroutines are awaited properly to avoid returning coroutine objects instead of results.

### Conclusion

`asyncio` and `await` work together to enable asynchronous programming in Python, allowing for efficient handling of concurrent operations, particularly in I/O-bound scenarios. Understanding their roles and correct usage is key to writing effective async code.



---


# **Futures**



---



In Python's `asyncio` library, **Futures** are a fundamental concept that represents the result of an asynchronous operation that may not have completed yet. Futures are closely related to coroutines and tasks, but they serve a specific purpose in handling asynchronous operations.

### **What Are Futures**?

A **Future** is an object that represents the result of a task that is not yet complete. It acts as a placeholder for the result, which will be available at some point in the future. Once the task completes, the Future is resolved, and its result (or exception, if the task failed) can be retrieved.

### **Key Characteristics of Futures**

1. **`Asynchronous Result`**:
   - A Future is created when an asynchronous operation is initiated, but the result of that operation is not immediately available.
   - The Future keeps track of the result and notifies any waiting code once the result is available.

2. **`States of a Future`**:
   - **`Pending`**: The initial state where the operation has not yet started or is still in progress.
   - **`Done`**: The operation has completed, and the result is available.
   - **`Cancelled`**: The operation was cancelled before it could complete.

3. **`Awaitable`**:
   - Futures are awaitable objects, meaning they can be used with the `await` keyword in async functions.

4. **`One-Time Use`**:
   - A Future can only be resolved once. Once it is done, it cannot be reset or reused.

### **Relationship Between Futures, Coroutines, and Tasks**

- **`Coroutines`**: Defined using `async def`, coroutines are functions that can be paused and resumed. They are not directly executable and must be scheduled as tasks.
  
- **`Tasks`**: A Task wraps a coroutine and is responsible for running it. When you create a Task, it automatically schedules the coroutine to run on the event loop. A Task returns a Future that represents the result of the coroutine.

- **`Futures`**: A Future is the result of a Task. When you await a Task, you are effectively awaiting its Future.

### **Example of Using Futures**

Here’s an example that demonstrates how Futures work:

In [None]:
import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    print("\tTask done...")
    return "My Coroutine completed"

async def main():
    # Create a Task (which returns a Future)
    print("Starting Coroutine...")
    future = asyncio.create_task(my_coroutine())

    print("Going to Maldives on a 2 day vacation")
    await asyncio.sleep(10)
    print("Come back from vacation")


    # Await the Future
    result = await future
    print("future = ",result)

asyncio.run(main())
#await asyncio.create_task(main())

### **States of a `Future` in Python (`asyncio.Future`)**

A `Future` represents a **placeholder for a result** that is **not yet available** in asynchronous programming. It can be in the following states:

1️⃣ **`Pending`** – The initial state when the operation **has not started** or is **still in progress**.  
2️⃣ **`Done`** – The operation has **completed successfully**, and the result is available.  
3️⃣ **`Cancelled`** – The operation was **cancelled before completion**.

----------

### **Example: Understanding Future States in `asyncio`**

In [None]:
import asyncio

async def example_future():
    future = asyncio.Future()  # Create a Future object
    print(f"Initial state: {future.done()}")  # False (Pending)

    # Simulate computation
    await asyncio.sleep(2)

    future.set_result("Task Completed")  # Mark future as done
    print(f"Final state: {future.done()}")  # True (Done)
    print(f"Result: {future.result()}")  # "Task Completed"

asyncio.run(example_future()) # Error, will see it indepth in next topic

#await example_future() # Directly await the coroutine instead of calling asyncio.run

✅ **Initially in `Pending` state**, then **transitions to `Done`** once the result is set.

### **Cancelling a Future**

A `Future` can be **cancelled** before completion

In [None]:
import asyncio

async def example_cancellation():
    future = asyncio.Future()

    future.cancel()  # Cancelling the future
    print(f"Cancelled: {future.cancelled()}")  # True

asyncio.run(example_cancellation())
#await example_cancellation() # Directly await the coroutine instead of calling asyncio.run


In [None]:
# prompt: examples of future methods

async def example_future():
    future = asyncio.Future()  # Create a Future object
    print(f"Initial state: {future.done()}")  # False (Pending)

    # Simulate computation
    await asyncio.sleep(2)

    future.set_result("Task Completed")  # Mark future as done
    print(f"Final state: {future.done()}")  # True (Done)
    print(f"Result: {future.result()}")  # "Task Completed"

asyncio.run(example_future()) # Error, will see it indepth in next topic

#await example_future() # Directly await the coroutine instead of calling asyncio.run




---


# What is an Event Loop?

An **event loop** is a core concept in asynchronous programming. It is a programming construct that waits for and dispatches events or messages in a program. The event loop is responsible for managing and executing asynchronous tasks, handling I/O operations, and coordinating the execution of coroutines (asynchronous functions).

In simpler terms, the event loop is the "engine" that drives asynchronous execution. It continuously checks for tasks that are ready to run, executes them, and waits for new tasks to be added.

In [None]:
loop = asyncio.get_event_loop()
task = loop.create_task(example_future())
task

<Task pending name='Task-23' coro=<example_future() running at <ipython-input-16-8cc165c26d0d>:3>>

In [None]:
task

<Task pending name='Task-23' coro=<example_future() running at <ipython-input-16-8cc165c26d0d>:8> wait_for=<Future pending cb=[Task.__wakeup()]>>

### How Does an Event Loop Work?

1.  **Task Queue**: The event loop maintains a queue of tasks (coroutines, callbacks, etc.) that need to be executed.
2.  **Event Handling**: It listens for events (e.g., I/O operations, timers, or external signals) and schedules the corresponding tasks to run.
3.  **Execution**: When a task is ready to run (e.g., an I/O operation completes or a timer expires), the event loop executes it.
4.  **Non-blocking**: While one task is waiting (e.g., for I/O), the event loop can switch to other tasks, ensuring that the program remains responsive.

Did you know? [Cpython](https://www.geeksforgeeks.org/python-vs-cpython/) in the most popular python!

### **Context Variables (`contextvars`)**

`contextvars` is a Python module that provides a way to manage thread-local-like data in asynchronous code. It's designed to solve a specific problem that arises in concurrent programming: how to correctly manage state when you have code that switches execution contexts (as `asyncio` does).

In [None]:
import contextvars

var = contextvars.ContextVar('var')
var.set('spam')
print(var.get())  # 'spam'

ctx = contextvars.copy_context()

def main():
    # 'var' was set to 'spam' before
    # calling 'copy_context()' and 'ctx.run(main)', so:
    print(var.get())  # 'spam'
    print(ctx[var])  # 'spam'

    var.set('ham')

    # Now, after setting 'var' to 'ham':
    print(var.get())  # 'ham'
    print(ctx[var])  # 'ham'

# Any changes that the 'main' function makes to 'var'
# will be contained in 'ctx'.
ctx.run(main)

# The 'main()' function was run in the 'ctx' context,
# so changes to 'var' are contained in it:
print(ctx[var])  # 'ham'

# However, outside of 'ctx', 'var' is still set to 'spam':
print(var.get())  # 'spam'

* **The Problem:**
    
    * In traditional threaded programming, you can use `threading.local()` to store data that is local to a specific thread. This allows you to avoid accidentally sharing data between threads.
    * However, in `asyncio`, execution switches between different coroutines within the same thread. This means that `threading.local()` doesn't work as expected because the "current thread" doesn't really change in the same way.
    * You need a way to have variables that are local to a *context* of execution, where a context might be the execution of a specific coroutine or a specific task.
* **How `contextvars` Solves It:**
    
    * `contextvars` allows you to create `ContextVar` objects. These are like thread-local variables, but they are local to a context.
    * Each `asyncio` task has its own context. When a coroutine runs within a task, it can access and modify `ContextVar` values, and these values will be isolated from other tasks.
    * This is crucial for managing state that should not be shared between concurrent operations, even though they run in the same thread.
* **Key Components:**
    
    * `contextvars.ContextVar`:
        
        * Creates a context variable. You give it a name.
        * Methods:
            * `get()`: Gets the current value of the variable in the current context.
            * `set(value)`: Sets the value of the variable in the current context.
    * `contextvars.Context`:
        
        * Represents a context. Each task has its own context.
        * Methods:
            * `run(callable, *args, **kwargs)`: Executes a callable within the context. Any changes to context variables are isolated to this `run()`.
            * `copy_context()`: Creates a copy of the current context.
* **Example:**

In [None]:
import asyncio
import contextvars

my_var = contextvars.ContextVar("my_var")

async def task_one():
    my_var.set("Value from task_one")
    print(f"Task One: {my_var.get()}")
    await asyncio.sleep(1)
    print(f"Task One (after sleep): {my_var.get()}")  # Still task_one's value

async def task_two():
    my_var.set("Value from task_two")
    print(f"Task Two: {my_var.get()}")

async def main():
    task1 = asyncio.create_task(task_one())
    task2 = asyncio.create_task(task_two())
    await asyncio.gather(task1, task2)

asyncio.run(main())


In this example, even though `task_one` and `task_two` run concurrently, they each have their own isolated value for `my_var`.

* **Why are `contextvars` important for AI agents?**
    
    * AI agents may need to manage state that is specific to a particular request, user interaction, or task.
    * For example:
        * Tracking user authentication information.
        * Storing request-specific data (e.g., a query ID).
        * Managing conversational state within a dialogue.
    * When an agent handles multiple concurrent requests or interactions, `contextvars` ensures that this state is kept separate and doesn't leak between different operations.
    * This is crucial for correctness, security, and maintainability.

The value of `contextvars` becomes apparent specifically when multiple asynchronous tasks are running concurrently and you need to manage state that is local to each of those tasks.

In a scenario where you have a single task running sequentially, a regular variable would often suffice for managing state within that task.

**The Value During Concurrent Execution:**

The power of `contextvars` shines when you have concurrent tasks because:

1.  **Isolation:** Each concurrent task (or more precisely, the context in which it runs) gets its own independent value for a `ContextVar`. When one task sets the value of a `ContextVar`, it doesn't affect the value of that same `ContextVar` in other concurrently running tasks.

2.  **Runtime State Management Across Context Switches:** In `asyncio`, a single thread might rapidly switch between different coroutines (tasks) that are in progress. `contextvars` ensures that when a coroutine resumes, it sees the value of the context variable that was in effect when it last yielded, even if other coroutines have run in the meantime and modified the same `ContextVar` in their own contexts.

**Analogy:**

Think of it like having separate local storage for each "worker" (asyncio task) in a busy workshop (your single thread). Each worker can put items in their local storage, and those items are not mixed up with what other workers have in their own storage.

**In above example:** Without `contextvars`, if you just used a regular global or local variable, the interleaving of `task_one` and `task_two` could lead to one task overwriting the state of the other in unexpected ways. `contextvars` prevents this. Each task operates within its own context where `my_var` has the value it set.

**So, to directly answer:**

Yes, `contextvars` has significant value when multiple tasks run concurrently. It's specifically designed for managing runtime state in such scenarios, ensuring isolation and correct context switching. For purely sequential code within a single coroutine, its benefits are less pronounced compared to regular variables.


In [None]:
import asyncio
import contextvars

my_var = "new_val"

async def task_one():
    global my_var
    my_var = "Value from task_one"
    print(f"Task One: {my_var}")
    await asyncio.sleep(9)
    print(f"Task One (after sleep): {my_var}")  # Still task_one's value

async def task_two():
    global my_var
    my_var = "Value from task_two"
    print(f"Task Two: {my_var}")

async def main():
    task1 = asyncio.create_task(task_one())
    task2 = asyncio.create_task(task_two())
    await asyncio.gather(task1, task2)

asyncio.run(main())

In [None]:
import asyncio
import sys

async def get_date():
    code = 'import datetime; print(datetime.datetime.now())'

    # Create the subprocess; redirect the standard output
    # into a pipe.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)

    # Read one line of output.
    data = await proc.stdout.readline()
    line = data.decode('ascii').rstrip()

    # Wait for the subprocess exit.
    await proc.wait()
    return line

date = asyncio.run(get_date())
print(f"Current date: {date}")