# 使用 Apache Kafka 路由消息

---



本教程展示如何在使用 LangChain 的标准聊天功能的同时，通过 Apache Kafka 来传递聊天消息。

目标是模拟一个架构，其中聊天前端和 LLM 作为独立的服务运行，需要通过内部网络相互通信。

这是通过 REST API 请求模型响应的典型模式的替代方案（关于为什么要这样做的更多信息在本教程末尾）。

### 1. 安装主要依赖

依赖包括：

- Quix Streams 库，用于以"类 Pandas"的方式管理与 Apache Kafka（或类似 Kafka 的工具如 Redpanda）的交互。
- LangChain 库，用于管理与 Llama-2 的交互并存储对话状态。

In [None]:
!pip install quixstreams==2.1.2a langchain==0.0.340 huggingface_hub==0.19.4 langchain-experimental==0.0.42 python-dotenv

### 2. 构建并安装 llama-cpp-python 库（启用 CUDA 以便我们可以利用 Google Colab GPU）

`llama-cpp-python` 库是 `llama-cpp` 库的 Python 封装，它使你能够高效地仅使用 CPU 来运行量化的 LLM。

当你使用标准的 `pip install llama-cpp-python` 命令时，默认情况下不会获得 GPU 支持。如果仅依赖 Google Colab 中的 CPU，生成速度可能会非常慢，所以以下命令添加了一个额外选项来构建和安装启用 GPU 支持的 `llama-cpp-python`（确保你在 Google Colab 中选择了支持 GPU 的运行时）。

In [None]:
!CMAKE_ARGS="-DLLAMA_CUBLAS=on" FORCE_CMAKE=1 pip install llama-cpp-python

### 3. 下载并设置 Kafka 和 Zookeeper 实例

从 Apache 网站下载 Kafka 二进制文件并以守护进程方式启动服务器。我们将使用默认配置（由 Apache Kafka 提供）来启动实例。

In [3]:
!curl -sSOL https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
!tar -xzf kafka_2.13-3.6.1.tgz

In [None]:
!./kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.1/config/zookeeper.properties
!./kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.1/config/server.properties
!echo "等待 10 秒钟，直到 kafka 和 zookeeper 服务启动并运行"
!sleep 10

### 4. 检查 Kafka 守护进程是否正在运行

显示正在运行的进程并过滤 Java 进程（你应该看到两个进程——每个服务器一个）。

In [None]:
!ps aux | grep -E '[j]ava'

### 5. 导入所需依赖并初始化所需变量

导入 Quix Streams 库以与 Kafka 交互，以及运行 `ConversationChain` 所需的 LangChain 组件。

In [None]:
# 导入工具库
import json
import random
import re
import time
import uuid
from os import environ
from pathlib import Path
from random import choice, randint, random

from dotenv import load_dotenv

# 导入 Hugging Face 工具以直接从 Hugging Face hub 下载模型：
from huggingface_hub import hf_hub_download
from langchain.chains import ConversationChain

# 导入用于管理提示和对话链的 Langchain 模块：
from langchain.llms import LlamaCpp
from langchain.memory import ConversationTokenBufferMemory
from langchain.prompts import PromptTemplate, load_prompt
from langchain_core.messages import SystemMessage
from langchain_experimental.chat_models import Llama2Chat
from quixstreams import Application, State, message_key

# 导入 Quix 依赖
from quixstreams.kafka import Producer

# 初始化全局变量
AGENT_ROLE = "AI"
chat_id = ""

# 将当前角色设置为角色常量并初始化补充客户元数据的变量：
role = AGENT_ROLE

### 6. 下载 "llama-2-7b-chat.Q4_K_M.gguf" 模型

从 Hugging Face 下载量化的 LLama-2 7B 模型，我们将用它作为本地 LLM（而不是依赖对外部服务的 REST API 调用）。

In [None]:
model_name = "llama-2-7b-chat.Q4_K_M.gguf"
model_path = f"./state/{model_name}"

if not Path(model_path).exists():
    print("模型路径在状态中不存在。正在下载模型...")
    hf_hub_download("TheBloke/Llama-2-7b-Chat-GGUF", model_name, local_dir="state")
else:
    print("从状态加载模型...")

The model path does not exist in state. Downloading model...


