# wordslab-notebooks-lib.tools

> Predefined tools for LLMs

In [1]:
#| default_exp tools

In [1]:
#| exports
from __future__ import annotations
from enum import StrEnum, auto
from typing import List, Dict, Any, Optional
from pathlib import Path
import json
import os
import re
import shutil
import subprocess
import sys

## Mistral vibe bultin tools

The following tools code is inspired from **Mistral Vibe 2.0 (Apache 2.0 licence) :

https://github.com/mistralai/mistral-vibe/tree/main/vibe/core/tools/builtins

### Files tools

Utility functions for file access control (security measure):

1. Find the project directory: parent git repository or current directory
2. Check if the file path the LLM wants to access is located in inside the project directory

In [2]:
#| exports

# Usage in Jupyterlab in wordslab notebooks
if "__notebook_path" in globals():
    STARTUP_DIR = (Path(os.environ["WORDSLAB_WORKSPACE"]) / __notebook_path).parent
# Usage in the command line
else:
    STARTUP_DIR = os.environ.get("PWD", os.getcwd())

def find_git_repo_root(start_path: Path | None = None) -> Path | None:
    """
    Find the root directory of the git repository containing the given path.
    
    Searches upward from the given path (or current directory if None) until
    finding a directory containing a .git subdirectory or file.
    
    Args:
        start_path: Path to start searching from. If None, uses current directory.
        
    Returns:
        Path to git repository root, or None if no git repository found.
    """
    if start_path is None:
        start_path = STARTUP_DIR
    
    start_path = Path(start_path).resolve()
    
    # Check if we're already in a git repo
    current_dir = start_path
    while True:
        git_dir = current_dir / ".git"
        
        # Check if .git exists as directory or file
        if git_dir.exists():
            # Verify it's actually a git repo by checking for HEAD file
            head_file = git_dir / "HEAD" if git_dir.is_dir() else git_dir
            if head_file.exists():
                try:
                    with open(head_file, 'r', encoding='utf-8') as f:
                        head_content = f.read().strip()
                        # Valid git HEAD should start with "ref:" or be a commit hash
                        if head_content.startswith("ref:") or len(head_content) == 40:
                            return current_dir
                except (OSError, UnicodeDecodeError):
                    pass
        
        # Move up to parent directory
        parent = current_dir.parent
        if parent == current_dir:  # Reached root
            return None
        
        current_dir = parent


def get_project_root(start_path: Path | None = None) -> Path:
    """
    Get the project root directory.
    
    First tries to find a git repository root. If none found, defaults to
    current working directory.
    
    Returns:
        Path to project root directory.
    """
    git_root = find_git_repo_root(start_path)
    if git_root is not None:
        return git_root
    
    return start_path


def is_path_in_project(path: Path, project_root: Path | None = None) -> bool:
    """
    Check if a path is within the project directory.
    
    Args:
        path: Path to check
        project_root: Project root directory. If None, uses get_project_root()
        
    Returns:
        Tuple of (is_valid, error_message)
        is_valid is True if path is within project root, False otherwise.
        If is_valid is True, error_message is empty.
        If is_valid is False, error_message contains a descriptive error.
    """
    if project_root is None:
        project_root = get_project_root()
    
    try:
        path_resolved = path.resolve()
        project_root_resolved = project_root.resolve()
        
        # Check if path is under project root
        path.relative_to(project_root_resolved)
        return True, ""
    except ValueError:
        return False, f"Cannot access file outside project directory: {path}. Project root is {project_root_resolved}"


def validate_file_path(file_path: Path, project_root: Path | None = None) -> tuple[bool, str]:
    """
    Validate that a file path is within the project directory.
    
    Args:
        file_path: File path to validate
        project_root: Project root directory. If None, uses get_project_root()
        
    Returns:
        Tuple of (is_valid, error_message)
        If is_valid is True, error_message is empty.
        If is_valid is False, error_message contains a descriptive error.
    """
    if project_root is None:
        project_root = get_project_root()
    
    file_path_resolved = file_path.resolve()
    project_root_resolved = project_root.resolve()
    
    # Check if file is within project
    try:
        file_path.relative_to(project_root_resolved)
    except ValueError:
        return False, f"Cannot access file outside project directory: {file_path}. Project root is {project_root_resolved}"
    
    # Check if path exists
    if not file_path.exists():
        return False, f"File not found at: {file_path}. Please check the path and try again."
    
    # Check if it's a file (not directory)
    if not file_path.is_file():
        return False, f"Path is a directory, not a file: {file_path}. Please provide a file path."
    
    return True, ""

In [3]:
STARTUP_DIR

PosixPath('/home/workspace/wordslab-notebooks-lib/nbs')

In [4]:
get_project_root()

PosixPath('/home/workspace/wordslab-notebooks-lib')

File management tools.

In [5]:
#| exports

# ============================================================================
# File Reading Tool
# ============================================================================

def read_file(
    path: str,  # Path to the file to read
    offset: int = 0,  # Line number to start reading from (0-indexed, inclusive) (default: 0)
    limit: Optional[int] = None  # Maximum number of lines to read (default: None)
) -> str:  # JSON string with file content or error
    """
    Read a UTF-8 file and return content as JSON.
    
    **Key characteristics:**
    - **Stateless**: Each read operation is independent
    - **Large file handling**: Automatically handles large files with byte limits
    - **Line-based reading**: Use offset and limit for efficient navigation
    
    **Usage Tips:**
    - By default, reads from the beginning of the file (offset=0)
    - Use offset (line number) and limit (number of lines) to read specific parts
    - The result includes was_truncated: true if content was cut short
    - For large files, read in chunks: first with limit, then with offset
    
    **Error Handling:**
    - If file not found, check the path is correct
    - If path is a directory, provide a file path instead
    - If offset is negative, use a non-negative value
    - If limit is not positive, use a value greater than 0
    - If reading fails, check file permissions
    
    Default values are optimized for general use:
    - offset: 0 (read from beginning of file)
    - limit: None (read entire file)
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "content": "file content",
        "lines_read": number of lines read,
        "was_truncated": true/false (if reading was stopped due to limit)
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Validate inputs
        if not path.strip():
            return json.dumps({
                "success": False,
                "error": "Path cannot be empty. Please provide a valid file path."
            })
        
        if offset < 0:
            return json.dumps({
                "success": False,
                "error": "Offset cannot be negative. Please use a non-negative value."
            })
        
        if limit is not None and limit <= 0:
            return json.dumps({
                "success": False,
                "error": "Limit must be a positive number. Please use a value greater than 0."
            })
        
        # Resolve file path
        file_path = Path(path).expanduser().resolve()

        # Validate file is in the project directory and is readable        
        is_valid, error_message = validate_file_path(file_path)
        if not is_valid:
            return json.dumps({
                "success": False,
                "error": error_message
            })
            
        # Read file content
        lines_to_return: List[str] = []
        bytes_read = 0
        was_truncated = False
        max_read_bytes = 64_000  # Same as Mistral Vibe default
        
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            line_index = 0
            for line in f:
                if line_index < offset:
                    line_index += 1
                    continue
                
                if limit is not None and len(lines_to_return) >= limit:
                    was_truncated = True
                    break
                
                line_bytes = len(line.encode('utf-8'))
                if bytes_read + line_bytes > max_read_bytes:
                    was_truncated = True
                    break
                
                lines_to_return.append(line)
                bytes_read += line_bytes
                line_index += 1
        
        content = "".join(lines_to_return)
        
        return json.dumps({
            "success": True,
            "content": content,
            "lines_read": len(lines_to_return),
            "was_truncated": was_truncated
        })
        
    except UnicodeDecodeError as e:
        return json.dumps({
            "success": False,
            "error": f"Unicode decode error reading {path}: {str(e)}. Please check if the file is a binary file."
        })
    
    except PermissionError:
        return json.dumps({
            "success": False,
            "error": f"Permission denied reading file: {path}. Please check your permissions and try again."
        })
    
    except OSError as e:
        return json.dumps({
            "success": False,
            "error": f"Error reading file {path}: {str(e)}. Please check the file path and your permissions."
        })
    
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"An unexpected error occurred while reading {path}: {str(e)}. Please try again."
        })


# ============================================================================
# File Writing Tool
# ============================================================================

def write_file(
    path: str,  # Path to the file to write
    content: str,  # Content to write to the file
    overwrite: bool = False  # Must be set to true to overwrite an existing file (default: False)
) -> str:  # JSON string with write result or error
    """
    Create or overwrite a UTF-8 file and return result as JSON.
    
    **Key characteristics:**
    - **Safety first**: By default, fails if file exists to prevent accidental data loss
    - **Size limits**: Content is limited to 64KB to prevent large file operations
    - **User confirmation**: Requires explicit confirmation before writing (except for new files)
    - **Path validation**: Validates paths are within the project directory
    
    **Usage Tips:**
    - To create a new file, just provide path and content (overwrite defaults to false)
    - To overwrite an existing file, set overwrite=True
    - Parent directories are created automatically if needed
    - Always use read_file first to understand existing content before overwriting
    - Prefer search_replace over write_file for editing existing files
    - Avoid creating documentation files (*.md) or README files unless explicitly requested
    
    **Error Handling:**
    - If file exists and overwrite=False, set overwrite=True to replace it
    - If path is empty, provide a valid file path
    - If content is empty, provide content to write
    - If content exceeds 64KB, reduce the content size
    - If permission denied, check your write permissions
    - If trying to write outside project directory, use a path within the project
    - If user cancels the operation, don't retry without user confirmation
    
    Default values are optimized for general use:
    - overwrite: False (fail if file exists, set to True to overwrite)
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "file_existed": true/false (whether file existed before writing),
        "bytes_written": number of bytes written,
        "message": "success message"
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Validate inputs
        if not path.strip():
            return json.dumps({
                "success": False,
                "error": "Path cannot be empty. Please provide a valid file path."
            })
        
        if not content.strip():
            return json.dumps({
                "success": False,
                "error": "Content cannot be empty. Please provide content to write."
            })
        
        content_bytes = len(content.encode('utf-8'))
        max_write_bytes = 64_000  # Same as Mistral Vibe default
        
        if content_bytes > max_write_bytes:
            return json.dumps({
                "success": False,
                "error": f"Content exceeds {max_write_bytes} bytes limit. Please reduce the content size."
            })
        
        # Resolve file path
        file_path = Path(path).expanduser().resolve()

        # Validate we're not writing outside project directory       
        is_valid, error_message = is_path_in_project(file_path)
        if not is_valid:
            return json.dumps({
                "success": False,
                "error": error_message
            })
        
        # Check if file exists
        file_existed = file_path.exists()
        
        if file_existed and not overwrite:
            return json.dumps({
                "success": False,
                "error": f"File '{file_path}' already exists. Set overwrite=True to replace the existing file."
            })
        
        # Ask for user confirmation before writing
        # This matches the Mistral Vibe behavior
        response = input(f"\n⚠️  Warning: About to write to file '{file_path}'.\nDo you want to proceed? (yes/no): ").strip().lower()
        if response not in ['yes', 'y']:
            return json.dumps({
                "success": False,
                "error": f"File write to '{file_path}' was cancelled by user."
            })
        
        # Create parent directories if needed
        file_path.parent.mkdir(parents=True, exist_ok=True)
        
        # Write file
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
        
        action = "Overwritten" if file_existed else "Created"
        
        return json.dumps({
            "success": True,
            "file_existed": file_existed,
            "bytes_written": content_bytes,
            "message": f"{action} file {file_path}"
        })
        
    except PermissionError:
        return json.dumps({
            "success": False,
            "error": f"Permission denied writing to file: {path}. Please check your permissions."
        })
    
    except OSError as e:
        return json.dumps({
            "success": False,
            "error": f"OS error writing to {path}: {str(e)}. Please check the file path and your permissions."
        })
    
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"An unexpected error occurred while writing {path}: {str(e)}. Please try again."
        })


# ============================================================================
# File Search Tool (Grep)
# ============================================================================

