In [1]:
from dotenv import load_dotenv
import enum
import instructor
from openai import OpenAI
import os
from pathlib import Path
from pprint import pprint as pp
from pydantic import BaseModel, Field, StringConstraints, conlist, field_validator
from typing import List, Union
from typing_extensions import Annotated

In [2]:
# load API key

# The .env location can be overridden through the DOTENV_PATH environment
# variable; the original machine-specific path remains the default so the
# notebook keeps working unchanged on the machine it was written on.
dotenv_path = Path(os.getenv("DOTENV_PATH", r"C:\Storage\python_projects\ashvin\.env"))
load_dotenv(dotenv_path=dotenv_path)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    # Fail here with an actionable message instead of later inside the client.
    raise RuntimeError(
        f"OPENAI_API_KEY is not set; checked the environment and {dotenv_path}"
    )

# main constants

GPT_MODEL = "gpt-4o"  # model name passed to every completion request in this notebook

# instantiate client
# The key is passed explicitly so the value validated above is the one in use.
client = instructor.from_openai(OpenAI(api_key=OPENAI_API_KEY), mode=instructor.Mode.TOOLS)

In [3]:
# query plan primitives

class QueryType(str, enum.Enum):
    """Enumeration representing the types of queries that can be asked to a question answer system."""

    # Mixing in `str` makes each member compare equal to its plain string value.
    # NOTE(review): pydantic is expected to expose this docstring and these
    # values in the JSON schema that instructor hands to the model, so treat
    # them as prompt text and confirm before rewording or renaming.

    # Default `node_type` of a Query.
    SINGLE_QUESTION = "SINGLE"
    # Marks a node described on Query as a "multi-question merge".
    MERGE_MULTIPLE_RESPONSES = "MERGE_MULTIPLE_RESPONSES"

class Query(BaseModel):
    """Class representing a single question in a query plan."""

    # NOTE(review): the Field descriptions below are presumably forwarded to
    # the model as part of the response schema; confirm before editing them.

    # Identifier that QueryPlan._dependencies matches against when resolving ids.
    id: int = Field(..., description="Unique id of the query")
    # Required text of the question for this node.
    question: str = Field(
        ...,
        description="Question asked using a question answering system",
    )
    # Ids of prerequisite queries; a fresh empty list is created per instance.
    dependencies: List[int] = Field(
        default_factory=list,
        description="List of sub questions that need to be answered before asking this question",
    )
    # Defaults to a single question when the field is omitted.
    node_type: QueryType = Field(
        default=QueryType.SINGLE_QUESTION,
        description="Type of question, either a single question or a multi-question merge",
    )

class QueryPlan(BaseModel):
    """Container class representing a tree of questions to ask a question answering system."""

    # Required list of every Query node that makes up the plan.
    query_graph: List[Query] = Field(..., description="The query graph representing the plan")

    def _dependencies(self, ids: List[int]) -> List[Query]:
        """Look up the queries of this plan whose ``id`` appears in ``ids``.

        Results follow ``query_graph`` order rather than the order of ``ids``,
        and ids without a matching query are silently ignored.
        """
        matches: List[Query] = []
        for candidate in self.query_graph:
            if candidate.id in ids:
                matches.append(candidate)
        return matches

In [4]:
# wrapper

def wrapper(
    system_prompt: str | None = None,
    user_context: str | list | None = None,
    response_model: type[BaseModel] | None = None,
    max_retries: int = 3,
):
    """Send an optional system prompt and user message to the LLM and return the completion.

    NOTE: a later cell in this notebook defines ``wrapper`` again with an extra
    ``additional_messages`` parameter; once that cell has run it replaces this
    definition. The two differ in how a list ``user_context`` is treated: here
    the list is passed through unchanged as the content of ONE user message.

    Args:
        system_prompt: Content of the "system" message; skipped when None.
        user_context: Content of the single "user" message; skipped when None.
        response_model: Forwarded as ``response_model`` to the client. Callers
            in this notebook pass a pydantic model class (not an instance).
        max_retries: Forwarded as ``max_retries`` to the client.

    Returns:
        Whatever ``client.chat.completions.create`` returns.

    Raises:
        ValueError: If both ``system_prompt`` and ``user_context`` are None,
            so there would be no message to send.
    """
    messages = []
    if system_prompt is not None:
        messages.append({"role": "system", "content": system_prompt})
    if user_context is not None:
        messages.append({"role": "user", "content": user_context})

    # Fail before the network round-trip: a request without messages cannot succeed.
    if not messages:
        raise ValueError("wrapper() needs a system_prompt or a user_context")

    completion = client.chat.completions.create(
        model=GPT_MODEL,
        response_model=response_model,
        max_retries=max_retries,
        messages=messages,
    )
    return completion

In [4]:
# revised wrapper

