### 1 基本用法

#### 1.1 初始化模型

In [None]:
! pip install -U "langchain[deepseek]"

In [None]:
# 使用init_chat_model 或者 Model class 都可以，这里使用Model class
from langchain_deepseek import ChatDeepSeek

model = ChatDeepSeek(
    model="deepseek-chat"
)

result = model.invoke("你好，请你详细介绍下你自己")
result

### 2 模型参数

常见参数如下：

model -> string , require 你想使用的模型名称

api_key -> string 模型的api_key

temperature -> number 控制模型输出的随机性，数值越高，创造性越高，数值越低，则越确定。

timeout -> number 等待模型响应的最大时间，以秒为单位，超时后取消请求。

max_tokens -> number 最大令牌数 限制响应的token总数，控制输出长度。

max_retries -> number 最大重试次数。系统在请求因网络超时或速率限制等问题失败时，最多重试的次数。

In [None]:
model = ChatDeepSeek(
    model="deepseek-chat",
    temperature=0,
    timeout=1000,
    max_tokens=1000,
    max_retries=2
)
model.invoke("请你详细介绍下你自己")

### 3 调用

#### 3.1 invoke

In [None]:
# 调用模型最直接的方式是使用 invoke() ，传入单个消息或消息列表。
response = model.invoke("你好")
print(response)

In [None]:
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

conversation = [
    SystemMessage("You are a helpful assistant that translates English to French."),
    HumanMessage("Translate: I love programming."),
    AIMessage("J'adore la programmation."),
    HumanMessage("Translate: I love building applications.")
]

response = model.invoke(conversation)
print(response)  

#### 3.2 stream 流式输出

In [None]:
# 基本文本流
for chunk in model.stream("你好，两句话介绍下自己"):
    print(chunk.text, end="|", flush=True)

#### 3.3 批处理

将一组独立的请求批量提交给模型，可以显著提高性能并降低成本，因为处理可以并行进行

In [None]:
responses = model.batch([
    "解释下机器学习",
    "解释下深度学习",
    "解释下强化学习"
])
for response in responses:
    print(response)

默认情况下， batch() 只会返回整个批次的最终输出。如果你希望在每个输入生成完成后接收对应的输出，可以使用 batch_as_completed() 进行流式传输：

In [None]:
for response in model.batch_as_completed([
    "解释下机器学习",
    "解释下深度学习",
    "解释下强化学习"
]):
    print(response)
# 使用 batch_as_completed() 时，结果可能会无序到达。每个结果都包含输入索引，以便匹配并根据需要重建原始顺序。
# 不知道为何其实还是一次性返回的。官方是这么说的

在使用 batch() 或 batch_as_completed() 处理大量输入时，您可能希望控制最大并行调用次数。这可以通过在 RunnableConfig 字典中设置 max_concurrency 属性来实现。

In [None]:
model.batch(
    [
    "解释下机器学习",
    "解释下深度学习",
    "解释下强化学习"
],
    config={
        'max_concurrency': 5,  # 限制为5个并发。
    }
)

### 4 工具调用

工具调用需要使用bind_tools绑定工具

In [None]:
from langchain.tools import tool

@tool
def get_weather(city):
    """用来查询city天气"""
    return f"{city}的天气多云！"

llm_with_tools = model.bind_tools([get_weather])

response = llm_with_tools.invoke("今天上海的天气如何")

In [None]:
for tool_call in response.tool_calls:
    print(f"Tool: {tool_call['name']}")
    print(f"Args: {tool_call['args']}")

#### 4.1 完整的工具执行

In [None]:
llm_with_tools = model.bind_tools([get_weather])
messages = [{"role":"user", "content":"帮我查一下上海的天气"}]
ai_msg = llm_with_tools.invoke(messages)

messages.append(ai_msg)

for tool_call in ai_msg.tool_calls:
    result = get_weather.invoke(tool_call)
    messages.append(result)

