**获取最新写法：** https://python.langchain.com/docs/versions/migrating_chains/map_reduce_chain/#legacy-1

In [8]:
import nest_asyncio
from langchain_openai import ChatOpenAI

# Patch asyncio so that run_until_complete() can be nested inside the event
# loop Jupyter is already running.
nest_asyncio.apply()
# Chat model shared by the tool cells below.
llm = ChatOpenAI(model_name="gpt-4", temperature=1.0)

## 使用 Playwright 抓取网页
uv pip install playwright
!playwright install

In [9]:
import asyncio

from langchain_core.tools import tool


async def async_load_playwright(url: str) -> str:
    """Load ``url`` in headless Chromium via Playwright and return its text.

    The rendered HTML is parsed with BeautifulSoup, ``<script>`` and
    ``<style>`` elements are removed, and the remaining text is reduced to one
    non-empty fragment per line.

    Args:
        url: Address of the page to open.

    Returns:
        The cleaned page text. If loading or parsing raises, a string starting
        with ``错误：`` followed by the exception message is returned instead.
    """
    from bs4 import BeautifulSoup
    from playwright.async_api import async_playwright

    results = ""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            page = await browser.new_page()
            await page.goto(url)

            page_source = await page.content()
            soup = BeautifulSoup(page_source, "html.parser")

            # Script and style bodies are not visible text; drop them first.
            for tag in soup(["script", "style"]):
                tag.extract()

            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            # Double spaces are treated as phrase separators within a line.
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            results = "\n".join(chunk for chunk in chunks if chunk)
        except Exception as e:
            results = f"错误：{e}"
        finally:
            # Close the browser on every exit path, including cancellation,
            # which is not an ``Exception`` and would skip the handler above.
            await browser.close()
    return results


