In [1]:
import os
# LangChain tracing settings, exported as environment variables in one call
# before any chain is constructed below.
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": "college-information-llm",
    }
)


-----------------retriever-------------------------------

In [2]:
from knowledgebase import TXTKnowledgeBase
# Value passed as 'k' in the retriever's search_kwargs below
# (presumably the number of documents returned per query — TODO confirm).
SEARCH_DOCS_NUM=4

# Project-local knowledge base wrapper, pointed at the 'lxbd' text folder.
kb=TXTKnowledgeBase(txt_source_folder_path='lxbd')
# NOTE(review): despite the method name, the returned object is used like a
# vector store here (as_retriever is called on it) — confirm against knowledgebase.py.
vector=kb.return_retriever_from_persistant_vector_db()

# Module-level retriever; the retrieve() graph node calls retriever.invoke(question).
retriever = vector.as_retriever(search_kwargs={'k':SEARCH_DOCS_NUM})

------------------------LLMs-----------------------------

In [3]:
### Router

from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI


# Data model
class RouteQuery(BaseModel):
    """基于用户的查询词条选择最相关的资料来源"""

    # Structured-output schema for the question router: the model must pick
    # exactly one of the three routes named in the Literal below.
    # NOTE(review): the class docstring and the Field description are kept in
    # Chinese on purpose — they are presumably forwarded to the model as part
    # of the tool schema by with_structured_output, so they are runtime text.
    # The description previously spelled the route "vectrostore", which does
    # not match any Literal value; it now names the real route "vectorstore".
    datasource: Literal["vectorstore", "web_search","database"] = Field(
        ...,
        description="基于用户的问题选择web_search或者vectorstore或者database.",
    )


# Chat model (temperature fixed at 0) wrapped so that its reply is parsed into
# a RouteQuery instance.
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm_router = llm.with_structured_output(RouteQuery)

# Routing prompt (Chinese, sent to the model verbatim). It describes what the
# vectorstore and the database contain and asks the model to pick web_search
# as rarely as possible.
system = """你是一位选择路径的专家，你需要基于用户的提问选择是使用web_search, vectorstore还是database.
vectorstore包含了关于总体的在美国留学相关的资料，比如美国大学排名，美国留学申请，美国转学等等.
database包含了特定某一所大学的相关资料，比如这所大学的排名、录取率、学费、生活费、专业设置、犯罪率等等.
如果用户的问题是美国留学相关但是不针对某一所大学的问题，请选择vectorstore，如果是针对某一所美国大学的问题，请选择database，尽量少选择web_search."""
route_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

# Chain used by route_question(): takes {"question": ...} and yields a RouteQuery.
question_router = route_prompt | structured_llm_router

# Smoke-test the router with three sample questions; the recorded output shows
# them routed to web_search, vectorstore and database respectively.
for _sample_question in ("怎么钓鱼？", "美国大学排名？", "哈佛的学费是多少？"):
    print(question_router.invoke({"question": _sample_question}))

datasource='web_search'
datasource='vectorstore'
datasource='database'


-----------------------Retriever Grader-------------------------------

In [4]:
### Retrieval Grader


# Data model
class GradeDocuments(BaseModel):
    """检查vectorstore取回的资料相关性，是与否回答"""

    # Structured-output schema for the retrieval grader: a single yes/no
    # verdict on whether a retrieved document is relevant to the question.
    # NOTE(review): the class docstring and the Field description are left in
    # Chinese on purpose — they are presumably forwarded to the model as part
    # of the schema by with_structured_output, so editing them changes behavior.
    # grade_documents() compares this field against the exact string "是";
    # the type is a plain str, so any other reply counts as "not relevant".
    binary_score: str = Field(
        description="检查资料与问题是否相关，回答只能是：'是'、'否'"
    )


# Chat model (temperature fixed at 0) wrapped so that its reply is parsed into
# a GradeDocuments instance.
# Note: `llm`, `structured_llm_grader` and `system` are rebound by later cells;
# each chain keeps the objects it was built with.
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeDocuments)

# Grading prompt (Chinese, sent to the model verbatim). It asks for a lenient
# relevance check whose answer is either '是' (relevant) or '否' (not relevant).
system = """你是一位评判取回资料与用户问题是否相关的评判员. \n 
    如果取回的资料包含用户问题的关键词，或者取回资料与用户问题有相关性，评判为'是'，否则评判为'否'. \n
    评判标准不需要太严格，主要是为了排除错误的取回资料. \n
    最后的回答只能是：'是'与'否'，'是'对应相关，'否'对应不相关."""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "取回资料如下: \n\n {document} \n\n 用户问题如下: {question}"),
    ]
)

