# Chat Driver

An OpenAI Chat Completions API wrapper.

## Notebook setup

Run this cell first to set up the notebook. After that, the other sections can be run independently of one another.

In [26]:
%reload_ext autoreload
%autoreload 2

import os
from dotenv import load_dotenv
from azure.identity import aio, DefaultAzureCredential, get_bearer_token_provider, AzureCliCredential

from openai import AsyncAzureOpenAI, AzureOpenAI

import logging 
import json
from pathlib import Path

# Set up structured logging to a file.
class JsonFormatter(logging.Formatter):
    def format(self, record) -> str:
        record_dict = record.__dict__
        log_record = {
            'timestamp': self.formatTime(record, self.datefmt),
            'level': record.levelname,
            'session_id': record_dict.get('session_id', None),
            'run_id': record_dict.get('run_id', None),
            'message': record.getMessage(),
            'data': record_dict.get('data', None),
            'module': record.module,
            'funcName': record.funcName,
            'lineNumber': record.lineno,
            'logger': record.name,
        }
        extra_fields = {
            key: value for key, value in record.__dict__.items() 
            if key not in ['levelname', 'msg', 'args', 'exc_info', 'funcName', 'module', 'lineno', 'name', 'message', 'asctime', 'session_id', 'run_id', 'data']
        }
        log_record.update(extra_fields)
        return json.dumps(log_record)

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
modules = ['httpcore.connection', 'httpcore.http11', 'httpcore.sync.connection', 'httpx', 'openai', 'urllib3.connectionpool', 'urllib3.util.retry']
for module in modules:
    logging.getLogger(module).setLevel(logging.ERROR)
if logger.hasHandlers():
    logger.handlers.clear()
data_dir = Path('.data')
if not data_dir.exists():
    data_dir.mkdir()
handler = logging.FileHandler(data_dir / 'logs.jsonl')
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)


load_dotenv()
credential = DefaultAzureCredential()

azure_openai_config = {
    "azure_endpoint": os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
    "azure_deployment": os.environ.get("AZURE_OPENAI_DEPLOYMENT", ""),
    "api_version": os.environ.get("AZURE_OPENAI_API_VERSION", ""),
    "max_retries": 2,
}

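# Fall back to "gpt-4o" when AZURE_OPENAI_DEPLOYMENT is unset or empty. (A
# plain .get() default would never apply because the key always exists.)
model = azure_openai_config["azure_deployment"] or "gpt-4o"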

async_client = AsyncAzureOpenAI(
    **azure_openai_config,
    azure_ad_token_provider=aio.get_bearer_token_provider(
        aio.AzureCliCredential(),
        "https://cognitiveservices.azure.com/.default",
    ),
)

client = AzureOpenAI(
    **azure_openai_config,
    azure_ad_token_provider=get_bearer_token_provider(
        AzureCliCredential(),
        "https://cognitiveservices.azure.com/.default",
    ),
)
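
The formatter above reads `session_id`, `run_id`, and `data` from the log record, and the standard way to attach such fields is the `extra=` argument of the logging calls. Here is a minimal sketch of that mechanism, assuming a trimmed-down stand-in formatter (`MiniJsonFormatter`) and a made-up logger name. It writes to an in-memory buffer rather than `.data/logs.jsonl`.

```python
import io
import json
import logging


class MiniJsonFormatter(logging.Formatter):
    """Trimmed-down stand-in for the JsonFormatter defined above."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                "level": record.levelname,
                "message": record.getMessage(),
                # Keys passed via `extra=` become attributes on the record.
                "session_id": getattr(record, "session_id", None),
                "data": getattr(record, "data", None),
            }
        )


# Log to an in-memory buffer so the example leaves no files behind.
buffer = io.StringIO()
demo_handler = logging.StreamHandler(buffer)
demo_handler.setFormatter(MiniJsonFormatter())

demo_logger = logging.getLogger("chat_driver_logging_demo")  # hypothetical name
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False  # keep the demo out of the root logger's handlers
demo_logger.addHandler(demo_handler)

demo_logger.info(
    "Function called.",
    extra={"session_id": "conversation-id-1000", "data": {"function": "echo"}},
)

# Each log call produces one JSON object per line.
log_line = json.loads(buffer.getvalue())
print(log_line)
```

The same `extra=` keys work with the root logger configured in the setup cell, so each line in `logs.jsonl` can be filtered by session.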

## ChatCompletionsAPI usage

Azure/OpenAI's Chat Completions API is the fundamental building block of an AI assistant that uses GPT models.

- https://platform.openai.com/docs/api-reference/chat
- https://github.com/openai/openai-python/blob/main/api.md

### Sync

In [None]:
completion = client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Say this is a test",
        }
    ],
    model=model,
)
print(completion.model_dump())

### Async

In [None]:
response = await async_client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Say this is a test",
        }
    ],
    model=model,
)
print(response)

### Streaming

In [None]:
stream = await async_client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Say this is a test",
        }
    ],
    model=model,
    stream=True,
)
async for chunk in stream:
    print(chunk.model_dump())
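
Printing each chunk shows the raw events, but most of the time you want the assembled text. Here is a minimal sketch of accumulating the `delta.content` pieces of a stream. The `fake_stream` generator is a hypothetical stand-in for the API call, so the example runs without credentials. The empty-`choices` check is an assumption about chunks that carry only metadata.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, List, Optional


async def collect_stream_text(stream: AsyncIterator[Any]) -> str:
    """Concatenate the content deltas of a streamed chat completion."""
    parts: List[str] = []
    async for chunk in stream:
        # Some chunks may carry no choices at all (metadata-only chunks).
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        # Role-only and final deltas have no content, so skip them.
        if delta.content:
            parts.append(delta.content)
    return "".join(parts)


def _fake_chunk(content: Optional[str]) -> SimpleNamespace:
    """Build a stand-in object with only the attributes read above."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


async def fake_stream() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical stream used in place of a real API call."""
    yield SimpleNamespace(choices=[])
    for piece in (None, "This is", " a test", None):
        yield _fake_chunk(piece)


# Standalone entry point. Inside a notebook cell an event loop is already
# running, so use `await collect_stream_text(stream)` there instead.
demo_text = asyncio.run(collect_stream_text(fake_stream()))
print(demo_text)
```

With the real client, pass the `stream` object returned by `async_client.chat.completions.create(..., stream=True)` to `collect_stream_text`.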

## OpenAI Chat Completion Driver (a.k.a. "chat driver")

### OpenAI Assistants

The Azure/OpenAI Assistants API is a newer, stateful API that separates an `assistant` from the data about a conversation `thread`, which can be `run` against an `assistant`. Additionally, you can add `tools` to an assistant that give it more interactive capabilities. The tools currently available are:

- *Functions*: Registering local functions with the assistant so it knows it can call them before generating a response. This is a "hold on let me look that up for you" kind of interaction.
- *File Search* (formerly the retrieval plugin): Attach one or more files and they will be RAG-vectorized and available as content to the assistant.
- *Code Interpreter*: Run Python code in a secure sandbox.

The Assistants API is productized as OpenAI's `GPTs` product. The `GPT Builder` lets developers create and deploy GPT assistants using a web interface.

### Chat Driver

But an "assistant" requires pretty strong "abstraction lock-in". This thing isn't really an assistant in the fullest sense... it's more like a "pseudo-assistant", but this confuses things. Let's just let the Chat Completions API be what it is and drive it as necessary as we create our assistants. Let's just wrap up the function-calling bits (which, ultimately, can give you the other tools like File Search and Code Interpreter) in a simple-to-use, GPT-like interface we'll call a *chat driver*.

The chat driver is meant to be used the exact way the Chat Completions API is... just easier.

Our chat driver provides:

- The ability to almost magically register functions to the function tool using a `FunctionRegistry`.
- Tracking of message history.
- Management of a `Context` object that can be used for session management and supply additional context to functions.
- Some prompt creation helpers.
- Other utilities... this is just meant to be an interface you can use to forget about all the API complexities.
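
To make the "almost magic" registration less mysterious, here is a rough sketch of how a tool definition could be derived from a function signature. This is not the `FunctionRegistry` implementation: `function_to_tool`, the `JSON_TYPES` mapping, and the `max_chars` parameter are all assumptions made for illustration. Only the output shape follows the Chat Completions `tools` format.

```python
import inspect
from typing import Any, Callable, Dict, List

# Assumed mapping from Python annotations to JSON Schema type names.
JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    list: "array",
    dict: "object",
}


def function_to_tool(fn: Callable[..., Any]) -> Dict[str, Any]:
    """Build a Chat Completions tool definition from a function signature."""
    params = list(inspect.signature(fn).parameters.values())
    properties: Dict[str, Any] = {}
    required: List[str] = []
    # Skip the first parameter: the driver supplies the context itself, so
    # the model should never see it.
    for param in params[1:]:
        # Unannotated or unknown types fall back to "string" in this sketch.
        properties[param.name] = {"type": JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(param.name)
    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": inspect.getdoc(fn) or "",
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# `max_chars` is a hypothetical parameter added to show an optional argument.
def get_file_contents(context: Any, file_path: str, max_chars: int = 1000) -> str:
    """Return the contents of a file."""
    return "The purpose of life is to be happy."


tool = function_to_tool(get_file_contents)
print(tool)
```

The docstring becomes the tool description, which is why the example functions in this notebook all carry one.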

In [None]:
from chat_driver import ChatDriver, ChatDriverConfig, Context
from typing import List
from openai.types.chat import ChatCompletionMessageParam


# When a chat driver is created, it will automatically create a context with a
# session_id. Or, if you want to use a specific session_id, you can pass it as
# an argument. This is useful for scoping this chat driver instance to an
# external identifier.
context = Context("conversation-id-1000")


# Define tool functions for the chat driver. All functions used by the chat driver
# require a Context (which carries the session_id) as the first argument.
def get_file_contents(context: Context, file_path: str) -> str:
    """Return the contents of a file."""
    return "The purpose of life is to be happy."


def erase(context: Context, name: str) -> str:
    """Erases a stored value."""
    return f"{context.session_id}: {name} erased"


# Define the chat driver.
instructions = "You are a helpful assistant."

# Define the conversation so far.
messages: List[ChatCompletionMessageParam] = []

chat_driver = ChatDriver(
    ChatDriverConfig(
        openai_client=async_client,
        model=model,
        instructions=instructions,
        messages=messages,
        context=context,
        commands=[erase],  # Commands can be registered when instantiating the chat driver.
        functions=[erase],  # Functions can be registered when instantiating the chat driver.
    ),
)

# Let's clear the data from previous runs.
chat_driver.clear_session_data()


# You can also use the `register_function_and_command` decorator to register a
# function. Remember, all functions used by the chat driver require a Context as
# the first argument.
@chat_driver.register_function_and_command
def echo(context: Context, text: str) -> str:
    """Return the text."""
    return f"{context.session_id}: {text}"


# You can also register functions manually.
chat_driver.register_function_and_command(get_file_contents)

# Ok. Let's see if we got one.
print(chat_driver.context.session_id)

# Let's see if the agent can respond.
response = await chat_driver.respond("Hi, my name is Paul.")
print(response.message)

# Help command (shows the available commands).
response = await chat_driver.respond("/help")
print(response.message)

# We can run any function or command directly.
response = await chat_driver.functions.echo("Hi, my name is Paul.")
print(response)

# Let's see if the chat driver has the ability to run its own registered function.
response = await chat_driver.respond("Please tell me what's in file 123.txt.")
print(response.message)

# Let's see the full response event.
print(response.to_json())

## Chat with a chat driver

In [None]:
from typing import Any
from chat_driver import ChatDriverConfig, ChatDriver
from context import Context

context = Context("conversation-id-1001")


def get_file_contents(context: Context, file_path: str) -> str:
    """Returns the contents of a file."""
    return "The purpose of life is to be happy."


def erase(context: Context, name: str) -> str:
    """Erases a stored value."""
    return f"{context.session_id}: {name} erased"


def echo(context: Context, value: Any) -> str:
    """Echos a value as a string."""
    match value:
        case str():
            return value
        case list():
            return ", ".join(map(str, value))
        case dict():
            return json.dumps(value)
        case int() | bool() | float():
            return str(value)
        case _:
            return str(value)


functions = [get_file_contents, erase, echo]

# Define the chat driver.
chat_driver_config = ChatDriverConfig(
    openai_client=async_client,
    model=model,
    instructions="You are an assistant that has access to a sand-boxed Posix shell.",
    context=context,
    commands=functions,
    functions=functions,
)

chat_driver = ChatDriver(chat_driver_config)

# Note: Look in the .data directory for the logs, message history, and other data.

# Chat with the chat driver. Enter an empty message to stop.
while True:
    message = input("User: ")
    if message == "":
        break
    print(f"User: {message}", flush=True)
    response = await chat_driver.respond(message)
    # You can print the entire response event! 
    # print(response.to_json())
    print(f"Assistant: {response.message}", flush=True)

## Miscellaneous

Chaining together chat drivers in more interesting ways.

### A router "mini-gpt" assistant

(this is just an idea I was trying out... it doesn't work yet)

In [None]:
from chat_driver import ChatDriver, ChatDriverConfig, Context
from enum import Enum
from events import BaseEvent


class InteractionMode(Enum):
    """The possible interaction modes of the assistant."""

    INTRO = "intro"
    CONVERSATIONAL = "conversational"
    BRAINSTORMING = "brainstorming"


context = Context("conversation-id-123")
session_state = {}
session_state[context.session_id] = {"interaction_mode": InteractionMode.INTRO}


# Define the intro assistant.
intro_instructions = (
    "You are an introduction assistant that gathers a user's name and a topic they would "
    "like to discuss. \n\n"
    'Once you have this information, respond with DONE: { "name": string, "topic": string }.'
)
intro_assistant = ChatDriver(
    ChatDriverConfig(
        openai_client=async_client,
        model=model,
        instructions=intro_instructions,
        context=context,
    )
)

# Define the brainstorming assistant.
brainstorming_instructions = (
    "You are a brainstorming assistant that generates creative responses. You can generate "
    "ideas, concepts, or suggestions on the user's topic. Once the user indicates they are done "
    "brainstorming, respond with DONE: {}."
)
brainstorming_assistant = ChatDriver(
    ChatDriverConfig(
        openai_client=async_client,
        model=model,
        instructions=brainstorming_instructions,
        context=context,
    )
)

# Define the conversational assistant.
conversational_instructions = "You are a conversational assistant that generates engaging responses."
conversational_assistant = ChatDriver(
    ChatDriverConfig(
        openai_client=async_client,
        model=model,
        instructions=conversational_instructions,
        context=context,
    )
)

# Define the modal assistant.
instructions = (
    "You are a message routing assistant. While interacting with the user, over a long conversation, "
    'you may decide to enter into a new "interaction mode". An interaction mode is a state in which '
    "the assistant behaves differently. For example, in a brainstorming mode, the assistant may generate "
    "more creative responses. In a conversational mode, the assistant may generate more engaging responses. "
    "In an intro mode, the assistant may generate more informative responses. You can switch between "
    "these modes by calling the set_interaction_mode function. Always forward the user's message to the "
    "appropriate assistant based on the current interaction mode.\n\n"
    "Always start a conversation by switching to the intro mode.\n\n"
    "<CURRENT_INTERACTION_MODE>{interaction_mode}</CURRENT_INTERACTION_MODE>"
)


def set_interaction_mode(session_id: str, mode: InteractionMode) -> None:
    """Set the assistant to a specific interaction mode."""
    print(f"Setting interaction mode to: {mode}")
    session_state[session_id]["interaction_mode"] = mode


def get_interaction_mode(session_id: str) -> InteractionMode:
    """Get the current interaction mode of the assistant."""
    return session_state[session_id]["interaction_mode"]


async def forward_message(session_id: str, interaction_mode: InteractionMode, message: str) -> BaseEvent:
    """Forward a message to the appropriate assistant based on the current interaction mode."""
    print(f"Forwarding message to assistant in mode: {interaction_mode}")
    if interaction_mode == InteractionMode.INTRO:
        response = await intro_assistant.respond(message)
    elif interaction_mode == InteractionMode.BRAINSTORMING:
        response = await brainstorming_assistant.respond(message)
    else:
        response = await conversational_assistant.respond(message)
    print(f"Assistant response from forward_message ({interaction_mode}): {response}")
    return response


modal_assistant = ChatDriver(
    ChatDriverConfig(
        openai_client=async_client,
        model="gpt-4o",
        instructions=instructions,
        context=context,
        functions=[set_interaction_mode, forward_message],
    )
)
# Registered functions are awaited when called directly, as in the echo example earlier.
await modal_assistant.functions.set_interaction_mode(InteractionMode.INTRO)

# Begin the conversation.

while True:
    message = input("User: ")
    if message == "":
        break
    print(f"User: {message}", flush=True)
    # `respond` is a coroutine, so it must be awaited (as in the earlier cells).
    response = await modal_assistant.respond(
        message, instruction_parameters={"interaction_mode": get_interaction_mode(context.session_id)}
    )
    print(f"Modal Assistant: {response.message}", flush=True)
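
The intro and brainstorming instructions ask the model to finish with `DONE: {...}`, but nothing above reads that marker yet. One possible way to detect it is sketched below. `parse_done` is a hypothetical helper, not part of the chat driver, and it assumes the payload is a single JSON object that follows the first `DONE:` in the message.

```python
import json
from typing import Any, Dict, Optional

DONE_PREFIX = "DONE:"


def parse_done(message: str) -> Optional[Dict[str, Any]]:
    """Return the JSON object that follows 'DONE:', or None if absent."""
    _, found, tail = message.partition(DONE_PREFIX)
    if not found:
        return None
    try:
        # raw_decode tolerates trailing text after the JSON object.
        payload, _ = json.JSONDecoder().raw_decode(tail.lstrip())
    except json.JSONDecodeError:
        return None
    return payload if isinstance(payload, dict) else None


print(parse_done('DONE: { "name": "Paul", "topic": "sailing" }'))
print(parse_done("DONE: {}"))
print(parse_done("What would you like to talk about?"))
```

A router could call this on each sub-assistant response and switch the interaction mode when it returns something other than `None`.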