<h1>Custom GPT Using the OpenAI Assistants API</h1>

In [None]:
import os
import time
import json
import fitz  # PyMuPDF for PDF extraction
from dotenv import load_dotenv
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_chroma import Chroma
from openai import OpenAI
import chromadb
from chromadb.config import Settings
from uuid import uuid4

# Load environment variables (e.g. OPENAI_API_KEY, which the next cell reads with os.getenv)
# from a local .env file, if one is present.
load_dotenv()

In [2]:
# Environment Setup and Document Embedding
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    raise ValueError("Missing OPENAI_API_KEY environment variable.")

# Initialize OpenAI client
client = OpenAI(api_key=openai_api_key)

# Path to store the Chroma database
persist_directory = "./chroma_assistant_db"

# Open the persistent Chroma client and start from an empty "assistant_docs" collection,
# so documents embedded in the next cell are never mixed with chunks from an earlier run.
try:
    persistent_client = chromadb.PersistentClient(
        path=persist_directory,
        settings=Settings(allow_reset=True),
    )
    # Delete existing "assistant_docs" collection if present.
    # Older chromadb releases return Collection objects from list_collections(), while
    # 0.6.x returns plain name strings; accept both so the check works on either.
    collection_names = [
        coll if isinstance(coll, str) else coll.name
        for coll in persistent_client.list_collections()
    ]
    if "assistant_docs" in collection_names:
        persistent_client.delete_collection("assistant_docs")
        print("🗑️ Deleted existing ChromaDB collection: assistant_docs")
    time.sleep(1)  # NOTE(review): short pause kept from the original; its purpose is not evident here

    # Create a fresh collection. This raw chromadb handle is replaced by a LangChain
    # `Chroma` wrapper (also named vector_db) in the next cell.
    vector_db = persistent_client.get_or_create_collection("assistant_docs")
    print("✅ Successfully reinitialized ChromaDB with a fresh 'assistant_docs' collection.")
except Exception as e:
    # Report, then re-raise so later cells don't run against a half-initialized store.
    print(f"❌ Error during ChromaDB initialization: {e}")
    raise

🗑️ Deleted existing ChromaDB collection: assistant_docs
✅ Successfully reinitialized ChromaDB with a fresh 'assistant_docs' collection.


In [3]:
# Load and Embed Documentation
# Knowledge-base files that will be split, embedded and indexed below.
pdf_path = "mm_files/MARMIND_GraphQL_Doc.pdf"
txt_path = "mm_files/MARMIND_GraphQL_Examples.txt"

# Embedding model used for both the indexed chunks and the search queries.
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

# Reconnect to the on-disk Chroma store at persist_directory (set in the previous cell).
persistent_client = chromadb.PersistentClient(
    path=persist_directory,
    settings=Settings(allow_reset=True),
)

# Raw chromadb handle to the collection (not referenced elsewhere in this notebook).
collection = persistent_client.get_or_create_collection("assistant_docs")

# LangChain vector store over the same collection, embedding with `embeddings`.
vector_db = Chroma(
    embedding_function=embeddings,
    collection_name="assistant_docs",
    client=persistent_client,
)

# Extract text from PDFs
def extract_text_from_pdf(pdf_path):
    """
    Extract the plain text of every page of a PDF with PyMuPDF.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        list[tuple[str, str, int]]: one ``(page_text, pdf_path, page_number)``
        record per page; page numbers are 1-based (PyMuPDF's ``page.number`` is 0-based).
    """
    # Use the document as a context manager so its file handle is released even if
    # extraction fails; the list is fully built before the document is closed.
    with fitz.open(pdf_path) as doc:
        return [(page.get_text("text"), pdf_path, page.number + 1) for page in doc]

# One (page_text, source_path, page_number) record per page of the PDF.
pdf_text = extract_text_from_pdf(pdf_path)

# Load a plain-text file whole; chunking happens later in the splitter pass.
def load_text_file(txt_path):
    """
    Read a UTF-8 text file and return it as a single (text, source, page) record.

    The file is returned unsplit; the RecursiveCharacterTextSplitter configured
    further down does the chunking with its own chunk_size and separators.
    Plain text has no pages, so the page number is always reported as 1.

    Returns:
        list[tuple[str, str, int]]: ``[(content, txt_path, 1)]``
    """
    with open(txt_path, encoding="utf-8") as handle:
        return [(handle.read(), txt_path, 1)]
    

