In [1]:
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
from uuid import uuid4

import pixeltable as pxt
from PIL import Image


class BaseAgent(ABC):
    """
    An Base agent powered by LLM model with persistent memory and tool execution capabilities.

    This base agent gets inherited by other agent classes (see pixelagent/anthropic/agent.py and pixelagent/anthropic/agent.py).

    The agent maintains three key tables in Pixeltable:
    1. memory: Stores all conversation history with timestamps
    2. agent: Manages chat interactions and responses
    3. tools: (Optional) Handles tool execution and responses

    Key Features:
    - Persistent conversation memory with optional message limit
    - Tool execution support
    - Structured data storage and orchestration using Pixeltable
    """

    def __init__(
        self,
        name: str,
        system_prompt: str,
        model: str,
        n_latest_messages: Optional[int] = 10,
        tools: Optional[pxt.tools] = None,
        reset: bool = False,
        chat_kwargs: Optional[dict] = None,
        tool_kwargs: Optional[dict] = None,
    ):
        """
        Initialize the agent with the specified configuration.

        Args:
            name: Unique name for the agent (used for table names)
            system_prompt: System prompt that guides LLM's behavior
            model: LLM model to use
            n_latest_messages: Number of recent messages to include in context (None for unlimited)
            tools: Optional tools configuration for function calling
            reset: If True, deletes existing agent data
            chat_kwargs: Additional kwargs for chat completion
            tool_kwargs: Additional kwargs for tool execution
        """
        self.directory = name
        self.system_prompt = system_prompt
        self.model = model
        self.n_latest_messages = n_latest_messages
        self.tools = tools
        self.chat_kwargs = chat_kwargs or {}
        self.tool_kwargs = tool_kwargs or {}

        # Set up or reset the agent's database
        if reset:
            pxt.drop_dir(self.directory, if_not_exists="ignore", force=True)

        # Create agent directory if it doesn't exist
        pxt.create_dir(self.directory, if_exists="ignore")

        # Set up tables
        self._setup_tables()

        # Get references to the created tables
        self.memory = pxt.get_table(f"{self.directory}.memory")
        self.agent = pxt.get_table(f"{self.directory}.agent")
        self.tools_table = (
            pxt.get_table(f"{self.directory}.tools") if self.tools else None
        )

    def _setup_tables(self):
        """
        Initialize the required Pixeltable tables for the agent.
        Creates three tables:
        1. memory: Stores conversation history
        2. agent: Manages chat completions
        3. tools: (Optional) Handles tool execution
        """
        # Create memory table for conversation history
        self.memory = pxt.create_table(
            f"{self.directory}.memory",
            {
                "message_id": pxt.String,  # Unique ID for each message
                "role": pxt.String,  # 'user' or 'assistant'
                "content": pxt.String,  # Message content
                "timestamp": pxt.Timestamp,  # When the message was received
            },
            if_exists="ignore",
        )

        # Create agent table for managing chat interactions
        self.agent = pxt.create_table(
            f"{self.directory}.agent",
            {
                "message_id": pxt.String,  # Unique ID for each message
                "user_message": pxt.String,  # User's message content
                "timestamp": pxt.Timestamp,  # When the message was received
                "system_prompt": pxt.String,  # System prompt for Claude
                "image": pxt.Image,  # Optional image attachment
            },
            if_exists="ignore",
        )

        # Create tools table if tools are configured
        if self.tools:
            self.tools_table = pxt.create_table(
                f"{self.directory}.tools",
                {
                    "tool_invoke_id": pxt.String,  # Unique ID for each tool invocation
                    "tool_prompt": pxt.String,  # Tool prompt for Claude
                    "timestamp": pxt.Timestamp,  # When the tool was invoked
                },
                if_exists="ignore",
            )
            # Set up tools pipeline
            self._setup_tools_pipeline()

        # Set up chat pipeline
        self._setup_chat_pipeline()

    @abstractmethod
    def _setup_chat_pipeline(self):
        """To be implemented by subclasses"""
        raise NotImplementedError

    @abstractmethod
    def _setup_tools_pipeline(self):
        """To be implemented by subclasses"""
        raise NotImplementedError

    def chat(self, message: str, image: Optional[Image.Image] = None) -> str:
        """
        Send a message to the agent and get its response.

        This method:
        1. Stores the user message in memory
        2. Triggers the chat completion pipeline
        3. Stores the assistant's response in memory
        4. Returns the response

        Args:
            message: The user's message

        Returns:
            The agent's response
        """
        now = datetime.now()

        # Generate unique IDs for the message pair
        user_message_id = str(uuid4())
        assistant_message_id = str(uuid4())

        # Store user message in memory
        self.memory.insert(
            [
                {
                    "message_id": user_message_id,
                    "role": "user",
                    "content": message,
                    "timestamp": now,
                }
            ]
        )

        # Store user message in agent table (which triggers the chat pipeline)
        self.agent.insert(
            [
                {
                    "message_id": user_message_id,
                    "user_message": message,
                    "timestamp": now,
                    "system_prompt": self.system_prompt,
                    "image": image,
                }
            ]
        )

        # Get LLM's response from agent table
        result = (
            self.agent.select(self.agent.agent_response)
            .where(self.agent.message_id == user_message_id)
            .collect()
        )
        response = result["agent_response"][0]

        # Store LLM's response in memory
        self.memory.insert(
            [
                {
                    "message_id": assistant_message_id,
                    "role": "assistant",
                    "content": response,
                    "timestamp": now,
                }
            ]
        )
        return response

    def tool_call(self, prompt: str) -> str:
        """
        Execute a tool call with the given prompt.

        This method:
        1. Stores the user prompt in memory
        2. Triggers the tool call handshake pipeline
        3. Stores the tool's response in memory
        4. Returns the response

        Args:
            prompt: The user's prompt

        Returns:
            The tool's response
        """
        if not self.tools:
            return "No tools configured for this agent."

        now = datetime.now()
        user_message_id = str(uuid4())
        tool_invoke_id = str(uuid4())
        assistant_message_id = str(uuid4())

        # Store user message in memory
        self.memory.insert(
            [
                {
                    "message_id": user_message_id,
                    "role": "user",
                    "content": prompt,
                    "timestamp": now,
                }
            ]
        )

        # Store user prompt in tools table (which triggers the tool call handshake pipeline)
        self.tools_table.insert(
            [
                {
                    "tool_invoke_id": tool_invoke_id,
                    "tool_prompt": prompt,
                    "timestamp": now,
                }
            ]
        )

        # Get tool answer from tools table
        result = (
            self.tools_table.select(self.tools_table.tool_answer)
            .where(self.tools_table.tool_invoke_id == tool_invoke_id)
            .collect()
        )
        tool_answer = result["tool_answer"][0]

        # Store LLM's response in memory
        self.memory.insert(
            [
                {
                    "message_id": assistant_message_id,
                    "role": "assistant",
                    "content": tool_answer,
                    "timestamp": now,
                }
            ]
        )
        return tool_answer

