# 完整指南：使用阶跃星辰知识库（RAG）实现检索增强生成

本 `ipynb` 文件旨在详细演示如何利用阶跃星辰 API 实现 RAG（检索增强生成）功能，涵盖文件上传、知识库创建、文件关联以及最终使用的全过程。

**RAG 完整流程：**
1.  **准备知识**: 使用本地文件。
2.  **上传文件**: 将本地 PDF 文件上传到平台，设置 `purpose` 为 `retrieval-text`。
3.  **创建知识库**: 创建一个 Vector Store 容器。
4.  **关联文件**: 将上传的文件与知识库关联。
5.  **对话补全**: 在 Chat API 调用中指定知识库 ID，启用 RAG 检索。

In [1]:
import requests
import json
import os
from openai import OpenAI
from requests_toolbelt.multipart.encoder import MultipartEncoder

# --- 替换为你自己的 API Key --- 
STEP_API_KEY = os.getenv("STEPFUN_API_KEY")
BASE_URL = "https://api.stepfun.com/v1"
COMPLETION_MODEL = "step-1-32k"

# 指定您要上传的 PDF 文件名
DEMO_FILE_NAME = "./media/08_step-audio2.pdf"

# 存储流程中产生的关键 ID
UPLOADED_FILE_ID = "" 
VECTOR_STORE_ID = "" 

headers = {
    "Authorization": f"Bearer {STEP_API_KEY}"
}

client = OpenAI(
    api_key=STEP_API_KEY, 
    base_url=BASE_URL
)

if not os.path.exists(DEMO_FILE_NAME):
    print(f"⚠️ 错误：请确保文件 {DEMO_FILE_NAME} 存在于当前目录下，以便进行上传。")


## 1. 准备本地演示文件

本步骤使用您指定的本地文件 `08_step-audio2.pdf`。请确保该文件已放置在运行此 Notebook 的同一目录下。

In [2]:
if os.path.exists(DEMO_FILE_NAME):
    print(f"✅ 本地文件 {DEMO_FILE_NAME} 准备就绪，可以进行上传。")
else:
    print(f"❌ 警告：文件 {DEMO_FILE_NAME} 未找到，请检查路径。")

✅ 本地文件 ./media/08_step-audio2.pdf 准备就绪，可以进行上传。


## 2. 上传文件到平台

使用 `client.files.create` 接口上传 PDF 文件。对于 RAG 知识库，`purpose` 必须设置为 `retrieval-text`。

In [7]:
if not os.path.exists(DEMO_FILE_NAME):
    print("⚠️ 文件不存在，跳过上传步骤。")
else:
    try:
        print(f"正在上传文件 {DEMO_FILE_NAME}...")
        
        with open(DEMO_FILE_NAME, "rb") as f:
            uploaded_file = client.files.create(
                file=f,
                purpose="retrieval-text" # 知识库的文本检索用途
            )
        
        UPLOADED_FILE_ID = uploaded_file.id
        
        print("✅ 文件上传成功！")
        print(f"文件 ID: {UPLOADED_FILE_ID}")
        # 注意：文件状态为 processed 后才能被关联到知识库
        print(f"文件状态: {uploaded_file.status}") 
        
    except Exception as e:
        print(f"❌ 文件上传失败: {e}")

正在上传文件 ./media/08_step-audio2.pdf...
✅ 文件上传成功！
文件 ID: file-LkO5dJwUW8
文件状态: None


## 3. 创建知识库（Vector Store）

创建一个知识库容器，用于存储和管理与该 PDF 文件相关的知识。

In [4]:
url = f"{BASE_URL}/vector_stores"
create_vector_store_headers = headers.copy()
create_vector_store_headers["Content-Type"] = "application/json"

data = {
    "name": "audio2_pdf_knowledge"
}

print("正在创建知识库...")
try:
    response = requests.post(url, headers=create_vector_store_headers, json=data)
    response.raise_for_status() 
    result = response.json()
    
    VECTOR_STORE_ID = result.get("id")
    
    print("✅ 知识库创建成功！")
    print(f"知识库 ID: {VECTOR_STORE_ID}")
    
except requests.exceptions.HTTPError as e:
    print(f"❌ 知识库创建失败: {e} - {response.text}")

正在创建知识库...
✅ 知识库创建成功！
知识库 ID: 294402242740027392


## 4. 知识库关联文件

将已上传的文件（`UPLOADED_FILE_ID`）关联到知识库（`VECTOR_STORE_ID`）中。**重要提示**：在实际应用中，您可能需要等待几秒或轮询文件状态，确保文件处理完成才能成功关联。

In [5]:
if not VECTOR_STORE_ID or not UPLOADED_FILE_ID:
    print("⚠️ ID 缺失，请先成功执行前面的步骤！")