txt_text = load_text_file(txt_path)

# Combine extracted text: (text, source_path, page_number) records from the PDF pages and the txt file
doc_chunks = pdf_text + txt_text

# Split into Chunks. Separators are tried in order, so splits prefer section and paragraph
# boundaries and only fall back to sentences, words or characters when a piece is still
# longer than chunk_size.
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1500,
    chunk_overlap=150,
    separators=[
        "\n\n\n",  # Multiple blank lines (major sections)
        "\n\n",    # Paragraph breaks
        "\n",      # Line breaks
        ". ",      # Sentences
        " ",       # Words
        ""         # Characters
    ]
)

documents = []
uuids = []  # Chroma IDs, parallel to `documents`
for text, source, page in doc_chunks:
    page_number = page if isinstance(page, int) else -1
    for chunk_index, chunk in enumerate(splitter.split_text(text)):
        documents.append(
            Document(
                page_content=chunk,
                metadata={"source": source, "page": page_number},
            )
        )
        # Deterministic ID (instead of a random uuid4) so re-running this cell overwrites
        # (upserts) the same chunks rather than adding duplicate copies to the collection.
        uuids.append(f"{source}::page{page_number}::chunk{chunk_index}")

# Add documents; skip the call for an empty batch (Chroma expects a non-empty list of IDs).
if documents:
    vector_db.add_documents(documents=documents, ids=uuids)
else:
    print("Warning: no text was extracted from the input files; nothing was added to the vector store.")


# Define the Actual Retrieval Function (search_docs)
def search_docs(query: str, k: int = 15) -> str:
    """
    Search the "assistant_docs" Chroma collection for chunks relevant to ``query``.

    Each retrieved passage is numbered, and the citation list uses the same
    numbers, so the assistant can tell which source/page each passage came from.

    Args:
        query: Free-text search query (taken from the assistant's tool call).
        k: Number of chunks to retrieve (default 15, the previously hard-coded value).

    Returns:
        A single string with a "**Retrieved Text:**" section followed by a
        "**Citations:**" section, or a short notice when nothing was found.
    """
    # Wrap the same persistent collection with the embedding function used at index time.
    vector_store = Chroma(
        client=persistent_client,
        collection_name="assistant_docs",
        embedding_function=embeddings
    )
    results = vector_store.similarity_search_with_score(query, k=k)
    if not results:
        return f"No relevant documentation found for query: {query!r}"

    passages = []
    citations = []
    for number, (doc, _score) in enumerate(results, start=1):
        metadata = doc.metadata or {}
        source = metadata.get("source", "Unknown Source")
        page = metadata.get("page", "Unknown Page")
        passages.append(f"[{number}] {doc.page_content}")
        citations.append(f"[{number}] Source: {source}, Page: {page}")

    all_text = "\n\n".join(passages)
    all_citations = "\n".join(citations)
    return f"**Retrieved Text:**\n{all_text}\n\n**Citations:**\n{all_citations}"


# Create an Assistant with a "search_docs" Function
# NOTE: replace "Jenys" below with your company name, and name your documents accordingly
# (e.g. Company_name_REST_API.pdf).
# NOTE(review): every execution of this cell creates a new Assistant in the OpenAI account;
# consider reusing an existing assistant ID or deleting old ones (client.beta.assistants.delete).
assistant = client.beta.assistants.create(
    name="Jenys Assistant (Function Calling)",
    instructions="""
        You are an AI assistant for Jenys, helping answer general product functionality questions and more specific technical questions related to APIs and scripts, and producing code with the help of your knowledge base documents.
        When you need additional context or references, call the 'search_docs' function
        with a 'query' argument relevant to the user's request.

        **Always include citations in the final response.** 
        When responding, present the retrieved text first, followed by the citations explicitly labeled as "Sources."
    """,
    model="gpt-4o",
    tools=[
        {
            "type": "function",
            "function": {
                "name": "search_docs",
                "description": "Search Jenys docs for relevant info and return relevant text (with citations).",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The user's question or search query about Jenys docs."
                        }
                    },
                    "required": ["query"]
                }
            }
        }
    ]
)