In [3]:
from typing import Optional

import pixeltable as pxt
import pixeltable.functions as pxtf

try:
    from pixeltable.functions.openai import chat_completions, invoke_tools
except ImportError:
    raise ImportError("openai not found; run `pip install openai`")

import base64
import io
from typing import Optional

import PIL
import pixeltable as pxt


@pxt.udf
def create_messages(
    system_prompt: str,
    memory_context: list[dict],
    current_message: str,
    image: Optional[PIL.Image.Image] = None,
) -> list[dict]:

    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(memory_context.copy())

    if not image:
        messages.append({"role": "user", "content": current_message})
        return messages

    # Encode Image
    bytes_arr = io.BytesIO()
    image.save(bytes_arr, format="jpeg")
    b64_bytes = base64.b64encode(bytes_arr.getvalue())
    b64_encoded_image = b64_bytes.decode("utf-8")

    # Create content blocks with text and image
    content_blocks = [
        {"type": "text", "text": current_message},
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{b64_encoded_image}"},
        },
    ]

    messages.append({"role": "user", "content": content_blocks})
    return messages

class Agent(BaseAgent):
    """
    OpenAI-specific implementation of the BaseAgent.

    This agent uses OpenAI's chat completion API for generating responses and handling tools.
    It inherits common functionality from BaseAgent including:
    - Table setup and management
    - Memory persistence
    - Base chat and tool call implementations
    """

    def __init__(
        self,
        name: str,
        system_prompt: str,
        model: str = "gpt-4o-mini",
        n_latest_messages: Optional[int] = 10,
        tools: Optional[pxt.tools] = None,
        reset: bool = False,
        chat_kwargs: Optional[dict] = None,
        tool_kwargs: Optional[dict] = None,
    ):
        # Initialize the base agent with all common parameters
        super().__init__(
            name=name,
            system_prompt=system_prompt,
            model=model,
            n_latest_messages=n_latest_messages,  # None for unlimited history
            tools=tools,
            reset=reset,
            chat_kwargs=chat_kwargs,
            tool_kwargs=tool_kwargs,
        )

    def _setup_chat_pipeline(self):
        """
        Configure the chat completion pipeline using Pixeltable's computed columns.
        This method implements the abstract method from BaseAgent.

        The pipeline consists of 4 steps:
        1. Retrieve recent messages from memory
        2. Format messages with system prompt
        3. Get completion from OpenAI
        4. Extract the response text
        """

        # Step 1: Define a query to get recent messages
        @pxt.query
        def get_recent_memory(current_timestamp: pxt.Timestamp) -> list[dict]:
            """
            Get recent messages from memory, respecting n_latest_messages limit if set.
            Messages are ordered by timestamp (newest first).
            """
            query = (
                self.memory.where(self.memory.timestamp < current_timestamp)
                .order_by(self.memory.timestamp, asc=False)
                .select(role=self.memory.role, content=self.memory.content)
            )
            if self.n_latest_messages is not None:
                query = query.limit(self.n_latest_messages)
            return query

        # Step 2: Add computed columns to process the conversation
        # First, get the conversation history
        self.agent.add_computed_column(
            memory_context=get_recent_memory(self.agent.timestamp),
            if_exists="ignore",
        )

        # Format messages for OpenAI with system prompt
        self.agent.add_computed_column(
            prompt=create_messages(
                self.agent.system_prompt,
                self.agent.memory_context,
                self.agent.user_message,
                self.agent.image,
            ),
            if_exists="ignore",
        )

        # Get OpenAI's API response
        self.agent.add_computed_column(
            response=chat_completions(
                messages=self.agent.prompt, model=self.model, **self.chat_kwargs
            ),
            if_exists="ignore",
        )

        # Extract the final response text
        self.agent.add_computed_column(
            agent_response=self.agent.response.choices[0].message.content,
            if_exists="ignore",
        )

    def _setup_tools_pipeline(self):
        """
        Configure the tool execution pipeline using Pixeltable's computed columns.
        This method implements the abstract method from BaseAgent.

        The pipeline has 4 stages:
        1. Get initial response from OpenAI with potential tool calls
        2. Execute any requested tools
        3. Format tool results for follow-up
        4. Get final response incorporating tool outputs
        """
        # Stage 1: Get initial response with potential tool calls
        self.tools_table.add_computed_column(
            initial_response=chat_completions(
                model=self.model,
                messages=[{"role": "user", "content": self.tools_table.tool_prompt}],
                tools=self.tools,  # Pass available tools to OpenAI
                **self.tool_kwargs,
            ),
            if_exists="ignore",
        )

        # Stage 2: Execute any tools that OpenAI requested
        self.tools_table.add_computed_column(
            tool_output=invoke_tools(self.tools, self.tools_table.initial_response),
            if_exists="ignore",
        )

        # Stage 3: Format tool results for follow-up
        self.tools_table.add_computed_column(
            tool_response_prompt=pxtf.string.format(
                "{0}: {1}", self.tools_table.tool_prompt, self.tools_table.tool_output
            ),
            if_exists="ignore",
        )

        # Stage 4: Get final response incorporating tool results
        self.tools_table.add_computed_column(
            final_response=chat_completions(
                model=self.model,
                messages=[
                    {"role": "user", "content": self.tools_table.tool_response_prompt},
                ],
                **self.tool_kwargs,
            ),
            if_exists="ignore",
        )

        # Extract the final response text
        self.tools_table.add_computed_column(
            tool_answer=self.tools_table.final_response.choices[0].message.content,
            if_exists="ignore",
        )

