# Callbacks Module

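The callbacks module lets you hook into the various stages of an LLM application. Because LLMs can hallucinate, this is useful for logging, monitoring, streaming, and other tasks.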

## 1. Supported callback methods

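CallbackHandlers (callback handler classes): objects that implement the CallbackHandler interface,
which has one method for each event that can be subscribed to. When an event fires, the CallbackManager calls the corresponding method on every handler.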
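
The dispatch described above can be pictured with a small, library-free sketch. `ToyHandler` and `ToyCallbackManager` are hypothetical names invented for illustration, and the lookup by method name is an assumption about the general pattern, not LangChain's actual implementation.

```python
from typing import Any, List


class ToyHandler:
    """Hypothetical handler: one method per event it cares about."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.seen: List[str] = []

    def on_llm_start(self, prompts: List[str], **kwargs: Any) -> None:
        self.seen.append(f"{self.name}: llm_start {prompts}")

    def on_llm_end(self, response: str, **kwargs: Any) -> None:
        self.seen.append(f"{self.name}: llm_end {response!r}")


class ToyCallbackManager:
    """Hypothetical manager: forwards each event to every registered handler."""

    def __init__(self, handlers: List[Any]) -> None:
        self.handlers = handlers

    def dispatch(self, event: str, *args: Any, **kwargs: Any) -> None:
        for handler in self.handlers:
            # Look up the method named after the event and skip handlers
            # that do not implement it.
            method = getattr(handler, event, None)
            if callable(method):
                method(*args, **kwargs)


first, second = ToyHandler("first"), ToyHandler("second")
manager = ToyCallbackManager([first, second])
manager.dispatch("on_llm_start", ["1 + 2 = "])
manager.dispatch("on_llm_end", "3")
manager.dispatch("on_tool_start", "ignored")  # no toy handler implements this event
print(first.seen)
print(second.seen)
```

Each handler records the same two events in the same order, and the event that no handler implements is skipped.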

Below is the base handler class for LangChain callbacks, listing every supported event method.

In [None]:

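# Imports needed for the annotations in this abbreviated listing
from typing import Any, Dict, List, Union

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult


class BaseCallbackHandler: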
    """Base callback handler that can be used to handle callbacks from langchain."""

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Run when LLM starts running."""

    def on_chat_model_start(
        self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs: Any
    ) -> Any:
        """Run when Chat Model starts running."""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Run on new LLM token. Only available when streaming is enabled."""

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Run when LLM ends running."""

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        """Run when chain starts running."""

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        """Run when chain ends running."""

    def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when chain errors."""

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        """Run when tool starts running."""

    def on_tool_end(self, output: Any, **kwargs: Any) -> Any:
        """Run when tool ends running."""

    def on_tool_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when tool errors."""

    def on_text(self, text: str, **kwargs: Any) -> Any:
        """Run on arbitrary text."""

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run on agent action."""

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run on agent end."""

## 2. A simple example

Log every event to standard output.

In [7]:
from langchain.callbacks import StdOutCallbackHandler
from langchain.chains import LLMChain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate

# Create the callback handler, using the stdout handler that ships with the framework
handler = StdOutCallbackHandler()

llm = ChatOpenAI(model_name="gpt-3.5-turbo-0125")

prompt = PromptTemplate.from_template("1 + {number} = ")

# Attach the StdOutCallbackHandler to the LLM chain
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler])
chain.invoke({"number":2})



> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 = 

> Finished chain.


{'number': 2, 'text': '3'}

In [8]:
# Use the verbose flag to see the output (same effect as attaching StdOutCallbackHandler)
chain = LLMChain(llm=llm, prompt=prompt, verbose=True)
chain.invoke({"number":2})



> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 = 

> Finished chain.


{'number': 2, 'text': '3'}

In [9]:
# Pass the callback at invocation time (same effect as setting it on the LLM chain)
chain = LLMChain(llm=llm, prompt=prompt)
chain.invoke({"number":2}, {"callbacks":[handler]})



> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 = 

> Finished chain.


{'number': 2, 'text': '3'}

## 3. Custom callback handlers

A custom handler class only needs to implement the event methods it cares about from the BaseCallbackHandler base class.

In [12]:
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# Custom handler class
class MyCustomHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"My custom handler, token: {token}")


# Enable streaming output by setting `streaming=True`
chat = ChatOpenAI(max_tokens=25, streaming=True, callbacks=[MyCustomHandler()])

chat.invoke([HumanMessage(content="Tell me a joke")])

My custom handler, token: 
My custom handler, token: Why
My custom handler, token:  did
My custom handler, token:  the
My custom handler, token:  scare
My custom handler, token: crow
My custom handler, token:  win
My custom handler, token:  an
My custom handler, token:  award
My custom handler, token: ?
My custom handler, token:  Because
My custom handler, token:  he
My custom handler, token:  was
My custom handler, token:  outstanding
My custom handler, token:  in
My custom handler, token:  his
My custom handler, token:  field
My custom handler, token: !
My custom handler, token: 


AIMessage(content='Why did the scarecrow win an award? Because he was outstanding in his field!', response_metadata={'finish_reason': 'stop'})
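
As a variation on the handler above, a handler can buffer tokens instead of printing them, which is handy when the streamed text has to be reused later. The sketch below is a minimal illustration under stated assumptions: `TokenCollector` and its `text()` helper are hypothetical names, the handler is driven by hand rather than by a real model so that no API key is needed, and the `ImportError` fallback only exists so the snippet runs where LangChain is not installed.

```python
from typing import Any, List

try:
    from langchain_core.callbacks import BaseCallbackHandler
except ImportError:
    # Stand-in so the sketch still runs where LangChain is not installed.
    class BaseCallbackHandler:  # type: ignore[no-redef]
        pass


class TokenCollector(BaseCallbackHandler):
    """Hypothetical handler that buffers streamed tokens instead of printing them."""

    def __init__(self) -> None:
        self.tokens: List[str] = []

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Called once per streamed token; extra keyword arguments are ignored.
        self.tokens.append(token)

    def text(self) -> str:
        """Return everything received so far as one string."""
        return "".join(self.tokens)


collector = TokenCollector()
# Driven by hand here. With a real model the handler would be passed in the
# same way as MyCustomHandler above, via callbacks=[collector].
for tok in ["", "Why", " did", " the", " scare", "crow", " win", "?"]:
    collector.on_llm_new_token(tok)
print(collector.text())
```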

## 4. Logging to a file

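Callbacks can also be used to record output in a log file.
FileCallbackHandler writes the log output to a file. It behaves much like StdOutCallbackHandler, except that the output goes to a file instead of the console.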
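
Before turning to the built-in handler, here is a rough sketch of what a hand-written file-logging handler might look like, using only the standard `logging` module. `ChainFileLogger` is a hypothetical class, not part of LangChain, and it is driven by hand with the same input and output as the chain in this notebook. The `ImportError` fallback is an assumption added so the snippet runs without LangChain installed.

```python
import logging
import os
import tempfile
from typing import Any, Dict

try:
    from langchain_core.callbacks import BaseCallbackHandler
except ImportError:
    # Stand-in so the sketch still runs where LangChain is not installed.
    class BaseCallbackHandler:  # type: ignore[no-redef]
        pass


class ChainFileLogger(BaseCallbackHandler):
    """Hypothetical handler that writes chain start/end events to a log file."""

    def __init__(self, path: str) -> None:
        # A dedicated logger per instance keeps this output out of the root logger.
        self.logger = logging.getLogger(f"chain_file_logger.{id(self)}")
        self.logger.setLevel(logging.INFO)
        self.logger.propagate = False
        self._file_handler = logging.FileHandler(path, encoding="utf-8")
        self._file_handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
        self.logger.addHandler(self._file_handler)

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        self.logger.info("chain start: %s", inputs)

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        self.logger.info("chain end: %s", outputs)

    def close(self) -> None:
        """Flush and release the underlying file."""
        self.logger.removeHandler(self._file_handler)
        self._file_handler.close()


log_path = os.path.join(tempfile.mkdtemp(), "output.log")
file_logger = ChainFileLogger(log_path)
# Simulate the two events a chain run would emit.
file_logger.on_chain_start({}, {"number": 2})
file_logger.on_chain_end({"number": 2, "text": "3"})
file_logger.close()

with open(log_path, encoding="utf-8") as fh:
    log_text = fh.read()
print(log_text)
```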

In [45]:
from langchain.callbacks import FileCallbackHandler
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from loguru import logger

logfile = "output.log"
logger.add(logfile, colorize=True, enqueue=True)
handler = FileCallbackHandler(logfile)

llm = ChatOpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")

# verbose=True prints to the console while FileCallbackHandler writes to the file (the file is still written when verbose=False)
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler], verbose=True)
# Invoke the LLM chain and capture the result
answer = chain.invoke({"number":2})
# Record the result in the log as well
logger.info(answer)



> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 = 


2024-04-09 17:32:05.238 | INFO     | __main__:<module>:19 - {'number': 2, 'text': '3'}



> Finished chain.


## 5. Async callbacks

Note: when the example below was run, the asynchronous effect was not yet visible.

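When a callback has to read or write large amounts of data to a database or a file, an async callback can be used to avoid blocking the main thread.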

In [44]:
import asyncio
from typing import Any, Dict, List

from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI
import time

# 1. Custom synchronous callback handler
class MyCustomSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"{time.time()} Sync handler being called in a `thread_pool_executor`: token: {token}")
# 2. Custom asynchronous callback handler
class MyCustomAsyncHandler(AsyncCallbackHandler):
    """Async callback handler that can be used to handle callbacks from langchain."""

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running."""
        print(f"{time.time()} async on_llm_start....")
        await asyncio.sleep(0.5)
        print(f"{time.time()} Hi! I just woke up. Your llm is starting")

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        print(f"{time.time()} async on_llm_end....")
        await asyncio.sleep(0.5)
        print(f"{time.time()} Hi! I just woke up. Your llm is ending")

# Enable streaming, build the model, and pass in the list of callbacks
chat = ChatOpenAI(
    max_tokens=25,
    streaming=True,
    callbacks=[MyCustomSyncHandler(),MyCustomAsyncHandler()],
)

# NOTE: after running this, the asynchronous effect was not yet visible
await chat.agenerate([[HumanMessage(content="Tell me a joke")]])


1712653662.9382486 async on_llm_start....
1712653663.450857 Hi! I just woke up. Your llm is starting
1712653671.215189 Sync handler being called in a `thread_pool_executor`: token: 
1712653671.2435968 Sync handler being called in a `thread_pool_executor`: token: Why
1712653671.245625 Sync handler being called in a `thread_pool_executor`: token:  couldn
1712653671.258355 Sync handler being called in a `thread_pool_executor`: token: 't
1712653671.259472 Sync handler being called in a `thread_pool_executor`: token:  the
1712653671.3210065 Sync handler being called in a `thread_pool_executor`: token:  bicycle
1712653671.3225036 Sync handler being called in a `thread_pool_executor`: token:  find
1712653671.360427 Sync handler being called in a `thread_pool_executor`: token:  its
1712653671.3621085 Sync handler being called in a `thread_pool_executor`: token:  way
1712653671.372167 Sync handler being called in a `thread_pool_executor`: token:  home
1712653671.3738072 Sync handler being calle

LLMResult(generations=[[ChatGeneration(text="Why couldn't the bicycle find its way home?\n\nBecause it lost its bearings!", generation_info={'finish_reason': 'stop'}, message=AIMessage(content="Why couldn't the bicycle find its way home?\n\nBecause it lost its bearings!", response_metadata={'finish_reason': 'stop'}))]], llm_output={'token_usage': {}, 'model_name': 'gpt-3.5-turbo'}, run=[RunInfo(run_id=UUID('fcfefb80-e534-40bf-aae3-6d8ae9051ed1'))])
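
One possible reason the run above looks sequential is that it awaits a single call, so every callback in that call is awaited in order. The benefit of an async callback usually shows only when other coroutines share the event loop. The library-free sketch below illustrates that general asyncio behavior. `slow_on_llm_start` and `fake_llm_call` are hypothetical stand-ins, and this is an assumption about the cause, not a confirmed diagnosis of the LangChain run.

```python
import asyncio
from typing import List

events: List[str] = []


async def slow_on_llm_start(run: str) -> None:
    """Stand-in for an async callback that waits on slow I/O."""
    events.append(f"{run}: start callback begins")
    await asyncio.sleep(0.05)  # yields control to other coroutines while waiting
    events.append(f"{run}: start callback ends")


async def fake_llm_call(run: str) -> None:
    """Stand-in for one model call: run the callback, then emit tokens."""
    await slow_on_llm_start(run)
    events.append(f"{run}: tokens")


async def main() -> List[str]:
    events.clear()
    # Two calls share the event loop. While A's callback sleeps, B can start.
    await asyncio.gather(fake_llm_call("A"), fake_llm_call("B"))
    return list(events)


# In a notebook: await main(). In a script: asyncio.run(main()).
```

With a single call there is nothing else to interleave with, so the output is ordered exactly as in the log above. With two concurrent calls, B's callback begins before A's callback has finished.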