In [1]:
import os
import json

from dotenv import load_dotenv

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_ollama import ChatOllama
from typing import Annotated, TypedDict, Literal

from langgraph.graph.message import add_messages
from langgraph.graph import END
from langchain_core.messages.ai import AIMessage
from langchain_core.messages.human import HumanMessage
from langchain_core.documents.base import Document
from langchain_core.messages.system import SystemMessage
from langchain_community.tools import DuckDuckGoSearchResults
from pydantic import BaseModel, Field


from langchain_unstructured import UnstructuredLoader
from langchain_community.vectorstores import SQLiteVec
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.prompts import PromptTemplate

from langgraph.graph import StateGraph, START

from IPython.display import Image, Markdown

In [2]:
load_dotenv()


GOOGLEAPI = os.getenv("GOOGLEAPI")
os.environ["GOOGLE_API_KEY"] = GOOGLEAPI

GOOGLE= ChatGoogleGenerativeAI(
    # temperature=0,
    model="gemini-2.0-flash",
    # max_tokens=300,
    # top_p=0.95,
    # top_k=40,
    # streaming=True,
)

OLLAMA = ChatOllama(
    model="llama3.2:3b",
    temperature=0,
    streaming=True,
)

OLLAMAJSON = ChatOllama(
    model="llama3.2:3b",
    temperature=0,
    format="json",
    streaming=True,
)


In [57]:

class RouterStep(BaseModel):
    step: Literal["vectorstore", "websearch"]

class RouterStep2(TypedDict):
    step: Literal["vectorstore", "websearch"]
    score: int
    reasoning: str
    

In [58]:
llm = GOOGLE
#llm_router = llm.with_structured_output(schema=RouterStep)
llm_json = GOOGLE.with_structured_output(schema=RouterStep2)

#llm.invoke("What is python?")

In [5]:
search = DuckDuckGoSearchResults(output_format="json")

In [5]:
json.loads(search.invoke("Who is a PEP?"))[0]["snippet"]

"What is a PEP / Politically Exposed Persons? A politically exposed person (PEP) is someone who has a high-profile political role or has been entrusted with a prominent public function. Because of their position, they are more likely to be involved in money laundering and/or terrorist financing. Why should this be of money laundering concern? These individuals' positions can be abused to ..."

In [98]:
class Router(BaseModel):
    """
    A simple router to route between states.
    """
    step: Literal["audit_assistant", "websearch", "vectorstore"] = Field(None, description="The step to route to")

In [6]:
connection = SQLiteVec.create_connection(db_file="policies.db")

EMBEDDINGS = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
from langchain_huggingface import ChatHuggingFace

In [54]:
def retriever(query: str) -> list[str]:

    """
    Call the retriever tool to get the policy
    """
    vector_db = SQLiteVec(table="policy", embedding=EMBEDDINGS, connection=connection)

    QUERY_PROMPT = PromptTemplate(
    input_variables=["question"],
    template="""
    Your task is to generate five different versions
    of the given user question to retrieve relevant documents from a vector database.
    By generating multiple perspectives on the user question, your goal is to help the user
    overcome some of the limitations of the distance-based similarity search.
    Provide alternative questions seperated by new lines.
    Original question: {question}""",

    )

    # retriever = MultiQueryRetriever.from_llm(
    #                 retriever=vector_db.as_retriever(),
    #                 llm=OLLAMA,
    #                 prompt=QUERY_PROMPT,
    #                 parser_key="lines"
    #                 )

    retriever = vector_db.as_retriever()
    docs = retriever.invoke(query)

    return docs

In [11]:
retriever("What is AML CFT?")

