Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ dev = [
"pytest-pretty>=1.2.0",
"inline-snapshot>=0.23.0",
"dirty-equals>=0.9.0",
"coverage[toml]==7.10.7",
]
docs = [
"mkdocs>=1.6.1",
Expand Down Expand Up @@ -168,3 +169,27 @@ MD033 = false # no-inline-html Inline HTML
MD041 = false # first-line-heading/first-line-h1
MD046 = false # indented-code-blocks
MD059 = false # descriptive-link-text

# https://coverage.readthedocs.io/en/latest/config.html#run
[tool.coverage.run]
branch = true
patch = ["subprocess"]
concurrency = ["multiprocessing", "thread"]
source = ["src", "tests"]

# https://coverage.readthedocs.io/en/latest/config.html#report
[tool.coverage.report]
fail_under = 100
skip_covered = true
show_missing = true
ignore_errors = true
precision = 2
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"@overload",
"raise NotImplementedError",
]
exclude_also = [
'\A(?s:.*# pragma: exclude file.*)\Z'
]
7 changes: 7 additions & 0 deletions scripts/test_coverage
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/sh

set -ex

uv run --frozen coverage run -m pytest -n auto $@
uv run --frozen coverage combine
uv run --frozen coverage report
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to enforce new lines at the end of the file on the pre-commit.

2 changes: 1 addition & 1 deletion src/mcp/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

from .cli import app

if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
app()
22 changes: 11 additions & 11 deletions src/mcp/cli/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
MCP_PACKAGE = "mcp[cli]"


def get_claude_config_path() -> Path | None:
def get_claude_config_path() -> Path | None: # pragma: no cover
"""Get the Claude config directory based on platform."""
if sys.platform == "win32":
path = Path(Path.home(), "AppData", "Roaming", "Claude")
Expand All @@ -33,7 +33,7 @@ def get_claude_config_path() -> Path | None:
def get_uv_path() -> str:
"""Get the full path to the uv executable."""
uv_path = shutil.which("uv")
if not uv_path:
if not uv_path: # pragma: no cover
logger.error(
"uv executable not found in PATH, falling back to 'uv'. Please ensure uv is installed and in your PATH"
)
Expand Down Expand Up @@ -65,17 +65,17 @@ def update_claude_config(
"""
config_dir = get_claude_config_path()
uv_path = get_uv_path()
if not config_dir:
if not config_dir: # pragma: no cover
raise RuntimeError(
"Claude Desktop config directory not found. Please ensure Claude Desktop"
" is installed and has been run at least once to initialize its config."
)

config_file = config_dir / "claude_desktop_config.json"
if not config_file.exists():
if not config_file.exists(): # pragma: no cover
try:
config_file.write_text("{}")
except Exception:
except Exception: # pragma: no cover
logger.exception(
"Failed to create Claude config file",
extra={
Expand All @@ -90,7 +90,7 @@ def update_claude_config(
config["mcpServers"] = {}

# Always preserve existing env vars and merge with new ones
if server_name in config["mcpServers"] and "env" in config["mcpServers"][server_name]:
if server_name in config["mcpServers"] and "env" in config["mcpServers"][server_name]: # pragma: no cover
existing_env = config["mcpServers"][server_name]["env"]
if env_vars:
# New vars take precedence over existing ones
Expand All @@ -103,22 +103,22 @@ def update_claude_config(

# Collect all packages in a set to deduplicate
packages = {MCP_PACKAGE}
if with_packages:
if with_packages: # pragma: no cover
packages.update(pkg for pkg in with_packages if pkg)

# Add all packages with --with
for pkg in sorted(packages):
args.extend(["--with", pkg])

if with_editable:
if with_editable: # pragma: no cover
args.extend(["--with-editable", str(with_editable)])

# Convert file path to absolute before adding to command
# Split off any :object suffix first
if ":" in file_spec:
file_path, server_object = file_spec.rsplit(":", 1)
file_spec = f"{Path(file_path).resolve()}:{server_object}"
else:
else: # pragma: no cover
file_spec = str(Path(file_spec).resolve())

# Add fastmcp run command
Expand All @@ -127,7 +127,7 @@ def update_claude_config(
server_config: dict[str, Any] = {"command": uv_path, "args": args}

# Add environment variables if specified
if env_vars:
if env_vars: # pragma: no cover
server_config["env"] = env_vars

config["mcpServers"][server_name] = server_config
Expand All @@ -138,7 +138,7 @@ def update_claude_config(
extra={"config_file": str(config_file)},
)
return True
except Exception:
except Exception: # pragma: no cover
logger.exception(
"Failed to update Claude config",
extra={
Expand Down
20 changes: 10 additions & 10 deletions src/mcp/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@

try:
import typer
except ImportError:
except ImportError: # pragma: no cover
print("Error: typer is required. Install with 'pip install mcp[cli]'")
sys.exit(1)

try:
from mcp.cli import claude
from mcp.server.fastmcp.utilities.logging import get_logger
except ImportError:
except ImportError: # pragma: no cover
print("Error: mcp.server.fastmcp is not installed or not in PYTHONPATH")
sys.exit(1)

try:
import dotenv
except ImportError:
except ImportError: # pragma: no cover
dotenv = None

logger = get_logger("cli")
Expand All @@ -53,7 +53,7 @@ def _get_npx_command():
return "npx" # On Unix-like systems, just use npx


def _parse_env_var(env_var: str) -> tuple[str, str]:
def _parse_env_var(env_var: str) -> tuple[str, str]: # pragma: no cover
"""Parse environment variable string in format KEY=VALUE."""
if "=" not in env_var:
logger.error(f"Invalid environment variable format: {env_var}. Must be KEY=VALUE")
Expand All @@ -77,7 +77,7 @@ def _build_uv_command(

if with_packages:
for pkg in with_packages:
if pkg:
if pkg: # pragma: no branch
cmd.extend(["--with", pkg])

# Add mcp run command
Expand Down Expand Up @@ -116,7 +116,7 @@ def _parse_file_path(file_spec: str) -> tuple[Path, str | None]:
return file_path, server_object


def _import_server(file: Path, server_object: str | None = None):
def _import_server(file: Path, server_object: str | None = None): # pragma: no cover
"""Import a MCP server from a file.

Args:
Expand Down Expand Up @@ -209,7 +209,7 @@ def _check_server_object(server_object: Any, object_name: str):


@app.command()
def version() -> None:
def version() -> None: # pragma: no cover
"""Show the MCP version."""
try:
version = importlib.metadata.version("mcp")
Expand Down Expand Up @@ -243,7 +243,7 @@ def dev(
help="Additional packages to install",
),
] = [],
) -> None:
) -> None: # pragma: no cover
"""Run a MCP server with the MCP Inspector."""
file, server_object = _parse_file_path(file_spec)

