## Set up environment

In [None]:
import os
import getpass

def _set_env(var: str):
    """Prompt for environment variable *var* unless it already holds a non-empty value."""
    already_set = bool(os.environ.get(var))
    if already_set:
        return
    # getpass hides the typed value, so secrets are not echoed into the notebook.
    os.environ[var] = getpass.getpass(f"{var}: ")

In [None]:
# API keys are requested interactively only when they are not already exported.
_set_env("GOOGLE_API_KEY")
_set_env("LANGCHAIN_API_KEY")

# LangSmith tracing configuration, applied in one batch.
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": "pr-aching-coincidence-98",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
    }
)

## Imports

In [None]:
from langchain_ollama import OllamaLLM, ChatOllama
from langchain_community.document_loaders import PyPDFLoader
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import SKLearnVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.schema import Document
from langgraph.graph import END
import operator
from typing_extensions import TypedDict
from typing import List, Annotated
import json
import requests
from pprint import pprint
from langsmith import traceable
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langgraph.graph import StateGraph
from IPython.display import Image, display

In [None]:
def choose_model(local_llm):
    """
    Build the LLM clients and embedding model for the given model name

    Args:
        local_llm (str): Model name. Names containing 'gemini' use the Google
            Generative AI clients; names containing 'llama' use a local Ollama server.

    Returns:
        tuple: (llm, llm_json_mode, embedding)

    Raises:
        ValueError: If the model name matches neither supported family
    """
    # 'gemini' is checked first so a name matching both families still resolves
    # to the Gemini clients, as it did when the two checks were independent.
    if 'gemini' in local_llm:
        llm = ChatGoogleGenerativeAI(model=local_llm)
        llm_json_mode = ChatGoogleGenerativeAI(model=local_llm)
        embedding = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    elif 'llama' in local_llm:
        # All Ollama clients point at the same server explicitly.
        base_url = "http://localhost:11434"
        llm = OllamaLLM(model=local_llm, base_url=base_url, temperature=0, model_kwargs={'device': 'gpu'})
        llm_json_mode = OllamaLLM(model=local_llm, base_url=base_url, temperature=0, format="json", model_kwargs={'device': 'gpu'})
        embedding = OllamaEmbeddings(model="nomic-embed-text", base_url=base_url)
    else:
        # Previously this fell through to an UnboundLocalError at the return statement.
        raise ValueError(
            f"Unsupported model {local_llm!r}: expected a name containing 'llama' or 'gemini'"
        )

    return llm, llm_json_mode, embedding

In [None]:
def create_vector_store(file_list, 
                        embedding, 
                        vector_store_name,
                        persist_location="C:/Users/mario/rag_notebooks/sklearn_vector_store_backup", 
                        ):
    """
    Build a persisted SKLearn vector store from a list of PDF files

    Args:
        file_list (list): Paths of the PDF files to load
        embedding: Embedding model used to vectorise the chunks
        vector_store_name (str): File name (without extension) of the parquet file
        persist_location (str): Directory the parquet file is written to

    Returns:
        tuple: (vectorstore, retriever), or (None, None) when file_list is empty
    """
    # Callers unpack two values, so the "nothing to index" case must also be a pair.
    if not file_list:
        return None, None

    # One loop covers both the single-file and the multi-file case.
    docs = []
    for pdf_file in file_list:
        docs.extend(PyPDFLoader(pdf_file).load())

    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=500,
        chunk_overlap=20
        )

    docs_split = text_splitter.split_documents(docs)

    # Make sure the target directory exists before the store is persisted.
    os.makedirs(persist_location, exist_ok=True)

    vectorstore = SKLearnVectorStore.from_documents(
        documents=docs_split,
        embedding=embedding,
        persist_path=f"{persist_location}/{vector_store_name}.parquet",
        serializer="parquet"
        )

    # persist vectorstore
    vectorstore.persist()

    # create retriever
    retriever = vectorstore.as_retriever()

    return vectorstore, retriever

