In [50]:
# This implementation is using LangGraph to implement the chatting system
import sqlite3
import os
from dotenv import load_dotenv
from typing import TypedDict, List, Annotated, Dict, Any
from langgraph.graph import StateGraph, END
import uuid
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import json
import time

load_dotenv()

# Database and Vector Store paths
DB_PATH = os.getenv("DB_PATH", "data/chat_history.db")
VECTOR_STORE_PATH = os.getenv("VECTOR_STORE_PATH", "data/chroma_db")
PROCESSED_FOLDER = "data/processed"
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_MODEL = os.getenv("LLM_MODEL")
HISTORY_CONTEXT = 5
RETRIEVE_DOCS = 3

In [98]:
# Database Layer
class ChatDatabase:
    def __init__(self):
        self.conn = sqlite3.connect(DB_PATH, check_same_thread=False)
        self._create_table()

    def _create_table(self):
        # print("Creating tables >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS chats (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                user_message TEXT,
                response TEXT,
                reference_docs TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                name TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def save_chat(self, session_id: str, user_message: str, response: str, reference_docs=None):
        """Save a chat message with references stored as JSON"""
        cursor = self.conn.cursor()
        
        # Convert list of Documents to list of dictionaries
        references = json.dumps([
            {"page_content": doc.page_content, "metadata": doc.metadata}
            for doc in (reference_docs or [])
        ])
        
        cursor.execute("""
            INSERT INTO chats (session_id, user_message, response, reference_docs)
            VALUES (?, ?, ?, ?)
        """, (session_id, user_message, response, references))
        
        self.conn.commit()
    def create_session(self, session_id: str, metadata: dict):
        """Create a new session with metadata"""
        # Add this method to save session metadata (name, created_at) to your database
        # Implementation depends on your database structure
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT INTO sessions (session_id, name, created_at)
            VALUES (?, ?, ?)
        """, (session_id, metadata['name'], metadata['created_at']))
        self.conn.commit()

    def get_all_sessions(self):
        """Retrieve all sessions with their metadata"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT session_id, name, created_at
            FROM sessions
        """)
        sessions = {}
        for row in cursor.fetchall():
            sessions[row[0]] = {
                'name': row[1],
                'created_at': row[2]
            }
        return sessions

    def get_chat_history(self, session_id: str, limit=HISTORY_CONTEXT):
        """Retrieve chat history for a session with reference docs as list of dicts"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT user_message, response, reference_docs
            FROM chats
            WHERE session_id = ?
            ORDER BY timestamp DESC LIMIT ?
        """, (session_id, limit))
        
        history = []
        for row in cursor.fetchall():
            user_message, response, reference_docs_json = row
            reference_docs = json.loads(reference_docs_json) if reference_docs_json else []
            
            history.append({
                'user_message': user_message,
                'response': response,
                'reference_docs': reference_docs
            })
        
        return history

In [99]:
# Document Retrieval Layer
class DocumentRetriever:
    def __init__(self):
        self.embeddings = HuggingFaceEmbeddings()
        self.processed_files_path = os.path.join(VECTOR_STORE_PATH, "processed_files.json")
        self.processed_files = self._load_processed_files()
        self.db = self._initialize_vectorstore()

    def _load_processed_files(self) -> Dict[str, Dict]:
        """Load the list of processed files and their metadata."""
        if os.path.exists(self.processed_files_path):
            with open(self.processed_files_path, 'r') as f:
                return json.load(f)
        return {}

    def _save_processed_files(self):
        """Save the list of processed files and their metadata."""
        os.makedirs(VECTOR_STORE_PATH, exist_ok=True)
        with open(self.processed_files_path, 'w') as f:
            json.dump(self.processed_files, f, indent=2)

    def _get_file_metadata(self, filepath: str) -> Dict:
        """Get file metadata including modification time and size."""
        stat = os.stat(filepath)
        return {
            "mtime": stat.st_mtime,
            "size": stat.st_size
        }

    def _has_file_changed(self, filepath: str) -> bool:
        """Check if a file has been modified since last processing."""
        if not os.path.exists(filepath):
            return False

        current_metadata = self._get_file_metadata(filepath)
        filename = os.path.basename(filepath)

        if filename not in self.processed_files:
            return True

        stored_metadata = self.processed_files[filename]
        return (current_metadata["mtime"] != stored_metadata["mtime"] or
                current_metadata["size"] != stored_metadata["size"])

    def _initialize_vectorstore(self):
        """Initialize or update the vector store with new or modified documents."""
        if not os.path.exists(PROCESSED_FOLDER):
            os.makedirs(PROCESSED_FOLDER)

        # Get all PDF files in the processed folder
        pdf_files = [f for f in os.listdir(PROCESSED_FOLDER) if f.endswith(".pdf")]
        
        # Check for new or modified files
        new_docs = []
        for filename in pdf_files:
            filepath = os.path.join(PROCESSED_FOLDER, filename)
            
            if self._has_file_changed(filepath):
                print(f"Processing new/modified file: {filename}")
                loader = PyPDFLoader(filepath)
                new_docs.extend(loader.load())
                
                # Update processed files record
                self.processed_files[filename] = self._get_file_metadata(filepath)
        
        # Save the updated processed files record
        self._save_processed_files()

        # If we have new documents, process them and update the vector store
        if new_docs:
            print(f"Number of new documents to process: {len(new_docs)}")
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=300)
            texts = text_splitter.split_documents(new_docs)
            print(f"Created {len(texts)} chunks from new documents")
            
            # If vector store exists, add to it; otherwise create new
            if os.path.exists(VECTOR_STORE_PATH):
                db = Chroma(persist_directory=VECTOR_STORE_PATH, 
                          embedding_function=self.embeddings)
                db.add_documents(texts)
                return db
            else:
                return Chroma.from_documents(texts, self.embeddings, 
                                          persist_directory=VECTOR_STORE_PATH)
        
        # If no new documents, just load existing vector store
        return Chroma(persist_directory=VECTOR_STORE_PATH, 
                     embedding_function=self.embeddings)

    def retrieve_documents(self, query: str, k=RETRIEVE_DOCS) -> List[Any]:
        """Retrieve similar documents for a given query."""
        return self.db.similarity_search(query, k)

In [100]:
# LLM Response Generation
class ResponseGenerator:
    def __init__(self):
        self.llm = ChatGroq(api_key=LLM_API_KEY, 
                          model=LLM_MODEL)

    def generate_response(self, context: str, history: str, query: str) -> str:
        messages = [
            SystemMessage(content="You are an AI assistant. Use the provided context to generate accurate answers. If the context is insufficient, state that you are unsure rather than guessing."),
            HumanMessage(content=f"Context:\n{context}\n\nConversation History:\n{history}\n\nUser Query:\n{query}\n\nResponse:")
        ]

        result = self.llm.stream(messages)
        print(f"Generated response: {result}")
        return result
    
    def custom_call(self, user_prompt, system_prompt=None):
        messages = [HumanMessage(content=user_prompt)]
        if system_prompt:
            messages.insert(0, SystemMessage(system_prompt))
        return self.llm(messages).content

In [101]:
# LangGraph State Definition
class ChatState(TypedDict):
    session_id: str
    user_message: str
    reference_docs: List[Any]
    response: str
    requires_retrieval: bool
    history: Annotated[List[Dict[str, str]], lambda x, y: x + y]

In [102]:
# LangGraph Nodes
class ChatNodes:
    def __init__(self):
        self.db = ChatDatabase()
        self.retriever = DocumentRetriever()
        self.generator = ResponseGenerator()

    def retrieve_documents(self, state: ChatState) -> Dict:
        docs = self.retriever.retrieve_documents(state["user_message"])
        return {"reference_docs": docs}

    def generate_response(self, state: ChatState) -> Dict:
        context = "\n\n".join([doc.page_content for doc in state["reference_docs"]])
        history = "\n".join([f"User: {msg['user_message']}\nBot: {msg['response']}" 
                           for msg in state.get("history", [])[-HISTORY_CONTEXT:]])
        # print("Context >>>>>>>>>>>>>>>>>>>>>>>>", context)
        # print("History >>>>>>>>>>>>>>>>>>>>>>>>", history)
        # response = self.generator.generate_response(
        #     context=context,
        #     history=history,
        #     query=state["user_message"]
        # )
        generator = self.generator.generate_response(
            context=context,
            history=history,
            query=state["user_message"]
        )
        # print("Response from generator >>>>>>>>>>>>>>>>>>>>>>>>", generator)
        return generator
        

    def save_conversation(self, state: ChatState) -> Dict:
        self.db.save_chat(
            state["session_id"],
            state["user_message"],
            state["response"],
            reference_docs=state["reference_docs"]
        )
        return {
            "history": [{
                "user_message": state["user_message"],
                "response": state["response"],
                "reference_docs": state["reference_docs"]
            }]
        }
    
    def decide_retrieval(self, state: ChatState):
        """Uses the LLM to decide whether retrieval is needed."""
        prompt = f"""
        You are an AI assistant that determines whether a user's query requires retrieving external knowledge.

        If the query is a greeting or a general conversational question (like "How are you?"), respond `False`.
        If the query asks about specific knowledge (like documentation, facts, or other stored data), respond `True`.

        Query: "{state['user_message']}"
        Answer with 'True' if retrieval is needed, otherwise 'False'. It should be either of them in all cases
        """

        response = self.generator.custom_call(prompt)  
        # print("Response from decider >>>>>>>>>>>>>>>>>>>>>>>>", response)
        state['requires_retrieval'] = "true" in response.lower().strip() 
        print("State", state)
        return state

In [103]:
# LangGraph Workflow Setup with LLM Classification
def create_workflow():
    nodes = ChatNodes()
    workflow = StateGraph(ChatState)

    workflow.add_node("decide_retrieval", nodes.decide_retrieval)  # LLM decides if retrieval is needed
    workflow.add_node("retrieve", nodes.retrieve_documents)
    workflow.add_node("generate", nodes.generate_response)
    workflow.add_node("save", nodes.save_conversation)

    workflow.set_entry_point("decide_retrieval")

    # Conditional transition based on LLM decision
    workflow.add_conditional_edges(
        "decide_retrieval",
        lambda state: "retrieve" if state['requires_retrieval'] else "generate",
        {"retrieve": "retrieve", "generate": "generate"},
    )

    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "save")
    workflow.add_edge("save", END)

    return workflow.compile()

In [104]:
class LangGraphChat:
    def __init__(self):
        self.workflow = create_workflow()
        self.db = ChatDatabase()

    def chat(self, user_message: str, session_id: str = None):
        session_id = session_id or str(uuid.uuid4())
        history = self.db.get_chat_history(session_id, limit=HISTORY_CONTEXT)
        initial_state = {
            "session_id": session_id,
            "user_message": user_message,
            "reference_docs": [],
            "response": "",
            "history": [
                {"user_message": h['user_message'], "response": h["response"], "reference_docs": h['reference_docs']}
                for h in history
            ]
        }
        return self.workflow.stream(initial_state)

        # full_response = ""
        # print("Generator ", self.workflow.stream(initial_state))
        # print("Type", type(self.workflow.stream(initial_state)))
        # This is your generator streaming updates
        # for update in self.workflow.stream(initial_state):
        #     # Check if a response chunk is available
        #     if "generate" in update:
        #         print("Update >>>>>>>>>>>>>>>>>>>>>>>>", update)
        #         full_response += update["generate"]
        #         print("Full Response >>>>>>>>>>>>>>>>>>>>>>>>", full_response)
        #         yield update["generate"]
        #     # You can also yield final payloads or metadata as needed.
        # # Optionally yield a final payload with metadata at the end.
        # yield {"final": True, "session_id": session_id, "response": full_response, "reference_docs": initial_state["reference_docs"]}


In [105]:
chatbot = LangGraphChat()
result = chatbot.chat("Hello")
# print(f"Response: {result['response']}")
# print(f"Session ID: {result['session_id']}")
# print(f"References: {result['reference_docs']}")

In [106]:
result

<generator object Pregel.stream at 0x000001BBB67FFC50>

In [107]:
# a = next(result)
# print(a)

In [108]:
# b = next(result)
# print(b)

In [109]:
for i in result:
    time.sleep(2)
    print(i)

State {'session_id': '8cb412c0-0a88-4bf8-a46d-ec51c805283a', 'user_message': 'Hello', 'reference_docs': [], 'response': '', 'history': [], 'requires_retrieval': False}
{'decide_retrieval': {'session_id': '8cb412c0-0a88-4bf8-a46d-ec51c805283a', 'user_message': 'Hello', 'reference_docs': [], 'response': '', 'history': [], 'requires_retrieval': False}}
Generated response: <generator object BaseChatModel.stream at 0x000001BB80BC6710>


InvalidUpdateError: Expected dict, got <generator object BaseChatModel.stream at 0x000001BB80BC6710>
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_GRAPH_NODE_RETURN_VALUE

In [110]:
# LLM Response Generation
class ResponseGenerator:
    def __init__(self):
        self.llm = ChatGroq(api_key=LLM_API_KEY, 
                          model=LLM_MODEL)

    def generate_response(self, context: str, history: str, query: str) -> Dict:
        """
        Generate a response and return a dictionary with a placeholder.
        The actual streaming happens outside the LangGraph workflow.
        """
        # Return a placeholder to satisfy LangGraph's requirement for a dict
        return {"response": "Response will be streamed directly in the UI"}
    
    def get_stream(self, context: str, history: str, query: str):
        """Method that returns the stream generator directly"""
        messages = [
            SystemMessage(content="You are an AI assistant. Use the provided context to generate accurate answers. If the context is insufficient, state that you are unsure rather than guessing."),
            HumanMessage(content=f"Context:\n{context}\n\nConversation History:\n{history}\n\nUser Query:\n{query}\n\nResponse:")
        ]

        return self.llm.stream(messages)
    
    def custom_call(self, user_prompt, system_prompt=None):
        messages = [HumanMessage(content=user_prompt)]
        if system_prompt:
            messages.insert(0, SystemMessage(system_prompt))
        return self.llm(messages).content

# LangGraph State Definition
class ChatState(TypedDict):
    session_id: str
    user_message: str
    reference_docs: List[Any]
    response: str
    requires_retrieval: bool
    context: str  # Added to store context for streaming outside the graph
    chat_history: str  # Added to store history for streaming outside the graph
    history: Annotated[List[Dict[str, str]], lambda x, y: x + y]

# LangGraph Nodes
class ChatNodes:
    def __init__(self):
        self.db = ChatDatabase()
        self.retriever = DocumentRetriever()
        self.generator = ResponseGenerator()

    def retrieve_documents(self, state: ChatState) -> Dict:
        docs = self.retriever.retrieve_documents(state["user_message"])
        # Also prepare the context string for later streaming
        context = "\n\n".join([doc.page_content for doc in state["reference_docs"] + docs])
        return {"reference_docs": docs, "context": context}

    def generate_response(self, state: ChatState) -> Dict:
        # If no documents were retrieved, create the context here
        if not state.get("context"):
            context = "\n\n".join([doc.page_content for doc in state["reference_docs"]])
        else:
            context = state["context"]
            
        history = "\n".join([f"User: {msg['user_message']}\nBot: {msg['response']}" 
                           for msg in state.get("history", [])[-HISTORY_CONTEXT:]])
        
        # Store context and history for later streaming
        return {
            "context": context,
            "chat_history": history,
            "response": "Response will be streamed directly in the UI"
        }
        
    def save_conversation(self, state: ChatState) -> Dict:
        # For now, the response is just a placeholder
        # The actual response will be streamed directly in the UI
        # We'll update the database after streaming is complete
        return {
            "history": [{
                "user_message": state["user_message"],
                "response": state["response"],
                "reference_docs": state["reference_docs"]
            }]
        }
    
    def decide_retrieval(self, state: ChatState):
        """Uses the LLM to decide whether retrieval is needed."""
        prompt = f"""
        You are an AI assistant that determines whether a user's query requires retrieving external knowledge.

        If the query is a greeting or a general conversational question (like "How are you?"), respond `False`.
        If the query asks about specific knowledge (like documentation, facts, or other stored data), respond `True`.

        Query: "{state['user_message']}"
        Answer with 'True' if retrieval is needed, otherwise 'False'. It should be either of them in all cases
        """

        response = self.generator.custom_call(prompt)  
        state['requires_retrieval'] = "true" in response.lower().strip() 
        print("State", state)
        return state

# LangGraph Workflow Setup with LLM Classification
def create_workflow():
    nodes = ChatNodes()
    workflow = StateGraph(ChatState)

    workflow.add_node("decide_retrieval", nodes.decide_retrieval)
    workflow.add_node("retrieve", nodes.retrieve_documents)
    workflow.add_node("generate", nodes.generate_response)
    workflow.add_node("save", nodes.save_conversation)

    workflow.set_entry_point("decide_retrieval")

    workflow.add_conditional_edges(
        "decide_retrieval",
        lambda state: "retrieve" if state['requires_retrieval'] else "generate",
        {"retrieve": "retrieve", "generate": "generate"},
    )

    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "save")
    workflow.add_edge("save", END)

    return workflow.compile()

class LangGraphChat:
    def __init__(self):
        self.workflow = create_workflow()
        self.db = ChatDatabase()
        self.generator = ResponseGenerator()

    def chat(self, user_message: str, session_id: str = None):
        """
        Process the chat through LangGraph and then return a stream generator 
        that can be used in Streamlit.
        """
        session_id = session_id or str(uuid.uuid4())
        history = self.db.get_chat_history(session_id, limit=HISTORY_CONTEXT)
        initial_state = {
            "session_id": session_id,
            "user_message": user_message,
            "reference_docs": [],
            "response": "",
            "context": "",
            "chat_history": "",
            "history": [
                {"user_message": h['user_message'], "response": h["response"], "reference_docs": h['reference_docs']}
                for h in history
            ]
        }
        
        # Run the workflow to prepare everything
        final_state = self.workflow.invoke(initial_state)
        
        # Now get the stream directly from the LLM
        stream = self.generator.get_stream(
            context=final_state["context"],
            history=final_state["chat_history"],
            query=user_message
        )
        
        # Return both the session ID and the stream for use in Streamlit
        return session_id, stream
    
    def save_streamed_response(self, session_id, user_message, response, reference_docs):
        """
        Save the final streamed response to the database
        """
        self.db.save_chat(
            session_id,
            user_message,
            response,
            reference_docs=reference_docs
        )

In [113]:
# For testing
if __name__ == "__main__":
    chatbot = LangGraphChat()
    
    session_id, stream = chatbot.chat("Hello, can you help me understand quantum computing?")
    
    print(f"Session ID: {session_id}")
    print("Streaming response:")
    
    full_response = ""
    for chunk in stream:
        if hasattr(chunk, 'content'):
            content = chunk.content
            time.sleep(2)
        else:
            content = str(chunk)
            
        if content:
            full_response += content
            print(content, end='', flush=True)
    
    print("\n\nFull response:", full_response)
    
    # Now save the full response
    chatbot.save_streamed_response(
        session_id,
        "Hello, can you help me understand quantum computing?",
        full_response,
        []  # Example reference docs
    )

State {'session_id': 'fab50fd9-947e-45a6-9bf5-688fcd9c789f', 'user_message': 'Hello, can you help me understand quantum computing?', 'reference_docs': [], 'response': '', 'context': '', 'chat_history': '', 'history': [], 'requires_retrieval': True}
Session ID: fab50fd9-947e-45a6-9bf5-688fcd9c789f
Streaming response:
I'm unsure about the provided context relating to quantum computing. The context appears to be an Explanatory Guide on Federal Decree-Law No. 47 of 2022 on the Taxation of Corporations and Businesses, specifically discussing articles related to corporate tax laws, tax losses, and assessment of corporate tax. 

If you'd like to ask a question about quantum computing or discuss a different topic, I'll be happy to assist you.

Full response: I'm unsure about the provided context relating to quantum computing. The context appears to be an Explanatory Guide on Federal Decree-Law No. 47 of 2022 on the Taxation of Corporations and Businesses, specifically discussing articles relat

In [None]:
# a3949cea-bfaf-4150-9acb-ffc2b1c522d0
response = chatbot.chat("Can you elaborate on point 3?", session_id="a3949cea-bfaf-4150-9acb-ffc2b1c522d0")

In [24]:
response

<generator object Pregel.stream at 0x00000212021B0610>

In [25]:
for i in response:
    print(i)

Response from decider >>>>>>>>>>>>>>>>>>>>>>>> False
State {'session_id': 'a3949cea-bfaf-4150-9acb-ffc2b1c522d0', 'user_message': 'Can you elaborate on point 3?', 'reference_docs': [], 'response': '', 'history': [], 'requires_retrieval': False}
{'decide_retrieval': {'session_id': 'a3949cea-bfaf-4150-9acb-ffc2b1c522d0', 'user_message': 'Can you elaborate on point 3?', 'reference_docs': [], 'response': '', 'history': [], 'requires_retrieval': False}}
Context >>>>>>>>>>>>>>>>>>>>>>>> 
History >>>>>>>>>>>>>>>>>>>>>>>> 
{'generate': {'response': "I'm happy to help, but there is no conversation history or user query provided for me to base my response on. Please provide the necessary context or information so I can assist you accurately."}}
{'save': {'history': [{'user_message': 'Can you elaborate on point 3?', 'response': "I'm happy to help, but there is no conversation history or user query provided for me to base my response on. Please provide the necessary context or information so I can

AttributeError: 'generator' object has no attribute 'keys'

In [51]:
history = chatbot.db.get_chat_history("a3949cea-bfaf-4150-9acb-ffc2b1c522d0", limit=5)

In [52]:
history

[('What is the refund policy?',
  'Based on the provided context, the refund policy involves the following steps:\n\n1. The Bureau Expo 2020 Dubai makes a request to the Authority to refund the amount, provided the refund claim is correct.\n2. The Authority and Bureau Expo 2020 Dubai agree on procedural, evidential, and verification requirements to be met by the Office of the Official Participant or any other Person to be eligible for the refund claim (Article 4: Requirements for Refund).\n3. Where the refund claim is correct, the Bureau Expo 2020 Dubai makes a request to the Authority to refund the amount, and the Authority prepares a Certificate on Entitlement (Article 5: Certificate on Entitlement).\n\nTherefore, the refund policy is that the Bureau Expo 2020 Dubai requests the Authority for a refund if the claim is correct, after meeting the agreed-upon requirements and obtaining the Certificate on Entitlement from the Authority.'),
 ('Can you elaborate on point 3?',
  "Based on th