In [None]:
from langchain_openai import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.stdout import StdOutCallbackHandler
from langchain.agents import AgentType, initialize_agent, load_tools, AgentExecutor
from langchain.tools import tool
from langchain.tools import StructuredTool
import os
import yaml
from typing import List
def createDir(path:str) -> bool:
    """在指定的path下创建一个目录。
    每次只能创建一个层级的新目录。
    返回值表示是否成功"""
    os.mkdir(path)
    return True

# ChatOpenAI、agent、链等，本身是线程安全的，但是如果附带记忆机制，或含跨线程callbacks，则不是线程安全的。
class AgentPool:
    def __init__(self, pool_size):
        tools = load_tools(["ddg-search"], llm=llm)
        tools.append(StructuredTool.from_function(
            createDir,
            name="创建目录",
            description="""在指定的path下创建一个目录。
            每次只能创建一个层级的新目录。返回值表示是否成功。执行该工具时需要等待一秒才能返回结果"""
        ))
        self.agents:List[AgentExecutor] = []
        self.loadConfig()
        for i in range(pool_size):
            llm = ChatOpenAI(
                model= self.llm_config['model'],
                openai_api_key= self.llm_config['api-key'],  # 替换为你的SiliconFlow API Key
                base_url= self.llm_config['url'],  # SiliconFlow的API地址
                streaming=True,  # 启用流式输出
                # callbacks=[FormattedStreamingCallback()],
                model_kwargs={"chunk_size":50}
            )
            agent = initialize_agent(
                tools,
                llm,
                agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,  # 专为 Chat 模型优化的 Agent
                handle_parsing_errors=True,
                verbose=True
            )
            self.agents.append(agent)

    def loadConfig(self):
        config_path = os.path.join(os.path.curdir, "..", "configs", "aiservice.yaml")
        with open(config_path, encoding="UTF-8") as config_file:
            config = yaml.load(config_file, yaml.FullLoader)
        self.llm_config = config['llm']
    def run(self):
        agent = self.agents[0]
        response = agent.run("""
        在当前目录下逐步创建3个层级的嵌套目录，且每个目录的名字随机生成。
        """)
        print(response)

In [None]:
from langchain.prompts import PromptTemplate
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough, RunnableBranch
from langchain.chains import SequentialChain, ConversationChain, LLMChain
from langchain.memory import ConversationBufferMemory

template_1 = PromptTemplate(
    input_variables=["question"],
    template="解析问题：{question}"
)

template_2 = PromptTemplate(
    input_variables=["answer"],
    template="你是一个有多年教学经验的大学教师，根据该答案：{answer}，扩展更多知识点以形成系统化知识，不要重复原答案内容，使用颜文字修饰语言。"
)

class EventCallback(StdOutCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        pass
    def on_chain_end(self, outputs, **kwargs):
        pass

memory = ConversationBufferMemory(
    memory_key="chat_history",  # 存储对话的变量名
    return_messages=True       # 设置为True时返回消息列表（适合ChatModels）
)

chain = {
    "question": RunnablePassthrough(),
    "answer": RunnablePassthrough()
} | RunnableBranch(
    (lambda x: "t" in x["question"], template_1),
    (lambda x: "t" not in x["question"], template_2),
    template_2
)
chain = chain.with_config(callbacks=[EventCallback()], memory=memory)


output = chain.invoke("es")
output = chain.invoke("efads")
output = chain.invoke("efsdads")

print(output)
# print(chain..load_memory_variables({}))

In [None]:
from langchain.prompts import PromptTemplate
from langchain.schema.runnable import RunnableSequence, RunnablePassthrough
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory



template_1 = PromptTemplate(
    input_variables=["question"],
    template="你是一个LCEL的熟练使用者，解析问题：{question}"
)

template_2 = PromptTemplate(
    input_variables=["ans"],
    template="你是一个有多年教学经验的大学教师，熟悉LCEL，根据该答案：{answer}，扩展更多知识点以形成系统化知识，不要重复原答案内容，使用颜文字修饰语言。"
)

#rm = RunnableWithMessageHistory(get_session_history=)

chain = RunnableSequence(
    first=template_1 | llm,
    last=template_2 | llm
)

# chain = (
#     {"question": RunnablePassthrough()}
#     | template_1
#     | llm  # 调用SiliconFlow模型
#     | template_2
#     | llm  # 可多次调用
# )

resp = chain.invoke({"question": 
"""
什么是OutputParser？什么是PydanticOutputParser？如何使用？
"""})

In [None]:
from langchain.vectorstores import Chroma
from langchain.vectorstores import FAISS

from langchain.document_loaders import TextLoader
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()

from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
model_name = "all-MiniLM-L6-v2"  # 轻量级模型，适用于CPU
embeddings = HuggingFaceEmbeddings(model_name=model_name)

# 创建嵌入模型
db = FAISS.from_documents(documents, embeddings)
# 进行相似性搜索
query = "What did the president say about Ketanji Brown Jackson"
docs = db.similarity_search(query)
print(docs[0].page_content)