<h1>Custom GPT based on LangGraph with Search and Summarization Tools</h1>

In [None]:
pip install langchain_experimental PyMuPDF langchain langgraph chromadb openai python-dotenv langchain-chroma

In [None]:
import os
import fitz  # PyMuPDF for PDF extraction
# import pdfplumber
import pandas as pd
from langchain_community.document_loaders import UnstructuredWordDocumentLoader, DataFrameLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.chains.summarize import load_summarize_chain
from langgraph.prebuilt import create_react_agent
from langchain_core.documents import Document
from langchain_chroma import Chroma
from langchain.tools import Tool
from dotenv import load_dotenv
import chromadb
from chromadb.config import Settings
import time
from uuid import uuid4  # Ensure unique IDs
from pydantic import BaseModel, Field
from typing import List

# Load environment variables from a .env file (python-dotenv).
# NOTE(review): presumably this is where the OpenAI API key used by the
# OpenAI clients created later comes from — confirm.
load_dotenv()

In [2]:
# Set Chroma persistence directory
persist_directory = "./chroma_company_name_db"

# Initialize the ChromaDB client and recreate the collection so that every run
# of this cell starts from an empty 'company_name_docs' collection.
try:
    persistent_client = chromadb.PersistentClient(path=persist_directory, settings=Settings(allow_reset=True))
    collections = persistent_client.list_collections()
    # Depending on the installed chromadb release, list_collections() yields
    # either Collection objects or plain name strings; accept both.
    collection_names = [getattr(coll, "name", coll) for coll in collections]

    if "company_name_docs" in collection_names:
        persistent_client.delete_collection("company_name_docs")
        print("🗑️ Deleted existing ChromaDB collection: company_name_docs")

    # NOTE(review): pause kept from the original; presumably gives the delete
    # time to settle before the collection is recreated — confirm it is needed.
    time.sleep(1)
    vector_db = persistent_client.get_or_create_collection("company_name_docs")
    print("✅ Successfully reinitialized ChromaDB with a fresh 'company_name_docs' collection.")

except Exception as e:
    print(f"❌ Error during ChromaDB initialization: {e}")
    # Re-raise instead of continuing: later cells need `persistent_client`, and
    # swallowing the error here would only surface as a confusing NameError there.
    raise

✅ Successfully reinitialized ChromaDB with a fresh 'company_name_docs' collection.


In [None]:
# Initialize vector store: wrap the 'company_name_docs' collection of the
# persistent client in LangChain's Chroma store, embedding with the OpenAI
# "text-embedding-3-large" model. This rebinds `vector_db`, which the previous
# cell had bound to the raw chromadb collection.
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_db = Chroma(
    client=persistent_client,
    collection_name="company_name_docs",
    embedding_function=embeddings,
)

# Define recursive text splitter used by process_and_embed_documents.
# The separators list runs from the coarsest break (triple newline) down to the
# empty string.
# NOTE(review): chunk_size / chunk_overlap are presumably measured in characters
# (the splitter's default length function) — confirm.
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1500,
    chunk_overlap=150,
    separators=["\n\n\n", "\n\n", "\n", ". ", " ", ""]
)

# Alternative (disabled): semantic text splitter. Uncommenting it would rebind
# `splitter` and replace the recursive splitter above.
# splitter = SemanticChunker(
#     embeddings=OpenAIEmbeddings(),
#     breakpoint_threshold_type="percentile",  # Method to determine breakpoints
#     breakpoint_threshold_amount=85.0  # Threshold value
# )


### === Document Splitting Functions === ###

def extract_text_from_pdf(pdf_path):
    """Extract the text of every page of a PDF, keeping page numbers.

    Args:
        pdf_path: Path of the PDF file, opened with PyMuPDF (``fitz``).

    Returns:
        A list of ``(text, pdf_path, page_number)`` tuples, one per page, where
        ``page_number`` is 1-based (``page.number + 1``).
    """
    # The context manager closes the document (and its underlying file handle)
    # even if text extraction fails part-way through.
    with fitz.open(pdf_path) as doc:
        return [(page.get_text("text"), pdf_path, page.number + 1) for page in doc]