else:
    url = f"{BASE_URL}/vector_stores/{VECTOR_STORE_ID}/files"
    
    # 准备 multipart 数据
    multipart_data = MultipartEncoder(
        fields={
            "file_ids": UPLOADED_FILE_ID
        }
    )

    associate_headers = headers.copy()
    associate_headers["Content-Type"] = multipart_data.content_type

    print(f"正在将文件 {UPLOADED_FILE_ID} 关联到知识库 {VECTOR_STORE_ID}...")
    try:
        response = requests.post(url, headers=associate_headers, data=multipart_data)
        response.raise_for_status()
        result = response.json()
        
        print("✅ 文件关联成功！")
        print(json.dumps(result, indent=2, ensure_ascii=False))
        
    except requests.exceptions.HTTPError as e:
        print(f"❌ 文件关联失败: {e} - {response.text}")

正在将文件 file-LkMduLpJPk 关联到知识库 294402242740027392...
✅ 文件关联成功！
{
  "id": "file-LkMduLpJPk",
  "usage_bytes": 872404,
  "vector_store_id": "294402242740027392"
}


## 5. 使用知识库进行对话补全

在 `chat.completions.create` 接口中，通过 `tools` 参数启用 `retrieval`，并指向你的知识库 ID。模型将基于 `08_step-audio2.pdf` 的内容进行回答。

In [6]:
if not VECTOR_STORE_ID:
    print("⚠️ 知识库 ID 缺失，无法进行对话补全演示！")
else:
    sys_prompt = """
你是一名专业的技术文档分析师。你的任务是根据知识库检索到的信息，准确回答用户关于文档内容的问题。如果知识库中没有答案，请明确告知用户。
"""
    # 假设提问与 PDF 内容相关
    user_prompt = "文档中提到关于音频处理的关键技术点是什么？"
    messages = [
        { "role": "system", "content": sys_prompt },
        { "role": "user", "content": user_prompt }
    ]

    tools = [
        {
            "type": "retrieval",
            "function": {
                "name": "pdf_document_search",
                "description": "本文档存储了关于音频技术的详细信息。", 
                "options": {
                    "vector_store_id": VECTOR_STORE_ID, # 你的知识库 ID
                    "prompt_template": "从文档{{knowledge}} 中找到问题{{query}} 的答案。"
                }
            }
        }
    ]

    print("--- 正在使用知识库进行对话补全（流式输出）---")
    try:
        response = client.chat.completions.create(
            messages=messages,
            model=COMPLETION_MODEL,
            tool_choice="auto",
            tools=tools,
            stream=True,
        )
        
        print("模型回答:")
        for chunk in response:
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end='', flush=True)
        print("\n\n--- 对话补全结束 ---\n")
        
    except Exception as e:
        print(f"❌ 对话补全失败: {e}")

--- 正在使用知识库进行对话补全（流式输出）---
模型回答:
很抱歉，我无法回答你关于文档中提到的音频处理关键技术点的问题，因为我没有访问到具体的文档内容。如果你能提供更多关于文档的上下文或相关信息，我将尽力帮助你找到答案。请提供更多细节，以便我能够更好地为你服务。

--- 对话补全结束 ---



## 6. 清理（可选）

清理本地创建的临时文件，以及在平台上的知识库和文件（需要调用相应的删除 API 接口）。

In [None]:
# # --- 清理示例：删除知识库和文件 --- 
# if VECTOR_STORE_ID:
#     try:
#         client.vector_stores.delete(vector_store_id=VECTOR_STORE_ID)
#         print(f"✅ 已删除知识库: {VECTOR_STORE_ID}")
#     except Exception as e:
#         print(f"❌ 删除知识库失败: {e}")

# if UPLOADED_FILE_ID:
#     try:
#         client.files.delete(file_id=UPLOADED_FILE_ID)
#         print(f"✅ 已删除上传文件: {UPLOADED_FILE_ID}")
#     except Exception as e:
#         print(f"❌ 删除文件失败: {e}")

# 阶跃星辰 API 指南：知识库（Vector Store）完整管理

本 `ipynb` 文件旨在演示阶跃星辰开放平台提供的 **知识库（Vector Store）** 相关的全部 API 操作，涵盖知识库本身的 CRUD 操作和知识库内文件的管理。

## 知识库 API 概览：

