In [8]:



import getpass
# import os
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict
import requests
import json
from bs4 import BeautifulSoup, SoupStrainer
from typing import TypedDict, List, Dict, Any
import hashlib



ModuleNotFoundError: No module named 'aiohttp._websocket.reader'; 'aiohttp._websocket' is not a package

In [6]:
# 配置API信息（替换为你的第三方平台信息）
API_BASE_URL = "https://api2.aigcbest.top/v1"  # 第三方API基础地址
API_KEY = "sk-5hmjTgNvEwpzxliLy8Ub6SBkjdt5GkotJUcr9Y8HoW8CQ7bX"  # 你的第三方平台API Key


In [7]:
# 1. 加载并分割文档（这部分无需依赖模型，保持不变）
class WebBaseLoader:
    def __init__(self, web_paths, bs_kwargs):
        self.web_paths = web_paths
        self.bs_kwargs = bs_kwargs

    def load(self):
        docs = []
        for url in self.web_paths:
            response = requests.get(url)
            strainer = self.bs_kwargs.get('parse_only')
            soup = BeautifulSoup(response.text, 'html.parser', parse_only=strainer)
            content = soup.get_text()
            docs.append({"page_content": content, "metadata": {"source": url}})
        return docs

# 文档分割器
class RecursiveCharacterTextSplitter:
    def __init__(self, chunk_size, chunk_overlap):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_documents(self, docs):
        chunks = []
        for doc in docs:
            content = doc["page_content"]
            start = 0
            while start < len(content):
                end = start + self.chunk_size
                chunk = content[start:end]
                chunks.append({
                    "page_content": chunk,
                    "metadata": doc["metadata"]
                })
                start = end - self.chunk_overlap
        return chunks

# 加载并分割文档
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)

# 2. 用HTTP请求实现向量存储（替代vector_store）
class HTTPVectorStore:
    def __init__(self, api_base, api_key):
        self.api_base = api_base
        self.api_key = api_key
        self.embeddings = {}  # 本地存储向量（实际应用中可能需要数据库）

    def get_embedding(self, text: str) -> List[float]:
        """通过HTTP请求获取文本嵌入向量"""
        url = f"{self.api_base}/embeddings"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": "text-embedding-3-large",  # 嵌入模型
            "input": text,
            "encoding_format": "float"
        }
        response = requests.post(url, headers=headers, data=json.dumps(data))
        return response.json()["data"][0]["embedding"]

    def add_documents(self, documents):
        """将文档分割添加到向量存储"""
        for doc in documents:
            # 生成唯一ID（用于后续检索）
            doc_id = hashlib.md5(doc["page_content"].encode()).hexdigest()
            # 获取嵌入向量
            embedding = self.get_embedding(doc["page_content"])
            self.embeddings[doc_id] = {
                "embedding": embedding,
                "document": doc
            }

    def similarity_search(self, query: str, top_k: int = 3) -> List[Dict]:
        """通过HTTP请求实现相似性检索"""
        # 获取查询的嵌入向量
        query_embedding = self.get_embedding(query)
        
        # 简单的余弦相似度计算（实际应用可能在服务端完成）
        results = []
        for doc_id, item in self.embeddings.items():
            doc_embedding = item["embedding"]
            # 计算余弦相似度
            similarity = self._cosine_similarity(query_embedding, doc_embedding)
            results.append({
                "similarity": similarity,
                "document": item["document"]
            })
        
        # 按相似度排序并返回前top_k个结果
        results.sort(key=lambda x: x["similarity"], reverse=True)
        return [item["document"] for item in results[:top_k]]

    def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """计算两个向量的余弦相似度"""
        dot_product = sum(a * b for a, b in zip(vec1, vec2))
        norm1 = sum(a **2 for a in vec1)** 0.5
        norm2 = sum(b **2 for b in vec2)** 0.5
        return dot_product / (norm1 * norm2) if norm1 and norm2 else 0

# 初始化向量存储并添加文档
vector_store = HTTPVectorStore(API_BASE_URL, API_KEY)
vector_store.add_documents(all_splits)

# 3. 用HTTP请求实现大模型调用（替代llm.invoke）
def call_chat_model(messages: List[Dict[str, str]], model: str = "gpt-4o") -> str:
    """通过HTTP请求调用聊天模型"""
    url = f"{API_BASE_URL}/chat/completions"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    data = {
        "model": model,
        "messages": messages
    }
    response = requests.post(url, headers=headers, data=json.dumps(data))
    return response.json()["choices"][0]["message"]["content"]

# 4. 定义RAG流程（状态和步骤）
class State(TypedDict):
    question: str
    context: List[Dict]
    answer: str

def retrieve(state: State) -> Dict[str, List[Dict]]:
    """检索相关文档"""
    retrieved_docs = vector_store.similarity_search(state["question"])
    return {"context": retrieved_docs}

def generate(state: State) -> Dict[str, str]:
    """生成回答"""
    # 构建提示词（替代hub.pull的prompt）
    context = "\n\n".join(doc["page_content"] for doc in state["context"])
    prompt = f"""
    基于以下上下文回答问题，只使用提供的信息，不要编造内容。
    上下文: {context}
    问题: {state["question"]}
    回答:
    """
    
    # 调用聊天模型
    messages = [{"role": "user", "content": prompt}]
    answer = call_chat_model(messages)
    return {"answer": answer}

# 5. 执行RAG流程
def run_rag(question: str) -> str:
    state = {"question": question, "context": [], "answer": ""}
    state.update(retrieve(state))
    state.update(generate(state))
    return state["answer"]
# 测试
if __name__ == "__main__":
    question = "What is Task Decomposition?"
    print(run_rag(question))

Task Decomposition is a process used in planning within an LLM-powered autonomous agent system. It involves breaking down a complicated task, which usually consists of many steps, into smaller and simpler tasks. This approach helps the agent plan effectively by making large, complex tasks more manageable. One standard technique for task decomposition is Chain of Thought (CoT), which instructs the model to "think step by step" to enhance performance on complex tasks by using more computation during testing to decompose these tasks into smaller ones. Additionally, Tree of Thoughts extends CoT by exploring multiple reasoning paths at each step, forming a tree structure, and utilizing search strategies like BFS or DFS to evaluate each state's potential solutions. Task decomposition can be achieved through simple prompting, task-specific instructions, or human input.


1 用数据让rag根据数据进行回答
2 尝试构建本地模型库，让大模型根据数据选择模型并给出解释：
    库中预先存放工业故障诊断常用的机器学习 / 深度学习模型文件（如 LSTM、TCN、KNN、SVM、Transformer 等，涵盖 FD 1.0/1.5 阶段的经典模型及 FD 2.0 的 Time-GPT 等），每个模型对应独立的可执行代码文件（如.py脚本）和调用接口（如函数入口、参数说明）。
    配套数据库：存储模型的 “元信息”，包括：
    模型基本属性：适用场景（如 “长时预测”“非线性故障检测”“小数据集任务”）、输入数据要求（如时间序列维度、数据格式）、性能指标（如在历史数据上的 Precision/Recall/F1）；
    模型关联信息：调用路径、依赖库（如 Pandas、Scikit-learn、PyTorch）、更新时间（用于判断是否需维护）。