def grep(
    pattern: str,  # Search pattern (regex)
    path: str = ".",  # Path to search in (default: ".")
    max_matches: Optional[int] = None,  # Maximum number of matches to return (default: None)
    use_default_ignore: bool = True  # Whether to respect .gitignore and .ignore files (default: True)
) -> str:  # JSON string with search results or error
    """
    Recursively search files for a regex pattern and return results as JSON.
    
    **Key characteristics:**
    - **Fast search**: Very fast and automatically ignores common files and directories
    - **Regex support**: Supports full regular expression patterns
    - **Smart defaults**: Automatically ignores .pyc files, .venv directories, etc.
    
    **Usage Tips:**
    - Use this to find where functions are defined, how variables are used, or to locate specific error messages
    - Search in current directory by default (path=".")
    - Return all matches by default (max_matches=None)
    - Respect .gitignore and .ignore files by default (use_default_ignore=True)
    - For large codebases, limit max_matches to avoid overwhelming output
    - Use specific patterns for better results
    
    **Error Handling:**
    - If empty pattern provided, provide a valid search pattern
    - If path doesn't exist, check the path and try again
    - If permission denied, check your read permissions
    - If pattern is too broad, narrow it down
    
    Default values are optimized for general use:
    - path: "." (search in current directory)
    - max_matches: None (return all matches)
    - use_default_ignore: True (respect .gitignore and .ignore files)
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "matches": "string containing all matches with file paths and line numbers",
        "match_count": number of matches found,
        "was_truncated": true/false (if output was cut short by limits)
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Validate inputs
        if not pattern.strip():
            return json.dumps({
                "success": False,
                "error": "Empty search pattern provided. Please provide a valid search pattern."
            })
        
        # Resolve search path
        search_path = Path(path).expanduser().resolve()

        # Validate we're not searching outside project directory       
        is_valid, error_message = is_path_in_project(file_path)
        if not is_valid:
            return json.dumps({
                "success": False,
                "error": error_message
            })
        
        # Validate path exists
        if not search_path.exists():
            return json.dumps({
                "success": False,
                "error": f"Path does not exist: {path}. Please check the path and try again."
            })
        
        # Default max matches
        default_max_matches = 100
        max_matches = max_matches or default_max_matches
        
        # Default exclude patterns
        exclude_patterns = [
            ".venv/",
            "venv/",
            ".env/",
            "env/",
            "node_modules/",
            ".git/",
            "__pycache__/",
            ".pytest_cache/",
            ".mypy_cache/",
            ".tox/",
            ".nox/",
            ".coverage/",
            "htmlcov/",
            "dist/",
            "build/",
            ".idea/",
            ".vscode/",
            "*.egg-info",
            "*.pyc",
            "*.pyo",
            "*.pyd",
            ".DS_Store",
            "Thumbs.db",
        ]
        
        # Load additional patterns from .vibeignore if it exists
        codeignore_path = get_project_root() / ".vibeignore"
        if codeignore_path.is_file():
            try:
                with open(codeignore_path, 'r', encoding='utf-8') as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith("#"):
                            exclude_patterns.append(line)
            except OSError:
                pass
        
        # Build grep command
        import subprocess
        
        cmd = [
            "grep",
            "-r",
            "-n",
            "-I",
            "-E",
            f"--max-count={max_matches + 1}",  # +1 to detect truncation
            "-e",
            pattern,
            str(search_path)
        ]
        
        # Add exclude patterns
        for pattern in exclude_patterns:
            if pattern.endswith("/"):
                dir_pattern = pattern.rstrip("/")
                cmd.append(f"--exclude-dir={dir_pattern}")
            else:
                cmd.append(f"--exclude={pattern}")
        
        # Execute grep
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=60
        )
        
        if result.returncode not in {0, 1}:
            error_msg = result.stderr or f"Process exited with code {result.returncode}"
            return json.dumps({
                "success": False,
                "error": f"grep error: {error_msg}. Please check your search pattern and try again."
            })
        
        stdout = result.stdout
        output_lines = stdout.splitlines() if stdout else []
        
        # Apply truncation
        truncated_lines = output_lines[:max_matches]
        truncated_output = "\n".join(truncated_lines)
        
        was_truncated = len(output_lines) > max_matches
        
        max_output_bytes = 64_000  # Same as Mistral Vibe default
        if len(truncated_output) > max_output_bytes:
            truncated_output = truncated_output[:max_output_bytes]
            was_truncated = True
        
        return json.dumps({
            "success": True,
            "matches": truncated_output,
            "match_count": len(truncated_lines),
            "was_truncated": was_truncated
        })
        
    except subprocess.TimeoutExpired:
        return json.dumps({
            "success": False,
            "error": "Search timed out after 60 seconds. Please try a more specific search pattern or reduce the search scope."
        })
    
    except FileNotFoundError:
        return json.dumps({
            "success": False,
            "error": "grep command not found. Please install grep and try again."
        })
    
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"An error occurred while searching: {str(e)}. Please check your search pattern and try again."
        })


# ============================================================================
# Helper Functions
# ============================================================================

SEARCH_REPLACE_BLOCK_RE = re.compile(
    r"<{5,} SEARCH\r?\n(.*?)\r?\n?={5,}\r?\n(.*?)\r?\n?>{5,} REPLACE", flags=re.DOTALL
)

SEARCH_REPLACE_BLOCK_WITH_FENCE_RE = re.compile(
    r"```[\s\S]*?\n<{5,} SEARCH\r?\n(.*?)\r?\n?={5,}\r?\n(.*?)\r?\n?>{5,} REPLACE\s*\n```",
    flags=re.DOTALL,
)


def _parse_search_replace_blocks(content: str) -> List[tuple]:
    """Parse SEARCH/REPLACE blocks from content.
    
    Supports two formats:
    1. With code block fences (```...```)
    2. Without code block fences
    """
    matches = SEARCH_REPLACE_BLOCK_WITH_FENCE_RE.findall(content)
    
    if not matches:
        matches = SEARCH_REPLACE_BLOCK_RE.findall(content)
    
    return [
        (search.rstrip("\r\n"), replace.rstrip("\r\n"))
        for search, replace in matches
    ]


def _find_search_context(content: str, search_text: str, max_context: int = 5) -> str:
    """Find context around search text for debugging."""
    lines = content.split("\n")
    search_lines = search_text.split("\n")
    
    if not search_lines:
        return "Search text is empty"
    
    first_search_line = search_lines[0].strip()
    if not first_search_line:
        return "First line of search text is empty or whitespace only"
    
    matches = []
    for i, line in enumerate(lines):
        if first_search_line in line:
            matches.append(i)
    
    if not matches:
        return f"First search line '{first_search_line}' not found anywhere in file"
    
    context_lines = []
    for match_idx in matches[:3]:
        start = max(0, match_idx - max_context)
        end = min(len(lines), match_idx + max_context + 1)
        
        context_lines.append(f"\nPotential match area around line {match_idx + 1}:")
        for i in range(start, end):
            marker = ">>>" if i == match_idx else "   "
            context_lines.append(f"{marker} {i + 1:3d}: {lines[i]}")
    
    return "\n".join(context_lines)

    
# ============================================================================
# File Search and Replace Tool
# ============================================================================

def search_replace(
    file_path: str,  # Path to the file to modify
    content: str  # SEARCH/REPLACE blocks with format:
    # <<<<<<< SEARCH
    # [replacement text]
    # =======
    # [replacement text]
    # >>>>>> REPLACE
) -> str:  # JSON string with search/replace result or error
    """
    Replace sections of files using SEARCH/REPLACE blocks and return result as JSON.
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "blocks_applied": number of blocks successfully applied,
        "lines_changed": number of lines changed,
        "warnings": [list of warnings],
        "message": "success message"
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Validate inputs
        if not file_path.strip():
            return json.dumps({
                "success": False,
                "error": "File path cannot be empty. Please provide a valid file path."
            })
        
        if not content.strip():
            return json.dumps({
                "success": False,
                "error": "Content cannot be empty. Please provide SEARCH/REPLACE blocks."
            })
        
        max_content_size = 100_000
        if len(content) > max_content_size:
            return json.dumps({
                "success": False,
                "error": f"Content size ({len(content)} bytes) exceeds max_content_size ({max_content_size} bytes). Please reduce the content size."
            })
        
        # Resolve file path
        file_path_obj = Path(file_path).expanduser().resolve()
        
        # Validate file is in the project directory and is readable        
        is_valid, error_message = validate_file_path(file_path_obj)
        if not is_valid:
            return json.dumps({
                "success": False,
                "error": error_message
            })
        
        # Read original content
        with open(file_path_obj, 'r', encoding='utf-8') as f:
            original_content = f.read()
        
        # Parse SEARCH/REPLACE blocks
        search_replace_blocks = _parse_search_replace_blocks(content)
        
        if not search_replace_blocks:
            return json.dumps({
                "success": False,
                "error": "No valid SEARCH/REPLACE blocks found in content.\n"
                       "Expected format:\n"
                       "<<<<<<< SEARCH\n"
                       "[exact content to find]\n"
                       "=======\n"
                       "[new content to replace with]\n"
                       ">>>>>> REPLACE"
            })
        
        # Apply blocks
        applied = 0
        errors: List[str] = []
        warnings: List[str] = []
        current_content = original_content
        
        for i, (search, replace) in enumerate(search_replace_blocks, 1):
            if search not in current_content:
                context = _find_search_context(current_content, search)
                error_msg = (
                    f"SEARCH/REPLACE block {i} failed: Search text not found in {file_path}\n"
                    f"Search text was:\n{search!r}\n"
                    f"Context analysis:\n{context}\n"
                    "Debugging tips:\n"
                    "1. Check for exact whitespace/indentation match\n"
                    "2. Verify line endings match the file exactly (\\r\\n vs \\n)\n"
                    "3. Ensure the search text hasn't been modified by previous blocks or user edits\n"
                    "4. Check for typos or case sensitivity issues"
                )
                errors.append(error_msg)
                continue
            
            occurrences = current_content.count(search)
            if occurrences > 1:
                warning_msg = (
                    f"Search text in block {i} appears {occurrences} times in the file. "
                    f"Only the first occurrence will be replaced. Consider making your "
                    f"search pattern more specific to avoid unintended changes."
                )
                warnings.append(warning_msg)
            
            current_content = current_content.replace(search, replace, 1)
            applied += 1
        
        if errors:
            error_message = "SEARCH/REPLACE blocks failed:\n" + "\n\n".join(errors)
            if warnings:
                error_message += "\n\nWarnings encountered:\n" + "\n".join(warnings)
            return json.dumps({
                "success": False,
                "error": error_message
            })
        
        # Calculate line changes
        if current_content == original_content:
            lines_changed = 0
        else:
            original_lines = len(original_content.splitlines())
            new_lines = len(current_content.splitlines())
            lines_changed = new_lines - original_lines
        
        # Ask for user confirmation before modifying file
        # This matches the Mistral Vibe behavior
        response = input(f"\n⚠️  Warning: About to modify file '{file_path_obj}'.\nDo you want to proceed? (yes/no): ").strip().lower()
        if response not in ['yes', 'y']:
            return json.dumps({
                "success": False,
                "error": f"File modification of '{file_path}' was cancelled by user."
            })
        
        # Write modified content
        with open(file_path_obj, 'w', encoding='utf-8') as f:
            f.write(current_content)
        
        return json.dumps({
            "success": True,
            "blocks_applied": applied,
            "lines_changed": lines_changed,
            "warnings": warnings,
            "message": f"Applied {applied} block{'s' if applied != 1 else ''} to {file_path}"
        })
        
    except UnicodeDecodeError as e:
        return json.dumps({
            "success": False,
            "error": f"Unicode decode error reading {file_path}: {str(e)}. Please check if the file is a binary file."
        })
    
    except PermissionError:
        return json.dumps({
            "success": False,
            "error": f"Permission denied reading/writing file: {file_path}. Please check your permissions."
        })
    
    except OSError as e:
        return json.dumps({
            "success": False,
            "error": f"OS error accessing {file_path}: {str(e)}. Please check the file path and your permissions."
        })
    
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"An unexpected error occurred while modifying {file_path}: {str(e)}. Please try again."
        })

Tests for file access control utilities.

In [None]:
import os
import tempfile
from pathlib import Path
import shutil
import json

def test_find_git_repo_root_in_git_repo():
    """Test finding git repo root when in a git repository."""
    with tempfile.TemporaryDirectory() as tmpdir:
        repo_dir = Path(tmpdir) / "repo"
        repo_dir.mkdir()
        
        # Initialize git repo
        os.chdir(repo_dir)
        os.system("git init")
        os.system("git config user.email 'test@test.com'")
        os.system("git config user.name 'Test User'")
        
        # Create a file and commit
        test_file = repo_dir / "test.txt"
        test_file.write_text("test content")
        os.system("git add test.txt")
        os.system("git commit -m 'Initial commit'")
        
        # Test finding repo root from repo root
        result = find_git_repo_root(repo_dir)
        assert result == repo_dir
        
        # Test finding repo root from subdirectory
        subdir = repo_dir / "subdir"
        subdir.mkdir()
        result = find_git_repo_root(subdir)
        assert result == repo_dir


def test_find_git_repo_root_not_in_git_repo():
    """Test finding git repo root when not in a git repository."""
    with tempfile.TemporaryDirectory() as tmpdir:
        non_repo_dir = Path(tmpdir) / "non_repo"
        non_repo_dir.mkdir()
        
        result = find_git_repo_root(non_repo_dir)
        assert result is None


def test_get_project_root_in_git_repo():
    """Test getting project root when in a git repository."""
    with tempfile.TemporaryDirectory() as tmpdir:
        repo_dir = Path(tmpdir) / "repo"
        repo_dir.mkdir()
        
        # Initialize git repo
        os.chdir(repo_dir)
        os.system("git init")
        os.system("git config user.email 'test@test.com'")
        os.system("git config user.name 'Test User'")
        
        # Create a subdir
        file_dir = repo_dir / "subdir"
        file_dir.mkdir()
        
        # Test getting project root
        result = get_project_root(file_dir)
        assert result == repo_dir


def test_get_project_root_not_in_git_repo():
    """Test getting project root when not in a git repository."""
    with tempfile.TemporaryDirectory() as tmpdir:
        non_repo_dir = Path(tmpdir) / "non_repo"
        non_repo_dir.mkdir()        
        
        result = get_project_root(non_repo_dir)
        print(result)
        assert result == non_repo_dir


def test_is_path_in_project():
    """Test checking if a path is within the project directory."""
    with tempfile.TemporaryDirectory() as tmpdir:
        repo_dir = Path(tmpdir) / "repo"
        repo_dir.mkdir()
        
        # Initialize git repo
        os.chdir(repo_dir)
        os.system("git init")
        os.system("git config user.email 'test@test.com'")
        os.system("git config user.name 'Test User'")

        
        # Test path within project        
        test_file = repo_dir / "test.txt"
        is_valid, error_message = is_path_in_project(test_file, project_root=repo_dir)
        assert is_valid
        
        # Test path outside project
        outside_file = Path(tmpdir) / "outside.txt"
        is_valid, error_message = is_path_in_project(outside_file, project_root=repo_dir)
        assert not is_valid

test_find_git_repo_root_in_git_repo()
test_find_git_repo_root_not_in_git_repo()
test_get_project_root_in_git_repo()
test_get_project_root_not_in_git_repo()
test_is_path_in_project()
print("All tests passed!")

Test file management tools.

These tests verify that the simple file management tools work correctly and return proper JSON responses.

In [11]:
import json
import os
import tempfile
from pathlib import Path

class TestReadFile:
    """Test read_file function."""
    
    def test_read_file_success(self):
        """Test successful file reading."""
        # Create a temporary file
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', dir=STARTUP_DIR) as f:
            f.write("Line 1\nLine 2\nLine 3\nLine 4\nLine 5")
            temp_path = f.name
        
        try:
            result = read_file(temp_path)
            data = json.loads(result)
            
            assert data["success"] is True
            assert "Line 1" in data["content"]
            assert data["lines_read"] == 5
            assert data["was_truncated"] is False
        finally:
            os.unlink(temp_path)
    
    def test_read_file_with_offset(self):
        """Test reading file with offset."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', dir=STARTUP_DIR) as f:
            f.write("Line 1\nLine 2\nLine 3\nLine 4\nLine 5")
            temp_path = f.name
        
        try:
            result = read_file(temp_path, offset=2)
            data = json.loads(result)
            
            assert data["success"] is True
            assert "Line 1" not in data["content"]
            assert "Line 3" in data["content"]
            assert data["lines_read"] == 3
        finally:
            os.unlink(temp_path)
    
    def test_read_file_with_limit(self):
        """Test reading file with limit."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', dir=STARTUP_DIR) as f:
            f.write("Line 1\nLine 2\nLine 3\nLine 4\nLine 5")
            temp_path = f.name
        
        try:
            result = read_file(temp_path, limit=3)
            data = json.loads(result)
            
            assert data["success"] is True
            assert data["lines_read"] == 3
            assert data["was_truncated"] is True
        finally:
            os.unlink(temp_path)
    
    def test_read_file_empty_path(self):
        """Test reading with empty path."""
        result = read_file("")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "empty" in data["error"].lower()
    
    def test_read_file_not_found(self):
        """Test reading non-existent file."""
        result = read_file("nonexistent_file.txt")
        data = json.loads(result)
        print(data)
        assert data["success"] is False
        assert "not found" in data["error"].lower()
    
    def test_read_file_is_directory(self):
        """Test reading a directory."""
        result = read_file(".")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "directory" in data["error"].lower()
    
    def test_read_file_negative_offset(self):
        """Test reading with negative offset."""
        result = read_file("test.txt", offset=-1)
        data = json.loads(result)
        
        assert data["success"] is False
        assert "negative" in data["error"].lower()
    
    def test_read_file_invalid_limit(self):
        """Test reading with invalid limit."""
        result = read_file("test.txt", limit=0)
        data = json.loads(result)
        
        assert data["success"] is False
        assert "positive" in data["error"].lower()


