Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions packages/api/src/microsoft_teams/api/auth/cloud_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class CloudEnvironment:
"""The token issuer for Bot Framework tokens (e.g. "https://api.botframework.com")."""
graph_scope: str
"""The Microsoft Graph token scope (e.g. "https://graph.microsoft.com/.default")."""
allowed_service_urls: tuple[str, ...] = ()
"""Allowed service URL hostnames for this cloud environment."""


PUBLIC = CloudEnvironment(
Expand All @@ -41,11 +39,6 @@ class CloudEnvironment:
openid_metadata_url="https://login.botframework.com/v1/.well-known/openidconfiguration",
token_issuer="https://api.botframework.com",
graph_scope="https://graph.microsoft.com/.default",
allowed_service_urls=(
"smba.trafficmanager.net",
"smba.onyx.prod.teams.trafficmanager.net",
"smba.infra.gcc.teams.microsoft.com",
),
)
"""Microsoft public (commercial) cloud."""

Expand All @@ -57,7 +50,6 @@ class CloudEnvironment:
openid_metadata_url="https://login.botframework.azure.us/v1/.well-known/openidconfiguration",
token_issuer="https://api.botframework.us",
graph_scope="https://graph.microsoft.us/.default",
allowed_service_urls=("smba.infra.gov.teams.microsoft.us",),
)
"""US Government Community Cloud High (GCCH)."""

Expand All @@ -69,7 +61,6 @@ class CloudEnvironment:
openid_metadata_url="https://login.botframework.azure.us/v1/.well-known/openidconfiguration",
token_issuer="https://api.botframework.us",
graph_scope="https://dod-graph.microsoft.us/.default",
allowed_service_urls=("smba.infra.dod.teams.microsoft.us",),
)
"""US Government Department of Defense (DoD)."""

Expand All @@ -81,7 +72,6 @@ class CloudEnvironment:
openid_metadata_url="https://login.botframework.azure.cn/v1/.well-known/openidconfiguration",
token_issuer="https://api.botframework.azure.cn",
graph_scope="https://microsoftgraph.chinacloudapi.cn/.default",
allowed_service_urls=("frontend.botapi.msg.infra.teams.microsoftonline.cn",),
)
"""China cloud (21Vianet)."""

Expand Down
2 changes: 0 additions & 2 deletions packages/apps/src/microsoft_teams/apps/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ def __init__(self, **options: Unpack[AppOptions]):
self.credentials.tenant_id,
application_id_uri=self.options.application_id_uri,
cloud=self.cloud,
additional_allowed_domains=self.options.additional_allowed_domains,
)

@property
Expand Down Expand Up @@ -218,7 +217,6 @@ async def initialize(self) -> None:
self.server.initialize(
credentials=self.credentials,
skip_auth=self.options.skip_auth,
additional_allowed_domains=self.options.additional_allowed_domains,
cloud=self.cloud,
)

Expand Down
66 changes: 3 additions & 63 deletions packages/apps/src/microsoft_teams/apps/auth/token_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import jwt
from microsoft_teams.api.auth.cloud_environment import PUBLIC, CloudEnvironment
Expand All @@ -19,37 +18,6 @@
logger = logging.getLogger(__name__)


def is_allowed_service_url(
service_url: str,
cloud: CloudEnvironment,
additional_domains: Optional[List[str]] = None,
) -> bool:
"""Validate that a service URL hostname is allowed.

Checks against the cloud environment's allowed service URLs,
plus any additional domains provided by the caller.
Localhost is always allowed for local development.
"""
try:
parsed = urlparse(service_url)
hostname = (parsed.hostname or "").lower()

if hostname in ("localhost", "127.0.0.1"):
return True

if parsed.scheme != "https":
return False

allowed = [d.lower() for d in [*cloud.allowed_service_urls, *(additional_domains or [])]]
if "*" in allowed:
return True

return hostname in allowed
except Exception: # pragma: no cover
logger.error("Failed to parse service URL for validation: %s", service_url)
return False


@dataclass
class JwtValidationOptions:
"""Configuration for JWT validation."""
Expand All @@ -76,25 +44,14 @@ class TokenValidator:
def __init__(
self,
jwt_validation_options: JwtValidationOptions,
cloud: Optional[CloudEnvironment] = None,
additional_allowed_domains: Optional[List[str]] = None,
):
"""
Initialize the token validator.

Args:
jwt_validation_options: Configuration for JWT validation
cloud: Optional cloud environment for service URL validation
additional_allowed_domains: Additional service URL hostnames accepted beyond the cloud
preset. Entries must be bare hostnames matched exactly (case-insensitive) — wildcard
patterns like ``"*.example.com"``, URL suffixes, or full URLs are NOT supported.
Pass ``["*"]`` as the sole wildcard to accept any hostname.
"""
self.options = jwt_validation_options
self.cloud = cloud or PUBLIC
self.additional_allowed_domains = (
list(additional_allowed_domains) if additional_allowed_domains is not None else None
)
self._jwks_client = jwt.PyJWKClient(jwt_validation_options.jwks_uri)

