In [1]:
from openai import OpenAI

# Shared OpenAI client used by the agent loop further down (responses.create).
# No credentials are passed explicitly here; presumably the SDK picks them up
# from the environment — confirm before running on a fresh machine.
openai_client = OpenAI()

In [2]:
import requests
from dataclasses import dataclass
from urllib.parse import quote_plus

@dataclass
class WikipediaPageDocument:
    """A single fetched Wikipedia page together with where it came from."""

    title: str        # page title as requested by the caller
    url: str          # URL the content was downloaded from
    raw_content: str  # response body exactly as returned by the server

    def parse(self) -> dict:
        """Convert the page into a plain dict for chunking and indexing.

        Returns:
            A dict with two keys: ``"content"`` holding the raw page text
            (the original author notes this is the key name gitsource
            expects) and ``"metadata"`` holding the source label, title
            and URL.
        """
        metadata = dict(source="wikipedia", title=self.title, url=self.url)
        return {"content": self.raw_content, "metadata": metadata}

class WikipediaPageDataReader:
    """Fetches English Wikipedia pages by title via ``index.php?action=raw``."""

    def __init__(self, user_agent: str = "my-agent/1.0 (contact: you@example.com)"):
        # Sent as the User-Agent header on every request made by read().
        self.user_agent = user_agent

    def read(self, page_titles: list[str]) -> list[WikipediaPageDocument]:
        """Download each requested page and wrap it in a document object.

        Args:
            page_titles: Page titles to fetch; each is URL-encoded with
                ``quote_plus`` before being placed in the query string.

        Returns:
            One ``WikipediaPageDocument`` per title, in the input order.

        Raises:
            requests.HTTPError: via ``raise_for_status()`` on the first page
                that returns an error status; later titles are not fetched.
        """
        fetched = []
        for page_title in page_titles:
            raw_url = (
                "https://en.wikipedia.org/w/index.php"
                f"?title={quote_plus(page_title)}&action=raw"
            )
            response = requests.get(
                raw_url,
                timeout=10,
                headers={"User-Agent": self.user_agent},
            )
            response.raise_for_status()
            fetched.append(
                WikipediaPageDocument(
                    title=page_title,
                    url=raw_url,
                    raw_content=response.text,
                )
            )
        return fetched

In [3]:
import json
import time
from gitsource import chunk_documents
from minsearch import AppendableIndex


# Download the two source pages (performs network requests when the cell runs).
reader = WikipediaPageDataReader()
pages = reader.read(["Capybara", "Lesser capybara"])


# Turn each page into {"content": ..., "metadata": {...}} and split it into
# chunks. size/step are presumably the window length and stride in
# characters — TODO confirm against gitsource.chunk_documents.
parsed_docs = [doc.parse() for doc in pages]
chunked_docs = chunk_documents(parsed_docs, size=3000, step=1500)
# Second name bound to the same list object (not a copy).
chunked_documents = chunked_docs

# NOTE(review): parse() only produces the top-level keys "content" and
# "metadata"; "title" sits inside "metadata", and "description"/"filename"
# are never produced. Unless chunk_documents adds or flattens fields, only
# "content" is actually indexed here — verify the intended field list.
index = AppendableIndex(
    text_fields=["title", "description", "content"],
    keyword_fields=["filename"]
)


# Last expression of the cell, so the notebook displays the value fit() returns.
index.fit(chunked_docs)

<minsearch.append.AppendableIndex at 0x74a2555870b0>

In [4]:
def count_characters(text: str, exclude_spaces: bool = False) -> dict:
    """Report the length of ``text`` with and without space characters.

    Only the literal space character ``" "`` is treated as a space; tabs,
    newlines and other whitespace are counted in both totals.

    Args:
        text: The string to measure.
        exclude_spaces: Selects which total is reported under ``"characters"``.

    Returns:
        A dict with ``"characters_including_spaces"``,
        ``"characters_excluding_spaces"`` and ``"characters"`` (equal to one
        of the former two, depending on ``exclude_spaces``).
    """
    total = len(text)
    without_spaces = total - text.count(" ")
    selected = without_spaces if exclude_spaces else total
    return {
        "characters_including_spaces": total,
        "characters_excluding_spaces": without_spaces,
        "characters": selected,
    }


# Quick smoke test for count_characters. The recorded cell output shows both
# lines being printed, i.e. the __main__ guard is true when the cell runs.
if __name__ == "__main__":
    print(count_characters("capybara"))
    print(count_characters("lesser capybara"))