class TestWriteFile:
    """Test write_file function."""
    
    def test_write_file_success(self):
        """Test successful file writing."""
        temp_path = "test_success.txt"
        
        result = write_file(temp_path, "Hello World")
        data = json.loads(result)
        
        assert data["success"] is True
        assert data["file_existed"] is False
        assert data["bytes_written"] == 11
        assert Path(temp_path).exists()
        
        # Clean up
        os.unlink(temp_path)
    
    def test_write_file_overwrite(self):
        """Test file overwriting."""
        # Create a file in the current directory
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', dir='.') as f:
            f.write("Original content")
            temp_path = f.name
        
        try:
            result = write_file(temp_path, "New content", overwrite=True)
            data = json.loads(result)
            
            assert data["success"] is True
            assert data["file_existed"] is True
            assert data["bytes_written"] == 11  # "New content" is 11 bytes
            
            # Verify content was overwritten
            with open(temp_path, 'r') as f:
                content = f.read()
            assert content == "New content"
        finally:
            os.unlink(temp_path)
    
    def test_write_file_no_overwrite(self):
        """Test file writing without overwrite flag."""
        # Create a file in the current directory
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', dir='.') as f:
            f.write("Original content")
            temp_path = f.name
        
        try:
            result = write_file(temp_path, "New content")
            data = json.loads(result)
            
            assert data["success"] is False
            assert "exists" in data["error"].lower()
        finally:
            os.unlink(temp_path)
    
    def test_write_file_empty_path(self):
        """Test writing with empty path."""
        result = write_file("", "content")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "empty" in data["error"].lower()
    
    def test_write_file_empty_content(self):
        """Test writing with empty content."""
        result = write_file("test.txt", "")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "empty" in data["error"].lower()
    
    def test_write_file_too_large(self):
        """Test writing content that's too large."""
        large_content = "x" * 70_000
        result = write_file("test.txt", large_content)
        data = json.loads(result)
        
        assert data["success"] is False
        assert "exceeds" in data["error"].lower()
    
    def test_write_file_outside_project(self):
        """Test writing outside project directory."""
        result = write_file("/tmp/outside_project.txt", "content")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "outside" in data["error"].lower()


class TestGrep:
    """Test grep function."""
    
    def test_grep_success(self):
        """Test successful grep search."""
        # Create a temporary file with content
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("apple\nbanana\napple\norange\napple")
            temp_path = f.name
        
        try:
            result = grep("apple", temp_path)
            data = json.loads(result)
            
            assert data["success"] is True
            assert data["match_count"] == 3
            assert "apple" in data["matches"]
        finally:
            os.unlink(temp_path)
    
    def test_grep_no_matches(self):
        """Test grep with no matches."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("apple\nbanana\norange")
            temp_path = f.name
        
        try:
            result = grep("zebra", temp_path)
            data = json.loads(result)
            
            assert data["success"] is True
            assert data["match_count"] == 0
            assert data["matches"] == ""
        finally:
            os.unlink(temp_path)
    
    def test_grep_with_max_matches(self):
        """Test grep with max matches limit."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("apple\nbanana\napple\norange\napple\napple")
            temp_path = f.name
        
        try:
            result = grep("apple", temp_path, max_matches=2)
            data = json.loads(result)
            
            assert data["success"] is True
            assert data["match_count"] == 2
            assert data["was_truncated"] is True
        finally:
            os.unlink(temp_path)
    
    def test_grep_empty_pattern(self):
        """Test grep with empty pattern."""
        result = grep("")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "empty" in data["error"].lower()
    
    def test_grep_path_not_found(self):
        """Test grep with non-existent path."""
        result = grep("test", "nonexistent_dir")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "exist" in data["error"].lower()


class TestSearchReplace:
    """Test search_replace function."""
    
    def test_search_replace_success(self):
        """Test successful search and replace."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("Hello World\nThis is a test")
            temp_path = f.name
        
        try:
            content = """<<<<<<< SEARCH
Hello World
=======
Hello Universe
>>>>>>> REPLACE"""
            
            result = search_replace(temp_path, content)
            data = json.loads(result)
            
            assert data["success"] is True
            assert data["blocks_applied"] == 1
            assert data["lines_changed"] == 0  # Same number of lines
            
            # Verify content was changed
            with open(temp_path, 'r') as f:
                new_content = f.read()
            assert "Hello Universe" in new_content
            assert "Hello World" not in new_content
        finally:
            os.unlink(temp_path)
    
    def test_search_replace_multiple_blocks(self):
        """Test search and replace with multiple blocks."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("Hello World\nThis is a test\nHello Again")
            temp_path = f.name
        
        try:
            content = """<<<<<<< SEARCH
Hello World
=======
Hello Universe
>>>>>>> REPLACE

<<<<<<< SEARCH
Hello Again
=======
Hello Galaxy
>>>>>>> REPLACE"""
            
            result = search_replace(temp_path, content)
            data = json.loads(result)
            
            assert data["success"] is True
            assert data["blocks_applied"] == 2
            
            # Verify both changes were made
            with open(temp_path, 'r') as f:
                new_content = f.read()
            assert "Hello Universe" in new_content
            assert "Hello Galaxy" in new_content
        finally:
            os.unlink(temp_path)
    
    def test_search_replace_not_found(self):
        """Test search and replace when text not found."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("Hello World")
            temp_path = f.name
        
        try:
            content = """<<<<<<< SEARCH
Goodbye World
=======
Hello Universe
>>>>>>> REPLACE"""
            
            result = search_replace(temp_path, content)
            data = json.loads(result)
            
            assert data["success"] is False
            assert "not found" in data["error"].lower()
        finally:
            os.unlink(temp_path)
    
    def test_search_replace_empty_file_path(self):
        """Test search and replace with empty file path."""
        content = """<<<<<<< SEARCH
Hello
=======
Goodbye
>>>>>>> REPLACE"""
        
        result = search_replace("", content)
        data = json.loads(result)
        
        assert data["success"] is False
        assert "empty" in data["error"].lower()
    
    def test_search_replace_empty_content(self):
        """Test search and replace with empty content."""
        result = search_replace("test.txt", "")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "empty" in data["error"].lower()
    
    def test_search_replace_file_not_found(self):
        """Test search and replace with non-existent file."""
        content = """<<<<<<< SEARCH