# Chain used by grade_documents(): takes {"question", "document"} and yields
# a GradeDocuments.
retrieval_grader = grade_prompt | structured_llm_grader
# Disabled manual check, kept as a bare string literal; as the last expression
# of the cell it is echoed back as the cell output instead of being executed.
"""
question = "美国大学排名"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))
"""

'\nquestion = "美国大学排名"\ndocs = retriever.invoke(question)\ndoc_txt = docs[1].page_content\nprint(retrieval_grader.invoke({"question": question, "document": doc_txt}))\n'

In [5]:
### Generate

from langchain_core.output_parsers import StrOutputParser

# Answer-generation prompt (Chinese, sent to the model verbatim): the system
# message tells the assistant to answer only from the retrieved material in
# {context} and to reply '不知道' when it does not know; the human message is
# the raw {question}.
prompt = ChatPromptTemplate.from_messages([
    ('system',"你是一位回答问题的助手，你只能使用下面取回的资料回答问题，如果你不知道问题的答案，请回答'不知道'，取回的资料如下\n\n{context}\n\n"),
    ('human',"{question}")
])

# Chat model used for answer generation (temperature fixed at 0).
# NOTE(review): every other cell passes `model=`; `model_name=` is presumably
# an accepted alias of the same field — confirm against the ChatOpenAI docs.
llm = ChatOpenAI(model_name="gpt-3.5-turbo-0125", temperature=0)


# Post-processing
def format_docs(docs):
    """Join the ``page_content`` of every item in ``docs`` with a blank line.

    Args:
        docs: iterable of objects exposing a ``page_content`` string attribute.

    Returns:
        str: the contents separated by two newlines ("" for an empty iterable).
    """
    separator = "\n\n"
    contents = [item.page_content for item in docs]
    return separator.join(contents)


# Generation chain: prompt -> chat model -> plain string. Invoked by the
# generate() node with {"context": ..., "question": ...}.
rag_chain = prompt | llm | StrOutputParser()

# Disabled manual test run, kept as a bare string literal; as the last
# expression of the cell it is echoed back as the cell output, not executed.
"""
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)
"""

'\ngeneration = rag_chain.invoke({"context": docs, "question": question})\nprint(generation)\n'

In [6]:
### Hallucination Grader


# Data model
class GradeHallucinations(BaseModel):
    """评判生成的答案是否为捏造的."""

    # Structured-output schema for the hallucination grader: a single yes/no
    # verdict on whether the generated answer is grounded in the documents.
    # NOTE(review): the class docstring and the Field description are left in
    # Chinese on purpose — they are presumably forwarded to the model as part
    # of the schema by with_structured_output, so editing them changes behavior.
    # grade_generation_v_documents_and_question() compares this field against
    # the exact string "是"; any other reply is treated as "not supported".
    binary_score: str = Field(
        description="评判回答是否真实，回答只能为'是'与'否'"
    )


# Chat model (temperature fixed at 0) wrapped so that its reply is parsed into
# a GradeHallucinations instance. This rebinds `llm`, `structured_llm_grader`
# and `system`; chains built in earlier cells keep their own objects.
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeHallucinations)

# Grading prompt (Chinese, sent to the model verbatim). It asks whether the
# generated answer is truthfully based on the retrieved material, answered
# with '是' or '否', and says the check need not be strict.
system = """你作为一名评判员需要评判LLM生成的答案是否真实基于取回的资料. \n 
     回答只能为'是'与'否'. '是'表示回答是基于取回资料的真实回答，这里不需要太严格，与取回资料是相关的真实回答就可以."""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "取回资料如下: \n\n {documents} \n\n LLM生成答案如下: {generation}"),
    ]
)

# Chain used by grade_generation_v_documents_and_question(): takes
# {"documents", "generation"} and yields a GradeHallucinations.
hallucination_grader = hallucination_prompt | structured_llm_grader

# Disabled manual check, kept as a bare string literal (echoed as cell output).
"""
hallucination_grader.invoke({"documents": docs, "generation": generation})
"""

'\nhallucination_grader.invoke({"documents": docs, "generation": generation})\n'

In [7]:
### Answer Grader


# Data model
class GradeAnswer(BaseModel):
    """评判回答是否解决用户的问题."""

    # Structured-output schema for the answer grader: a single yes/no verdict
    # on whether the generated answer resolves the user's question.
    # NOTE(review): the class docstring and the Field description are left in
    # Chinese on purpose — they are presumably forwarded to the model as part
    # of the schema by with_structured_output, so editing them changes behavior.
    # grade_generation_v_documents_and_question() compares this field against
    # the exact string "是"; any other reply is treated as "not useful".
    binary_score: str = Field(
        description="评判回答是否能解决用户的问题，回答只能为'是'与'否'"
    )


