# 3. LangChain基础

## 3.4. LangChain的回调

### 3.4.2. 认识异步回调

In [1]:
import asyncio
from typing import Any, Dict, List
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI
from langchain_community.chat_models import ChatOllama

class MyCustomSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"正在 thread_pool_executor 中调用同步处理程序: token: {token}")

class MyCustomAsyncHandler(AsyncCallbackHandler):
    """可用于处理来自LangChain的回调异步回调处理程序。"""
    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None:
        """在链开始时运行。"""
        print("zzzz....")
        await asyncio.sleep(0.3)
        class_name = serialized["name"]
        print("LLM正在启动")

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """在链结束时运行。"""
        print("zzzz....")
        await asyncio.sleep(0.3)
        print("LLM结束")


# 为了启用流式传输，在ChatModel()构造函数中传入streaming=True
# 此外，还传入了一个包含自定义处理程序的列表

"""
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:11434/v1"

chat = ChatOpenAI(
    openai_api_key = openai_api_key, 
    openai_api_base = openai_api_base,
    temperature=0, 
    streaming=True,
    callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],)
"""

chat = ChatOllama(
    model="qwen:1.8b",
    temperature=0, 
    streaming=True,
    callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],)

await chat.agenerate([[HumanMessage(content="告诉我北京的特产")]])

  chat = ChatOllama(


zzzz....
LLM正在启动
正在 thread_pool_executor 中调用同步处理程序: token: 北京
正在 thread_pool_executor 中调用同步处理程序: token: 是中国
正在 thread_pool_executor 中调用同步处理程序: token: 的
正在 thread_pool_executor 中调用同步处理程序: token: 首都
正在 thread_pool_executor 中调用同步处理程序: token: ，
正在 thread_pool_executor 中调用同步处理程序: token: 拥有
正在 thread_pool_executor 中调用同步处理程序: token: 丰富的
正在 thread_pool_executor 中调用同步处理程序: token: 特产
正在 thread_pool_executor 中调用同步处理程序: token: 。
正在 thread_pool_executor 中调用同步处理程序: token: 以下
正在 thread_pool_executor 中调用同步处理程序: token: 是一
正在 thread_pool_executor 中调用同步处理程序: token: 些
正在 thread_pool_executor 中调用同步处理程序: token: 在北京
正在 thread_pool_executor 中调用同步处理程序: token: 常见的
正在 thread_pool_executor 中调用同步处理程序: token: 特产
正在 thread_pool_executor 中调用同步处理程序: token: ：


正在 thread_pool_executor 中调用同步处理程序: token: 1
正在 thread_pool_executor 中调用同步处理程序: token: .
正在 thread_pool_executor 中调用同步处理程序: token:  糖
正在 thread_pool_executor 中调用同步处理程序: token: 果
正在 thread_pool_executor 中调用同步处理程序: token: ：
正在 thread_pool_executor 中调用同步处理程序: token:

LLMResult(generations=[[ChatGeneration(text='北京是中国的首都，拥有丰富的特产。以下是一些在北京常见的特产：\n\n1. 糖果：北京的糖果种类繁多，如枣泥、豆沙、芝麻糖等。其中，枣泥和豆沙是北京传统糕点，而芝麻糖则是一种新兴的甜品。\n\n2. 面食：北京的面食种类丰富，包括馒头、包子、烙饼、炸酱面等。其中，馒头和包子是中国北方的传统面食，而烙饼和炸酱面则是北京地区新兴的面食。\n\n3. 茶叶：北京是全国最大的茶叶产区之一，拥有丰富的茶树资源和独特的茶文化。其中，龙井茶、碧螺春茶、白毫银针茶等都是北京地区的著名茶叶品种。\n\n4. 瓷器：北京是中国的首都，也是中国瓷器的重要产地之一。其中，青花瓷、粉彩瓷、五彩瓷等都是北京地区著名的瓷器品种。', generation_info={'model': 'qwen:1.8b', 'created_at': '2025-02-22T09:20:27.223424119Z', 'message': {'role': 'assistant', 'content': ''}, 'done_reason': 'stop', 'done': True, 'total_duration': 53251345137, 'load_duration': 2082765641, 'prompt_eval_count': 12, 'prompt_eval_duration': 401000000, 'eval_count': 207, 'eval_duration': 50765000000}, message=AIMessage(content='北京是中国的首都，拥有丰富的特产。以下是一些在北京常见的特产：\n\n1. 糖果：北京的糖果种类繁多，如枣泥、豆沙、芝麻糖等。其中，枣泥和豆沙是北京传统糕点，而芝麻糖则是一种新兴的甜品。\n\n2. 面食：北京的面食种类丰富，包括馒头、包子、烙饼、炸酱面等。其中，馒头和包子是中国北方的传统面食，而烙饼和炸酱面则是北京地区新兴的面食。\n\n3. 茶叶：北京是全国最大的茶叶产区之一，拥有丰富的茶树资源和独特的茶文化。其中，龙井茶、碧螺春茶、白毫银针茶等都是北京地区的著名茶叶品种。\n\n4. 瓷器：北京是中国的首都，也是中国瓷器的重要产地之一。其中，青花

### 3.4.4. 自定义回调

In [2]:
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.chat_models import ChatOllama

class MyCustomHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"自定义回调处理器, token: {token}")

"""
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:11434/v1"
chat = ChatOpenAI(max_tokens=25, streaming=True,openai_api_key = openai_api_key,openai_api_base = openai_api_base, callbacks=[MyCustomHandler()])
"""

chat = ChatOllama(model="qwen:1.8b", max_tokens=25, streaming=True, callbacks=[MyCustomHandler()])
chat([HumanMessage(content="中国首都是？")])

  chat([HumanMessage(content="中国首都是？")])


自定义回调处理器, token: 北京
自定义回调处理器, token: 。
自定义回调处理器, token: 


AIMessage(content='北京。', additional_kwargs={}, response_metadata={'model': 'qwen:1.8b', 'created_at': '2025-02-22T09:21:19.66625431Z', 'message': {'role': 'assistant', 'content': ''}, 'done_reason': 'stop', 'done': True, 'total_duration': 732771314, 'load_duration': 24750644, 'prompt_eval_count': 12, 'prompt_eval_duration': 328000000, 'eval_count': 3, 'eval_duration': 378000000}, id='run-21a31dea-52f4-43de-a543-118cbdb6cd09-0')

### 3.4.5. 使用回调记录日志

In [3]:
from langchain.callbacks import FileCallbackHandler
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
from langchain_community.chat_models import ChatOllama
from loguru import logger

logfile = "output.log"
logger.add(logfile, colorize=True, enqueue=True)
handler = FileCallbackHandler(logfile)

"""
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:11434/v1"
llm = OpenAI(openai_api_key = openai_api_key, openai_api_base = openai_api_base,)
"""
llm = ChatOllama(model="qwen:1.8b")
prompt = PromptTemplate.from_template("1 + {number} = ")

# 这个链将同时向标准输出打印（因为 verbose=True）并写入 'output.log'
# 如果 verbose=False，FileCallbackHandler 仍会写入 'output.log'
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler], verbose=True)
answer = chain.run(number=1)
logger.info(answer)

  chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler], verbose=True)
  answer = chain.run(number=1)
Error in FileCallbackHandler.on_chain_start callback: AttributeError("'NoneType' object has no attribute 'get'")




[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3m1 + 1 = [0m


[32m2025-02-22 04:21:24.431[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m24[0m - [1m2[0m



[1m> Finished chain.[0m


### 3.4.6. 处理多个回调

In [4]:
from typing import Any, Dict, List, Union
from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.agents import AgentAction
from langchain_community.llms import Ollama

# 定义自定义回调处理器
class MyCustomHandlerOne(BaseCallbackHandler):
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        print(f"on_llm_start {serialized['name']}")

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        print(f"on_new_token {token}")

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any ) -> Any:
        """Run when LLM errors."""

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> Any:
        print(f"on_chain_start {serialized['name']}")

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
        print(f"on_tool_start {serialized['name']}")

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        print(f"on_agent_action {action}")

class MyCustomHandlerTwo(BaseCallbackHandler):
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        print(f"on_llm_start (I'm the second handler!!) {serialized['name']}")

# 实例化处理器
handler1 = MyCustomHandlerOne()
handler2 = MyCustomHandlerTwo()

# 设置代理。只有“llm”会为handler2发出回调
llm = Ollama(model="qwen:1.8b", callbacks=[handler2])
tools = load_tools(["llm-math"], llm=llm)
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION)

# handler1的回调将由参与Agent执行的每个对象（llm、llmchain、tool、agent executor）发出
agent.run("为什么影子之间会有吸力?", callbacks=[handler1])

  llm = Ollama(model="qwen:1.8b", callbacks=[handler2])
  agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION)
Error in MyCustomHandlerOne.on_chain_start callback: TypeError("'NoneType' object is not subscriptable")
Error in MyCustomHandlerOne.on_chain_start callback: TypeError("'NoneType' object is not subscriptable")


on_llm_start Ollama
on_llm_start (I'm the second handler!!) Ollama
on_new_token  我
on_new_token 会
on_new_token 回答
on_new_token 。
Final
on_new_token  Answer
on_new_token :
on_new_token  影
on_new_token 子
on_new_token 之间的
on_new_token 吸
on_new_token 力
on_new_token 主要是
on_new_token 由于
on_new_token 光
on_new_token 的
on_new_token 折射
on_new_token 和
on_new_token 干涉
on_new_token 现象
on_new_token 引起的
on_new_token 。

首先
on_new_token ，
on_new_token 当
on_new_token 光线
on_new_token 从
on_new_token 一种
on_new_token 介质
on_new_token （
on_new_token 如
on_new_token 空气
on_new_token ）
on_new_token 射
on_new_token 向
on_new_token 另一种
on_new_token 介质
on_new_token （
on_new_token 如
on_new_token 水
on_new_token ）
on_new_token 时
on_new_token ，
on_new_token 会发生
on_new_token 光
on_new_token 的
on_new_token 反射
on_new_token 。
on_new_token 在
on_new_token 反射
on_new_token 过程中
on_new_token ，
on_new_token 光
on_new_token 将
on_new_token 一部分
on_new_token 能量
on_new_token 转化为
on_new_token 热量
on_new_token ，
on_new_token 而
on_new_token 另一

'影子之间的吸力主要是由于光的折射和干涉现象引起的。\n\n首先，当光线从一种介质（如空气）射向另一种介质（如水）时，会发生光的反射。在反射过程中，光将一部分能量转化为热量，而另一部分能量则以光的形式返回到空气中。这种过程称为光的反射。\n\n接着，当光线继续从一种介质射向另一种介质时，还会发生光的折射现象。在折射过程中，由于不同介质对入射光的折射率不同，因此一部分入射光会改变方向，进入另一种介质中。这种改变方向的入射光就叫做为光的折射。\n\n最后，当光线继续从一种介质射向另一种介质时，还会发生光的干涉现象。在干涉过程中，由于不同光源发出的光波具有不同的相位和频率特性，因此一部分入射光会在接收器上产生相位差或频率差，从而导致接收器上的光强发生变化。这种光强变化就是为光的干涉。\n\n综上所述，影子之间的吸力主要是由光的反射、折射和干涉现象引起的。'