In [20]:
import os
from pathlib import Path

# Move the working directory to the parent of the folder the kernel started in.
# The starting directory is recorded only once, so re-running this cell always
# lands in the same place instead of climbing one more level per execution
# (which is what a bare os.chdir("..") does).
if "NOTEBOOK_START_DIR" not in globals():
    NOTEBOOK_START_DIR = Path.cwd()

os.chdir(NOTEBOOK_START_DIR.parent)

In [21]:
# Display the current working directory to confirm the result of the chdir cell.
%pwd

'd:\\langgraph_fastapi_streamlit'

In [22]:
from dotenv import load_dotenv, find_dotenv

# Load the .env file located by find_dotenv() into the process environment.
# Presumably the API clients created in later cells read their keys from there,
# since no key is passed to them explicitly in this notebook.
load_dotenv(find_dotenv()) # important line if cannot load api key

import os
# Reference list of the variables the .env file is expected to provide.
# The assignments are left commented out on purpose: each one would only copy a
# variable onto itself.
# os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')
# os.environ['LANGCHAIN_API_KEY'] = os.getenv('LANGCHAIN_API_KEY')

# WebSearch tools
# os.environ['SERPAPI_API_KEY'] = os.getenv('SERPAPI_API_KEY')
# os.environ['GOOGLE_API_KEY'] = os.getenv('GOOGLE_API_KEY')
# os.environ['GOOGLE_CSE_ID'] = os.getenv('GOOGLE_CSE_ID')
# os.environ['TAVILY_API_KEY'] = os.getenv('TAVILY_API_KEY')

# Langsmith Tracing
# os.environ['LANGCHAIN_TRACING_V2'] = os.getenv('LANGCHAIN_TRACING_V2')
# os.environ['LANGCHAIN_ENDPOINT'] = os.getenv('LANGCHAIN_ENDPOINT')
# os.environ['LANGCHAIN_PROJECT'] = os.getenv('LANGCHAIN_PROJECT')

In [23]:
from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings
# Embeddings come from the Ollama server at localhost:11434 using the bge-m3 model.
embedding_model = OllamaEmbeddings(base_url="http://localhost:11434", model="bge-m3:latest")
# Open the persisted Chroma collection "collection_01".
# NOTE(review): persist_directory is relative to the current working directory, which the
# first cell changed with os.chdir("..") — confirm "../db/chroma_db_02" resolves to the intended store.
store = Chroma(collection_name="collection_01", persist_directory="../db/chroma_db_02", embedding_function=embedding_model)
store

<langchain_chroma.vectorstores.Chroma at 0x2172caf3950>

In [24]:
# Expose the vector store as a retriever; no arguments are passed, so the
# retriever is created with an empty search_kwargs (see the repr below).
retriever = store.as_retriever()
retriever

VectorStoreRetriever(tags=['Chroma', 'OllamaEmbeddings'], vectorstore=<langchain_chroma.vectorstores.Chroma object at 0x000002172CAF3950>, search_kwargs={})

In [25]:
# test it!
# Run one sample question through the retriever and display the returned documents.
# `query` and `docs` are reused by the test cells further down.
query = "let me know brief explanation about SOLAS?"
docs = retriever.invoke(query)
docs