In [None]:
def load_vector_store(embedding, 
                      vector_store_name,
                      persist_location="C:/Users/mario/rag_notebooks/sklearn_vector_store_backup"
                      ):
    """
    Open the parquet-backed vector store and wrap it in a retriever

    Args:
        embedding: Embedding model handed to the store
        vector_store_name (str): File name (without extension) of the parquet file
        persist_location (str): Directory that contains the parquet file

    Returns:
        tuple: (vectorstore, retriever)
    """
    parquet_path = f"{persist_location}/{vector_store_name}.parquet"

    # The store is pointed at the persisted file; the constructor is expected to
    # pick up the data saved there.
    store = SKLearnVectorStore(
        embedding=embedding,
        persist_path=parquet_path,
        serializer="parquet",
    )

    return store, store.as_retriever()

### Set variables

In [None]:
# Model selector passed to choose_model(): a name containing "llama" selects the
# Ollama clients, a name containing "gemini" selects the Google Generative AI clients.
local_llm = "llama3.1"
# local_llm = "gemini-1.5-flash-8b"

In [None]:
# PDF sources passed to create_vector_store(), which loads, splits and embeds them.
# NOTE(review): absolute, machine-specific paths; adjust before running elsewhere.
file_list = [
    "C:/Users/mario/Downloads/Expert_Python_Programming.pdf",
    "C:/Users/mario/Downloads/Python.Algorithms.pdf",
    "C:/Users/mario/Downloads/deep-learning-with-tensorflow-20-and-keras.pdf"
]

In [None]:
# Instantiate the answer LLM, the LLM used by the router/graders for JSON output,
# and the embedding model; the graph nodes below read these as module-level globals.
llm, llm_json_mode, embedding = choose_model(local_llm)
print(llm, llm_json_mode, embedding)

In [None]:
# Build the vector store from the PDFs and persist it as "python_books".
# `retriever` is the module-level name the `retrieve` node reads.
vectorstore, retriever = create_vector_store(file_list, embedding, "python_books")

In [None]:
# Reload the persisted "python_books" store instead of rebuilding it.
# load_vector_store takes (embedding, vector_store_name[, persist_location]);
# file_list is not one of its parameters, so passing it shifted every argument.
vectorstore, retriever = load_vector_store(embedding, "python_books")

## Create Graph

In [None]:
class GraphState(TypedDict):
    """
    Shared state of the workflow graph.

    Every node receives the current state and returns a dict holding only the
    keys it wants to update.
    """
    
    question: str  # User question
    generation: str  # LLM generation, written by the generate node
    web_search: str  # "Yes"/"No" flag written by grade_documents and read by decide_to_generate
    max_retries: int  # Max number of retries for answer generation
    answers: int  # Number of answers generated (no node in this notebook writes it)
    # Annotated with operator.add; presumably LangGraph uses it as the reducer that
    # combines a node's returned value with the stored one (confirm against LangGraph docs).
    loop_step: Annotated[int, operator.add]
    documents: List[str]  # Retrieved documents; the nodes read `.page_content` on each item

### Nodes

In [None]:
def retrieve(state):
    """
    Look up documents for the question in the vectorstore

    Args:
        state (dict): The current graph state; only ``question`` is read

    Returns:
        dict: State update whose ``documents`` key holds the retrieved documents
    """
    print("---RETRIEVE---")
    # The module-level retriever does the similarity search.
    return {"documents": retriever.invoke(state["question"])}

