diff --git a/apps/api/plane/app/serializers/webhook.py b/apps/api/plane/app/serializers/webhook.py index 74ebde89205..c5d0dd41e9a 100644 --- a/apps/api/plane/app/serializers/webhook.py +++ b/apps/api/plane/app/serializers/webhook.py @@ -3,90 +3,55 @@ # See the LICENSE file for details. # Python imports -import socket -import ipaddress +import logging from urllib.parse import urlparse # Third party imports from rest_framework import serializers +# Django imports +from django.conf import settings + # Module imports from .base import DynamicBaseSerializer from plane.db.models import Webhook, WebhookLog from plane.db.models.webhook import validate_domain, validate_schema +from plane.utils.ip_address import validate_url + +logger = logging.getLogger(__name__) class WebhookSerializer(DynamicBaseSerializer): url = serializers.URLField(validators=[validate_schema, validate_domain]) - def create(self, validated_data): - url = validated_data.get("url", None) - - # Extract the hostname from the URL - hostname = urlparse(url).hostname - if not hostname: - raise serializers.ValidationError({"url": "Invalid URL: No hostname found."}) - - # Resolve the hostname to IP addresses + def _validate_webhook_url(self, url): + """Validate a webhook URL against SSRF and disallowed domain rules.""" try: - ip_addresses = socket.getaddrinfo(hostname, None) - except socket.gaierror: - raise serializers.ValidationError({"url": "Hostname could not be resolved."}) - - if not ip_addresses: - raise serializers.ValidationError({"url": "No IP addresses found for the hostname."}) + validate_url(url, allowed_ips=settings.WEBHOOK_ALLOWED_IPS) + except ValueError as e: + logger.warning("Webhook URL validation failed for %s: %s", url, e) + raise serializers.ValidationError({"url": "Invalid or disallowed webhook URL."}) - for addr in ip_addresses: - ip = ipaddress.ip_address(addr[4][0]) - if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local: - raise serializers.ValidationError({"url": "URL resolves to a blocked IP address."}) + hostname = (urlparse(url).hostname or "").rstrip(".").lower() - # Additional validation for multiple request domains and their subdomains request = self.context.get("request") - disallowed_domains = ["plane.so"] # Add your disallowed domains here + disallowed_domains = ["plane.so"] if request: - request_host = request.get_host().split(":")[0] # Remove port if present + request_host = request.get_host().split(":")[0].rstrip(".").lower() disallowed_domains.append(request_host) - # Check if hostname is a subdomain or exact match of any disallowed domain if any(hostname == domain or hostname.endswith("." + domain) for domain in disallowed_domains): raise serializers.ValidationError({"url": "URL domain or its subdomain is not allowed."}) + def create(self, validated_data): + url = validated_data.get("url", None) + self._validate_webhook_url(url) return Webhook.objects.create(**validated_data) def update(self, instance, validated_data): url = validated_data.get("url", None) if url: - # Extract the hostname from the URL - hostname = urlparse(url).hostname - if not hostname: - raise serializers.ValidationError({"url": "Invalid URL: No hostname found."}) - - # Resolve the hostname to IP addresses - try: - ip_addresses = socket.getaddrinfo(hostname, None) - except socket.gaierror: - raise serializers.ValidationError({"url": "Hostname could not be resolved."}) - - if not ip_addresses: - raise serializers.ValidationError({"url": "No IP addresses found for the hostname."}) - - for addr in ip_addresses: - ip = ipaddress.ip_address(addr[4][0]) - if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local: - raise serializers.ValidationError({"url": "URL resolves to a blocked IP address."}) - - # Additional validation for multiple request domains and their subdomains - request = self.context.get("request") - disallowed_domains = ["plane.so"] # Add your disallowed domains here - if request: - request_host = request.get_host().split(":")[0] # Remove port if present - disallowed_domains.append(request_host) - - # Check if hostname is a subdomain or exact match of any disallowed domain - if any(hostname == domain or hostname.endswith("." + domain) for domain in disallowed_domains): - raise serializers.ValidationError({"url": "URL domain or its subdomain is not allowed."}) - + self._validate_webhook_url(url) return super().update(instance, validated_data) class Meta: diff --git a/apps/api/plane/bgtasks/webhook_task.py b/apps/api/plane/bgtasks/webhook_task.py index 6543c3845b8..89d98757679 100644 --- a/apps/api/plane/bgtasks/webhook_task.py +++ b/apps/api/plane/bgtasks/webhook_task.py @@ -52,6 +52,7 @@ from plane.license.utils.instance_value import get_email_configuration from plane.utils.email import generate_plain_text_from_html from plane.utils.exception_logger import log_exception +from plane.utils.ip_address import validate_url from plane.settings.mongo import MongoConnection @@ -325,6 +326,9 @@ def webhook_send_task( return try: + # Re-validate the webhook URL at send time to prevent DNS-rebinding attacks + validate_url(webhook.url, allowed_ips=settings.WEBHOOK_ALLOWED_IPS) + # Send the webhook event response = requests.post(webhook.url, headers=headers, json=payload, timeout=30) diff --git a/apps/api/plane/settings/common.py b/apps/api/plane/settings/common.py index 9d651bd1b4c..d90ee10f891 100644 --- a/apps/api/plane/settings/common.py +++ b/apps/api/plane/settings/common.py @@ -5,6 +5,8 @@ """Global Settings""" # Python imports +import ipaddress +import logging import os from urllib.parse import urlparse from urllib.parse import urljoin @@ -32,6 +34,21 @@ # Self-hosted mode IS_SELF_MANAGED = True +# Webhook IP allowlist — comma-separated IPs or CIDR ranges that are allowed as +# webhook targets even if they resolve to private networks. +# Example: "10.0.0.0/8,192.168.1.0/24,172.16.0.5" +_webhook_allowed_ips_raw = os.environ.get("WEBHOOK_ALLOWED_IPS", "") +WEBHOOK_ALLOWED_IPS = [] +_logger = logging.getLogger("plane") +for _cidr in _webhook_allowed_ips_raw.split(","): + _cidr = _cidr.strip() + if not _cidr: + continue + try: + WEBHOOK_ALLOWED_IPS.append(ipaddress.ip_network(_cidr, strict=False)) + except ValueError: + _logger.warning("WEBHOOK_ALLOWED_IPS: skipping invalid entry %r", _cidr) + # Allowed Hosts ALLOWED_HOSTS = os.environ.get("ALLOWED_HOSTS", "*").split(",") diff --git a/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py b/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py index 2838260e890..67c61c6ccae 100644 --- a/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py +++ b/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py @@ -2,9 +2,12 @@ # SPDX-License-Identifier: AGPL-3.0-only # See the LICENSE file for details. +import ipaddress + import pytest from unittest.mock import patch, MagicMock from plane.bgtasks.work_item_link_task import safe_get, validate_url_ip +from plane.utils.ip_address import validate_url def _make_response(status_code=200, headers=None, is_redirect=False, content=b""): @@ -43,6 +46,49 @@ def test_allows_public_ip(self): validate_url_ip("https://example.com") # Should not raise +@pytest.mark.unit +class TestValidateUrlAllowlist: + """Test validate_url allowlist permits specific private IPs.""" + + def test_allowlist_permits_private_ip(self): + allowed = [ipaddress.ip_network("192.168.1.0/24")] + with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("192.168.1.50", 0))] + validate_url("http://example.com", allowed_ips=allowed) # Should not raise + + def test_allowlist_does_not_permit_other_private_ip(self): + allowed = [ipaddress.ip_network("192.168.1.0/24")] + with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("10.0.0.1", 0))] + with pytest.raises(ValueError, match="private/internal"): + validate_url("http://example.com", allowed_ips=allowed) + + def test_allowlist_permits_loopback_when_explicitly_allowed(self): + allowed = [ipaddress.ip_network("127.0.0.0/8")] + with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("127.0.0.1", 0))] + validate_url("http://example.com", allowed_ips=allowed) # Should not raise + + def test_allowlist_permits_matching_ipv4_with_mixed_version_networks(self): + allowed = [ + ipaddress.ip_network("2001:db8::/32"), + ipaddress.ip_network("192.168.1.0/24"), + ] + with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("192.168.1.50", 0))] + validate_url("http://example.com", allowed_ips=allowed) # Should not raise + + def test_allowlist_blocks_non_matching_ipv4_with_mixed_version_networks(self): + allowed = [ + ipaddress.ip_network("2001:db8::/32"), + ipaddress.ip_network("192.168.1.0/24"), + ] + with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("10.0.0.1", 0))] + with pytest.raises(ValueError, match="private/internal"): + validate_url("http://example.com", allowed_ips=allowed) + + @pytest.mark.unit class TestSafeGet: """Test safe_get follows redirects safely and blocks SSRF.""" diff --git a/apps/api/plane/utils/ip_address.py b/apps/api/plane/utils/ip_address.py index 3a0f171d793..4102ad6f4c2 100644 --- a/apps/api/plane/utils/ip_address.py +++ b/apps/api/plane/utils/ip_address.py @@ -2,6 +2,52 @@ # SPDX-License-Identifier: AGPL-3.0-only # See the LICENSE file for details. +# Python imports +import ipaddress +import socket +from urllib.parse import urlparse + + +def validate_url(url, allowed_ips=None): + """ + Validate that a URL doesn't resolve to a private/internal IP address (SSRF protection). + + Args: + url: The URL to validate. + allowed_ips: Optional list of ipaddress.ip_network objects. IPs falling within + these networks are permitted even if they are private/loopback/reserved. + Typically sourced from the WEBHOOK_ALLOWED_IPS setting. + + Raises: + ValueError: If the URL is invalid or resolves to a blocked IP. + """ + parsed = urlparse(url) + hostname = parsed.hostname + + if not hostname: + raise ValueError("Invalid URL: No hostname found") + + if parsed.scheme not in ("http", "https"): + raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed") + + try: + addr_info = socket.getaddrinfo(hostname, None) + except socket.gaierror: + raise ValueError("Hostname could not be resolved") + + if not addr_info: + raise ValueError("No IP addresses found for the hostname") + + for addr in addr_info: + ip = ipaddress.ip_address(addr[4][0]) + if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local: + if allowed_ips and any( + network.version == ip.version and ip in network for network in allowed_ips + ): + continue + raise ValueError("Access to private/internal networks is not allowed") + + def get_client_ip(request): x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: