# <center>大模型 AI Agent 开发实战

## <center>Ch.6 OpenAI Assistant API 高阶应用 - 流式输出功能

&emsp;&emsp;通过前两节课程中对`Assistant API`的详细介绍，**我们已经清楚的了解到:`Assistant API` 是由 `OpenAI` 提供的一个基于 `GPT` 系列模型构建 `AI Agent`开发框架。**该框架不仅支持常规 `Chat API` 的对话功能，还内置了高精度的外部工具以满足更复杂的用户需求。因此，当我们希望构建的应用程序不局限于简单的对话交互，而是能够执行更复杂的任务时，我们完全可以考虑将底层从 `Chat API` 升级到 `Assistant API`。

&emsp;&emsp;使用 `Assistant API` 的主要优点包括：

1. **减少编码工作量**：
   使用 `Assistant API` 可以显著减少编码量，特别是在实现如 RAG (Retrieval-Augmented Generation) 这样的复杂功能时。如果是一位新手开发者，无需深入理解分块、嵌入以及其他底层细节，便能实现高效的问答系统。

2. **自动管理上下文窗口**：
   对于初学者来说，控制输入数据的上下文长度并不容易。`Assistant API` 自动管理上下文窗口，有效处理数据长度问题。当输入数据超出模型的处理能力时，`Assistant API` 能自动裁剪或调整数据，保证输出的质量和一致性。

3. **安全的访问控制**：
   `Assistant API` 提供了授权限制功能，确保只有授权用户可以访问特定的助手功能。这增强了应用的安全性，防止未经授权的访问。

4. **工具和文档的轻松集成**：
   如果需要调用外部工具或API，`Assistant API` 提供了便捷的集成选项。无论是公开的 API 还是私有函数，都可以轻松集成进 `Assistant API`，增强助手的功能性和灵活性。

&emsp;&emsp;**以上提到的四个优点精准地解决了大模型开发工程师目前面临的核心痛点**，所以使得`Assistant API`的受欢迎程度非常高，其开发者社区也特别活跃。但需要提前说明的是：**虽然 `Assistant API` 极具实用性并能够支持复杂的应用场景，但其底层设计和线程等概念非常明显的表达了：它尤其适合于大规模应用程序的开发。** 因此，在基于 `Assistant API` 构建应用时，我们的重点并非仅仅在于实现具体功能，而是**需要全面考虑应用程序的整体性能和架构设计。**功能实现方面，通过《Ch.4 OpenAI Assistant API 基本理论与入门实战》和《Ch.5 OpenAI Assistant API 进这两节课程用》两节课程的详细介绍，已经全面覆盖了 `Assistant API` 的调用方法和其内置工具的使用技巧。那么在本节的课程中，我们将重点从 `Assistant API` 的应用设计和性能方面展开详细的探讨。

&emsp;&emsp;**从应用系统开发的角度来看，性能和响应速度是提升用户体验的关键因素。**构建能够动态响应并提供即时交互的应用程序是基本要求。为此，我们需要进一步了解 `Assistant API` 中的流式输出功能，同时，除了外部工具调用功能之外，这也是我们需要掌握的 `Assistant API` 高级应用中另一个关键的技术点。