Hello
=======
Goodbye
>>>>>>> REPLACE"""
        
        result = search_replace("nonexistent.txt", content)
        data = json.loads(result)
        
        assert data["success"] is False
        assert "exist" in data["error"].lower()
    
    def test_search_replace_invalid_format(self):
        """Test search and replace with invalid format."""
        # First create the file
        with open("test.txt", "w") as f:
            f.write("some content")
        
        result = search_replace("test.txt", "Just some text without proper format")
        data = json.loads(result)
        
        assert data["success"] is False
        assert "format" in data["error"].lower() or "block" in data["error"].lower()
        
        # Clean up
        os.unlink("test.txt")


testReadFile = TestReadFile()
testReadFile.test_read_file_success()
testReadFile.test_read_file_with_offset()
testReadFile.test_read_file_with_limit()
testReadFile.test_read_file_empty_path()
testReadFile.test_read_file_not_found()
testReadFile.test_read_file_is_directory()
testReadFile.test_read_file_negative_offset()
testReadFile.test_read_file_invalid_limit()

testWriteFile = TestWriteFile()
testWriteFile.test_write_file_success()
testWriteFile.test_write_file_overwrite()
testWriteFile.test_write_file_no_overwrite()
testWriteFile.test_write_file_empty_path()
testWriteFile.test_write_file_empty_content()
testWriteFile.test_write_file_too_large()
testWriteFile.test_write_file_outside_project()

testGrep = TestGrep()
testGrep.test_grep_success()
testGrep.test_grep_no_matches()
testGrep.test_grep_with_max_matches()
testGrep.test_grep_empty_pattern()
testGrep.test_grep_path_not_found()

testSearchReplace = TestSearchReplace()
testSearchReplace.test_search_replace_success()
testSearchReplace.test_search_replace_multiple_blocks()
testSearchReplace.test_search_replace_not_found()
testSearchReplace.test_search_replace_empty_file_path()
testSearchReplace.test_search_replace_empty_content()
testSearchReplace.test_search_replace_file_not_found()
testSearchReplace.test_search_replace_invalid_format()

{'success': False, 'error': 'Error reading file nonexistent_file.txt: [Errno 2] No such file or directory. Please check the file path and your permissions.'}


AssertionError: 

In [12]:
read_file("nonexistent_file.txt")

'{"success": false, "error": "Error reading file nonexistent_file.txt: [Errno 2] No such file or directory. Please check the file path and your permissions."}'

In [13]:
Path("nonexistent_file.txt").expanduser().resolve()

FileNotFoundError: [Errno 2] No such file or directory

### Bash tool

This tool provides a simple interface for running bash commands with comprehensive error handling and JSON output suitable for LLMs.

Usage:

```python
import json
from bash_simple import bash

result = bash("ls -la")
data = json.loads(result)
if data["success"]:
    print(data["stdout"])
else:
    print(f"Error: {data['error']}")
```

In [6]:
#| exports

def _get_subprocess_encoding() -> str:
    """Get the appropriate encoding for subprocess communication."""
    if sys.platform == "win32":
        # Windows console uses OEM code page (e.g., cp850, cp1252)
        import ctypes

        return f"cp{ctypes.windll.kernel32.GetOEMCP()}"
    return "utf-8"


def _get_base_env() -> dict[str, str]:
    """Get base environment for subprocess with safety settings."""
    base_env = {
        **os.environ,
        "CI": "true",
        "NONINTERACTIVE": "1",
        "NO_TTY": "1",
        "NO_COLOR": "1",
    }

    if sys.platform == "win32":
        base_env["GIT_PAGER"] = "more"
        base_env["PAGER"] = "more"
    else:
        base_env["TERM"] = "dumb"
        base_env["DEBIAN_FRONTEND"] = "noninteractive"
        base_env["GIT_PAGER"] = "cat"
        base_env["PAGER"] = "cat"
        base_env["LESS"] = "-FX"
        base_env["LC_ALL"] = "en_US.UTF-8"

    return base_env


def _get_default_allowlist() -> list[str]:
    """Get default list of allowed command prefixes."""
    common = ["echo", "find", "git diff", "git log", "git status", "tree", "whoami"]

    if sys.platform == "win32":
        return common + ["dir", "findstr", "more", "type", "ver", "where"]
    else:
        return common + [
            "cat",
            "file",
            "head",
            "ls",
            "pwd",
            "stat",
            "tail",
            "uname",
            "wc",
            "which",
        ]


def _get_default_denylist() -> list[str]:
    """Get default list of denied command prefixes."""
    common = ["gdb", "pdb", "passwd"]

    if sys.platform == "win32":
        return common + ["cmd /k", "powershell -NoExit", "pwsh -NoExit", "notepad"]
    else:
        return common + [
            "nano",
            "vim",
            "vi",
            "emacs",
            "bash -i",
            "sh -i",
            "zsh -i",
            "fish -i",
            "dash -i",
            "screen",
            "tmux",
        ]


def _get_default_denylist_standalone() -> list[str]:
    """Get default list of commands denied when run without arguments."""
    common = ["python", "python3", "ipython"]

    if sys.platform == "win32":
        return common + ["cmd", "powershell", "pwsh", "notepad"]
    else:
        return common + [
            "bash",
            "sh",
            "nohup",
            "vi",
            "vim",
            "emacs",
            "nano",
            "su",
        ]


def _is_denylisted(command: str, denylist: list[str]) -> bool:
    """Check if a command is in the denylist."""
    return any(command.startswith(pattern) for pattern in denylist)


def _is_standalone_denylisted(command: str, denylist_standalone: list[str]) -> bool:
    """Check if a command is in the standalone denylist."""
    parts = command.split()
    if not parts:
        return False

    base_command = parts[0]
    has_args = len(parts) > 1

    if not has_args:
        command_name = os.path.basename(base_command)
        if command_name in denylist_standalone:
            return True
        if base_command in denylist_standalone:
            return True

    return False


def _is_allowlisted(command: str, allowlist: list[str]) -> bool:
    """Check if a command is in the allowlist."""
    return any(command.startswith(pattern) for pattern in allowlist)


def _check_command_safety(
    command: str,
    allowlist: list[str] | None = None,
    denylist: list[str] | None = None,
    denylist_standalone: list[str] | None = None,
) -> tuple[bool, str]:
    """
    Check if a command is safe to run.
    
    **Key characteristics:**
    - **Stateless**: Each command runs independently in a fresh environment
    - **Timeout**: Controls how long the command can run before being killed
    - When timeout is not specified (or set to None), the config default is used
    - If a command is timing out, increase the timeout using the timeout argument
    
    **Usage Tips:**
    - Use for quick system checks and git operations
    - Prefer dedicated tools over bash commands when available
    - Use specific, focused commands for better results
    - Limit output with tools like head/tail when dealing with large files
    
    **Error Handling:**
    - If command fails, check the return code and stderr
    - If command times out, increase the timeout parameter
    - If command is denied, use an allowed command or request user confirmation
    - If command is unsafe, use a safer alternative
    
    Returns:
        tuple: (is_safe, error_message)
    """
    if allowlist is None:
        allowlist = _get_default_allowlist()
    if denylist is None:
        denylist = _get_default_denylist()
    if denylist_standalone is None:
        denylist_standalone = _get_default_denylist_standalone()

    # Extract command parts (simplified version)
    # For safety checks, we just check the full command string
    full_command = command.strip()

    if not full_command:
        return False, "Command cannot be empty"

    # Check denylist
    if _is_denylisted(full_command, denylist):
        return False, f"Command is denied: {full_command}"

    # Check standalone denylist
    if _is_standalone_denylisted(full_command, denylist_standalone):
        return False, f"Command is denied when run without arguments: {full_command}"

    # Check allowlist
    if _is_allowlisted(full_command, allowlist):
        return True, ""

    # If not explicitly allowed, ask for user confirmation
    # This matches the Mistral Vibe behavior where unknown commands
    # require user confirmation
    response = input(f"\n⚠️  Warning: About to execute command '{full_command}'.\nDo you want to proceed? (yes/no): ").strip().lower()
    if response not in ['yes', 'y']:
        return False, f"Command '{full_command}' was cancelled by user."
    
    return True, ""

def bash(
    command: str,
    timeout: int = 300,
    max_output_bytes: int = 16000,
    allowlist: list[str] | None = None,
    denylist: list[str] | None = None,
    denylist_standalone: list[str] | None = None,
) -> str:
    """
    Run a bash command and return result as JSON.
    
    **Key characteristics:**
    - **Stateless**: Each command runs independently in a fresh environment
    - **Timeout**: Controls how long the command can run before being killed
    - When timeout is not specified (or set to None), the config default is used
    - If a command is timing out, increase the timeout using the timeout argument
    
    **Usage Tips:**
    - Use for quick system checks and git operations
    - Prefer dedicated tools over bash commands when available
    - Use specific, focused commands for better results
    - Limit output with tools like head/tail when dealing with large files
    
    **IMPORTANT: Use dedicated tools if available instead of these bash commands:**
    
    **File Operations - DO NOT USE:**
    - `cat filename` → Use `read_file(path="filename")`
    - `head -n 20 filename` → Use `read_file(path="filename", limit=20)`
    - `tail -n 20 filename` → Read with offset: `read_file(path="filename", offset=<line_number>, limit=20)`
    - `sed -n '100,200p' filename` → Use `read_file(path="filename", offset=99, limit=101)`
    - `less`, `more`, `vim`, `nano` → Use `read_file` with offset/limit for navigation
    - `echo "content" > file` → Use `write_file(path="file", content="content")`
    - `echo "content" >> file` → Read first, then `write_file` with overwrite=true
    
    **Search Operations - DO NOT USE:**
    - `grep -r "pattern" .` → Use `grep(pattern="pattern", path=".")`
    - `find . -name "*.py"` → Use `bash("ls -la")` for current dir or `grep` with appropriate pattern
    - `ag`, `ack`, `rg` commands → Use the `grep` tool
    - `locate` → Use `grep` tool
    
    **File Modification - DO NOT USE:**
    - `sed -i 's/old/new/g' file` → Use `search_replace` tool
    - `awk` for file editing → Use `search_replace` tool
    - Any in-place file editing → Use `search_replace` tool
    
    **APPROPRIATE bash uses:**
    - System information: `pwd`, `whoami`, `date`, `uname -a`
    - Directory listings: `ls -la`, `tree` (if available)
    - Git operations: `git status`, `git log --oneline -10`, `git diff`
    - Process info: `ps aux | grep process`, `top -n 1`
    - Network checks: `ping -c 1 google.com`, `curl -I https://example.com`
    - Package management: `pip list`, `npm list`
    - Environment checks: `env | grep VAR`, `which python`
    - File metadata: `stat filename`, `file filename`, `wc -l filename`
    
    Args:
        command: The bash command to run
        timeout: Maximum execution time in seconds (default: 300)
        max_output_bytes: Maximum bytes to capture from stdout and stderr (default: 16000)
        allowlist: Command prefixes that are automatically allowed
        denylist: Command prefixes that are automatically denied
        denylist_standalone: Commands that are denied only when run without arguments
    
    Returns:
        JSON string with:
        - success: bool indicating if command succeeded
        - command: the command that was run
        - stdout: captured stdout (truncated if too large)
        - stderr: captured stderr (truncated if too large)
        - returncode: process return code
        - error: error message if command failed (None if success)
    """
    try:
        # Validate command safety
        is_safe, safety_error = _check_command_safety(
            command, allowlist, denylist, denylist_standalone
        )
        if not is_safe:
            return json.dumps({
                "success": False,
                "command": command,
                "stdout": "",
                "stderr": "",
                "returncode": -1,
                "error": safety_error,
            })

        # Run the command
        encoding = _get_subprocess_encoding()
        env = _get_base_env()

        try:
            result = subprocess.run(
                command,
                shell=True,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.DEVNULL,
                timeout=timeout,
                env=env,
            )
        except subprocess.TimeoutExpired:
            return json.dumps({
                "success": False,
                "command": command,
                "stdout": "",
                "stderr": "",
                "returncode": -1,
                "error": f"Command timed out after {timeout}s: {command!r}",
            })
        except Exception as exc:
            return json.dumps({
                "success": False,
                "command": command,
                "stdout": "",
                "stderr": "",
                "returncode": -1,
                "error": f"Error running command {command!r}: {str(exc)}",
            })

        # Decode and truncate output
        stdout = result.stdout.decode(encoding, errors="replace")[:max_output_bytes]
        stderr = result.stderr.decode(encoding, errors="replace")[:max_output_bytes]

        returncode = result.returncode

        # Check if command failed
        if returncode != 0:
            error_msg = f"Command failed: {command!r}\n"
            error_msg += f"Return code: {returncode}"
            if stderr:
                error_msg += f"\nStderr: {stderr}"
            if stdout:
                error_msg += f"\nStdout: {stdout}"

            return json.dumps({
                "success": False,
                "command": command,
                "stdout": stdout,
                "stderr": stderr,
                "returncode": returncode,
                "error": error_msg.strip(),
            })

        # Success
        return json.dumps({
            "success": True,
            "command": command,
            "stdout": stdout,
            "stderr": stderr,
            "returncode": returncode,
            "error": None,
        })

    except Exception as exc:
        return json.dumps({
            "success": False,
            "command": command,
            "stdout": "",
            "stderr": "",
            "returncode": -1,
            "error": f"Unexpected error: {str(exc)}",
        })

Comprehensive tests for bash_simple.py

In [None]:
import json
import os
import sys
from unittest.mock import Mock, patch

class TestBashSimple:
    """Test suite for bash_simple tool"""

    def test_bash_success(self):
        """Test successful command execution"""
        result = bash("echo 'Hello World'")
        data = json.loads(result)
        
        assert data["success"] is True
        assert data["command"] == "echo 'Hello World'"
        assert "Hello World" in data["stdout"]
        assert data["stderr"] == ""
        assert data["returncode"] == 0
        assert data["error"] is None

    def test_bash_failure(self):
        """Test failed command execution"""
        result = bash("exit 1")
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["command"] == "exit 1"
        assert data["returncode"] == 1
        assert data["error"] is not None
        assert "Command failed" in data["error"]

    def test_bash_timeout(self):
        """Test command timeout"""
        result = bash("sleep 10", timeout=1)
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["error"] is not None
        assert "timed out" in data["error"].lower()

    def test_bash_denylisted_command(self):
        """Test denylisted command"""
        result = bash("vim")
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["error"] is not None
        assert "denied" in data["error"].lower()

    def test_bash_denylisted_standalone(self):
        """Test standalone denylisted command"""
        result = bash("python")
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["error"] is not None
        assert "denied" in data["error"].lower()

    def test_bash_allowlisted_command(self):
        """Test allowlisted command"""
        result = bash("echo 'test'")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "test" in data["stdout"]

    def test_bash_custom_allowlist(self):
        """Test custom allowlist"""
        result = bash("echo 'test'", allowlist=["echo"])
        data = json.loads(result)
        
        assert data["success"] is True
        assert "test" in data["stdout"]

    def test_bash_custom_denylist(self):
        """Test custom denylist"""
        result = bash("echo 'test'", denylist=["echo"])
        data = json.loads(result)
        
        assert data["success"] is False
        assert "denied" in data["error"].lower()

    def test_bash_output_truncation(self):
        """Test output truncation"""
        # Create a large output
        result = bash("for i in {1..1000}; do echo 'test line $i'; done", max_output_bytes=100)
        data = json.loads(result)
        
        assert data["success"] is True
        # Output should be truncated
        assert len(data["stdout"]) <= 100

    def test_bash_empty_command(self):
        """Test empty command"""
        result = bash("")
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["error"] is not None

    def test_bash_invalid_command(self):
        """Test invalid command"""
        result = bash("nonexistent_command_xyz123")
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["returncode"] != 0

    def test_bash_stderr_capture(self):
        """Test stderr capture"""
        result = bash("echo 'stdout' >&2")
        data = json.loads(result)
        
        # Should capture stderr
        assert data["stderr"] != ""

    def test_bash_json_format(self):
        """Test JSON format consistency"""
        result = bash("echo 'test'")
        data = json.loads(result)
        
        # Check all required fields are present
        assert "success" in data
        assert "command" in data
        assert "stdout" in data
        assert "stderr" in data
        assert "returncode" in data
        assert "error" in data

    def test_bash_custom_timeout(self):
        """Test custom timeout"""
        result = bash("sleep 5", timeout=1)
        data = json.loads(result)
        
        assert data["success"] is False
        assert "timed out" in data["error"].lower()

    def test_bash_allowlist_with_args(self):
        """Test allowlist with commands that have arguments"""
        result = bash("echo 'hello world'", allowlist=["echo"])
        data = json.loads(result)
        
        assert data["success"] is True
        assert "hello world" in data["stdout"]

    def test_bash_denylist_with_args(self):
        """Test denylist with commands that have arguments"""
        result = bash("vim file.txt", denylist=["vim"])
        data = json.loads(result)
        
        assert data["success"] is False
        assert "denied" in data["error"].lower()

    def test_bash_standalone_denylist_with_args(self):
        """Test standalone denylist with arguments"""
        result = bash("python -c 'print(1)'", denylist_standalone=["python"])
        data = json.loads(result)
        
        # Should succeed because it has arguments
        assert data["success"] is True

    def test_bash_standalone_denylist_without_args(self):
        """Test standalone denylist without arguments"""
        result = bash("python", denylist_standalone=["python"])
        data = json.loads(result)
        
        # Should fail because it has no arguments
        assert data["success"] is False
        assert "denied" in data["error"].lower()

    def test_bash_special_chars(self):
        """Test command with special characters"""
        result = bash("echo 'test with spaces and quotes'")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "test with spaces and quotes" in data["stdout"]

    def test_bash_pipe_command(self):
        """Test piped commands"""
        result = bash("echo 'hello' | tr 'a-z' 'A-Z'")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "HELLO" in data["stdout"]

    def test_bash_redirect_command(self):
        """Test command with redirection"""
        result = bash("echo 'test' > /tmp/test_bash_redirect.txt && cat /tmp/test_bash_redirect.txt")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "test" in data["stdout"]

    def test_bash_environment_variables(self):
        """Test environment variable handling"""
        result = bash("echo $CI")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "true" in data["stdout"].lower()

    def test_bash_safe_environment(self):
        """Test that safe environment variables are set"""
        result = bash("echo $TERM")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "dumb" in data["stdout"].lower()

    def test_bash_unicode_output(self):
        """Test unicode output handling"""
        result = bash("echo 'Hello 世界'")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "世界" in data["stdout"]

    def test_bash_error_message_details(self):
        """Test that error messages contain useful details"""
        result = bash("ls /nonexistent_directory")
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["error"] is not None
        assert "Command failed" in data["error"]
        assert str(data["returncode"]) in data["error"]

    def test_bash_large_output(self):
        """Test handling of large output"""
        result = bash("for i in {1..100}; do echo 'line $i with some text'; done", max_output_bytes=500)
        data = json.loads(result)
        
        assert data["success"] is True
        assert len(data["stdout"]) <= 500

    def test_bash_returncode_zero_success(self):
        """Test that returncode 0 indicates success"""
        result = bash("true")
        data = json.loads(result)
        
        assert data["success"] is True
        assert data["returncode"] == 0

    def test_bash_returncode_nonzero_failure(self):
        """Test that non-zero returncode indicates failure"""
        result = bash("false")
        data = json.loads(result)
        
        assert data["success"] is False
        assert data["returncode"] != 0

    def test_bash_multiple_commands(self):
        """Test multiple commands separated by ;"""
        result = bash("echo 'first'; echo 'second'")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "first" in data["stdout"]
        assert "second" in data["stdout"]

    def test_bash_command_with_quotes(self):
        """Test command with various quote styles"""
        result = bash("echo 'quoted' && echo 'double quoted'")
        data = json.loads(result)
        
        assert data["success"] is True

    def test_bash_shell_features(self):
        """Test shell features like variables and expansions"""
        result = bash("x=5; y=10; echo $((x + y))")
        data = json.loads(result)
        
        assert data["success"] is True
        assert "15" in data["stdout"]

    def test_bash_safety_checks(self):
        """Test comprehensive safety checks"""
        # Test that denylisted commands are blocked
        denylisted_commands = [
            "vim",
            "nano",
            "emacs",
            "bash -i",
        ]
        
        for cmd in denylisted_commands:
            result = bash(cmd)
            data = json.loads(result)
            # These should be blocked by denylist
            assert data["success"] is False
            assert "denied" in data["error"].lower()
        
        # Test that standalone denylisted commands without args are blocked
        result = bash("python")
        data = json.loads(result)
        assert data["success"] is False
        assert "denied" in data["error"].lower()


# Run all tests and report results
test_class = TestBashSimple()
tests = [method for method in dir(test_class) if method.startswith("test_")]

passed = 0
failed = 0
for test_name in tests:
    try:
        getattr(test_class, test_name)()
        print(f"✓ {test_name}")
        passed += 1
    except Exception as e:
        print(f"✗ {test_name}: {e}")
        failed += 1

print(f"\n{passed}/{len(tests)} tests passed")
return failed == 0

### Todo tool

This tool provides a simple interface for managing todos with read and write operations.

All functions return JSON strings that can be easily parsed by LLMs.

In [None]:
#| exports

class TodoStatus(StrEnum):
    """Status of a todo item."""
    PENDING = "PENDING"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    CANCELLED = "CANCELLED"


class TodoPriority(StrEnum):
    """Priority of a todo item."""
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"


class TodoItem:
    """A todo item with id, content, status, and priority."""
    
    def __init__(self, id: str, content: str, status: TodoStatus = TodoStatus.PENDING, priority: TodoPriority = TodoPriority.MEDIUM):
        self.id = id
        self.content = content
        self.status = status
        self.priority = priority
    
    def to_dict(self) -> dict[str, Any]:
        """Convert todo item to dictionary."""
        return {
            "id": self.id,
            "content": self.content,
            "status": self.status.value,
            "priority": self.priority.value,
        }


class TodoSimple:
    """Simple synchronous todo manager."""
    
    def __init__(self):
        """Initialize todo manager with empty state."""
        self._todos: list[TodoItem] = []
    
    def read_todos(self) -> str:
        """Read all todos.
        
        **Key characteristics:**
        - **Stateless**: Each read operation is independent
        - **Simple interface**: Returns all todos in a single call
        
        **Usage Tips:**
        - Use this to view the current state of all todos
        - Call this before making changes to understand the current state
        - Use the returned todos to plan your next actions
        
        **Error Handling:**
        - If you get an error, check if the todo data is corrupted
        - If todos are empty, start by adding new todos
        
        Returns:
            JSON string with success status and todos list
        
        Example:
            {
                "success": true,
                "message": "Retrieved 3 todos",
                "todos": [
                    {"id": "1", "content": "Task 1", "status": "PENDING", "priority": "MEDIUM"}
                ],
                "total_count": 3
            }
        """
        try:
            todos_dict = [todo.to_dict() for todo in self._todos]
            return json.dumps({
                "success": True,
                "message": f"Retrieved {len(self._todos)} todos",
                "todos": todos_dict,
                "total_count": len(self._todos),
            }, indent=2)
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to read todos: {str(e)}",
            }, indent=2)
    
    def write_todos(self, todos: list[dict[str, Any]] | None = None) -> str:
        """Write/update todos.
        
        **Key characteristics:**
        - **Stateless**: Each write operation is independent
        - **Complete replacement**: You must provide the ENTIRE list of todos
        - **Clear all**: Pass None to clear all todos
        
        **Usage Tips:**
        - Use this to update the complete todo list
        - You must include ALL todos you want to keep (any not included will be removed)
        - Use read_todos() first to get the current state
        - When adding new todos, include all existing ones plus the new ones
        - When removing todos, exclude them from the list
        
        **Error Handling:**
        - If todo format is invalid, check the id and content fields
        - If status or priority is invalid, use valid values (PENDING/IN_PROGRESS/COMPLETED/CANCELLED, LOW/MEDIUM/HIGH)
        - If you get an error, read the todos first to understand the current state
        
        Args:
            todos: List of todo dictionaries to write. If None, clears all todos.
                   Each todo should have: id (str), content (str)
                   Optional: status (str), priority (str)
        
        Returns:
            JSON string with success status and updated todos
        
        Example:
            {
                "success": true,
                "message": "Updated 2 todos",
                "todos": [
                    {"id": "1", "content": "Task 1", "status": "PENDING", "priority": "MEDIUM"},
                    {"id": "2", "content": "Task 2", "status": "IN_PROGRESS", "priority": "HIGH"}
                ],
                "total_count": 2
            }
        """
        try:
            # Validate input
            if todos is None:
                self._todos = []
                todos_dict = []
                message = "Cleared all todos"
            else:
                if not isinstance(todos, list):
                    return json.dumps({
                        "success": False,
                        "error": "todos must be a list of dictionaries",
                    }, indent=2)
                
                if len(todos) > 100:
                    return json.dumps({
                        "success": False,
                        "error": "Cannot store more than 100 todos",
                    }, indent=2)
                
                # Convert dicts to TodoItem objects
                todo_objects = []
                ids = set()
                
                for todo_dict in todos:
                    if not isinstance(todo_dict, dict):
                        return json.dumps({
                            "success": False,
                            "error": "Each todo must be a dictionary",
                        }, indent=2)
                    
                    if "id" not in todo_dict or "content" not in todo_dict:
                        return json.dumps({
                            "success": False,
                            "error": "id and 'content' fields are required",
                        }, indent=2)
                    
                    todo_id = todo_dict["id"]
                    if todo_id in ids:
                        return json.dumps({
                            "success": False,
                            "error": f"Duplicate todo ID: {todo_id}",
                        }, indent=2)
                    ids.add(todo_id)
                    
                    # Parse status
                    status = TodoStatus(todo_dict.get("status", "PENDING"))
                    
                    # Parse priority
                    priority = TodoPriority(todo_dict.get("priority", "MEDIUM"))
                    
                    todo_obj = TodoItem(
                        id=todo_id,
                        content=todo_dict["content"],
                        status=status,
                        priority=priority,
                    )
                    todo_objects.append(todo_obj)
                
                self._todos = todo_objects
                todos_dict = [todo.to_dict() for todo in self._todos]
                message = f"Updated {len(self._todos)} todos"
            
            return json.dumps({
                "success": True,
                "message": message,
                "todos": todos_dict,
                "total_count": len(self._todos),
            }, indent=2)
        
        except ValueError as e:
            return json.dumps({
                "success": False,
                "error": f"Invalid value: {str(e)}",
            }, indent=2)
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to write todos: {str(e)}",
            }, indent=2)
    
    def add_todo(self, id: str, content: str, status: str = "PENDING", priority: str = "MEDIUM") -> str:
        """Add a single todo item.
        
        **Key characteristics:**
        - **Stateless**: Each add operation is independent
        - **Unique IDs**: Each todo must have a unique ID
        
        **Usage Tips:**
        - Use this to add individual todos without managing the full list
        - Use unique, descriptive IDs for each todo
        - Use PENDING status for new todos
        - Use HIGH priority for urgent tasks
        - Use LOW priority for low-priority tasks
        
        **Error Handling:**
        - If ID already exists, use a different ID
        - If status is invalid, use valid values (PENDING, IN_PROGRESS, COMPLETED, CANCELLED)
        - If priority is invalid, use valid values (LOW, MEDIUM, HIGH)
        
        Args:
            id: Unique identifier for the todo
            content: Description of the todo
            status: Status of the todo (PENDING, IN_PROGRESS, COMPLETED, CANCELLED)
            priority: Priority of the todo (LOW, MEDIUM, HIGH)
        
        Returns:
            JSON string with success status
        
        Example:
            {
                "success": true,
                "message": "Added todo with id '1'",
                "todo": {"id": "1", "content": "Task 1", "status": "PENDING", "priority": "MEDIUM"}
            }
        """
        try:
            # Check if id already exists
            if any(todo.id == id for todo in self._todos):
                return json.dumps({
                    "success": False,
                    "error": f"Todo with id '{id}' already exists",
                }, indent=2)
            
            # Parse status
            try:
                status_enum = TodoStatus(status)
            except ValueError:
                return json.dumps({
                    "success": False,
                    "error": f"Invalid status '{status}'. Use: PENDING, IN_PROGRESS, COMPLETED, CANCELLED",
                }, indent=2)
            
            # Parse priority
            try:
                priority_enum = TodoPriority(priority)
            except ValueError:
                return json.dumps({
                    "success": False,
                    "error": f"Invalid priority '{priority}'. Use: LOW, MEDIUM, HIGH",
                }, indent=2)
            
            todo_item = TodoItem(id=id, content=content, status=status_enum, priority=priority_enum)
            self._todos.append(todo_item)
            
            return json.dumps({
                "success": True,
                "message": f"Added todo with id '{id}'",
                "todo": todo_item.to_dict(),
            }, indent=2)
        
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to add todo: {str(e)}",
            }, indent=2)
    
    def update_todo_status(self, id: str, status: str) -> str:
        """Update the status of a todo item.
        
        **Key characteristics:**
        - **Stateless**: Each update operation is independent
        - **Status management**: Only updates the status, not other fields
        
        **Usage Tips:**
        - Use this to mark todos as IN_PROGRESS when starting work
        - Use this to mark todos as COMPLETED when finished
        - Use this to mark todos as CANCELLED when abandoned
        - Use this to reset todos to PENDING when needed
        
        **Error Handling:**
        - If todo not found, check the ID and add the todo first
        - If status is invalid, use valid values (PENDING, IN_PROGRESS, COMPLETED, CANCELLED)
        
        Args:
            id: ID of the todo to update
            status: New status (PENDING, IN_PROGRESS, COMPLETED, CANCELLED)
        
        Returns:
            JSON string with success status
        
        Example:
            {
                "success": true,
                "message": "Updated status of todo '1' to COMPLETED",
                "todo": {"id": "1", "content": "Task 1", "status": "COMPLETED", "priority": "MEDIUM"}
            }
        """
        try:
            # Find the todo
            todo = None
            for t in self._todos:
                if t.id == id:
                    todo = t
                    break
            
            if todo is None:
                return json.dumps({
                    "success": False,
                    "error": f"Todo with id '{id}' not found",
                }, indent=2)
            
            # Parse status
            try:
                status_enum = TodoStatus(status)
            except ValueError:
                return json.dumps({
                    "success": False,
                    "error": f"Invalid status '{status}'. Use: PENDING, IN_PROGRESS, COMPLETED, CANCELLED",
                }, indent=2)
            
            todo.status = status_enum
            
            return json.dumps({
                "success": True,
                "message": f"Updated status of todo '{id}' to {status}",
                "todo": todo.to_dict(),
            }, indent=2)
        
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to update todo status: {str(e)}",
            }, indent=2)
    
    def update_todo_priority(self, id: str, priority: str) -> str:
        """Update the priority of a todo item.
        
        **Key characteristics:**
        - **Stateless**: Each update operation is independent
        - **Priority management**: Only updates the priority, not other fields
        
        **Usage Tips:**
        - Use this to increase priority for urgent tasks
        - Use this to decrease priority for less urgent tasks
        - Use HIGH priority for time-sensitive tasks
        - Use LOW priority for tasks that can wait
        - Use MEDIUM priority for normal tasks
        
        **Error Handling:**
        - If todo not found, check the ID and add the todo first
        - If priority is invalid, use valid values (LOW, MEDIUM, HIGH)
        
        Args:
            id: ID of the todo to update
            priority: New priority (LOW, MEDIUM, HIGH)
        
        Returns:
            JSON string with success status
        
        Example:
            {
                "success": true,
                "message": "Updated priority of todo '1' to HIGH",
                "todo": {"id": "1", "content": "Task 1", "status": "PENDING", "priority": "HIGH"}
            }
        """
        try:
            # Find the todo
            todo = None
            for t in self._todos:
                if t.id == id:
                    todo = t
                    break
            
            if todo is None:
                return json.dumps({
                    "success": False,
                    "error": f"Todo with id '{id}' not found",
                }, indent=2)
            
            # Parse priority
            try:
                priority_enum = TodoPriority(priority)
            except ValueError:
                return json.dumps({
                    "success": False,
                    "error": f"Invalid priority '{priority}'. Use: LOW, MEDIUM, HIGH",
                }, indent=2)
            
            todo.priority = priority_enum
            
            return json.dumps({
                "success": True,
                "message": f"Updated priority of todo '{id}' to {priority}",
                "todo": todo.to_dict(),
            }, indent=2)
        
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to update todo priority: {str(e)}",
            }, indent=2)
    
    def delete_todo(self, id: str) -> str:
        """Delete a todo item.
        
        **Key characteristics:**
        - **Stateless**: Each delete operation is independent
        - **Permanent deletion**: Deleted todos cannot be recovered
        
        **Usage Tips:**
        - Use this to remove completed or cancelled todos
        - Use this to clean up the todo list
        - Use this to remove todos that are no longer needed
        - Always verify the ID before deleting
        
        **Error Handling:**
        - If todo not found, check the ID and verify the todo exists
        - If you delete by mistake, you'll need to recreate the todo
        
        Args:
            id: ID of the todo to delete
        
        Returns:
            JSON string with success status
        
        Example:
            {
                "success": true,
                "message": "Deleted todo with id '1'",
                "remaining_count": 2
            }
        """
        try:
            # Find the todo
            todo = None
            index = -1
            for i, t in enumerate(self._todos):
                if t.id == id:
                    todo = t
                    index = i
                    break
            
            if todo is None:
                return json.dumps({
                    "success": False,
                    "error": f"Todo with id '{id}' not found",
                }, indent=2)
            
            # Delete the todo
            del self._todos[index]
            
            return json.dumps({
                "success": True,
                "message": f"Deleted todo with id '{id}'",
                "remaining_count": len(self._todos),
            }, indent=2)
        
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": f"Failed to delete todo: {str(e)}",
            }, indent=2)

Test suite for Todo tool.

In [None]:
class TestTodoSimple:
    """Test suite for TodoSimple class."""
    
    def test_initial_state(self):
        """Test that TodoSimple starts with empty state."""
        todo = TodoSimple()
        result = json.loads(todo.read_todos())
        assert result["success"] is True
        assert result["todos"] == []
        assert result["total_count"] == 0
        assert result["message"] == "Retrieved 0 todos"
    
    def test_add_todo(self):
        """Test adding a single todo."""
        todo = TodoSimple()
        result = json.loads(todo.add_todo("1", "Test task"))
        assert result["success"] is True
        assert result["message"] == "Added todo with id '1'"
        assert result["todo"]["id"] == "1"
        assert result["todo"]["content"] == "Test task"
        assert result["todo"]["status"] == "PENDING"
        assert result["todo"]["priority"] == "MEDIUM"
    
    def test_add_todo_with_status_and_priority(self):
        """Test adding a todo with custom status and priority."""
        todo = TodoSimple()
        result = json.loads(todo.add_todo("1", "Test task", "IN_PROGRESS", "HIGH"))
        assert result["success"] is True
        assert result["todo"]["status"] == "IN_PROGRESS"
        assert result["todo"]["priority"] == "HIGH"
    
    def test_add_todo_duplicate_id(self):
        """Test that adding a todo with duplicate ID fails."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.add_todo("1", "Task 2"))
        assert result["success"] is False
        assert "already exists" in result["error"]
    
    def test_add_todo_invalid_status(self):
        """Test that adding a todo with invalid status fails."""
        todo = TodoSimple()
        result = json.loads(todo.add_todo("1", "Test task", "INVALID"))
        assert result["success"] is False
        assert "Invalid status" in result["error"]
    
    def test_add_todo_invalid_priority(self):
        """Test that adding a todo with invalid priority fails."""
        todo = TodoSimple()
        result = json.loads(todo.add_todo("1", "Test task", "PENDING", "INVALID"))
        assert result["success"] is False
        assert "Invalid priority" in result["error"]
    
    def test_read_todos_after_adding(self):
        """Test reading todos after adding some."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        todo.add_todo("2", "Task 2")
        result = json.loads(todo.read_todos())
        assert result["success"] is True
        assert len(result["todos"]) == 2
        assert result["total_count"] == 2
        assert result["message"] == "Retrieved 2 todos"
    
    def test_write_todos(self):
        """Test writing todos using write_todos method."""
        todo = TodoSimple()
        todos = [
            {"id": "1", "content": "Task 1", "status": "PENDING", "priority": "MEDIUM"},
            {"id": "2", "content": "Task 2", "status": "IN_PROGRESS", "priority": "HIGH"},
        ]
        result = json.loads(todo.write_todos(todos))
        assert result["success"] is True
        assert result["total_count"] == 2
        assert len(result["todos"]) == 2
    
    def test_write_todos_empty_list(self):
        """Test writing empty list clears todos."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.write_todos([]))
        assert result["success"] is True
        assert result["total_count"] == 0
        assert len(result["todos"]) == 0
    
    def test_write_todos_none(self):
        """Test writing None clears todos."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.write_todos(None))
        assert result["success"] is True
        assert result["total_count"] == 0
        assert result["message"] == "Cleared all todos"
    
    def test_write_todos_invalid_input(self):
        """Test that write_todos with invalid input fails."""
        todo = TodoSimple()
        result = json.loads(todo.write_todos("invalid"))
        assert result["success"] is False
        assert "todos must be a list" in result["error"]
    
    def test_write_todos_missing_fields(self):
        """Test that write_todos with missing fields fails."""
        todo = TodoSimple()
        result = json.loads(todo.write_todos([{"id": "1"}]))
        assert result["success"] is False
        assert "fields are required" in result["error"]
    
    def test_write_todos_duplicate_ids(self):
        """Test that write_todos with duplicate IDs fails."""
        todo = TodoSimple()
        todos = [
            {"id": "1", "content": "Task 1"},
            {"id": "1", "content": "Task 2"},
        ]
        result = json.loads(todo.write_todos(todos))
        assert result["success"] is False
        assert "Duplicate todo ID" in result["error"]
    
    def test_write_todos_too_many(self):
        """Test that write_todos with more than 100 todos fails."""
        todo = TodoSimple()
        todos = [{"id": str(i), "content": f"Task {i}"} for i in range(101)]
        result = json.loads(todo.write_todos(todos))
        assert result["success"] is False
        assert "Cannot store more than 100 todos" in result["error"]
    
    def test_update_todo_status(self):
        """Test updating todo status."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.update_todo_status("1", "COMPLETED"))
        assert result["success"] is True
        assert result["message"] == "Updated status of todo '1' to COMPLETED"
        assert result["todo"]["status"] == "COMPLETED"
    
    def test_update_todo_status_not_found(self):
        """Test updating status of non-existent todo."""
        todo = TodoSimple()
        result = json.loads(todo.update_todo_status("1", "COMPLETED"))
        assert result["success"] is False
        assert "not found" in result["error"]
    
    def test_update_todo_status_invalid(self):
        """Test updating todo status with invalid value."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.update_todo_status("1", "INVALID"))
        assert result["success"] is False
        assert "Invalid status" in result["error"]
    
    def test_update_todo_priority(self):
        """Test updating todo priority."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.update_todo_status("1", "COMPLETED"))
        assert result["success"] is True
        assert result["message"] == "Updated status of todo '1' to COMPLETED"
        assert result["todo"]["status"] == "COMPLETED"
    
    def test_update_todo_priority_not_found(self):
        """Test updating priority of non-existent todo."""
        todo = TodoSimple()
        result = json.loads(todo.update_todo_priority("1", "HIGH"))
        assert result["success"] is False
        assert "not found" in result["error"]
    
    def test_update_todo_priority_invalid(self):
        """Test updating todo priority with invalid value."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.update_todo_priority("1", "INVALID"))
        assert result["success"] is False
        assert "Invalid priority" in result["error"]
    
    def test_delete_todo(self):
        """Test deleting a todo."""
        todo = TodoSimple()
        todo.add_todo("1", "Task 1")
        todo.add_todo("2", "Task 2")
        result = json.loads(todo.delete_todo("1"))
        assert result["success"] is True
        assert result["message"] == "Deleted todo with id '1'"
        assert result["remaining_count"] == 1
        
        # Verify the todo is actually deleted
        read_result = json.loads(todo.read_todos())
        assert len(read_result["todos"]) == 1
        assert read_result["todos"][0]["id"] == "2"
    
    def test_delete_todo_not_found(self):
        """Test deleting non-existent todo."""
        todo = TodoSimple()
        result = json.loads(todo.delete_todo("1"))
        assert result["success"] is False
        assert "not found" in result["error"]
    
    def test_json_format_consistency(self):
        """Test that all methods return valid JSON with consistent format."""
        todo = TodoSimple()
        
        # Test read_todos
        result = todo.read_todos()
        data = json.loads(result)
        assert "success" in data
        assert "message" in data
        
        # Test add_todo
        result = todo.add_todo("1", "Task 1")
        data = json.loads(result)
        assert "success" in data
        assert "message" in data
        
        # Test write_todos
        result = todo.write_todos([{"id": "2", "content": "Task 2"}])
        data = json.loads(result)
        assert "success" in data
        assert "message" in data
        
        # Test update_todo_status
        result = todo.update_todo_status("2", "COMPLETED")
        data = json.loads(result)
        assert "success" in data
        assert "message" in data
        
        # Test update_todo_priority
        result = todo.update_todo_priority("2", "HIGH")
        data = json.loads(result)
        assert "success" in data
        assert "message" in data
        
        # Test delete_todo
        result = todo.delete_todo("2")
        data = json.loads(result)
        assert "success" in data
        assert "message" in data
    
    def test_error_messages_are_actionable(self):
        """Test that error messages guide users on how to fix issues."""
        todo = TodoSimple()
        
        # Test duplicate ID error
        todo.add_todo("1", "Task 1")
        result = json.loads(todo.add_todo("1", "Task 2"))
        assert "already exists" in result["error"]
        
        # Test invalid status error
        result = json.loads(todo.add_todo("2", "Task 2", "INVALID"))
        assert "Invalid status" in result["error"]
        assert "PENDING, IN_PROGRESS, COMPLETED, CANCELLED" in result["error"]
        
        # Test invalid priority error
        result = json.loads(todo.add_todo("3", "Task 3", "PENDING", "INVALID"))
        assert "Invalid priority" in result["error"]
        assert "LOW, MEDIUM, HIGH" in result["error"]
        
        # Test not found error
        result = json.loads(todo.update_todo_status("999", "COMPLETED"))
        assert "not found" in result["error"]
        
        # Test too many todos error
        todos = [{"id": str(i), "content": f"Task {i}"} for i in range(101)]
        result = json.loads(todo.write_todos(todos))
        assert "Cannot store more than 100 todos" in result["error"]
    
    def test_todo_status_enum_values(self):
        """Test that TodoStatus enum has correct values."""
        assert TodoStatus.PENDING.value == "PENDING"
        assert TodoStatus.IN_PROGRESS.value == "IN_PROGRESS"
        assert TodoStatus.COMPLETED.value == "COMPLETED"
        assert TodoStatus.CANCELLED.value == "CANCELLED"
    
    def test_todo_priority_enum_values(self):
        """Test that TodoPriority enum has correct values."""
        assert TodoPriority.LOW.value == "LOW"
        assert TodoPriority.MEDIUM.value == "MEDIUM"
        assert TodoPriority.HIGH.value == "HIGH"
    
    def test_todo_item_to_dict(self):
        """Test that TodoItem.to_dict() returns correct structure."""
        from todo_simple import TodoItem
        todo = TodoItem("1", "Test", TodoStatus.IN_PROGRESS, TodoPriority.HIGH)
        result = todo.to_dict()
        assert result == {
            "id": "1",
            "content": "Test",
            "status": "IN_PROGRESS",
            "priority": "HIGH",
        }
    
    def test_complex_workflow(self):
        """Test a complex workflow with multiple operations."""
        todo = TodoSimple()
        
        # Add multiple todos
        todo.add_todo("1", "Write tests", "PENDING", "HIGH")
        todo.add_todo("2", "Review code", "PENDING", "MEDIUM")
        todo.add_todo("3", "Fix bugs", "IN_PROGRESS", "HIGH")
        
        # Read all todos
        result = json.loads(todo.read_todos())
        assert result["total_count"] == 3
        
        # Update status
        todo.update_todo_status("1", "IN_PROGRESS")
        todo.update_todo_status("2", "COMPLETED")
        
        # Update priority
        todo.update_todo_priority("3", "LOW")
        
        # Delete one todo
        todo.delete_todo("2")
        
        # Final state check
        result = json.loads(todo.read_todos())
        assert result["total_count"] == 2
        
        # Verify remaining todos
        todos_by_id = {t["id"]: t for t in result["todos"]}
        assert todos_by_id["1"]["status"] == "IN_PROGRESS"
        assert todos_by_id["1"]["priority"] == "HIGH"
        assert todos_by_id["3"]["status"] == "IN_PROGRESS"
        assert todos_by_id["3"]["priority"] == "LOW"


testTodo = TestTodo()
testTodo.test_initial_state()
testTodo.test_add_todo()
testTodo.test_add_todo_with_status_and_priority()
testTodo.test_add_todo_duplicate_id()
testTodo.test_add_todo_invalid_status()
testTodo.test_add_todo_invalid_priority()
testTodo.test_read_todos_after_adding()
testTodo.test_write_todos()
testTodo.test_write_todos_empty_list()
testTodo.test_write_todos_none()
testTodo.test_write_todos_invalid_input()
testTodo.test_write_todos_missing_fields()
testTodo.test_write_todos_duplicate_ids()
testTodo.test_write_todos_too_many()
testTodo.test_update_todo_status()
testTodo.test_update_todo_status_not_found()
testTodo.test_update_todo_status_invalid()
testTodo.test_update_todo_priority()
testTodo.test_update_todo_priority_not_found()
testTodo.test_update_todo_priority_invalid()
testTodo.test_delete_todo()
testTodo.test_delete_todo_not_found()
testTodo.test_json_format_consistency()
testTodo.test_error_messages_are_actionable()
testTodo.test_todo_status_enum_values()
testTodo.test_todo_priority_enum_values()
testTodo.test_todo_item_to_dict()
testTodo.test_complex_workflow()

## Tavily API tools

### Web search tools

Web search tools using Tavily SDK.

https://docs.tavily.com/sdk/python/reference

These functions return JSON strings that are easy to parse for LLMs.

In [None]:
#| exports

# ============================================================================
# Web Search Tool
# ============================================================================

def web_search(
    query: str,  # The search query
    max_results: int = 5,  # Maximum number of results to return (default: 5)
    search_depth: str = "basic",  # Search depth: "basic" (default) or "advanced" (default: "basic")
    timeout: int = 60  # Timeout in seconds (default: 60)
) -> str:  # JSON string with search results or error
    """
    Perform a web search and return results as JSON.
    
    **Key characteristics:**
    - **Stateless**: Each search runs independently in a fresh environment
    - **Timeout**: Controls how long the search can run before being cancelled
    - When timeout is not specified (or set to None), the config default is used
    - If a search is timing out, increase the timeout using the timeout argument
    
    **Usage Tips:**
    - Use specific, focused queries for better results
    - Limit max_results to avoid overwhelming the response
    - Use "advanced" search_depth for more comprehensive but slower searches
    - Use "basic" search_depth for quick, focused results
    
    **Error Handling:**
    - If you get API errors, check:
      1. Your API key is valid (set TAVILY_API_KEY environment variable)
      2. Your query is not too broad or too narrow
      3. You're not hitting rate limits (wait and retry)
      4. The timeout is sufficient for the search depth
    - Error messages will indicate if the issue is with the API key, query, or timeout
    
    IMPORTANT: The API key is set via the TAVILY_API_KEY environment variable and should not be
    modified by the LLM. Only change other parameters if you encounter errors or failures.
    
    Default values are optimized for general use:
    - max_results: 5 (good for most queries, increase for comprehensive searches)
    - search_depth: "basic" (faster, use "advanced" for more thorough results)
    - timeout: 60 seconds (standard timeout, increase if getting timeout errors)
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "results": [
            {
                "url": "URL of the result",
                "title": "Title of the result",
                "content": "Content snippet",
                "score": relevance score,
                "published_date": "Publication date" (optional),
                "favicons": ["favicon URLs"] (optional),
                "raw_content": "Full content" (optional)
            }
        ]
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Get API key from environment
        api_key = os.environ.get("TAVILY_API_KEY")
        if api_key is None:
            return json.dumps({
                "success": False,
                "error": "API key not provided. Please set the TAVILY_API_KEY environment variable."
            })
        
        client = TavilyClient(api_key=api_key)
        
        response = client.search(
            query=query,
            max_results=max_results,
            search_depth=search_depth,
            include_raw_content=True,
            timeout=timeout
        )
        
        return json.dumps({
            "success": True,
            "results": response["results"]
        })
        
    except Exception as e:
        error_msg = str(e)
        if "401" in error_msg or "Unauthorized" in error_msg:
            return json.dumps({
                "success": False,
                "error": "Invalid API key. Please check your TAVILY_API_KEY and ensure it's valid."
            })
        elif "429" in error_msg or "rate limit" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": "Rate limit exceeded. Please wait before making more requests or check your Tavily plan."
            })
        elif "timeout" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": f"Request timed out. Try increasing the timeout parameter (currently {timeout} seconds)."
            })
        else:
            return json.dumps({
                "success": False,
                "error": f"An error occurred: {error_msg}. Please try again or check your query parameters."
            })


