<a href="https://colab.research.google.com/github/jasreman8/OOPs-for-Intelligent-Agentic-Systems/blob/main/Asynchronous_Programming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Learning Objectives

*   Understand the basic concepts of asynchronous programming in Python using `async` and `await`.
*   Recognize why asynchronous programming is beneficial when dealing with I/O-bound operations (like network requests to LLMs).
*   Learn how to define and call asynchronous functions.
*   See how LangChain components, particularly LLMs like `ChatOpenAI`, provide asynchronous methods (e.g., `.ainvoke()`, `.astream()`, `.abatch()`).
*   Understand the use of `async with` for managing resources asynchronously (though less directly used with simple LLM calls, it's a core async concept).


# `async` and `await`: The Keywords for Asynchronous Code

So far, most of the Python code we've written has been **synchronous**. This means when you call a function, your program waits for that function to complete before moving on to the next line of code. If that function involves waiting (e.g., for a network request to an LLM to come back), your whole program just sits there, waiting.

**Asynchronous programming** allows your program to do other useful work while it's waiting for slow operations to finish. It doesn't necessarily make the slow operation itself faster, but it makes your overall application more efficient and responsive because it's not "blocked."


Python provides the `async` and `await` keywords to write asynchronous code. This is often used with the `asyncio` library.

*   **`async def`**: Used to define an **asynchronous function** (also called a **coroutine**).
    *   When you call an `async def` function, it doesn't run immediately. Instead, it returns a **coroutine object**.
*   **`await`**: Used *inside* an `async def` function to pause its execution until an "awaitable" operation (like another coroutine or certain I/O operations) completes.
    *   While `await` is waiting, Python's event loop can switch to run other tasks, making the program non-blocking.
*   **Event Loop (`asyncio.run()`)**: To actually run an `async def` function from synchronous code and manage the execution of asynchronous tasks, you typically use an event loop. `asyncio.run(coroutine_object)` is a common way to start it.


**Simple Conceptual Example:**


In [1]:
import asyncio
import time
import nest_asyncio
nest_asyncio.apply() # required only in notebooks

## Synchronous instance

In [2]:
def make_tea(tea_type: str, delay: int) -> str:
    print(f"Starting to make {tea_type} tea... (will take {delay}s)")
    time.sleep(delay) # Simulate a time-consuming I/O operation (like steeping)
    print(f"{tea_type} tea is ready!")
    return f"{tea_type} tea"

In [3]:
def toast_bread(slices: int, toast_type: str, delay: int) -> str:
    print(f"Starting to toast {slices} slice(s) of {toast_type} bread... (will take {delay}s)")
    time.sleep(delay) # Simulate toasting time
    print(f"Bread is toasted!")
    return f"{slices} toasted slice(s)"

In [4]:
def make_breakfast():
    print("--- Starting breakfast preparation (asynchronously) ---\n")

    # Start both tasks (coroutines) concurrently
    # asyncio.gather runs multiple awaitables concurrently and waits for all to complete.
    tea_task = make_tea("Masala", 3)
    toast_task = toast_bread(2, 'french omlette', 2)

    # Await their results
    # The 'await' here means 'make_breakfast' will pause until BOTH tea_task AND toast_task are done.
    # But tea_task and toast_task can run "at the same time" (interleaved) by the event loop.
    results = (tea_task, toast_task)

    tea_result, toast_result = results # Unpack results

    print(f"\n--- Breakfast is served! ---")
    print(f"- {tea_result}")
    print(f"- {toast_result}")
    return results

In [5]:
start_time = time.time()

# It manages the event loop.
breakfast_items = make_breakfast()

end_time = time.time()
print(f"\nTotal breakfast preparation time: {end_time - start_time:.2f} seconds")

--- Starting breakfast preparation (asynchronously) ---

Starting to make Masala tea... (will take 3s)
Masala tea is ready!
Starting to toast 2 slice(s) of french omlette bread... (will take 2s)
Bread is toasted!

--- Breakfast is served! ---
- Masala tea
- 2 toasted slice(s)

Total breakfast preparation time: 5.00 seconds


## Aysnchronous Instance

In [6]:
async def make_tea(tea_type: str, delay: int) -> str:
    print(f"Starting to make {tea_type} tea... (will take {delay}s)")
    await asyncio.sleep(delay) # Simulate a time-consuming I/O operation (like steeping)
    print(f"{tea_type} tea is ready!")
    return f"{tea_type} tea"

In [7]:
async def toast_bread(slices: int, toast_type: str, delay: int) -> str:
    print(f"Starting to toast {slices} slice(s) of {toast_type} bread... (will take {delay}s)")
    await asyncio.sleep(delay) # Simulate toasting time
    print(f"Bread is toasted!")
    return f"{slices} toasted slice(s)"

In [8]:
async def make_breakfast():
    print("--- Starting breakfast preparation (asynchronously) ---")

    # Start both tasks (coroutines) concurrently
    # asyncio.gather runs multiple awaitables concurrently and waits for all to complete.
    tea_task = make_tea("Masala", 3)
    toast_task = toast_bread(2, 'french omlette', 2)

    # Await their results
    # The 'await' here means 'make_breakfast' will pause until BOTH tea_task AND toast_task are done.
    # But tea_task and toast_task can run "at the same time" (interleaved) by the event loop.
    results = await asyncio.gather(tea_task, toast_task)

    tea_result, toast_result = results # Unpack results

    print(f"\n--- Breakfast is served! ---")
    print(f"- {tea_result}")
    print(f"- {toast_result}")
    return results

In [9]:
start_time = time.time()

# asyncio.run() takes a coroutine and runs it until completion.
# It manages the event loop.
breakfast_items = asyncio.run(make_breakfast())

end_time = time.time()
print(f"\nTotal breakfast preparation time: {end_time - start_time:.2f} seconds")
# This time will be close to the LONGER of the two tasks (3s for tea),
# not the sum (3s + 2s = 5s), because they ran concurrently

--- Starting breakfast preparation (asynchronously) ---
Starting to make Masala tea... (will take 3s)
Starting to toast 2 slice(s) of french omlette bread... (will take 2s)
Bread is toasted!
Masala tea is ready!

--- Breakfast is served! ---
- Masala tea
- 2 toasted slice(s)

Total breakfast preparation time: 3.01 seconds


**Why is this useful?**

Imagine `make_tea` and `toast_bread` were calls to different slow web services. With synchronous code, you'd wait for tea, *then* start toasting. With `async`/`await`, you can effectively initiate both requests, and the program can work on other things (or just efficiently wait) until the responses come back.


This asynchronous paradigm is very useful when working with LLMs as well. Calling an LLM is a network I/O operation. It can take several seconds. If you need to make multiple LLM calls (e.g., to summarize different documents, or ask multiple questions), doing them asynchronously can significantly speed up your overall application.

As an example, let us look at how asynchronous programmming works with LLM calls in LangChain.

# LangChain's Asynchronous Methods

LangChain `Runnable` components (including LLMs, Prompts, Parsers, Chains) are designed with asynchronous operations in mind. They typically provide asynchronous versions of their main execution methods:

*   `.invoke()` (synchronous)  ->  `.ainvoke()` (asynchronous)
*   `.stream()` (synchronous)  ->  `.astream()` (asynchronous)
*   `.batch()` (synchronous)   ->  `.abatch()` (asynchronous)

These `a` prefixed methods are `async def` methods themselves and need to be `await`ed.



In [10]:
! pip install -q langchain-openai==0.3.24

In [11]:
import os
import asyncio
import nest_asyncio
import time

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

from google.colab import userdata

nest_asyncio.apply() # required only in notebooks

In [12]:
# Initialize LLM (same as before)
llm = ChatOpenAI(
    api_key=userdata.get('OPEN_API_KEY'),
    base_url="https://aibe.mygreatlearning.com/openai/v1",
    model="gpt-4o-mini",
    temperature=0.7
)

In [13]:
# --- Define a simple chain (also a Runnable) ---
prompt = ChatPromptTemplate.from_template("Write a very short, positive quote about {topic}.")
parser = StrOutputParser()
quote_chain = prompt | llm | parser # This chain is also a Runnable and has .ainvoke()

In [14]:
# --- Asynchronous function to get a single quote ---
async def get_single_quote_async(topic: str) -> str:
    print(f"Async: Requesting quote for '{topic}'...")
    # .ainvoke() is an async method, so we await it
    result = await quote_chain.ainvoke({"topic": topic}) #over here we are awaiting the .ainvoke even if it is used for the first time.
    print(f"Async: Received quote for '{topic}': '{result[:30]}...'")
    return result

In [15]:
# --- Asynchronous function to get multiple quotes concurrently using .abatch() on the chain ---
async def get_multiple_quotes_concurrently_chain_abatch(topics: list[str]) -> list[str]:
    print(f"\nAsync Batch (Chain): Requesting quotes for topics: {topics} using chain.abatch()...")
    # Create input dictionaries for each topic
    batch_inputs = [{"topic": topic} for topic in topics]
    # .abatch() on a chain takes a list of inputs and processes them concurrently
    results = await quote_chain.abatch(batch_inputs) #over here we are awaiting the .abatch even if it is used for the first time.
    print("Async Batch (Chain): All quotes received.")
    for i, topic in enumerate(topics):
        print(f"  - Topic '{topic}': '{results[i][:30]}...'")
    return results

In [16]:
# --- Asynchronous function to get multiple quotes concurrently using asyncio.gather ---
# This demonstrates making multiple independent .ainvoke calls concurrently
async def get_multiple_quotes_concurrently_gather(topics: list[str]) -> list[str]:
    print(f"\nAsync Gather: Requesting quotes for topics: {topics} using asyncio.gather with chain.ainvoke()...")

    # Create a list of coroutine objects (tasks)
    tasks = [quote_chain.ainvoke({"topic": topic}) for topic in topics]

    # asyncio.gather runs all tasks concurrently
    results = await asyncio.gather(*tasks) # Use * to unpack the list of tasks
    #Finally await finds its way in the async gather function.

    print("Async Gather: All quotes received.")
    for i, topic in enumerate(topics):
        print(f"  - Topic '{topic}': '{results[i][:30]}...'")
    return results

In [25]:
async def run_all(topics):
    tasks = [quote_chain.ainvoke({"topic": t}) for t in topics]
    results = await asyncio.gather(*tasks)
    print("Async Gather: All trial quotes received.")
    for i, topic in enumerate(topics):
        print(f"  - Topic '{topic}': '{results[i][:65]}...'")
    return results

In [26]:
# --- Main async execution function ---
async def main_async_operations():
    start_time_main = time.time()

    # 1. Get a single quote asynchronously
    single_quote_task = get_single_quote_async("perseverance")

    # 2. Get multiple quotes using chain.abatch()
    topics_for_batch = ["innovation", "teamwork", "learning"]
    batch_quotes_task = get_multiple_quotes_concurrently_chain_abatch(topics_for_batch)

    # 3. Get multiple quotes using asyncio.gather with chain.ainvoke()
    topics_for_gather = ["creativity", "leadership"] # Different topics to see separate calls
    gather_quotes_task = get_multiple_quotes_concurrently_gather(topics_for_gather)

    # 4. Get multiple quotes using asyncio.gather with chain.ainvoke()
    topics_for_gather_2 = ["motivation", "discipline"] # Different topics to see separate calls
    gather_run_task = run_all(topics_for_gather_2)

    # Run all top-level tasks concurrently
    # The individual print statements within the functions will show interleaving
    await asyncio.gather(
        single_quote_task,
        batch_quotes_task,
        gather_quotes_task,
        gather_run_task
    )

    end_time_main = time.time()
    print(f"\nTotal time for all async operations: {end_time_main - start_time_main:.2f} seconds")

In [27]:
# --- Run the main async function ---
# For comparison, let's do one synchronous call first
print("--- SYNC Call (for baseline) ---")
sync_start_time = time.time()
sync_result = quote_chain.invoke({"topic": "patience"})
sync_end_time = time.time()
print(f"Sync: Received quote for 'patience': '{sync_result[:45]}...'")
print(f"Time for one sync call: {sync_end_time - sync_start_time:.2f}s\n")

--- SYNC Call (for baseline) ---
Sync: Received quote for 'patience': '"Patience is the gentle art of waiting for th...'
Time for one sync call: 1.43s



In [28]:
print("--- ASYNC Operations ---")
asyncio.run(main_async_operations())

--- ASYNC Operations ---
Async: Requesting quote for 'perseverance'...

Async Batch (Chain): Requesting quotes for topics: ['innovation', 'teamwork', 'learning'] using chain.abatch()...

Async Gather: Requesting quotes for topics: ['creativity', 'leadership'] using asyncio.gather with chain.ainvoke()...
Async Gather: All trial quotes received.
  - Topic 'motivation': '"Each step forward ignites the spark of possibility."...'
  - Topic 'discipline': '"Discipline is the bridge between goals and achievement."...'
Async Batch (Chain): All quotes received.
  - Topic 'innovation': '"Innovation is the spark that ...'
  - Topic 'teamwork': '"Together we achieve more; uni...'
  - Topic 'learning': '"Learning is the key that unlo...'
Async Gather: All quotes received.
  - Topic 'creativity': '"Creativity is the spark that ...'
  - Topic 'leadership': '"True leadership inspires othe...'
Async: Received quote for 'perseverance': '"Perseverance transforms obsta...'

Total time for all async operati

**Observations from the example:**
*   When `asyncio.run(main_async_operations())` is called, the event loop starts.
*   `get_single_quote_async`, `get_multiple_quotes_concurrently_chain_abatch`, and `get_multiple_quotes_concurrently_gather` are called. They return coroutine objects.
*   `await asyncio.gather(...)` in `main_async_operations` allows these top-level tasks to proceed concurrently.
*   Inside `get_multiple_quotes_concurrently_gather`, `await asyncio.gather(*tasks)` allows all the individual `quote_chain.ainvoke()` calls to OpenAI to happen "in parallel" from the perspective of your program (Python sends off the requests and can do other work or efficiently wait).
*   Similarly, `quote_chain.abatch(batch_inputs)` is designed to make multiple requests to the LLM backend as efficiently as possible, often concurrently.
*   The total time for all async operations will be significantly less than if you had made all those LLM calls synchronously one after another. It will be closer to the time taken by the slowest *set* of concurrent operations.



In [34]:
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain-openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(
    api_key=userdata.get('OPEN_API_KEY'),
    base_url="https://aibe.mygreatlearning.com/openai/v1",
    model="gpt-4o-mini",
    temperature=0.7
)
prompt = PromptTemplate.from_template("What is {thing}?")
chain = LLMChain(llm=llm, prompt=prompt)

def main():
    result = chain.invoke({"thing":"quantum computing"})
    print(result)

main()

SyntaxError: invalid syntax (ipython-input-3552215880.py, line 3)

In [35]:
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(
    api_key=userdata.get('OPEN_API_KEY'),
    base_url="https://aibe.mygreatlearning.com/openai/v1",
    model="gpt-4o-mini",
    temperature=0.7
)
prompt = PromptTemplate.from_template("What is {thing}?")
chain = LLMChain(llm=llm, prompt=prompt)

def main():
    result = chain.invoke({"thing":"quantum computing"})
    print(result)

main()

{'thing': 'quantum computing', 'text': "Quantum computing is a type of computing that harnesses the principles of quantum mechanics to process information in fundamentally different ways compared to classical computing. Here are some key concepts and features of quantum computing:\n\n1. **Qubits**: Unlike classical bits, which can be either 0 or 1, quantum bits (qubits) can exist in a superposition of states, meaning they can be both 0 and 1 simultaneously. This property enables quantum computers to process a vast amount of information in parallel.\n\n2. **Superposition**: This quantum phenomenon allows qubits to represent multiple combinations of states at once. When a qubit is in superposition, it can perform many calculations simultaneously, which can lead to significant speedups for certain types of problems.\n\n3. **Entanglement**: Qubits can become entangled, which means the state of one qubit is directly related to the state of another, no matter the distance between them. This 

In [37]:
llm = ChatOpenAI(
    api_key=userdata.get('OPEN_API_KEY'),
    base_url="https://aibe.mygreatlearning.com/openai/v1",
    model="gpt-4o-mini",
    temperature=0.7
)
prompt = PromptTemplate.from_template("What is {thing}?")
chain = LLMChain(llm=llm, prompt=prompt)

async def main():
    result = await chain.arun(thing="quantum computing")
    print(result)

asyncio.run(main())

Quantum computing is a type of computation that leverages the principles of quantum mechanics to process information in fundamentally different ways than classical computers. While classical computers use bits as the smallest unit of data, which can be either 0 or 1, quantum computers use quantum bits, or qubits. Qubits can exist in multiple states simultaneously due to a property known as superposition.

Here are some key concepts associated with quantum computing:

1. **Superposition**: Unlike classical bits, which are in one state at a time, qubits can exist in a combination of states. This allows quantum computers to process a vast amount of possibilities simultaneously.

2. **Entanglement**: Qubits can become entangled, meaning the state of one qubit is directly related to the state of another, regardless of the distance between them. This property can be harnessed to perform complex calculations more efficiently.

3. **Quantum Gates**: Quantum operations are performed using quant