# Chat model (temperature fixed at 0) wrapped so that its reply is parsed into
# a GradeAnswer instance. This rebinds `llm`, `structured_llm_grader` and
# `system`; chains built in earlier cells keep their own objects.
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeAnswer)

# Grading prompt (Chinese, sent to the model verbatim). It asks whether the
# LLM answer resolves or answers the user's question, answered with '是' or '否'.
system = """你是一名评判员，你需要评判LLM回答是否能解决或者回答用户的问题\n 
     回答只能为'是'与'否'，'是'表示LLM回答能解决或者回答用户的问题."""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "用户问题如下: \n\n {question} \n\n LLM回答如下: {generation}"),
    ]
)

# Chain used by grade_generation_v_documents_and_question(): takes
# {"question", "generation"} and yields a GradeAnswer.
answer_grader = answer_prompt | structured_llm_grader

# Disabled manual check, kept as a bare string literal (echoed as cell output).
"""
answer_grader.invoke({"question": question, "generation": generation})
"""

'\nanswer_grader.invoke({"question": question, "generation": generation})\n'

In [8]:
### Question Re-writer

# Plain chat model (temperature fixed at 0); no structured output is needed
# because the rewritten question is consumed as free text.
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)

# Rewrite prompt (Chinese, sent to the model verbatim). It asks the model to
# rephrase the user's question so that retrieval from the vectorstore works
# better, paying attention to the question's implied content and intent.
system = """你是一名重新提问的提问员，你需要基于用户的提问，并且更好的从vectorstore取回资料来优化问题. 
你需要审视用户的提问以及抓住问题隐含的内容和意图."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "用户问题如下: \n\n {question} \n 提出一个优化的问题.",
        ),
    ]
)

# Chain used by transform_query(): takes {"question": ...} and yields a string.
question_rewriter = re_write_prompt | llm | StrOutputParser()

# Disabled manual check, kept as a bare string literal (echoed as cell output).
"""
question_rewriter.invoke({"question": question})
"""

'\nquestion_rewriter.invoke({"question": question})\n'

In [9]:
### Search

from langchain_community.tools.tavily_search import TavilySearchResults

# Web search tool used by the web_search() graph node.
# The result cap is passed as `max_results`: `k` is not a field of
# TavilySearchResults, so the previous `k=3` was presumably ignored and the
# tool fell back to its default number of results.
web_search_tool = TavilySearchResults(max_results=3)

In [10]:
#####Graph state
from typing_extensions import TypedDict
from typing import List


class GraphState(TypedDict):
    """
    State dictionary shared by the nodes of the graph.

    Attributes:
        question: the question being answered; transform_query replaces it
            with a rewritten version
        generation: answer text produced by the generate node
        documents: documents gathered for the current question
    """

    question: str
    generation: str
    # NOTE(review): annotated as List[str], but the nodes in this notebook
    # store Document objects here (retriever results / web search result).
    documents: List[str]

----------------Graph Flow----------------------------

In [36]:
from langchain.schema import Document


def retrieve(state):
    """
    Graph node: fetch documents for the current question.

    Args:
        state (dict): The current graph state; "question" is read from it

    Returns:
        dict: "documents" holding the result of retriever.invoke, plus the
        unchanged "question"
    """
    print("---RETRIEVE---")
    query = state["question"]

    # Single lookup against the module-level retriever.
    return {"documents": retriever.invoke(query), "question": query}


def generate(state):
    """
    Graph node: generate an answer from the collected documents.

    The documents are rendered to plain text with format_docs() before being
    placed in the prompt's {context} slot; previously the raw list was passed,
    so the model received the Python repr of the Document objects.

    Args:
        state (dict): The current graph state; "question" and "documents"
            are read from it

    Returns:
        dict: the unchanged "documents" and "question", plus "generation"
        holding the string returned by rag_chain
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # "documents" may hold a single Document instead of a list (the web_search
    # node has stored one that way); wrap it so format_docs can iterate.
    docs_for_prompt = [documents] if isinstance(documents, Document) else documents
    context = format_docs(docs_for_prompt)

    # RAG generation
    generation = rag_chain.invoke({"context": context, "question": question})
    # The state keeps the original documents value; only the prompt text changed.
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    Graph node: keep only the retrieved documents graded as relevant.

    Args:
        state (dict): The current graph state; "question" and "documents"
            are read from it

    Returns:
        dict: "documents" reduced to the entries whose grade is "是", plus
        the unchanged "question"
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    candidates = state["documents"]

    # One grader call per document; anything other than an exact "是" is dropped.
    relevant = []
    for candidate in candidates:
        verdict = retrieval_grader.invoke(
            {"question": question, "document": candidate.page_content}
        ).binary_score
        if verdict != "是":
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
        print("---GRADE: DOCUMENT RELEVANT---")
        relevant.append(candidate)
    return {"documents": relevant, "question": question}