In [4]:
agent = Agent(
    name="my_assistant",
    system_prompt="You are a helpful assistant."
)

Creating a Pixeltable instance at: /Users/moteroperdido/.pixeltable
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/moteroperdido/.pixeltable/pgdata
Created directory 'my_assistant'.
Created table `memory`.
Created table `agent`.
Added 0 column values with 0 errors.
Added 0 column values with 0 errors.
Added 0 column values with 0 errors.
Added 0 column values with 0 errors.


In [5]:
# Chat with your agent
response = agent.chat("Hello, who are you?")
print(response)

Inserting rows into `memory`: 1 rows [00:00, 22.91 rows/s]
Inserted 1 row with 0 errors.


Error: Exception in task: 
Traceback (most recent call last):
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/exec/expr_eval/schedulers.py", line 190, in _exec
    result = await pxt_fn.aexec(*request.args, **request.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/func/callable_function.py", line 84, in aexec
    return await self.py_fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/functions/openai.py", line 446, in chat_completions
    rate_limits_info = env.Env.get().get_resource_pool_info(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/env.py", line 705, in get_resource_pool_info
    info = make_pool_info()
           ^^^^^^^^^^^^^^^^
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/functions/openai.py", line 447, in <lambda>
    resource_pool, lambda: OpenAIRateLimitsInfo(_chat_completions_get_request_resources)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/functions/openai.py", line 96, in __init__
    import openai
ModuleNotFoundError: No module named 'openai'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/exec/expr_eval/expr_eval_node.py", line 393, in _done_cb
    t.result()
  File "/Users/moteroperdido/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/asyncio/futures.py", line 202, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/Users/moteroperdido/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/exec/expr_eval/schedulers.py", line 90, in _main_loop
    await self._exec(item.request, item.exec_ctx, item.num_retries, is_task=False)
  File "/Users/moteroperdido/Desktop/projects/the_neural_maze/projects/multimodal-agents-course/.venv/lib/python3.12/site-packages/pixeltable/exec/expr_eval/schedulers.py", line 207, in _exec
    assert self.pool_info is not None
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError


In [5]:
from pixelagent.openai import Agent  # Or from pixelagent.openai import Agent

# Create a simple agent
agent = Agent(
    name="my_assistant",
    system_prompt="You are a helpful assistant.",
    verbose=True
)

# Chat with your agent
response = agent.chat("Do you know my name?")
print(response)

Inserting rows into `memory`: 1 rows [00:00, 786.04 rows/s]
Inserted 1 row with 0 errors.
Inserting rows into `agent`: 1 rows [00:00, 260.14 rows/s]
Inserted 1 row with 0 errors.
Inserting rows into `memory`: 1 rows [00:00, 595.61 rows/s]
Inserted 1 row with 0 errors.
Yes, you mentioned that your name is Miguel. How can I help you today, Miguel?