def load_text_file(txt_path):
    """Read a UTF-8 text file and return it as one ``(text, path, 1)`` entry.

    The whole file becomes a single entry (page number fixed at 1); splitting
    into chunks is left to the caller.
    """
    with open(txt_path, encoding="utf-8") as handle:
        whole_text = handle.read()

    single_entry = (whole_text, txt_path, 1)
    return [single_entry]


def load_word_file(doc_path):
    """Load a Word document element by element.

    Uses UnstructuredWordDocumentLoader in "elements" mode and returns one
    ``(text, doc_path, page_number)`` tuple per loaded element. The page number
    is read from the element's ``page_number`` metadata and falls back to 1
    when that key is absent.
    """
    elements = UnstructuredWordDocumentLoader(doc_path, mode="elements").load()

    entries = []
    for element in elements:
        entries.append(
            (element.page_content, doc_path, element.metadata.get("page_number", 1))
        )
    return entries


def load_excel_file(excel_path):
    """Read an Excel file with pandas and turn every row into one entry.

    Each row's cell values are converted to strings and joined with " | ".
    Progress (file name, row count and every row) is printed while loading.

    Returns:
        A list of ``(row_text, excel_path, row_number)`` tuples where
        ``row_number`` is the DataFrame index label plus one, or an empty list
        when the sheet contains no data.
    """
    frame = pd.read_excel(excel_path)

    # Nothing to index: report it and bail out early.
    if frame.empty:
        print(f"⚠️ No data found in {excel_path}. Skipping.")
        return []

    print(f"\n📂 Loading Excel File: {excel_path}")
    print(f"🔢 Total Rows: {len(frame)}\n")

    rows = []
    for row_label, row in frame.iterrows():
        row_number = row_label + 1  # index label + 1 serves as the row ID
        joined = " | ".join(str(cell) for cell in row.values)
        rows.append((joined, excel_path, row_number))

        # Debug print of the row that was just added
        print(f"📝 Row {row_number}: {joined}")

    print(f"\n✅ Successfully processed {len(rows)} rows from {excel_path}.\n")

    return rows


def process_document(file_path):
    """Dispatch a file to the loader that matches its (case-insensitive) extension.

    Returns whatever the selected loader returns; for an unsupported extension
    a message is printed and an empty list is returned.
    """
    _, suffix = os.path.splitext(file_path)
    ext = suffix.lower()

    # Extension -> loader lookup table.
    loaders_by_ext = {
        ".pdf": extract_text_from_pdf,
        ".txt": load_text_file,
        ".docx": load_word_file,
        ".doc": load_word_file,
        ".xlsx": load_excel_file,
        ".xls": load_excel_file,
    }

    loader = loaders_by_ext.get(ext)
    if loader is None:
        print(f"❌ Unsupported file type: {ext}. Skipping {file_path}.")
        return []

    return loader(file_path)


### === Document Processing and Embedding === ###

def _print_document_distribution():
    """Print, per source, how many chunks and which pages the collection holds."""
    print("\n=== START :: Document Distribution Analysis ================================================\n")
    # Only the metadata is needed for these statistics, so the chunk texts are
    # not fetched.
    stored = vector_db._collection.get(include=["metadatas"])
    source_stats = {}

    # Count chunks per source and collect the distinct pages seen for it
    for meta in stored["metadatas"]:
        stats = source_stats.setdefault(meta["source"], {"total": 0, "pages": set()})
        stats["total"] += 1
        stats["pages"].add(meta["page"])

    print("\nDocument counts per source:")
    for source, stats in source_stats.items():
        print(f"\n📁 {source}")
        print(f"  Total chunks: {stats['total']}")
        print(f"  Pages/Sections: {sorted(stats['pages'])}")

    print("\n=== END :: Document Distribution Analysis ==============================================\n")