# Function-tool schema describing count_characters to the model.
# "name" must match the branch in make_call; the properties mirror the
# function's two parameters, of which only "text" is required.
count_tool = {
    "type": "function",
    "name": "count_characters",
    "description": "Count characters in a given text. Use this whenever the user asks for character counts.",
    "parameters": {
        "type": "object",
        "properties": {
            "text": {"type": "string", "description": "Text to count characters for."},
            "exclude_spaces": {"type": "boolean", "description": "Exclude spaces if true."}
        },
        "required": ["text"]
    }
}

{'characters_including_spaces': 8, 'characters_excluding_spaces': 8, 'characters': 8}
{'characters_including_spaces': 15, 'characters_excluding_spaces': 14, 'characters': 15}


In [5]:
def fetch_url_content(url: str, max_chars: int | None = None) -> dict:
    """Download a URL and return its body text, optionally cut to a prefix.

    Args:
        url: Address to request with HTTP GET.
        max_chars: When given, only ``text[:max_chars]`` is returned; when
            ``None`` the whole response body is returned.

    Returns:
        A dict with the requested ``url``, the (possibly shortened) ``text``,
        a ``truncated`` flag, and the ``original_length`` /
        ``returned_length`` character counts.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` for error responses.
    """
    response = requests.get(
        url,
        timeout=15,
        headers={"User-Agent": "my-agent/1.0 (contact: you@example.com)"},
    )
    response.raise_for_status()
    body = response.text

    excerpt = body if max_chars is None else body[:max_chars]
    full_length, kept_length = len(body), len(excerpt)

    return {
        "url": url,
        "text": excerpt,
        "truncated": full_length > kept_length,
        "original_length": full_length,
        "returned_length": kept_length,
    }


def search(query):
    """Search the module-level ``index`` and return compact result records.

    Documents produced by ``WikipediaPageDocument.parse()`` keep ``title``,
    ``url`` and ``source`` inside a nested ``"metadata"`` dict, so reading
    them only from the top level of a hit would yield ``None``. Each field is
    therefore taken from the top level when present and otherwise from
    ``hit["metadata"]``.

    Args:
        query: Free-text query passed to ``index.search``.

    Returns:
        A list of up to three dicts with ``title``, ``url``, ``source`` and
        ``content_preview`` (the first 800 characters of the hit's content,
        or an empty string when the hit has no content).
    """

    def _field(hit, key):
        # Prefer a top-level value; fall back to the nested metadata dict.
        value = hit.get(key)
        if value is None:
            metadata = hit.get("metadata")
            if isinstance(metadata, dict):
                value = metadata.get(key)
        return value

    hits = index.search(
        query=query,
        num_results=3
    )
    return [
        {
            "title": _field(hit, "title"),
            "url": _field(hit, "url"),
            "source": _field(hit, "source"),
            "content_preview": (hit.get("content") or "")[:800],
        }
        for hit in hits
    ]

# Function-tool schema describing fetch_url_content to the model.
# Only "url" is exposed: the function's optional max_chars parameter is not
# part of this schema, so a model-issued call passes only the URL and
# max_chars keeps its default of None (full response body returned).
fetch_url_tool = {
    "type": "function",
    "name": "fetch_url_content",
    "description": "Fetch raw text content from a URL. Use this when the user asks about a specific URL.",
    "parameters": {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "A full URL to fetch"
            }
        },
        "required": ["url"]
    }
}

# Function-tool schema describing search() to the model; the single required
# argument "query" matches the function's only parameter.
# NOTE(review): the description says "documentation database", while the
# index built in this notebook is filled from Wikipedia pages — confirm the
# wording is what the model should see.
search_tool = {
    "type": "function",
    "name": "search",
    "description": "Search the documentation database for relevant results based on a query string.",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query to look up in the index"
            }
        },
        "required": [
            "query"
        ]
    }
}

