## **Corrective RAG**

**修正型検索強化型生成**（ Corrective RAG または CRAG ）: \
検索された文書を使用する前に評価および修正を行うことで、生成された応答の精度を向上させる方法です。その仕組みは以下の通りです。

**正しい例：** 関連性のある文書の場合、不必要な部分を削除して精製し、生成に使用します。

**誤った例：** 関連性のない文書の場合、破棄し、ウェブ検索を使用するなどして追加情報を取得します。

**あいまいな例：** あいまいな場合、システムは取得した情報とウェブ検索した情報を組み合わせて、バランスのとれた応答を作成します。

Research Paper: [Corrective RAG](https://arxiv.org/pdf/2401.15884)

In [26]:
!pip install -qU langgraph


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [27]:
import os
from dotenv import load_dotenv
load_dotenv()

os.environ['TAVILY_API_KEY'] = os.getenv('TAVILY_API_KEY')

# VERTEXAI用の設定
import vertexai
import google.generativeai as genai

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.expanduser("~/.config/gcloud/application_default_credentials.json")
vertexai.init(project=os.getenv("gcp_project_id"), location="us-central1")

# load llm
from langchain_google_vertexai import ChatVertexAI
llm = ChatVertexAI(
    model_name="gemini-2.0-flash-exp",
    project=os.getenv("gcp_project_id"),
    location="us-central1",
    temperature=0
)

# # load data
# from langchain_community.document_loaders import PyPDFLoader
# loader = PyPDFLoader("../data/pdf/57_public_スタートアップ育成に向けた政府の取組_file_name=kaisetsushiryou_2024.pdf")
# documents = loader.load()

# # split documents
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
# documents = text_splitter.split_documents(documents)

# load embedding model
from langchain_huggingface import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(
    model_name="intfloat/multilingual-e5-base",
    encode_kwargs={"normalize_embeddings": True}
)

# huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using tokenizers before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
os.environ["TOKENIZERS_PARALLELISM"] = "true" # 警告対策　tokenizersライブラリの並列処理を明示的にON 

from langchain.vectorstores import Chroma
# vectorstore = Chroma.from_documents(documents, embeddings, persist_directory="../data/chroma_db_57")
vectorstore = Chroma(persist_directory="../data/chroma_db_57", embedding_function=embeddings) # すでに作ったものを利用

# create retirever
retriever = vectorstore.as_retriever()

## **ドキュメントグレーダー**
ドキュメントグレーダーは、ドキュメントが与えられたクエリに関連しているかどうかを評価します。

### Prompt for the grader
system = """あなたは、検索されたドキュメントがユーザーの質問に関連しているかどうかを評価する採点者です。
 厳密なテストである必要はありません。誤った検索結果を除外することが目的です。
 ドキュメントがユーザーの質問に関連するキーワードまたは意味的な意味を含んでいる場合、関連性があると評価します。
 ドキュメントが質問に関連しているかどうかを示すために、「はい」または「いいえ」の2値スコアを付けます。"""

In [28]:
# create grader for doc retriever 
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
# langchain_core.pydantic_v1.BaseModel は、LangChain の設定や構成要素を定義・検証するために利用される Pydantic の基盤機能を活かしたモデルクラスです。
# Field は、Pydantic モデル内でフィールド（属性）のメタ情報を定義するための関数です。

# defining a data class for the grader
class GradeDocuments(BaseModel):
    """Schema for grading retrieved documents for relevance.
    
    The field 'binary_score' is expected to be either "yes" or "no" indicating
    whether the document is relevant to the user's question.
    """
    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )

# LLM with function call
structured_llm_grader = llm.with_structured_output(GradeDocuments) # LLM （ChatOpenAI）の出力を GradeDocuments の形式に整形するように設定, 構造化出力