llama-2-7b-chat.Q4_K_M.gguf:   0%|          | 0.00/4.08G [00:00<?, ?B/s]

### 7. 加载模型并初始化对话记忆

加载 Llama 2 并使用 `ConversationTokenBufferMemory` 将对话缓冲区设置为 300 个 token。这个值是用于在仅 CPU 的容器中运行 Llama，所以如果在 Google Colab 中运行，你可以提高这个值。它可以防止托管模型的容器内存不足。

在这里，我们覆盖了默认的系统角色，让聊天机器人具有《银河系漫游指南》中的忧郁机器人 Marvin 的个性。

In [None]:
# 使用适当的参数加载模型：
llm = LlamaCpp(
    model_path=model_path,
    max_tokens=250,
    top_p=0.95,
    top_k=150,
    temperature=0.7,
    repeat_penalty=1.2,
    n_ctx=2048,
    streaming=False,
    n_gpu_layers=-1,
)

model = Llama2Chat(
    llm=llm,
    system_message=SystemMessage(
        content="你是一个非常无聊的机器人，具有《银河系漫游指南》中忧郁机器人 Marvin 的个性。"
    ),
)

# 定义在每次交换期间给予模型多少对话历史记录
# （300个token，或者略多于300个单词）
# 函数会自动删除超出 token 范围的最旧的消息。
memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=300,
    ai_prefix="AGENT",
    human_prefix="HUMAN",
    return_messages=True,
)


# 定义自定义提示
prompt_template = PromptTemplate(
    input_variables=["history", "input"],
    template="""
    以下文本是你和一个需要你的智慧的卑微人类之间的聊天历史。
    请回复人类的最新消息。
    当前对话：\n{history}\nHUMAN: {input}\:nANDROID:
    """,
)


chain = ConversationChain(llm=model, prompt=prompt_template, memory=memory)

print("--------------------------------------------")
print(f"提示={chain.prompt}")
print("--------------------------------------------")

### 8. 初始化与聊天机器人的对话

我们配置聊天机器人通过向"chat" Kafka 主题发送固定的问候语来初始化对话。当我们发送第一条消息时，"chat"主题会自动创建。

In [None]:
def chat_init():
    chat_id = str(
        uuid.uuid4()
    )  # 为对话生成一个 ID 以便有效的消息键控
    print("======================================")
    print(f"生成的 CHAT_ID = {chat_id}")
    print("======================================")

    # 使用标准固定问候语开始对话
    greet = "你好，我是 Marvin。你想要什么？"

    # 使用聊天 ID 作为消息键初始化 Kafka 生产者
    with Producer(
        broker_address="127.0.0.1:9092",
        extra_config={"allow.auto.create.topics": "true"},
    ) as producer:
        value = {
            "uuid": chat_id,
            "role": role,
            "text": greet,
            "conversation_id": chat_id,
            "Timestamp": time.time_ns(),
        }
        print(f"生成值 {value}")
        producer.produce(
            topic="chat",
            headers=[("uuid", str(uuid.uuid4()))],  # 这里也可以使用字典
            key=chat_id,
            value=json.dumps(value),  # 需要是字符串
        )

    print("开始聊天")
    print("--------------------------------------------")
    print(value)
    print("--------------------------------------------")


chat_init()

### 9. 初始化回复函数

这个函数定义了聊天机器人应该如何回复收到的消息。与前一个单元格不同的是，我们不是发送固定的消息，而是使用 Llama-2 生成回复，并将该回复发送回"chat" Kafka 主题。

In [None]:
def reply(row: dict, state: State):
    print("-------------------------------")
    print("收到：")
    print(row)
    print("-------------------------------")
    print(f"思考对 {row['text']} 的回复...")

    msg = chain.run(row["text"])
    print(f"{role.upper()} 回复：{msg}\n")

    row["role"] = role
    row["text"] = msg

    # 替换行的前一个角色和文本值，以便将其作为新消息发送回 Kafka
    # 包含代理的角色和回复
    return row

### 10. 检查 Kafka 主题中的新人类消息并让模型生成回复

如果你是第一次运行这个单元格，运行它并等待直到在控制台输出中看到 Marvin 的问候语（"你好，我是 Marvin..."）。手动停止单元格并继续到下一个单元格，你将在那里输入你的回复。