def transform_query(state):
    """
    Graph node: replace the question with a rewritten version.

    Args:
        state (dict): The current graph state; "question" and "documents"
            are read from it

    Returns:
        dict: the unchanged "documents" and "question" set to the string
        returned by question_rewriter
    """

    print("---TRANSFORM QUERY---")
    original_question, current_docs = state["question"], state["documents"]

    # Ask the rewriter chain for a retrieval-friendlier phrasing.
    rewritten = question_rewriter.invoke({"question": original_question})
    return {"documents": current_docs, "question": rewritten}


def web_search(state):
    """
    Graph node: run a web search for the current question.

    The "content" fields of all search hits are merged into one Document,
    which is returned inside a list so that "documents" has the same shape
    (a list) as what the retrieve and grade_documents nodes produce.
    Previously the bare Document was stored.

    Args:
        state (dict): The current graph state; "question" is read from it

    Returns:
        dict: "documents" as a one-element list holding the merged web
        result, plus the unchanged "question"
    """

    print("---WEB SEARCH---")
    question = state["question"]

    # Web search
    # Each hit is indexed by its "content" key and the texts are joined with
    # newlines into a single body.
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)

    return {"documents": [web_results], "question": question}


### Edges ###


def route_question(state):
    """
    Entry-point edge: choose the first node based on the router's decision.

    Args:
        state (dict): The current graph state; "question" is read from it

    Returns:
        str: "web_search", "vectorstore" or "database" (None if the router
        returns any other datasource)
    """

    print("---ROUTE QUESTION---")
    decision = question_router.invoke({"question": state["question"]})

    # Log line for each known route; the route name itself is the return value.
    route_logs = {
        "web_search": "---ROUTE QUESTION TO WEB SEARCH---",
        "vectorstore": "---ROUTE QUESTION TO RAG---",
        "database": "---ROUTE QUESTION TO DATABASE",
    }
    target = decision.datasource
    if target in route_logs:
        print(route_logs[target])
        return target
    return None


def decide_to_generate(state):
    """
    Conditional edge after grading: answer now, or rewrite the question first.

    Args:
        state (dict): The current graph state; "question" and "documents"
            are read from it

    Returns:
        str: "generate" when at least one document survived grading,
        otherwise "transform_query"
    """

    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]  # not used below; the lookup itself is kept
    kept_docs = state["documents"]

    # Guard clause: relevant documents exist, so go straight to generation.
    if kept_docs:
        print("---DECISION: GENERATE---")
        return "generate"

    # Every document was filtered out by the relevance check: re-phrase the query.
    print(
        "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
    )
    return "transform_query"