# Prompt for the grader
system = """You are a grader assessing relevance of a retrieved document to a user question. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader # 取得された文書とユーザーの質問を用い、LLM が「関連しているか否か」を評価して"yes"または"no"を返すようになっています。

In [29]:
# run grader
question = "図書館間貸借制度はどのように機能していますか？"
no_docs = retriever.get_relevant_documents(question)
print(retrieval_grader.invoke({"question": question, "document": no_docs}))

binary_score='no'


In [30]:
# run grader
question = "2024年度における日本のスタートアップによる雇用創出数は？"
yes_docs = retriever.get_relevant_documents(question)
print(retrieval_grader.invoke({"question": question, "document": yes_docs}))

binary_score='yes'


# RAG Chain

In [31]:
# create document chain
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate

template = """"
You are a helpful assistant that answers questions based on the following context.'
Use the provided context to answer the question.
Context: {context}
Question: {question}
Answer:

"""

prompt = ChatPromptTemplate.from_template(template)

def format_docs(yes_docs):
    return "\n\n".join(doc.page_content for doc in yes_docs)

rag_chain = prompt | llm | StrOutputParser()


In [32]:
# response
generation = rag_chain.invoke({"context": yes_docs, "question": question})
generation

'2024年度における日本のスタートアップによる雇用創出数に関する情報が記載されていません。'

In [33]:
# define web search
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(k=3)

# Create Graph

## Define Graph State

In [34]:
# define a data class for state
from typing import List
from typing_extensions import TypedDict
class GraphState(TypedDict):
    question: str
    generation: str
    web_search: str
    documents: List[str]

## Build Graph

In [35]:
# define graph steps
from langchain.schema import Document

# node function for retrieval
def retrieve(state):

    print("---RETRIEVE---")
    question = state["question"]

    # retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

# node function for generation
def generate(state):

    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

# node function for check_relevance
def grade_documents(state):

    print("---CHECK DOCUMENT RELEVSNCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            web_search = "yes"
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}

#  node function for web search
def web_search(state):

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]

    # web_search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join(d["content"] for d in docs)
    web_results = Document(page_content=web_results)
    documents.append(web_results)

    return {"documents": documents, "question": question}

# node function for decision
def decide_to_generate(state):

    print("---ASSESS GRADED DOCUMENTS---")
    web_search = state.get("web_search", "no").lower()

    if web_search == "yes":
        print("---DECISION: WEB SEARCH---")
        return "web_search"
    else:
        print("---DECISION: GENERATE---")
        return "generate"



In [36]:
# build graph
from langgraph.graph import START, StateGraph, END

# Graph
workflow = StateGraph(GraphState)

# define the nodes
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade_documents
workflow.add_node("generate", generate) # generate
workflow.add_node("web_search_node", web_search) # web_search_node

# build graph
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
         "web_search":  "web_search_node",
         "generate": "generate", 
    },
)
workflow.add_edge("web_search_node", "generate")
workflow.add_edge("generate", END)

# Compile
app = workflow.compile()

In [37]:
# example 1 where documents are relevant
from pprint import pprint

inputs = {"question": "図書館間貸借制度はどのように機能していますか？"}

for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Node '{key}':")
    pprint("\n---\n")

pprint(value['generation'])

---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVSNCE TO QUESTION---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: WEB SEARCH---
"Node 'grade_documents':"
'\n---\n'
---WEB SEARCH---
"Node 'web_search_node':"
'\n---\n'
---GENERATE---
"Node 'generate':"
'\n---\n'
'図書館利用者の求めに応じて、図書館はその資料を所蔵する他館にその利用を申し込み、所蔵館は無料ないし少ない手数料でそれを貸し出すことで機能しています。\n'


In [38]:
# example 2 where documents are not relevant
from pprint import pprint

inputs = {"question": "日本のスタートアップによる雇用創出作用は？"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Node '{key}':")
    pprint("\n---\n")

pprint(value["generation"])

---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVSNCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
"Node 'grade_documents':"
'\n---\n'
---GENERATE---


Retrying langchain_google_vertexai.chat_models._completion_with_retry.<locals>._completion_with_retry_inner in 4.0 seconds as it raised ResourceExhausted: 429 Quota exceeded for aiplatform.googleapis.com/generate_content_requests_per_minute_per_project_per_base_model with base model: gemini-experimental. Please submit a quota increase request. https://cloud.google.com/vertex-ai/docs/generative-ai/quotas-genai..


"Node 'generate':"
'\n---\n'
'スタートアップは、雇用創出にも大きな役割を果たすと述べられています。\n'


In [42]:
# 複数の質問を準備
input_questions = [
    {"question": "令和五年ウクライナ復興支援事業予算は"},
    {"question": "日本のスタートアップによる雇用創出作用は？一行で答えて"},
    {"question": "図書館間貸借制度はどのように機能していますか？一行で答えて"},
]

outputs = []
expected_responses = {
    "令和五年ウクライナ復興支援事業予算は": "2024年9月時点の資料では260億円となっています。",
    "日本のスタートアップによる雇用創出作用は？一行で答えて": "日本のスタートアップは、新たな産業と雇用を生み出し、経済の活性化に貢献しています。",
    "図書館間貸借制度はどのように機能していますか？一行で答えて": "図書館間貸借制度とは、図書館同士が協力し、利用者の求めに応じて互いの所蔵資料を貸し借りする仕組みです。",
} # あらかじめ https://gemini.google.com/app で gemini-2.0-flash にきいておいた答え

# 各質問に対して処理を実行
for inputs in input_questions:
    for output in app.stream(inputs):
        for key, value in output.items():
            if key == "generate":
                question = value["question"]
                documents = value["documents"]
                generation = value["generation"]

                context = "\n".join(doc.page_content for doc in documents)

                # Append the result
                outputs.append({
                    "query": question,
                    "context": context,
                    "response": generation,
                    "expected_response": expected_responses.get(question, "")  # 期待される回答がない場合は空文字を返す
                })

---RETRIEVE---
---CHECK DOCUMENT RELEVSNCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
---GENERATE---
---RETRIEVE---
---CHECK DOCUMENT RELEVSNCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
---GENERATE---
---RETRIEVE---
---CHECK DOCUMENT RELEVSNCE TO QUESTION---
---GRADE: DOCUMENT NOT RELEVANT---


Retrying langchain_google_vertexai.chat_models._completion_with_retry.<locals>._completion_with_retry_inner in 4.0 seconds as it raised ResourceExhausted: 429 Quota exceeded for aiplatform.googleapis.com/generate_content_requests_per_minute_per_project_per_base_model with base model: gemini-experimental. Please submit a quota increase request. https://cloud.google.com/vertex-ai/docs/generative-ai/quotas-genai..
Retrying langchain_google_vertexai.chat_models._completion_with_retry.<locals>._completion_with_retry_inner in 4.0 seconds as it raised ResourceExhausted: 429 Quota exceeded for aiplatform.googleapis.com/generate_content_requests_per_minute_per_project_per_base_model with base model: gemini-experimental. Please submit a quota increase request. https://cloud.google.com/vertex-ai/docs/generative-ai/quotas-genai..
Retrying langchain_google_vertexai.chat_models._completion_with_retry.<locals>._completion_with_retry_inner in 4.0 seconds as it raised ResourceExhausted: 429 Quota excee

---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: WEB SEARCH---
---WEB SEARCH---
---GENERATE---


In [45]:
# Convert to DataFrame
import pandas as pd
df = pd.DataFrame(outputs)
df

Unnamed: 0,query,context,response,expected_response
0,令和五年ウクライナ復興支援事業予算は,価制度の導入予定【R６年度より導入予定】、デジタルマーケットプレイ\nスの本格稼働【R６年度...,令和5年補正予算におけるウクライナ復興支援事業は260億円(経産)です。\n,2024年9月時点の資料では260億円となっています。
1,日本のスタートアップによる雇用創出作用は？一行で答えて,スタートアップとは\n① スタートアップとは、一般に、以下のような企業をいう。\n1. 新し...,スタートアップは、雇用創出にも大きな役割を果たすとされています。\n,日本のスタートアップは、新たな産業と雇用を生み出し、経済の活性化に貢献しています。
2,図書館間貸借制度はどのように機能していますか？一行で答えて,... かの図書館同士の相互貸借制度が整っています。 利用者の借りたい本を、他の図書館から最...,図書館間貸借制度は、利用者の求めに応じて、図書館が他の図書館に資料の利用を申し込み、所蔵館が...,図書館間貸借制度とは、図書館同士が協力し、利用者の求めに応じて互いの所蔵資料を貸し借りする仕...