def process_and_embed_documents(file_paths):
    """Process and embed documents one file at a time.

    For every path the file is loaded via ``process_document``, each returned
    text is split with the module-level ``splitter``, and the resulting chunks
    are added to ``vector_db`` (with fresh UUIDs as IDs) before the next file is
    read. Files that yield no chunks are skipped. After all files are handled,
    a per-source distribution report and the total chunk count are printed,
    provided at least one chunk was embedded.

    Args:
        file_paths: Iterable of document paths to ingest.

    Returns:
        None. Results are written to ``vector_db`` and reported on stdout.
    """
    total_chunks = 0

    for file_path in file_paths:
        print(f"📄 Processing: {file_path}")

        # Load the file and split every extracted text into chunks, keeping the
        # source path and page number as metadata for later citation.
        file_documents = [
            Document(page_content=chunk, metadata={"source": source, "page": page})
            for text, source, page in process_document(file_path)
            for chunk in splitter.split_text(text)
        ]
        if not file_documents:
            continue

        # Embed this file's chunks right away so only one file's chunks are
        # held in memory / sent per call.
        uuids = [str(uuid4()) for _ in file_documents]
        vector_db.add_documents(documents=file_documents, ids=uuids)
        total_chunks += len(file_documents)

    if total_chunks:
        _print_document_distribution()
        print(f"✅ Successfully embedded {total_chunks} chunks.")


### === Search and Retrieval TOOLS with Citations === ###

# One citation entry: which document (and page) a retrieved snippet came from.
# Instances are built by search_docs and carried inside CitedAnswer.citations.
# NOTE(review): the class docstring and the Field descriptions are runtime text
# of a pydantic model (presumably surfaced in its generated schema) — confirm
# before rewording them.
class Citation(BaseModel):
    """Represents a citation from a retrieved document."""
    # Required (Field(...) supplies no default).
    source: str = Field(..., description="The document name or source.")
    # Required; declared as int, so callers must pass an integer page number.
    page: int = Field(..., description="The page number of the cited document.")

# Answer text plus the citations backing it. Returned by search_docs and the
# summarization function, and passed as `response_format` when the agent is
# created.
# NOTE(review): the class docstring and the Field descriptions are runtime text
# of a pydantic model (presumably surfaced in its generated schema) — confirm
# before rewording them.
class CitedAnswer(BaseModel):
    """Structured response with citations."""
    # Required (Field(...) supplies no default).
    answer: str = Field(..., description="The answer to the user's question.")
    # Required; may be an empty list (the tools return [] when nothing was found).
    citations: List[Citation] = Field(..., description="List of citations that justify the answer.")