Expand Down Expand Up @@ -316,7 +316,7 @@ def run(
help="Transport protocol to use (stdio or sse)",
),
] = None,
) -> None:
) -> None: # pragma: no cover
"""Run a MCP server.

The server can be specified in two ways:\n
Expand Down Expand Up @@ -411,7 +411,7 @@ def install(
resolve_path=True,
),
] = None,
) -> None:
) -> None: # pragma: no cover
"""Install a MCP server in the Claude desktop app.

Environment variables are preserved once added and only updated if new values
Expand Down
1 change: 1 addition & 0 deletions src/mcp/client/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pragma: exclude file
import argparse
import logging
import sys
Expand Down
24 changes: 12 additions & 12 deletions src/mcp/client/auth/extensions/client_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ def to_assertion(self, with_audience_fallback: str | None = None) -> str:
assertion = self.assertion
else:
if not self.jwt_signing_key:
raise OAuthFlowError("Missing signing key for JWT bearer grant")
raise OAuthFlowError("Missing signing key for JWT bearer grant") # pragma: no cover
if not self.issuer:
raise OAuthFlowError("Missing issuer for JWT bearer grant")
raise OAuthFlowError("Missing issuer for JWT bearer grant") # pragma: no cover
if not self.subject:
raise OAuthFlowError("Missing subject for JWT bearer grant")
raise OAuthFlowError("Missing subject for JWT bearer grant") # pragma: no cover

audience = self.audience if self.audience else with_audience_fallback
if not audience:
raise OAuthFlowError("Missing audience for JWT bearer grant")
raise OAuthFlowError("Missing audience for JWT bearer grant") # pragma: no cover

now = int(time.time())
claims: dict[str, Any] = {
Expand Down Expand Up @@ -83,22 +83,22 @@ def __init__(

async def _exchange_token_authorization_code(
self, auth_code: str, code_verifier: str, *, token_data: dict[str, Any] | None = None
) -> httpx.Request:
) -> httpx.Request: # pragma: no cover
"""Build token exchange request for authorization_code flow."""
token_data = token_data or {}
if self.context.client_metadata.token_endpoint_auth_method == "private_key_jwt":
self._add_client_authentication_jwt(token_data=token_data)
return await super()._exchange_token_authorization_code(auth_code, code_verifier, token_data=token_data)

async def _perform_authorization(self) -> httpx.Request:
async def _perform_authorization(self) -> httpx.Request: # pragma: no cover
"""Perform the authorization flow."""
if "urn:ietf:params:oauth:grant-type:jwt-bearer" in self.context.client_metadata.grant_types:
token_request = await self._exchange_token_jwt_bearer()
return token_request
else:
return await super()._perform_authorization()

def _add_client_authentication_jwt(self, *, token_data: dict[str, Any]):
def _add_client_authentication_jwt(self, *, token_data: dict[str, Any]): # pragma: no cover
"""Add JWT assertion for client authentication to token endpoint parameters."""
if not self.jwt_parameters:
raise OAuthTokenError("Missing JWT parameters for private_key_jwt flow")
Expand All @@ -120,11 +120,11 @@ def _add_client_authentication_jwt(self, *, token_data: dict[str, Any]):
async def _exchange_token_jwt_bearer(self) -> httpx.Request:
"""Build token exchange request for JWT bearer grant."""
if not self.context.client_info:
raise OAuthFlowError("Missing client info")
raise OAuthFlowError("Missing client info") # pragma: no cover
if not self.jwt_parameters:
raise OAuthFlowError("Missing JWT parameters")
raise OAuthFlowError("Missing JWT parameters") # pragma: no cover
if not self.context.oauth_metadata:
raise OAuthTokenError("Missing OAuth metadata")
raise OAuthTokenError("Missing OAuth metadata") # pragma: no cover

# We need to set the audience to the issuer identifier of the authorization server
# https://datatracker.ietf.org/doc/html/draft-ietf-oauth-rfc7523bis-01#name-updates-to-rfc-7523
Expand All @@ -136,10 +136,10 @@ async def _exchange_token_jwt_bearer(self) -> httpx.Request:
"assertion": assertion,
}

if self.context.should_include_resource_param(self.context.protocol_version):
if self.context.should_include_resource_param(self.context.protocol_version): # pragma: no branch
token_data["resource"] = self.context.get_resource_url()

if self.context.client_metadata.scope:
if self.context.client_metadata.scope: # pragma: no branch
token_data["scope"] = self.context.client_metadata.scope

token_url = self._get_token_endpoint()
Expand Down
Loading