In [6]:
def make_call(tool_call):
    """Execute one model-requested tool call and package its result.

    The call is dispatched by ``tool_call.name`` to ``search``,
    ``fetch_url_content`` or ``count_characters``, with the JSON-decoded
    ``tool_call.arguments`` passed as keyword arguments.

    Failures (malformed JSON arguments, unexpected keyword arguments, HTTP
    errors raised by a tool, ...) are reported back as the tool output
    instead of being raised, so the conversation always contains an output
    for every function call and the model can react to the error.

    Args:
        tool_call: Object exposing ``name``, ``arguments`` (a JSON string)
            and ``call_id``.

    Returns:
        A ``function_call_output`` dict carrying the same ``call_id`` and the
        JSON-encoded result (or ``{"error": ...}`` on failure).
    """
    name = tool_call.name

    try:
        arguments = json.loads(tool_call.arguments)

        if name == 'search':
            result = search(**arguments)
        elif name == 'fetch_url_content':
            result = fetch_url_content(**arguments)
        elif name == 'count_characters':
            result = count_characters(**arguments)
        else:
            result = f'not found tool "{name}"'
    except Exception as exc:
        # Hand the failure to the model rather than aborting the agent loop.
        result = {"error": f"{type(exc).__name__}: {exc}"}

    return {
        "type": "function_call_output",
        "call_id": tool_call.call_id,
        # default=str keeps serialisation from failing on non-JSON-native values.
        "output": json.dumps(result, default=str),
    }

In [7]:
# System prompt for the agent loop; it is sent as the "system" message.
# NOTE(review): the prompt text contains "searchers" (probably meant
# "searches") and a sentence without a closing period. It is left untouched
# here because the string is sent to the model verbatim.
instructions = """
You're a documentation assistant. 

Answer the user question using the documentation knowledge base

IMPORTANT: When you explore the knowledge base, make at least 3 different
searchers to make sure you explore the topic well.

Use only facts from the knowledge base when answering.
If you cannot find the answer, inform the user.

You may look at the response data to help you. You may also use data manipulation 
of it such as counting characters or pulling snippets from the text.

If the user asks to count characters (or length), you MUST call count_characters.
If the user provides a URL, first call fetch_url_content with that URL, then count_characters on the returned text.
Do not mention external access. Do not refuse.

"""

In [8]:
# User question fed to the agent loop. The commented-out lines are alternative
# prompts; exactly one assignment should be active.
# question = "How do I create a dahsbord in Evidently?"
# question = "What is this page about? https://en.wikipedia.org/wiki/Capybara"
question = "How many characters are in this page? https://en.wikipedia.org/wiki/Capybara"

In [9]:
# Conversation sent to the model on every turn. Model outputs and tool
# results are appended to it as the loop progresses.
message_history = [
    {"role": "system", "content": instructions},
    {"role": "user", "content": question}
]

iteration_number = 1
max_iterations = 8
max_runtime_seconds = 60
start_time = time.monotonic()
# When the question contains a URL, the first turn is forced to call
# fetch_url_content; every later turn lets the model choose.
url_in_question = "http://" in question or "https://" in question
while True:
    elapsed = time.monotonic() - start_time
    if iteration_number > max_iterations:
        print(f'stopped after {max_iterations} iterations to avoid hanging.')
        break
    if elapsed > max_runtime_seconds:
        print(f'stopped after {max_runtime_seconds}s to avoid hanging.')
        break

    response = openai_client.responses.create(
        model='gpt-4o-mini',
        input=message_history,
        tools=[search_tool, count_tool, fetch_url_tool],
        tool_choice=(
            {"type": "function", "name": "fetch_url_content"}
            if iteration_number == 1 and url_in_question
            else 'auto'
        ),
        # The budget checks above only run between turns, so a single slow
        # request could otherwise block well past max_runtime_seconds. Cap
        # each request at the remaining budget (at least one second). A
        # timeout surfaces as an exception from the SDK instead of a hang.
        timeout=max(1.0, max_runtime_seconds - elapsed),
    )

    print(f'iteration number {iteration_number}...') 
    message_history.extend(response.output)

    has_function_calls = False

    for message in response.output:
        if message.type == 'function_call':
            print(f'executing {message.name}({message.arguments})...')
            tool_call_output = make_call(message)
            message_history.append(tool_call_output)
            has_function_calls = True

        if message.type == 'message':
            # Print every content part that carries text; parts without a
            # text attribute (or an empty content list) are skipped rather
            # than raising.
            for part in message.content:
                text = getattr(part, "text", None)
                if text is not None:
                    print('ASSISTANT:', text)

    iteration_number = iteration_number + 1
    print()
    
    # No tool calls means the model produced its final answer this turn.
    if not has_function_calls:
        break



iteration number 1...
executing fetch_url_content({"url":"https://en.wikipedia.org/wiki/Capybara"})...



KeyboardInterrupt: 