# ============================================================================
# Web Extract Tool
# ============================================================================

def web_extract(
    urls: List[str],  # List of URLs to extract content from
    extract_depth: str = "low",  # Extraction depth: "low" (default), "medium", or "high" (default: "low")
    format: str = "text",  # Output format: "text" (default) or "markdown" (default: "text")
    timeout: int = 60,  # Timeout in seconds (default: 60)
    chunks_per_source: int = 1  # Number of chunks per source (default: 1)
) -> str:  # JSON string with extracted content or error
    """
    Extract content from URLs and return as JSON.
    
    **Key characteristics:**
    - **Stateless**: Each extraction runs independently
    - **Timeout**: Controls how long the extraction can run before being cancelled
    - When timeout is not specified (or set to None), the config default is used
    - If extraction is timing out, increase the timeout using the timeout argument
    
    **Usage Tips:**
    - Use "low" extract_depth for quick, basic content extraction
    - Use "medium" or "high" for more thorough extraction (slower but more detailed)
    - Use "markdown" format for formatted output that's easier to read
    - Use "text" format for plain text (default, works well with most use cases)
    - Increase chunks_per_source for more detailed content from each URL
    
    **Error Handling:**
    - If you get API errors, check:
      1. Your API key is valid (set TAVILY_API_KEY environment variable)
      2. The URLs are valid and accessible
      3. You're not hitting rate limits (wait and retry)
      4. The timeout is sufficient for the extraction depth
    - Error messages will indicate if the issue is with the API key, URLs, or timeout
    
    IMPORTANT: The API key is set via the TAVILY_API_KEY environment variable and should not be
    modified by the LLM. Only change other parameters if you encounter errors or failures.
    
    Default values are optimized for general use:
    - extract_depth: "low" (default, use "medium" or "high" for more thorough extraction)
    - format: "text" (default, use "markdown" for formatted output)
    - timeout: 60 seconds (default, increase if getting timeout errors)
    - chunks_per_source: 1 (default, increase for more detailed content)
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "results": [
            {
                "url": "URL that was extracted",
                "content": "Extracted content",
                "chunks": ["content chunks"],
                "word_count": number of words,
                "errors": ["error messages"] (if any)
            }
        ]
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Get API key from environment
        api_key = os.environ.get("TAVILY_API_KEY")
        if api_key is None:
            return json.dumps({
                "success": False,
                "error": "API key not provided. Please set the TAVILY_API_KEY environment variable."
            })
        
        # Validate URLs
        if not urls or len(urls) == 0:
            return json.dumps({
                "success": False,
                "error": "No URLs provided. Please provide at least one URL to extract content from."
            })
        
        for url in urls:
            if not url.startswith("http"):
                return json.dumps({
                    "success": False,
                    "error": f"Invalid URL format: {url}. URLs must start with http:// or https://"
                })
        
        client = TavilyClient(api_key=api_key)
        
        response = client.extract_urls(
            urls=urls,
            extract_depth=extract_depth,
            format=format,
            timeout=timeout,
            chunks_per_source=chunks_per_source
        )
        
        return json.dumps({
            "success": True,
            "results": response["results"]
        })
        
    except Exception as e:
        error_msg = str(e)
        if "401" in error_msg or "Unauthorized" in error_msg:
            return json.dumps({
                "success": False,
                "error": "Invalid API key. Please check your TAVILY_API_KEY and ensure it's valid."
            })
        elif "429" in error_msg or "rate limit" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": "Rate limit exceeded. Please wait before making more requests or check your Tavily plan."
            })
        elif "timeout" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": f"Request timed out. Try increasing the timeout parameter (currently {timeout} seconds)."
            })
        else:
            return json.dumps({
                "success": False,
                "error": f"An error occurred while extracting content: {error_msg}. Please check the URLs and try again."
            })


# ============================================================================
# Web Crawl Tool
# ============================================================================

def web_crawl(
    base_url: str,  # The starting URL for crawling
    max_depth: int = 1,  # Maximum crawl depth (default: 1)
    max_breadth: int = 10,  # Maximum number of pages to crawl (default: 10)
    limit: int = 10,  # Maximum number of results (default: 10)
    extract_depth: str = "low",  # Extraction depth: "low" (default), "medium", or "high" (default: "low")
    format: str = "text",  # Output format: "text" (default) or "markdown" (default: "text")
    timeout: int = 60,  # Timeout in seconds (default: 60)
    chunks_per_source: int = 1  # Number of chunks per source (default: 1)
) -> str:  # JSON string with crawled content or error
    """
    Crawl a website and extract content from pages, returning as JSON.
    
    **Key characteristics:**
    - **Stateless**: Each crawl runs independently
    - **Timeout**: Controls how long the crawl can run before being cancelled
    - When timeout is not specified (or set to None), the config default is used
    - If crawl is timing out, increase the timeout using the timeout argument
    
    **Usage Tips:**
    - Use max_depth=1 for quick crawling of a single page and its direct links
    - Use max_depth=2 or 3 for more comprehensive crawling (slower but more thorough)
    - Limit max_breadth to avoid crawling too many pages (default 10 is good)
    - Limit the total number of results with the limit parameter
    - Use "low" extract_depth for quick, basic content extraction
    - Use "medium" or "high" for more thorough extraction (slower but more detailed)
    - Use "markdown" format for formatted output that's easier to read
    
    **Error Handling:**
    - If you get API errors, check:
      1. Your API key is valid (set TAVILY_API_KEY environment variable)
      2. The base URL is valid and accessible
      3. You're not hitting rate limits (wait and retry)
      4. The timeout is sufficient for the crawl depth and breadth
    - Error messages will indicate if the issue is with the API key, URL, or timeout
    
    IMPORTANT: The API key is set via the TAVILY_API_KEY environment variable and should not be
    modified by the LLM. Only change other parameters if you encounter errors or failures.
    
    Default values are optimized for general use:
    - max_depth: 1 (default, good for most websites)
    - max_breadth: 10 (default, good balance between coverage and performance)
    - limit: 10 (default, good for comprehensive but not excessive crawling)
    - extract_depth: "low" (default, use "medium" or "high" for more thorough extraction)
    - format: "text" (default, use "markdown" for formatted output)
    - timeout: 60 seconds (default, increase if getting timeout errors)
    - chunks_per_source: 1 (default, increase for more detailed content)
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "results": [
            {
                "url": "URL that was crawled",
                "title": "Page title",
                "content": "Extracted content",
                "depth": crawl depth,
                "word_count": number of words,
                "errors": ["error messages"] (if any)
            }
        ]
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Get API key from environment
        api_key = os.environ.get("TAVILY_API_KEY")
        if api_key is None:
            return json.dumps({
                "success": False,
                "error": "API key not provided. Please set the TAVILY_API_KEY environment variable."
            })
        
        # Validate base URL
        if not base_url.startswith("http"):
            return json.dumps({
                "success": False,
                "error": f"Invalid base URL format: {base_url}. URLs must start with http:// or https://"
            })
        
        client = TavilyClient(api_key=api_key)
        
        response = client.crawl_base_url(
            base_url=base_url,
            max_depth=max_depth,
            max_breadth=max_breadth,
            limit=limit,
            extract_depth=extract_depth,
            format=format,
            timeout=timeout,
            chunks_per_source=chunks_per_source
        )
        
        return json.dumps({
            "success": True,
            "results": response["results"]
        })
        
    except Exception as e:
        error_msg = str(e)
        if "401" in error_msg or "Unauthorized" in error_msg:
            return json.dumps({
                "success": False,
                "error": "Invalid API key. Please check your TAVILY_API_KEY and ensure it's valid."
            })
        elif "429" in error_msg or "rate limit" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": "Rate limit exceeded. Please wait before making more requests or check your Tavily plan."
            })
        elif "timeout" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": f"Request timed out. Try increasing the timeout parameter (currently {timeout} seconds)."
            })
        else:
            return json.dumps({
                "success": False,
                "error": f"An error occurred while crawling: {error_msg}. Please check the base URL and try again."
            })


