# Retrieval and Re-ranking

![retrieval and reranking](https://github.com/tivon-x/bRAG-langchain/blob/main/notebooks/image/retrieval_reranking.png?raw=1)

In [1]:
! pip3 install --quiet langchain_community tiktoken langchain-openai langchainhub chromadb langchain cohere langgraph python-dotenv

In [2]:
! pip install --upgrade --quiet  dashscope

In [3]:
! pip install --quiet langchain-chroma

## Environment

`(1) Packages`

In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local .env file so os.getenv can see them
load_dotenv()

# LangSmith
langsmith_tracing = os.getenv('LANGSMITH_TRACING')
langsmith_endpoint = os.getenv('LANGSMITH_ENDPOINT')
langsmith_api_key = os.getenv('LANGSMITH_API_KEY')

## LLM
dashscope_api_key = os.getenv('DASHSCOPE_API_KEY')

## Cohere API
cohere_api_key = os.getenv('COHERE_API_KEY')

In [4]:
# Colab environment
import os
from google.colab import userdata

langsmith_tracing = userdata.get('LANGSMITH_TRACING')
langsmith_endpoint = userdata.get('LANGSMITH_ENDPOINT')
langsmith_api_key = userdata.get('LANGSMITH_API_KEY')

dashscope_api_key = userdata.get("DASHSCOPE_API_KEY")

## Cohere API
cohere_api_key = userdata.get('COHERE_API_KEY')

`(2) LangSmith`

https://docs.smith.langchain.com/

In [5]:
os.environ['LANGSMITH_TRACING'] = langsmith_tracing
os.environ['LANGSMITH_ENDPOINT'] = langsmith_endpoint
os.environ['LANGSMITH_API_KEY'] = langsmith_api_key

`(3) API Keys`

In [6]:
os.environ['DASHSCOPE_API_KEY'] = dashscope_api_key
dashscope_model = "qwen-plus-latest"

os.environ['COHERE_API_KEY'] = cohere_api_key

In [7]:
# Required by LangChain's WebBaseLoader
os.environ["USER_AGENT"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

## Re-ranking

The RAG-Fusion architecture covered earlier already performs a re-ranking step:

![reranking](https://github.com/tivon-x/bRAG-langchain/blob/main/notebooks/image/reranking.png?raw=1)

In [8]:
#### INDEXING ####

# Load blog
import bs4
from langchain_community.document_loaders import WebBaseLoader
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
blog_docs = loader.load()

# Split
from langchain.text_splitter import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=300,
    chunk_overlap=50)

# Make splits
splits = text_splitter.split_documents(blog_docs)

# Index
from langchain_community.embeddings import DashScopeEmbeddings

embeddings = DashScopeEmbeddings(model="text-embedding-v4")

def batch_read(lst, batch_size=10):
    for i in range(0, len(lst), batch_size):
        yield lst[i:i + batch_size]

# from langchain_cohere import CohereEmbeddings
from langchain_chroma import Chroma

vectorstore = Chroma(embedding_function=embeddings)

for batch in batch_read(splits):
  vectorstore.add_documents(batch)

retriever = vectorstore.as_retriever()
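
The indexing loop above adds documents in batches of 10 instead of all at once, presumably to keep each embedding request small. The exact per-request limit of the embedding service is not stated here, so treat 10 as a tunable assumption. A minimal sketch of what `batch_read` yields, using placeholder strings in place of the real `splits`:

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batch_read(items: Sequence[T], batch_size: int = 10) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Placeholder data standing in for the document chunks in `splits`
chunks = [f"chunk-{i}" for i in range(23)]

batch_sizes = [len(batch) for batch in batch_read(chunks)]
print(batch_sizes)  # [10, 10, 3]
```

The last batch is simply shorter, so no chunk is dropped or padded.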

In [None]:
from langchain.prompts import ChatPromptTemplate

# RAG-Fusion
template = """You are a helpful assistant that generates multiple search queries based on a single input query. \n
Generate multiple search queries related to: {question} \n
Output (4 queries):"""
prompt_rag_fusion = ChatPromptTemplate.from_template(template)

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_models.tongyi import ChatTongyi

llm = ChatTongyi(model=dashscope_model, temperature=0.1)


generate_queries = (
    prompt_rag_fusion
    | llm
    | StrOutputParser()
    | (lambda x: x.split("\n"))
)
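
Splitting the model output on `"\n"` keeps blank lines and list markers such as `1.` if the model emits them, and each of those would be sent to the retriever as a query. The helper below is a sketch of one way to clean the output; `clean_queries` is a hypothetical name, not part of LangChain, and the marker patterns it strips are an assumption about how the model formats its list.

```python
import re


def clean_queries(raw: str) -> list[str]:
    """Split raw LLM output into non-empty queries without list markers."""
    queries = []
    for line in raw.split("\n"):
        # Strip leading markers such as "1.", "2)", "-", or "*"
        line = re.sub(r"^\s*(?:\d+[.)]|[-*])\s*", "", line).strip()
        if line:
            queries.append(line)
    return queries


sample = (
    "1. What is task decomposition?\n"
    "\n"
    "2. How do LLM agents plan?\n"
    "- Chain of thought prompting"
)
print(clean_queries(sample))
```

It could replace the final lambda in the chain, for example `| clean_queries`, since LangChain coerces plain functions placed in a pipeline.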

In [None]:
from langchain.load import dumps, loads

def reciprocal_rank_fusion(results: list[list], k=60, n=7):
    """Reciprocal rank fusion (RRF).

    Takes multiple ranked lists of documents, the constant k used in the
    RRF formula, and the number n of fused documents to return.
    """

    # Fused score for each unique document
    fused_scores = {}

    # Iterate over each ranked list of documents
    for docs in results:
        # Iterate over each document with its rank (0-based position in the list)
        for rank, doc in enumerate(docs):
            # Serialize the document to a string so it can serve as a dict key
            # (assumes the document is JSON-serializable)
            doc_str = dumps(doc)
            # First time we see this document: start its score at 0
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            # RRF update: add 1 / (rank + k)
            fused_scores[doc_str] += 1 / (rank + k)

    # Sort documents by fused score, highest first
    reranked_results = [
        (loads(doc), score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

    # Return the top n (document, fused score) tuples
    return reranked_results[:n]

question = "What is task decomposition for LLM agents?"
retrieval_chain_rag_fusion = generate_queries | retriever.map() | reciprocal_rank_fusion
docs = retrieval_chain_rag_fusion.invoke({"question": question})
len(docs)

6
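
To make the scoring concrete, here is a small worked example of the same formula on plain string IDs instead of `Document` objects. Each document receives `1 / (rank + k)` from every list it appears in, with `rank` starting at 0 as in the function above. The document IDs and rankings are made up for illustration.

```python
from collections import defaultdict


def rrf_scores(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several ranked lists of IDs with reciprocal rank fusion."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):  # rank is 0-based
            scores[doc_id] += 1 / (rank + k)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Three hypothetical result lists, one per generated query
rankings = [
    ["A", "B", "C"],
    ["B", "A", "D"],
    ["B", "C", "A"],
]

fused = rrf_scores(rankings)
for doc_id, score in fused:
    print(doc_id, round(score, 5))
```

`B` ranks first because it is at or near the top of every list (`1/61 + 1/60 + 1/60`), while `D` appears only once, in third place, and ends up last. With `k=60` the gap between adjacent ranks is small, so appearing in many lists matters more than being first in one.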

In [None]:
from operator import itemgetter
from langchain_core.runnables import RunnablePassthrough

# RAG
template = """Answer the following question based on this context:

{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

final_rag_chain = (
    {"context": retrieval_chain_rag_fusion,
     "question": itemgetter("question")}
    | prompt
    | llm
    | StrOutputParser()
)

final_rag_chain.invoke({"question":question})

'Task decomposition for LLM (Large Language Model) agents refers to the process of breaking down complex tasks into smaller, more manageable subgoals or steps. This enables efficient handling of intricate problems by transforming them into multiple simpler tasks. \n\nKey methods and insights about task decomposition from the context include:\n\n1. **Chain of Thought (CoT)**: A prompting technique where the model is instructed to "think step by step," decomposing a hard task into smaller steps to improve performance on complex tasks.\n\n2. **Tree of Thoughts (ToT)**: An extension of CoT that explores multiple reasoning possibilities at each step. It generates multiple thoughts per step, creating a tree structure, which can be explored using BFS (breadth-first search) or DFS (depth-first search).\n\n3. **Implementation Approaches**:\n   - Prompting the LLM directly with instructions like "Steps for XYZ" or "What are the subgoals for achieving XYZ?"\n   - Using task-specific instructions,

We can also re-rank with [Cohere Re-Rank](https://python.langchain.com/docs/integrations/retrievers/cohere-reranker#doing-reranking-with-coherererank).

[Cohere blog post on Rerank](https://txt.cohere.com/rerank/):

![cohere-re-rank](https://github.com/tivon-x/bRAG-langchain/blob/main/notebooks/image/cohere-re-rank.png?raw=1)

In [None]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank

In [None]:
# Retrieve the top 10 candidates
retriever = vectorstore.as_retriever(search_kwargs={"k": 10})

# Re-rank
compressor = CohereRerank(model="rerank-english-v3.0")
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)

compressed_docs = compression_retriever.invoke(question)
compressed_docs

[Document(metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'relevance_score': 0.998844}, page_content='Component One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.\nTree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) w
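
The pattern here is two-stage: a cheap retriever returns a wide candidate set (`k=10`), then a more precise scorer reorders the candidates and keeps only the best few. The sketch below shows that shape in plain Python. `overlap_score` is a hypothetical stand-in for the re-ranking model and is far cruder than Cohere's scorer; only the control flow is meant to carry over.

```python
def overlap_score(query: str, text: str) -> float:
    """Toy relevance score: fraction of query words that occur in the text."""
    query_words = set(query.lower().split())
    text_words = set(text.lower().split())
    if not query_words:
        return 0.0
    return len(query_words & text_words) / len(query_words)


def rerank(query: str, candidates: list[str], top_n: int = 3) -> list[tuple[str, float]]:
    """Score every candidate against the query and keep the top_n."""
    scored = [(text, overlap_score(query, text)) for text in candidates]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]


# Hypothetical first-stage results, in retriever order
candidates = [
    "Memory types in agents",
    "Task decomposition breaks a task into steps",
    "Adversarial attacks on models",
    "Chain of thought helps task planning",
]

top = rerank("task decomposition", candidates, top_n=2)
print(top)
```

The second candidate moves to the front because it matches both query words, which mirrors how `relevance_score` reorders the documents in the output above.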

Integrate the re-ranking retriever into the RAG chain:

In [None]:
# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

cohere_rag_chain = (
    {"context": compression_retriever | format_docs,
     "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

cohere_rag_chain.invoke(question)

'**Answer:**  \nTask decomposition for LLM (large language model) agents refers to the process of breaking down complex tasks into smaller, more manageable subgoals or steps. This enables the agent to handle complicated problems more effectively by focusing on solving each individual part sequentially. Techniques like Chain of Thought (CoT) and Tree of Thoughts (ToT) are used to facilitate task decomposition:\n\n- **Chain of Thought (CoT)** involves instructing the model to "think step-by-step," decomposing a task into a sequence of intermediate reasoning steps.\n- **Tree of Thoughts (ToT)** extends CoT by exploring multiple possible reasoning paths at each step, organizing them in a tree structure, and using search strategies like BFS or DFS to find the optimal solution.\n\nTask decomposition can be performed:\n1. By the LLM itself through simple prompting (e.g., “Steps for XYZ.”),\n2. Through task-specific instructions (e.g., “Write a story outline.”),\n3. With assistance from human 

## Corrective RAG (CRAG)

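Corrective RAG (CRAG) is a RAG strategy that adds self-reflection / self-grading of the retrieved documents.

The [paper](https://arxiv.org/pdf/2401.15884.pdf) takes the following steps:

- If at least one document exceeds the relevance threshold, it proceeds to generation
- Before generation, it performs knowledge refinement
- This partitions the documents into "knowledge strips"
- It grades each knowledge strip and filters out the irrelevant ones
- If all documents fall below the relevance threshold, or the grader is unsure, the framework looks for an additional data source
- It uses web search to supplement retrieval

Motivation:
- Self-reflection can enhance RAG
- The idea of self-reflection is to reason, give feedback, and retry steps based on how relevant the retrieved documents are to the question, how well the generation answers the question, or how well the generation is supported by the retrieved documents.

We will implement some of these ideas from scratch with LangGraph:

- Skip the knowledge refinement phase as a first pass. It can be added back as a node if needed.
- If any document is irrelevant, supplement retrieval with web search.
- Use Tavily Search for web search.
- Use query rewriting to optimize the query for web search.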

![CRAG](https://github.com/tivon-x/bRAG-langchain/blob/main/notebooks/image/crag.png?raw=1)
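
Before wiring this into LangGraph, the simplified flow can be sketched as one plain Python function. This is an illustration of the control flow only, not the paper's algorithm: every callable is a hypothetical stub passed in by the caller, and knowledge refinement is omitted as described above.

```python
from typing import Callable


def corrective_rag(
    question: str,
    retrieve: Callable[[str], list[str]],
    is_relevant: Callable[[str, str], bool],
    rewrite: Callable[[str], str],
    web_search: Callable[[str], list[str]],
    generate: Callable[[str, list[str]], str],
) -> str:
    """Retrieve, grade, optionally fall back to web search, then generate."""
    documents = retrieve(question)
    relevant = [doc for doc in documents if is_relevant(question, doc)]
    # Any rejected document triggers the rewrite + web-search branch
    if len(relevant) < len(documents):
        question = rewrite(question)
        relevant = relevant + web_search(question)
    return generate(question, relevant)


# Stub components so the sketch runs without any external service
corpus = ["agents use short-term memory", "agents use long-term memory"]
stubs = dict(
    retrieve=lambda q: corpus,
    is_relevant=lambda q, d: q in d,
    rewrite=lambda q: f"{q} paper",
    web_search=lambda q: [f"web result for '{q}'"],
    generate=lambda q, docs: f"{q} -> {len(docs)} doc(s)",
)

in_corpus = corrective_rag("memory", **stubs)
off_corpus = corrective_rag("alphacodium", **stubs)
print(in_corpus)
print(off_corpus)
```

In the LangGraph version, each callable becomes a node and the `if` becomes a conditional edge.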

`Video tutorial`

https://www.youtube.com/watch?v=E2shqsYwxck

`Notebooks`

https://github.com/langchain-ai/langgraph/blob/main/examples/rag/langgraph_crag.ipynb

https://github.com/langchain-ai/langgraph/blob/main/examples/rag/langgraph_crag_local.ipynb




We first build the document-grading chain, which scores each retrieved document for relevance.

In [None]:
from pydantic import BaseModel, Field

# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )

structured_llm_grader = llm.with_structured_output(GradeDocuments)

# Prompt
system = """You are a grader assessing relevance of a retrieved document to a user question. \n
    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

binary_score='yes'
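
The graph code later compares the grade with the exact string `"yes"`. Because `binary_score` is typed as a free-form `str`, a model could in principle answer `Yes` or `'yes'`, and an exact comparison would then treat a relevant document as irrelevant. The helper below is a defensive sketch under that assumption; `is_yes` is a hypothetical name, and whether such variants occur with this model is not established here.

```python
def is_yes(binary_score: str) -> bool:
    """Return True for 'yes' regardless of case, padding, or stray quotes."""
    normalized = binary_score.strip().strip("'\"").strip().lower()
    return normalized == "yes"


for raw in ["yes", "Yes", " 'yes' ", "no", "maybe"]:
    print(repr(raw), is_yes(raw))
```

Anything other than a recognizable "yes" counts as not relevant, which errs toward triggering the fallback path.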


Next, define the RAG generation chain:

In [None]:
### Generate

from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
prompt = hub.pull("rlm/rag-prompt")


# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

In the context of LLM-powered autonomous agents, memory is categorized into short-term and long-term memory. Short-term memory involves in-context learning, constrained by the model's context window, while long-term memory utilizes external vector stores for retaining and retrieving vast information over time. These components enable agents to effectively manage knowledge and improve decision-making through reflection and refinement.


Next comes the question-rewriting chain, which rewrites the original question so that it works better for web search.

In [None]:
# Prompt
system = """You are a question re-writer that converts an input question to a better version that is optimized \n
     for web search. Look at the input and try to reason about the underlying semantic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})

'Sure! Here\'s an improved version of the question optimized for web search:\n\n**"What is agent memory in artificial intelligence, and how does it function in AI systems?"**\n\nThis version clarifies the topic and specifies the intent, making it easier to find relevant and detailed information.'
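
The rewriter's output above wraps the improved question in a preamble and a closing remark, and all of that text would be sent to the search tool as the query. One option is to tighten the prompt so the model returns only the question. Another is to post-process the output, as in the heuristic sketch below. `extract_question` is a hypothetical helper, and it assumes the question is the first `**bold**` span, which matches the output shown here but is not guaranteed.

```python
import re


def extract_question(rewriter_output: str) -> str:
    """Return the first bold span if present, otherwise the whole text."""
    match = re.search(r"\*\*(.+?)\*\*", rewriter_output, flags=re.DOTALL)
    text = match.group(1) if match else rewriter_output
    # Drop surrounding whitespace and double quotes
    return text.strip().strip('"').strip()


verbose = (
    "Sure! Here's an improved version:\n\n"
    '**"What is agent memory in artificial intelligence?"**\n\n'
    "This version clarifies the topic."
)
plain = "What is agent memory?"

print(extract_question(verbose))
print(extract_question(plain))
```

When no bold span is found, the text passes through unchanged, so the helper is safe to apply to already clean output.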

In [None]:
%pip install -U langchain-tavily

In [None]:
os.environ["TAVILY_API_KEY"] = userdata.get("TAVILY_API_KEY")

In [None]:
from langchain_tavily import TavilySearch

tool = TavilySearch(max_results=3)
result = tool.invoke({"query": "What's a 'node' in LangGraph?"})

In [None]:
result["results"][0]

{'title': 'What is LangGraph? - GeeksforGeeks',
 'url': 'https://www.geeksforgeeks.org/machine-learning/what-is-langgraph/',
 'content': 'LangGraph is a Python library that helps you build applications like chatbots or AI agents by organizing their logic step-by-step using state machine model. This step configures your Gemini API key and then we create a simple function `ask_gemini` that takes user input, sends it to the Gemini model and returns the AI-generated response. Creates a state structure with three fields: `question`, `classification` and `response` which flows through the LangGraph. import matplotlib.pyplot as plt from langgraph.graph import StateGraph\u200bbuilder = StateGraph(GraphState)builder.add_node("classify", classify)builder.add_node("respond", respond)builder.set_entry_point("classify")builder.add_edge("classify", "respond")builder.set_finish_point("respond")app = builder.compile()\u200bdef visualize_workflow(builder): G = nx.DiGraph()\u200b for node in builder.nod

In [None]:
from typing_extensions import TypedDict
from langchain.schema import Document


class GraphState(TypedDict):
    """
    Graph state holding the question, the generated answer, the web-search flag, and the list of documents.

    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search
        documents: list of documents
    """

    question: str
    generation: str
    web_search: str
    documents: list[Document]

In [None]:
def retrieve(state):
    """
    Retrieve documents relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): State update with the `documents` and `question` keys
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}


def generate(state):
    """
    Generate an answer.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): State update containing the generated answer and the documents
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
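    Determine whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): State update with the filtered documents, the question, and the web-search flag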
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            web_search = "Yes"
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}


def transform_query(state):
    """
    Rewrite the question to optimize it for web search.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): State update with the rewritten question and the documents
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    print(better_question)
    return {"documents": documents, "question": better_question}


def web_search(state):
    """
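    Run a web search on the rewritten question and append the results to the documents.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): State update with the web search results appended to the documents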
    """

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]

    # Web search
    tool_result = tool.invoke({"query": question})
    print(tool_result)
    web_results = "\n".join([d["content"] for d in tool_result["results"]])
    web_results = Document(page_content=web_results)
    documents.append(web_results)

    return {"documents": documents, "question": question}


### Edges


def decide_to_generate(state):
    """
    Decide whether to generate an answer or rewrite the question.

    Args:
        state (dict): The current graph state, with the question, web-search flag, and documents

    Returns:
        str: Name of the next node to call, "generate" or "transform_query"
    """

    print("---ASSESS GRADED DOCUMENTS---")
    web_search = state["web_search"]

    if web_search == "Yes":
        # At least one document was graded irrelevant,
        # so rewrite the query and supplement retrieval with web search
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

In [None]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the workflow nodes
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate
workflow.add_node("transform_query", transform_query)  # transform_query
workflow.add_node("web_search_node", web_search)  # web search

# Build the workflow
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "web_search_node")
workflow.add_edge("web_search_node", "generate")
workflow.add_edge("generate", END)

# Compile
app = workflow.compile()

In [None]:
from pprint import pprint

# Ask a question that the indexed documents can answer
inputs = {"question": "What are the types of agent memory?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
"Node 'grade_documents':"
'\n---\n'
---GENERATE---
"Node 'generate':"
'\n---\n'
('The types of agent memory include Sensory Memory, Short-Term Memory (STM) or '
 'Working Memory, and Long-Term Memory (LTM). Sensory memory retains '
 'impressions of sensory information briefly, STM holds a limited amount of '
 'information actively for a short period, and LTM stores vast amounts of '
 'information for long durations. LTM is further divided into '
 'Explicit/Declarative (episodic and semantic) and Implicit/Procedural memory.')


In [None]:

# Ask a question unrelated to the indexed documents
inputs = {"question": "How does the AlphaCodium paper work?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---
"Node 'grade_documents':"
'\n---\n'
---TRANSFORM QUERY---
"Node 'transform_query':"
'\n---\n'
---WEB SEARCH---
{'query': 'Improved question:  \n**What is the methodology and key contribution of the AlphaCodium paper?**\n\nAlternate version for broader search results:  \n**How does the AlphaCodium approach function, and what are its main innovations?**', 'follow_up_questions': None, 'answer': None, 'images': [], 'results': [{'title': 'Code Generation wit

# Generation


## Self-RAG
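Self-reflective retrieval-augmented generation (Self-RAG) is a RAG strategy that adds self-reflection / self-grading on both the retrieved documents and the generated content.

The [paper](https://arxiv.org/abs/2310.11511) makes the following decisions:

1. Whether to retrieve from the retriever `R`
   - Input: the question `x`, or the question `x` and the generation `y`
   - Decides whether to retrieve chunks `D` (possibly several) from `R`
   - Output: "yes", "no", or "continue"

2. Whether the retrieved chunks `D` are relevant to the question `x`
   - Input: the question `x` and a chunk `d`
   - Decides whether the chunk helps answer the question `x`
   - Output: "relevant" or "irrelevant"
3. Whether the LLM generation from the chunks `D` is relevant to the passages (for example, whether it contains hallucinations)
   - Input: the question `x`, the chunks `D`, and the generation `y`
   - Decides whether the generation is supported by the chunks `D`
   - Output: "fully supported", "partially supported", or "no support"
4. Whether the LLM generation from the chunks `D` is useful for the question `x`
   - Input: the question `x` and the generation `y`
   - Decides whether `y` is useful for `x`
   - Output: a score from 1 to 5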

![self-rag](./image/self-rag.png)
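
The notebook implements a reduced version of these decisions with binary graders. The sketch below shows that loop in plain Python under several assumptions: all callables are hypothetical stubs, an ungrounded answer restarts the whole loop rather than only the generation step, and `max_steps` is an added safeguard against endless retries that the paper does not prescribe.

```python
from typing import Callable, Optional


def self_rag(
    question: str,
    retrieve: Callable[[str], list[str]],
    is_relevant: Callable[[str, str], bool],
    generate: Callable[[str, list[str]], str],
    is_grounded: Callable[[list[str], str], bool],
    answers_question: Callable[[str, str], bool],
    rewrite: Callable[[str], str],
    max_steps: int = 5,
) -> Optional[str]:
    """Loop over retrieve, grade, generate, and check until an answer passes."""
    for _ in range(max_steps):
        documents = [d for d in retrieve(question) if is_relevant(question, d)]
        if not documents:
            question = rewrite(question)  # nothing relevant: rephrase and retry
            continue
        answer = generate(question, documents)
        if not is_grounded(documents, answer):
            continue  # not supported by the documents: try again
        if answers_question(question, answer):
            return answer  # grounded and useful
        question = rewrite(question)  # grounded but off-target: rephrase
    return None  # gave up after max_steps


corpus = ["agent memory has short-term and long-term parts", "prompt engineering tips"]
stubs = dict(
    retrieve=lambda q: corpus,
    generate=lambda q, docs: f"Based on {len(docs)} doc(s): {docs[0]}",
    is_grounded=lambda docs, a: any(d in a for d in docs),
    answers_question=lambda q, a: q in a,
    rewrite=lambda q: q.rstrip("?"),
)

answer = self_rag("memory?", is_relevant=lambda q, d: q in d, **stubs)
no_answer = self_rag("memory?", is_relevant=lambda q, d: False, **stubs)
print(answer)
print(no_answer)
```

In the first call the original question matches nothing, the rewrite drops the question mark, and the second pass succeeds. In the second call no document is ever relevant, so the loop stops at `max_steps` and returns `None`.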


`Notebooks`

https://github.com/langchain-ai/langgraph/blob/main/examples/rag/langgraph_self_rag.ipynb

#### Retriever
Index three documents

In [9]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_chroma import Chroma
from langchain_community.embeddings import DashScopeEmbeddings

embeddings = DashScopeEmbeddings(model="text-embedding-v4")

urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

def batch_read(lst, batch_size=10):
    for i in range(0, len(lst), batch_size):
        yield lst[i:i + batch_size]

vectorstore = Chroma(embedding_function=embeddings)
for batch in batch_read(doc_splits):
  vectorstore.add_documents(batch)

retriever = vectorstore.as_retriever()

In [10]:
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from langchain_community.chat_models.tongyi import ChatTongyi

llm = ChatTongyi(model=dashscope_model, temperature=0)

# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )

structured_llm_grader = llm.with_structured_output(GradeDocuments)

# Prompt
system = """You are a grader assessing relevance of a retrieved document to a user question. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

binary_score='yes'


In [11]:
### Generate

from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
prompt = hub.pull("rlm/rag-prompt")

# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

In LLM-powered autonomous agents, memory is divided into short-term and long-term components. Short-term memory involves in-context learning, constrained by the model's context window, while long-term memory uses external vector stores for retaining and retrieving large amounts of information over time. These memory systems enable agents to handle complex tasks by leveraging both immediate and historical data efficiently.


In [12]:
### Hallucination Grader


# Data model
class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""

    binary_score: str = Field(
        description="Answer is grounded in the facts, 'yes' or 'no'"
    )


# LLM with function call
structured_llm_grader = llm.with_structured_output(GradeHallucinations)

# Prompt
system = """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
    ]
)

hallucination_grader = hallucination_prompt | structured_llm_grader
hallucination_grader.invoke({"documents": docs, "generation": generation})

GradeHallucinations(binary_score='yes')

In [13]:
### Answer Grader


# Data model
class GradeAnswer(BaseModel):
    """Binary score to assess answer addresses question."""

    binary_score: str = Field(
        description="Answer addresses the question, 'yes' or 'no'"
    )


# LLM with function call
structured_llm_grader = llm.with_structured_output(GradeAnswer)

# Prompt
system = """You are a grader assessing whether an answer addresses / resolves a question \n
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer resolves the question."""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {generation}"),
    ]
)

answer_grader = answer_prompt | structured_llm_grader
answer_grader.invoke({"question": question, "generation": generation})

GradeAnswer(binary_score='yes')

In [14]:
### Question Re-writer

# Prompt
system = """You are a question re-writer that converts an input question to a better version that is optimized \n
     for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})

'What are the key components and functionalities of agent memory in AI systems, and how do they contribute to intelligent behavior?'

#### Graph

In [15]:
from typing import List

from typing_extensions import TypedDict
from langchain.schema import Document



class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    generation: str
    documents: List[Document]

In [16]:
### Nodes


def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}


def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}


def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}

In [17]:
### Edges


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents were filtered out by the relevance check,
        # so re-generate a new query
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the documents and answers the question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

In [18]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate
workflow.add_node("transform_query", transform_query)  # transform_query

# Build graph
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)

# Compile
app = workflow.compile()
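
The routing wired up above can be hard to picture from the edge definitions alone. The following is a minimal sketch of the same control flow in plain Python, with no LLM calls and no `langgraph` dependency. Everything in it is a hypothetical stand-in: the `toy_*` functions, `CORPUS`, `run_toy_graph`, and the local `END` marker are not part of the notebook or of LangGraph, and the "merge each node's returned dict into the state" step is only an approximation of how the compiled graph applies updates.

```python
END = "END"  # local marker for this sketch, not langgraph's END

CORPUS = [
    "agent memory uses a vector store",
    "planning decomposes tasks",
]


def toy_retrieve(state):
    # Toy retriever: always returns the whole corpus.
    return {"documents": list(CORPUS)}


def toy_grade_documents(state):
    # Keep only documents sharing at least one word with the question.
    words = set(state["question"].lower().split())
    kept = [d for d in state["documents"] if words & set(d.split())]
    return {"documents": kept}


def toy_transform_query(state):
    # Stand-in for the LLM question rewriter: a fixed rewrite.
    return {"question": "agent memory"}


def toy_generate(state):
    return {"generation": "Based on: " + state["documents"][0]}


def toy_decide_to_generate(state):
    return "generate" if state["documents"] else "transform_query"


def toy_grade_generation(state):
    # Toy grader: accepts every generation.
    return "useful"


NODES = {
    "retrieve": toy_retrieve,
    "grade_documents": toy_grade_documents,
    "generate": toy_generate,
    "transform_query": toy_transform_query,
}
# Unconditional edges, mirroring workflow.add_edge(...)
EDGES = {"retrieve": "grade_documents", "transform_query": "retrieve"}
# Conditional edges, mirroring workflow.add_conditional_edges(...)
ROUTERS = {
    "grade_documents": (
        toy_decide_to_generate,
        {"transform_query": "transform_query", "generate": "generate"},
    ),
    "generate": (
        toy_grade_generation,
        {"not supported": "generate", "useful": END, "not useful": "transform_query"},
    ),
}


def run_toy_graph(state, start="retrieve", max_steps=25):
    """Walk the toy graph until END; returns (final state, visited node names)."""
    current, visited = start, []
    while current != END:
        if len(visited) >= max_steps:
            raise RuntimeError("step limit reached; the graph may be looping")
        visited.append(current)
        # Merge the node's partial update into the running state.
        state = {**state, **NODES[current](state)}
        if current in ROUTERS:
            router, mapping = ROUTERS[current]
            current = mapping[router(state)]
        else:
            current = EDGES[current]
    return state, visited


final_state, visited = run_toy_graph({"question": "How does recall work?"})
print(" -> ".join(visited))
print(final_state["generation"])
```

With this toy corpus the first retrieval shares no words with the question, so the walk detours through `transform_query` once before generating. The `max_steps` guard is there because both the `"not supported"` and `"not useful"` branches loop back into the graph, so a grader that never says "yes" would otherwise cycle indefinitely.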

In [19]:
from pprint import pprint

# Run
inputs = {"question": "Explain how the different types of agent memory work?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print the full state update emitted by each node
        # pprint(value, indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

---RETRIEVE---




"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
"Node 'grade_documents':"
'\n---\n'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
"Node 'generate':"
'\n---\n'
('In LLM-powered autonomous agents, memory is divided into short-term and '
 'long-term types. Short-term memory relies on in-context learning, where the '
 'model retains information temporarily within its context window to perform '
 'tasks. Long-term memory involves external storage, such as vector databases, '
 'allowing the agent to store and efficiently retrieve vast amounts of '
 'information over extended periods using methods like approximate nearest '
 'neighbors search.')
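
The run above reads `value["generation"]` after the loop has finished, which works here because the last streamed event came from the `generate` node. A slightly more defensive way to pull out the answer is sketched below. `last_generation` and `sample_events` are hypothetical names, and the event shape (one `{node_name: state_update}` dict per step) is assumed to match what `app.stream(inputs)` yielded in this run.

```python
def last_generation(events):
    """Return the most recent 'generation' found in a stream of {node: update} dicts."""
    generation = None
    for event in events:
        for node_name, update in event.items():
            # Only some nodes (here: "generate") include a "generation" key.
            if isinstance(update, dict) and "generation" in update:
                generation = update["generation"]
    return generation


# Hand-written events shaped like the streamed output above (assumed shape).
sample_events = [
    {"retrieve": {"documents": ["d1"], "question": "q"}},
    {"grade_documents": {"documents": ["d1"], "question": "q"}},
    {"generate": {"documents": ["d1"], "question": "q", "generation": "answer"}},
]

print(last_generation(sample_events))
```

Under the same assumption about the event shape, `last_generation(app.stream(inputs))` would return `None` instead of raising a `KeyError` if the stream ended on a node that carries no generation.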


In [20]:
from pprint import pprint

# Run
inputs = {"question": "Explain how the different types of agent memory work?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print the full state update emitted by each node
        # pprint(value, indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
"Node 'grade_documents':"
'\n---\n'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
"Node 'generate':"
'\n---\n'
('In LLM-powered autonomous agents, memory is categorized into **short-term** '
 'and **long-term** types. Short-term memory corresponds to in-context '
 'learning, where the agent uses the limited context window of the model to '
 'retain and process recent information. Long-term memory allows the agent to '
 'store and retrieve vast amounts of information over extended periods, '
 'typically using external vector stores with fast retrieval methods like '
 'approximate nearest neighbors (

## 18 - Impact of long context  

`Deep dive`

https://www.youtube.com/watch?v=SsHUNfhF32s

`Slides`

https://docs.google.com/presentation/d/1mJUiPBdtf58NfuSEQ7pVSEQ2Oqmek7F1i4gBwR6JDss/edit#slide=id.g26c0cb8dc66_0_0