# 模型调用的方式

## 1.非流式和流式输出

- 提前引入大模型:

In [2]:
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
import os
import dotenv

dotenv.load_dotenv()

chat_model = ChatOpenAI(
    model = "deepseek-chat",
    base_url = os.environ["BASE_URL"],
    api_key = os.environ["DEEPSEEK_API_KEY"]
)

- invoke阻塞式调用(非流式输出):
1) 用户发出提问请求后, 后台会等待大模型 生成完整响应,一次性将结果全部返回
2) LLM和大模型交互时的默认方式, 简单且稳定
3) 适用于大多数问答、摘要、信息抽取类任务,可快速集成和部署

In [8]:
# Must be a PromptValue, str, or list of BaseMessages
# 即提示词必须是PromptValue,字符串 或 Message列表
messages = [
    HumanMessage("请用两三句话简单介绍下LangChain")
]
## invoke阻塞式调用
response = chat_model.invoke(messages)

print(f"response: {response}")
print(f"response type: {type(response)}")
print(f"content type: {type(response.content)}")

response: content='LangChain是一个用于开发大语言模型应用的框架。它通过提供模块化组件和链式调用，简化了与外部数据源和工具的集成过程。开发者可以利用它快速构建基于大语言模型的端到端应用，如问答系统和智能代理。' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 53, 'prompt_tokens': 13, 'total_tokens': 66, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}, 'prompt_cache_hit_tokens': 0, 'prompt_cache_miss_tokens': 13}, 'model_provider': 'openai', 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_ffc7281d48_prod0820_fp8_kvcache', 'id': 'f2903ad8-3886-49e4-9a1e-e1c6af125239', 'finish_reason': 'stop', 'logprobs': None} id='lc_run--53871be3-1dfb-474e-95df-2d6d8d3ff8ee-0' usage_metadata={'input_tokens': 13, 'output_tokens': 53, 'total_tokens': 66, 'input_token_details': {'cache_read': 0}, 'output_token_details': {}}
response type: <class 'langchain_core.messages.ai.AIMessage'>
content type: <class 'str'>


- stream流式调用:
1) 更具交互感的输出方式, 用户不用等待完整回答, 可看到大模型逐个token 实时地返回内容
2) 初始化ChatModel时,设置 streaming=True 并配合 回调机制 来启用流式输出
3) 适合构建强调“实时反馈”的应用，如聊天机器人、写作助手等

In [9]:
# invoke阻塞式调用
stream_chat_model = ChatOpenAI(
    model = "deepseek-chat",
    base_url = os.environ["BASE_URL"],
    api_key = os.environ["DEEPSEEK_API_KEY"],
    streaming=True # 开启流式输出
)

messages2 = [
    HumanMessage("请简单介绍下LangChain")
]
print("开始流式输出回答:\n")
for chunk in stream_chat_model.stream(messages2):
    # 刷新缓冲区 (无换行符，缓冲区未刷新，内容可能不会立即显示)
    print(chunk.content, end="", flush=True)

print("/n流式输出结束")

开始流式输出回答:

好的，这是一个对LangChain的简单介绍。

### 什么是LangChain？

**LangChain** 是一个用于开发由大型语言模型驱动的应用程序的**框架**。你可以把它想象成一个“工具箱”或“脚手架”，它的核心目的是让开发者能够更轻松、更高效地将LLM（如GPT系列、LLaMA等）与外部数据源和计算能力连接起来，从而构建出功能强大且实用的应用。

简单来说，LangChain解决了LLM原生存在的几个主要问题：
1.  **信息陈旧**：LLM的训练数据有截止日期，不知道最新信息。
2.  **缺乏上下文**：LLM无法直接访问你私有的、特定领域的数据（如公司文档、个人笔记）。
3.  **“幻觉”问题**：LLM有时会编造看似合理但实际错误的信息。
4.  **无法执行动作**：LLM本身不能替你做事情，比如发邮件、查询数据库。

### 核心思想：链

LangChain的名字就来源于其核心思想——**“链”**。它通过将不同的组件像链条一样连接起来，完成复杂的任务。一个典型的链条可能包括：
1.  接收用户输入。
2.  用一个提示模板格式化输入。
3.  将格式化后的提示传递给LLM。
4.  将LLM的输出解析为结构化数据。
5.  根据这个数据去执行一个动作（如查询数据库、调用API）。

### 核心组件

为了让“链”能够工作，LangChain提供了几个关键组件：