def wrapper(
    system_prompt: str | None = None,
    user_context: str | list | None = None,
    response_model: type[BaseModel] | None = None,
    max_retries: int = 3,
    additional_messages: List[dict] | None = None,
):
    """Build a chat message list and request a completion from the LLM.

    Messages are assembled in this order: the system prompt, the user
    context, then any additional messages.

    Args:
        system_prompt: Content of a single "system" message; skipped when None.
        user_context: Content for "user" messages. A list yields one user
            message per element, in order; any other value yields a single
            user message. Skipped when None.
        response_model: Forwarded as ``response_model`` to the client. Callers
            in this notebook pass a pydantic model class (not an instance).
        max_retries: Forwarded as ``max_retries`` to the client.
        additional_messages: Ready-made message dicts appended unchanged after
            the generated messages; skipped when None. The caller's list is
            not modified.

    Returns:
        Whatever ``client.chat.completions.create`` returns.

    Raises:
        ValueError: If the arguments produce no messages at all.
    """
    messages = []

    # System prompt always comes first.
    if system_prompt is not None:
        messages.append({"role": "system", "content": system_prompt})

    # A list of contexts becomes several consecutive user messages.
    if user_context is not None:
        contexts = user_context if isinstance(user_context, list) else [user_context]
        messages.extend({"role": "user", "content": context} for context in contexts)

    # Caller-supplied messages go last, as given.
    if additional_messages is not None:
        messages.extend(additional_messages)

    # Fail before the network round-trip: a request without messages cannot succeed.
    if not messages:
        raise ValueError(
            "wrapper() needs at least one of system_prompt, user_context or additional_messages"
        )

    completion = client.chat.completions.create(
        model=GPT_MODEL,
        response_model=response_model,
        max_retries=max_retries,
        messages=messages,
    )
    return completion


In [None]:
# System prompt for the planning call: it is passed to `wrapper` together with
# `plan_user_context` and the QueryPlan response model.
# The text (including its trailing whitespace) is sent as-is, so edit with care.
plan_system_prompt = """
You are a world class query planning algorithm capable of breaking apart questions into its dependency queries such that the answers can be used to inform the parent question. 
Do not answer the questions, simply provide a correct compute graph with good specific questions to ask and relevant dependencies.
Your compute graph should both decompose and recompose queries in order to answer the parent question.
Before you call the function, think step-by-step to get a better understanding of the problem.
"""

# System prompt shared by both optimisation calls later in the notebook: the
# one that requests a SuggestionList and the one that requests a revised QueryPlan.
optimise_system_prompt = """
You are a highly sophisticated query optimization algorithm designed to enhance the quality of query plans.
Your primary function is not to answer the queries but to meticulously improve the query graph. 
This involves ensuring each step is clearly defined and leverages detailed dependencies to construct a comprehensive and precise query sequence.
Before making any modifications, thoroughly evaluate the existing query plan to identify opportunities where additional or more detailed questions or steps could enhance clarity or add value.  
Your output should detail an optimised query graph where each question is well-defined and logically sequenced based on its dependencies.
Ensure no loss of necessary detail and increase the questions or steps where it adds to the robustness and precision of the information-gathering process.
"""

# Parent question that the whole notebook plans for; change this to plan a different task.
# NOTE(review): "a exceptionally" is a typo, left untouched because this string is model input.
question = "I want to use AI to generate a exceptionally high quality science fiction and fantasy short story"

# User message for the planning call; embeds the parent question.
plan_user_context = f"Consider: {question}\nGenerate the correct, complete query plan to answer the parent question."

In [None]:
# Request the initial query plan from the model (this performs an API call).
plan_response = wrapper(
    system_prompt=plan_system_prompt,
    user_context=plan_user_context,
    response_model=QueryPlan,
)
# JSON string form of the plan, reused by later cells as prompt input.
plan_response_json = plan_response.model_dump_json()
# Last expression of the cell: the plan as a dict, shown as the cell output.
plan_response.model_dump()

In [None]:
class SuggestionList(BaseModel):
    """A detailed list of suggestions to optimise the query plan"""

    # Required list of suggestion strings; the cell that requests this model
    # prints them one per line.
    suggestions: List[str]

In [None]:
# Ask the model to critique the initial plan; the plan's JSON is the user message.
optimised_plan_list = wrapper(
    system_prompt=optimise_system_prompt,
    user_context=plan_response_json,
    response_model=SuggestionList,
)
# JSON string of the whole SuggestionList, interpolated into the next prompt.
suggestions = optimised_plan_list.model_dump_json()
# Show each suggestion on its own line for the reader.
for suggestion in optimised_plan_list.suggestions:
    print(suggestion)

In [None]:
# User message combining the parent question, the suggestions and the original plan.
optimised_plan_user_context = f"""
Consider the original parent question : {question}
Consider the list of suggestions for optimising the query plan: {suggestions}.
Apply them to improve the original query plan : {plan_response_json}.
Generate a revised, optimised query plan that answers the parent question.
Query Ids and dependencies should be listed sequentially.
"""
# Second planning pass: same QueryPlan schema, driven by the optimisation prompt.
optimised_plan = wrapper(
    system_prompt=optimise_system_prompt,
    user_context=optimised_plan_user_context,
    response_model=QueryPlan,
)
optimised_plan_dump = optimised_plan.model_dump()
# Last expression of the cell: display the revised plan as a dict.
optimised_plan_dump