def _coerce_page(value):
    """Return ``value`` as an int page number, or 0 when it is missing/non-numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        # Citation.page is declared as int; 0 stands for "page unknown".
        return 0


def search_docs(query, k=10):
    """Run a vector similarity search and return the snippets with citations.

    Args:
        query: Free-text search query.
        k: Maximum number of chunks to retrieve from the vector store
            (defaults to 10).

    Returns:
        A CitedAnswer whose ``answer`` holds the extracted snippets joined by
        blank lines and whose ``citations`` has one entry per snippet, in the
        same order. When the search returns nothing, ``answer`` is
        "No relevant data found." and ``citations`` is empty.
    """
    print("\n🔍 ############################################## search_docs tool called!")

    hits = vector_db.similarity_search_with_score(query, k=k)
    if not hits:
        print("❌ No documents found in search")
        return CitedAnswer(answer="No relevant data found.", citations=[])

    snippets = []  # only the most relevant part of each retrieved chunk
    citations = []

    # The similarity score is returned by the store but not used here.
    for doc, _score in hits:
        metadata = doc.metadata
        source = str(metadata.get("source", "Unknown Source"))
        # A missing or non-numeric page must not break the int-typed Citation.
        page = _coerce_page(metadata.get("page"))

        snippets.append(extract_relevant_snippet(doc.page_content, query))
        citations.append(Citation(source=source, page=page))

    return CitedAnswer(answer="\n\n".join(snippets), citations=citations)

# NEW: Extract only the most relevant part of the retrieved text
def extract_relevant_snippet(text, query, max_chars=300):
    """Return the part of ``text`` that is most relevant to ``query``.

    The text is split on ". " and the first sentence containing the query
    (case-insensitive, surrounding whitespace of the query ignored) is
    returned, terminated by exactly one sentence-ending punctuation mark.
    When no sentence matches, or the query is blank, the beginning of the text
    is returned instead.

    Args:
        text: Retrieved chunk text.
        query: The user's search query.
        max_chars: Maximum length of the fallback excerpt (defaults to 300).

    Returns:
        The matching sentence, or the first ``max_chars`` characters of
        ``text`` followed by "..." only if the text was actually truncated.
    """
    needle = query.strip().lower()

    # A blank query would match every sentence, so skip the sentence scan.
    if needle:
        for sentence in text.split(". "):
            if needle in sentence.lower():
                sentence = sentence.strip()
                # The last sentence keeps its own terminator after the split;
                # avoid producing "..".
                if sentence.endswith((".", "!", "?")):
                    return sentence
                return sentence + "."

    if len(text) <= max_chars:
        return text
    return text[:max_chars] + "..."


# Expose search_docs to the agent as a LangChain Tool. The description is the
# text the tool is registered with; it instructs that the tool must always be
# used for documentation lookups.
search_tool = Tool(
    name="SearchDocsTool",
    func=search_docs,
    description="Must always be used to retrieve relevant documentation with citations. Searches product, API, and scripting documentation for relevant details with proper citations."
)


def summarization_tool(query, summarization_type="map_reduce"):
    """Retrieve content relevant to ``query`` and summarize it with citations.

    The snippets returned by ``search_docs`` are wrapped in a single Document
    and passed to a LangChain summarize chain built on the module-level ``llm``.
    Only the retrieved snippets are given to the chain, so the summary is
    limited to retrieved content.

    Args:
        query: The user's question / topic to summarize.
        summarization_type: Chain type for ``load_summarize_chain``; one of
            "stuff", "map_reduce" or "refine" (defaults to "map_reduce").

    Returns:
        A CitedAnswer holding the summary text and the citations of the
        snippets it was built from. If nothing relevant was retrieved, or an
        error occurs, the answer carries an explanatory message and the
        citations list is empty.
    """
    try:
        print("\n🔍 ############################################## summarization_tool tool called!")

        # Retrieve document content with citations
        cited_answer = search_docs(query)

        # search_docs signals "nothing found" with a placeholder answer and no
        # citations; there is nothing worth summarizing in that case.
        if not cited_answer.citations or not cited_answer.answer.strip():
            return CitedAnswer(answer="No relevant content found for summarization.", citations=[])

        summarization_chain = load_summarize_chain(llm, chain_type=summarization_type)

        # Pass ONLY the retrieved snippets to avoid the LLM making up information
        result = summarization_chain.invoke(
            {"input_documents": [Document(page_content=cited_answer.answer)]}
        )

        # The chain returns a dict; the summary text lives under "output_text".
        # CitedAnswer.answer is a str, so the dict itself must not be passed on.
        summary = result["output_text"] if isinstance(result, dict) else str(result)

        return CitedAnswer(answer=summary, citations=cited_answer.citations)

    except Exception as e:
        # Deliberate best-effort: report the failure to the agent as the tool's
        # answer instead of aborting the agent run.
        return CitedAnswer(answer=f"Error during summarization: {str(e)}", citations=[])


# Expose the summarization function to the agent as a LangChain Tool.
# NOTE: this assignment rebinds the name `summarization_tool` from the function
# defined above to the Tool object; the function stays reachable only through
# the Tool's `func`.
summarization_tool = Tool(
    name="SummarizationTool",
    func=summarization_tool,
    description="Provides a high-level summary of document content with structured citations."
)


### === Define LangGraph Agent === ###

# Chat model used by the agent and, through the module-level name `llm`, by the
# summarization function when it builds its summarize chain.
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Tools handed to the agent. At this point `summarization_tool` is the Tool
# object (the name was rebound above), not the underlying function.
tools = [search_tool, summarization_tool]


#WORKING
# system_prompt = """
# You are a company-wide GPT assistant, available to answer:
# - General product questions
# - Detailed feature explanations
# - Technical guidance on REST APIs, GraphQL APIs, and Company_name Script Language.

# You have access to two tools:
# - SearchDocsTool: Retrieves relevant documentation with citations.
# - SummarizationTool: Provides a high-level summary with citations of the document(s) relevant to the query.