[Document(metadata={'source': 'C:\\Users\\aeniatorudabo\\Documents\\7.3 - AML CFT CPF Policy.pdf', 'coordinates': {'points': [[89.088, 191.44999999999993], [89.088, 203.44999999999993], [224.11599999999999, 203.44999999999993], [224.11599999999999, 191.44999999999993]], 'system': 'PixelSpace', 'layout_width': 595.2, 'layout_height': 841.92}, 'file_directory': 'C:\\Users\\aeniatorudabo\\Documents', 'filename': '7.3 - AML CFT CPF Policy.pdf', 'languages': ['eng'], 'last_modified': '2024-11-20T10:18:40', 'page_number': 121, 'filetype': 'application/pdf', 'category': 'Title', 'element_id': 'cdb79991f33ea7a2e4924045042e2c67'}, page_content='with respect to AML/CFT/CFT.'),
 Document(metadata={'source': 'C:\\Users\\aeniatorudabo\\Documents\\7.3 - AML CFT CPF Policy.pdf', 'coordinates': {'points': [[57.624, 197.68999999999994], [57.624, 209.68999999999994], [173.68599999999998, 209.68999999999994], [173.68599999999998, 197.68999999999994]], 'system': 'PixelSpace', 'layout_width': 595.2, 'layou

In [12]:
route_prompt = """You are a router to route to the audit assistant, vectorstore or websearch based on the question asked.
                   The vectorsore contains documents on AML CFT, PEP, KYC and other policies.
                   Use the vectorstore for questions on these topics and questions concerning FSDH.
                   For current events and any other question, use the websearch
                   Return the step to route to as a json object with the key 'step'"""

In [59]:
response = llm_json.invoke(
    [SystemMessage(route_prompt),
    HumanMessage("What is audit?")
    ]
)

response

{'score': 9.0,
 'reasoning': "The question is a general question about the definition of 'audit' and does not involve FSDH, AML, CFT, PEP, or KYC. Therefore, it should be routed to websearch.",
 'step': 'websearch'}

In [9]:
def parse_json(json_str: str) -> dict:
    """
    Parse the json string to a dictionary
    """
    json_str = json_str.split("json")[-1].strip("`").replace("\n", "")
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return {}

In [52]:
#parse_json(response)

In [129]:
response.content

'{\n  "step": "vectorstore"\n}\n\n \n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\n'

In [55]:
document_grader_instructions = """
    You are a document grader. You will be given a question and a document.
    Your task is to grade the document based on the question asked.
    if the documemnt contains keywords or semantic meaning related to the question, return a score of 1.
    else return a score of 0."""

document_grader_prompt = """
Here is the retrieved document \n\n{document}\n\n
Here is the question asked \n\n{question}\n\n
Think carefully about it and return a json object with the key 'score' and the value 1 or 0 based on the document and question
to indicate if the document is relevant to the question asked."""

question = "Who is a PEP?"
documents = retriever("Who is a PEP?")

for document in documents:
    
    response = llm_json.invoke(
        [SystemMessage(document_grader_instructions),
        HumanMessage(document_grader_prompt.format(document=document.page_content, question=question))
        ]
    )

    # print(parse_json(response.content))
    print(response)
    break

{'step': 'websearch'}


In [36]:
document_grader_prompt.format(document=document.page_content, question=question)

"\nHere is the retrieved document \n\ndealing with PEPs:\n\n\nHere is the question asked \n\nWho is a PEP?\n\n\nThink carefully about it and return a json object with the key 'score' and the value 1 or 0 based on the document and question\nto indicate if the document is relevant to the question asked."

In [134]:
document.page_content

'dealing with PEPs:'

In [62]:
rag_prompt = """
You are an assistant that answers questions based on the context provided
here is the context \n\n{context}\n\n

Think carefully about the above context and answer the question asked below.

Here is the question asked \n\n{question}\n\n

Answer the question using only the context provided above.
Be honest and do not make up any information."""

def format_docs(docs: list[str | Document]) -> str:
    """
    Format the documents to be used in the prompt.
    """
    if isinstance(docs[0], str):
        return "\n\n".join(docs)
    
    return "\n\n".join([doc.page_content for doc in docs])

response = llm.invoke(
    [HumanMessage(rag_prompt.format(context=format_docs(documents), question=question))]
)

print(response.content)



PEPs are individuals who are or have been entrusted with prominent public functions both in Nigeria.


In [252]:
print(format_docs(["Hello World", "This is a test document"]))

Hello World

This is a test document


In [35]:
print(format_docs(documents))

dealing with PEPs:

accepting a PEP.

PEPs are individuals who are or have been entrusted with prominent public functions both in Nigeria or

Finally any account(s) opened by any person who qualifies as a “PEP” will be designated a “PEP” account.


In [63]:
hallucination_instructions = """
    You are a response grader.
    You will be given a response and documents for the response.
    Your task is to grade the response based on the documents provided.
    If the response is correct and based on the documents, return a score of 1.
    else return a score of 0.
    Explain the reasoning for your answer step by step.
    Do not state the answer from the onset.
    """


hallucination_prompt = """
Here are the documents \n\n{context}\n\n 
Here is the response \n\n{response}\n\n
return a json object with the key 'score' and the value 1 or 0 based on the response and context
to indicate if the response is relevant to the documents provided.
return a key 'reasoning' with the value of your reasoning for the score given."""

hallucination_score = llm_json.invoke(
    [SystemMessage(hallucination_instructions),
    HumanMessage(hallucination_prompt.format(context=format_docs(documents), response=response.content))
    ]
)

hallucination_score

{'score': 1.0,
 'reasoning': 'The response is correct and based on the documents provided, thus the score is 1.',
 'step': 'vectorstore'}

In [50]:
response.content

'PEPs are individuals who are or have been entrusted with prominent public functions both in Nigeria. Any account(s) opened by any person who qualifies as a “PEP” will be designated a “PEP” account.'

In [64]:
response_instructions = """
    You are a response grader.
    You will be given a response and the questuion the response asnwers.
    Your task is to grade the response based on the question asked.
    If the response is correct and based on the question, return a score of 1.
    else return a score of 0.
    Explain the reasoning for your answer step by step.
    Do not state the answer from the onset.
    """

response_prompt = """
Here is the question asked \n\n{question}\n\n
Here is the response \n\n{response}\n\n
return a json object with the key 'score' and the value 1 or 0 based on the response and question
to indicate if the response is relevant to the question asked.
return a key 'reasoning' with the value of your reasoning for the score given."""

response_score = llm_json.invoke(
    [SystemMessage(response_instructions),
    HumanMessage(response_prompt.format(question=question, response=response.content))
    ]
)
response_score

{'score': 1.0,
 'step': 'vectorstore',
 'reasoning': 'The response accurately answers the question of who a PEP is by stating that they are individuals entrusted with prominent public functions. Therefore, the response is relevant and correct.'}

In [106]:
my_list = [[]]

In [174]:
vector_db = SQLiteVec(table="policy", embedding=EMBEDDINGS, connection=connection)
retriever2 = vector_db.as_retriever()

SYS_PROMPT = (
    "system",
    "You are a helpful assistant That answers questions carefully"
    "You will be provided with a vectorstore of documents on AML CFT, PEP, KYC and other policies"
    "You will be asked questions about the above topics and you will answer them based on the documents provided"
    "You will answer them to the best of your ability and give reasons for your answers"
    "If you don't know the answer, say 'I don't know'"
    "You are allowed to search the web for answers. Review answers getting from web search and combine multiple results into a final answer"
    "Include sources for your final output of the websearch"
    "Always confirm if they have additional information to add to the question"
    "If they do, ask them to provide it"
)

WELCOME_PROMPT = (
    "Welcome to the Audit Assistant "
    "I am here to help you with your audit questions.\n"
    "Please ask me anything about the audit process or the bank's policies.\n"
    "Type exit or quit to end the conversation."
)


class AuditAssistantState(TypedDict):
    """
    A simple agent with state management.
    """

    question: list[str]
    finished: bool
    web_search: bool
    documents: list[list[str]]
    generated_response: Annotated[list, add_messages]
    loops: int
    max_loops: int


def router(state: AuditAssistantState) -> AuditAssistantState:
    """
    Rouete between vectorstore and websearch"""

    if state.get("finished", False):
        return "exit"

    
    print("---ROUTING---")
    question = state["question"][-1]
    decision = llm_json.invoke([route_prompt] + [question])

    decision = decision["step"]

    
    if decision.lower().replace(" ", "") == "websearch":
        print("--ROUTING TO WEBSEARCH--")
        return "websearch"
    
    elif decision.lower().replace(" ", "") == "vectorstore":
        print("--ROUTING TO VECTORSTORE--")
        return "vectorstore"
    

def human_node(state: AuditAssistantState) -> AuditAssistantState:

    if response := state.get("generated_response"):
        user_input = input(
            f"Assistant: {response[-1].content}\n"
            "User: "
        )

    else:
        user_input = input(
            f"Assistant: {WELCOME_PROMPT}\n"
            "User: "
        )
    
    if user_input.lower() in ["exit", "quit", "stop"] :
        state["finished"] = True

    question = state.get("question", [])
    question.append(user_input)
   
    return state | {"question": question}


def exit(state: AuditAssistantState) -> AuditAssistantState:
    """
    Exit the program
    """
    if state.get("finished", False):
        return "exit"
    else:
        return "router"


def retrieve(state: AuditAssistantState) -> AuditAssistantState:

    """
    Retrieve documents from a vectorstore
    """
    
    print("---RETRIEVING DOCUMENTS---")
    query = state["question"][-1]	
    documents = state.get("documents", [])

    docs = retriever2.invoke(query)
    documents.append(docs)

    return {"documents": documents}


def generate(state: AuditAssistantState) -> AuditAssistantState:
    """
    Generate a response based on the documents retrieved"""


    print("--GENERATING RESPONSE--")
    question = state["question"]
    documents = state["documents"][-1]
    documents = format_docs(documents)
    loops = state.get("loops", 0)

    response = llm.invoke(rag_prompt.format(context=documents, question=question))
    


    
    return {"generated_response": response, "loops": loops+1}

    
def grade_documents(state: AuditAssistantState) -> AuditAssistantState:
    """
    Grade documents as relevant to the question asked
    """

    question = state["question"]
    documents = state["documents"]
    relevant_documents = state["documents"][-1]

    filtered_docs = []

    web_search = False
    for document in relevant_documents:
    
        response = llm_json.invoke(
            [SystemMessage(document_grader_instructions),
            document_grader_prompt.format(document=document.page_content, question=question)
            ]
        )

        score = response["score"]

        if score == 1:
            print("--GRADED DOCUMENT AS RELEVANT--")
            filtered_docs.append(document)

        else:
            print("--GRADED DOCUMENT AS NOT RELEVANT--")
            web_search = True
            continue

    
    documents.append(filtered_docs)
    
    return {"documents": documents, "web_search": web_search}


def grade_response(state: AuditAssistantState) -> str:
    """
    Grade the response as relevant to the document and  question asked
    """

    question = state["question"][-1]
    response = state["generated_response"][-1]
    documents = state["documents"][-1]
    max_loops = state.get("max_loops", 3)

    hallucination_score = llm_json.invoke(
    [SystemMessage(hallucination_instructions),
    HumanMessage(hallucination_prompt.format(context=format_docs(documents), response=response))
    ]
    )

    hallucination_score = hallucination_score["score"]

    if hallucination_score == 1:
        print("--GRADED RESPONSE IS GROUNDED IN THE DOCUMENT--")
        print("--GRADING RESPONSE AS RELEVANT TO QUESTION--")

        response_score = llm_json.invoke(
        [SystemMessage(response_instructions),
        HumanMessage(response_prompt.format(question=question, response=response.content))
        ]
        )

        response_score = response_score["score"]

        if response_score == 1:
            print("--GRADED RESPONSE AS RELEVANT--")
            return "useful"
        
        elif state["loops"] <= max_loops:
            print("--RESPONSE DOES NOT ADDRESS THE QUESTION--")
            return "not useful"
        
        else:
            print("--MAX RETRIES REACHED")
            return "max retries"
        
    elif state["loops"] <= max_loops:
        print("-- RESPONSE IS NOT GROUNDED IN DOCUMENTS, RETRY--")
        return "not supported"
    
    else:
        print("--MAX RETRIES REACHED--")
        return "max retries"


def web_search(state: AuditAssistantState) -> AuditAssistantState:

    """
    Search the web for the question asked"""

    print("--WEB SEARCHING--")
    query = state["question"][-1]

    print(query)
    documents = state.get("documents", [])

    result = json.loads(search.invoke(query))
    result = "\n".join([doc["snippet"] for doc in result])
    result = [Document(page_content=result, metadata={"source": "websearch"})]
    documents.append(result)
   

    return {"documents": documents}


def decide_to_generate(state: AuditAssistantState) -> str:

    """
    Decided whether to generate a response or add websearch"""

    question = state["question"][-1]
    web_search = state["web_search"]
    filtered_documents = state["documents"][-1]

    if web_search:
        print("--Not all Documents are relevant, Include webseacrh--".upper())
        return "websearch"
    
    else:
        print("--All documents are relevant, Generate response--".upper)
        return "generate"



   
    


In [155]:
router({"finished": "vectorstore"})

'exit'

In [171]:
graph_builder = StateGraph(AuditAssistantState)

graph_builder.add_node("grade_documents", grade_documents)
graph_builder.add_node("websearch", web_search)
graph_builder.add_node("generate", generate)
graph_builder.add_node("retriever", retrieve)
graph_builder.add_node("human", human_node)

graph_builder.add_edge(START, "human")
graph_builder.add_edge("websearch", "generate")
graph_builder.add_edge("retriever", "grade_documents")

graph_builder.add_conditional_edges("human", router,
                                          {"exit": END,
                                           "vectorstore": "retriever",
                                           "websearch": "websearch"}
                                           )

graph_builder.add_conditional_edges("generate", grade_response,
                                    {"not supported": "generate",
                                     "not useful": "websearch",
                                     "useful": "human",
                                     "max retries": "human"}
                                     )
graph_builder.add_conditional_edges("grade_documents", decide_to_generate,
                                    {"websearch": "websearch",
                                     "generate": "generate"}
                                     )

chat_graph = graph_builder.compile()

In [165]:
# display(Image(chat_graph.get_graph().draw_mermaid_png()))

In [92]:
router({"question": "what is audit?"})

---ROUTING---
--ROUTING TO WEBSEARCH--


'websearch'

In [93]:
list({'step': 'WebSearch'}.values())[0]

'WebSearch'

In [172]:
inputs = {"question": [], "max_loops": 3}
for event in chat_graph.stream(inputs, stream_mode="values"):
    print(event)

    

{'question': [], 'generated_response': [], 'max_loops': 3}
{'question': ['exit'], 'finished': True, 'generated_response': [], 'max_loops': 3}


In [173]:
config = {"recursion_limit": 1000}
chat_graph.invoke({"question": []}, config)

False
---ROUTING---
--ROUTING TO WEBSEARCH--
--WEB SEARCHING--
Who is an auditor?
--GENERATING RESPONSE--
--GRADED RESPONSE IS GROUNDED IN THE DOCUMENT--
--GRADING RESPONSE AS RELEVANT TO QUESTION--
--GRADED RESPONSE AS RELEVANT--
False
---ROUTING---
--ROUTING TO WEBSEARCH--
--WEB SEARCHING--
How do I become one?
--GENERATING RESPONSE--
--GRADED RESPONSE IS GROUNDED IN THE DOCUMENT--
--GRADING RESPONSE AS RELEVANT TO QUESTION--
--RESPONSE DOES NOT ADDRESS THE QUESTION--
--WEB SEARCHING--
How do I become one?
--GENERATING RESPONSE--
--GRADED RESPONSE IS GROUNDED IN THE DOCUMENT--
--GRADING RESPONSE AS RELEVANT TO QUESTION--
--RESPONSE DOES NOT ADDRESS THE QUESTION--
--WEB SEARCHING--
How do I become one?
--GENERATING RESPONSE--
--GRADED RESPONSE IS GROUNDED IN THE DOCUMENT--
--GRADING RESPONSE AS RELEVANT TO QUESTION--
--GRADED RESPONSE AS RELEVANT--
False
---ROUTING---
--ROUTING TO WEBSEARCH--
--WEB SEARCHING--
I want infornation on becomiing an auditor
--GENERATING RESPONSE--
--GRADED

{'question': ['Who is an auditor?',
  'How do I become one?',
  'I want infornation on becomiing an auditor',
  'exit'],
 'finished': True,
 'documents': [[Document(metadata={'source': 'websearch'}, page_content="An auditor is a professional authorized to examine and confirm the accuracy of financial records to ensure that companies comply with laws and regulations.\nAuditors can be classified into several types, each serving distinct roles based on the nature of their work, the entities they audit, and their objectives. Internal. Internal auditors are employed by organizations to independently evaluate operations, focusing on improving internal controls, risk management, and governance processes.\nWhat is an Auditor? An auditor is an individual who examines the accuracy of recorded business transactions.Auditors are needed in order to verify that processes are functioning as planned, and that the financial statements produced by an organization fairly reflect its operational and finan

In [101]:
chat_graph.invoke({"question": "What is AML CFT?"}, config)

---ROUTING---
--ROUTING TO VECTORSTORE--
---RETRIEVING DOCUMENTS---
--GRADED DOCUMENT AS RELEVANT--
--GRADED DOCUMENT AS RELEVANT--
--GRADED DOCUMENT AS RELEVANT--
--GRADED DOCUMENT AS RELEVANT--
<built-in method upper of str object at 0x000002819EBE73F0>
--GENERATING RESPONSE--
--GRADED RESPONSE IS GROUNDED IN THE DOCUMENT--
--GRADING RESPONSE AS RELEVANT TO QUESTION--
--GRADED RESPONSE AS RELEVANT--


{'question': 'What is AML CFT?',
 'web_search': False,
 'documents': [Document(metadata={'source': 'C:\\Users\\aeniatorudabo\\Documents\\7.3 - AML CFT CPF Policy.pdf', 'coordinates': {'points': [[89.088, 191.44999999999993], [89.088, 203.44999999999993], [224.11599999999999, 203.44999999999993], [224.11599999999999, 191.44999999999993]], 'system': 'PixelSpace', 'layout_width': 595.2, 'layout_height': 841.92}, 'file_directory': 'C:\\Users\\aeniatorudabo\\Documents', 'filename': '7.3 - AML CFT CPF Policy.pdf', 'languages': ['eng'], 'last_modified': '2024-11-20T10:18:40', 'page_number': 121, 'filetype': 'application/pdf', 'category': 'Title', 'element_id': 'cdb79991f33ea7a2e4924045042e2c67'}, page_content='with respect to AML/CFT/CFT.'),
  Document(metadata={'source': 'C:\\Users\\aeniatorudabo\\Documents\\7.3 - AML CFT CPF Policy.pdf', 'coordinates': {'points': [[57.624, 197.68999999999994], [57.624, 209.68999999999994], [173.68599999999998, 209.68999999999994], [173.68599999999998, 197.6