# WithListeners

- Author: [Donghak Lee](https://github.com/stsr1284)
- Design:
- Peer Review: [Donghak Lee](https://github.com/stsr1284)
- This is a part of [LangChain Open Tutorial](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial)

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/99-TEMPLATE/00-BASE-TEMPLATE-EXAMPLE.ipynb) [![Open in GitHub](https://img.shields.io/badge/Open%20in%20GitHub-181717?style=flat-square&logo=github&logoColor=white)](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/99-TEMPLATE/00-BASE-TEMPLATE-EXAMPLE.ipynb)

## Overview

이 튜토리얼은 `Runnable` 의 `with_listeners()`의 구현 및 활용을 다룹니다.

`with_listeners`는 Runnable에 lifecycle listeners를 bind하여 new Runnable을 반환 합니다. 이를 통해서 데이터 흐름에 이벤트 리스너를 연결하여 실행 중 이벤트를 추적하거나 분석, 디버깅 작업을 수행할 수 있습니다.

with_listeners() 함수는 Runnable 객체에 이벤트 리스너를 추가하는 기능을 제공합니다. 리스너는 특정 이벤트가 발생했을 때 호출되는 함수로, start, end, error 등의 이벤트를 감지하여 실행됩니다.

이 함수는 다음과 같은 상황에서 유용합니다:

- 데이터 처리 시작과 종료를 로깅하기

- 에러 발생 시 알림을 트리거하기

- 디버깅 정보 출력하기

### Table of Contents

- [Overview](#overview)
- [Environement Setup](#environment-setup)
- [with_listeners](#with_listeners)
- [with_alisteners](#with_alisteners)

### References

- [LangChain with_listensers](https://python.langchain.com/v0.2/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.with_listeners)
----

## Environment Setup

Set up the environment. You may refer to [Environment Setup](https://wikidocs.net/257836) for more details.

**[Note]**
- `langchain-opentutorial` is a package that provides a set of easy-to-use environment setup, useful functions and utilities for tutorials. 
- You can checkout the [`langchain-opentutorial`](https://github.com/LangChain-OpenTutorial/langchain-opentutorial-pypi) for more details.

In [50]:
%%capture --no-stderr
!pip install langchain-opentutorial

In [None]:
# Install required packages
from langchain_opentutorial import package

package.install(
    [
        "langchain_core",
        "asyncio",
        "time",
    ],
    verbose=False,
    upgrade=False,
)

In [52]:
# Set environment variables
from langchain_opentutorial import set_env

set_env(
    {
        "OPENAI_API_KEY": "",
        "LANGCHAIN_API_KEY": "",
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "WithListeners",
    }
)

Environment variables have been set successfully.


You can alternatively set API keys such as `OPENAI_API_KEY` in a `.env` file and load them.

[Note] This is not necessary if you've already set the required API keys in previous steps.

In [None]:
# Load API keys from .env file
from dotenv import load_dotenv

load_dotenv(override=True)

## with_listeners

Runnable.with_listeners()는 리스너 함수 목록을 받아서 새로운 Runnable 객체를 반환합니다. 리스너 함수는 start, end, error 이벤트에 대응하여 동작합니다.

리스너 함수는 다음 구조를 가집니다:
- on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) 
    - Called before the Runnable starts running. Defaults to None.

- on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) 
    - Called after the Runnable finishes running. Defaults to None.

- on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]])
    - Called if the Runnable throws an error. Defaults to None."

In [54]:
from langchain_core.runnables import RunnableLambda
import time

# 각 Runnable에서 수행할 작업 정의
def stepOne(message):
    time.sleep(1)  # 1초 대기
    return f"Step 1 completed with message {message}"

def stepTwo(message):
    time.sleep(2)  # 2초 대기
    return f"Step 2 completed with message {message}"

def stepThree(message):
    time.sleep(1)  # 1초 대기
    return f"Step 3 completed with message {message}"

# 리스너 함수 정의
def fnStart(runObj):
    print(f"Start: {runObj.inputs}")

def fnEnd(runObj):
    print(f"End: {runObj.inputs}")

def fnError(runObj):
    print(f"Error: {runObj.error}")

# 각 Runnable 정의
runnable1 = RunnableLambda(stepOne).with_listeners(
    on_start=fnStart,
    on_end=fnEnd,
    on_error=fnError
)

runnable2 = RunnableLambda(stepTwo).with_listeners(
    on_start=fnStart,
    on_end=fnEnd,
    on_error=fnError
)

runnable3 = RunnableLambda(stepThree).with_listeners(
    on_start=fnStart,
    on_end=fnEnd,
    on_error=fnError
)

# 체인 연결
chain = runnable1 | runnable2 | runnable3

# 실행
output = chain.invoke("Hello, World!")

Start: {'input': 'Hello, World!'}
End: {'input': 'Hello, World!'}
Start: {'input': 'Step 1 completed with message Hello, World!'}
End: {'input': 'Step 1 completed with message Hello, World!'}
Start: {'input': 'Step 2 completed with message Step 1 completed with message Hello, World!'}
End: {'input': 'Step 2 completed with message Step 1 completed with message Hello, World!'}


## with_alisteners

Bind asynchronous lifecycle listeners to a Runnable, returning a new Runnable

on_start: Asynchronously called before the Runnable starts running. on_end: Asynchronously called after the Runnable finishes running. on_error: Asynchronously called if the Runnable throws an error.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

In [55]:
import asyncio

async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {time.strftime('%S')}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {time.strftime('%S')}")

async def fn_start(run_obj):
    print(f"on start callback starts at {time.strftime('%S')}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {time.strftime('%S')}")

async def fn_end(run_obj):
    print(f"on end callback starts at {time.strftime('%S')}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {time.strftime('%S')}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)

async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

# Check if there's an existing event loop
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    loop = None

if loop and loop.is_running():
    # If there's an existing event loop, use it to run the coroutine
    await concurrent_runs()
else:
    # Otherwise, create a new event loop and run the coroutine
    asyncio.run(concurrent_runs())

on start callback starts at 36
on start callback starts at 36
on start callback ends at 39
on start callback ends at 39
Runnable[2s]: starts at 39
Runnable[3s]: starts at 39
Runnable[2s]: ends at 41
on end callback starts at 41
Runnable[3s]: ends at 42
on end callback starts at 42
on end callback ends at 43
on end callback ends at 44