# ============================================================================
# Web Map Tool
# ============================================================================

def web_map(
    base_url: str,  # The starting URL for sitemap generation
    max_depth: int = 1,  # Maximum crawl depth (default: 1)
    max_breadth: int = 10,  # Maximum number of pages to discover (default: 10)
    limit: int = 10,  # Maximum number of URLs to return (default: 10)
    timeout: int = 60  # Timeout in seconds (default: 60)
) -> str:  # JSON string with sitemap or error
    """
    Generate a sitemap by discovering URLs on a website, returning as JSON.
    
    **Key characteristics:**
    - **Stateless**: Each sitemap generation runs independently
    - **Timeout**: Controls how long the sitemap generation can run before being cancelled
    - When timeout is not specified (or set to None), the config default is used
    - If sitemap generation is timing out, increase the timeout using the timeout argument
    
    **Usage Tips:**
    - Use max_depth=1 for quick discovery of a single page and its direct links
    - Use max_depth=2 or 3 for more comprehensive sitemap generation (slower but more thorough)
    - Limit max_breadth to avoid discovering too many URLs (default 10 is good)
    - Limit the total number of URLs with the limit parameter
    
    **Error Handling:**
    - If you get API errors, check:
      1. Your API key is valid (set TAVILY_API_KEY environment variable)
      2. The base URL is valid and accessible
      3. You're not hitting rate limits (wait and retry)
      4. The timeout is sufficient for the sitemap depth and breadth
    - Error messages will indicate if the issue is with the API key, URL, or timeout
    
    IMPORTANT: The API key is set via the TAVILY_API_KEY environment variable and should not be
    modified by the LLM. Only change other parameters if you encounter errors or failures.
    
    Default values are optimized for general use:
    - max_depth: 1 (default, good for most websites)
    - max_breadth: 10 (default, good balance between coverage and performance)
    - limit: 10 (default, good for comprehensive but not excessive mapping)
    - timeout: 60 seconds (default, increase if getting timeout errors)
    
    Returns JSON with format:
    {
        "success": true/false,
        "error": "error message" (only if success=false),
        "results": [
            {
                "url": "Discovered URL",
                "title": "Page title" (optional),
                "depth": discovery depth,
                "status_code": HTTP status code,
                "errors": ["error messages"] (if any)
            }
        ]
    }
    
    Error messages will guide the LLM on how to fix the issue.
    """
    try:
        # Get API key from environment
        api_key = os.environ.get("TAVILY_API_KEY")
        if api_key is None:
            return json.dumps({
                "success": False,
                "error": "API key not provided. Please set the TAVILY_API_KEY environment variable."
            })
        
        # Validate base URL
        if not base_url.startswith("http"):
            return json.dumps({
                "success": False,
                "error": f"Invalid base URL format: {base_url}. URLs must start with http:// or https://"
            })
        
        client = TavilyClient(api_key=api_key)
        
        response = client.sitemap(
            base_url=base_url,
            max_depth=max_depth,
            max_breadth=max_breadth,
            limit=limit,
            timeout=timeout
        )
        
        return json.dumps({
            "success": True,
            "results": response["results"]
        })
        
    except Exception as e:
        error_msg = str(e)
        if "401" in error_msg or "Unauthorized" in error_msg:
            return json.dumps({
                "success": False,
                "error": "Invalid API key. Please check your TAVILY_API_KEY and ensure it's valid."
            })
        elif "429" in error_msg or "rate limit" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": "Rate limit exceeded. Please wait before making more requests or check your Tavily plan."
            })
        elif "timeout" in error_msg.lower():
            return json.dumps({
                "success": False,
                "error": f"Request timed out. Try increasing the timeout parameter (currently {timeout} seconds)."
            })
        else:
            return json.dumps({
                "success": False,
                "error": f"An error occurred while generating sitemap: {error_msg}. Please check the base URL and try again."
            })