In [None]:
def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state; reads ``question`` and ``documents``

    Returns:
        dict: State update with ``generation`` (the answer text) and a
        ``loop_step`` increment of 1
    """

    # Prompt
    rag_prompt = """You are an assistant for question-answering tasks. 

    Here is the context to use to answer the question:

    {context} 

    Think carefully about the above context. 

    Now, review the user question:

    {question}

    Provide an answer to this questions using only the above context. 

    Format the answer to be informative, concise, and relevant using lists and code blocks when appropriate.

    The answer should be no longer than 500 words.
    
    Answer:"""

    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    docs_txt = "\n\n".join(doc.page_content for doc in documents)
    rag_prompt_formatted = rag_prompt.format(context=docs_txt, question=question)
    reply = llm.invoke([HumanMessage(content=rag_prompt_formatted)])

    # Completion-style LLMs return a plain string, chat models return a message
    # object; store the text either way so downstream prompts and rendering work.
    generation = getattr(reply, "content", reply)

    # GraphState declares loop_step with operator.add as its reducer, so the value
    # returned here is ADDED to the stored counter. Return the increment, not the
    # new total; returning `loop_step + 1` made the counter grow 1, 3, 7, ...
    return {"generation": generation, "loop_step": 1}

In [None]:
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state; reads ``question`` and ``documents``

    Returns:
        dict: State update with ``documents`` reduced to the relevant ones and
        ``web_search`` set to "Yes" if at least one document was rejected, else "No"
    """

    # Doc grader instructions
    doc_grader_instructions = """You are a grader assessing relevance of a retrieved document to a user question.

    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant."""

    # Grader prompt
    doc_grader_prompt = """Here is the retrieved document: \n\n {document} \n\n Here is the user question: \n\n {question}. 

    This carefully and objectively assess whether the document contains at least some information that is relevant to the question.

    Return JSON with single key, binary_score, that is 'yes' or 'no' score to indicate whether the document contains at least some information that is relevant to the question."""

    def _load_json(reply):
        """Parse the JSON object carried by an LLM reply (plain string or chat message)."""
        text = getattr(reply, "content", reply)
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Chat models may wrap the payload in a ```json fence, as the other
            # graders in this notebook already assume for Gemini.
            return json.loads(text.split("```json")[1].split("```")[0])

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        doc_grader_prompt_formatted = doc_grader_prompt.format(
            document=d.page_content, question=question
        )
        result = llm_json_mode.invoke(
            [SystemMessage(content=doc_grader_instructions)]
            + [HumanMessage(content=doc_grader_prompt_formatted)]
        )
        grade = str(_load_json(result)["binary_score"]).lower()

        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # The document is dropped, and a single rejection is enough to
            # request a supplementary web search.
            web_search = "Yes"
    return {"documents": filtered_docs, "web_search": web_search}

In [None]:
def web_search_tool(question, search_url="http://localhost:8080/search", max_results=5, timeout=30):
    """
    Query the local search endpoint and wrap the hits as Documents

    Args:
        question (str): Search query
        search_url (str): Endpoint that returns JSON with a ``results`` list
            (presumably a local SearXNG instance; confirm)
        max_results (int): Maximum number of hits to keep
        timeout (float): Seconds to wait for the search service before giving up

    Returns:
        list: One Document per hit, with the hit URL as ``source`` metadata

    Raises:
        requests.RequestException: On connection errors, timeouts or HTTP error statuses
    """
    # Without a timeout a stalled search service would block the graph forever.
    res = requests.get(
        search_url,
        params={"q": question, "format": "json"},
        timeout=timeout,
    )
    # Fail loudly on HTTP errors instead of trying to parse an error page as JSON.
    res.raise_for_status()
    web_results = res.json().get("results", [])[:max_results]

    # Hits without a text snippet become empty-content documents rather than a KeyError.
    return [
        Document(
            page_content=hit.get("content") or "",
            metadata={'source': hit.get("url", ""), 'page': "Web page"},
        )
        for hit in web_results
    ]

In [None]:
def web_search(state):
    """
    Web search based on the question

    Args:
        state (dict): The current graph state; reads ``question`` and, if present, ``documents``

    Returns:
        dict: State update whose ``documents`` key holds the existing documents
        followed by the web results
    """

    print("---WEB SEARCH---")
    question = state["question"]
    # `documents` is absent when the router sends the question straight to web search.
    existing = state.get("documents") or []

    # Build a new list instead of extending the one held by the state, so the
    # incoming state is never mutated in place.
    return {"documents": [*existing, *web_search_tool(question)]}

### Edges

In [None]:
def route_question(state):
    """
    Route question to web search or RAG

    Args:
        state (dict): The current graph state; reads ``question``

    Returns:
        str: Next node to call, "websearch" or "vectorstore"

    Raises:
        ValueError: If the router LLM names a datasource other than the two above
    """

    # Prompt
    router_instructions = """You are an expert at routing a user question to a vectorstore or web search.

    The vectorstore contains documents related to tensorflow, keras, deep learning and python.

    Use the vectorstore for questions on these topics. For all else, and especially for current events, use web-search.

    Return JSON with single key, datasource, that is 'websearch' or 'vectorstore' depending on the question."""

    print("---ROUTE QUESTION---")
    reply = llm_json_mode.invoke(
        [SystemMessage(content=router_instructions)]
        + [HumanMessage(content=state["question"])]
    )

    # Decide how to parse from the reply itself rather than from one exact model
    # name: chat models return a message object, completion models a string.
    text = getattr(reply, "content", reply)
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        # Chat models may wrap the JSON in a ```json fence.
        payload = json.loads(text.split("```json")[1].split("```")[0])

    # Tolerate case and separator variations such as "Web-Search".
    source = "".join(ch for ch in str(payload["datasource"]).lower() if ch.isalnum())

    if source == "websearch":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    if source == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"

    # Returning None here would only surface later as an obscure routing error.
    raise ValueError(f"Router returned an unknown datasource: {payload['datasource']!r}")