# Run statuses in which the run is still working (or waiting for our tool outputs) and must keep being polled.
_PENDING_RUN_STATUSES = ("queued", "in_progress", "requires_action", "cancelling")


def _execute_tool_call(tool_call):
    """
    Execute one function call requested by the assistant and return its output string.

    The API expects an output for every tool call in a ``requires_action`` step, so
    unknown tools and malformed arguments are reported back to the model as error
    strings instead of being skipped.
    """
    name = tool_call.function.name
    if name != "search_docs":
        return f"Error: unknown tool '{name}'."
    try:
        args = json.loads(tool_call.function.arguments or "{}")
    except json.JSONDecodeError as exc:
        return f"Error: could not parse arguments for '{name}': {exc}"
    query = args.get("query") if isinstance(args, dict) else None
    if not query:
        return "Error: search_docs requires a non-empty 'query' argument."
    return search_docs(query)


def _wait_for_run(run, thread_id, poll_interval):
    """
    Poll ``run`` until it leaves the pending states, answering tool calls on the way.

    Returns the run object in its first non-pending state (e.g. completed, failed, expired).
    """
    while run.status in _PENDING_RUN_STATUSES:
        if run.status == "requires_action":
            tool_calls = run.required_action.submit_tool_outputs.tool_calls
            tool_outputs = [
                {"tool_call_id": tool_call.id, "output": _execute_tool_call(tool_call)}
                for tool_call in tool_calls
            ]
            run = client.beta.threads.runs.submit_tool_outputs(
                thread_id=thread_id,
                run_id=run.id,
                tool_outputs=tool_outputs,
            )
        else:
            time.sleep(poll_interval)
            run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
    return run


def _latest_assistant_text(thread_id):
    """
    Return the text of the newest assistant message in the thread.

    Returns ``None`` when the thread has no assistant message, and ``""`` when the
    newest assistant message has no text content blocks. All text blocks are joined.
    """
    # order="desc" lists newest first, so the first assistant message is the latest reply.
    for message in client.beta.threads.messages.list(thread_id=thread_id, order="desc"):
        if message.role == "assistant":
            return "\n\n".join(
                block.text.value
                for block in (message.content or [])
                if getattr(block, "type", None) == "text"
            )
    return None


def execute_query(user_question, poll_interval=2.0):
    """
    Ask the assistant one question in a fresh thread and print its final reply.

    The thread and run are created in one call (``create_and_run``). While the run is
    pending, ``search_docs`` tool calls are executed locally and their outputs submitted
    back; once the run completes, the newest assistant message is printed.

    Args:
        user_question: Text sent as the thread's single user message.
        poll_interval: Seconds to wait between status checks (default 2, as before).

    Returns:
        None. The reply, or the reason there is none, is printed.
    """
    # For more control (e.g. adding several messages before running), create the thread with
    # client.beta.threads.create(), add messages with client.beta.threads.messages.create(),
    # then start it with client.beta.threads.runs.create(thread_id=..., assistant_id=assistant.id).
    run = client.beta.threads.create_and_run(
        assistant_id=assistant.id,
        thread={"messages": [{"role": "user", "content": user_question}]},
    )
    thread_id = run.thread_id

    run = _wait_for_run(run, thread_id, poll_interval)
    if run.status != "completed":
        print(f"Run ended with status: {run.status}")
        last_error = getattr(run, "last_error", None)
        if last_error:
            print(f"Details: {last_error}")
        return

    reply = _latest_assistant_text(thread_id)
    if reply is None:
        print("No assistant message found in the thread.")
    elif not reply:
        print("No text content found in the assistant's response.")
    else:
        print("\n===== Jenys Assistant's Final Reply =====\n")
        print(reply)


In [None]:
# Ask one question end to end; the assistant may call search_docs before it replies.
execute_query("Give me exactly 20 GraphQL query examples for MARMIND with the document page they are coming from.")