In [43]:
#| default_exp core

In [44]:
#| hide
from nbdev.showdoc import *

# Dependencies

In [45]:
#| export
import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic_ai import Agent, ModelRetry, RunContext
from pydantic_ai.models import KnownModelName

load_dotenv()

# Enable async/await in Jupyter
import nest_asyncio
nest_asyncio.apply()

# Agent setup

System prompt


In [46]:
#| export
from datetime import date
system_prompt = f"""
You are a helpful assistant that operates in a Jupyter notebook.
Your regular text responses are rendered as cell output.
You can create new cells, edit existing cells, and run code.
You can also use tools to help you with your tasks.
Today's date is {date.today().strftime('%Y-%m-%d')}.
"""

In [47]:
#| export
from typing import cast
model = cast(KnownModelName, os.getenv('PYDANTIC_AI_MODEL', 'openai:gpt-4o'))
print(f'PydanticAI is using model: {model}')
notebook_agent = Agent(model, system_prompt=system_prompt)

PydanticAI is using model: openai:gpt-4o


In [48]:
#| export
def refresh_agent():
    global notebook_agent
    notebook_agent = Agent(model, system_prompt=system_prompt)
    return notebook_agent


### Adding cell creation tool

In [49]:
#| export
from IPython.display import display, Markdown
from typing import Literal

@notebook_agent.tool
def create_cell(ctx: RunContext[str], content: str, cell_type: Literal['code', 'markdown'] = 'code') -> str:
    """Create a new cell in the notebook with the specified content.
    
    Args:
        content: The content to put in the new cell
        cell_type: Type of cell to create ('code' or 'markdown')
    
    Returns:
        A confirmation message
    """
    try:    
        ipython = get_ipython()
    except NameError:
        return "Error: Not running in IPython/Jupyter environment"
    
    # Display the content immediately
    if cell_type == 'code':
        # display(Markdown(f"```python\n{content}\n```"))
        # Set up the next cell with the content
        ipython.set_next_input(content)
    else:
        display(Markdown(content))
    
    return f"Created new {cell_type} cell"

<module>:8: No type or annotation for returned value 1


Tool testing

In [50]:
result = notebook_agent.run_sync('can you create me a simple hello world functio in a new celland make sure I can run it right away to see the output?')
Markdown(result.data)

I've created a new code cell with a `hello_world` function. You can run it to see the output "Hello, world!".

In [51]:
def hello_world():
    print('Hello, World!')

# Call the function to display the output
hello_world()

Hello, World!


In [52]:
result = notebook_agent.run_sync("Create a function that calculates the factorial of a number with input validation")
Markdown(result.data)

I have created a function that calculates the factorial of a number with input validation, ensuring the input is a non-negative integer. You can test this function by running the cell above.

In [53]:
def factorial(number):
    # Input validation to check if the number is a non-negative integer
    if not isinstance(number, int):
        raise ValueError("Input must be an integer.")
    if number < 0:
        raise ValueError("Input must be a non-negative integer.")

    # Base case: factorial of 0 or 1 is 1
    if number == 0 or number == 1:
        return 1

    # Recursive case: n! = n * (n-1)!
    else:
        return number * factorial(number - 1)

# Example usage
print(factorial(5))  # Output: 120
print(factorial(0))  # Output: 1

# Uncomment the following lines to test input validation
# print(factorial(-1))  # Should raise ValueError
# print(factorial(3.5))  # Should raise ValueError

120
1


Testing agent with history

In [54]:
result = notebook_agent.run_sync('So what you just made for me here?', message_history=result.new_messages())
Markdown(result.data)

I created a Python function named `factorial` that calculates the factorial of a non-negative integer. Here is a brief explanation of what it does:

- **Function Definition**: The function `factorial(n)` takes a single argument `n`, which should be a non-negative integer.
- **Input Validation**: Before calculating the factorial, the function checks if `n` is a non-negative integer. If not, it raises a `ValueError` with a descriptive message.
- **Calculating Factorial**:
  - If `n` is 0, the function returns 1. (The factorial of 0 is defined as 1.)
  - If `n` is greater than 0, it iteratively calculates the factorial by multiplying numbers from 1 to `n`.