&emsp;&emsp;那什么是流式输出呢？一个最简单且通俗的理解是：**借助流式输出，可以让应用程序实时处理和响应用户输入。具体来说，这种技术允许数据在生成的同时即刻传输和处理，而不必等待整个数据处理过程完成。这样，用户无需等待全部数据加载完成即可开始接收到部分结果，从而显著提高了应用的反应速度和交互流畅性。**典型的应用场景就是我们在使用[ChatGPT](https://chatgpt.com/?ref=dotcom)或者[智谱清言](https://chatglm.cn/?lang=zh)进行实时对话时，可以观察到回答是逐渐展现的，而不是一次性显示完整。这种技术有效地优化了用户体验，使得交互更加自然和连贯。

&emsp;&emsp;在介绍`Assistant API`的流式输出功能之前，为了让大家更好且直观的理解，我们先从大家更为熟悉的 `Chat API` 来理解流式输出与非流式输出的区别。

# 1. Chat API 的流式与非流式输出

&emsp;&emsp;大模型的 `API` 接口，例如 `OpenAI` 的 `GPT`系列，智谱清言的`GLM`系列，都会提供多种形式的接口来满足不同的应用场景需求。

> OpenAI Chat：https://platform.openai.com/docs/api-reference/chat/create?lang=python

<div align=center><img src="https://muyu001.oss-cn-beijing.aliyuncs.com/img/image-20241009101636001.png" width=100%></div>

&emsp;&emsp;非流式的 `API` 调用方式，意味着**客户端发送一个完整的请求，并等待服务器处理后返回一个完整的响应**。这里我们看下使用`gpt-4o`模型的非流式调用代码及输出情况，代码如下所示：

In [6]:
from openai import OpenAI
client = OpenAI()

completion = client.chat.completions.create(
  model="gpt-4o",
  messages=[
    {"role": "system", "content": "你是一位乐于助人的人工智能小助理。"},
    {"role": "user", "content": "你好，请你介绍一下你自己。"}
  ]
)

print(completion.choices[0].message)

ChatCompletionMessage(content='你好！我是一个人工智能助手，旨在为用户提供信息和支持。我可以帮助回答问题、提供建议、进行简单的对话等。不过，我的能力和知识是基于我在2023年10月之前接受的训练数据，我没有实时更新的功能，所以可能无法提供最新的信息。如果你有任何问题或需要帮助，请随时告诉我！', refusal=None, role='assistant', function_call=None, tool_calls=None)


&emsp;&emsp;正如上述的返回结果显示，**非流式输出适用于可以一次性处理的对话或请求，通常在请求不频繁更新且数据量管理得当的情况下使用。**客户端发起请求后，需要等待服务器处理完成，再接收一次性的响应。比如常见的网页加载，当我们访问一个网页时，浏览器向服务器发送一个请求，服务器处理后返回整个网页的内容。这是一个典型的非流式交互，即用户发起请求后等待服务器响应，最终接收到完整的网页数据。除此之外，像数据库查询、在线搜索引擎查询等，用户与系统的交互通常是明确的请求与响应流程，适合使用非流式接口，因为它们可以在用户发出请求后立即处理并返回完整的响应数据。这样的处理方式适用于数据量可控且处理时间相对短的场景。

&emsp;&emsp;而对于**流式输出，这种方式允许客户端逐渐接收到大模型生成的每一部分内容，而不是等待整个响应完成后一次性接收**。

&emsp;&emsp;这里我们看下使用`gpt-4o`模型的流式调用代码及输出情况。在流式输出的实现方式中，我们需要在调用 `client.chat.completions.create()` 时添加 `stream=True` 参数，用以指定启用流式输出，从而允许 `API` 逐步发送生成的文本片段，然后使用 `for` 循环来迭代 `completion` 对象，每次循环接收到的 `chunk.choices[0].delta` 中会包含最新生成的文本片段。完整代码如下所示：

In [8]:
from openai import OpenAI
client = OpenAI()

completion = client.chat.completions.create(
  model="gpt-4o",
  messages=[
    {"role": "system", "content": "你是一位乐于助人的人工智能小助理。"},
    {"role": "user", "content": "你好，请你介绍一下你自己。"}
  ],
  stream=True
)

for chunk in completion:
  print(chunk.choices[0].delta)

ChoiceDelta(content='', function_call=None, refusal=None, role='assistant', tool_calls=None)
ChoiceDelta(content='你好', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='！', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='我是', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='一个', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='人工', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='智能', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='助手', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='，', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='旨', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='在', function_call=None, refusal=None, role=None, tool_calls=None)
ChoiceDelta(content='帮助', funct

&emsp;&emsp;从输出上看，每个 `ChoiceDelta` 对象代表了大模型生成的一小段文本。所以很明显，这种流式调用的方式可以让我们看到 `GPT` 模型是如何一步一步构建出最终的回答的。例如，从 "你好" 开始，接着是 "！"，然后是 "我是"，一直到整个回答完整地构建出来，每个 `ChoiceDelta` 可能包含几个字符到一个或几个词。

&emsp;&emsp;与非流式输出相反，**流式输出则适用于需要连续或大量数据交换的场景，它在长时间或连续的交互中非常有效**。典型的场景如实时聊天机器人，实时音视频处理或大规模数据实时分析。在流式模式下，数据会被分成小块连续发送和接收，这样可以减少等待时间，提高处理效率。服务器可以边接收边处理数据，客户端也可以边接收边显示结果，**在这些需要快速反馈的应用场景中，我们可以非常明确的说，全部是采用流式输出。**

&emsp;&emsp;一个直观的应用例子是像 `ChatGPT` 和 `智谱清言` 这样的 Web 端应用。在这方面，我们也有一个相关案例，即构建了一个基于 `Chat` 类 `API` 的流式对话网页应用。如果感兴趣，大家可以查看这个项目：[gradio_gpt](https://github.com/MuYuCheney/gradio_gpt)。在使用 `OpenAI` 的 `API` 时，我们通过设置特定的 `HTTP Headers` 或使用 `WebSocket` 协议来实现流式交互。项目中已经详细介绍了相关内容，这里就不再赘述。

&emsp;&emsp;接下来，我们还是重点关注`Assistant API`中的流式输出应用方法。

# 2. Assistant API 流式输出

&emsp;&emsp;正如我们刚刚提到的项目：[gradio_gpt](https://github.com/MuYuCheney/gradio_gpt)。在该项目中，我们通过大量自编代码才能够实现基于 `PDF` 的 `RAG` 问答。而这样的功能如果我们是使用 `Assistant API` 开发框架，仅需在创建 `Assistant` 对象时加入 `tools=[{"type": "file_search"}]` 参数，便可轻松实现一个更高精度的 RAG 版本，支持的解析文本类型也更为广泛。这直接体现的就是 `Assistant API` 在实现高级功能时的明显优势。而正是基于这种想基于`Assistant API`去构建应用程序的迫切需求，**在 2024 年 3 月 14 日，`OpenAI` 推出了对 `Assistant API` 的流支持。**`OpenAI` 流式输出功能的出现直接降低了人工智能和聊天机器人交互的落地门槛。通过实现实时通信，`Assistant API`可以显著增强用户体验。

&emsp;&emsp;`OpenAI` 提供了两种主要的流实现方法：第一种基于 `WebSocket` 的直接实时通信方法，第二种是适用于本身不支持此类交互的平台的模拟流。两种流实现方法各有其特点和应用场景：

- **基于 `WebSocket` 的直接实时通信方法**

&emsp;&emsp;`WebSocket` 是一种网络通信协议，它允许在用户的浏览器和服务器之间建立一个不断开的连接，通过这个连接可以实现双向的数据传输。这意味着服务器可以在任何时候发送数据到客户端（用户的浏览器），而不需要客户端进行新的请求。这种方法非常适合需要实时互动的应用，如在线游戏、实时聊天应用等。使用 `WebSocket` 的优点主要包括：
1. **实时性**：数据可以几乎无延迟地从服务器推送到客户端，适合对实时性要求高的应用。
2. **减少资源消耗**：建立了持久的连接后，不需要频繁地进行连接建立和断开，减少了网络资源的消耗。
3. **双向通信**：服务器和客户端都可以主动发送数据，通信更为灵活。

- **模拟流**

&emsp;&emsp;模拟流是为了在不支持 `WebSocket` 或其他实时协议的环境中提供类似的功能。这种方法通过定期发送`HTTP`请求来“模拟”一个持续的数据流。常见的实现方式包括长轮询和服务器发送事件（Server-Sent Events, SSE）。

1. **长轮询**：客户端发送`HTTP`请求到服务器，服务器不会立即响应，而是等到有新数据可发送时才响应。响应发送后，客户端立即发起新的请求，这样循环进行。
2. **服务器发送事件**：这是一种让服务器能主动向客户端发送信息的技术。与`WebSocket`不同，`SSE`在客户端只支持单向通信，即服务器向客户端发送数据。

> OpenAI Events：https://platform.openai.com/docs/guides/realtime/events

&emsp;&emsp;在这两种方法中，模拟流更加常用，因为它依赖于标准的HTTP请求和响应，易于理解和实现。但是解析 `SSE`（Server-Sent Events）非常复杂，如果处理不当，很容易出现错误，导致数据解析不准确和失败等问题。所以我们通常使用现有的 `SSE` 客户端库。这些库帮助简化集成过程，并确保流传输功能的可靠性和无误性。在后续的`AI Agent`案例分析中，我们将重点介绍如何有效地利用这些工具。

&emsp;&emsp;在了解了`Assistant API`实现流传输的方法后，我们接下来将探讨如何在 `Assistant API` 的 API 调用中启用流式传输。

## 2.1 Assistant API 如何开启流式传输

&emsp;&emsp;在 `Assistant API` 中，要在一个完整的生命周期内启用流式传输，可以分别在`Create Run`、 `Create Thread and Run`和 `Submit Tool Outputs` 这三个 `API` 端点中添加 `"stream": True` 参数。这样设置后，`API` 返回的响应将是一个事件流。

> Assistants Streaming：https://platform.openai.com/docs/api-reference/assistants-streaming

&emsp;&emsp;这里我们首先测试一下 `Create Run` 时的运行状态。

> Create Thread and Run：https://platform.openai.com/docs/api-reference/runs/createThreadAndRun

&emsp;&emsp;首先，我们按照 `Assistant API`创建对话或代理的标准流程来构建应用实例。如下代码所示：

In [12]:
from openai import OpenAI

# 构建 OpenAI 客户端对象的实例
client = OpenAI()

# Step 1. 创建 Assistant 对象
assistant = client.beta.assistants.create(
    model="gpt-4o-mini-2024-07-18",
    name="Good writer",  # 优秀的作家
    instructions="You are an expert at writing excellent literature"  # 你是一位善于写优秀文学作品的专家
)

# Step 2. 创建 Thread 对象
thread = client.beta.threads.create()

# Step 3. 向Thread中添加Message
message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="写一篇关于一个小女孩在森林里遇到一只怪兽的故事。详细介绍她的所见所闻，并描述她的心里活动"
)

&emsp;&emsp;在`Create Run`的过程中，添加`stream=True`参数，开启流媒体传输。代码如下：

In [14]:
print(f"assistant_id:{assistant.id}")
print(f"thread_id:{thread.id}")

assistant_id:asst_W2lgERnVVtH51SY7lHn7LQXW
thread_id:thread_lL5WX56FN1QKduFry0SpzCqW


In [16]:
run = client.beta.threads.runs.create(
    assistant_id=assistant.id,
    thread_id=thread.id,
    stream=True      # 开启流式传输
)

print(run)

for event in run:
    print(event)

<openai.Stream object at 0x0000028F76F87B90>
ThreadRunCreated(data=Run(id='run_dp8A4GLj1BKQfSOY3TZclFmr', assistant_id='asst_W2lgERnVVtH51SY7lHn7LQXW', cancelled_at=None, completed_at=None, created_at=1728547409, expires_at=1728548009, failed_at=None, incomplete_details=None, instructions='You are an expert at writing excellent literature', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4o-mini-2024-07-18', object='thread.run', parallel_tool_calls=True, required_action=None, response_format='auto', started_at=None, status='queued', thread_id='thread_lL5WX56FN1QKduFry0SpzCqW', tool_choice='auto', tools=[], truncation_strategy=TruncationStrategy(type='auto', last_messages=None), usage=None, temperature=1.0, top_p=1.0, tool_resources={'code_interpreter': {'file_ids': []}}), event='thread.run.created')
ThreadRunQueued(data=Run(id='run_dp8A4GLj1BKQfSOY3TZclFmr', assistant_id='asst_W2lgERnVVtH51SY7lHn7LQXW', cancelled_at=None, completed_at=None, 

&emsp;&emsp;如上输出所示：当使用 `stream=True` 这个参数开启流式输出后，`API`不是在全部处理完成后一次性返回结果，而是逐渐地向客户端发送数据。其返回的结果是一个 `openai.Stream` 对象。这个对象表示的是一个流式数据通道，可以用来接收连续传输的数据。由于是流式传输，数据会持续流入，直到会话结束或数据传输完毕。所以我们便可以从这个流中读取数据，以实现实时数据处理和显示。

## 2.2 Assistant API 流式传输中的事件流

&emsp;&emsp;我们从输出上来分析，对于一个`openai.Stream`事件，每当创建新对象、转换到新状态或按部分（增量）进行流式传输时，都会生成一个新的事件，每个事件都有一个`event`和`data`属性，如下图所示：

<div align=center><img src="https://muyu001.oss-cn-beijing.aliyuncs.com/img/image-20241009160117043.png" width=100%></div>

&emsp;&emsp;整个事件流的核心流程包括：当新的运行被创建时，发出 `thread.run.created` 事件；当运行完成时，发出 `thread.run.completed` 事件。在运行期间选择创建消息时，将发出一个 `thread.message.created` 事件、一个 `thread.message.in_progress` 事件、多个 `thread.message.delta` 事件，以及最终的 `thread.message.completed` 事件。整体如下图所示：

<div align=center><img src="https://muyu001.oss-cn-beijing.aliyuncs.com/img/101001.png" width=80%></div>

&emsp;&emsp;在构建 `Assistant API` 的流式输出功能时，我们需要密切关注和处理上述核心事件。除此之外，在流式传输过程中，除了上图中的核心事件流，还可能发生更多的事件流。以下是所有事件的详细说明：

> Assistant API Stream Events：https://platform.openai.com/docs/api-reference/assistants-streaming/events

| 事件 | 描述 | 数据描述 |
|------|------|----------|
| `thread.created` | 创建新线程时发生。 | 数据是一个线程 |
| `thread.run.created` | 创建新运行时发生。 | 数据是一个运行 |
| `thread.run.queued` | 当运行转入queued状态时发生。 | 数据是一个运行 |
| `thread.run.in_progress` | 当运行转至in_progress状态时发生。 | 数据是一个运行 |
| `thread.run.requires_action` | 当运行转至requires_action状态时发生。 | 数据是一个运行 |
| `thread.run.completed` | 运行完成时发生。 | 数据是一个运行 |
| `thread.run.incomplete` | 当运行以状态incomplete结束时发生。 | 数据是一个运行 |
| `thread.run.failed` | 运行失败时发生。 | 数据是一个运行 |
| `thread.run.cancelling` | 当运行转至cancelling状态时发生。 | 数据是一个运行 |
| `thread.run.cancelled` | 取消运行时发生。 | 数据是一个运行 |
| `thread.run.expired` | 当运行到期时发生。 | 数据是一个运行 |
| `thread.run.step.created` | 创建运行步骤时发生。 | 数据是一个运行步骤 |
| `thread.run.step.in_progress` | 当运行步骤进入in_progress状态时发生。 | 数据是一个运行步骤 |
| `thread.run.step.delta` | 在流式传输运行步骤的一部分时发生。 | 数据是运行步骤增量 |
| `thread.run.step.completed` | 当运行步骤完成时发生。 | 数据是一个运行步骤 |
| `thread.run.step.failed` | 当运行步骤失败时发生。 | 数据是一个运行步骤 |
| `thread.run.step.cancelled` | 取消运行步骤时发生。 | 数据是一个运行步骤 |
| `thread.run.step.expired` | 当运行步骤到期时发生。 | 数据是一个运行步骤 |
| `thread.message.created` | 创建消息时发生。 | 数据是一条消息 |
| `thread.message.in_progress` | 当消息进入in_progress状态时发生。 | 数据是一条消息 |
| `thread.message.delta` | 当消息的一部分被传输时发生。 | 数据是消息增量 |
| `thread.message.completed` | 当消息完成时发生。 | 数据是一条消息 |
| `thread.message.incomplete` | 当消息在完成之前结束时发生。 | 数据是一条消息 |
| `error` | 发生错误时发生。这可能是由于内部服务器错误或超时而发生的。 | 数据有错误 |
| `done` | 当流结束时发生。 | 数据显示Done |


&emsp;&emsp;事件流中的每一个点，都与`Assistant API`的底层运行原理紧密相关。即我们在上一节课中重点向大家介绍的`Run`运行状态的设计。

<div align=center><img src="https://muyu001.oss-cn-beijing.aliyuncs.com/img/2503.png" width=100%></div>

&emsp;&emsp;理解了上述原理后，我们再来分析这段输出。在处理过程中，任何需要的信息都可以通过访问数据结构的方式来获取。例如，若想实时获取大模型回复的文本，可以直接从事件数据中提取相应的值。代码如下所示：

In [18]:
run = client.beta.threads.runs.create(
    assistant_id=assistant.id,
    thread_id=thread.id,
    stream=True      # 开启流媒体传输
)

for event in run:
    #print(event.event)
    if event.event == 'thread.message.delta':
        # 提取 text delta 的 value
        value = event.data.delta.content[0].text.value
        print(value)

在
一个
阳
光
明
媚
的
午
后
，小
女孩
莉
莉
踏
上
了
她
的
冒
险
之
旅
。
她
住
在
小
村
庄
的
边
缘
，
周
围
环
绕
着
一
片
神
秘
的
森林
。
今天
，
莉
莉
决定
独
自
探
险
，
去
发现
藏
在
树
丛
间
的
秘密
。


走
进
森林
时
，
阳
光
透
过
高
大的
树
木
洒
下
斑
驳
的
光
影
，
空气
中
弥
漫
着
泥
土
的
香
气
和
树
叶
的
清
新
。
周
围
传
来
鸟
儿
的
欢
鸣
声
和
树
叶
的
沙
沙
声
，
令人
心
旷
神
怡
。
莉
莉
的
心
中
充
满
了
兴
奋
和
期待
，她
灵
巧
地
穿
过
一
条
小
溪
，
水
中
闪
烁
着
阳
光
的
光
辉
，
仿
佛
是
无
数
小
宝
石
镶
嵌
在
水
底
。


忽
然
，一
声
低
沉
的
咕
噜
声
打
破
了
森林
的
宁
静
。
莉
莉
停
下
脚
步
，
心
里
一
阵
紧
张
，又
夹
杂
着
好
奇
：“
那
是什么
声音
？”
她
缓
缓
朝
声音
的
源
头
走
去
，
心
跳
如
擂
鼓
，
既
想
要
逃
跑
又
不
甘
心
错
过
。


就在
这
时
，一
只
巨
大的
怪
兽
出
现在
她
面
前
。
怪
兽
全
身
长
着
深
绿色
的
毛
发
，
宽
大的
眼
睛
闪
着
神
秘
的
光
泽
。
看
起来
既
笨
重
又
有
些
可
爱
，
心
中
却
让
莉
莉
感
到
一
丝
不
安
。“
这
只
怪
兽
会
很
凶
猛
吗
？”
她
想
起
妈妈
曾
讲
过
的
故事
，
里面
的
怪
兽
都
显
得
那么
可
怕
。


但
就在
惊
恐
之
时
，
莉
莉
却
无法
自
拔
地
向
怪
兽
靠
近
。
她
小
心
翼
翼
地
问
：“
你
……
你好
啊
？”
她
的
声音
几
乎
被
从
树
梢
飘
来的
风
带
走
。
怪
兽
的
目
光
变
得
柔
和
，
似
乎
在
回应
她
的
问
候
。


这
时
，
莉
莉
的
心
跳
逐
渐
平
静
下来
，
观察
着
这
只
奇
怪
的
生
物
。
她
察
觉
到
它
并
没有
生
气
，
反
而
在
用
那
双
大
眼
睛
好
奇
地
打
量
着
自己
。
莉
莉
鼓
起
勇
气
，
慢
慢
向
前
迈
了

&emsp;&emsp;如果我们仅想获取最终的响应结果，就像非流式输出一样，则稍微修改代码即可：

In [19]:
run = client.beta.threads.runs.create(
    assistant_id=assistant.id,
    thread_id=thread.id,
    stream=True      # 开启流媒体传输
)

for event in run:
    #print(event.event)
    if event.event == 'thread.message.completed':
        # 提取 text delta 的 value
        value = event.data.content[0].text.value
        print(value)

在一个阳光明媚的午后，小女孩莉莉决定踏上她的冒险旅程。她住在小村庄的边缘，村子外是一片神秘而广袤的森林。听村里的老人讲，森林里有很多未知的生物和秘密，今天，莉莉终于鼓起勇气，准备去探险。

走进森林，阳光透过高大的树木洒下斑驳的光影，空气中弥漫着清脆的鸟鸣和树叶的沙沙声。莉莉的心中充满了兴奋和期待，她感觉自己就像是一个小小的探险家。她沿着小溪流走去，水面在阳光下闪烁着，如同撒满了无数宝石，莉莉不由得用手轻轻捧起一捧冰凉的水，水滴顺着她的指尖滑落，凉意沁透心扉。

然而，就在她尽情享受大自然时，一种低沉而奇怪的咕噜声从远处传来，吸引了她的注意。莉莉好奇地探耳倾听，心中闪过一丝不安，难道是森林里的野兽？她的心跳开始加速，既紧张又期待，“这什么声音啊？”小小的心灵充满了无数的疑惑。

她鼓起勇气，慢慢向声音的来源靠近，穿过一片灌木丛，眼前的景象让她愣住了——一只巨大的怪兽正趴在树下。怪兽的全身覆盖着浓密的绿色毛发，浑身是巨大的圆润的身体，长长的尾巴懒洋洋地搭在地上，巨大的眼睛半睁半闭，正以一种懒散却温柔的眼神打量着她。

“这……这是个怪物吗？”莉莉心中掠过一丝恐惧，她想起妈妈讲的故事，里面的怪兽通常都很可怕。然而，观察着它那副懒散却好奇的模样，莉莉渐渐意识到，这只怪兽似乎并没有恶意。

“你好！”莉莉小心翼翼地打招呼，声音低得几乎被风吹散。怪兽微微侧头，似乎对她的话语感到好奇。“我叫莉莉，我不想吓到你，我只是来森林里探险。”

令她惊讶的是，怪兽发出了低沉的咕噜声，像是在回应她的问候。莉莉的心情一下子轻松了不少，她不知道该怎么形容这种感觉，仿佛不再是单方面的交流，而是两颗心灵在悄悄碰撞。

莉莉大胆地走近了一步，鼓起所有的勇气，问道：“你是……怪兽吗？”怪兽眨了眨眼，似乎听懂了，缓缓抬起了一只大爪子，似乎在说：“是的，但我不是坏怪兽。”

莉莉的心仿佛开了一扇窗，她忍不住想要了解这个奇妙的生物。她坐到怪兽旁边，开始告诉它自己的故事，谈起了自己的梦想和生活中的小烦恼。随着她的低声细语，怪兽的眼神也渐渐柔和，低沉的咕噜声不时响起，像是回应着她的每一句话。

时间在他们的谈话中悄然流逝，当夕阳开始西下，天边洒下金色的余晖，莉莉意识到自己该回家了，心里有一点不舍。“我得走了，希望下次还能见到你。”她轻声说道，眼中闪烁着依依不舍的光芒。

怪兽缓缓点头，似乎也有些不舍。莉莉转身的

&emsp;&emsp;从上述体验来看，流式输出在响应时效和用户体验方面明显优于非流式输出。这种优势是构建聊天类 AI 应用时广泛采用流式输出的主要原因。通过实时处理和传输数据，流式输出不仅可以加快响应速度，还能提供更连贯、更互动的用户体验。

&emsp;&emsp;按照相同的思路，我们还可以尝试测试`Create Thread and Run`和 `Submit Tool Outputs`两个端点在启用流媒体传输时的实现方法。

&emsp;&emsp;首先来看`Create Thread and Run`方法。这个方法很容易理解，它所做的事情就是把创建`Thread`对象实例、向`Thread`对象实例中追加`Message`信息以及构建`Run`状态这三个单独的步骤合并在了一个`.beta.threads.create_and_run`方法中，所以调用方法也发生了一定的变化，代码如下：

In [22]:
print(f"assistant_id:{assistant.id}")

assistant_id:asst_W2lgERnVVtH51SY7lHn7LQXW


In [24]:
run = client.beta.threads.create_and_run(
    assistant_id=assistant.id,
    thread={
        "messages": [
            {"role": "user", "content": "写一篇歌颂中国的文章"}
        ]
    },
    stream=True,
)

for event in run:
    print(event)

ThreadCreated(data=Thread(id='thread_F7D1B9me8rV9sxMJaHi8K6Zf', created_at=1728547562, metadata={}, object='thread', tool_resources=ToolResources(code_interpreter=ToolResourcesCodeInterpreter(file_ids=[]), file_search=None)), event='thread.created')
ThreadRunCreated(data=Run(id='run_izJmR0gYbPya8fgRr1FeexLd', assistant_id='asst_W2lgERnVVtH51SY7lHn7LQXW', cancelled_at=None, completed_at=None, created_at=1728547562, expires_at=1728548162, failed_at=None, incomplete_details=None, instructions='You are an expert at writing excellent literature', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4o-mini-2024-07-18', object='thread.run', parallel_tool_calls=True, required_action=None, response_format='auto', started_at=None, status='queued', thread_id='thread_F7D1B9me8rV9sxMJaHi8K6Zf', tool_choice='auto', tools=[], truncation_strategy=TruncationStrategy(type='auto', last_messages=None), usage=None, temperature=1.0, top_p=1.0, tool_resources={'code_i

&emsp;&emsp;如上代码所示：`client.beta.threads.create_and_run` 是一个集成方法，它在一个操作中创建一个新的线程 (Thread)后追加消息(Messages) 并立即运行。而通过观察输出结果可以发现，尽管构建对话的具体方法不同，但产生的事件流实质上是相同的。这表明在 `Create Run` 中介绍的所有方法都同样适用于 `Create Thread and Run` 方法。例如，如果我们想要仅打印大模型的响应结果文本，之前的实现方式依然有效。代码如下：

In [26]:
run = client.beta.threads.create_and_run(
    assistant_id=assistant.id,
    thread={
        "messages": [
            {"role": "user", "content": "写一篇歌颂中国的文章"}
        ]
    },
    stream=True,
)

for event in run:
    #print(event.event)
    if event.event == 'thread.message.delta':
        # 提取 text delta 的 value
        value = event.data.delta.content[0].text.value
        print(value)

###
 歌
颂
中国
：
辉
煌
与
梦想
的
交
响
曲



中国
，这
片
厚
重
而
神
秘
的
土地
，如
同
一
卷
展开
的
画
卷
，
承
载
着
五
千
年的
历史
与
文化
。
她
以
其
独
特
的
风
貌
、
丰富
的
资源
和
无限
的
潜
力
，
吸
引
着
全
世界
的
目
光
。
今天
，我
想
以
一
颗
敬
畏
的
心
，
歌
颂
这
片
土地
，
歌
颂
这
片
盛
满
奋斗
与
希望
的
中国
。


####
 历
史
的
积
淀



中国
是
世界
四
大
文明
古
国
之一
，
拥有
悠
久
而
璀
璨
的
历史
。从
炎
黄
祖
上传
承
下
来的
文化
瑰
宝
，到
诗
词
歌
赋
的
婉
转
流
丽
，从
孔
子
儒
学
的
智慧
光
芒
，到
道
家的
自然
哲
学
，
中华
文化
就
像
一
颗
璀
璨
的
明
珠
，
照
耀
着
世界
历史
的
长
河
。在
这
片
土地
上
，无
数
文
人
墨
客
挥
毫
泼
墨
，
留下
了
《
诗
经
》、《
红
楼
梦
》等
永
恒
的
文学
作品
，
犹
如
星
辰
般
闪
耀
在
文化
的
天空
。


####
 自
然
的
壮
丽



中国
的
山
河
壮
丽
，
气
势
磅
礴
，
北
有
苍
茫
的
阴
山
、
辽
阔
的
黄
土
高
原
；
南
有
秀
丽
的
桂
林
山
水
、
稻
田
如
织
的
江
南
水
乡
；
东
有
万
里
海
岸
线
、
浩
瀚
的大
海
；
西
有
巍
峨
的
高
原
、
蜿
蜒
的
河
流
。
每
一
寸
土地
都
饱
含
着
大
自然
的
恩
赐
，也
见
证
着
无
数
英雄
儿
女
的
足
迹
。
唐
诗
宋
词
中的
那
一
幅
幅
自然
画
卷
，
仿
佛
在
诉
说
着
这
片
土地
的
深
情
与
苦
乐
。


####
 
现代
的
崛
起



在
新时代
的
浪
潮
中
，中国
以
令人
瞩
目的
速度
崛
起
，
成为
世界
经济
的重要
引
擎
。
改革
开放
以来
，无
数
优秀
的
中华
儿
女
以
实际
行动
证明
了
勤
劳
与
智慧
的
力量
。从
科技
创新
到
制造
业
升级
，从
乡
村
振
兴
到
城市
建设
，
中华
大
地
焕
发
出
勃
勃
生
机
。
高
铁
如

&emsp;&emsp;仅获取最终全部的大模型响应结果，代码如下：

In [28]:
run = client.beta.threads.create_and_run(
    assistant_id=assistant.id,
    thread={
        "messages": [
            {"role": "user", "content": "写一篇歌颂中国的文章"}
        ]
    },
    stream=True,
)

for event in run:
    #print(event.event)
    if event.event == 'thread.message.completed':
        # 提取 text delta 的 value
        value = event.data.content[0].text.value
        print(value)

**歌颂中国：华夏的辉煌与希望**

在广袤的中华大地上，延绵的山河孕育了五千年的灿烂文化，从长江的波澜壮阔到黄河的滔滔洪流，铸就了中华民族的精神和信念。中国，这片充满生机与希望的土地，是历史与现代交融的瑰宝，是无数华夏儿女心灵的归宿。

中国的历史如同一部厚重的史书，凝聚着无数先贤的智慧与奉献。自古以来，四大发明、丝绸之路等伟大的成就不仅影响了中国的进程，更推动了世界文明的进步。无论是孔子的仁义礼智，还是老子的道法自然，都在指引着我们追求和谐与平衡的道路。在这片土地上，诗词歌赋如星辰般璀璨，文人墨客的笔触勾勒出一个个动人的画面，展现了中华民族深厚的文化底蕴。

进入新时代，中国经历了翻天覆地的变化。在改革开放的浪潮中，经济的腾飞让世界为之瞩目。城市的高楼大厦拔地而起，乡村的田园风光焕然一新，科技的迅速发展更是为梦想插上了翅膀。从神舟飞天到高铁驰骋，创新的步伐将我们带向了更加灿烂的未来。无论是在抗击疫情的紧急时刻，还是在应对自然灾害的困境中，中国人民始终团结一致、共克时艰，展现出无与伦比的担当与勇气。

与此同时，中国的传统文化也在新时代焕发出新的生机。从春晚的舞台到各地的文化活动，传统与现代的融合让五千年的文化积淀重新焕发光彩。无论是面对面的书法艺术，还是浓情的春节习俗，中华文化无时无刻不在滋养着我们的心灵，连接着每一个家庭、每一个梦想。

中国，不仅仅是地理上的概念，更是我们共同的精神家园。各民族的团结与融合，构成了中华民族特有的多样性与包容性。在这片土地上，五湖四海的人们彼此尊重、共同发展，共筑和谐社会的美好愿景。

展望未来，中国将继续在全球舞台上发挥更大的作用，推动构建人类命运共同体，分享发展机遇，传播和平与友爱的种子。相信在不久的将来，中国会以更加开放、自信的姿态，迎接更加美好的明天。

在这片历史悠久的土地上，每一个中国人的心中都燃烧着对未来的信念与希望。让我们共同歌颂这份热爱，歌颂中国，歌颂我们的家园。无论身处何地，心中那份对中华大地的眷恋永远不会改变。


&emsp;&emsp;理解了 `Create Run` 之后，进一步掌握 `Create Thread and Run` 也相对容易，因为这两种方法在本质上没有太大区别。而我们接下来要介绍的 `Submit Tool Outputs` 则有着较大的差异。

# 3. 如何在函数调用中启用流式传输

&emsp;&emsp;`Assistant API`的`Submit Tool Outputs`方法是**用于在工具调用全部完成后提交输出**，所以意味着当需要使用到`Submit Tool Outputs`方法时，其应用场景一定是接入了外部工具并且在当前轮次的对话中，需要使用到外部函数。所以，为了更方便的进行功能演示，这里我们定义一个外部函数`get_current_weather`，用来获取指定地点的当前天气状况。它接收两个参数：`location` 和 `unit`。`location` 参数用于指定要查询的地点，而 `unit` 参数则用于指定温度的单位，默认设置为“fahrenheit”（华氏度），完整函数代码如下：

> Submit Tool Outputs：https://platform.openai.com/docs/api-reference/runs/submitToolOutputs

In [31]:
import json

def get_current_weather(location, unit="celsius"):
    """Get the current weather in a given location in China"""
    if "beijing" in location.lower():
        return json.dumps({"location": "Beijing", "temperature": "15", "unit": unit})
    elif "shanghai" in location.lower():
        return json.dumps({"location": "Shanghai", "temperature": "20", "unit": unit})
    elif "guangzhou" in location.lower():
        return json.dumps({"location": "Guangzhou", "temperature": "25", "unit": unit})
    else:
        return json.dumps({"location": location, "temperature": "unknown"})

&emsp;&emsp;接下来，需要为 `get_current_weather` 函数编写一个 `JSON Schema`的表示。这个 `Json Schema`的描述将用于指导大模型何时以及如何调用这个外部函数。通过定义必要的参数、预期的数据格式和响应类型，可以确保大模型能够正确并有效地利用这个功用这个功能。定义如下：

In [33]:
get_weather_desc = {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g.beijing",
                    },
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["location"],
            },
        },
    }

&emsp;&emsp;然后，定义一个`available_functions` 字典，用来映射函数名到具体的函数对象，因为在大模型识别到需要调用外部函数的时候，我们需要动态地调用函数并获取到函数的执行结果。

In [35]:
available_functions = {
    "get_current_weather": get_current_weather,
}

In [37]:
available_functions

{'get_current_weather': <function __main__.get_current_weather(location, unit='celsius')>}

&emsp;&emsp;在准备好外部函数后，我们在定义 `Assistant` 对象时需要使用 `tools` 参数。此参数是一个列表的数据类型，我们要将可用的外部工具的 `JSON Schema` 描述添加进去，从而让 `Assistant` 可以正确识别这些工具，代码如下：

In [39]:
from openai import OpenAI
client = OpenAI()

In [41]:
# Step 1. 创建一个新的 assistant 对象实例
assistant = client.beta.assistants.create(
    name="你是一个实时天气小助理",   
    instructions="你可以调用工具，获取到当前的实时天气，再给出最终的回复", 
    model="gpt-4o-mini-2024-07-18",
    tools=[get_weather_desc],
) 

In [43]:
assistant.to_dict()

{'id': 'asst_ibso5mWYMQugvWJdeVs5jBbn',
 'created_at': 1728547641,
 'description': None,
 'instructions': '你可以调用工具，获取到当前的实时天气，再给出最终的回复',
 'metadata': {},
 'model': 'gpt-4o-mini-2024-07-18',
 'name': '你是一个实时天气小助理',
 'object': 'assistant',
 'tools': [{'function': {'name': 'get_current_weather',
    'description': 'Get the current weather in a given location',
    'parameters': {'type': 'object',
     'properties': {'location': {'type': 'string',
       'description': 'The city and state, e.g.beijing'},
      'unit': {'type': 'string', 'enum': ['celsius', 'fahrenheit']}},
     'required': ['location']},
    'strict': False},
   'type': 'function'}],
 'response_format': 'auto',
 'temperature': 1.0,
 'tool_resources': {},
 'top_p': 1.0}

In [45]:
assistant.id

'asst_ibso5mWYMQugvWJdeVs5jBbn'

&emsp;&emsp;接下来再创建一个`Thread`对象实例，代码如下：

In [47]:
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()

In [48]:
thread.id

'thread_0eERhv2617vO1Iv2b6V9Pfn4'

&emsp;&emsp;然后，还是按照`Assistant API`的标准流程，将`Messages`追加到`Thread`中，并执行`Run`运行状态。代码如下：

In [51]:
# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="北京现在的天气怎么样？"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
    thread_id=thread.id, 
    assistant_id=assistant.id,
    stream=True
)

for event in run:
    print(event)

ThreadRunCreated(data=Run(id='run_cnrabJganwe8vezt450sil9t', assistant_id='asst_ibso5mWYMQugvWJdeVs5jBbn', cancelled_at=None, completed_at=None, created_at=1728547653, expires_at=1728548253, failed_at=None, incomplete_details=None, instructions='你可以调用工具，获取到当前的实时天气，再给出最终的回复', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4o-mini-2024-07-18', object='thread.run', parallel_tool_calls=True, required_action=None, response_format='auto', started_at=None, status='queued', thread_id='thread_0eERhv2617vO1Iv2b6V9Pfn4', tool_choice='auto', tools=[FunctionTool(function=FunctionDefinition(name='get_current_weather', description='Get the current weather in a given location', parameters={'type': 'object', 'properties': {'location': {'type': 'string', 'description': 'The city and state, e.g.beijing'}, 'unit': {'type': 'string', 'enum': ['celsius', 'fahrenheit']}}, 'required': ['location']}, strict=False), type='function')], truncation_strategy=TruncationS

&emsp;&emsp;从上述的输出中可以分析出：**与直接生成回复的流式过程相比，明显的区别体现在 `thread.run.step.delta` 事件中。**在这一事件中，增量更新的是大模型输出的参数`arguments`，这个参数中的内容是用于执行外部函数的，而非对“北京现在的天气怎么样?”这个提问的回答。此外，流程会暂停在 `thread.run.requires_action` 事件处，整个过程不会自动生成最终的回复，而是等待外部操作的完成。

&emsp;&emsp;这里我们仍然可以通过直接提取数据结构的方式来获取到想要的信息。代码如下：

In [53]:
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()

# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="北京现在的天气怎么样？"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
    thread_id=thread.id, 
    assistant_id=assistant.id,
    stream=True
)


for event in run:
    if event.event == 'thread.run.step.delta':
        # 打印大模型输出的参数
        print("Delta Event Arguments:")
        print(event.data.delta.step_details.tool_calls[0].function.arguments)
        print("--------------------------------------------------")  # 添加分隔符

    if event.event == 'thread.run.requires_action':
        # 打印需要采取行动的详细信息
        print("Requires Action Data:")
        print(event.data)
        print("--------------------------------------------------")  # 添加分隔符

Delta Event Arguments:

--------------------------------------------------
Delta Event Arguments:
{"
--------------------------------------------------
Delta Event Arguments:
location
--------------------------------------------------
Delta Event Arguments:
":"
--------------------------------------------------
Delta Event Arguments:
be
--------------------------------------------------
Delta Event Arguments:
ijing
--------------------------------------------------
Delta Event Arguments:
","
--------------------------------------------------
Delta Event Arguments:
unit
--------------------------------------------------
Delta Event Arguments:
":"
--------------------------------------------------
Delta Event Arguments:
c
--------------------------------------------------
Delta Event Arguments:
elsius
--------------------------------------------------
Delta Event Arguments:
"}
--------------------------------------------------
Requires Action Data:
Run(id='run_P5DSnie9e0DoGZxNIPQCJJut', 

&emsp;&emsp;在 `event.event == 'thread.run.requires_action'` 事件中，当识别到需要执行的外部函数及其参数后，我们可以根据在《Ch.5 OpenAI Assistant API 进阶应用》的`3. Assistant API 的 Function Calling`中介绍的方法来执行这些函数。在流媒体的事件流中我们只要适当的进行修改，就可以完成这个过程。代码如下：

In [55]:
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()

# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="北京现在的天气怎么样？"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
    thread_id=thread.id, 
    assistant_id=assistant.id,
    stream=True
)

tool_outputs = []
for event in run:
    # print(f"event:{event}")
    if event.event == 'thread.run.requires_action':
        # 先拿到thread.run.requires_action事件的全部信息
        function_info = event.data
        print("--------------------------------------------------")
        print(f"Function Info: {function_info}")
        
        required_action = function_info.required_action
        print("--------------------------------------------------")
        print(f"Required Action: {required_action}")
        
        tool_calls = required_action.submit_tool_outputs.tool_calls
        print("--------------------------------------------------")
        print(f"Tool Calls: {tool_calls}")

        # 执行外部函数，使用循环是因为`Assistant API` 可以并行执行多个函数
        for tool_call in tool_calls:
            tool_id = tool_call.id
            function = tool_call.function
            function_name = function.name
            function_args = json.loads(function.arguments)
            function_result = available_functions[function_name](**function_args)
            print("--------------------------------------------------")
            print(f"Function Result for {function_name}: {function_result}")

            tool_outputs.append({"tool_call_id": tool_id, "output":function_result})

print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")

--------------------------------------------------
Function Info: Run(id='run_xW3IJjZsqJubwKAEDk3hzung', assistant_id='asst_ibso5mWYMQugvWJdeVs5jBbn', cancelled_at=None, completed_at=None, created_at=1728547696, expires_at=1728548296, failed_at=None, incomplete_details=None, instructions='你可以调用工具，获取到当前的实时天气，再给出最终的回复', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4o-mini-2024-07-18', object='thread.run', parallel_tool_calls=True, required_action=RequiredAction(submit_tool_outputs=RequiredActionSubmitToolOutputs(tool_calls=[RequiredActionFunctionToolCall(id='call_3oKpItztdUMRI5bIKDiomBBp', function=Function(arguments='{"location":"beijing"}', name='get_current_weather'), type='function')]), type='submit_tool_outputs'), response_format='auto', started_at=1728547696, status='requires_action', thread_id='thread_GPJ00HnI1FgbqSpSu81K8sOW', tool_choice='auto', tools=[FunctionTool(function=FunctionDefinition(name='get_current_weather', description

&emsp;&emsp;当获取到外部函数的执行结果后，需要将这些结果再次追加到 `Thread 中`，使其再次进入到队列中，以继续回答`北京现在的天气怎么样？`这个原始的问题。正如我们在上节课中介绍的，使用`.beta.threads.runs.submit_tool_outputs`方法用于提交外部工具的输出，此方法也支持流媒体输出，如果在这个阶段需要开启流媒体传输，我们就需要使用 `stream=True` 参数来明确指定。代码如下：

In [57]:
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()

# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="北京现在的天气怎么样？"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
    thread_id=thread.id, 
    assistant_id=assistant.id,
    stream=True
)

tool_outputs = []
for event in run:
    # print(f"event:{event}")
    if event.event == 'thread.run.requires_action':
        # 先拿到thread.run.requires_action事件的全部信息
        function_info = event.data
        print("--------------------------------------------------")
        print(f"Function Info: {function_info}")
        
        required_action = function_info.required_action
        print("--------------------------------------------------")
        print(f"Required Action: {required_action}")
        
        tool_calls = required_action.submit_tool_outputs.tool_calls
        print("--------------------------------------------------")
        print(f"Tool Calls: {tool_calls}")

        # 执行外部函数，使用循环是因为`Assistant API` 可以并行执行多个函数
        for tool_call in tool_calls:
            tool_id = tool_call.id
            function = tool_call.function
            function_name = function.name
            function_args = json.loads(function.arguments)
            function_result = available_functions[function_name](**function_args)
            print("--------------------------------------------------")
            print(f"Function Result for {function_name}: {function_result}")

            tool_outputs.append({"tool_call_id": tool_id, "output":function_result})

print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")


run_tools = client.beta.threads.runs.submit_tool_outputs(
    thread_id=thread.id, 
    run_id=function_info.id,
    tool_outputs=tool_outputs,
    stream=True)

for event_tool in run_tools:
    # print(f"event_tool:{event_tool}")
    if event_tool.event == 'thread.message.delta':
        # 提取 text delta 的 value
        text = event_tool.data.delta.content[0].text.value
        print("Delta Text Output:", text)
        print("--------------------------------------------------")  # 添加分隔符
    
    if event_tool.event == 'thread.message.completed':
        # 提取 text delta 的 value
        full_text = event_tool.data.content[0].text.value
        print("Completed Message Text:", full_text)
        print("--------------------------------------------------")  # 添加分隔符

--------------------------------------------------
Function Info: Run(id='run_C3rx5zyAXOZVSI40Hs5RW2yX', assistant_id='asst_ibso5mWYMQugvWJdeVs5jBbn', cancelled_at=None, completed_at=None, created_at=1728547749, expires_at=1728548349, failed_at=None, incomplete_details=None, instructions='你可以调用工具，获取到当前的实时天气，再给出最终的回复', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4o-mini-2024-07-18', object='thread.run', parallel_tool_calls=True, required_action=RequiredAction(submit_tool_outputs=RequiredActionSubmitToolOutputs(tool_calls=[RequiredActionFunctionToolCall(id='call_ELOCVusmKQtnzpH29CwoHIMu', function=Function(arguments='{"location":"beijing","unit":"celsius"}', name='get_current_weather'), type='function')]), type='submit_tool_outputs'), response_format='auto', started_at=1728547751, status='requires_action', thread_id='thread_CVQi2GaonRGIMT3QTQGkZv8w', tool_choice='auto', tools=[FunctionTool(function=FunctionDefinition(name='get_current_weat

&emsp;&emsp;由此可见，当流式传输中涉及到函数调用的中间过程时，实际上也是发生了两次`Run`操作。为了使第二个`Run`能够接续第一个`Run`的输出，关键在于两者之间需要共享第一个`Run`的`Run id`。此外，在第二个`Run`的运行状态中需要设置`stream=True`以启用流式传输，确保两个`Run`状态的输出能够有效链接，形成一次连贯的响应。整个过程的事件流非常多，需要大家仔细体会。

&emsp;&emsp;至此，我们已经全面介绍了`Assistant API`的所有调用方法和应用技巧。通过三节课程的讲解，涵盖了从`Assistant API`的整体架构到其生命周期等核心层面的详尽分析。而根据`OpenAI`官方的最新消息，预计将在近期推出`Assistant API`的最新版本。鉴于`OpenAI`一贯的风格，这次更新无疑又将带来颠覆性的变革。所以对这一框架感兴趣的朋友们，可以继续关注以获取最新信息。

&emsp;&emsp;最后，为了全面整合`Assistant API`的技术要点，并帮助大家更顺利地掌握应用，我们设计并开发了一个完全基于`Assistant API`构建的异步流式传输AI Agent开发框架。接下来，我们将切换到PyCharm编辑环境，详细介绍这个复杂且适用于生产系统的`Assistant API`项目。源码下载地址👉 [Github](https://github.com/fufankeji/AssistantStreaming)