# LangGraph Testing

Inspired by the [LangChain videos](https://www.youtube.com/watch?v=E2shqsYwxck&t=394s).

<p>
<img src="ILLUSTRATIONS/langgraph_example.png" 
      width="65%" height="auto"
      style="display: block; margin: 0 auto" />
</p>

Illustration source: [LangGraph RAG examples](https://github.com/langchain-ai/langgraph/tree/main/examples/rag)

```python
import pandas as pd
import numpy as np
import json, os, pprint
import matplotlib.pyplot as plt
import plotly.express as px
import random
from typing import Annotated, List, Sequence, Tuple, TypedDict, Union, Dict
import operator
from langchain_openai import OpenAIEmbeddings
from langchain_openai.chat_models import ChatOpenAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, BaseMessage, FunctionMessage
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.tools import tool
from langchain.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, MessagesPlaceholder, SystemMessagePromptTemplate
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma, FAISS
from langchain_core.runnables import RunnablePassthrough
from langgraph.graph import END, MessageGraph
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import YoutubeLoader
from langchain.tools.render import format_tool_to_openai_function
from langchain import hub
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.docstore.document import Document
from langchain_experimental.utilities import PythonREPL
from langchain_community.retrievers import TavilySearchAPIRetriever
import functools
```

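```python
# Provide the API keys at runtime instead of hard-coding them in the notebook
import os
from getpass import getpass

os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")
os.environ["TAVILY_API_KEY"] = getpass("Tavily API key: ")
```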

Load a sample website, split it into chunks, embed the chunks, and store them in a vector store.

```python
# Load docs from a website (any site works)
url = "https://lilianweng.github.io/posts/2023-06-23-agent/"
loader = WebBaseLoader(url)
docs = loader.load()  # Loads the page as a single Document

# Split the doc into chunks (sizes are counted in tiktoken tokens)
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500, chunk_overlap=100
)

chunks = text_splitter.split_documents(docs)
print(f"Split into {len(chunks)} chunks")

# Embed the chunks into a vector store
embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")

vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    collection_name="chroma_testing_langgraph",
)

# Build a retriever on top of the vector store
retriever = vectorstore.as_retriever()
```

```
Split into 33 chunks
```
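`chunk_size` and `chunk_overlap` describe a sliding window over the text: each chunk holds up to `chunk_size` tokens and repeats the last `chunk_overlap` tokens of the previous one. A minimal stdlib sketch of that idea, assuming whitespace-separated words as a stand-in for tiktoken tokens; the real `RecursiveCharacterTextSplitter` also prefers to break on paragraph, line, and word separators, so its chunk boundaries will differ. `sliding_window_chunks` is a hypothetical helper, not a LangChain function.

```python
from typing import List

def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into windows of `chunk_size` words, each sharing
    `chunk_overlap` words with the previous window (hypothetical helper)."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    words = text.split()
    step = chunk_size - chunk_overlap  # how far the window moves each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # Stop once this window already reaches the end of the text
        if start + chunk_size >= len(words):
            break
    return chunks

text = " ".join(f"w{i}" for i in range(12))
for chunk in sliding_window_chunks(text, chunk_size=5, chunk_overlap=2):
    print(chunk)
```

With 12 words, a window of 5 and an overlap of 2, the window advances 3 words at a time and yields 4 chunks; setting `chunk_overlap=0`, as the later vector stores do, gives disjoint chunks.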


```python
retriever.invoke("what is FAISS?")
```

```
[Document(page_content='FAISS (Facebook AI Similarity Search): It operates on the assumption that in high dimensional space, distances between nodes follow a Gaussian distribution and thus there should exist clustering of data points. FAISS applies vector quantization by partitioning the vector space into clusters and then refining the quantization within clusters. Search first looks for cluster candidates with coarse quantization and then further looks into each cluster with finer quantization.\nScaNN (Scalable Nearest Neighbors): The main innovation in ScaNN is anisotropic vector quantization. It quantizes a data point $x_i$ to $\\tilde{x}_i$ such that the inner product $\\langle q, x_i \\rangle$ is as similar to the original distance of $\\angle q, \\tilde{x}_i$ as possible, instead of picking the closet quantization centroid points.', metadata={'description': 'Building agents with LLM (large language model) as its core controller is a cool concept. Several proof-of-concepts demos, 
```
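By default, `as_retriever()` runs a similarity search and returns the 4 closest chunks. The ranking step can be sketched in plain Python, assuming toy 3-dimensional vectors in place of real `text-embedding-3-small` embeddings (1536 dimensions) and cosine similarity as the score. Chroma's default distance is L2 rather than cosine, but for unit-length embeddings such as OpenAI's the two produce the same ordering. The vectors and document ids below are made up for illustration.

```python
import math
from typing import Dict, List, Tuple

def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def top_k(query_vec: List[float], index: Dict[str, List[float]], k: int = 4) -> List[Tuple[str, float]]:
    """Score every stored vector against the query and keep the k best."""
    scored = [(doc_id, cosine(query_vec, vec)) for doc_id, vec in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

# Hypothetical toy "embeddings" for three chunks
index = {
    "faiss_chunk": [0.9, 0.1, 0.0],
    "scann_chunk": [0.7, 0.3, 0.1],
    "memory_chunk": [0.0, 0.2, 0.9],
}
query = [1.0, 0.0, 0.0]  # pretend embedding of "what is FAISS?"
print(top_k(query, index, k=2))
```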

Load an LLM that replies in JSON so the grader's decision can be parsed inside the graph, then build the grading chain from a prompt.

```python
llm_json_output = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0.1, 
                             model_kwargs={"response_format":{"type":"json_object"}})
```

```python
template = """
You are a grader assessing relevance of a retrieved document to a user question. \n 
Here is the retrieved document: \n\n {document} \n\n
Here is the user question: {question} \n
If the document contains keywords related to the user question, grade it as relevant. \n
It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
Give a binary 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
"""

prompt = ChatPromptTemplate.from_template(template)

# Prompt -> JSON-mode LLM -> parse the JSON string into a Python dict
chain = prompt | llm_json_output | JsonOutputParser()
```

Test the chain manually: retrieve documents for a query and grade the top-ranked one, which may or may not be relevant.

```python
query = "What are the different types of agent memory?"
query_docs = retriever.invoke(query)

# Grade only the top-ranked document
chain_scores = chain.invoke(
    {
        "question": query,
        "document": query_docs[0].page_content,
    }
)
```

```python
chain_scores
```

```
{'score': 'yes'}
```

For comparison, the same chain without `JsonOutputParser()` at the end returns a raw `AIMessage`:

```python
chain_scores
```

```
AIMessage(content='{\n"score": "yes"\n}', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 544, 'total_tokens': 552}, 'model_name': 'gpt-4-turbo-preview', 'system_fingerprint': 'fp_122114e45f', 'finish_reason': 'stop', 'logprobs': None})
```
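The parser's job is to turn that raw `content` string into a dict. A stdlib-only sketch of the same step for this grader, assuming the model honours the `{"score": "yes" | "no"}` contract from the prompt; the `parse_grade` name and its validation are hypothetical additions, not part of LangChain's `JsonOutputParser`.

```python
import json

def parse_grade(raw: str) -> str:
    """Parse the grader's JSON reply and return 'yes' or 'no'."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError(f"Expected a JSON object, got: {raw!r}")
    # Normalise case and whitespace so "Yes " still counts as "yes"
    score = str(data.get("score", "")).strip().lower()
    if score not in {"yes", "no"}:
        raise ValueError(f"Unexpected grader output: {raw!r}")
    return score

# The raw AIMessage content from the run above
print(parse_grade('{\n"score": "yes"\n}'))
```

Failing loudly on anything other than `yes`/`no` makes a misbehaving grader visible during testing instead of silently routing the graph down the wrong edge.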

## Building a Graph

We'll develop a graph that draws on multiple document sources, starting by building one vector store per source.

Referenced from [here](https://www.youtube.com/watch?v=pbAd8O1Lvm4).

```python
llm_urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

plotly_yt_urls = [
    "https://www.youtube.com/watch?v=Qx5eFVUdDxk&list=PLYD54mj9I2JevdabetHsJ3RLCeMyBNKYV&index=1",
    "https://www.youtube.com/watch?v=Z9YUejzkFa0&list=PLYD54mj9I2JevdabetHsJ3RLCeMyBNKYV&index=2",
    "https://www.youtube.com/watch?v=4bP66rRxVBw&list=PLYD54mj9I2JevdabetHsJ3RLCeMyBNKYV&index=3",
    "https://www.youtube.com/watch?v=a1qzu5GKIf0&list=PLYD54mj9I2JevdabetHsJ3RLCeMyBNKYV&index=4",
    "https://www.youtube.com/watch?v=Fm7DC-Z5R7A&list=PLYD54mj9I2JevdabetHsJ3RLCeMyBNKYV&index=5",
    "https://www.youtube.com/watch?v=4jcWJ30HqSY&list=PLYD54mj9I2JevdabetHsJ3RLCeMyBNKYV&index=6"
]
```

Build one vector store for the blog URLs and one for the YouTube videos.

```python
# Vector store for the LLM blog URLs

llm_docs = [WebBaseLoader(url).load() for url in llm_urls]  # Each load() returns a one-element list
llm_docs = [doc for doc_list in llm_docs for doc in doc_list]  # Flatten into a single list of Documents

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)

chunks = text_splitter.split_documents(llm_docs)
print(f"Split into {len(chunks)} chunks")

llm_llw_vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    collection_name="chroma_llm_llw",
)

llm_llw_retriever = llm_llw_vectorstore.as_retriever()
```

```
Split into 194 chunks
```
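Because each `load()` call returns a list, `llm_docs` starts out as a list of one-element lists, and the nested comprehension flattens it. `itertools.chain.from_iterable` is an equivalent standard-library spelling. A small sketch, with strings standing in for `Document` objects:

```python
from itertools import chain

# Stand-in for [loader.load() for url in urls]: one list per URL
nested = [["doc_a"], ["doc_b"], ["doc_c"]]

# Nested comprehension, as in the cell above
flat_comprehension = [doc for doc_list in nested for doc in doc_list]

# Same result via the standard library
flat_chain = list(chain.from_iterable(nested))

print(flat_comprehension)
print(flat_comprehension == flat_chain)
```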


```python
# Vector store for the Plotly YouTube URLs

yt_docs = [YoutubeLoader.from_youtube_url(url, add_video_info=False).load() for url in plotly_yt_urls]  # Each load() returns a one-element list
yt_docs = [doc for doc_list in yt_docs for doc in doc_list]  # Flatten into a single list of Documents

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)

chunks = text_splitter.split_documents(yt_docs)
print(f"Split into {len(chunks)} chunks")

plotly_yt_vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    collection_name="chroma_plotly_yt",
)

plotly_yt_retriever = plotly_yt_vectorstore.as_retriever()
```

```
Split into 109 chunks
```


### Define a Graph State, i.e. a dict that holds the values passed between nodes

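```python
from typing import List

from typing_extensions import TypedDict
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import BaseModel, Field  # used by the grader node below

class GraphState(TypedDict):
    question: str         # current (possibly rewritten) user question
    generation: str       # final LLM answer
    docs: List[Document]  # retrieved, graded, and web-search documents
    run_web_search: str   # "yes" or "no", set by the grader
```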
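Each node returns a partial dict, and LangGraph writes those keys back into the state. With a plain `TypedDict` and no reducers, a returned key simply replaces the previous value; keys declared with a reducer, e.g. `Annotated[list, operator.add]`, are combined instead. A plain-Python model of the overwrite behaviour, using a hypothetical `apply_update` helper (LangGraph's real channel machinery does more than this):

```python
from typing import Any, Dict

def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new state in which every key in `update` overwrites the old value."""
    merged = dict(state)
    merged.update(update)
    return merged

state = {"question": "What is agent memory?"}
# After a retrieve-style node
state = apply_update(state, {"docs": ["chunk 1", "chunk 2"], "question": "What is agent memory?"})
# After a grading-style node that dropped one chunk
state = apply_update(state, {"docs": ["chunk 1"], "run_web_search": "yes"})
print(state)
```

This is why the grading node can shrink `docs`: the filtered list replaces the retrieved one instead of being appended to it.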

Next we define a **function for every node** in the graph (each node reads the state and returns an update to it), as well as the **functions that decide the edges**.

```python
def format_docs(docs):
    """Join the page contents of the documents into one context string."""
    return "\n\n".join(doc.page_content for doc in docs)

##### NODES #####

# RETRIEVER NODE
def retrieve(state):

    print("---RETRIEVE---")

    # Get the question from the state dictionary
    question = state["question"]

    # Run the retriever to get documents
    # (uses the single-site `retriever` built at the top of the notebook)
    docs = retriever.invoke(question)

    # Return an update that is written back into the state
    return {"docs": docs, "question": question}

# GENERATE NODE
def generate(state):

    print("---GENERATE---")

    # Get the question from the state
    question = state["question"]

    # Get the documents stored in the state (graded retrievals plus any web results)
    docs = state["docs"]

    # Build an LLM chain that answers the question from the retrieved docs
    prompt = hub.pull("rlm/rag-prompt")
    llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0.1, streaming=True)

    chain = prompt | llm | StrOutputParser()

    # Generate the answer
    generation = chain.invoke({"context": format_docs(docs), "question": question})

    # Return the generation to update the state
    return {"docs": docs, "question": question, "generation": generation}
```
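`format_docs` concatenates the `page_content` of each document, separated by a blank line, to fill the `{context}` slot of the RAG prompt. A self-contained check of that behaviour, assuming a minimal dataclass (`FakeDocument`, hypothetical) as a stand-in for LangChain's `Document`, since only the `page_content` attribute is used:

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class FakeDocument:
    """Hypothetical stand-in exposing the same `page_content` attribute as Document."""
    page_content: str
    metadata: Dict = field(default_factory=dict)

def format_docs(docs: List[FakeDocument]) -> str:
    # Same body as the notebook's format_docs
    return "\n\n".join(doc.page_content for doc in docs)

context = format_docs([
    FakeDocument("Short-term memory is in-context learning."),
    FakeDocument("Long-term memory uses an external vector store."),
])
print(context)
```

An empty document list produces an empty context string, so the prompt still renders; whether the LLM then answers sensibly is up to the prompt.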

```python
# GRADING DOCUMENTS NODE
def grading_docs(state):

    print("---GRADE---")

    question = state["question"]
    docs = state["docs"]

    # Pydantic model that defines the structure the LLM must return
    class GraderOutput(BaseModel):
        binary_score: str = Field(description="Relevance score by grader of yes or no")

    # Load the LLM and bind the structured-output schema (function calling)
    llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0.1, streaming=True)
    structured_llm = llm.with_structured_output(GraderOutput)

    # Define the grader prompt
    system = """
    You are a grader assessing relevance of a retrieved document to a user question. \n 
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    Give a binary 'yes' or 'no' score to indicate whether the document is relevant to the question.
    """

    grade_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
        ]
    )

    # Define the grader chain; it grades ONE DOCUMENT AT A TIME
    grader_chain = grade_prompt | structured_llm

    # Grade each retrieved doc and flag a web search if at least one is irrelevant
    filtered_docs = []
    run_web_search = "no"  # Switch to "yes" as soon as a single document is irrelevant
    for doc in docs:
        score = grader_chain.invoke({"question": question, "document": doc.page_content})
        grade = score.binary_score.strip().lower()  # tolerate "Yes" / " yes"

        if grade == "yes":
            print("---Doc is relevant---")
            filtered_docs.append(doc)
        else:
            print("---Doc is not relevant---")
            run_web_search = "yes"  # A web search will run if any document is irrelevant

    # Update the state with the filtered docs
    return {"docs": filtered_docs, "question": question, "run_web_search": run_web_search}
```
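Since every grade costs an LLM call, the filtering logic of `grading_docs` can be unit-tested first with a deterministic stub. A sketch, assuming a hypothetical keyword-overlap grader in place of `grader_chain`; `filter_docs` mirrors the node's loop: keep the "yes" documents and flip `run_web_search` to "yes" as soon as any document is rejected.

```python
from typing import Callable, List, Tuple

def keyword_grader(question: str, document: str) -> str:
    """Hypothetical stub: 'yes' if any question word of 4+ letters appears in the document."""
    keywords = {w.strip("?.,").lower() for w in question.split() if len(w.strip("?.,")) >= 4}
    text = document.lower()
    return "yes" if any(k in text for k in keywords) else "no"

def filter_docs(question: str, docs: List[str],
                grade: Callable[[str, str], str]) -> Tuple[List[str], str]:
    """Same control flow as grading_docs, with the grader injected."""
    filtered, run_web_search = [], "no"
    for doc in docs:
        if grade(question, doc) == "yes":
            filtered.append(doc)
        else:
            run_web_search = "yes"
    return filtered, run_web_search

docs = ["Agent memory can be short-term or long-term.", "One Piece is a manga series."]
kept, flag = filter_docs("What types of agent memory exist?", docs, keyword_grader)
print(kept, flag)
```

Injecting the grader as a function keeps the loop testable offline; in the notebook it would be a thin wrapper around `grader_chain.invoke(...)`.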

```python
def improve_query(state):
    """
    Improve the query by rewriting it for better retrieval
    """

    print("---IMPROVE QUERY---")

    question = state["question"]
    docs = state["docs"]

    # Build a chain that takes in a query and outputs an improved query
    llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)

    system = """
    You are a question re-writer that converts an input question to a better version that is optimized \n 
    for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning and re-write it.
    """

    re_write_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question."),
        ]
    )

    re_write_chain = re_write_prompt | llm | StrOutputParser()

    # Improve the query
    improved_question = re_write_chain.invoke({"question": question})

    # Update the state with the improved question
    return {"docs": docs, "question": improved_question}
```

```python
def web_search(state):
    """
    Run a web search with the (improved) query to retrieve additional docs
    """

    print("---WEB SEARCH FOR DOCS---")

    question = state["question"]
    docs = state["docs"]

    # Retrieve relevant web documents (top 5 Tavily results)
    web_tool = TavilySearchAPIRetriever(k=5)
    web_results = web_tool.invoke(question)
    web_docs = [Document(page_content=result.page_content) for result in web_results]

    # Add to the running document list (build a new list instead of mutating the state's list)
    docs = docs + web_docs

    # Return the updated document list to the state
    return {"docs": docs, "question": question}
```
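In Python, a function that calls `list.extend` on a list it received changes the caller's list in place, not just the value it returns; with graph state, that can leave the input state silently modified. A small stdlib demonstration of the difference, with plain lists standing in for the retrieved and web documents (both helper names are hypothetical):

```python
from typing import Dict, List

def add_web_docs_in_place(state: Dict[str, List[str]], web_docs: List[str]) -> Dict[str, List[str]]:
    docs = state["docs"]
    docs.extend(web_docs)  # mutates the list object held by `state`
    return {"docs": docs}

def add_web_docs_copy(state: Dict[str, List[str]], web_docs: List[str]) -> Dict[str, List[str]]:
    return {"docs": state["docs"] + web_docs}  # new list; the input is untouched

state_a = {"docs": ["vector doc"]}
add_web_docs_in_place(state_a, ["web doc"])
print(state_a["docs"])  # input changed as a side effect

state_b = {"docs": ["vector doc"]}
update = add_web_docs_copy(state_b, ["web doc"])
print(state_b["docs"], update["docs"])  # input unchanged, update has both
```

Returning a fresh list keeps each node's effect confined to its returned update, which makes nodes easier to test in isolation.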

```python
###### EDGES ######

def retrieved_decission_maker(state):
    """
    Re-formulate the query if no relevant documents were found;
    otherwise generate an answer from the retrieved documents.
    (Not used in the graph below; kept for reference.)
    """

    print("---GT:1 - ASSESS IF NEEDS TO RE-GENERATE QUERY---")

    question = state["question"]
    filtered_docs = state["docs"]

    if not filtered_docs:
        print("---NO DOCS ARE RELEVANT: WILL TRANSFORM AND IMPROVE QUERY AND TRY RETRIEVING AGAIN---")
        return "improve_query"
    else:
        print("---FOUND RELEVANT DOCUMENTS: WILL PROCEED TO GENERATE ANSWER---")
        return "generate"

def retrieved_decission_maker_with_web_search(state):
    """
    Re-formulate the query and pass it to a web search if at least
    one retrieved document was graded as not relevant
    """

    print("---GT:2 - ASSESS IF NEEDS TO RE-GENERATE QUERY AND ADD WEB SEARCH RESULTS---")

    question = state["question"]
    filtered_docs = state["docs"]
    run_web_search = state["run_web_search"]

    if run_web_search == "yes":
        print("---FOUND AT LEAST ONE DOC IRRELEVANT: WILL IMPROVE QUERY AND RUN WEB SEARCH TO RETRIEVE ADDITIONAL RELEVANT DOCS---")
        return "improve_query"
    else:
        print("---ALL DOCS FOUNDS TO BE RELEVANT: NO NEED TO RUN WEB SEARCH, WILL GO AHEAD AND GENERATE ANSWER---")
        return "generate"
```


```python
from langchain_community.retrievers import TavilySearchAPIRetriever
web_api_tool = TavilySearchAPIRetriever(k=5)

web_api_results = web_api_tool.invoke("what is the One Piece?")
```

```python
[Document(page_content=i.page_content) for i in web_api_results]
```

```
[Document(page_content='According to one estimate, the series boasts over 516.6 million in sales and broke the world record for “the most copies published for the same comic book series by a single author” in 2015 and again in 2022.\n How long are the One Piece manga and anime?\nAt the time of publication, the streaming service Crunchyroll has 1,073 episodes of the One Piece anime. The entertainment analytics firm Parrot Analytics finds that audience demand for One Piece is “40.7 times the demand of the average TV series in the United States in the last 30 days.” At the beginning of the manga and anime, Pirate King Gol D. Roger confirms the existence of the mythological treasure known as the One Piece. The anime series takes certain liberties to adapt the story of the manga — like extending fight scenes or adding characters — but it generally follows the same story beats of the manga.'),
 Document(page_content='The reading of pirate biographies influenced Oda to incorporate the charact
```

### Build Graph Structure

```python
from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)
```

Add the nodes

```python
workflow.add_node("retrieve", retrieve)
workflow.add_node("grading_docs", grading_docs)
workflow.add_node("generate", generate)
workflow.add_node("improve_query", improve_query)
workflow.add_node("web_search", web_search)
```

Add the edges

```python
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grading_docs")
workflow.add_conditional_edges(
    "grading_docs",  # source node the conditional edge starts from
    retrieved_decission_maker_with_web_search,  # function that returns a decision from the state after grading
    {
        "improve_query": "improve_query",  # decision returned by the function -> node to run next
        "generate": "generate",
    },
)
workflow.add_edge("improve_query", "web_search")  # Improve the query, then use it in a web search to add relevant documents
workflow.add_edge("web_search", "generate")  # Generate an answer from the relevant vector-store docs plus the web docs
workflow.add_edge("generate", END)
```
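Before spending API calls, the wiring can be sanity-checked with a plain-Python walk over the same edges: `retrieve` → `grading_docs`, then the conditional edge to either `improve_query` → `web_search` → `generate`, or straight to `generate`. A sketch, assuming hypothetical stub nodes that only set `run_web_search` and `generation`; this models the routing only and is not LangGraph's executor.

```python
from typing import Callable, Dict, List

END = "__end__"

def route_after_grading(state: Dict) -> str:
    # Same rule as retrieved_decission_maker_with_web_search
    return "improve_query" if state["run_web_search"] == "yes" else "generate"

def run_graph(any_irrelevant: bool) -> List[str]:
    """Walk the edges with stub nodes and return the order of visited nodes."""
    nodes: Dict[str, Callable[[Dict], Dict]] = {
        "retrieve": lambda s: {},
        "grading_docs": lambda s: {"run_web_search": "yes" if any_irrelevant else "no"},
        "improve_query": lambda s: {},
        "web_search": lambda s: {},
        "generate": lambda s: {"generation": "stub answer"},
    }
    static_edges = {"retrieve": "grading_docs", "improve_query": "web_search",
                    "web_search": "generate", "generate": END}
    state: Dict = {"question": "q"}
    visited, current = [], "retrieve"
    while current != END:
        visited.append(current)
        state.update(nodes[current](state))  # overwrite keys, as in the state model
        if current == "grading_docs":
            current = route_after_grading(state)
        else:
            current = static_edges[current]
    return visited

print(run_graph(any_irrelevant=False))
print(run_graph(any_irrelevant=True))
```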


Compile Graph

```python
multi_agent_app = workflow.compile()
```

### Test LangGraph-based Multi-Agent App

```python
from pprint import pprint

# Run
inputs = {"question": "Explain how the different types of agent memory work?"}
for output in multi_agent_app.stream(inputs):
    for key, value in output.items():
        # Node name
        pprint(f"Node '{key}':")
        # Optional: print the state update emitted at each node
        # pprint(value, indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation (taken from the last streamed event)
pprint(value["generation"])
```

```
---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---GRADE---
---Doc is relevant---
---Doc is relevant---
---Doc is relevant---
---Doc is relevant---
"Node 'grading_docs':"
'\n---\n'
---GT:2 - ASSESS IF NEEDS TO RE-GENERATE QUERY AND ADD WEB SEARCH RESULTS---
---ALL DOCS FOUNDS TO BE RELEVANT: NO NEED TO RUN WEB SEARCH, WILL GO AHEAD AND GENERATE ANSWER---
---GENERATE---
"Node 'generate':"
'\n---\n'
"Node '__end__':"
'\n---\n'
('Different types of agent memory work as follows: Short-term memory in agents '
 "is akin to in-context learning, utilizing the model's immediate context to "
 "learn and adapt, but it is limited by the model's finite context window. "
 'Long-term memory allows agents to retain and recall information over '
 'extended periods, often through an external vector store that supports fast '
 'retrieval, enabling the agent to access a vast amount of information beyond '
 'its immediate context. Sensory memory in agents could be compared to '
 'learning embedding representat
```
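`stream()` yields one `{node_name: update}` dict per step. Whether a final `__end__` event carrying the full state is also emitted depends on the LangGraph version (it appears in the run above), so reading `value["generation"]` from the last event relies on that detail. A version-agnostic alternative is to merge the updates yourself. A sketch with hypothetical events shaped like the run above; `collect_final_state` is not a LangGraph API.

```python
from typing import Any, Dict, Iterable

def collect_final_state(events: Iterable[Dict[str, Dict[str, Any]]],
                        initial: Dict[str, Any]) -> Dict[str, Any]:
    """Merge each node's update into a running copy of the state."""
    state = dict(initial)
    for event in events:
        for node_name, update in event.items():
            if node_name == "__end__":
                continue  # some versions repeat the full state here; skip it
            state.update(update or {})
    return state

fake_events = [
    {"retrieve": {"docs": ["d1", "d2"], "question": "q"}},
    {"grading_docs": {"docs": ["d1", "d2"], "question": "q", "run_web_search": "no"}},
    {"generate": {"docs": ["d1", "d2"], "question": "q", "generation": "answer"}},
]
final = collect_final_state(fake_events, {"question": "q"})
print(final["generation"])
```

With the compiled app, `collect_final_state(multi_agent_app.stream(inputs), inputs)` would consume the stream in one pass, so it replaces the printing loop rather than running after it.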

````python
tavily_tool = TavilySearchResults(max_results=5)
repl = PythonREPL()

@tool
def python_repl(
    code: Annotated[str, "The python code to execute to generate your chart."]
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except Exception as e:
        return f"Failed to execute. Error: {repr(e)}"
    return f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
````
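`PythonREPL.run` executes a code string and returns what it printed. A rough stdlib analogue, assuming `exec` plus `contextlib.redirect_stdout`; LangChain's implementation differs in details such as its optional timeout, and neither version is sandboxed, so only run trusted code. `run_python` is a hypothetical name.

```python
import contextlib
import io
from typing import Any, Dict

def run_python(code: str, namespace: Dict[str, Any]) -> str:
    """Execute `code` in `namespace` and return captured stdout or an error string."""
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, namespace)  # runs arbitrary code: trusted input only
    except Exception as e:
        return f"Failed to execute. Error: {e!r}"
    return buffer.getvalue()

ns: Dict[str, Any] = {}
print(run_python("x = 21\nprint(x * 2)", ns))
print(run_python("print(x + 1)", ns))  # variables persist in the shared namespace
print(run_python("1 / 0", ns))
```

Sharing one namespace dict across calls is what lets an agent define a variable in one tool call and reuse it in the next.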