@staticmethod
Expand All @@ -108,7 +65,6 @@ def for_service(
app_id: str,
service_url: Optional[str] = None,
cloud: Optional[CloudEnvironment] = None,
additional_allowed_domains: Optional[List[str]] = None,
) -> TokenValidator:
"""Create a validator for Bot Framework service tokens.

Expand All @@ -118,10 +74,6 @@ def for_service(
app_id: The bot's Microsoft App ID (used for audience validation)
service_url: Optional service URL to validate against token claims
cloud: Optional cloud environment for sovereign cloud support
additional_allowed_domains: Additional service URL hostnames accepted beyond the cloud
preset. Entries must be bare hostnames matched exactly (case-insensitive) — wildcard
patterns like ``"*.example.com"``, URL suffixes, or full URLs are NOT supported.
Pass ``["*"]`` as the sole wildcard to accept any hostname.
"""
env = cloud or PUBLIC
jwks_keys_uri = re.sub(r"/openidconfiguration$", "/keys", env.openid_metadata_url)
Expand All @@ -132,7 +84,7 @@ def for_service(
jwks_uri=jwks_keys_uri,
service_url=service_url,
)
return cls(options, cloud=env, additional_allowed_domains=additional_allowed_domains)
return cls(options)

@classmethod
def for_entra(
Expand All @@ -142,7 +94,6 @@ def for_entra(
scope: Optional[str] = None,
application_id_uri: Optional[str] = None,
cloud: Optional[CloudEnvironment] = None,
additional_allowed_domains: Optional[List[str]] = None,
) -> TokenValidator:
"""Create a validator for Entra ID tokens.

Expand All @@ -153,10 +104,6 @@ def for_entra(
application_id_uri: Optional Application ID URI from Azure portal.
Matches webApplicationInfo.resource in the app manifest.
cloud: Optional cloud environment for sovereign cloud support
additional_allowed_domains: Additional service URL hostnames accepted beyond the cloud
preset. Entries must be bare hostnames matched exactly (case-insensitive) — wildcard
patterns like ``"*.example.com"``, URL suffixes, or full URLs are NOT supported.
Pass ``["*"]`` as the sole wildcard to accept any hostname.
"""
env = cloud or PUBLIC
valid_issuers: List[str] = []
Expand All @@ -177,7 +124,7 @@ def for_entra(
jwks_uri=f"{env.login_endpoint}/{tenant_id}/discovery/v2.0/keys",
scope=scope,
)
return cls(options, cloud=env, additional_allowed_domains=additional_allowed_domains)
return cls(options)

async def validate_token(
self, raw_token: str, service_url: Optional[str] = None, scope: Optional[str] = None
Expand Down Expand Up @@ -220,15 +167,8 @@ async def validate_token(
leeway=JWT_LEEWAY_SECONDS,
)

# Validate service URL against allowed domains
effective_service_url = service_url or self.options.service_url
if effective_service_url and not is_allowed_service_url(
effective_service_url, self.cloud, self.additional_allowed_domains
):
logger.error(f"Rejected service URL: {effective_service_url}")
raise jwt.InvalidTokenError("Service URL is not from an allowed domain")

# Optional service URL claim validation
effective_service_url = service_url or self.options.service_url
if effective_service_url:
self._validate_service_url(payload, effective_service_url)

Expand Down
14 changes: 0 additions & 14 deletions packages/apps/src/microsoft_teams/apps/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from pydantic import BaseModel

from ..auth import TokenValidator
from ..auth.token_validator import is_allowed_service_url
from ..events import ActivityEvent, CoreActivity
from .adapter import HttpRequest, HttpResponse, HttpServerAdapter

Expand All @@ -37,7 +36,6 @@ def __init__(self, adapter: HttpServerAdapter, messaging_endpoint: str = "/api/m
self._on_request: Optional[Callable[[ActivityEvent], Awaitable[InvokeResponse[Any]]]] = None
self._token_validator: Optional[TokenValidator] = None
self._skip_auth: bool = False
self._additional_allowed_domains: Optional[list[str]] = None
self._cloud: CloudEnvironment = PUBLIC
self._initialized: bool = False

Expand All @@ -64,7 +62,6 @@ def initialize(
self,
credentials: Optional[Credentials] = None,
skip_auth: bool = False,
additional_allowed_domains: Optional[list[str]] = None,
cloud: Optional[CloudEnvironment] = None,
) -> None:
"""
Expand All @@ -73,25 +70,19 @@ def initialize(
Args:
credentials: App credentials for JWT validation.
skip_auth: Whether to skip JWT validation.
additional_allowed_domains: Additional allowed service URL domain suffixes.
cloud: Optional cloud environment for sovereign cloud support.
"""
if self._initialized:
return

self._skip_auth = skip_auth
self._additional_allowed_domains = additional_allowed_domains
self._cloud = cloud or PUBLIC

if "*" in (additional_allowed_domains or []):
logger.warning("Service URL validation is disabled via wildcard in additional_allowed_domains")

app_id = getattr(credentials, "client_id", None) if credentials else None
if app_id and not skip_auth:
self._token_validator = TokenValidator.for_service(
app_id,
cloud=self._cloud,
additional_allowed_domains=self._additional_allowed_domains,
)
logger.debug("JWT validation enabled for %s", self._messaging_endpoint)

Expand Down Expand Up @@ -137,11 +128,6 @@ async def handle_request(self, request: HttpRequest) -> HttpResponse:
),
)

# Validate service URL against allowed domains
if service_url and not is_allowed_service_url(service_url, self._cloud, self._additional_allowed_domains):
logger.warning(f"Rejected service URL: {service_url}")
return HttpResponse(status=403, body={"error": "Service URL not allowed"})

core_activity = CoreActivity.model_validate(body)
activity_type = core_activity.type or "unknown"
activity_id = core_activity.id or "unknown"
Expand Down
4 changes: 0 additions & 4 deletions packages/apps/src/microsoft_teams/apps/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ class AppOptions(TypedDict, total=False):
storage: Optional[Storage[str, Any]]
plugins: Optional[List[PluginBase]]
skip_auth: Optional[bool]
additional_allowed_domains: Optional[List[str]]
"""Additional allowed service URL hostnames beyond the built-in defaults."""

# HTTP adapter
http_server_adapter: Optional[HttpServerAdapter]
Expand Down Expand Up @@ -88,8 +86,6 @@ class InternalAppOptions:

# Fields with defaults
skip_auth: bool = False
additional_allowed_domains: Optional[List[str]] = None
"""Additional allowed service URL hostnames beyond the built-in defaults."""
default_connection_name: str = "graph"
"""The OAuth connection name to use for authentication."""
plugins: List[PluginBase] = field(default_factory=lambda: [])
Expand Down
87 changes: 0 additions & 87 deletions packages/apps/tests/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,93 +176,6 @@ async def test_rejects_invalid_jwt(self, server, mock_adapter):
result = await server.handle_request(request)
assert result["status"] == 401

@pytest.mark.asyncio
async def test_rejects_non_allowed_service_url(self, server):
"""Test that requests with non-allowed serviceUrl are rejected."""
server.initialize(skip_auth=True)

request = HttpRequest(
body={
"type": "message",
"id": "test-123",
"text": "Test",
"serviceUrl": "https://evil.com/steal",
},
headers={},
)

result = await server.handle_request(request)
assert result["status"] == 403

@pytest.mark.asyncio
async def test_accepts_allowed_service_url(self, server):
"""Test that requests with allowed serviceUrl pass validation."""

async def mock_handler(event):
return InvokeResponse(status=200, body=cast(ConfigResponse, {}))

server.on_request = mock_handler
server.initialize(skip_auth=True)

request = HttpRequest(
body={
"type": "message",
"id": "test-123",
"text": "Test",
"serviceUrl": "https://smba.trafficmanager.net/teams/",
},
headers={},
)

result = await server.handle_request(request)
assert result["status"] == 200

@pytest.mark.asyncio
async def test_allows_any_service_url_with_wildcard_domain(self, mock_adapter):
"""Test that additional_allowed_domains=["*"] allows any serviceUrl."""
server = HttpServer(mock_adapter)

async def mock_handler(event):
return InvokeResponse(status=200, body=cast(ConfigResponse, {}))

server.on_request = mock_handler
server.initialize(skip_auth=True, additional_allowed_domains=["*"])

request = HttpRequest(
body={
"type": "message",
"id": "test-123",
"text": "Test",
"serviceUrl": "https://evil.com/steal",
},
headers={},
)

result = await server.handle_request(request)
assert result["status"] == 200

def test_initialize_forwards_allowlist_to_token_validator(self, mock_adapter):
"""With skip_auth=False and credentials, the allowlist must reach TokenValidator.

Regression: HttpServer.initialize() previously constructed TokenValidator.for_service()
without passing additional_allowed_domains, so the token-validation service-URL check
ignored user-configured domains.
"""
server = HttpServer(mock_adapter)
credentials = MagicMock(client_id="test-app-id")

with patch("microsoft_teams.apps.http.http_server.TokenValidator.for_service") as mock_for_service:
mock_for_service.return_value = MagicMock()
server.initialize(
credentials=credentials,
skip_auth=False,
additional_allowed_domains=["canary.botapi.skype.com"],
)

mock_for_service.assert_called_once()
_, kwargs = mock_for_service.call_args
assert kwargs.get("additional_allowed_domains") == ["canary.botapi.skype.com"]


class TestFastAPIAdapter:
"""Test cases for FastAPIAdapter."""
Expand Down
Loading
Loading