- **Test Cases**: There are sample test cases at the end of the function to demonstrate its usage:
  - `factorial(5)` calculates 5! and should return 120.
  - `factorial(0)` returns 1.
  - For `factorial(-1)` and `factorial(3.5)`, it will raise a `ValueError` because the inputs are not valid non-negative integers.

You can run the code cell to see the function in action and test the examples.

In [55]:
result.all_messages()

[SystemPrompt(content="\nYou are a helpful assistant that operates in a Jupyter notebook.\nYour regular text responses are rendered as cell output.\nYou can create new cells, edit existing cells, and run code.\nYou can also use tools to help you with your tasks.\nToday's date is 2024-12-30.\n", role='system'),
 UserPrompt(content='Create a function that calculates the factorial of a number with input validation', timestamp=datetime.datetime(2024, 12, 30, 16, 22, 30, 115313, tzinfo=datetime.timezone.utc), role='user'),
 ModelStructuredResponse(calls=[ToolCall(tool_name='create_cell', args=ArgsJson(args_json='{"content":"def factorial(n):\\n    \\"\\"\\"\\n    Calculate the factorial of a non-negative integer.\\n    \\n    Parameters:\\n    n (int): The number to calculate the factorial for. Must be a non-negative integer.\\n    \\n    Returns:\\n    int: The factorial of the number.\\n    \\n    Raises:\\n    ValueError: If n is not a non-negative integer.\\n    \\"\\"\\"\\n    # Input 

### Adding notebook history

In [56]:
#| export
import os
import json
from pathlib import Path
from typing import Optional, Dict, Any

# Cache for notebook data
_notebook_cache: Dict[str, Any] = {}

def find_current_notebook() -> Optional[dict]:
    """Find and cache the current notebook data.
    
    Returns:
        Dict containing notebook data or None if not found
    """
    global _notebook_cache
    
    try:
        ipython = get_ipython()
        if not ipython:
            return None
            
        # Get current cell content to identify the notebook
        current_cell = ipython.get_parent()['content']['code']
        
        # Check if we already found the notebook
        if 'notebook' in _notebook_cache:
            # Verify it's still the correct notebook by checking the current cell
            notebook = _notebook_cache['notebook']
            for cell in notebook['cells']:
                if (cell['cell_type'] == 'code' and 
                    ''.join(cell['source']) == current_cell):
                    return notebook
        
        # If not in cache or cache is invalid, search for the notebook
        current_dir = Path.cwd()
        notebook_files = list(current_dir.glob("*.ipynb"))
        
        for nb_file in notebook_files:
            try:
                with open(nb_file) as f:
                    notebook = json.load(f)
                    for cell in notebook['cells']:
                        if (cell['cell_type'] == 'code' and 
                            ''.join(cell['source']) == current_cell):
                            # Found the notebook, cache it
                            _notebook_cache['notebook'] = notebook
                            _notebook_cache['file'] = nb_file
                            return notebook
            except Exception:
                continue
                
        return None
        
    except Exception as e:
        print(f"Error finding notebook: {e}")
        return None


In [57]:
#| export
def get_notebook_history(max_cells: int = 5) -> list:
    """Get the content of notebook cells between current and last prompt cell.
    
    Args:
        max_cells: Maximum number of previous cells to include
        
    Returns:
        List of previous cell contents
    """
    try:
        # Get the cached notebook or find it
        notebook = find_current_notebook()
        if not notebook:
            return []
            
        # Find current cell index
        current_cell = get_ipython().get_parent()['content']['code']
        cells = notebook['cells']
        current_idx = -1
        last_prompt_idx = -1
        
        # Find current cell and last prompt cell
        for idx, cell in enumerate(cells):
            source = ''.join(cell['source']) if isinstance(cell['source'], list) else cell['source']
            
            # Find current cell
            if current_idx == -1 and cell['cell_type'] == 'code' and source == current_cell:
                current_idx = idx
                
            # Find last prompt cell before current cell
            if idx < current_idx or current_idx == -1:  # Only look at cells before current
                if cell['cell_type'] == 'code' and source.strip().startswith('%%prompt'):
                    last_prompt_idx = idx
                
        if current_idx == -1:
            return []
            
        # Get cells between last prompt and current cell
        start_idx = last_prompt_idx + 1 if last_prompt_idx != -1 else max(0, current_idx - max_cells)
        history = []
        
        for idx in range(start_idx, current_idx):
            cell = cells[idx]
            if cell['cell_type'] == 'code':  # Only include code cells
                # Skip cells that are prompt cells and nbdev directives
                source = cell['source'] if isinstance(cell['source'], str) else ''.join(cell['source'])
                if not source.strip().startswith('%%prompt'):
                    history.append(f"Cell[{idx}]:\n{source}")
        
        return history
        
    except Exception as e:
        print(f"Error getting notebook history: {e}")
        return []

Testing notebook history

In [58]:
nb_hist = get_notebook_history(max_cells=20)
nb_hist

['Cell[0]:\n#| default_exp core',
 'Cell[1]:\n#| hide\nfrom nbdev.showdoc import *',
 'Cell[2]:\n#| export\nimport os\nfrom dotenv import load_dotenv\nfrom pydantic import BaseModel, Field\nfrom pydantic_ai import Agent, ModelRetry, RunContext\nfrom pydantic_ai.models import KnownModelName\n\nload_dotenv()\n\n# Enable async/await in Jupyter\nimport nest_asyncio\nnest_asyncio.apply()',
 'Cell[3]:\n#| export\nfrom datetime import date\nsystem_prompt = f"""\nYou are a helpful assistant that operates in a Jupyter notebook.\nYour regular text responses are rendered as cell output.\nYou can create new cells, edit existing cells, and run code.\nYou can also use tools to help you with your tasks.\nToday\'s date is {date.today().strftime(\'%Y-%m-%d\')}.\n"""',
 "Cell[4]:\n#| export\nfrom typing import cast\nmodel = cast(KnownModelName, os.getenv('PYDANTIC_AI_MODEL', 'openai:gpt-4o'))\nprint(f'PydanticAI is using model: {model}')\nnotebook_agent = Agent(model, system_prompt=system_prompt)",
 'Ce

### Creating history-aware prompt

In [59]:
#| export
def create_history_aware_prompt(prompt: str, message_history: list = None, max_history: int = 5) -> tuple:
    """Create a prompt with notebook history context and message history.
    
    Args:
        prompt: The user's prompt
        message_history: Previous conversation messages from results.all_messages()
        max_history: Maximum number of previous cells to include
        
    Returns:
        Tuple of (enhanced prompt, combined message history)
    """
    try:
        ipython = get_ipython()
        if not ipython:
            return prompt, message_history
        
        # Get new cells using our optimized get_notebook_history
        new_cells = get_notebook_history(max_cells=max_history)
        
        if not new_cells and not message_history:
            return prompt, None
            
        # Create message history if none exists
        from pydantic_ai.messages import (
            ModelRequest, ModelResponse, 
            UserPromptPart, TextPart
        )
        
        messages = []
        
        # Add existing message history if provided
        if message_history:
            messages.extend(message_history)
        
        # Only add context message if we have new cells
        if new_cells:
            # Create context message with new cells
            history_content = "\n\n".join(new_cells)

            context_msg = ModelRequest(parts=[
                UserPromptPart(
                    content="Here is the context of new notebook cells that were added:\n" + history_content
                )
            ])
            
            # Create response acknowledging new context
            context_response = ModelResponse(parts=[
                TextPart(
                    content="I understand the new notebook context. How can I help?"
                )
            ])
            
            messages.extend([context_msg, context_response])
                
        return prompt, messages
        
    except Exception as e:
        print(f"Error creating history-aware prompt: {e}")
        return prompt, message_history

Testing history-aware prompt

In [60]:
create_history_aware_prompt('So what you just made for me here?', result.all_messages(), max_history=20)

Error creating history-aware prompt: cannot import name 'ModelRequest' from 'pydantic_ai.messages' (/home/ndendic/WebDev/FH_SQLModel/.venv/lib/python3.12/site-packages/pydantic_ai/messages.py)


('So what you just made for me here?',
 [SystemPrompt(content="\nYou are a helpful assistant that operates in a Jupyter notebook.\nYour regular text responses are rendered as cell output.\nYou can create new cells, edit existing cells, and run code.\nYou can also use tools to help you with your tasks.\nToday's date is 2024-12-30.\n", role='system'),
  UserPrompt(content='Create a function that calculates the factorial of a number with input validation', timestamp=datetime.datetime(2024, 12, 30, 16, 22, 30, 115313, tzinfo=datetime.timezone.utc), role='user'),
  ModelStructuredResponse(calls=[ToolCall(tool_name='create_cell', args=ArgsJson(args_json='{"content":"def factorial(n):\\n    \\"\\"\\"\\n    Calculate the factorial of a non-negative integer.\\n    \\n    Parameters:\\n    n (int): The number to calculate the factorial for. Must be a non-negative integer.\\n    \\n    Returns:\\n    int: The factorial of the number.\\n    \\n    Raises:\\n    ValueError: If n is not a non-negati

### Running agent with notebook history

In [61]:
#| export
from typing import Any
def run_with_history(agent: Agent, prompt: str, message_history: list = None, max_history: int = 5) -> Any:
    """Run the agent with notebook and conversation history context.
    
    Args:
        agent: The PydanticAI agent
        prompt: The user's prompt
        message_history: Previous conversation messages
        max_history: Maximum number of previous cells to include
        
    Returns:
        Agent run result
    """
    prompt, combined_history = create_history_aware_prompt(
        prompt, 
        message_history=message_history, 
        max_history=max_history
    )
    return agent.run_sync(prompt, message_history=combined_history)

Testing run_with_history

In [62]:
result = run_with_history(notebook_agent, 'So what is going on in this notebook?',result.all_messages(), max_history=20)
Markdown(result.data)


Error creating history-aware prompt: cannot import name 'ModelRequest' from 'pydantic_ai.messages' (/home/ndendic/WebDev/FH_SQLModel/.venv/lib/python3.12/site-packages/pydantic_ai/messages.py)


In this notebook, we've set up a Python function to calculate the factorial of a number with input validation. Specifically:

1. **Function Implementation**: We implemented a function named `factorial` that takes an integer input and computes its factorial, provided the input is a non-negative integer. Any invalid inputs will result in a `ValueError`.

2. **Test Cases**: We included a few example calls to the `factorial` function to demonstrate its functionality and validate error handling.

The notebook is interactive, allowing you to run the code cells to see the function in action and test different inputs. If you want to execute the function or modify it further, you can run the appropriate cells within the notebook environment. This setup is typical for testing and demonstrating small pieces of code interactively.

In [63]:
result.all_messages()

[SystemPrompt(content="\nYou are a helpful assistant that operates in a Jupyter notebook.\nYour regular text responses are rendered as cell output.\nYou can create new cells, edit existing cells, and run code.\nYou can also use tools to help you with your tasks.\nToday's date is 2024-12-30.\n", role='system'),
 UserPrompt(content='Create a function that calculates the factorial of a number with input validation', timestamp=datetime.datetime(2024, 12, 30, 16, 22, 30, 115313, tzinfo=datetime.timezone.utc), role='user'),
 ModelStructuredResponse(calls=[ToolCall(tool_name='create_cell', args=ArgsJson(args_json='{"content":"def factorial(n):\\n    \\"\\"\\"\\n    Calculate the factorial of a non-negative integer.\\n    \\n    Parameters:\\n    n (int): The number to calculate the factorial for. Must be a non-negative integer.\\n    \\n    Returns:\\n    int: The factorial of the number.\\n    \\n    Raises:\\n    ValueError: If n is not a non-negative integer.\\n    \\"\\"\\"\\n    # Input 

### Creating prompt cell magic

In [64]:
#| export
from IPython.core.magic import register_cell_magic

@register_cell_magic
def prompt(line, cell):
    """Cell magic to create prompt cells that interact with the AI agent.
    
    Usage:
        %%prompt
        Your prompt text here
    """
    try:
        # Get the last result's message history if it exists
        message_history = None
        if 'last_prompt_result' in get_ipython().user_ns:
            last_result = get_ipython().user_ns['last_prompt_result']
            if hasattr(last_result, 'all_messages'):
                message_history = last_result.all_messages()
        
        # Run the prompt through our agent with history context
        result = run_with_history(
            notebook_agent, 
            cell.strip(), 
            message_history=message_history
        )
        
        # Store the result for next time
        get_ipython().user_ns['last_prompt_result'] = result
        
        # Display the agent's response
        return Markdown(result.data)
    except Exception as e:
        return f"Error processing prompt: {str(e)}"

Testing prompt cell magic

In [65]:
%%prompt
what this notebook is all about?

Error creating history-aware prompt: cannot import name 'ModelRequest' from 'pydantic_ai.messages' (/home/ndendic/WebDev/FH_SQLModel/.venv/lib/python3.12/site-packages/pydantic_ai/messages.py)


This notebook currently consists of a single interaction where you're asking about the notebook's purpose, but it doesn't have any specific content or code yet related to a particular topic. If you want to add specific content or have particular instructions or tasks you'd like to perform in the notebook, please let me know!

In [66]:
last_prompt_result.all_messages()

[SystemPrompt(content="\nYou are a helpful assistant that operates in a Jupyter notebook.\nYour regular text responses are rendered as cell output.\nYou can create new cells, edit existing cells, and run code.\nYou can also use tools to help you with your tasks.\nToday's date is 2024-12-30.\n", role='system'),
 UserPrompt(content='what this notebook is all about?', timestamp=datetime.datetime(2024, 12, 30, 16, 22, 54, 48018, tzinfo=datetime.timezone.utc), role='user'),
 ModelTextResponse(content="This notebook currently consists of a single interaction where you're asking about the notebook's purpose, but it doesn't have any specific content or code yet related to a particular topic. If you want to add specific content or have particular instructions or tasks you'd like to perform in the notebook, please let me know!", timestamp=datetime.datetime(2024, 12, 30, 16, 22, 54, tzinfo=datetime.timezone.utc), role='model-text-response')]

In [90]:
def last_prompt_result_data():
    return last_prompt_result.data
last_prompt_result_data()

"This notebook currently consists of a single interaction where you're asking about the notebook's purpose, but it doesn't have any specific content or code yet related to a particular topic. If you want to add specific content or have particular instructions or tasks you'd like to perform in the notebook, please let me know!"

In [67]:
def all_messages():
    return last_prompt_result.all_messages()
all_messages()

[SystemPrompt(content="\nYou are a helpful assistant that operates in a Jupyter notebook.\nYour regular text responses are rendered as cell output.\nYou can create new cells, edit existing cells, and run code.\nYou can also use tools to help you with your tasks.\nToday's date is 2024-12-30.\n", role='system'),
 UserPrompt(content='what this notebook is all about?', timestamp=datetime.datetime(2024, 12, 30, 16, 22, 54, 48018, tzinfo=datetime.timezone.utc), role='user'),
 ModelTextResponse(content="This notebook currently consists of a single interaction where you're asking about the notebook's purpose, but it doesn't have any specific content or code yet related to a particular topic. If you want to add specific content or have particular instructions or tasks you'd like to perform in the notebook, please let me know!", timestamp=datetime.datetime(2024, 12, 30, 16, 22, 54, tzinfo=datetime.timezone.utc), role='model-text-response')]

In [69]:
%%prompt
tell me what have I added last to this notebook?

Error creating history-aware prompt: cannot import name 'ModelRequest' from 'pydantic_ai.messages' (/home/ndendic/WebDev/FH_SQLModel/.venv/lib/python3.12/site-packages/pydantic_ai/messages.py)


The last addition to this notebook was your question, "what this notebook is all about?" There haven't been any other specific entries, code, or content added besides our interaction. If you want to include more content or perform tasks, feel free to specify!

In [69]:
all_messages()

[SystemPrompt(content="\nYou are a helpful assistant that operates in a Jupyter notebook.\nYour regular text responses are rendered as cell output.\nYou can create new cells, edit existing cells, and run code.\nYou can also use tools to help you with your tasks.\nToday's date is 2024-12-30.\n", role='system'),
 UserPrompt(content='what this notebook is all about?', timestamp=datetime.datetime(2024, 12, 30, 16, 22, 54, 48018, tzinfo=datetime.timezone.utc), role='user'),
 ModelTextResponse(content="This notebook currently consists of a single interaction where you're asking about the notebook's purpose, but it doesn't have any specific content or code yet related to a particular topic. If you want to add specific content or have particular instructions or tasks you'd like to perform in the notebook, please let me know!", timestamp=datetime.datetime(2024, 12, 30, 16, 22, 54, tzinfo=datetime.timezone.utc), role='model-text-response'),
 UserPrompt(content='tell me what have I added last to 

In [73]:
#| hide
import nbdev; nbdev.nbdev_export()