def run_async(coro):
    """Run ``coro`` to completion from synchronous code and return its result.

    When a loop is already running in this thread (e.g. inside Jupyter), the
    coroutine is driven on that loop; this requires the loop to allow nested
    ``run_until_complete`` calls, as ``nest_asyncio.apply()`` arranges.
    Otherwise a fresh loop is created with ``asyncio.run``, which also works in
    worker threads where ``asyncio.get_event_loop()`` has no loop to return.
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread.
        return asyncio.run(coro)
    return running_loop.run_until_complete(coro)


@tool
def browse_web_page(url: str) -> str:
    """以详细的方式抓取整个网页。可能会导致解析问题。"""
    # The docstring above is kept verbatim: @tool exposes it as the tool's
    # description, so it is runtime text rather than documentation.
    page_text = run_async(async_load_playwright(url))
    return page_text

In [None]:
from typing import Literal

from langchain.base_language import BaseLanguageModel
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import BaseTool
from pydantic import Field


# 🧩 可选：分块函数
def _get_text_splitter() -> RecursiveCharacterTextSplitter:
    """Build the default splitter (chunk_size=1000, chunk_overlap=100)."""
    splitter_options = {"chunk_size": 1000, "chunk_overlap": 100}
    return RecursiveCharacterTextSplitter(**splitter_options)


# 🧠 Tool 实现
class WebpageQATool(BaseTool):
    # Tool that fetches a page with ``browse_web_page`` and answers a question
    # about it through a QA-with-sources chain.
    name: str = "query_webpage"
    description: str = "浏览网页并检索与问题相关的信息，并附带来源。"

    # Configuration: the model, the chain strategy and the document splitter.
    llm: BaseLanguageModel
    chain_type: Literal["stuff", "map_reduce", "refine", "map_rerank"] = "map_reduce"
    text_splitter: RecursiveCharacterTextSplitter = Field(default_factory=_get_text_splitter)

    def _run(self, url: str, question: str) -> str:
        """Answer ``question`` from the content of ``url`` and report the sources.

        Args:
            url: Page to fetch; also recorded as the ``source`` of every chunk.
            question: Question to answer from the page content.

        Returns:
            A string holding the answer and its sources. Any exception raised
            along the way is caught and returned as a ``❌ 处理失败：`` message
            rather than propagated.
        """
        try:
            # 1. Fetch the page text through the Playwright-backed tool.
            html = browse_web_page.run(url)

            # 2. Wrap it in a Document tagged with its source, then split it.
            doc = Document(page_content=html, metadata={"source": url})
            docs = self.text_splitter.split_documents([doc])

            # 3. Build the QA chain that cites sources.
            chain = load_qa_with_sources_chain(
                llm=self.llm,
                chain_type=self.chain_type,
                verbose=False,
            )

            # 4. Run the chain. ``invoke`` replaces the deprecated direct call
            #    ``chain(...)`` and matches the other chains in this file.
            result = chain.invoke(
                {"input_documents": docs, "question": question},
                return_only_outputs=True,
            )

            # 5. Format the result.
            answer = result.get("answer", "")
            sources = result.get("sources", "")
            return f"回答：{answer}\n\n来源：{sources}"

        except Exception as e:
            return f"❌ 处理失败：{str(e)}"

    async def _arun(self, url: str, question: str) -> str:
        raise NotImplementedError("异步暂未实现。")

In [None]:
# Instantiate the scraping-based QA tool with the module-level chat model.
query_website_tool = WebpageQATool(llm=llm)

In [None]:
from langchain.chains.qa_with_sources.retrieval import RetrievalQAWithSourcesChain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import ChatOpenAI

# 1. Load the page (pages that rely on JS rendering are not covered by this loader)
url = "https://en.wikipedia.org/wiki/OpenAI"
loader = WebBaseLoader([url])
docs = loader.load()  # returns a list of Document objects

# 2. Split into chunks (required so each piece fits the LLM input limit)
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
split_docs = splitter.split_documents(docs)

# 3. Build the vector index
vectorstore = FAISS.from_documents(split_docs, OpenAIEmbeddings())
retriever = vectorstore.as_retriever()

# 4. Build the RetrievalQAWithSourcesChain
# NOTE: this rebinds the module-level ``llm`` created at the top of the notebook.
llm = ChatOpenAI(temperature=0)
qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type="stuff",  # may be changed to map_reduce/refine/map_rerank
    return_source_documents=True,  # optional: also return the full documents
)

# 5. Ask the question
question = "OpenAI 是做什么的？"
result = qa_chain.invoke({"question": question})

# 6. Print the result
print("🧠 答案：", result["answer"])
print("📎 来源：", result["sources"])

## 封装成工具



In [None]:
class QueryWebpageWithSourcesTool(BaseTool):
    # Single-page variant: load one URL, index it in FAISS, answer with sources.
    name: str = "query_webpage"
    description: str = "读取网页并回答问题，附带来源"

    llm: BaseLanguageModel

    def _run(self, url: str, question: str) -> str:
        """Answer ``question`` from the page at ``url`` and list its sources."""
        # Load -> split -> embed; each stage feeds the next.
        loaded_pages = WebBaseLoader(url).load()
        chunker = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        chunks = chunker.split_documents(loaded_pages)
        index = FAISS.from_documents(chunks, OpenAIEmbeddings())

        chain = RetrievalQAWithSourcesChain.from_chain_type(
            llm=self.llm,
            retriever=index.as_retriever(),
            return_source_documents=False,
        )
        outcome = chain.invoke({"question": question})

        answer, sources = outcome["answer"], outcome["sources"]
        return f"🧠 回答：{answer}\n📎 来源：{sources}"

    async def _arun(self, url: str, question: str):
        raise NotImplementedError

好的，下面是**增强版的网页问答工具 `QueryWebpagesWithSourcesTool`**，具备以下功能：

---

## ✅ 功能亮点：

| 功能                | 描述                                            |
| ----------------- | --------------------------------------------- |
| 🔗 支持多个网页链接       | 多网页合并提问，一次问多个网页内容                             |
| 📄 自动网页解析         | 用 `WebBaseLoader` 抓网页正文                       |
| 🧩 文本分块 + 嵌入 + 检索 | 自动构建向量数据库                                     |
| 🧠 支持多种 QA 模式     | `stuff`, `map_reduce`, `refine`, `map_rerank` |
| 📎 来源引用           | 输出中包含提取的网页链接（source）                          |
| 🧱 可接入 Agent      | 完整 Tool 结构，支持 Tool Calling                    |
| ⚙️ 可扩展为结构化 JSON  | 当前返回格式化字符串；改为返回 dict 的做法见下方“扩展建议” |

---

## ✅ 完整代码（多网页问答工具）

```python
from typing import List, Union, Literal
from pydantic import BaseModel, Field
from langchain.tools import BaseTool
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.schema import Document
from langchain.base_language import BaseLanguageModel


# Input schema: the arguments the tool accepts
class WebQAInput(BaseModel):
    # Either a single URL or a list of URLs.
    urls: Union[str, List[str]] = Field(..., description="要提问的网页链接，可以是一个或多个")
    # The question to answer from those pages.
    question: str = Field(..., description="要提问的问题")


# Tool implementation
class QueryWebpagesWithSourcesTool(BaseTool):
    # Loads one or more pages, indexes them in FAISS and answers with sources.
    name: str = "query_webpages_with_sources"
    description: str = "读取一个或多个网页并回答问题，附带引用来源"

    llm: BaseLanguageModel
    chain_type: Literal["stuff", "map_reduce", "refine", "map_rerank"] = "map_reduce"
    chunk_size: int = 1000
    chunk_overlap: int = 100

    # Annotated because pydantic v2 rejects overriding an inherited field
    # with a bare, un-annotated assignment.
    args_schema: type[BaseModel] = WebQAInput

    def _run(self, urls: Union[str, List[str]], question: str) -> str:
        """Answer ``question`` from the given page(s) and list the sources.

        Args:
            urls: A single URL or a list of URLs to load.
            question: Question to answer from the loaded pages.

        Returns:
            A string with the answer and its sources. Any exception is caught
            and returned as a ``❌ 出错：`` message rather than propagated.
        """
        try:
            # Accept either one URL or a list of them.
            url_list = [urls] if isinstance(urls, str) else urls

            # Step 1: load the pages
            docs: List[Document] = []
            for url in url_list:
                web_docs = WebBaseLoader(url).load()
                for d in web_docs:
                    d.metadata["source"] = url  # record the source explicitly
                docs.extend(web_docs)

            # Step 2: split into chunks
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap
            )
            split_docs = splitter.split_documents(docs)

            # Step 3: build the vector index
            vectorstore = FAISS.from_documents(split_docs, OpenAIEmbeddings())
            retriever = vectorstore.as_retriever()

            # Step 4: QA chain
            qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
                llm=self.llm,
                retriever=retriever,
                chain_type=self.chain_type,
                return_source_documents=False
            )

            result = qa_chain.invoke({"question": question})
            answer = result.get("answer", "")
            sources = result.get("sources", "")

            return f"🧠 回答：{answer}\n📎 来源：{sources}"

        except Exception as e:
            return f"❌ 出错：{str(e)}"

    async def _arun(self, urls: Union[str, List[str]], question: str) -> str:
        raise NotImplementedError("暂不支持异步运行")
```

---

## ✅ 示例使用方式：

```python
# Build the tool; chain_type here overrides the class default ("map_reduce").
tool = QueryWebpagesWithSourcesTool(
    llm=ChatOpenAI(temperature=0),
    chain_type="map_rerank"
)

# Ask one question across two pages; the dict keys match the WebQAInput fields.
output = tool.run({
    "urls": [
        "https://en.wikipedia.org/wiki/OpenAI",
        "https://en.wikipedia.org/wiki/ChatGPT"
    ],
    "question": "OpenAI 和 ChatGPT 有什么关系？"
})

print(output)
```

---

## ✅ 示例输出：

```
🧠 回答：OpenAI 是 ChatGPT 的开发者。ChatGPT 是由 OpenAI 开发的大型语言模型应用。
📎 来源：https://en.wikipedia.org/wiki/OpenAI, https://en.wikipedia.org/wiki/ChatGPT
```

---

## ✅ 扩展建议（可选）：

| 扩展          | 建议做法                                          |
| ----------- | --------------------------------------------- |
| 支持 JSON 输出  | 改成 `return {"answer": ..., "sources": ...}`   |
| 支持 Agent 调用 | 设置 `tool.description` 和 `args_schema`         |
| 异步抓网页       | 用 `aiohttp + asyncio.gather` 替代 WebBaseLoader |
| 增加日志        | `print(f"Loaded from {url}")` 等调试辅助           |

---

## 🧩 小贴士：如果网页 JS 渲染怎么办？

`WebBaseLoader` 不能抓动态 JS 内容，你可以替换为：

```python
from langchain_community.document_loaders import PlaywrightURLLoader

# ``[...]`` is a placeholder for the list of page URLs to load.
loader = PlaywrightURLLoader(urls=[...], headless=True)
docs = loader.load()
```

---

是否需要我为你补充这段代码的 Tool Calling JSON schema（OpenAI functions/assistant API 用）？


好的！下面把“网页问答工具”进一步扩展，使其具备**支持多轮对话**的能力，即：

> **记住之前的问题与答案，上下文连贯地对话问网页内容**

---

## ✅ 核心思路

在原有的网页问答基础上，**加入聊天历史的记忆机制**：

1. 维护一份多轮 `chat_history`（对话上下文）
2. 在每次提问时，把过去的问答也一并输入模型
3. 保证答案连贯、上下文相关

---

## ✅ 使用组件说明

| 组件                             | 用法                        |
| ------------------------------ | ------------------------- |
| `ConversationalRetrievalChain` | 官方推荐链，支持历史对话记忆            |
| `chat_history`                 | 传入 `(user, ai)` 对列表，构成上下文 |
| `Retriever`                    | 使用向量检索构建（从网页文本构建）         |

---

## ✅ 完整多轮对话版 Web QA Tool

```python
from typing import List, Tuple, Union, Literal
from pydantic import BaseModel, Field
from langchain.tools import BaseTool
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.schema import Document
from langchain.base_language import BaseLanguageModel


# Input schema for the conversational web QA tool.
class ConversationalWebQAInput(BaseModel):
    # Either a single URL or a list of URLs.
    urls: Union[str, List[str]] = Field(..., description="网页链接（单个或多个）")
    # The user's current question.
    question: str = Field(..., description="当前用户提问")
    # Earlier turns as (user, ai) pairs; defaults to an empty list.
    history: List[Tuple[str, str]] = Field(default_factory=list, description="对话历史，格式为[(user, ai), ...]")


class ConversationalWebQATool(BaseTool):
    # Multi-turn web QA: indexes the given pages and answers with the earlier
    # (user, ai) turns supplied as chat history.
    name: str = "conversational_web_qa"
    description: str = "读取网页并进行多轮对话问答，支持上下文记忆和来源引用"

    llm: BaseLanguageModel
    chunk_size: int = 1000
    chunk_overlap: int = 100

    # Annotated because pydantic v2 rejects overriding an inherited field
    # with a bare, un-annotated assignment.
    args_schema: type[BaseModel] = ConversationalWebQAInput

    def _run(
        self,
        urls: Union[str, List[str]],
        question: str,
        history: Union[List[Tuple[str, str]], None] = None,
    ) -> str:
        """Answer ``question`` from the given page(s), using prior turns as context.

        Args:
            urls: A single URL or a list of URLs to load.
            question: The current user question.
            history: Earlier ``(user, ai)`` turns. Optional, so a first-turn
                call that omits it behaves like an empty history.

        Returns:
            A string with the answer and the de-duplicated sources in retrieval
            order. Any exception is caught and returned as a ``❌ 出错：``
            message rather than propagated.
        """
        try:
            url_list = [urls] if isinstance(urls, str) else urls
            chat_history = list(history) if history else []

            # Step 1: load the pages
            docs: List[Document] = []
            for url in url_list:
                web_docs = WebBaseLoader(url).load()
                for d in web_docs:
                    d.metadata["source"] = url
                docs.extend(web_docs)

            # Step 2: split into chunks
            splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
            split_docs = splitter.split_documents(docs)

            # Step 3: build the vector index
            vectorstore = FAISS.from_documents(split_docs, OpenAIEmbeddings())
            retriever = vectorstore.as_retriever()

            # Step 4: build the conversational QA chain
            qa_chain = ConversationalRetrievalChain.from_llm(
                llm=self.llm,
                retriever=retriever,
                return_source_documents=True
            )

            # Step 5: ask with the chat history
            result = qa_chain.invoke({"question": question, "chat_history": chat_history})

            # Step 6: format the answer and sources. dict.fromkeys de-duplicates
            # while keeping a stable order; a set would vary between runs.
            answer = result["answer"]
            unique_sources = dict.fromkeys(
                doc.metadata.get("source", "未知") for doc in result["source_documents"]
            )
            sources = ", ".join(unique_sources)

            return f"🧠 回答：{answer}\n📎 来源：{sources}"

        except Exception as e:
            return f"❌ 出错：{str(e)}"

    async def _arun(self, *args, **kwargs):
        raise NotImplementedError("暂不支持异步")
```

---

## ✅ 使用示例（模拟多轮对话）

```python
tool = ConversationalWebQATool(llm=ChatOpenAI(temperature=0))

# Accumulates (question, tool output) pairs across turns.
history = []

# Turn 1: no prior context.
q1 = "OpenAI 是做什么的？"
a1 = tool.run({"urls": "https://en.wikipedia.org/wiki/OpenAI", "question": q1, "history": history})
print(a1)
# NOTE: a1 is the tool's full formatted string (answer plus sources line).
history.append((q1, a1))

# Turn 2: the first exchange is passed along as history.
q2 = "他们的著名产品有哪些？"
a2 = tool.run({"urls": "https://en.wikipedia.org/wiki/OpenAI", "question": q2, "history": history})
print(a2)
```

---

## ✅ 示例输出（可上下文连贯）

```
🧠 回答：OpenAI 是一家从事人工智能研究的实验室。
📎 来源：https://en.wikipedia.org/wiki/OpenAI

🧠 回答：他们的著名产品包括 ChatGPT、Codex 和 DALL·E。
📎 来源：https://en.wikipedia.org/wiki/OpenAI
```

---

## ✅ 整体结构图

```
User Question + Chat History
        ↓
 ConversationalRetrievalChain
        ↓
Retriever (网页向量库)
        ↓
回答 + 来源
```

---

## ✅ 总结

| 功能                     | 是否支持                    |
| ---------------------- | ----------------------- |
| 多网页合并问答                | ✅                       |
| 上下文记忆                  | ✅（通过 `chat_history`）    |
| 多轮提问连贯                 | ✅                       |
| 引用来源网页                 | ✅                       |
| 支持 Agent / ToolCalling | ✅（符合 LangChain Tool 接口） |

---

是否需要我再扩展为支持**LangGraph 状态机式多轮问答**，或者接入 LangChain Expression Language？如果你想让它接入真实 Web Agent，我也可以帮你封装。


In [None]:
from itertools import batched
from typing import Literal

import faiss
import rich
from langchain.base_language import BaseLanguageModel
from langchain.chains.qa_with_sources.retrieval import RetrievalQAWithSourcesChain
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import BaseTool
from langchain.vectorstores.faiss import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pydantic import BaseModel, Field
from rich.console import Console
from rich.markdown import Markdown

from app.core import settings


# Input schema: the arguments the tool accepts
class WebQAInput(BaseModel):
    # Either a single URL or a list of URLs.
    urls: str | list[str] = Field(..., description="要提问的网页链接，可以是一个或多个")
    # The question to answer from those pages.
    question: str = Field(..., description="要提问的问题")


# Tool implementation
class QueryWebpagesWithSourcesTool(BaseTool):
    # Loads one or more pages, embeds them into a FAISS index in batches and
    # answers a question with the source URLs attached.
    name: str = "query_webpages_with_sources"
    description: str = "读取一个或多个网页并回答问题，附带引用来源"

    llm: BaseLanguageModel
    chain_type: Literal["stuff", "map_reduce", "refine", "map_rerank"] = "map_reduce"
    chunk_size: int = 1000
    chunk_overlap: int = 100

    args_schema: type[BaseModel] = WebQAInput

    def _run(self, urls: str | list[str], question: str) -> str:
        """Answer ``question`` from the given page(s) and list the sources.

        Sources are gathered from the retrieved documents' metadata and from
        the chain's ``sources`` field; if neither yields anything, the input
        URLs are used. They are de-duplicated in first-seen order.

        Args:
            urls: A single URL or a list of URLs to load.
            question: Question to answer from the loaded pages.

        Returns:
            A string with the answer and its sources. Any exception is caught
            and returned as a ``❌ 出错：`` message rather than propagated.
        """
        try:
            # Accept either one URL or a list of them.
            url_list = [urls] if isinstance(urls, str) else urls

            # Step 1: load the pages
            docs: list[Document] = []
            for url in url_list:
                web_docs = WebBaseLoader(url).load()
                for d in web_docs:
                    d.metadata["source"] = url  # record the source explicitly
                docs.extend(web_docs)

            # Step 2: split into chunks
            splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
            split_docs = splitter.split_documents(docs)

            # Step 3: build the vector index
            embeddings = OpenAIEmbeddings(
                model="Qwen/Qwen3-Embedding-8B",
                openai_api_key=settings.SILICONFLOW_API_KEY,
                openai_api_base=settings.SILICONFLOW_API_BASE,
            )
            # A probe embedding is used to size the index.
            index = faiss.IndexFlatL2(len(embeddings.embed_query("hello world")))

            vector_store = FAISS(
                embedding_function=embeddings,
                index=index,
                docstore=InMemoryDocstore(),
                index_to_docstore_id={},
            )
            # Embed in batches of 64; ``batched`` yields tuples, so each one is
            # converted to the list that ``add_documents`` is typed to accept.
            for batch in batched(split_docs, 64):
                vector_store.add_documents(list(batch))
            retriever = vector_store.as_retriever()

            # Step 4: QA chain
            qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
                llm=self.llm,
                retriever=retriever,
                chain_type=self.chain_type,
                return_source_documents=True,
            )

            result = qa_chain.invoke({"question": question})
            answer = result.get("answer", "")
            sources = result.get("sources", "")

            # Debug output: the keys present in the result
            print(f"调试：result 包含的键: {list(result.keys())}")
            print(f"调试：原始 sources: '{sources}'")

            # Collect sources in a dict used as an ordered set, so duplicates
            # are removed and the output order is stable between runs.
            source_urls: dict[str, None] = {}

            # Method 1: take them from the retrieved source documents
            if "source_documents" in result and result["source_documents"]:
                for doc in result["source_documents"]:
                    if hasattr(doc, "metadata") and "source" in doc.metadata:
                        source_urls[doc.metadata["source"]] = None
                        print(f"调试：从 source_documents 提取到: {doc.metadata['source']}")

            # Method 2: parse the chain's ``sources`` field when it is non-empty
            if sources and sources.strip():
                # ``sources`` may be a comma-separated string
                parsed_sources = [s.strip() for s in sources.split(",") if s.strip()]
                source_urls.update(dict.fromkeys(parsed_sources))
                print(f"调试：从 sources 字段解析到: {parsed_sources}")

            # Method 3: fall back to the input URLs
            if not source_urls:
                source_urls.update(dict.fromkeys(url_list))
                print(f"调试：使用原始 URL: {url_list}")

            final_sources = ", ".join(source_urls) if source_urls else "未找到来源"
            print(f"调试：最终 sources: '{final_sources}'")

            return f"🧠 回答：{answer}\n📎 来源：{final_sources}"

        except Exception as e:
            return f"❌ 出错：{str(e)}"

    async def _arun(self, urls: str | list[str], question: str) -> str:
        raise NotImplementedError("暂不支持异步运行")


# Build the tool against the SiliconFlow-hosted Qwen chat model.
tool = QueryWebpagesWithSourcesTool(
    llm=ChatOpenAI(
        model="Qwen/Qwen3-32B",
        api_key=settings.SILICONFLOW_API_KEY,
        base_url=settings.SILICONFLOW_API_BASE,
        temperature=0.8,
    ),
    chain_type="map_reduce",  # NOTE(review): the original note read "switched to stuff mode, sources come back more reliably", but the value is still "map_reduce" — confirm which is intended
)

output = tool.run(
    {
        "urls": ["https://en.wikipedia.org/wiki/OpenAI", "https://en.wikipedia.org/wiki/ChatGPT"],
        "question": "OpenAI 和 ChatGPT 有什么关系？",
    }
)

# rich.print(output)
# Render the output as markdown with rich
console = Console()
console.print(Markdown(output))
# """🔧 If you want to customize it
# Any of its components can be swapped out:
# Replace the retriever (e.g. with MultiVectorRetriever, SelfQueryRetriever)
# Customize the prompt
# Use ChatPromptTemplate + StructuredOutputParser to control structured output
# Replace the chain_type logic with a more capable LangGraph state machine
# """

In [None]:
# Exercise the fixed tool
print("=== 测试修复后的工具 ===")
output = tool.run(
    {
        "urls": ["https://en.wikipedia.org/wiki/OpenAI", "https://en.wikipedia.org/wiki/ChatGPT"],
        "question": "OpenAI 和 ChatGPT 有什么关系？",
    }
)

rich.print(output)

# If sources are still missing, try again with a single URL
print("\n=== 单个 URL 测试 ===")
output_single = tool.run({"urls": "https://en.wikipedia.org/wiki/OpenAI", "question": "OpenAI 是什么时候成立的？"})

rich.print(output_single)