In [1]:
%%capture --no-stderr
%pip install langchain_community langchainhub chromadb langchain langgraph tavily-python langchain-text-splitters langchain_openai

In [2]:
# tavily example
from tavily import TavilyClient
import getpass
import os

# Never hard-code the API key in the notebook: read it from the environment,
# and fall back to an interactive prompt so a fresh kernel can still run this cell.
tavily_api_key = os.environ.get("TAVILY_API_KEY") or getpass.getpass("Tavily API key: ")
tavily = TavilyClient(api_key=tavily_api_key)

# Plain search: keep only the URL and text content of each result.
response = tavily.search(query="Where does Messi play right now?", max_results=3)
context = [{"url": obj["url"], "content": obj["content"]} for obj in response['results']]

# You can easily get search result context based on any max tokens straight into your RAG.
# The response is a string of the context within the max_token limit.

response_context = tavily.get_search_context(query="Where does Messi play right now?", search_depth="advanced", max_tokens=500)

# You can also get a simple answer to a question, including relevant sources, with a single function call.
# It can be used as a baseline.
response_qna = tavily.qna_search(query="Where does Messi play right now?")

In [3]:
from langchain_openai import ChatOpenAI
import os
import getpass

# Do not overwrite a key that is already exported in the environment, and never
# store the key in the notebook itself: prompt for it only when it is missing.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API key: ")

# temperature=0 is passed so the grader/generator chains built on this model
# are configured for the least sampling randomness.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

In [4]:
### Index
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Web pages that are loaded, chunked and indexed into the vector store.
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# One loader call per URL gives a list of lists; flatten it into a single list.
docs = [WebBaseLoader(page_url).load() for page_url in urls]
docs_list = []
for page_docs in docs:
    docs_list.extend(page_docs)

# Split into chunks of size 250 with no overlap, using the tiktoken-based splitter.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250,
    chunk_overlap=0,
)
doc_splits = text_splitter.split_documents(docs_list)

# Embed the chunks and store them in the "rag-chroma" collection.
embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    embedding=embedding_model,
    collection_name="rag-chroma",
)

# Retriever used by the retrieval node of the graph.
retriever = vectorstore.as_retriever()



In [5]:
from pprint import pprint
from typing import List

from langchain_core.documents import Document
from typing_extensions import TypedDict

