diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ff608d..f64123d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,10 @@ For multi-package releases, use package names as subsections: ## [Unreleased] +### Added + +- Added Management toolset with `command` and `list_commands` tools for executing and discovering Django management commands + ## [0.12.0] ### Added diff --git a/README.md b/README.md index 3c07b19..e16b91e 100644 --- a/README.md +++ b/README.md @@ -206,6 +206,13 @@ Read-only resources for project exploration without executing code (note that re | `list_models` | Get detailed information about Django models with optional filtering by app or scope | | `list_routes` | Introspect Django URL routes with filtering support for HTTP method, route name, or URL pattern | +#### Management + +| Tool | Description | +|------|-------------| +| `execute_command` | Execute Django management commands with arguments and options | +| `list_commands` | List all available Django management commands with their source apps | + #### Shell | Tool | Description | diff --git a/src/mcp_django/mgmt/__init__.py b/src/mcp_django/mgmt/__init__.py new file mode 100644 index 0000000..d0a7eb0 --- /dev/null +++ b/src/mcp_django/mgmt/__init__.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from .server import MANAGEMENT_TOOLSET +from .server import mcp + +__all__ = [ + "MANAGEMENT_TOOLSET", + "mcp", +] diff --git a/src/mcp_django/mgmt/core.py b/src/mcp_django/mgmt/core.py new file mode 100644 index 0000000..b2d36e1 --- /dev/null +++ b/src/mcp_django/mgmt/core.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import logging +from contextlib import redirect_stderr +from contextlib import redirect_stdout +from dataclasses import dataclass +from dataclasses import field +from datetime import datetime +from io import StringIO + +from asgiref.sync import sync_to_async +from django.core.management import call_command +from django.core.management import get_commands +from pydantic import BaseModel +from pydantic import ConfigDict + +logger = logging.getLogger(__name__) + + +@dataclass +class CommandResult: + command: str + args: tuple[str, ...] + options: dict[str, str | int | bool] + stdout: str + stderr: str + timestamp: datetime = field(default_factory=datetime.now) + + def __post_init__(self): + logger.debug( + "%s created for command: %s", self.__class__.__name__, self.command + ) + if self.stdout: + logger.debug("%s.stdout: %s", self.__class__.__name__, self.stdout[:200]) + if self.stderr: + logger.debug("%s.stderr: %s", self.__class__.__name__, self.stderr[:200]) + + +@dataclass +class CommandErrorResult: + command: str + args: tuple[str, ...] + options: dict[str, str | int | bool] + exception: Exception + stdout: str + stderr: str + timestamp: datetime = field(default_factory=datetime.now) + + def __post_init__(self): + logger.debug( + "%s created for command: %s - exception type: %s", + self.__class__.__name__, + self.command, + type(self.exception).__name__, + ) + logger.debug("%s.message: %s", self.__class__.__name__, str(self.exception)) + if self.stdout: + logger.debug("%s.stdout: %s", self.__class__.__name__, self.stdout[:200]) + if self.stderr: + logger.debug("%s.stderr: %s", self.__class__.__name__, self.stderr[:200]) + + +Result = CommandResult | CommandErrorResult + + +class ManagementCommandOutput(BaseModel): + status: str # "success" or "error" + command: str + args: list[str] + options: dict[str, str | int | bool] + stdout: str + stderr: str + exception: ExceptionInfo | None = None + + @classmethod + def from_result(cls, result: Result) -> ManagementCommandOutput: + match result: + case CommandResult(): + return cls( + status="success", + command=result.command, + args=list(result.args), + options=result.options, + stdout=result.stdout, + stderr=result.stderr, + exception=None, + ) + case CommandErrorResult(): + return cls( + status="error", + command=result.command, + args=list(result.args), + options=result.options, + stdout=result.stdout, + stderr=result.stderr, + exception=ExceptionInfo( + type=type(result.exception).__name__, + message=str(result.exception), + ), + ) + + +class ExceptionInfo(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + type: str + message: str + + +class ManagementCommandExecutor: + async def execute( + self, + command: str, + args: list[str] | None = None, + options: dict[str, str | int | bool] | None = None, + ) -> Result: + """Execute a Django management command asynchronously. + + Args: + command: The management command name (e.g., 'migrate', 'check') + args: Positional arguments for the command + options: Keyword options for the command + + Returns: + CommandResult if successful, CommandErrorResult if an exception occurred + """ + return await sync_to_async(self._execute)(command, args, options) + + def _execute( + self, + command: str, + args: list[str] | None = None, + options: dict[str, str | int | bool] | None = None, + ) -> Result: + """Execute a Django management command synchronously. + + Captures stdout and stderr from the command execution. + + Args: + command: The management command name + args: Positional arguments for the command + options: Keyword options for the command + + Returns: + CommandResult if successful, CommandErrorResult if an exception occurred + """ + args = args or [] + options = options or {} + + args_tuple = tuple(args) + options_dict = dict(options) + + logger.info( + "Executing management command: %s with args=%s, options=%s", + command, + args_tuple, + options_dict, + ) + + stdout = StringIO() + stderr = StringIO() + + with redirect_stdout(stdout), redirect_stderr(stderr): + try: + call_command(command, *args_tuple, **options_dict) + + logger.debug("Management command executed successfully: %s", command) + + return CommandResult( + command=command, + args=args_tuple, + options=options_dict, + stdout=stdout.getvalue(), + stderr=stderr.getvalue(), + ) + + except Exception as e: + logger.error( + "Exception during management command execution: %s - Command: %s", + f"{type(e).__name__}: {e}", + command, + ) + logger.debug("Full traceback for error:", exc_info=True) + + return CommandErrorResult( + command=command, + args=args_tuple, + options=options_dict, + exception=e, + stdout=stdout.getvalue(), + stderr=stderr.getvalue(), + ) + + +management_command_executor = ManagementCommandExecutor() + + +class CommandInfo(BaseModel): + name: str + app_name: str + + +def get_management_commands() -> list[CommandInfo]: + """Get list of all available Django management commands. + + Returns a list of management commands with their app origins, + sorted alphabetically by command name. + + Returns: + List of CommandInfo objects containing command name and source app. + """ + logger.info("Fetching available management commands") + + commands = get_commands() + command_list = [ + CommandInfo(name=name, app_name=app_name) + for name, app_name in sorted(commands.items()) + ] + + logger.debug("Found %d management commands", len(command_list)) + + return command_list diff --git a/src/mcp_django/mgmt/server.py b/src/mcp_django/mgmt/server.py new file mode 100644 index 0000000..897b9ab --- /dev/null +++ b/src/mcp_django/mgmt/server.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +import logging +from typing import Annotated + +from fastmcp import Context +from fastmcp import FastMCP +from mcp.types import ToolAnnotations + +from .core import CommandInfo +from .core import ManagementCommandOutput +from .core import get_management_commands +from .core import management_command_executor + +logger = logging.getLogger(__name__) + +mcp = FastMCP( + name="Management", + instructions="Execute and discover Django management commands. Run commands with arguments and options, or list available commands to discover what's available in your project.", +) + +MANAGEMENT_TOOLSET = "management" + + +@mcp.tool( + name="execute_command", + annotations=ToolAnnotations( + title="Execute Django Management Command", + destructiveHint=True, + openWorldHint=True, + ), + tags={MANAGEMENT_TOOLSET}, +) +async def execute_command( + ctx: Context, + command: Annotated[ + str, + "Management command name (e.g., 'migrate', 'check', 'collectstatic')", + ], + args: Annotated[ + list[str] | None, + "Positional arguments for the command", + ] = None, + options: Annotated[ + dict[str, str | int | bool] | None, + "Keyword options for the command (use underscores for dashes, e.g., 'run_syncdb' for '--run-syncdb')", + ] = None, +) -> ManagementCommandOutput: + """Execute a Django management command. + + Calls Django's call_command() to run management commands. Arguments and options + are passed directly to the command. Command output (stdout/stderr) is captured + and returned. + + Examples: + - Check for issues: command="check" + - Show migrations: command="showmigrations", args=["myapp"] + - Migrate with options: command="migrate", options={"verbosity": 2} + - Check with tag: command="check", options={"tag": "security"} + + Note: Management commands can modify your database and project state. Use with + caution, especially commands like migrate, flush, loaddata, etc. + """ + logger.info( + "management_command called - request_id: %s, client_id: %s, command: %s, args: %s, options: %s", + ctx.request_id, + ctx.client_id or "unknown", + command, + args, + options, + ) + + try: + result = await management_command_executor.execute(command, args, options) + output = ManagementCommandOutput.from_result(result) + + logger.debug( + "management_command completed - request_id: %s, status: %s", + ctx.request_id, + output.status, + ) + + if output.status == "error" and output.exception: + await ctx.debug( + f"Command failed: {output.exception.type}: {output.exception.message}" + ) + + return output + + except Exception as e: + logger.error( + "Unexpected error in management_command tool - request_id: %s: %s", + ctx.request_id, + e, + exc_info=True, + ) + raise + + +@mcp.tool( + name="list_commands", + annotations=ToolAnnotations( + title="List Django Management Commands", + readOnlyHint=True, + idempotentHint=True, + ), + tags={MANAGEMENT_TOOLSET}, +) +def list_commands(ctx: Context) -> list[CommandInfo]: + """List all available Django management commands. + + Returns a list of all management commands available in the current Django + project, including built-in Django commands and custom commands from + installed apps. Each command includes its name and the app that provides it. + + Useful for discovering what commands are available before executing them + with the execute_command tool. + """ + logger.info( + "list_management_commands called - request_id: %s, client_id: %s", + ctx.request_id, + ctx.client_id or "unknown", + ) + + commands = get_management_commands() + + logger.debug( + "list_management_commands completed - request_id: %s, commands_count: %d", + ctx.request_id, + len(commands), + ) + + return commands diff --git a/src/mcp_django/server.py b/src/mcp_django/server.py index 8ad2a90..3fa2907 100644 --- a/src/mcp_django/server.py +++ b/src/mcp_django/server.py @@ -6,6 +6,8 @@ from fastmcp import FastMCP +from mcp_django.mgmt import MANAGEMENT_TOOLSET +from mcp_django.mgmt import mcp as management_mcp from mcp_django.packages import DJANGOPACKAGES_TOOLSET from mcp_django.packages import mcp as packages_mcp from mcp_django.project import PROJECT_TOOLSET @@ -17,6 +19,7 @@ TOOLSETS = { DJANGOPACKAGES_TOOLSET: packages_mcp, + MANAGEMENT_TOOLSET: management_mcp, PROJECT_TOOLSET: project_mcp, SHELL_TOOLSET: shell_mcp, } diff --git a/tests/test_management_command.py b/tests/test_management_command.py new file mode 100644 index 0000000..73767f8 --- /dev/null +++ b/tests/test_management_command.py @@ -0,0 +1,270 @@ +from __future__ import annotations + +import pytest +import pytest_asyncio +from fastmcp import Client + +from mcp_django.mgmt import core +from mcp_django.mgmt.core import CommandErrorResult +from mcp_django.server import mcp + +pytestmark = [pytest.mark.asyncio, pytest.mark.django_db] + + +@pytest_asyncio.fixture(autouse=True) +async def initialize_server(): + """Initialize the MCP server before tests.""" + await mcp.initialize() + + +async def test_management_command_check(): + """Test running the 'check' command (safe, read-only).""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "check", + }, + ) + + assert result.data is not None + assert result.data.status == "success" + assert result.data.command == "check" + assert result.data.args == [] + assert result.data.exception is None + # The check command should produce some output to stderr + assert ( + "System check identified" in result.data.stderr or result.data.stderr == "" + ) + + +async def test_management_command_with_args(): + """Test running a command with positional arguments.""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "showmigrations", + "args": ["tests"], + }, + ) + + assert result.data is not None + assert result.data.status == "success" + assert result.data.command == "showmigrations" + assert result.data.args == ["tests"] + assert result.data.exception is None + + +async def test_management_command_with_options(): + """Test running a command with keyword options.""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "check", + "options": {"verbosity": 2}, + }, + ) + + assert result.data is not None + assert result.data.status == "success" + assert result.data.command == "check" + assert result.data.exception is None + + +async def test_management_command_with_args_and_options(): + """Test running a command with both args and options.""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "showmigrations", + "args": ["tests"], + "options": {"verbosity": 0}, + }, + ) + + assert result.data is not None + assert result.data.status == "success" + assert result.data.command == "showmigrations" + assert result.data.args == ["tests"] + assert result.data.exception is None + + +async def test_management_command_invalid_command(): + """Test running an invalid/non-existent command.""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "this_command_does_not_exist", + }, + ) + + assert result.data is not None + assert result.data.status == "error" + assert result.data.command == "this_command_does_not_exist" + assert result.data.exception is not None + assert result.data.exception.type in [ + "CommandError", + "ManagementUtilityError", + ] + assert "Unknown command" in result.data.exception.message + + +async def test_management_command_makemigrations_dry_run(): + """Test running makemigrations with --dry-run (safe, read-only).""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "makemigrations", + "options": {"dry_run": True, "verbosity": 0}, + }, + ) + + assert result.data is not None + assert result.data.status == "success" + assert result.data.command == "makemigrations" + assert result.data.args == [] + assert result.data.exception is None + + +async def test_management_command_diffsettings(): + """Test running diffsettings command (read-only introspection).""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "diffsettings", + "options": {"all": True}, + }, + ) + + assert result.data is not None + assert result.data.status == "success" + assert result.data.command == "diffsettings" + # Should output settings + assert len(result.data.stdout) > 0 + + +async def test_management_command_stdout_capture(): + """Test that stdout is properly captured from commands.""" + async with Client(mcp.server) as client: + result = await client.call_tool( + "management_execute_command", + { + "command": "check", + "options": {"verbosity": 2}, + }, + ) + + assert result.data is not None + assert result.data.status == "success" + # With higher verbosity, check should produce output + assert isinstance(result.data.stdout, str) + assert isinstance(result.data.stderr, str) + + +async def test_management_command_list_in_main_server(): + """Test that management_command tool is listed in main server tools.""" + async with Client(mcp.server) as client: + tools = await client.list_tools() + tool_names = [tool.name for tool in tools] + + assert "management_execute_command" in tool_names + + # Find the tool and check its metadata + mgmt_tool = next( + tool for tool in tools if tool.name == "management_execute_command" + ) + assert mgmt_tool.description is not None + assert "management command" in mgmt_tool.description.lower() + + +async def test_list_management_commands(): + """Test listing all available management commands.""" + async with Client(mcp.server) as client: + result = await client.call_tool("management_list_commands", {}) + + assert result.data is not None + assert isinstance(result.data, list) + assert len(result.data) > 0 + + # Check that we have some standard Django commands + command_names = [cmd["name"] for cmd in result.data] + assert "check" in command_names + assert "migrate" in command_names + assert "showmigrations" in command_names + + # Verify structure of command info + first_cmd = result.data[0] + assert "name" in first_cmd + assert "app_name" in first_cmd + assert isinstance(first_cmd["name"], str) + assert isinstance(first_cmd["app_name"], str) + + +async def test_list_management_commands_includes_custom_commands(): + """Test that custom management commands are included in the list.""" + async with Client(mcp.server) as client: + result = await client.call_tool("management_list_commands", {}) + + assert result.data is not None + command_names = [cmd["name"] for cmd in result.data] + + # The mcp command from this project should be in the list + assert "mcp" in command_names + + +async def test_list_management_commands_sorted(): + """Test that management commands are sorted alphabetically.""" + async with Client(mcp.server) as client: + result = await client.call_tool("management_list_commands", {}) + + assert result.data is not None + command_names = [cmd["name"] for cmd in result.data] + + # Verify the list is sorted + assert command_names == sorted(command_names) + + +async def test_management_command_unexpected_exception(monkeypatch): + """Test handling of unexpected exceptions in execute_command tool.""" + + # Mock the executor to raise an unexpected exception + async def mock_execute(*args, **kwargs): + raise RuntimeError("Unexpected error in executor") + + monkeypatch.setattr(core.management_command_executor, "execute", mock_execute) + + async with Client(mcp.server) as client: + # The tool should re-raise unexpected exceptions + with pytest.raises(Exception): # Will be wrapped by FastMCP + await client.call_tool( + "management_execute_command", + { + "command": "check", + }, + ) + + +async def test_command_error_result_with_stdout_stderr(): + """Test CommandErrorResult __post_init__ with stdout and stderr for coverage.""" + # Create a CommandErrorResult with both stdout and stderr + # This triggers lines 58 and 60 in mgmt/core.py + result = CommandErrorResult( + command="test_command", + args=("arg1",), + options={"opt": "value"}, + exception=Exception("Test exception"), + stdout="Some stdout output", + stderr="Some stderr output", + ) + + # Verify the result was created correctly + assert result.command == "test_command" + assert result.stdout == "Some stdout output" + assert result.stderr == "Some stderr output" + assert str(result.exception) == "Test exception" diff --git a/tests/test_server.py b/tests/test_server.py index 3136511..50ab95e 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -45,6 +45,8 @@ async def test_tool_listing(): "djangopackages_get_grid", "djangopackages_get_package", "djangopackages_search", + "management_execute_command", + "management_list_commands", "project_get_project_info", "project_list_apps", "project_list_models",