In [None]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state; only ``web_search`` is read

    Returns:
        str: "websearch" when the web_search flag is "Yes", otherwise "generate"
    """

    print("---ASSESS GRADED DOCUMENTS---")

    if state["web_search"] == "Yes":
        # At least one retrieved document was graded as not relevant, so the
        # context is supplemented with web results before generating.
        print(
            "---DECISION: NOT ALL DOCUMENTS ARE RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
        )
        return "websearch"

    # Every retrieved document was judged relevant, so generate answer
    print("---DECISION: GENERATE---")
    return "generate"

In [None]:
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question

    Args:
        state (dict): The current graph state; reads ``question``, ``documents``,
            ``generation``, ``loop_step`` and optionally ``max_retries``

    Returns:
        str: Decision for next node to call: "useful", "not useful",
        "not supported" or "max retries"
    """

    # Hallucination grader instructions
    hallucination_grader_instructions = """

    You are a teacher grading a quiz. 

    You will be given FACTS and a STUDENT ANSWER. 

    Here is the grade criteria to follow:

    (1) Ensure the STUDENT ANSWER is grounded in the FACTS. 

    (2) Ensure the STUDENT ANSWER does not contain "hallucinated" information outside the scope of the FACTS.

    Score:

    A score of yes means that the student's answer meets all of the criteria. This is the highest (best) score. 

    A score of no means that the student's answer does not meet all of the criteria. This is the lowest possible score you can give.

    Explain your reasoning in a step-by-step manner to ensure your reasoning and conclusion are correct. 

    Avoid simply stating the correct answer at the outset."""

    # Grader prompt
    hallucination_grader_prompt = """FACTS: \n\n {documents} \n\n STUDENT ANSWER: {generation}. 

    Return JSON with two two keys, binary_score is 'yes' or 'no' score to indicate whether the STUDENT ANSWER is grounded in the FACTS. And a key, explanation, that contains an explanation of the score."""

    # Answer grader instructions
    answer_grader_instructions = """You are a teacher grading a quiz. 

    You will be given a QUESTION and a STUDENT ANSWER. 

    Here is the grade criteria to follow:

    (1) The STUDENT ANSWER helps to answer the QUESTION

    Score:

    A score of yes means that the student's answer meets all of the criteria. This is the highest (best) score. 

    The student can receive a score of yes if the answer contains extra information that is not explicitly asked for in the question.

    A score of no means that the student's answer does not meet all of the criteria. This is the lowest possible score you can give.

    Explain your reasoning in a step-by-step manner to ensure your reasoning and conclusion are correct. 

    Avoid simply stating the correct answer at the outset."""

    # Grader prompt
    answer_grader_prompt = """QUESTION: \n\n {question} \n\n STUDENT ANSWER: {generation}. 

    Return JSON with two two keys, binary_score is 'yes' or 'no' score to indicate whether the STUDENT ANSWER meets the criteria. And a key, explanation, that contains an explanation of the score."""

    def _is_yes(reply):
        """Return True when the grader reply carries a binary_score of 'yes' (any case)."""
        # Parse by reply shape rather than by one exact model name: chat models
        # return a message object, completion models a plain string.
        text = getattr(reply, "content", reply)
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            # Chat models may wrap the JSON in a ```json fence.
            payload = json.loads(text.split("```json")[1].split("```")[0])
        return str(payload["binary_score"]).strip().lower() == "yes"

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    # Use the answer text even if a chat message object was stored in the state.
    generation = getattr(state["generation"], "content", state["generation"])
    max_retries = state.get("max_retries", 3)  # Default to 3 if not provided
    retries_left = state["loop_step"] <= max_retries

    hallucination_grader_prompt_formatted = hallucination_grader_prompt.format(
        documents="\n\n".join(doc.page_content for doc in documents),
        generation=generation,
    )
    result = llm_json_mode.invoke(
        [SystemMessage(content=hallucination_grader_instructions)]
        + [HumanMessage(content=hallucination_grader_prompt_formatted)]
    )

    # Check hallucination
    if not _is_yes(result):
        if retries_left:
            print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
            return "not supported"
        print("---DECISION: MAX RETRIES REACHED---")
        return "max retries"

    print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
    # Check question-answering
    print("---GRADE GENERATION vs QUESTION---")
    answer_grader_prompt_formatted = answer_grader_prompt.format(
        question=question, generation=generation
    )
    result = llm_json_mode.invoke(
        [SystemMessage(content=answer_grader_instructions)]
        + [HumanMessage(content=answer_grader_prompt_formatted)]
    )

    if _is_yes(result):
        print("---DECISION: GENERATION ADDRESSES QUESTION---")
        return "useful"
    if retries_left:
        print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
        return "not useful"
    print("---DECISION: MAX RETRIES REACHED---")
    return "max retries"