def grade_generation_v_documents_and_question(state):
    """
    Conditional edge after generation: check grounding, then usefulness.

    The hallucination grader runs first; the answer grader is consulted only
    when the generation is judged to be grounded in the documents.

    Args:
        state (dict): The current graph state; "question", "documents" and
            "generation" are read from it

    Returns:
        str: "not supported" when the generation is not grounded in the
        documents, "useful" when it is grounded and addresses the question,
        "not useful" when it is grounded but does not address the question
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score

    # Check hallucination
    if grade != "是":
        # Plain print, like every other log line here: this branch used to call
        # pprint, which is only imported by a later cell and prints the message
        # wrapped in quotes.
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

    print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
    # Check question-answering
    print("---GRADE GENERATION vs QUESTION---")
    score = answer_grader.invoke({"question": question, "generation": generation})
    grade = score.binary_score
    if grade == "是":
        print("---DECISION: GENERATION ADDRESSES QUESTION---")
        return "useful"

    print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
    return "not useful"

def database(state):
    """
    Graph node for the "database" route; currently a stub.

    No lookup is performed: the node only logs that it ran.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): The same state object that was passed in, unchanged
    """
    print("database func called")
    return state

------------------build graph---------------------------------

In [37]:
from langgraph.graph import END, StateGraph

# Assemble the graph over the shared GraphState dictionary.
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("web_search", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
# The node function is `database`; the previous `database_fuc` is not defined
# anywhere in this notebook and raised NameError on a fresh kernel.
workflow.add_node("database", database)  # database route (currently a stub)
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate
workflow.add_node("transform_query", transform_query)  # transform_query

# Build graph
# Entry point: route_question returns one of the keys below, which is mapped
# to the node that should run first.
workflow.set_conditional_entry_point(
    route_question,
    {
        "web_search": "web_search",
        "vectorstore": "retrieve",
        "database":"database"
    },
)
workflow.add_edge("web_search", "generate")
workflow.add_edge("retrieve", "grade_documents")
# After grading: generate if any document survived, otherwise rewrite the query.
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")
# After generation: retry when ungrounded, finish when useful, rewrite the
# query when the answer is grounded but does not address the question.
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)
# The database route ends immediately; it never reaches the generate node.
workflow.add_edge("database",END)
# Compile
app = workflow.compile()

In [41]:
from pprint import pprint

# Run
inputs = {
    "question": "美国大学排名"
}
# Pre-bind `value` so the final lookup below cannot raise NameError when the
# stream yields nothing.
value = {}
for output in app.stream(inputs):
    # Each streamed item maps a node name to the update that node returned.
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint(value, indent=2, width=80, depth=None)
    pprint("-------------------------------")

# Final generation
# `value` is the update from the last node that ran. The database route ends
# without a "generation" key, so use .get() to print None instead of raising
# KeyError.
pprint(value.get("generation"))

---ROUTE QUESTION---
---ROUTE QUESTION TO RAG---
---RETRIEVE---
"Node 'retrieve':"
'-------------------------------'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
"Node 'grade_documents':"
'-------------------------------'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
"Node 'generate':"
'-------------------------------'
('根据提供的资料，美国大学排名可以从不同的角度和机构进行评定。一些影响力较大的排名机构包括《美国新闻和世界报道》（U.S. News and World '
 'Report）、《福布斯》（Forbes）、《商业周刊》（Bloomberg Businessweek）、《华尔街日报》（Wall Street '
 'Journal）等。这些机构根据不同的指标和权重对美国大学进行排名，如学术声誉、教师资源、学生生源质量等。\n'
 '\n'
 '具体的排名结果可能会有所不同，因为每个机构所侧重的指标和权重不同。如果您对特定排名的具体结果感兴趣，可以查看相应的排名榜单。')


In [45]:
# Single-shot run (no streaming) with a question about one specific
# university; the recorded log below shows it routed to the database node.
inputs = {
    "question": "哈佛大学排名"
}
# NOTE(review): `respose` (sic) is read again by the following cell, so the
# misspelt name is kept as is.
respose=app.invoke(inputs)

---ROUTE QUESTION---
---ROUTE QUESTION TO DATABASE
database func called


In [44]:
# The database route returns the state unchanged and goes straight to END, so
# the result of the run above has no "generation" key. Use .get() so this cell
# shows None instead of raising KeyError when the notebook is run top to bottom.
respose.get('generation')

'美国大学排名是根据不同的排名机构和标准而定的。一些影响力较大的排名机构包括《美国新闻和世界报道》（U.S. News and World Report）、《福布斯》（Forbes）、《商业周刊》（Bloomberg Businessweek）、《华尔街日报》（Wall Street Journal）等。它们每年都会发布不同的排名榜单，涵盖综合性排名、研究生院排名、商学院排名等。\n\n具体到2023年的排名数据，根据《美国新闻和世界报道》（U.S. News and World Report）的排名，TOP10的美国大学包括：\n1. 普林斯顿大学\n2. 麻省理工学院\n3. 哈佛大学\n4. 斯坦福大学\n5. 耶鲁大学\n6. 芝加哥大学\n7. 约翰霍普金斯大学\n8. 宾夕法尼亚大学\n9. 加州理工学院\n10. 杜克大学 和 西北大学（并列）\n\n另外，《福布斯》（Forbes）也有自己的美国最佳大学排名，2022年的排名前几名包括：\n1. 麻省理工学院\n2. 斯坦福大学\n3. 加州大学伯克利分校\n4. 普林斯顿大学\n5. 哥伦比亚大学\n6. 加州大学洛杉矶分校\n7. 威廉姆斯学院\n8. 耶鲁大学\n9. 杜克大学\n10. 宾夕法尼亚大学\n\n这些排名都是根据不同的指标和权重来评定的，学生在选择美国大学时可以参考这些排名数据，但也要根据自己的需求和兴趣做出选择。'