from langgraph.graph import END, StateGraph
from langchain_core.output_parsers import StrOutputParser
# State
class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: the user question being answered
        generation: the answer text produced by the generate node
        web_search: routing flag written by RelevanceChecker and read by
            generate_or_web_search ("Yes"/"No" on the first relevance pass,
            "no"/"done" on the second pass)
        documents: documents collected so far (retrieved and/or web results)
        is_generated: flag written by the generate node and read by
            re_generate_or_answer when deciding whether to stop instead of
            regenerating
        is_relevance_checked: True once RelevanceChecker has run at least once;
            selects its first-pass or second-pass behaviour
    """

    question: str
    generation: str
    web_search: str
    # NOTE(review): annotated as List[str], but the nodes store objects exposing
    # `.page_content` (retriever results and Document instances) - confirm and
    # consider List[Document].
    documents: List[str]
    is_generated : bool
    is_relevance_checked : bool

# Nodes

## Docs Retrieval
def DocsRetrieval(state):
    """
    Look up documents for the current question in the vector store retriever.

    Args:
        state (dict): The current graph state; must contain "question"

    Returns:
        dict: State update with "documents" (the retriever result) and the
        unchanged "question"
    """
    print("---RETRIEVE---")
    query = state["question"]

    hits = retriever.invoke(query)

    # Debug trace: the question first, then the raw retriever result.
    for item in (query, hits):
        print(item)

    # The original author noted that 4 documents come back here.
    return dict(documents=hits, question=query)

## Relevance Checker
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate

def _grade_documents(retrieval_grader, question, documents):
  """Return the documents the grader chain marks as relevant to the question.

  Args:
      retrieval_grader: Runnable invoked with {"question", "document"} that is
          expected to return a dict with a "score" key ("yes" / "no")
      question (str): The user question
      documents (list): Objects exposing `.page_content`

  Returns:
      list: The subset of `documents` graded "yes", in the original order
  """
  relevant_docs = []
  for d in documents:
    score = retrieval_grader.invoke(
        {"question": question, "document": d.page_content}
    )
    # The grader is an LLM: tolerate a reply without a "score" key (or one that
    # is not a dict) and treat anything other than an explicit "yes" as
    # not relevant instead of crashing the graph.
    grade = score.get("score", "") if isinstance(score, dict) else ""
    if str(grade).lower() == "yes":
      print("RelevanceChecker : ===관련있===")
      relevant_docs.append(d)
    else:
      print("RelevanceChecker : ===관련없===")
  return relevant_docs


def RelevanceChecker(state) :
  """
  Grade the documents in the state for relevance to the question and decide
  how the graph should continue.

  First pass (is_relevance_checked is falsy or missing): irrelevant documents
  are dropped; "web_search" is "Yes" when at least one document was dropped or
  when nothing relevant is left (including an empty retrieval), otherwise "No".

  Second pass (is_relevance_checked is truthy, i.e. after a web search):
  "web_search" is "no" when at least one relevant document remains, otherwise
  "done" so the graph can stop instead of searching again.

  Args:
      state (dict): The current graph state; must contain "question" and
          "documents"

  Returns:
      dict: State update with the filtered "documents", the "question", the
      "web_search" routing flag and "is_relevance_checked" set to True
  """

  print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
  question = state["question"]
  documents = state["documents"]
  system = """You are a grader assessing relevance
      of a retrieved document to a user question. If the document contains keywords related to the user question,
      grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
      Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
      Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
      """

  prompt = ChatPromptTemplate.from_messages(
      [
          ("system", system),
          ("human", "question: {question}\n\n document: {document} "),
      ]
  )

  retrieval_grader = prompt | llm | JsonOutputParser()

  # A missing flag means the checker has not run yet, so treat it as the first pass.
  second_pass = bool(state.get("is_relevance_checked", False))
  if second_pass:
    print("RelevanceChecker : 2번째 relevance_check 진행")
  else:
    print("RelevanceChecker : 1번째 relevance_check 진행")

  filtered_docs = _grade_documents(retrieval_grader, question, documents)

  if second_pass:
    # Web-search results were already added; stop if they did not help either.
    if filtered_docs:
      web_search = "no"
    else:
      print("RelevanceChecker : shit")
      web_search = "done"
  else:
    # Ask for a web search when any document was rejected, and also when
    # there is nothing relevant to generate from (e.g. empty retrieval).
    some_rejected = len(filtered_docs) < len(documents)
    web_search = "Yes" if (some_rejected or not filtered_docs) else "No"

  return {"documents": filtered_docs, "question": question, "web_search": web_search, "is_relevance_checked" : True}

## Generate Answer

def generate(state):
  """
  Generate an answer from the documents currently in the state.

  The returned "is_generated" flag is True only when the state already held a
  generation before this call, i.e. when this call is a re-generation. The
  conditional edge after this node reads the flag to stop after a second
  ungrounded answer; setting it to True unconditionally would make the
  "regenerate" route unreachable because the edge runs after this node.

  Args:
      state (dict): The current graph state; must contain "question" and
          "documents"

  Returns:
      dict: State update with "documents", "question", the new "generation",
      the carried-over "is_relevance_checked" flag (False if it was missing)
      and "is_generated"
  """
  system = """You are an assistant for question-answering tasks.
      Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
      Use three sentences maximum and keep the answer concise"""

  prompt = ChatPromptTemplate.from_messages(
      [
          ("system", system),
          ("human", "question: {question}\n\n context: {context} "),
      ]
  )

  # Chain
  rag_chain = prompt | llm | StrOutputParser()

  print("generate : generate 시작")
  question = state["question"]
  documents = state["documents"]

  # Must be read before generating: a previous answer in the state means this
  # run is the retry. `is not None` so that an empty earlier answer still counts.
  is_retry = state.get("generation") is not None

  # RAG generation
  generation = rag_chain.invoke({"context": documents, "question": question})

  return {"documents": documents, "question": question, "generation": generation,
          "is_relevance_checked" : state.get("is_relevance_checked", False),
          "is_generated" : is_retry
          }


## Search Tavily
def web_search(state):
  """
  Run a Tavily web search for the question and add the result to the documents.

  The "content" fields of all search results are joined with newlines into a
  single Document, which is appended to a copy of the documents already in the
  state (the list held by the state is not mutated in place).

  Args:
      state (dict): The current graph state; must contain "question"

  Returns:
      dict: State update with the extended "documents", the "question" and the
      carried-over "is_relevance_checked" flag (False if it was missing)
  """

  print("web_search : web_search 시작")
  print(state)
  question = state["question"]

  # Web search
  docs = tavily.search(query=question)['results']
  web_results = Document(page_content="\n".join(d["content"] for d in docs))

  # Build a new list rather than appending to the list owned by the graph
  # state; a missing or None "documents" entry starts from an empty list.
  documents = list(state.get("documents") or []) + [web_results]

  return {"documents": documents, "question": question,
          "is_relevance_checked" : state.get("is_relevance_checked", False)
          }

In [6]:
# edge node
def generate_or_web_search(state):
  """
  Pick the node that follows the relevance check.

  Args:
      state (dict): The current graph state; reads "web_search" and, unless the
          flag is exactly 'done', "question"

  Returns:
      str: "generate", "websearch" or "done"
  """
  flag = state["web_search"]

  # Exact 'done' short-circuits before any tracing happens.
  if flag == 'done':
    return 'done'

  print("generate_or_web_search : web? or generate?")
  print(state)
  print(state["question"])

  # Case-insensitive flag -> (trace message, next node); anything else ends the graph.
  routes = {
      "no": ("generate 하기로 결정", "generate"),
      "yes": ("web_search 하기로 결정", "websearch"),
  }
  message, next_node = routes.get(flag.lower(), ("shit", "done"))
  print(message)
  return next_node

def re_generate_or_answer(state) :
  """
  Check whether the generation is grounded in the documents and pick the route
  that follows the generate node.

  Args:
      state (dict): The current graph state; must contain "documents" and
          "generation", and may contain "is_generated"

  Returns:
      str: 'answer' when the grader scores "yes"; otherwise "Done" when
      "is_generated" is truthy, else 'regenerate'. Always one of these three
      values, so the conditional-edge mapping never receives None.
  """
  ### Hallucination Grader

  system = """You are a grader assessing whether
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a
    single key 'score' and no preamble or explanation."""

  docs = state["documents"]
  generation = state["generation"]

  prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "documents: {documents}\n\n answer: {generation} "),
    ]
  )

  hallucination_grader = prompt | llm | JsonOutputParser()
  rst = hallucination_grader.invoke({"documents": docs, "generation": generation})

  # The grader is an LLM: a reply without a "score" key, or with any value
  # other than "yes", is treated as "not grounded" rather than falling through
  # and returning None.
  grade = rst.get("score", "") if isinstance(rst, dict) else ""
  if str(grade).lower() == 'yes' :
    return 'answer'

  # Not grounded: stop once the flag says a regeneration already happened.
  if state.get('is_generated') :
    return "Done"
  return 'regenerate'

In [7]:
# Graph over the shared GraphState schema.
workflow = StateGraph(GraphState)

# Register the nodes under the names the edges below refer to.
for node_name, node_fn in [
    ("retrieve", DocsRetrieval),
    ("relevance_check", RelevanceChecker),
    ("generate", generate),
    ("websearch", web_search),
]:
    workflow.add_node(node_name, node_fn)

# 1-2: start at retrieval, then always grade the retrieved documents.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "relevance_check")

# 3-1: after the relevance check, go to generation, web search, or stop.
relevance_routes = {
    "websearch": "websearch",
    "generate": "generate",
    "done": END,
}
workflow.add_conditional_edges("relevance_check", generate_or_web_search, relevance_routes)

# 3-2: web-search results go through the relevance check again.
workflow.add_edge("websearch", "relevance_check")

# 4: branch after generation - retry, return the answer, or give up.
generation_routes = {
    "regenerate": "generate",
    "answer": END,  # answer is returned
    "Done": END,    # failed
}
workflow.add_conditional_edges("generate", re_generate_or_answer, generation_routes)

<langgraph.graph.state.StateGraph at 0x79976301c7d0>

In [8]:
# Example initial state; the keys match the GraphState schema.
state = {"question": "What is prompt?",
         "is_generated" : False,
         "is_relevance_checked" : False,
         }


# Compile
app = workflow.compile()

# Test

inputs = {"question": "축구선수 메시는 현재 무슨 팀에 있어?",
         "is_generated" : False,
         "is_relevance_checked" : False,
         }

# Remember the update emitted by the last node explicitly instead of relying on
# the loop variable leaking out of the loop (undefined if nothing is streamed).
final_update = None
for output in app.stream(inputs):
  for key, value in output.items():
      pprint(f"Finished running: {key}:")
      final_update = value

# Report "failed" only when the last node produced no generation; real errors
# are no longer hidden by a bare `except`.
if isinstance(final_update, dict) and "generation" in final_update:
  pprint(final_update["generation"])
else:
  pprint("failed")

---RETRIEVE---
축구선수 메시는 현재 무슨 팀에 있어?
[Document(metadata={'title': "LLM Powered Autonomous Agents | Lil'Log", 'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'language': 'en', 'description': 'Building agents with LLM (large language model) as its core controller is a cool concept. Several proof-of-concepts demos, such as AutoGPT, GPT-Engineer and BabyAGI, serve as inspiring examples. The potentiality of LLM extends beyond generating well-written copies, stories, essays and programs; it can be framed as a powerful general problem solver.\nAgent System Overview\nIn a LLM-powered autonomous agent system, LLM functions as the agent’s brain, complemented by several key components:\n\nPlanning\n\nSubgoal and decomposition: The agent breaks down large tasks into smaller, manageable subgoals, enabling efficient handling of complex tasks.\nReflection and refinement: The agent can do self-criticism and self-reflection over past actions, learn from mistakes and refine them for fu