1.  **模型**：支持多种LLM和聊天模型（如OpenAI, Anthropic, 开源模型等）以及嵌入模型。
2.  **提示**：管理LLM的输入。包括**提示模板**（可复用的提示结构）和**示例选择器**（为提示动态选择最佳示例）。
3.  **链**：将多个组件组合在一起，形成一个完整的工作流。这是LangChain的核心。
4.  **检索器**：用于从外部数据源（如文档、数据库）中获取相关数据，是解决“信息陈旧”和“缺乏上下文”的关键。
5.  **代理**：这是更高级的“链”。代理可以理解用户指令，然后**自主地决定使用哪些工具**来完成任务。工具可以是搜索引擎、计算器、API等。代理让LLM从“聊天机器人”变成了可以“行动”的智能体。
6.  **记忆**：为对话或交互提供记忆能力，使其能够记住之前说过的话，实现多轮对话的上下

## 2.批量调用

- batch批量调用: 一次性将多个对话传个大模型, 让大模型批量生成回答

In [10]:
from langchain_core.messages import SystemMessage

messages1 = [
    SystemMessage(content="你是一位人工智能应用领域的专家"),
    HumanMessage(content="请用两三句话简单介绍下什么是机器学习")
]

messages2 = [
    SystemMessage(content="你是一位资深的Java+AI应用开发方向的工程师"),
    HumanMessage(content="请用两三句话简单介绍下现在企业中的主流技术栈都有哪些")]

messages3 = [
    SystemMessage(content="你是一位乐于助人的智能小助手"),
    HumanMessage(content="请用两三句话简单介绍下什么是大模型技术")]

msg_list = [messages1, messages2, messages3]
response = chat_model.batch(msg_list)

print(f"response: {response}")
print(f"response type: {type(response)}")

response: [AIMessage(content='机器学习是人工智能的一个分支，它让计算机通过分析数据来自动学习和改进，而无需显式编程。其核心在于从历史数据中识别模式，并利用这些模式来预测未来或做出决策。常见的应用包括推荐系统、图像识别和自然语言处理等。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 58, 'prompt_tokens': 19, 'total_tokens': 77, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}, 'prompt_cache_hit_tokens': 0, 'prompt_cache_miss_tokens': 19}, 'model_provider': 'openai', 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_ffc7281d48_prod0820_fp8_kvcache', 'id': 'e45fbb4b-4749-4fb7-9f6c-ed456db26927', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--a03a286d-e38b-4214-af18-2e9491082924-0', usage_metadata={'input_tokens': 19, 'output_tokens': 58, 'total_tokens': 77, 'input_token_details': {'cache_read': 0}, 'output_token_details': {}}), AIMessage(content='目前企业中主流的技术栈主要包括Spring Boot、Spring Cloud等微服务框架，结合MySQL、Redis等数据库与缓存系统。前端通常采用Vue、React等框架，并通过Docker、Kubernetes实现容器化部署。同时，大数据与AI场景下常集

## 3.同步调用和异步调用

- 同步调用: 每个操作依次执行,直到当前操作完成后才开始下一个操作,其总执行耗时是各个操作的时间总和

In [3]:
import time

def call_model():
    print("开始调用大模型...")
    time.sleep(2)  # 模拟调用过程,等待两秒
    print("大模型调用结束")

def invoke_other_task():
    for i in range(5):
        print(f"执行其他任务{i+1}")
        time.sleep(1)

def sync_main():
    start_time = time.time()
    call_model()
    invoke_other_task()
    end_time = time.time()
    cost_time = end_time - start_time
    return f"all invoke cost time: {cost_time}"

time_info = sync_main()
print(time_info)

开始调用大模型...
大模型调用结束
执行其他任务1
执行其他任务2
执行其他任务3
执行其他任务4
执行其他任务5
all invoke cost time: 7.005067825317383


- 异步调用: 允许程序在等待某些操作完成时,继续执行其他任务,而不是阻塞等待;
    适用于处理I/O操作(如网络请求、文件读写等),可显著提高程序的执行效率和响应速度

In [4]:
import asyncio
import time

async def async_call(llm):
    await asyncio.sleep(5) # 模拟异步操作
    print("异步调用完成")

async def perform_other_tasks():
    await asyncio.sleep(5) # 模拟异步操作
    print("其他任务完成")

async def run_async_tasks():
    start_time = time.time()
    await asyncio.gather(
        async_call(None), # 示例调用，使用None模拟LLM对象
        perform_other_tasks()
    )
    end_time = time.time()
    return f"总共耗时：{end_time - start_time}秒"

# # 正确运行异步任务的方式
# if __name__ == "__main__":
# # 使用 asyncio.run() 来启动异步函数
# result = asyncio.run(run_async_tasks())
# print(result)

# 在 Jupyter 单元格中直接调用
result = await run_async_tasks()
print(result)

异步调用完成
其他任务完成
总共耗时：5.0139172077178955秒


- ainvoke异步调用验证:

In [5]:
import inspect

print("ainvoke 是协程函数【是否异步】:", inspect.iscoroutinefunction(chat_model.ainvoke))
print("invoke 是协程函数【是否异步】:", inspect.iscoroutinefunction(chat_model.invoke))

ainvoke 是协程函数【是否异步】: True
invoke 是协程函数【是否异步】: False