[Document(id='28d09004-1c9a-4ead-b09a-4f95c6be933f', metadata={'File Name': 'Focus on IMO - SOLAS, the International Convention for the Safety, of Life at Sea, 1974 (October 1998)', 'File Path': '/content/drive/MyDrive/Rules/SOLAS/Focus on IMO - SOLAS, the International Convention for the Safety, of Life at Sea, 1974 (October 1998).pdf', 'First Division': 'Rules', 'Page': 14, 'Second Division': 'SOLAS'}, page_content='This page explains Focus on IMO - SOLAS, the International Convention for the Safety, of Life at Sea, 1974 (October 1998), that belongs to catogories of Rules and SOLAS./nfor safe practices in ship operation and a safe working environment;  • to establish safeguards against all identified risks;  • to continuously improve safety management skills of personnel, including preparing for emergencies.  The Code requires a safety management system (SMS) to be established by "the Company", which is defined as the shipowner or any person, such as the manager or bareboat charterer

In [26]:
def similarity_search(
    query: str,
    db_path: str,
    k: int = 3,
    collection_name: str = "collection_01",
    base_url: str = "http://localhost:11434",
    model: str = "bge-m3:latest",
):
    """
    Run a scored similarity search against a persisted Chroma collection.

    A new embedding client and a new Chroma handle are created on every call,
    so the function does not depend on any notebook-level store object.

    Args:
        query (str): Text to search for.
        db_path (str): Directory of the persisted Chroma database.
        k (int): Number of hits to request. Defaults to 3.
        collection_name (str): Chroma collection to query. Defaults to "collection_01".
        base_url (str): URL of the Ollama server that computes the embeddings.
        model (str): Name of the Ollama embedding model.

    Returns:
        Whatever ``Chroma.similarity_search_with_relevance_scores`` returns for
        the query; the recorded notebook output shows a list of
        ``(Document, score)`` tuples.
    """
    embed_model = OllamaEmbeddings(base_url=base_url, model=model)
    vector_store = Chroma(
        collection_name=collection_name,
        persist_directory=db_path,
        embedding_function=embed_model,
    )
    return vector_store.similarity_search_with_relevance_scores(query, k=k)

In [27]:
# Run the same sample query through similarity_search and display the scored hits.
result = similarity_search(query=query, db_path="../db/chroma_db_02")
result

[(Document(id='28d09004-1c9a-4ead-b09a-4f95c6be933f', metadata={'File Name': 'Focus on IMO - SOLAS, the International Convention for the Safety, of Life at Sea, 1974 (October 1998)', 'File Path': '/content/drive/MyDrive/Rules/SOLAS/Focus on IMO - SOLAS, the International Convention for the Safety, of Life at Sea, 1974 (October 1998).pdf', 'First Division': 'Rules', 'Page': 14, 'Second Division': 'SOLAS'}, page_content='This page explains Focus on IMO - SOLAS, the International Convention for the Safety, of Life at Sea, 1974 (October 1998), that belongs to catogories of Rules and SOLAS./nfor safe practices in ship operation and a safe working environment;  • to establish safeguards against all identified risks;  • to continuously improve safety management skills of personnel, including preparing for emergencies.  The Code requires a safety management system (SMS) to be established by "the Company", which is defined as the shipowner or any person, such as the manager or bareboat chartere

In [28]:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_groq import ChatGroq
from langchain_ollama import OllamaEmbeddings
from langgraph.graph import END, StateGraph
from fastapi import FastAPI
from langserve import add_routes

In [29]:
## LLM model
# The Groq-hosted "llama-3.2-3b-preview" model is the active LLM for every chain below;
# the OpenAI line is kept as a commented-out alternative.
# llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm = ChatGroq(temperature=0, model_name= "llama-3.2-3b-preview")


In [30]:
# generate_chain.py
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

def create_generate_chain(llm):
    """
    Build the answer-generation chain: prompt -> llm -> string output parser.

    Args:
        llm (LLM): The language model placed in the middle of the chain.

    Returns:
        The composed chain. It is invoked with a mapping holding "context" and
        "question", which fill the matching placeholders of the prompt.
    """
    # Spelled out line by line so the trailing spaces that are part of the
    # prompt text stay visible and cannot be lost to editor whitespace trimming.
    prompt_text = (
        "\n"
        "    You are a smart AI assistant. \n"
        "    Use the following pieces of retrieved context to answer the question. \n"
        "    Generate detailed answer including specified numbers, fomulas in the point of technical specifications. \n"
        "    If you don't know the answer, just say that you don't know. Do NOT try to make up an answer.\n"
        "    If the question is not related to the context, politely respond that you only answer questions related to the context.\n"
        "\n"
        "    <context>\n"
        "    {context}\n"
        "    </context>\n"
        "\n"
        "    <question>\n"
        "    {question}\n"
        "    </question>\n"
        "    "
    )

    rag_prompt = PromptTemplate(input_variables=["context", "question"], template=prompt_text)

    return rag_prompt | llm | StrOutputParser()

In [31]:
# Create the generate chain from the active LLM and display its composition
# (prompt | model | output parser).
generate_chain = create_generate_chain(llm)
generate_chain

PromptTemplate(input_variables=['context', 'question'], input_types={}, partial_variables={}, template="\n    You are a smart AI assistant. \n    Use the following pieces of retrieved context to answer the question. \n    Generate detailed answer including specified numbers, fomulas in the point of technical specifications. \n    If you don't know the answer, just say that you don't know. Do NOT try to make up an answer.\n    If the question is not related to the context, politely respond that you only answer questions related to the context.\n\n    <context>\n    {context}\n    </context>\n\n    <question>\n    {question}\n    </question>\n    ")
| ChatGroq(client=<groq.resources.chat.completions.Completions object at 0x000002172E48E180>, async_client=<groq.resources.chat.completions.AsyncCompletions object at 0x000002172E48EA20>, model_name='llama-3.2-3b-preview', temperature=1e-08, model_kwargs={}, groq_api_key=SecretStr('**********'))
| StrOutputParser()

In [32]:
# Smoke test of the generate chain with the documents retrieved earlier.
# NOTE(review): `docs` is passed as the raw list of Document objects, so the prompt's
# {context} slot presumably receives the list's string form (metadata included) — confirm.
generation = generate_chain.invoke({"context": docs, "question": query})
print(generation)

Based on the provided context, SOLAS (International Convention for the Safety of Life at Sea, 1974) is a maritime convention that aims to ensure the safety of life at sea. The convention requires ships to be designed, constructed, and equipped to prevent accidents and minimize the risk of loss of life.

According to the context, SOLAS has undergone several amendments, including the addition of new regulations 23-2 and 42-1 to Chapter II-1 of the SOLAS Convention. Regulation 23-2 deals with the integrity of the hull and superstructure, damage prevention, and control, and requires indicators to be provided on the navigating bridge for all doors that, if left open, could lead to major flooding.

SOLAS also requires the establishment of a safety management system (SMS) by shipowners or operators, which should be designed to ensure compliance with all mandatory regulations and take into account codes, guidelines, and standards recommended by IMO and others.

In summary, SOLAS is a maritime 

In [33]:
from langchain.prompts import PromptTemplate
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain import hub
from pydantic import BaseModel, Field

class GraderUtils:
    """
    Factory for the LLM-backed grader and rewriter chains used by the RAG graph.

    Each factory method returns a new chain of the form ``prompt | model | parser``
    built around the model given to the constructor. All prompts are expressed as
    role-tagged chat messages (``ChatPromptTemplate``) rather than as a single
    string containing raw Llama header tokens, because the model is driven through
    a chat interface.
    """

    def __init__(self, model):
        """
        Args:
            model: The chat model placed in the middle of every chain.
        """
        self.model = model

    def create_retrieval_grader(self):
        """
        Creates a retrieval grader that assesses the relevance of a retrieved document to a user question.

        Returns:
            A chain invoked with a mapping holding "document" and "question". The model reply is
            parsed as JSON; the prompt asks for a single key 'score' whose value is 'yes' or 'no'.
        """
        system = (
            "You are a grader assessing relevance of a retrieved document to a user question. \n"
            "If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n"
            "It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n"
            "Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n"
            "Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."
        )
        grade_prompt = ChatPromptTemplate.from_messages(
            [
                ("system", system),
                (
                    "human",
                    "Here is the retrieved document: \n\n {document} \n\n Here is the user question: {question}",
                ),
            ]
        )

        return grade_prompt | self.model | JsonOutputParser()

    # NOTE(review): this schema is not referenced by any chain in this class; the graders
    # below ask the model for a 'score' key, not 'binary_score'.
    class GradeHallucinations(BaseModel):
        """Binary score for hallucination present in generation answer."""

        binary_score: str = Field(
            description="Answer is grounded in the facts, 'yes' or 'no'"
        )

    def create_hallucination_grader(self):
        """
        Creates a hallucination grader that assesses whether an answer is grounded in/supported by a set of facts.

        Returns:
            A chain invoked with a mapping holding "documents" (the facts) and "generation" (the answer).
            The model reply is parsed as JSON; the prompt asks for a single key 'score' whose value is
            'yes' (grounded) or 'no'.
        """
        system = (
            "You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n"
            "Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts. \n"
            "Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."
        )
        hallucination_prompt = ChatPromptTemplate.from_messages(
            [
                ("system", system),
                ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
            ]
        )

        return hallucination_prompt | self.model | JsonOutputParser()

    def answer_grader(self):
        """
        Creates an answer grader that assesses whether a generated answer addresses / resolves the question.

        Returns:
            A chain invoked with a mapping holding "generation", "question" and "documents".
            The model reply is parsed as JSON; the prompt asks for the keys 'score'
            ('yes' or 'no') and 'feedback' (a brief explanation).
        """
        system = (
            "You are a grader assessing whether an answer addresses / resolves a question. \n"
            "Give a binary score 'yes' or 'no'. 'Yes' means that the answer resolves the question. \n"
            "Provide a JSON response with the following keys and no preamble: \n"
            "'score': A binary score 'yes' or 'no' indicating whether the answer resolves the question. \n"
            "'feedback': A brief explanation of your evaluation, including any issues or improvements needed."
        )
        eval_prompt = ChatPromptTemplate.from_messages(
            [
                ("system", system),
                (
                    "human",
                    "Here is the generated answer: \n ------- \n {generation} \n ------- \n "
                    "Here is the question: {question} \n ------- \n "
                    "Here are the relevant documents: {documents}",
                ),
            ]
        )

        return eval_prompt | self.model | JsonOutputParser()

    def create_question_rewriter(self):
        """
        Creates a question rewriter chain that rewrites a given question for vectorstore retrieval.

        The prompt explicitly asks for the rewritten question alone, because the chain's
        whole string output is used as the next retrieval query.

        Returns:
            A chain invoked with a mapping holding "question"; the model reply is returned as a string.
        """
        system = (
            "You are a question re-writer that converts an input question to a better version that is optimized \n"
            "for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning. \n"
            "Respond with the improved question only, on a single line, with no preamble, label, or explanation."
        )
        re_write_prompt = ChatPromptTemplate.from_messages(
            [
                ("system", system),
                (
                    "human",
                    "Here is the initial question: \n\n {question} \n Formulate an improved question.",
                ),
            ]
        )

        return re_write_prompt | self.model | StrOutputParser()

In [34]:
# Create an instance of the GraderUtils class around the active LLM
grader = GraderUtils(llm)

# Get the retrieval grader
retrieval_grader = grader.create_retrieval_grader()

# test it!
# docs[1] is the second retrieved document, so this assumes the retriever returned at least two.
doc_txt = docs[1].page_content

# check with the grader
print(retrieval_grader.invoke({"question": query,"document": doc_txt}))


{'score': 'yes'}


In [35]:
# Get the hallucination grader
hallucination_grader = grader.create_hallucination_grader()
# test: grade the answer generated earlier against the documents it was generated from
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

In [36]:
# Get the answer_grader
# The resulting chain is what the graph cells later pass as `code_evaluator`.
answer_grader = grader.answer_grader()
# test: check whether the answer generated earlier resolves the sample question
answer_grader.invoke({"generation": generation, "question": query, "documents": docs})

{'score': 'yes',
 'feedback': 'The answer provides a brief explanation of SOLAS, including its purpose, key regulations, and the establishment of a safety management system. The explanation is accurate and relevant, citing specific documents and pages from the provided PDFs. However, the answer could be improved by providing more context and examples to illustrate the importance and application of SOLAS in real-world scenarios.'}

In [37]:
# Get the question rewriter
question_rewriter = grader.create_question_rewriter()
# NOTE(review): the recorded output below contains a label and a reasoning section in
# addition to the question itself; transform_query stores the whole string as the next question.
question_rewriter.invoke({"question": query})

'Improved question: \nWhat is the purpose and key components of the International Convention for the Safety of Life at Sea (SOLAS), and how does it impact maritime safety regulations?\n\nReasoning:\nThe initial question is quite broad and lacks specificity. The improved question aims to:\n\n1. Clarify the intent: The improved question asks for a brief explanation of SOLAS, which is a more specific and focused query.\n2. Provide context: The added phrase "International Convention for the Safety of Life at Sea" helps to establish the context and scope of the query.\n3. Encourage more detailed information: The phrase "key components" and "impact on maritime safety regulations" suggests that the user is looking for a more in-depth explanation, which can lead to a more informative and relevant response.\n\nThis improved question is better suited for vectorstore retrieval, as it:\n\n1. Reduces ambiguity: The improved question is more specific and clear, reducing the likelihood of misinterpre

In [38]:
## Start the Graph
from typing_extensions import TypedDict
from typing import List

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: the user question; replaced by the rewritten question when the transform_query node runs
        generation: answer text produced by the generate node
        documents: list of retrieved documents as returned by the retriever, later narrowed by grade_documents
    """

    question: str
    generation: str
    documents: List  # a list of retrieved Document objects, not a single string

In [39]:
from langchain_core.documents import Document

class GraphNodes:
    """
    Node functions of the RAG graph.

    Every node takes the current graph state (a dict) and returns a dict with the
    keys it updates. Progress markers are printed so a run can be followed in the
    cell output.
    """

    def __init__(self, llm, retriever, retrieval_grader, hallucination_grader, code_evaluator, question_rewriter):
        """
        Args:
            llm: Model used to build the answer-generation chain.
            retriever: Object whose ``invoke(question)`` returns the documents for a question.
            retrieval_grader: Chain grading one document against the question.
            hallucination_grader: Stored on the instance; not used by the node methods of this class.
            code_evaluator: Stored on the instance; not used by the node methods of this class.
            question_rewriter: Chain producing a rewritten question.
        """
        self.llm = llm
        self.retriever = retriever
        self.retrieval_grader = retrieval_grader
        self.hallucination_grader = hallucination_grader
        self.code_evaluator = code_evaluator
        self.question_rewriter = question_rewriter
        self.generate_chain = create_generate_chain(llm)

    @staticmethod
    def _is_yes(result):
        """Return True only when a grader result is a mapping whose 'score' reads as 'yes'.

        The comparison ignores case and surrounding whitespace, and a reply
        without a usable 'score' counts as "not yes" instead of raising.
        """
        if not isinstance(result, dict):
            return False
        return str(result.get("score", "")).strip().lower() == "yes"

    def retrieve(self, state):
        """
        Retrieve documents

        Args:
            state (dict): The current graph state

        Returns:
            state (dict): New key added to state, documents, that contains retrieved documents
        """
        print("---RETRIEVE---")
        question = state["question"]

        # Retrieval
        documents = self.retriever.invoke(question)
        return {"documents": documents, "question": question}

    def generate(self, state):
        """
        Generate answer

        Args:
            state (dict): The current graph state

        Returns:
            state (dict): New key added to state, generation, that contains LLM generation
        """
        print("---GENERATE---")
        question = state["question"]
        documents = state["documents"]

        # RAG generation; the documents are handed to the chain unchanged as its "context".
        generation = self.generate_chain.invoke({"context": documents, "question": question})
        return {"documents": documents, "question": question, "generation": generation}

    def grade_documents(self, state):
        """
        Determines whether the retrieved documents are relevant to the question.

        Each document's ``page_content`` is graded on its own; only documents the
        grader scores as 'yes' are kept, in their original order.

        Args:
            state (dict): The current graph state

        Returns:
            state (dict): Updates documents key with only filtered relevant documents
        """
        print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
        question = state["question"]
        documents = state["documents"]

        filtered_docs = []
        for doc in documents:
            score = self.retrieval_grader.invoke({"question": question, "document": doc.page_content})
            if self._is_yes(score):
                print("---GRADE: DOCUMENT RELEVANT---")
                filtered_docs.append(doc)
            else:
                print("---GRADE: DOCUMENT IR-RELEVANT---")

        return {"documents": filtered_docs, "question": question}

    def transform_query(self, state):
        """
        Transform the query to produce a better question.

        Args:
            state (dict): The current graph state

        Returns:
            state (dict): Updates question key with a re-phrased question
        """
        print("---TRANSFORM QUERY---")
        question = state["question"]
        documents = state["documents"]

        # Re-write question; the rewriter's full output becomes the new question.
        better_question = self.question_rewriter.invoke({"question": question})
        return {"documents": documents, "question": better_question}

In [40]:
class EdgeGraph:
    """
    Routing functions for the graph's conditional edges.

    Each method inspects the current graph state and returns the label of the
    branch to follow; progress markers are printed along the way.
    """

    def __init__(self, hallucination_grader, code_evaluator):
        """
        Args:
            hallucination_grader: Chain grading whether a generation is grounded in the documents.
            code_evaluator: Chain grading whether a generation addresses the question.
        """
        self.hallucination_grader = hallucination_grader
        self.code_evaluator = code_evaluator

    @staticmethod
    def _is_yes(result):
        """Return True only when a grader result is a mapping whose 'score' reads as 'yes'.

        The comparison ignores case and surrounding whitespace, and a reply
        without a usable 'score' counts as "not yes" instead of raising.
        """
        if not isinstance(result, dict):
            return False
        return str(result.get("score", "")).strip().lower() == "yes"

    def decide_to_generate(self, state):
        """
        Determines whether to generate an answer, or re-generate a question.

        Args:
            state (dict): The current graph state

        Returns:
            str: "transform_query" when no documents survived grading, otherwise "generate"
        """
        print("---ASSESS GRADED DOCUMENTS---")

        if not state["documents"]:
            # Every retrieved document was filtered out, so ask for a new query.
            print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---")
            return "transform_query"

        # At least one relevant document is available.
        print("---DECISION: GENERATE---")
        return "generate"

    def grade_generation_v_documents_and_question(self, state):
        """
        Determines whether the generation is grounded in the document and answers question.

        Args:
            state (dict): The current graph state

        Returns:
            str: "not supported" when the generation is not grounded in the documents,
                 "useful" when it is grounded and addresses the question,
                 "not useful" when it is grounded but does not address the question
        """
        print("---CHECK HALLUCINATIONS---")
        question = state["question"]
        documents = state["documents"]
        generation = state["generation"]

        # Check hallucination first; the question check only runs for grounded answers.
        grounded = self.hallucination_grader.invoke({"documents": documents, "generation": generation})
        if not self._is_yes(grounded):
            print("---DECISION: GENERATIONS ARE HALLUCINATED, RE-TRY---")
            return "not supported"

        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        addressed = self.code_evaluator.invoke({"question": question, "generation": generation, "documents": documents})
        if self._is_yes(addressed):
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"

        print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
        return "not useful"

In [41]:
# Initiating the Graph
workflow = StateGraph(GraphState)

# Node and edge callables are bound methods of these two helper objects;
# the answer_grader chain plays the role of `code_evaluator` in both.
graph_nodes = GraphNodes(
    llm=llm,
    retriever=retriever,
    retrieval_grader=retrieval_grader,
    hallucination_grader=hallucination_grader,
    code_evaluator=answer_grader,
    question_rewriter=question_rewriter,
)
edge_graph = EdgeGraph(hallucination_grader=hallucination_grader, code_evaluator=answer_grader)

# Define the nodes (registered in the same order as the pipeline runs)
for node_name, node_fn in (
    ("retrieve", graph_nodes.retrieve),
    ("grade_documents", graph_nodes.grade_documents),
    ("generate", graph_nodes.generate),
    ("transform_query", graph_nodes.transform_query),
):
    workflow.add_node(node_name, node_fn)

# Build graph: retrieve -> grade_documents -> (generate | transform_query)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    edge_graph.decide_to_generate,
    {"transform_query": "transform_query", "generate": "generate"},
)
# A rewritten question always goes back through retrieval.
workflow.add_edge("transform_query", "retrieve")
# NOTE(review): "not supported" loops back to generate and "not useful" back to
# transform_query with no retry counter visible in this notebook — confirm how a
# run that never reaches "useful" is expected to terminate.
workflow.add_conditional_edges(
    "generate",
    edge_graph.grade_generation_v_documents_and_question,
    {"not supported": "generate", "useful": END, "not useful": "transform_query"},
)

# Compile
network = workflow.compile()


In [42]:
# restart kernel
# Restarting at this point would discard `network` and every other object built
# above, so the restart is opt-in: set RESTART_KERNEL to True and run this cell on
# its own when a clean kernel is really wanted.
# The injected script relies on the `Jupyter` JavaScript global, which is presumably
# only available in the classic Notebook front end.
from IPython.display import HTML, display

RESTART_KERNEL = False
if RESTART_KERNEL:
    display(HTML("<script>Jupyter.notebook.kernel.restart()</script>"))

In [43]:
# Run the compiled graph end to end on the sample question; only "question" is
# supplied, the nodes fill in "documents" and "generation".
inputs = {"question": "let me know brief explanation about SOLAS?"}

response = network.invoke(inputs)

---RETRIEVE---
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---


In [52]:
# Show the final answer stored under the "generation" key of the graph's result.
print(response['generation'])

Based on the provided context, SOLAS (International Convention for the Safety of Life at Sea, 1974) is a maritime convention that aims to ensure the safety of life at sea. The convention requires ships to be designed, constructed, and equipped to prevent accidents and minimize the risk of loss of life.

According to the context, SOLAS has undergone several amendments, including the addition of new regulations 23-2 and 42-1 to Chapter II-1 of the SOLAS Convention. Regulation 23-2 deals with the integrity of the hull and superstructure, damage prevention, and control, and requires indicators to be provided on the navigating bridge for all doors that, if left open, could lead to major flooding.

SOLAS also requires the establishment of a safety management system (SMS) by shipowners or operators, which should be designed to ensure compliance with all mandatory regulations and take into account codes, guidelines, and standards recommended by IMO and others.

In summary, SOLAS is a maritime 