### Compile Graph

In [None]:
# Assemble the workflow graph over the shared GraphState.
workflow = StateGraph(GraphState)

# Define the nodes (node name -> function that returns a state update)
workflow.add_node("websearch", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate

# Build graph
# Entry point: route_question returns "websearch" or "vectorstore"; the mapping
# translates that label into the first node to run.
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)
# Web results always go straight to generation; retrieved documents are graded first.
workflow.add_edge("websearch", "generate")
workflow.add_edge("retrieve", "grade_documents")
# After grading, decide_to_generate either adds a web search or generates directly.
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
# After generation: an ungrounded answer is regenerated, an answer that misses the
# question triggers a web search, and a useful answer or exhausted retries end the run.
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
        "max retries": END,
    },
)

# Compile
graph = workflow.compile()
# NOTE(review): rendering the diagram may need network access or extra
# dependencies; confirm before relying on this cell offline.
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
def ask_question(inputs):
    """
    Run the workflow for one question and render each new answer as Markdown

    Args:
        inputs (dict): Initial graph state, e.g. ``{"question": ..., "max_retries": ...}``

    Returns:
        None: Intermediate states are pretty-printed and answers are displayed
    """
    # Markdown is not among the notebook's IPython imports, so bring it in here.
    from IPython.display import Markdown

    last_shown = None
    for event in graph.stream(inputs, stream_mode="values"):
        pprint(event)
        generation = event.get("generation")
        if not generation:
            continue
        # Chat models yield message objects; Markdown needs the text itself.
        answer = getattr(generation, "content", generation)
        # In "values" mode each event carries the full state, so an unchanged
        # answer reappears on later steps; render it only once.
        if answer != last_shown:
            # Display the Markdown in Jupyter Notebook
            display(Markdown(answer))
            last_shown = answer

In [None]:
# Retry budget and question for this run, bundled as the graph's initial state.
max_retries = 3
question = "How can i create a streamlit app in order to run my langgraph workflow?"
inputs = {"question": question, "max_retries": max_retries}

In [None]:
# Stream the workflow for the inputs defined above.
ask_question(inputs)