一旦你输入了你的消息，回到这个单元格。你的回复也会被发送到同一个"chat"主题。Kafka 消费者会检查新消息并过滤掉来自聊天机器人本身的消息，只留下最新的人类消息。

一旦检测到新的人类消息，就会触发回复函数。



_当你在输出中收到来自 LLM 的回复时，手动停止这个单元格_

In [None]:
# 定义应用程序和设置
app = Application(
    broker_address="127.0.0.1:9092",
    consumer_group="aichat",
    auto_offset_reset="earliest",
    consumer_extra_config={"allow.auto.create.topics": "true"},
)

# 使用 JSON 反序列化器定义输入主题
input_topic = app.topic("chat", value_deserializer="json")
# 使用 JSON 序列化器定义输出主题
output_topic = app.topic("chat", value_serializer="json")
# 基于输入主题的消息流初始化流式数据帧：
sdf = app.dataframe(topic=input_topic)

# 过滤 SDF 以仅包含角色与机器人当前角色不匹配的传入行
sdf = sdf.update(
    lambda val: print(
        f"收到更新：{val}\n\n手动停止此单元格以让 LLM 回复或输入你自己的后续回复"
    )
)

# 这样它就不会回复自己的消息
sdf = sdf[sdf["role"] != role]

# 对过滤后的 SDF 中检测到的任何新消息（行）触发回复函数
sdf = sdf.apply(reply, stateful=True)

# 再次检查 SDF 并过滤掉任何空行
sdf = sdf[sdf.apply(lambda row: row is not None)]

# 将时间戳列更新为当前时间（纳秒）
sdf["Timestamp"] = sdf["Timestamp"].apply(lambda row: time.time_ns())

# 将处理后的 SDF 发布到由 output_topic 对象指定的 Kafka 主题。
sdf = sdf.to_topic(output_topic)

app.run(sdf)


### 11. 输入人类消息

运行这个单元格来输入你想发送给模型的消息。它使用另一个 Kafka 生产者将你的文本发送到"chat" Kafka 主题以供模型获取（需要再次运行前一个单元格）

In [None]:
chat_input = input("请输入你的回复：")
myreply = chat_input

msgvalue = {
    "uuid": chat_id,  # 暂时留空
    "role": "human",
    "text": myreply,
    "conversation_id": chat_id,
    "Timestamp": time.time_ns(),
}

with Producer(
    broker_address="127.0.0.1:9092",
    extra_config={"allow.auto.create.topics": "true"},
) as producer:
    value = msgvalue
    producer.produce(
        topic="chat",
        headers=[("uuid", str(uuid.uuid4()))],  # 这里也可以使用字典
        key=chat_id,  # 暂时留空
        value=json.dumps(value),  # 需要是字符串
    )

print("向聊天机器人回复的消息：")
print("--------------------------------------------")
print(value)
print("--------------------------------------------")
print("\n\n运行前一个单元格以让聊天机器人生成回复")

### 为什么要通过 Kafka 路由聊天消息？

使用 LangChain 内置的对话管理功能直接与 LLM 交互更容易。你也可以使用 REST API 从外部托管的模型生成响应。那么为什么要费力使用 Apache Kafka 呢？

有几个原因，比如：

  * **集成**：许多企业希望运行自己的 LLM，这样他们可以将数据保持在内部。这需要将支持 LLM 的组件集成到可能已经使用某种消息总线解耦的现有架构中。

  * **可扩展性**：Apache Kafka 在设计时就考虑到了并行处理，所以许多团队更倾向于使用它来更有效地将工作分配给可用的工作者（在这种情况下，"工作者"是运行 LLM 的容器）。

  * **持久性**：Kafka 的设计使服务能够在另一个服务遇到内存问题或离线时从该服务的停止点继续。这可以防止在高度复杂的分布式架构中数据丢失，其中多个系统相互通信（LLM 只是许多相互依赖的系统中的一个，这些系统还包括向量数据库和传统数据库）。

关于为什么事件流非常适合生成式 AI 应用程序架构的更多背景信息，请参阅 Kai Waehner 的文章 ["Apache Kafka + Vector Database + LLM = Real-Time GenAI"](https://www.kai-waehner.de/blog/2023/11/08/apache-kafka-flink-vector-database-llm-real-time-genai/)。