# ### **Behavior Guidelines:**
# 1. If the user asks a general question, answer directly.
# 2. For queries about API details or scripting:
#    - Retrieve relevant details using SearchDocsTool.
#    - Use the retrieved information to answer the question.
#    - Generate a structured response with citations, including document name and page number.
#    - Provide code snippets when applicable.
# 3. If SearchDocsTool does not return relevant data, then **DO NOT** invent examples or generate completions that are not a result of the SearchDocsTool tool. Only politely inform the user that you cannot provide an answer for the question.
# 4. Ensure responses are precise and well-structured.
# """

# System prompt handed to the agent. It restricts answers to content retrieved
# through the two tools and prescribes the fallback sentence to use when the
# tools return nothing relevant.
system_prompt = """
You are a company-wide GPT assistant, available to answer:
- General Company_name product questions
- Detailed feature explanations
- Technical guidance on REST APIs, GraphQL APIs, and Company_name Script Language.

You have access to two tools:
- SearchDocsTool: Retrieves relevant documentation with citations.
- SummarizationTool: Provides a high-level summary with citations of the document(s) relevant to the query.

### **Behavior Guidelines:**
1. IMPORTANT: Your answers must strictly come from retrieved documents! Do NOT generate examples which are not retrieved from the provided documents!
2. For specific product features or technical details:
   - First use SearchDocsTool to retrieve relevant information
   - Use the retrieved information to answer the question
   - Include proper citations (document name and page number)
   - Provide code snippets when applicable
3. Use SummarizationTool only when a broad overview of multiple documents would better serve the query.
4. IMPORTANT: If tools do not return relevant data, DO NOT fabricate answers. Instead, clearly state: "I don't have sufficient documentation to answer this question accurately."
5. Ensure all responses are precise and well-structured.
"""

# Another Working SUGGESTION
# system_prompt = """
# You are a company-wide GPT assistant, available to answer:
# - General Company_name product questions
# - Technical guidance on REST APIs, GraphQL APIs, and Company_name Script Language.

# You have access to two tools:
# - SearchDocsTool: Retrieves relevant documentation with citations.
# - SummarizationTool: Provides a high-level summary with citations.

# ### **Behavior Guidelines:**
# 1. **IMPORTANT: Your answers must strictly come from retrieved documents. DO NOT invent or modify code examples.**
# 2. For API details or technical questions:
#    - Use SearchDocsTool first to retrieve relevant data.
#    - If needed, use SummarizationTool.
#    - **Include exact document citations for all statements.**
# 3. If no relevant data is found, explicitly state:  
#    - _"I don't have sufficient documentation to answer this question accurately."_
# 4. **If providing an example, ensure it is verbatim from the retrieved document. Do not paraphrase.**
# """
     
# Build the prebuilt LangGraph ReAct agent from the chat model, the two tools
# and the system prompt defined above.
# NOTE(review): response_format=CitedAnswer presumably makes the agent produce
# a final structured answer of that type, and debug=True presumably enables
# verbose execution logging — confirm against the installed langgraph version.
agent = create_react_agent(
    model=llm,
    tools=tools,
    prompt=system_prompt,
    response_format=CitedAnswer,
    debug=True
)

### === Query Execution with Citations === ###

def ask_question(question):
    """Send ``question`` to the agent and print the newest message of each step.

    The agent is streamed in "values" mode; for every emitted state the last
    entry of its "messages" list is shown: tuples are printed as-is, anything
    else via its ``pretty_print()`` method. Nothing is returned.
    """
    payload = {"messages": [("user", question)]}

    for state in agent.stream(payload, stream_mode="values"):
        latest = state["messages"][-1]
        if not isinstance(latest, tuple):
            latest.pretty_print()
            continue
        print(latest)


### === Example Usage === ###

# List of files to process

# ❗❗❗ List your document(s) paths here!
# Paths are passed unchanged to process_and_embed_documents; supported
# extensions are those handled by process_document (.pdf, .txt, .docx/.doc,
# .xlsx/.xls). The commented entries are examples of the other input types.
file_list = [
    "example.pdf",
    "Examples.txt",
    # "sample-doc.docx",
    # "test.xlsx"
]

# Process and embed documents (runs on every execution of this cell and adds
# the chunks to `vector_db`).
process_and_embed_documents(file_list)



In [None]:
# Example queries
# The question below is a placeholder — replace it with a real question about
# the embedded documents. ask_question prints the agent's messages and returns
# nothing.
ask_question("How to create ...")
