From 0dbaa669a6a32cce18196955aeab0c5341d77c8f Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 11:13:21 -0500 Subject: [PATCH 1/8] Enhancing URL detection --- src/guardrails/checks/text/urls.py | 173 ++++++++++++++++++++++--- tests/unit/checks/test_urls.py | 200 +++++++++++++++++++++++++++++ 2 files changed, 353 insertions(+), 20 deletions(-) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index f8a4b89..b55ebc9 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -27,7 +27,7 @@ from typing import Any from urllib.parse import ParseResult, urlparse -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator from guardrails.registry import default_spec_registry from guardrails.spec import GuardrailSpecMetadata @@ -35,6 +35,13 @@ __all__ = ["urls"] +DEFAULT_PORTS = { + "http": 80, + "https": 443, +} + +SCHEME_PREFIX_RE = re.compile(r"^[a-z][a-z0-9+.-]*://") + @dataclass(frozen=True, slots=True) class UrlDetectionResult: @@ -66,9 +73,54 @@ class URLConfig(BaseModel): description="Allow subdomains of allowed domains (e.g. api.example.com if example.com is allowed)", ) + @field_validator("allowed_schemes", mode="before") + @classmethod + def normalize_allowed_schemes(cls, value: Any) -> set[str]: + """Normalize allowed schemes to bare identifiers without delimiters.""" + if value is None: + return {"https"} + + if isinstance(value, str): + raw_values = [value] + else: + raw_values = list(value) + + normalized: set[str] = set() + for entry in raw_values: + if not isinstance(entry, str): + raise TypeError("allowed_schemes entries must be strings") + cleaned = entry.strip().lower() + if not cleaned: + continue + # Support inputs like "https://", "HTTPS:", or " https " + if cleaned.endswith("://"): + cleaned = cleaned[:-3] + cleaned = cleaned.removesuffix(":") + cleaned = cleaned.strip() + if cleaned: + normalized.add(cleaned) + + if not normalized: + raise ValueError("allowed_schemes must include at least one scheme") + + return normalized + def _detect_urls(text: str) -> list[str]: - """Detect URLs using regex.""" + """Detect URLs using regex patterns with deduplication. + + Detects URLs with explicit schemes (http, https, ftp, data, javascript, + vbscript), domain-like patterns without schemes, and IP addresses. + Deduplicates to avoid returning both scheme-ful and scheme-less versions + of the same URL. + + Args: + text: The text to scan for URLs. + + Returns: + List of unique URL strings found in the text, with trailing + punctuation removed. + """ # Pattern for cleaning trailing punctuation (] must be escaped) PUNCTUATION_CLEANUP = r"[.,;:!?)\]]+$" @@ -156,7 +208,20 @@ def _detect_urls(text: str) -> list[str]: def _validate_url_security(url_string: str, config: URLConfig) -> tuple[ParseResult | None, str]: - """Validate URL using stdlib urllib.parse.""" + """Validate URL security properties using urllib.parse. + + Checks URL structure, validates the scheme is allowed, and ensures no + credentials are embedded in userinfo if block_userinfo is enabled. + + Args: + url_string: The URL string to validate. + config: Configuration specifying allowed schemes and userinfo policy. + + Returns: + A tuple of (parsed_url, error_reason). If validation succeeds, + parsed_url is a ParseResult and error_reason is empty. If validation + fails, parsed_url is None and error_reason describes the failure. + """ try: # Parse URL - preserve original scheme for validation if "://" in url_string: @@ -185,7 +250,7 @@ def _validate_url_security(url_string: str, config: URLConfig) -> tuple[ParseRes if original_scheme not in config.allowed_schemes: return None, f"Blocked scheme: {original_scheme}" - if config.block_userinfo and parsed_url.username: + if config.block_userinfo and (parsed_url.username or parsed_url.password): return None, "Contains userinfo (potential credential injection)" # Everything else (IPs, localhost, private IPs) goes through allow list logic @@ -203,7 +268,20 @@ def _validate_url_security(url_string: str, config: URLConfig) -> tuple[ParseRes def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdomains: bool) -> bool: - """Check if URL is allowed.""" + """Check if parsed URL matches any entry in the allow list. + + Supports domain names, IP addresses, CIDR blocks, and full URLs with + paths/ports/query strings. Allow list entries without explicit schemes + match any scheme. Entries with schemes must match exactly. + + Args: + parsed_url: The parsed URL to check. + allow_list: List of allowed URL patterns (domains, IPs, CIDR, full URLs). + allow_subdomains: If True, subdomains of allowed domains are permitted. + + Returns: + True if the URL matches any allow list entry, False otherwise. + """ if not allow_list: return False @@ -212,30 +290,85 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom return False url_host = url_host.lower() + url_domain = url_host.replace("www.", "") + scheme_lower = parsed_url.scheme.lower() if parsed_url.scheme else "" + url_port = parsed_url.port or DEFAULT_PORTS.get(scheme_lower) + url_path = parsed_url.path or "/" + url_query = parsed_url.query + url_fragment = parsed_url.fragment + + try: + url_ip = ip_address(url_host) + except (AddressValueError, ValueError): + url_ip = None for allowed_entry in allow_list: allowed_entry = allowed_entry.lower().strip() - # Handle IP addresses and CIDR blocks + has_explicit_scheme = bool(SCHEME_PREFIX_RE.match(allowed_entry)) + if has_explicit_scheme: + parsed_allowed = urlparse(allowed_entry) + else: + parsed_allowed = urlparse(f"//{allowed_entry}") + allowed_host = (parsed_allowed.hostname or "").lower() + allowed_port = parsed_allowed.port + allowed_path = parsed_allowed.path + allowed_query = parsed_allowed.query + allowed_fragment = parsed_allowed.fragment + + # Handle IP addresses and CIDR blocks (including schemes) try: - ip_address(allowed_entry.split("/")[0]) - if allowed_entry == url_host or ("/" in allowed_entry and ip_address(url_host) in ip_network(allowed_entry, strict=False)): + allowed_ip = ip_address(allowed_host) + except (AddressValueError, ValueError): + allowed_ip = None + + if allowed_ip is not None: + if url_ip is None: + continue + if allowed_port is not None and allowed_port != url_port: + continue + if allowed_ip == url_ip: return True + + network_spec = allowed_host + if parsed_allowed.path not in ("", "/"): + network_spec = f"{network_spec}{parsed_allowed.path}" + try: + if network_spec and "/" in network_spec and url_ip in ip_network(network_spec, strict=False): + return True + except (AddressValueError, ValueError): + # Path segment might not represent a CIDR mask; ignore. + pass continue - except (AddressValueError, ValueError): - pass - # Handle domain matching - allowed_domain = allowed_entry.replace("www.", "") - url_domain = url_host.replace("www.", "") + if not allowed_host: + continue - # Exact match always allowed - if url_domain == allowed_domain: - return True + allowed_domain = allowed_host.replace("www.", "") + + if allowed_port is not None and allowed_port != url_port: + continue + + host_matches = url_domain == allowed_domain or ( + allow_subdomains and url_domain.endswith(f".{allowed_domain}") + ) + if not host_matches: + continue + + # Path matching with segment boundary respect + if allowed_path not in ("", "/"): + # Ensure path matching respects segment boundaries to prevent + # "/api" from matching "/api2" or "/api-v2" + if url_path != allowed_path and not url_path.startswith(f"{allowed_path}/"): + continue + + if allowed_query and allowed_query != url_query: + continue + + if allowed_fragment and allowed_fragment != url_fragment: + continue - # Subdomain matching if enabled - if allow_subdomains and url_domain.endswith(f".{allowed_domain}"): - return True + return True return False @@ -282,7 +415,7 @@ async def urls(ctx: Any, data: str, config: URLConfig) -> GuardrailResult: return GuardrailResult( tripwire_triggered=bool(blocked), info={ - "guardrail_name": "URL Filter (Direct Config)", + "guardrail_name": "URL Filter", "config": { "allowed_schemes": list(config.allowed_schemes), "block_userinfo": config.block_userinfo, diff --git a/tests/unit/checks/test_urls.py b/tests/unit/checks/test_urls.py index 048bb48..afb1d84 100644 --- a/tests/unit/checks/test_urls.py +++ b/tests/unit/checks/test_urls.py @@ -42,6 +42,86 @@ def test_validate_url_security_blocks_userinfo_when_configured() -> None: assert "userinfo" in reason # noqa: S101 +def test_validate_url_security_blocks_password_without_username() -> None: + """URLs that only include a password in userinfo must be blocked.""" + config = URLConfig(allowed_schemes={"https"}, block_userinfo=True) + parsed, reason = _validate_url_security("https://:secret@example.com", config) + + assert parsed is None # noqa: S101 + assert "userinfo" in reason # noqa: S101 + + +def test_url_config_normalizes_allowed_scheme_inputs() -> None: + """URLConfig should accept schemes with delimiters and normalize them.""" + config = URLConfig(allowed_schemes={"HTTPS://", "http:", " https "}) + + assert config.allowed_schemes == {"https", "http"} # noqa: S101 + + +def test_is_url_allowed_handles_full_urls_with_paths() -> None: + """Allow list entries with schemes and paths should be honored.""" + config = URLConfig( + url_allow_list=["https://suntropy.es", "https://api.example.com/v1"], + allow_subdomains=False, + allowed_schemes={"https://"}, + ) + root_url, _ = _validate_url_security("https://suntropy.es", config) + path_url, _ = _validate_url_security("https://api.example.com/v1/resources?id=2", config) + wrong_path_url, _ = _validate_url_security("https://api.example.com/v2", config) + + assert root_url is not None # noqa: S101 + assert path_url is not None # noqa: S101 + assert wrong_path_url is not None # noqa: S101 + assert _is_url_allowed(root_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(path_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(wrong_path_url, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + + +def test_is_url_allowed_respects_path_segment_boundaries() -> None: + """Path matching should respect segment boundaries to prevent security issues.""" + config = URLConfig( + url_allow_list=["https://example.com/api"], + allow_subdomains=False, + allowed_schemes={"https"}, + ) + # These should be allowed + exact_match, _ = _validate_url_security("https://example.com/api", config) + valid_subpath, _ = _validate_url_security("https://example.com/api/users", config) + + # These should NOT be allowed (different path segments) + similar_path1, _ = _validate_url_security("https://example.com/api2", config) + similar_path2, _ = _validate_url_security("https://example.com/api-v2", config) + + assert exact_match is not None # noqa: S101 + assert valid_subpath is not None # noqa: S101 + assert similar_path1 is not None # noqa: S101 + assert similar_path2 is not None # noqa: S101 + + # Exact match and valid subpath should be allowed + assert _is_url_allowed(exact_match, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(valid_subpath, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + + # Similar paths that don't respect segment boundaries should be blocked + assert _is_url_allowed(similar_path1, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(similar_path2, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + + +def test_is_url_allowed_without_scheme_matches_multiple_protocols() -> None: + """Scheme-less allow list entries should match any allowed scheme.""" + config = URLConfig( + url_allow_list=["example.com"], + allow_subdomains=False, + allowed_schemes={"https", "http"}, + ) + https_result, https_reason = _validate_url_security("https://example.com", config) + http_result, http_reason = _validate_url_security("http://example.com", config) + + assert https_result is not None, https_reason # noqa: S101 + assert http_result is not None, http_reason # noqa: S101 + assert _is_url_allowed(https_result, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(http_result, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + + def test_is_url_allowed_supports_subdomains_and_cidr() -> None: """Allow list should support subdomains and CIDR ranges.""" config = URLConfig( @@ -86,3 +166,123 @@ async def test_urls_guardrail_allows_benign_input() -> None: result = await urls(ctx=None, data="No links here", config=config) assert result.tripwire_triggered is False # noqa: S101 + + +@pytest.mark.asyncio +async def test_urls_guardrail_allows_full_url_configuration() -> None: + """Reported regression: full URLs in config and schemes with delimiters should pass.""" + config = URLConfig( + url_allow_list=["https://suntropy.es"], + allowed_schemes={"https://"}, + block_userinfo=True, + allow_subdomains=True, + ) + text = "La url de la herramienta de estudios solares es: https://suntropy.es" + + result = await urls(ctx=None, data=text, config=config) + + assert result.tripwire_triggered is False # noqa: S101 + assert result.info["allowed"] == ["https://suntropy.es"] # noqa: S101 + assert result.info["blocked"] == [] # noqa: S101 + + +def test_url_config_rejects_invalid_scheme_types() -> None: + """URLConfig should reject non-string scheme entries.""" + with pytest.raises(TypeError, match="allowed_schemes entries must be strings"): + URLConfig(allowed_schemes={123, "https"}) # type: ignore[arg-type] + + +def test_url_config_rejects_empty_schemes() -> None: + """URLConfig should reject empty scheme sets.""" + with pytest.raises(ValueError, match="must include at least one scheme"): + URLConfig(allowed_schemes={"", " "}) + + +def test_validate_url_security_handles_malformed_urls() -> None: + """Malformed URLs should be rejected with clear error messages.""" + config = URLConfig(allowed_schemes={"https"}) + parsed, reason = _validate_url_security("https://", config) + + assert parsed is None # noqa: S101 + assert "Invalid URL" in reason # noqa: S101 + + +def test_is_url_allowed_handles_cidr_blocks() -> None: + """CIDR blocks in allow list should match IP ranges.""" + config = URLConfig( + url_allow_list=["10.0.0.0/8", "192.168.1.0/24"], + allow_subdomains=False, + allowed_schemes={"https"}, + ) + # IPs within CIDR ranges + ip_in_range1, _ = _validate_url_security("https://10.5.5.5", config) + ip_in_range2, _ = _validate_url_security("https://192.168.1.100", config) + # IP outside CIDR range + ip_outside, _ = _validate_url_security("https://192.168.2.1", config) + + assert ip_in_range1 is not None # noqa: S101 + assert ip_in_range2 is not None # noqa: S101 + assert ip_outside is not None # noqa: S101 + + assert _is_url_allowed(ip_in_range1, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(ip_in_range2, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(ip_outside, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + + +def test_is_url_allowed_handles_port_matching() -> None: + """Allow list entries with explicit ports should require exact port match.""" + config = URLConfig( + url_allow_list=["https://example.com:8443", "api.internal.com"], + allow_subdomains=False, + allowed_schemes={"https"}, + ) + # Correct port + correct_port, _ = _validate_url_security("https://example.com:8443", config) + # Wrong port (implicit 443) + wrong_port, _ = _validate_url_security("https://example.com", config) + # Any port when not specified in allow list + any_port, _ = _validate_url_security("https://api.internal.com:9000", config) + + assert correct_port is not None # noqa: S101 + assert wrong_port is not None # noqa: S101 + assert any_port is not None # noqa: S101 + + assert _is_url_allowed(correct_port, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(wrong_port, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(any_port, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + + +def test_is_url_allowed_handles_query_and_fragment() -> None: + """Allow list entries with query/fragment should match exactly.""" + config = URLConfig( + url_allow_list=["https://example.com/search?q=test", "https://example.com/docs#intro"], + allow_subdomains=False, + allowed_schemes={"https"}, + ) + # Exact query match + exact_query, _ = _validate_url_security("https://example.com/search?q=test", config) + # Different query + diff_query, _ = _validate_url_security("https://example.com/search?q=other", config) + # Exact fragment match + exact_fragment, _ = _validate_url_security("https://example.com/docs#intro", config) + # Different fragment + diff_fragment, _ = _validate_url_security("https://example.com/docs#outro", config) + + assert exact_query is not None # noqa: S101 + assert diff_query is not None # noqa: S101 + assert exact_fragment is not None # noqa: S101 + assert diff_fragment is not None # noqa: S101 + + assert _is_url_allowed(exact_query, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(diff_query, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(exact_fragment, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(diff_fragment, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + + +def test_validate_url_security_allows_userinfo_when_disabled() -> None: + """URLs with userinfo should be allowed when block_userinfo=False.""" + config = URLConfig(allowed_schemes={"https"}, block_userinfo=False) + parsed, reason = _validate_url_security("https://user:pass@example.com", config) + + assert parsed is not None # noqa: S101 + assert reason == "" # noqa: S101 From 3f2711b7716fa2653a6f66cec35a52c5222e25cd Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 11:34:28 -0500 Subject: [PATCH 2/8] If allow list has scheme, match exactly --- src/guardrails/checks/text/urls.py | 11 +++++++++ tests/unit/checks/test_urls.py | 39 ++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index b55ebc9..b0ae359 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -325,6 +325,11 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom if allowed_ip is not None: if url_ip is None: continue + # Scheme matching for IPs: if allow list entry has explicit scheme, it must match exactly + if has_explicit_scheme: + allowed_scheme = parsed_allowed.scheme.lower() if parsed_allowed.scheme else "" + if allowed_scheme and allowed_scheme != scheme_lower: + continue if allowed_port is not None and allowed_port != url_port: continue if allowed_ip == url_ip: @@ -355,6 +360,12 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom if not host_matches: continue + # Scheme matching: if allow list entry has explicit scheme, it must match exactly + if has_explicit_scheme: + allowed_scheme = parsed_allowed.scheme.lower() if parsed_allowed.scheme else "" + if allowed_scheme and allowed_scheme != scheme_lower: + continue + # Path matching with segment boundary respect if allowed_path not in ("", "/"): # Ensure path matching respects segment boundaries to prevent diff --git a/tests/unit/checks/test_urls.py b/tests/unit/checks/test_urls.py index afb1d84..f7bba14 100644 --- a/tests/unit/checks/test_urls.py +++ b/tests/unit/checks/test_urls.py @@ -286,3 +286,42 @@ def test_validate_url_security_allows_userinfo_when_disabled() -> None: assert parsed is not None # noqa: S101 assert reason == "" # noqa: S101 + + +def test_is_url_allowed_enforces_scheme_when_explicitly_specified() -> None: + """Scheme-qualified allow list entries must match scheme exactly (security).""" + config = URLConfig( + url_allow_list=["https://bank.example.com"], + allow_subdomains=False, + allowed_schemes={"https", "http"}, # Both schemes allowed globally + ) + # HTTPS should be allowed (matches the scheme in allow list) + https_url, _ = _validate_url_security("https://bank.example.com", config) + # HTTP should be BLOCKED (doesn't match the explicit https:// in allow list) + http_url, _ = _validate_url_security("http://bank.example.com", config) + + assert https_url is not None # noqa: S101 + assert http_url is not None # noqa: S101 + + # This is the security-critical check: scheme-qualified entries must match exactly + assert _is_url_allowed(https_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(http_url, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + + +def test_is_url_allowed_enforces_scheme_for_ips() -> None: + """Scheme-qualified IP addresses in allow list must match scheme exactly.""" + config = URLConfig( + url_allow_list=["https://192.168.1.100"], + allow_subdomains=False, + allowed_schemes={"https", "http"}, + ) + # HTTPS should be allowed + https_ip, _ = _validate_url_security("https://192.168.1.100", config) + # HTTP should be BLOCKED + http_ip, _ = _validate_url_security("http://192.168.1.100", config) + + assert https_ip is not None # noqa: S101 + assert http_ip is not None # noqa: S101 + + assert _is_url_allowed(https_ip, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(http_ip, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 From 30893a6cbb15d5fa3668f39349ef46c0b8972904 Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 11:56:11 -0500 Subject: [PATCH 3/8] Validating ports --- src/guardrails/checks/text/urls.py | 26 +++++++++++++++++++++++--- tests/unit/checks/test_urls.py | 19 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index b0ae359..9a325b8 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -96,7 +96,6 @@ def normalize_allowed_schemes(cls, value: Any) -> set[str]: if cleaned.endswith("://"): cleaned = cleaned[:-3] cleaned = cleaned.removesuffix(":") - cleaned = cleaned.strip() if cleaned: normalized.add(cleaned) @@ -267,6 +266,23 @@ def _validate_url_security(url_string: str, config: URLConfig) -> tuple[ParseRes return None, f"URL parsing error: {type(e).__name__}: {str(e)}" +def _safe_get_port(parsed: ParseResult, scheme: str) -> int | None: + """Safely extract port from ParseResult, handling malformed ports. + + Args: + parsed: The parsed URL. + scheme: The URL scheme (for default port lookup). + + Returns: + The port number, the default port for the scheme, or None if invalid. + """ + try: + return parsed.port or DEFAULT_PORTS.get(scheme.lower()) + except ValueError: + # Port is out of range (0-65535) or malformed + return None + + def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdomains: bool) -> bool: """Check if parsed URL matches any entry in the allow list. @@ -292,7 +308,10 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom url_host = url_host.lower() url_domain = url_host.replace("www.", "") scheme_lower = parsed_url.scheme.lower() if parsed_url.scheme else "" - url_port = parsed_url.port or DEFAULT_PORTS.get(scheme_lower) + url_port = _safe_get_port(parsed_url, scheme_lower) + # If port is invalid (None from _safe_get_port due to ValueError), reject the URL + if url_port is None and parsed_url.netloc and ":" in parsed_url.netloc: + return False url_path = parsed_url.path or "/" url_query = parsed_url.query url_fragment = parsed_url.fragment @@ -311,7 +330,8 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom else: parsed_allowed = urlparse(f"//{allowed_entry}") allowed_host = (parsed_allowed.hostname or "").lower() - allowed_port = parsed_allowed.port + allowed_scheme = parsed_allowed.scheme.lower() if parsed_allowed.scheme else "" + allowed_port = _safe_get_port(parsed_allowed, allowed_scheme) allowed_path = parsed_allowed.path allowed_query = parsed_allowed.query allowed_fragment = parsed_allowed.fragment diff --git a/tests/unit/checks/test_urls.py b/tests/unit/checks/test_urls.py index f7bba14..17eb3f8 100644 --- a/tests/unit/checks/test_urls.py +++ b/tests/unit/checks/test_urls.py @@ -325,3 +325,22 @@ def test_is_url_allowed_enforces_scheme_for_ips() -> None: assert _is_url_allowed(https_ip, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 assert _is_url_allowed(http_ip, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + + +@pytest.mark.asyncio +async def test_urls_guardrail_handles_malformed_ports_gracefully() -> None: + """URLs with out-of-range or malformed ports should be blocked, not crash.""" + config = URLConfig( + url_allow_list=["example.com"], + allowed_schemes={"https"}, + ) + # Test various malformed ports + text = "Visit https://example.com:99999 or https://example.com:abc or https://example.com:-1" + + result = await urls(ctx=None, data=text, config=config) + + # Should not crash; all should be blocked (either due to malformed ports or not in allow list) + assert result.tripwire_triggered is True # noqa: S101 + assert len(result.info["blocked"]) == 3 # noqa: S101 + # All three URLs should be blocked (the key is they don't crash the guardrail) + assert len(result.info["blocked_reasons"]) == 3 # noqa: S101 From 7ce778f3fc0a7655f0f4fb2fa926513718c6901c Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 12:12:53 -0500 Subject: [PATCH 4/8] Removing redundant variable assignment --- src/guardrails/checks/text/urls.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index 9a325b8..f30b16e 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -346,10 +346,8 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom if url_ip is None: continue # Scheme matching for IPs: if allow list entry has explicit scheme, it must match exactly - if has_explicit_scheme: - allowed_scheme = parsed_allowed.scheme.lower() if parsed_allowed.scheme else "" - if allowed_scheme and allowed_scheme != scheme_lower: - continue + if has_explicit_scheme and allowed_scheme and allowed_scheme != scheme_lower: + continue if allowed_port is not None and allowed_port != url_port: continue if allowed_ip == url_ip: @@ -381,10 +379,8 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom continue # Scheme matching: if allow list entry has explicit scheme, it must match exactly - if has_explicit_scheme: - allowed_scheme = parsed_allowed.scheme.lower() if parsed_allowed.scheme else "" - if allowed_scheme and allowed_scheme != scheme_lower: - continue + if has_explicit_scheme and allowed_scheme and allowed_scheme != scheme_lower: + continue # Path matching with segment boundary respect if allowed_path not in ("", "/"): From 78ba233ae7a987534a69304e15cfd1df27e3f6c6 Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 13:00:00 -0500 Subject: [PATCH 5/8] Handle trailing slash in allow lists --- src/guardrails/checks/text/urls.py | 5 ++++- tests/unit/checks/test_urls.py | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index f30b16e..a229481 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -384,9 +384,12 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom # Path matching with segment boundary respect if allowed_path not in ("", "/"): + # Normalize trailing slashes to prevent issues with entries like "/api/" + # which should match "/api/users" but would fail with double-slash check + normalized_allowed_path = allowed_path.rstrip("/") # Ensure path matching respects segment boundaries to prevent # "/api" from matching "/api2" or "/api-v2" - if url_path != allowed_path and not url_path.startswith(f"{allowed_path}/"): + if url_path != allowed_path and url_path != normalized_allowed_path and not url_path.startswith(f"{normalized_allowed_path}/"): continue if allowed_query and allowed_query != url_query: diff --git a/tests/unit/checks/test_urls.py b/tests/unit/checks/test_urls.py index 17eb3f8..e78ba45 100644 --- a/tests/unit/checks/test_urls.py +++ b/tests/unit/checks/test_urls.py @@ -344,3 +344,23 @@ async def test_urls_guardrail_handles_malformed_ports_gracefully() -> None: assert len(result.info["blocked"]) == 3 # noqa: S101 # All three URLs should be blocked (the key is they don't crash the guardrail) assert len(result.info["blocked_reasons"]) == 3 # noqa: S101 + + +def test_is_url_allowed_handles_trailing_slash_in_path() -> None: + """Allow list entries with trailing slashes should match subpaths correctly.""" + config = URLConfig( + url_allow_list=["https://example.com/api/"], + allow_subdomains=False, + allowed_schemes={"https"}, + ) + # URL with subpath should be allowed + subpath_url, _ = _validate_url_security("https://example.com/api/users", config) + # Exact match (with trailing slash) should be allowed + exact_url, _ = _validate_url_security("https://example.com/api/", config) + + assert subpath_url is not None # noqa: S101 + assert exact_url is not None # noqa: S101 + + # Both should be allowed + assert _is_url_allowed(subpath_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(exact_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 From bb55585ff43bab277d5df7faad3b4eaf5baea219 Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 13:35:09 -0500 Subject: [PATCH 6/8] Allow scheme-less user input --- src/guardrails/checks/text/urls.py | 79 ++++++++----- tests/unit/checks/test_urls.py | 177 ++++++++++++++++++----------- 2 files changed, 165 insertions(+), 91 deletions(-) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index a229481..f98308b 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -206,7 +206,7 @@ def _detect_urls(text: str) -> list[str]: return list(dict.fromkeys([url for url in final_urls if url])) -def _validate_url_security(url_string: str, config: URLConfig) -> tuple[ParseResult | None, str]: +def _validate_url_security(url_string: str, config: URLConfig) -> tuple[ParseResult | None, str, bool]: """Validate URL security properties using urllib.parse. Checks URL structure, validates the scheme is allowed, and ensures no @@ -217,53 +217,58 @@ def _validate_url_security(url_string: str, config: URLConfig) -> tuple[ParseRes config: Configuration specifying allowed schemes and userinfo policy. Returns: - A tuple of (parsed_url, error_reason). If validation succeeds, - parsed_url is a ParseResult and error_reason is empty. If validation - fails, parsed_url is None and error_reason describes the failure. + A tuple of (parsed_url, error_reason, had_explicit_scheme). If validation + succeeds, parsed_url is a ParseResult, error_reason is empty, and + had_explicit_scheme indicates if the original URL included a scheme. + If validation fails, parsed_url is None and error_reason describes the failure. """ try: - # Parse URL - preserve original scheme for validation + # Parse URL - track whether scheme was explicit + has_explicit_scheme = False if "://" in url_string: # Standard URL with double-slash scheme (http://, https://, ftp://, etc.) parsed_url = urlparse(url_string) original_scheme = parsed_url.scheme + has_explicit_scheme = True elif ":" in url_string and url_string.split(":", 1)[0] in {"data", "javascript", "vbscript", "mailto"}: # Special single-colon schemes parsed_url = urlparse(url_string) original_scheme = parsed_url.scheme + has_explicit_scheme = True else: - # Add http scheme for parsing, but remember this is a default + # Add http scheme for parsing only (user didn't specify a scheme) parsed_url = urlparse(f"http://{url_string}") - original_scheme = "http" # Default scheme for scheme-less URLs + original_scheme = None # No explicit scheme + has_explicit_scheme = False # Basic validation: must have scheme and netloc (except for special schemes) if not parsed_url.scheme: - return None, "Invalid URL format" + return None, "Invalid URL format", False # Special schemes like data: and javascript: don't need netloc special_schemes = {"data", "javascript", "vbscript", "mailto"} - if original_scheme not in special_schemes and not parsed_url.netloc: - return None, "Invalid URL format" + if parsed_url.scheme not in special_schemes and not parsed_url.netloc: + return None, "Invalid URL format", False - # Security validations - use original scheme - if original_scheme not in config.allowed_schemes: - return None, f"Blocked scheme: {original_scheme}" + # Security validations - only validate scheme if it was explicitly provided + if has_explicit_scheme and original_scheme not in config.allowed_schemes: + return None, f"Blocked scheme: {original_scheme}", has_explicit_scheme if config.block_userinfo and (parsed_url.username or parsed_url.password): - return None, "Contains userinfo (potential credential injection)" + return None, "Contains userinfo (potential credential injection)", has_explicit_scheme # Everything else (IPs, localhost, private IPs) goes through allow list logic - return parsed_url, "" + return parsed_url, "", has_explicit_scheme except (ValueError, UnicodeError, AttributeError) as e: # Common URL parsing errors: # - ValueError: Invalid URL structure, invalid port, etc. # - UnicodeError: Invalid encoding in URL # - AttributeError: Unexpected URL structure - return None, f"Invalid URL format: {str(e)}" + return None, f"Invalid URL format: {str(e)}", False except Exception as e: # Catch any unexpected errors but provide debugging info - return None, f"URL parsing error: {type(e).__name__}: {str(e)}" + return None, f"URL parsing error: {type(e).__name__}: {str(e)}", False def _safe_get_port(parsed: ParseResult, scheme: str) -> int | None: @@ -283,17 +288,24 @@ def _safe_get_port(parsed: ParseResult, scheme: str) -> int | None: return None -def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdomains: bool) -> bool: +def _is_url_allowed( + parsed_url: ParseResult, + allow_list: list[str], + allow_subdomains: bool, + url_had_explicit_scheme: bool, +) -> bool: """Check if parsed URL matches any entry in the allow list. Supports domain names, IP addresses, CIDR blocks, and full URLs with paths/ports/query strings. Allow list entries without explicit schemes - match any scheme. Entries with schemes must match exactly. + match any scheme. Entries with schemes must match exactly against URLs + with explicit schemes, but match any scheme-less URL. Args: parsed_url: The parsed URL to check. allow_list: List of allowed URL patterns (domains, IPs, CIDR, full URLs). allow_subdomains: If True, subdomains of allowed domains are permitted. + url_had_explicit_scheme: Whether the original URL included an explicit scheme. Returns: True if the URL matches any allow list entry, False otherwise. @@ -308,6 +320,12 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom url_host = url_host.lower() url_domain = url_host.replace("www.", "") scheme_lower = parsed_url.scheme.lower() if parsed_url.scheme else "" + # Check if port was explicitly specified (safely) + try: + url_port_explicit = parsed_url.port + except ValueError: + # Malformed port (out of range or invalid) - reject the URL + return False url_port = _safe_get_port(parsed_url, scheme_lower) # If port is invalid (None from _safe_get_port due to ValueError), reject the URL if url_port is None and parsed_url.netloc and ":" in parsed_url.netloc: @@ -331,6 +349,11 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom parsed_allowed = urlparse(f"//{allowed_entry}") allowed_host = (parsed_allowed.hostname or "").lower() allowed_scheme = parsed_allowed.scheme.lower() if parsed_allowed.scheme else "" + # Check if port was explicitly specified (safely) + try: + allowed_port_explicit = parsed_allowed.port + except ValueError: + allowed_port_explicit = None allowed_port = _safe_get_port(parsed_allowed, allowed_scheme) allowed_path = parsed_allowed.path allowed_query = parsed_allowed.query @@ -345,10 +368,11 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom if allowed_ip is not None: if url_ip is None: continue - # Scheme matching for IPs: if allow list entry has explicit scheme, it must match exactly - if has_explicit_scheme and allowed_scheme and allowed_scheme != scheme_lower: + # Scheme matching for IPs: if both allow list and URL have explicit schemes, they must match + if has_explicit_scheme and url_had_explicit_scheme and allowed_scheme and allowed_scheme != scheme_lower: continue - if allowed_port is not None and allowed_port != url_port: + # Port matching: only enforce if either side explicitly specified a port + if (allowed_port_explicit is not None or url_port_explicit is not None) and allowed_port != url_port: continue if allowed_ip == url_ip: return True @@ -369,7 +393,8 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom allowed_domain = allowed_host.replace("www.", "") - if allowed_port is not None and allowed_port != url_port: + # Port matching: only enforce if either side explicitly specified a port + if (allowed_port_explicit is not None or url_port_explicit is not None) and allowed_port != url_port: continue host_matches = url_domain == allowed_domain or ( @@ -378,8 +403,8 @@ def _is_url_allowed(parsed_url: ParseResult, allow_list: list[str], allow_subdom if not host_matches: continue - # Scheme matching: if allow list entry has explicit scheme, it must match exactly - if has_explicit_scheme and allowed_scheme and allowed_scheme != scheme_lower: + # Scheme matching: if both allow list and URL have explicit schemes, they must match + if has_explicit_scheme and url_had_explicit_scheme and allowed_scheme and allowed_scheme != scheme_lower: continue # Path matching with segment boundary respect @@ -421,7 +446,7 @@ async def urls(ctx: Any, data: str, config: URLConfig) -> GuardrailResult: for url_string in detected_urls: # Validate URL with security checks - parsed_url, error_reason = _validate_url_security(url_string, config) + parsed_url, error_reason, url_had_explicit_scheme = _validate_url_security(url_string, config) if parsed_url is None: blocked.append(url_string) @@ -436,7 +461,7 @@ async def urls(ctx: Any, data: str, config: URLConfig) -> GuardrailResult: # For hostless schemes, only scheme permission matters (no allow list needed) # They were already validated for scheme permission in _validate_url_security allowed.append(url_string) - elif _is_url_allowed(parsed_url, config.url_allow_list, config.allow_subdomains): + elif _is_url_allowed(parsed_url, config.url_allow_list, config.allow_subdomains, url_had_explicit_scheme): allowed.append(url_string) else: blocked.append(url_string) diff --git a/tests/unit/checks/test_urls.py b/tests/unit/checks/test_urls.py index e78ba45..bdeefa0 100644 --- a/tests/unit/checks/test_urls.py +++ b/tests/unit/checks/test_urls.py @@ -27,7 +27,7 @@ def test_detect_urls_deduplicates_scheme_and_domain() -> None: def test_validate_url_security_blocks_bad_scheme() -> None: """Disallowed schemes should produce an error.""" config = URLConfig() - parsed, reason = _validate_url_security("http://blocked.com", config) + parsed, reason, _ = _validate_url_security("http://blocked.com", config) assert parsed is None # noqa: S101 assert "Blocked scheme" in reason # noqa: S101 @@ -36,7 +36,7 @@ def test_validate_url_security_blocks_bad_scheme() -> None: def test_validate_url_security_blocks_userinfo_when_configured() -> None: """URLs with embedded credentials should be rejected when block_userinfo=True.""" config = URLConfig(allowed_schemes={"https"}, block_userinfo=True) - parsed, reason = _validate_url_security("https://user:pass@example.com", config) + parsed, reason, _ = _validate_url_security("https://user:pass@example.com", config) assert parsed is None # noqa: S101 assert "userinfo" in reason # noqa: S101 @@ -45,7 +45,7 @@ def test_validate_url_security_blocks_userinfo_when_configured() -> None: def test_validate_url_security_blocks_password_without_username() -> None: """URLs that only include a password in userinfo must be blocked.""" config = URLConfig(allowed_schemes={"https"}, block_userinfo=True) - parsed, reason = _validate_url_security("https://:secret@example.com", config) + parsed, reason, _ = _validate_url_security("https://:secret@example.com", config) assert parsed is None # noqa: S101 assert "userinfo" in reason # noqa: S101 @@ -65,16 +65,16 @@ def test_is_url_allowed_handles_full_urls_with_paths() -> None: allow_subdomains=False, allowed_schemes={"https://"}, ) - root_url, _ = _validate_url_security("https://suntropy.es", config) - path_url, _ = _validate_url_security("https://api.example.com/v1/resources?id=2", config) - wrong_path_url, _ = _validate_url_security("https://api.example.com/v2", config) + root_url, _, had_scheme1 = _validate_url_security("https://suntropy.es", config) + path_url, _, had_scheme2 = _validate_url_security("https://api.example.com/v1/resources?id=2", config) + wrong_path_url, _, had_scheme3 = _validate_url_security("https://api.example.com/v2", config) assert root_url is not None # noqa: S101 assert path_url is not None # noqa: S101 assert wrong_path_url is not None # noqa: S101 - assert _is_url_allowed(root_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(path_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(wrong_path_url, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(root_url, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(path_url, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 + assert _is_url_allowed(wrong_path_url, config.url_allow_list, config.allow_subdomains, had_scheme3) is False # noqa: S101 def test_is_url_allowed_respects_path_segment_boundaries() -> None: @@ -85,12 +85,12 @@ def test_is_url_allowed_respects_path_segment_boundaries() -> None: allowed_schemes={"https"}, ) # These should be allowed - exact_match, _ = _validate_url_security("https://example.com/api", config) - valid_subpath, _ = _validate_url_security("https://example.com/api/users", config) + exact_match, _, had_scheme1 = _validate_url_security("https://example.com/api", config) + valid_subpath, _, had_scheme2 = _validate_url_security("https://example.com/api/users", config) # These should NOT be allowed (different path segments) - similar_path1, _ = _validate_url_security("https://example.com/api2", config) - similar_path2, _ = _validate_url_security("https://example.com/api-v2", config) + similar_path1, _, had_scheme3 = _validate_url_security("https://example.com/api2", config) + similar_path2, _, had_scheme4 = _validate_url_security("https://example.com/api-v2", config) assert exact_match is not None # noqa: S101 assert valid_subpath is not None # noqa: S101 @@ -98,12 +98,12 @@ def test_is_url_allowed_respects_path_segment_boundaries() -> None: assert similar_path2 is not None # noqa: S101 # Exact match and valid subpath should be allowed - assert _is_url_allowed(exact_match, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(valid_subpath, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(exact_match, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(valid_subpath, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 # Similar paths that don't respect segment boundaries should be blocked - assert _is_url_allowed(similar_path1, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 - assert _is_url_allowed(similar_path2, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(similar_path1, config.url_allow_list, config.allow_subdomains, had_scheme3) is False # noqa: S101 + assert _is_url_allowed(similar_path2, config.url_allow_list, config.allow_subdomains, had_scheme4) is False # noqa: S101 def test_is_url_allowed_without_scheme_matches_multiple_protocols() -> None: @@ -113,13 +113,13 @@ def test_is_url_allowed_without_scheme_matches_multiple_protocols() -> None: allow_subdomains=False, allowed_schemes={"https", "http"}, ) - https_result, https_reason = _validate_url_security("https://example.com", config) - http_result, http_reason = _validate_url_security("http://example.com", config) + https_result, https_reason, had_scheme1 = _validate_url_security("https://example.com", config) + http_result, http_reason, had_scheme2 = _validate_url_security("http://example.com", config) assert https_result is not None, https_reason # noqa: S101 assert http_result is not None, http_reason # noqa: S101 - assert _is_url_allowed(https_result, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(http_result, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(https_result, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(http_result, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 def test_is_url_allowed_supports_subdomains_and_cidr() -> None: @@ -128,13 +128,13 @@ def test_is_url_allowed_supports_subdomains_and_cidr() -> None: url_allow_list=["example.com", "10.0.0.0/8"], allow_subdomains=True, ) - https_result, _ = _validate_url_security("https://api.example.com", config) - ip_result, _ = _validate_url_security("https://10.1.2.3", config) + https_result, _, had_scheme1 = _validate_url_security("https://api.example.com", config) + ip_result, _, had_scheme2 = _validate_url_security("https://10.1.2.3", config) assert https_result is not None # noqa: S101 assert ip_result is not None # noqa: S101 - assert _is_url_allowed(https_result, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(ip_result, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(https_result, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(ip_result, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 @pytest.mark.asyncio @@ -201,7 +201,7 @@ def test_url_config_rejects_empty_schemes() -> None: def test_validate_url_security_handles_malformed_urls() -> None: """Malformed URLs should be rejected with clear error messages.""" config = URLConfig(allowed_schemes={"https"}) - parsed, reason = _validate_url_security("https://", config) + parsed, reason, _ = _validate_url_security("https://", config) assert parsed is None # noqa: S101 assert "Invalid URL" in reason # noqa: S101 @@ -215,41 +215,45 @@ def test_is_url_allowed_handles_cidr_blocks() -> None: allowed_schemes={"https"}, ) # IPs within CIDR ranges - ip_in_range1, _ = _validate_url_security("https://10.5.5.5", config) - ip_in_range2, _ = _validate_url_security("https://192.168.1.100", config) + ip_in_range1, _, had_scheme1 = _validate_url_security("https://10.5.5.5", config) + ip_in_range2, _, had_scheme2 = _validate_url_security("https://192.168.1.100", config) # IP outside CIDR range - ip_outside, _ = _validate_url_security("https://192.168.2.1", config) + ip_outside, _, had_scheme3 = _validate_url_security("https://192.168.2.1", config) assert ip_in_range1 is not None # noqa: S101 assert ip_in_range2 is not None # noqa: S101 assert ip_outside is not None # noqa: S101 - assert _is_url_allowed(ip_in_range1, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(ip_in_range2, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(ip_outside, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(ip_in_range1, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(ip_in_range2, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 + assert _is_url_allowed(ip_outside, config.url_allow_list, config.allow_subdomains, had_scheme3) is False # noqa: S101 def test_is_url_allowed_handles_port_matching() -> None: - """Allow list entries with explicit ports should require exact port match.""" + """Port matching: only enforced when explicitly specified.""" config = URLConfig( url_allow_list=["https://example.com:8443", "api.internal.com"], allow_subdomains=False, allowed_schemes={"https"}, ) - # Correct port - correct_port, _ = _validate_url_security("https://example.com:8443", config) - # Wrong port (implicit 443) - wrong_port, _ = _validate_url_security("https://example.com", config) - # Any port when not specified in allow list - any_port, _ = _validate_url_security("https://api.internal.com:9000", config) + # Explicit port 8443 matches allow list + correct_port, _, had_scheme1 = _validate_url_security("https://example.com:8443", config) + # Implicit 443 doesn't match explicit 8443 in allow list + wrong_port, _, had_scheme2 = _validate_url_security("https://example.com", config) + # Explicit 9000 doesn't match implicit default in allow list (no port = not checking) + explicit_port_vs_implicit, _, had_scheme3 = _validate_url_security("https://api.internal.com:9000", config) + # Implicit default matches implicit default + implicit_match, _, had_scheme4 = _validate_url_security("https://api.internal.com", config) assert correct_port is not None # noqa: S101 assert wrong_port is not None # noqa: S101 - assert any_port is not None # noqa: S101 + assert explicit_port_vs_implicit is not None # noqa: S101 + assert implicit_match is not None # noqa: S101 - assert _is_url_allowed(correct_port, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(wrong_port, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 - assert _is_url_allowed(any_port, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(correct_port, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(wrong_port, config.url_allow_list, config.allow_subdomains, had_scheme2) is False # noqa: S101 + assert _is_url_allowed(explicit_port_vs_implicit, config.url_allow_list, config.allow_subdomains, had_scheme3) is False # noqa: S101 + assert _is_url_allowed(implicit_match, config.url_allow_list, config.allow_subdomains, had_scheme4) is True # noqa: S101 def test_is_url_allowed_handles_query_and_fragment() -> None: @@ -260,29 +264,29 @@ def test_is_url_allowed_handles_query_and_fragment() -> None: allowed_schemes={"https"}, ) # Exact query match - exact_query, _ = _validate_url_security("https://example.com/search?q=test", config) + exact_query, _, had_scheme1 = _validate_url_security("https://example.com/search?q=test", config) # Different query - diff_query, _ = _validate_url_security("https://example.com/search?q=other", config) + diff_query, _, had_scheme2 = _validate_url_security("https://example.com/search?q=other", config) # Exact fragment match - exact_fragment, _ = _validate_url_security("https://example.com/docs#intro", config) + exact_fragment, _, had_scheme3 = _validate_url_security("https://example.com/docs#intro", config) # Different fragment - diff_fragment, _ = _validate_url_security("https://example.com/docs#outro", config) + diff_fragment, _, had_scheme4 = _validate_url_security("https://example.com/docs#outro", config) assert exact_query is not None # noqa: S101 assert diff_query is not None # noqa: S101 assert exact_fragment is not None # noqa: S101 assert diff_fragment is not None # noqa: S101 - assert _is_url_allowed(exact_query, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(diff_query, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 - assert _is_url_allowed(exact_fragment, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(diff_fragment, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(exact_query, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(diff_query, config.url_allow_list, config.allow_subdomains, had_scheme2) is False # noqa: S101 + assert _is_url_allowed(exact_fragment, config.url_allow_list, config.allow_subdomains, had_scheme3) is True # noqa: S101 + assert _is_url_allowed(diff_fragment, config.url_allow_list, config.allow_subdomains, had_scheme4) is False # noqa: S101 def test_validate_url_security_allows_userinfo_when_disabled() -> None: """URLs with userinfo should be allowed when block_userinfo=False.""" config = URLConfig(allowed_schemes={"https"}, block_userinfo=False) - parsed, reason = _validate_url_security("https://user:pass@example.com", config) + parsed, reason, _ = _validate_url_security("https://user:pass@example.com", config) assert parsed is not None # noqa: S101 assert reason == "" # noqa: S101 @@ -296,16 +300,16 @@ def test_is_url_allowed_enforces_scheme_when_explicitly_specified() -> None: allowed_schemes={"https", "http"}, # Both schemes allowed globally ) # HTTPS should be allowed (matches the scheme in allow list) - https_url, _ = _validate_url_security("https://bank.example.com", config) + https_url, _, had_scheme1 = _validate_url_security("https://bank.example.com", config) # HTTP should be BLOCKED (doesn't match the explicit https:// in allow list) - http_url, _ = _validate_url_security("http://bank.example.com", config) + http_url, _, had_scheme2 = _validate_url_security("http://bank.example.com", config) assert https_url is not None # noqa: S101 assert http_url is not None # noqa: S101 # This is the security-critical check: scheme-qualified entries must match exactly - assert _is_url_allowed(https_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(http_url, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(https_url, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(http_url, config.url_allow_list, config.allow_subdomains, had_scheme2) is False # noqa: S101 def test_is_url_allowed_enforces_scheme_for_ips() -> None: @@ -316,15 +320,15 @@ def test_is_url_allowed_enforces_scheme_for_ips() -> None: allowed_schemes={"https", "http"}, ) # HTTPS should be allowed - https_ip, _ = _validate_url_security("https://192.168.1.100", config) + https_ip, _, had_scheme1 = _validate_url_security("https://192.168.1.100", config) # HTTP should be BLOCKED - http_ip, _ = _validate_url_security("http://192.168.1.100", config) + http_ip, _, had_scheme2 = _validate_url_security("http://192.168.1.100", config) assert https_ip is not None # noqa: S101 assert http_ip is not None # noqa: S101 - assert _is_url_allowed(https_ip, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(http_ip, config.url_allow_list, config.allow_subdomains) is False # noqa: S101 + assert _is_url_allowed(https_ip, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(http_ip, config.url_allow_list, config.allow_subdomains, had_scheme2) is False # noqa: S101 @pytest.mark.asyncio @@ -354,13 +358,58 @@ def test_is_url_allowed_handles_trailing_slash_in_path() -> None: allowed_schemes={"https"}, ) # URL with subpath should be allowed - subpath_url, _ = _validate_url_security("https://example.com/api/users", config) + subpath_url, _, had_scheme1 = _validate_url_security("https://example.com/api/users", config) # Exact match (with trailing slash) should be allowed - exact_url, _ = _validate_url_security("https://example.com/api/", config) + exact_url, _, had_scheme2 = _validate_url_security("https://example.com/api/", config) assert subpath_url is not None # noqa: S101 assert exact_url is not None # noqa: S101 # Both should be allowed - assert _is_url_allowed(subpath_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 - assert _is_url_allowed(exact_url, config.url_allow_list, config.allow_subdomains) is True # noqa: S101 + assert _is_url_allowed(subpath_url, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(exact_url, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 + + +@pytest.mark.asyncio +async def test_urls_guardrail_scheme_matching_with_qualified_allow_list() -> None: + """Test exact behavior: scheme-qualified allow list vs scheme-less/explicit URLs.""" + config = URLConfig( + url_allow_list=["https://suntropy.es"], + allowed_schemes={"https"}, + allow_subdomains=False, + ) + + # Test schemeless URL + result1 = await urls(ctx=None, data="Visit suntropy.es", config=config) + assert "suntropy.es" in result1.info["allowed"] # noqa: S101 + assert result1.tripwire_triggered is False # noqa: S101 + + # Test HTTPS URL (should match allow list scheme) + result2 = await urls(ctx=None, data="Visit https://suntropy.es", config=config) + assert "https://suntropy.es" in result2.info["allowed"] # noqa: S101 + assert result2.tripwire_triggered is False # noqa: S101 + + # Test HTTP URL (wrong explicit scheme should be blocked) + result3 = await urls(ctx=None, data="Visit http://suntropy.es", config=config) + assert "http://suntropy.es" in result3.info["blocked"] # noqa: S101 + assert result3.tripwire_triggered is True # noqa: S101 + + +@pytest.mark.asyncio +async def test_urls_guardrail_blocks_subdomains_and_paths_correctly() -> None: + """Verify subdomains and paths are still blocked according to allow list rules.""" + config = URLConfig( + url_allow_list=["https://suntropy.es"], + allowed_schemes={"https"}, + allow_subdomains=False, + ) + # Test blocked cases - different domains and subdomains + text = "Visit help-suntropy.es and help.suntropy.es" + + result = await urls(ctx=None, data=text, config=config) + + # Both should be blocked - not in allow list + assert result.tripwire_triggered is True # noqa: S101 + assert len(result.info["blocked"]) == 2 # noqa: S101 + assert "help-suntropy.es" in result.info["blocked"] # noqa: S101 + assert "help.suntropy.es" in result.info["blocked"] # noqa: S101 From 9ad835d6bcde915f635f60b8e94bbda5f19cf7b8 Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 13:51:08 -0500 Subject: [PATCH 7/8] Fix port matching --- src/guardrails/checks/text/urls.py | 3 --- tests/unit/checks/test_urls.py | 20 ++++++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index f98308b..de90acb 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -327,9 +327,6 @@ def _is_url_allowed( # Malformed port (out of range or invalid) - reject the URL return False url_port = _safe_get_port(parsed_url, scheme_lower) - # If port is invalid (None from _safe_get_port due to ValueError), reject the URL - if url_port is None and parsed_url.netloc and ":" in parsed_url.netloc: - return False url_path = parsed_url.path or "/" url_query = parsed_url.query url_fragment = parsed_url.fragment diff --git a/tests/unit/checks/test_urls.py b/tests/unit/checks/test_urls.py index bdeefa0..3590d5e 100644 --- a/tests/unit/checks/test_urls.py +++ b/tests/unit/checks/test_urls.py @@ -395,6 +395,26 @@ async def test_urls_guardrail_scheme_matching_with_qualified_allow_list() -> Non assert result3.tripwire_triggered is True # noqa: S101 +def test_is_url_allowed_handles_ipv6_addresses() -> None: + """IPv6 addresses should be handled correctly (colons are not ports).""" + config = URLConfig( + url_allow_list=["[2001:db8::1]", "ftp://[2001:db8::2]"], + allow_subdomains=False, + allowed_schemes={"https", "ftp"}, + ) + # IPv6 without scheme + ipv6_no_scheme, _, had_scheme1 = _validate_url_security("[2001:db8::1]", config) + # IPv6 with ftp scheme + ipv6_with_ftp, _, had_scheme2 = _validate_url_security("ftp://[2001:db8::2]", config) + + assert ipv6_no_scheme is not None # noqa: S101 + assert ipv6_with_ftp is not None # noqa: S101 + + # Both should be allowed + assert _is_url_allowed(ipv6_no_scheme, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(ipv6_with_ftp, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 + + @pytest.mark.asyncio async def test_urls_guardrail_blocks_subdomains_and_paths_correctly() -> None: """Verify subdomains and paths are still blocked according to allow list rules.""" From 0dd7311667e38a9d4ccf8e0f0c781bd2d59d89cc Mon Sep 17 00:00:00 2001 From: Steven C Date: Thu, 20 Nov 2025 14:20:45 -0500 Subject: [PATCH 8/8] improve port matching --- src/guardrails/checks/text/urls.py | 16 +++++------ tests/unit/checks/test_urls.py | 45 ++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index de90acb..b2911d5 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -320,13 +320,13 @@ def _is_url_allowed( url_host = url_host.lower() url_domain = url_host.replace("www.", "") scheme_lower = parsed_url.scheme.lower() if parsed_url.scheme else "" - # Check if port was explicitly specified (safely) + # Safely get port (rejects malformed ports) + url_port = _safe_get_port(parsed_url, scheme_lower) + # Early rejection of malformed ports try: - url_port_explicit = parsed_url.port + _ = parsed_url.port # This will raise ValueError for malformed ports except ValueError: - # Malformed port (out of range or invalid) - reject the URL return False - url_port = _safe_get_port(parsed_url, scheme_lower) url_path = parsed_url.path or "/" url_query = parsed_url.query url_fragment = parsed_url.fragment @@ -368,8 +368,8 @@ def _is_url_allowed( # Scheme matching for IPs: if both allow list and URL have explicit schemes, they must match if has_explicit_scheme and url_had_explicit_scheme and allowed_scheme and allowed_scheme != scheme_lower: continue - # Port matching: only enforce if either side explicitly specified a port - if (allowed_port_explicit is not None or url_port_explicit is not None) and allowed_port != url_port: + # Port matching: enforce if allow list has explicit port + if allowed_port_explicit is not None and allowed_port != url_port: continue if allowed_ip == url_ip: return True @@ -390,8 +390,8 @@ def _is_url_allowed( allowed_domain = allowed_host.replace("www.", "") - # Port matching: only enforce if either side explicitly specified a port - if (allowed_port_explicit is not None or url_port_explicit is not None) and allowed_port != url_port: + # Port matching: enforce if allow list has explicit port + if allowed_port_explicit is not None and allowed_port != url_port: continue host_matches = url_domain == allowed_domain or ( diff --git a/tests/unit/checks/test_urls.py b/tests/unit/checks/test_urls.py index 3590d5e..aaef633 100644 --- a/tests/unit/checks/test_urls.py +++ b/tests/unit/checks/test_urls.py @@ -230,30 +230,34 @@ def test_is_url_allowed_handles_cidr_blocks() -> None: def test_is_url_allowed_handles_port_matching() -> None: - """Port matching: only enforced when explicitly specified.""" + """Port matching: enforced if allow list has explicit port, otherwise any port allowed.""" config = URLConfig( url_allow_list=["https://example.com:8443", "api.internal.com"], allow_subdomains=False, allowed_schemes={"https"}, ) - # Explicit port 8443 matches allow list + # Explicit port 8443 matches allow list's explicit port → ALLOWED correct_port, _, had_scheme1 = _validate_url_security("https://example.com:8443", config) - # Implicit 443 doesn't match explicit 8443 in allow list + # Implicit 443 doesn't match allow list's explicit 8443 → BLOCKED wrong_port, _, had_scheme2 = _validate_url_security("https://example.com", config) - # Explicit 9000 doesn't match implicit default in allow list (no port = not checking) - explicit_port_vs_implicit, _, had_scheme3 = _validate_url_security("https://api.internal.com:9000", config) - # Implicit default matches implicit default + # Explicit 9000 with no port restriction in allow list → ALLOWED + explicit_port_no_restriction, _, had_scheme3 = _validate_url_security("https://api.internal.com:9000", config) + # Implicit 443 with no port restriction in allow list → ALLOWED implicit_match, _, had_scheme4 = _validate_url_security("https://api.internal.com", config) + # Explicit default 443 with no port restriction in allow list → ALLOWED (regression fix) + explicit_default_port, _, had_scheme5 = _validate_url_security("https://api.internal.com:443", config) assert correct_port is not None # noqa: S101 assert wrong_port is not None # noqa: S101 - assert explicit_port_vs_implicit is not None # noqa: S101 + assert explicit_port_no_restriction is not None # noqa: S101 assert implicit_match is not None # noqa: S101 + assert explicit_default_port is not None # noqa: S101 assert _is_url_allowed(correct_port, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 assert _is_url_allowed(wrong_port, config.url_allow_list, config.allow_subdomains, had_scheme2) is False # noqa: S101 - assert _is_url_allowed(explicit_port_vs_implicit, config.url_allow_list, config.allow_subdomains, had_scheme3) is False # noqa: S101 + assert _is_url_allowed(explicit_port_no_restriction, config.url_allow_list, config.allow_subdomains, had_scheme3) is True # noqa: S101 assert _is_url_allowed(implicit_match, config.url_allow_list, config.allow_subdomains, had_scheme4) is True # noqa: S101 + assert _is_url_allowed(explicit_default_port, config.url_allow_list, config.allow_subdomains, had_scheme5) is True # noqa: S101 def test_is_url_allowed_handles_query_and_fragment() -> None: @@ -415,6 +419,31 @@ def test_is_url_allowed_handles_ipv6_addresses() -> None: assert _is_url_allowed(ipv6_with_ftp, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 +def test_is_url_allowed_handles_ipv6_cidr_notation() -> None: + """IPv6 CIDR blocks should be handled correctly (brackets stripped, path concatenated).""" + config = URLConfig( + url_allow_list=["[2001:db8::]/64", "[fe80::]/10"], + allow_subdomains=False, + allowed_schemes={"https"}, + ) + # IP within first CIDR range + ip_in_range1, _, had_scheme1 = _validate_url_security("https://[2001:db8::1234]", config) + # IP within second CIDR range + ip_in_range2, _, had_scheme2 = _validate_url_security("https://[fe80::5678]", config) + # IP outside CIDR ranges + ip_outside, _, had_scheme3 = _validate_url_security("https://[2001:db9::1]", config) + + assert ip_in_range1 is not None # noqa: S101 + assert ip_in_range2 is not None # noqa: S101 + assert ip_outside is not None # noqa: S101 + + # IPs within CIDR ranges should be allowed + assert _is_url_allowed(ip_in_range1, config.url_allow_list, config.allow_subdomains, had_scheme1) is True # noqa: S101 + assert _is_url_allowed(ip_in_range2, config.url_allow_list, config.allow_subdomains, had_scheme2) is True # noqa: S101 + # IP outside should be blocked + assert _is_url_allowed(ip_outside, config.url_allow_list, config.allow_subdomains, had_scheme3) is False # noqa: S101 + + @pytest.mark.asyncio async def test_urls_guardrail_blocks_subdomains_and_paths_correctly() -> None: """Verify subdomains and paths are still blocked according to allow list rules."""