Test suite for web search tools.

Tests both functionality and error handling with JSON return format.

In [None]:
import os
import pytest
import json
from unittest.mock import Mock, patch, MagicMock

# ============================================================================
# Test Web Search
# ============================================================================


def test_web_search_with_api_key():
    """Test web_search with explicit API key returns JSON"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.search.return_value = {
            "results": [
                {"url": "https://example.com", "title": "Example", "content": "Test content"}
            ]
        }
        
        # Call function
        result = web_search(
            query="test query",
            api_key="test-api-key",
            max_results=5
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        assert len(data["results"]) == 1
        assert data["results"][0]["url"] == "https://example.com"
        assert "error" not in data


def test_web_search_with_env_api_key():
    """Test web_search with API key from environment variable"""
    with patch.dict(os.environ, {"TAVILY_API_KEY": "env-api-key"}), \
         patch('web_search_simple.TavilyClient') as mock_client:
        
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.search.return_value = {
            "results": [{"url": "https://example.com", "title": "Example"}]
        }
        
        # Call function without api_key parameter
        result = web_search(query="test query")
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        assert len(data["results"]) == 1
        mock_client.assert_called_once_with(api_key="env-api-key")


def test_web_search_missing_api_key():
    """Test web_search returns error JSON when API key is missing"""
    with patch.dict(os.environ, {}, clear=True):
        result = web_search(query="test query")
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "API key not provided" in data["error"]
        assert "TAVILY_API_KEY" in data["error"]


def test_web_search_custom_params():
    """Test web_search with custom parameters"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.search.return_value = {"results": []}
        
        # Call with custom params
        result = web_search(
            query="custom query",
            api_key="test-key",
            max_results=10,
            search_depth="advanced",
            timeout=120
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        mock_instance.search.assert_called_once_with(
            query="custom query",
            max_results=10,
            search_depth="advanced",
            include_raw_content=True,
            timeout=120
        )


def test_web_search_unauthorized_error():
    """Test web_search returns helpful error for unauthorized"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock to raise 401 error
        mock_instance = mock_client.return_value
        mock_instance.search.side_effect = Exception("401 Unauthorized")
        
        # Call function
        result = web_search(query="test" )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "Invalid API key" in data["error"]
        assert "TAVILY_API_KEY" in data["error"]


def test_web_search_rate_limit_error():
    """Test web_search returns helpful error for rate limit"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock to raise rate limit error
        mock_instance = mock_client.return_value
        mock_instance.search.side_effect = Exception("429 Too Many Requests")
        
        # Call function
        result = web_search(query="test" )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "rate limit" in data["error"].lower()


def test_web_search_timeout_error():
    """Test web_search returns helpful error for timeout"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock to raise timeout error
        mock_instance = mock_client.return_value
        mock_instance.search.side_effect = Exception("Timeout after 60 seconds")
        
        # Call function
        result = web_search(query="test" , timeout=60)
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "timeout" in data["error"].lower()
        assert "60 seconds" in data["error"]


# ============================================================================
# Test Web Extract
# ============================================================================


def test_web_extract_with_api_key():
    """Test web_extract with explicit API key returns JSON"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.extract_urls.return_value = {
            "results": [
                {
                    "url": "https://example.com",
                    "content": "Extracted content",
                    "word_count": 100
                }
            ]
        }
        
        # Call function
        result = web_extract(
            urls=["https://example.com"],
            api_key="test-api-key"
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        assert len(data["results"]) == 1
        assert data["results"][0]["url"] == "https://example.com"
        assert "error" not in data


def test_web_extract_with_env_api_key():
    """Test web_extract with API key from environment variable"""
    with patch.dict(os.environ, {"TAVILY_API_KEY": "env-api-key"}), \
         patch('web_search_simple.TavilyClient') as mock_client:
        
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.extract_urls.return_value = {"results": []}
        
        # Call function without api_key parameter
        result = web_extract(urls=["https://example.com"])
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        mock_client.assert_called_once_with(api_key="env-api-key")


def test_web_extract_empty_urls():
    """Test web_extract returns error for empty URLs list"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        result = web_extract(urls=[] )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "No URLs provided" in data["error"]


def test_web_extract_invalid_url():
    """Test web_extract returns error for invalid URL format"""
    result = web_extract(urls=["example.com"] )
    
    # Parse JSON
    data = json.loads(result)
    
    # Assertions
    assert data["success"] is False
    assert "error" in data
    assert "Invalid URL format" in data["error"]
    assert "http" in data["error"]


def test_web_extract_custom_params():
    """Test web_extract with custom parameters"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.extract_urls.return_value = {"results": []}
        
        # Call with custom params
        result = web_extract(
            urls=["https://example.com", "https://example.org"],
            api_key="test-key",
            extract_depth="high",
            format="markdown",
            timeout=90,
            chunks_per_source=3
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        mock_instance.extract_urls.assert_called_once_with(
            urls=["https://example.com", "https://example.org"],
            extract_depth="high",
            format="markdown",
            timeout=90,
            chunks_per_source=3
        )


def test_web_extract_unauthorized_error():
    """Test web_extract returns helpful error for unauthorized"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock to raise 401 error
        mock_instance = mock_client.return_value
        mock_instance.extract_urls.side_effect = Exception("401 Unauthorized")
        
        # Call function
        result = web_extract(urls=["https://example.com"] )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "Invalid API key" in data["error"]


# ============================================================================
# Test Web Crawl
# ============================================================================


def test_web_crawl_with_api_key():
    """Test web_crawl with explicit API key returns JSON"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.crawl_base_url.return_value = {
            "results": [
                {
                    "url": "https://example.com/page1",
                    "title": "Page 1",
                    "content": "Content 1",
                    "depth": 0
                }
            ]
        }
        
        # Call function
        result = web_crawl(
            base_url="https://example.com",
            api_key="test-api-key"
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        assert len(data["results"]) == 1
        assert data["results"][0]["url"] == "https://example.com/page1"
        assert "error" not in data


def test_web_crawl_invalid_base_url():
    """Test web_crawl returns error for invalid base URL format"""
    result = web_crawl(base_url="example.com" )
    
    # Parse JSON
    data = json.loads(result)
    
    # Assertions
    assert data["success"] is False
    assert "error" in data
    assert "Invalid base URL format" in data["error"]
    assert "http" in data["error"]


def test_web_crawl_with_env_api_key():
    """Test web_crawl with API key from environment variable"""
    with patch.dict(os.environ, {"TAVILY_API_KEY": "env-api-key"}), \
         patch('web_search_simple.TavilyClient') as mock_client:
        
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.crawl_base_url.return_value = {"results": []}
        
        # Call function without api_key parameter
        result = web_crawl(base_url="https://example.com")
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        mock_client.assert_called_once_with(api_key="env-api-key")


def test_web_crawl_custom_params():
    """Test web_crawl with custom parameters"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.crawl_base_url.return_value = {"results": []}
        
        # Call with custom params
        result = web_crawl(
            base_url="https://example.com",
            api_key="test-key",
            max_depth=2,
            max_breadth=20,
            limit=5,
            extract_depth="medium",
            format="markdown",
            timeout=180,
            chunks_per_source=2
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        mock_instance.crawl_base_url.assert_called_once_with(
            base_url="https://example.com",
            max_depth=2,
            max_breadth=20,
            limit=5,
            extract_depth="medium",
            format="markdown",
            timeout=180,
            chunks_per_source=2
        )


def test_web_crawl_unauthorized_error():
    """Test web_crawl returns helpful error for unauthorized"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock to raise 401 error
        mock_instance = mock_client.return_value
        mock_instance.crawl_base_url.side_effect = Exception("401 Unauthorized")
        
        # Call function
        result = web_crawl(base_url="https://example.com" )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "Invalid API key" in data["error"]


# ============================================================================
# Test Web Map
# ============================================================================


def test_web_map_with_api_key():
    """Test web_map with explicit API key returns JSON"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.sitemap.return_value = {
            "results": [
                {
                    "url": "https://example.com/page1",
                    "title": "Page 1",
                    "depth": 0,
                    "status_code": 200
                },
                {
                    "url": "https://example.com/page2",
                    "title": "Page 2",
                    "depth": 0,
                    "status_code": 200
                }
            ]
        }
        
        # Call function
        result = web_map(
            base_url="https://example.com",
            api_key="test-api-key"
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        assert len(data["results"]) == 2
        assert data["results"][0]["url"] == "https://example.com/page1"
        assert "error" not in data


def test_web_map_invalid_base_url():
    """Test web_map returns error for invalid base URL format"""
    result = web_map(base_url="example.com" )
    
    # Parse JSON
    data = json.loads(result)
    
    # Assertions
    assert data["success"] is False
    assert "error" in data
    assert "Invalid base URL format" in data["error"]
    assert "http" in data["error"]


def test_web_map_with_env_api_key():
    """Test web_map with API key from environment variable"""
    with patch.dict(os.environ, {"TAVILY_API_KEY": "env-api-key"}), \
         patch('web_search_simple.TavilyClient') as mock_client:
        
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.sitemap.return_value = {"results": []}
        
        # Call function without api_key parameter
        result = web_map(base_url="https://example.com")
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        mock_client.assert_called_once_with(api_key="env-api-key")


def test_web_map_custom_params():
    """Test web_map with custom parameters"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock
        mock_instance = mock_client.return_value
        mock_instance.sitemap.return_value = {"results": []}
        
        # Call with custom params
        result = web_map(
            base_url="https://example.com",
            api_key="test-key",
            max_depth=3,
            max_breadth=50,
            limit=25,
            timeout=300
        )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is True
        mock_instance.sitemap.assert_called_once_with(
            base_url="https://example.com",
            max_depth=3,
            max_breadth=50,
            limit=25,
            timeout=300
        )


def test_web_map_unauthorized_error():
    """Test web_map returns helpful error for unauthorized"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock to raise 401 error
        mock_instance = mock_client.return_value
        mock_instance.sitemap.side_effect = Exception("401 Unauthorized")
        
        # Call function
        result = web_map(base_url="https://example.com" )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "Invalid API key" in data["error"]


def test_web_map_rate_limit_error():
    """Test web_map returns helpful error for rate limit"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        # Setup mock to raise rate limit error
        mock_instance = mock_client.return_value
        mock_instance.sitemap.side_effect = Exception("Rate limit exceeded")
        
        # Call function
        result = web_map(base_url="https://example.com" )
        
        # Parse JSON
        data = json.loads(result)
        
        # Assertions
        assert data["success"] is False
        assert "error" in data
        assert "rate limit" in data["error"].lower()
        assert "wait" in data["error"].lower()


# ============================================================================
# Test Error Handling
# ============================================================================


def test_all_tools_error_without_api_key():
    """Test all tools return error JSON when API key is missing"""
    with patch.dict(os.environ, {}, clear=True):
        # Test web_search
        result = web_search(query="test")
        data = json.loads(result)
        assert data["success"] is False
        assert "error" in data
        
        # Test web_extract
        result = web_extract(urls=["https://example.com"])
        data = json.loads(result)
        assert data["success"] is False
        assert "error" in data
        
        # Test web_crawl
        result = web_crawl(base_url="https://example.com")
        data = json.loads(result)
        assert data["success"] is False
        assert "error" in data
        
        # Test web_map
        result = web_map(base_url="https://example.com")
        data = json.loads(result)
        assert data["success"] is False
        assert "error" in data


def test_all_tools_use_env_api_key():
    """Test all tools can use API key from environment"""
    with patch.dict(os.environ, {"TAVILY_API_KEY": "env-key"}), \
         patch('web_search_simple.TavilyClient') as mock_client:
        
        mock_instance = mock_client.return_value
        mock_instance.search.return_value = {"results": []}
        mock_instance.extract_urls.return_value = {"results": []}
        mock_instance.crawl_base_url.return_value = {"results": []}
        mock_instance.sitemap.return_value = {"results": []}
        
        # All should work without explicit api_key
        result = web_search(query="test")
        assert json.loads(result)["success"] is True
        
        result = web_extract(urls=["https://example.com"])
        assert json.loads(result)["success"] is True
        
        result = web_crawl(base_url="https://example.com")
        assert json.loads(result)["success"] is True
        
        result = web_map(base_url="https://example.com")
        assert json.loads(result)["success"] is True
        
        # All should use the env key
        assert mock_client.call_count == 4
        for call in mock_client.call_args_list:
            assert call[1]['api_key'] == 'env-key'


def test_error_messages_are_helpful():
    """Test that error messages guide the LLM on how to fix issues"""
    with patch.dict(os.environ, {}, clear=True):
        # Test missing API key
        result = web_search(query="test")
        data = json.loads(result)
        assert "TAVILY_API_KEY" in data["error"]
        assert "environment variable" in data["error"].lower()
        
        # Test invalid URL format
        # First set API key to test URL validation
        os.environ["TAVILY_API_KEY"] = "test-key"
        result = web_extract(urls=["example.com"] )
        data = json.loads(result)
        assert "http" in data["error"].lower()
        
        # Test invalid base URL format
        result = web_crawl(base_url="example.com" )
        data = json.loads(result)
        assert "http" in data["error"].lower()
        
        # Clean up
        del os.environ["TAVILY_API_KEY"]


# ============================================================================
# Test JSON Format
# ============================================================================


def test_all_tools_return_valid_json():
    """Test all tools return valid JSON strings"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        mock_instance = mock_client.return_value
        mock_instance.search.return_value = {"results": []}
        mock_instance.extract_urls.return_value = {"results": []}
        mock_instance.crawl_base_url.return_value = {"results": []}
        mock_instance.sitemap.return_value = {"results": []}
        
        # Test web_search
        result = web_search(query="test" )
        assert isinstance(result, str)
        json.loads(result)  # Should not raise
        
        # Test web_extract
        result = web_extract(urls=["https://example.com"] )
        assert isinstance(result, str)
        json.loads(result)  # Should not raise
        
        # Test web_crawl
        result = web_crawl(base_url="https://example.com" )
        assert isinstance(result, str)
        json.loads(result)  # Should not raise
        
        # Test web_map
        result = web_map(base_url="https://example.com" )
        assert isinstance(result, str)
        json.loads(result)  # Should not raise


def test_json_has_required_fields():
    """Test JSON responses have required fields"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        mock_instance = mock_client.return_value
        mock_instance.search.return_value = {
            "results": [{"url": "https://example.com", "title": "Test"}]
        }
        
        # Test success response
        result = web_search(query="test" )
        data = json.loads(result)
        assert "success" in data
        assert "results" in data
        assert data["success"] is True
        
        # Test error response (with no API key and no env var)
        with patch.dict(os.environ, {}, clear=True):
            result = web_search(query="test")
            data = json.loads(result)
            assert "success" in data
            assert "error" in data
            assert data["success"] is False


def test_json_error_format():
    """Test error responses have consistent format"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        mock_instance = mock_client.return_value
        mock_instance.search.side_effect = Exception("Test error")
        
        result = web_search(query="test" )
        data = json.loads(result)
        
        assert data["success"] is False
        assert "error" in data
        assert isinstance(data["error"], str)
        assert len(data["error"]) > 0
        assert "Test error" in data["error"]


# ============================================================================
# Test Edge Cases
# ============================================================================


def test_web_search_empty_results():
    """Test web_search with empty results"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        mock_instance = mock_client.return_value
        mock_instance.search.return_value = {"results": []}
        
        result = web_search(query="test" )
        data = json.loads(result)
        
        assert data["success"] is True
        assert data["results"] == []


def test_web_extract_multiple_urls():
    """Test web_extract with multiple URLs"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        mock_instance = mock_client.return_value
        mock_instance.extract_urls.return_value = {
            "results": [
                {"url": "https://example.com", "content": "Content 1"},
                {"url": "https://example.org", "content": "Content 2"}
            ]
        }
        
        result = web_extract(
            urls=["https://example.com", "https://example.org"],
            api_key="test-key"
        )
        data = json.loads(result)
        
        assert data["success"] is True
        assert len(data["results"]) == 2
        assert data["results"][0]["url"] == "https://example.com"
        assert data["results"][1]["url"] == "https://example.org"


def test_web_crawl_with_limit():
    """Test web_crawl respects limit parameter"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        mock_instance = mock_client.return_value
        mock_instance.crawl_base_url.return_value = {
            "results": [
                {"url": f"https://example.com/page{i}"} for i in range(5)
            ]
        }
        
        result = web_crawl(
            base_url="https://example.com",
            api_key="test-key",
            limit=3
        )
        data = json.loads(result)
        
        assert data["success"] is True
        mock_instance.crawl_base_url.assert_called_once()
        call_kwargs = mock_instance.crawl_base_url.call_args[1]
        assert call_kwargs['limit'] == 3


def test_web_map_with_limit():
    """Test web_map respects limit parameter"""
    with patch('web_search_simple.TavilyClient') as mock_client:
        mock_instance = mock_client.return_value
        mock_instance.sitemap.return_value = {
            "results": [
                {"url": f"https://example.com/page{i}"} for i in range(10)
            ]
        }
        
        result = web_map(
            base_url="https://example.com",
            api_key="test-key",
            limit=5
        )
        data = json.loads(result)
        
        assert data["success"] is True
        mock_instance.sitemap.assert_called_once()
        call_kwargs = mock_instance.sitemap.call_args[1]
        assert call_kwargs['limit'] == 5