| 功能模块 | 接口说明 | HTTP 方法 | 对应的 API 链接 |
| :--- | :--- | :--- | :--- |
| **知识库管理** | 创建知识库 | `POST` | `/v1/vector_stores` |
| | 查询知识库列表 | `GET` | `/v1/vector_stores` |
| | 获取知识库详情 | `GET` | `/v1/vector_stores/{id}` |
| | 删除知识库 | `DELETE` | `/v1/vector_stores/{id}` |
| **文件管理** | 将文件添加到知识库 | `POST` | `/v1/vector_stores/{id}/files` |
| | 查看知识库中的文件 | `GET` | `/v1/vector_stores/{id}/files` |
| | 将文件从知识库中移除 | `DELETE` | `/v1/vector_stores/{id}/files/{file_id}` |

---

In [8]:
import requests
import json
import os
import time
from openai import OpenAI
from requests_toolbelt.multipart.encoder import MultipartEncoder

# --- 配置和全局变量 --- 
STEP_API_KEY = os.getenv("STEPFUN_API_KEY") # ⚠️ 替换为您的 API Key
BASE_URL = "https://api.stepfun.com/v1"
DEMO_FILE_NAME = "./media/08_step-audio2.pdf" # ⚠️ 请确保此文件存在于当前目录

# 存储流程中产生的关键 ID
UPLOADED_FILE_ID = "" 
VECTOR_STORE_ID = "" 
VECTOR_STORE_FILE_ID = "" # 关联后返回的对象ID

headers = {
    "Authorization": f"Bearer {STEP_API_KEY}"
}

client = OpenAI(
    api_key=STEP_API_KEY, 
    base_url=BASE_URL
)

if not os.path.exists(DEMO_FILE_NAME):
    print(f"⚠️ 错误：请确保文件 {DEMO_FILE_NAME} 存在于当前目录下。")

## A. 知识库准备：文件上传

首先，上传文件并获取 `file_id`，这是后续知识库文件管理的基础。`purpose` 必须为 `retrieval-text`。

In [None]:
if not os.path.exists(DEMO_FILE_NAME):
    print("❌ 文件不存在，跳过上传。")
else:
    try:
        print(f"[A.1] 正在上传文件 {DEMO_FILE_NAME}...")
        with open(DEMO_FILE_NAME, "rb") as f:
            uploaded_file = client.files.create(
                file=f,
                purpose="retrieval-text" 
            )
        
        global UPLOADED_FILE_ID
        UPLOADED_FILE_ID = uploaded_file.id
        
        print("✅ 文件上传成功！")
        print(f"文件 ID: {UPLOADED_FILE_ID} | 状态: {uploaded_file.status}")
        print("等待文件处理完成... (约 10 秒)")
        time.sleep(10) # 模拟等待文件处理完成
        
    except Exception as e:
        print(f"❌ 文件上传失败: {e}")

## B. 知识库管理（Vector Store CRUD）

演示创建、列出、获取详情和删除知识库的 API 操作。

In [9]:
# [B.1] 创建知识库 (Create Vector Store)
print("\n--- [B.1] 创建知识库 ---")
url = f"{BASE_URL}/vector_stores"
create_headers = headers.copy()
create_headers["Content-Type"] = "application/json"
data = {"name": "Audio2_Tech_Docs"}

try:
    response = requests.post(url, headers=create_headers, json=data)
    response.raise_for_status()
    result = response.json()
    
    global VECTOR_STORE_ID
    VECTOR_STORE_ID = result.get("id")
    
    print("✅ 创建成功！")
    print(f"知识库 ID: {VECTOR_STORE_ID}")
except requests.exceptions.HTTPError as e:
    print(f"❌ 创建失败: {e} - {response.text}")

# [B.2] 查询知识库列表 (List Vector Stores)
if VECTOR_STORE_ID:
    print("\n--- [B.2] 查询知识库列表 ---")
    url_list = f"{BASE_URL}/vector_stores?limit=1"
    try:
        response = requests.get(url_list, headers=headers)
        response.raise_for_status()
        result = response.json()
        
        print("✅ 查询列表成功！")
        print(f"总数: {len(result['data'])} / {result['data'][0]['name']}")
    except requests.exceptions.HTTPError as e:
        print(f"❌ 查询列表失败: {e}")

# [B.3] 获取知识库详情 (Retrieve Vector Store)
if VECTOR_STORE_ID:
    print("\n--- [B.3] 获取知识库详情 ---")
    url_retrieve = f"{BASE_URL}/vector_stores/{VECTOR_STORE_ID}"
    try:
        response = requests.get(url_retrieve, headers=headers)
        response.raise_for_status()
        result = response.json()
        
        print("✅ 获取详情成功！")
        print(f"ID: {result['id']}, 名称: {result['name']}, 文件数: {result['file_counts']['total']}")
    except requests.exceptions.HTTPError as e:
        print(f"❌ 获取详情失败: {e}")


--- [B.1] 创建知识库 ---
✅ 创建成功！
知识库 ID: 294407544549998592