final_response = llm_with_tools.invoke(messages)
final_response.text

#### 4.2 强制工具调用

默认情况下，模型有自由根据用户输入选择使用哪个绑定工具。然而，你可能希望强制选择某个工具，确保模型使用特定的工具或来自给定列表的任何工具

In [None]:
model_with_tools = model.bind_tools([get_weather], tool_choice="any") # 任何工具
model_with_tools = model.bind_tools([get_weather], tool_choice="get_weather") # 特定工具

#### 4.3 并行工具调用

许多模型在适当时支持并行调用多个工具。这允许模型同时从不同来源收集信息。

In [None]:
model_with_tools = model.bind_tools([get_weather])

response = model_with_tools.invoke("请问上海和北京的天气怎么样")

response

In [None]:
results = []
for tool_call in response.tool_calls:
    result = get_weather.invoke(tool_call)
    results.append(result)

results

In [None]:
# 默认开启并行工具调用，如果禁用，则如下设置
model.bind_tools([get_weather], parallel_tool_calls=False)

#### 4.4 流式工具调用

In [None]:
for chunk in model_with_tools.stream(
    "上海和北京的天气怎么样?"
):
    # 工具调用片段逐步到达
    for tool_chunk in chunk.tool_call_chunks:
        # 海象运算符
        if name := tool_chunk.get("name"):
            print(f"Tool: {name}")
        if id_ := tool_chunk.get("id"):
            print(f"ID: {id_}")
        if args := tool_chunk.get("args"):
            print(f"Args: {args}")

In [None]:
gathered = None
for chunk in model_with_tools.stream("北京天气如何?"):
    gathered = chunk if gathered is None else gathered + chunk
    print(gathered.tool_calls)

### 5 结构化输出

#### 5.1 Pydantic

In [26]:
from pydantic import BaseModel, Field

class Movie(BaseModel):
    """A movie with details."""
    title: str = Field(..., description="The title of the movie")
    year: int = Field(..., description="The year the movie was released")
    director: str = Field(..., description="The director of the movie")
    rating: float = Field(..., description="The movie's rating out of 10")

# 绑定格式。
model_with_structure = model.with_structured_output(Movie)
response = model_with_structure.invoke("提供西虹市首富的电视剧介绍")
print(response)  

title='西虹市首富' year=2018 director='闫非' rating=6.6


#### 5.2 TypedDict

In [None]:
from typing_extensions import TypedDict, Annotated

class MovieDict(TypedDict):
    """A movie with details."""
    title: Annotated[str, ..., "The title of the movie"]
    year: Annotated[int, ..., "The year the movie was released"]
    director: Annotated[str, ..., "The director of the movie"]
    rating: Annotated[float, ..., "The movie's rating out of 10"]

model_with_structure = model.with_structured_output(MovieDict)
response = model_with_structure.invoke("提供西虹市首富的电视剧介绍")
print(response)  

{'title': '西虹市首富', 'year': 2018, 'director': '闫非', 'rating': 6.6}


### 6 速率限制

许多聊天模型提供商对在一定时间内的调用次数设有限制。如果达到速率限制，通常会收到提供商返回的速率限制错误响应，您需要等待一段时间后再发起请求。

为了帮助管理速率限制，聊天模型集成接受一个 rate_limiter 参数，该参数在初始化时提供，用于控制请求的发送速率。

LangChain 内置（可选） InMemoryRateLimiter 。该限制器是线程安全的，可以被同一进程中的多个线程共享。

In [None]:
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.1,  # 每10秒请求一次  1 / requests_per_second = 请求间隔时间
    check_every_n_seconds=0.1,  # 每100毫秒检查一次  检查是否允许发送请求的频率
    max_bucket_size=10,  # 最大突发10个请求. 控制最大突发请求量
)
# 主要有些模型，有着请求数量速率限制和并发限制。
model = ChatDeepSeek(
    model="deepseek-chat",
    rate_limiter=rate_limiter  
)