--- [B.2] 查询知识库列表 ---
✅ 查询列表成功！
总数: 1 / Audio2_Tech_Docs

--- [B.3] 获取知识库详情 ---
✅ 获取详情成功！
ID: 294407544549998592, 名称: Audio2_Tech_Docs, 文件数: 0


## C. 知识库文件管理（Vector Store File CRUD）

演示文件与知识库的关联、查询和移除操作。注意：文件本身（`file-id`）需要单独通过 `/v1/files` 接口删除。

In [10]:
# [C.1] 将文件添加到知识库 (Create Vector Store File)
if VECTOR_STORE_ID and UPLOADED_FILE_ID:
    print("\n--- [C.1] 将文件添加到知识库 ---")
    url_add_file = f"{BASE_URL}/vector_stores/{VECTOR_STORE_ID}/files"
    
    # 准备 multipart 数据，使用文件 ID
    multipart_data = MultipartEncoder(fields={"file_ids": UPLOADED_FILE_ID})
    add_file_headers = headers.copy()
    add_file_headers["Content-Type"] = multipart_data.content_type

    try:
        response = requests.post(url_add_file, headers=add_file_headers, data=multipart_data)
        response.raise_for_status()
        result = response.json()
        
        global VECTOR_STORE_FILE_ID
        # 关联后返回的是 Vector Store File Object 列表，包含 vector_store_id 和 file_id
        VECTOR_STORE_FILE_ID = UPLOADED_FILE_ID # 在 StepFun API 中，移除文件仍使用 File ID
        
        print("✅ 文件添加成功！")
        print(f"关联文件 ID: {UPLOADED_FILE_ID}, 知识库 ID: {result['vector_store_id']}")
        print(f"状态: {result['status']}")
    except requests.exceptions.HTTPError as e:
        print(f"❌ 添加文件失败: {e} - {response.text}")

# [C.2] 查看知识库中的文件列表 (List Vector Store Files)
if VECTOR_STORE_ID:
    print("\n--- [C.2] 查询知识库中的文件列表 ---")
    url_list_files = f"{BASE_URL}/vector_stores/{VECTOR_STORE_ID}/files?limit=10"
    try:
        response = requests.get(url_list_files, headers=headers)
        response.raise_for_status()
        result = response.json()
        
        print("✅ 查询文件列表成功！")
        print(f"知识库 {VECTOR_STORE_ID} 中共有 {len(result['data'])} 个文件")
        if result['data']:
            print(f"第一个文件 ID: {result['data'][0]['id']}, 状态: {result['data'][0]['status']}")
    except requests.exceptions.HTTPError as e:
        print(f"❌ 查询文件列表失败: {e}")


--- [C.2] 查询知识库中的文件列表 ---
✅ 查询文件列表成功！


TypeError: object of type 'NoneType' has no len()

## D. 清理操作

演示如何从知识库中移除文件以及删除整个知识库。为了避免资源浪费，强烈建议执行清理操作。

In [None]:
# [D.1] 将文件从知识库中移除 (Delete Vector Store File)
if VECTOR_STORE_ID and UPLOADED_FILE_ID:
    print("\n--- [D.1] 移除文件 ---")
    url_delete_file = f"{BASE_URL}/vector_stores/{VECTOR_STORE_ID}/files/{UPLOADED_FILE_ID}"
    try:
        response = requests.delete(url_delete_file, headers=headers)
        response.raise_for_status()
        result = response.json()
        
        print(f"✅ 文件 {UPLOADED_FILE_ID} 已从知识库中移除！")
        print(f"删除状态: {result['deleted']}")
    except requests.exceptions.HTTPError as e:
        print(f"❌ 移除文件失败: {e}")

# [D.2] 删除知识库 (Delete Vector Store)
if VECTOR_STORE_ID:
    print("\n--- [D.2] 删除知识库 ---")
    url_delete_vs = f"{BASE_URL}/vector_stores/{VECTOR_STORE_ID}"
    try:
        response = requests.delete(url_delete_vs, headers=headers)
        response.raise_for_status()
        result = response.json()
        
        print(f"✅ 知识库 {VECTOR_STORE_ID} 已删除！")
        print(f"删除状态: {result['deleted']}")
    except requests.exceptions.HTTPError as e:
        print(f"❌ 删除知识库失败: {e}")
        
# [D.3] 清理上传的文件 (可选)
if UPLOADED_FILE_ID:
    print("\n--- [D.3] 清理上传的文件 (Files API) ---")
    try:
        client.files.delete(file_id=UPLOADED_FILE_ID)
        print(f"✅ 已删除上传文件: {UPLOADED_FILE_ID}")
    except Exception as e:
        print(f"❌ 删除上传文件失败: {e}")