diff --git a/apps/api/plane/bgtasks/work_item_link_task.py b/apps/api/plane/bgtasks/work_item_link_task.py index 442396c7f02..5cf0fbb1908 100644 --- a/apps/api/plane/bgtasks/work_item_link_task.py +++ b/apps/api/plane/bgtasks/work_item_link_task.py @@ -13,7 +13,7 @@ from urllib.parse import urlparse, urljoin import base64 import ipaddress -from typing import Dict, Any +from typing import Dict, Any, Tuple from typing import Optional from plane.db.models import IssueLink from plane.utils.exception_logger import log_exception @@ -66,6 +66,52 @@ def validate_url_ip(url: str) -> None: MAX_REDIRECTS = 5 +def safe_get( + url: str, + headers: Optional[Dict[str, str]] = None, + timeout: int = 1, +) -> Tuple[requests.Response, str]: + """ + Perform a GET request that validates every redirect hop against private IPs. + Prevents SSRF by ensuring no redirect lands on a private/internal address. + + Args: + url: The URL to fetch + headers: Optional request headers + timeout: Request timeout in seconds + + Returns: + A tuple of (final Response object, final URL after redirects) + + Raises: + ValueError: If any URL in the redirect chain points to a private IP + requests.RequestException: On network errors + RuntimeError: If max redirects exceeded + """ + validate_url_ip(url) + + current_url = url + response = requests.get( + current_url, headers=headers, timeout=timeout, allow_redirects=False + ) + + redirect_count = 0 + while response.is_redirect: + if redirect_count >= MAX_REDIRECTS: + raise RuntimeError(f"Too many redirects for URL: {url}") + redirect_url = response.headers.get("Location") + if not redirect_url: + break + current_url = urljoin(current_url, redirect_url) + validate_url_ip(current_url) + redirect_count += 1 + response = requests.get( + current_url, headers=headers, timeout=timeout, allow_redirects=False + ) + + return response, current_url + + def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]: """ Crawls a URL to extract the title and favicon. @@ -86,26 +132,8 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]: title = None final_url = url - validate_url_ip(final_url) - try: - # Manually follow redirects to validate each URL before requesting - redirect_count = 0 - response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False) - - while response.is_redirect and redirect_count < MAX_REDIRECTS: - redirect_url = response.headers.get("Location") - if not redirect_url: - break - # Resolve relative redirects against current URL - final_url = urljoin(final_url, redirect_url) - # Validate the redirect target BEFORE making the request - validate_url_ip(final_url) - redirect_count += 1 - response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False) - - if redirect_count >= MAX_REDIRECTS: - logger.warning(f"Too many redirects for URL: {url}") + response, final_url = safe_get(url, headers=headers) soup = BeautifulSoup(response.content, "html.parser") title_tag = soup.find("title") @@ -113,8 +141,10 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]: except requests.RequestException as e: logger.warning(f"Failed to fetch HTML for title: {str(e)}") + except (ValueError, RuntimeError) as e: + logger.warning(f"URL validation failed: {str(e)}") - # Fetch and encode favicon using final URL (after redirects) + # Fetch and encode favicon using final URL (after redirects) for correct relative href resolution favicon_base64 = fetch_and_encode_favicon(headers, soup, final_url) # Prepare result @@ -204,9 +234,7 @@ def fetch_and_encode_favicon( "favicon_base64": f"data:image/svg+xml;base64,{DEFAULT_FAVICON}", } - validate_url_ip(favicon_url) - - response = requests.get(favicon_url, headers=headers, timeout=1) + response, _ = safe_get(favicon_url, headers=headers) # Get content type content_type = response.headers.get("content-type", "image/x-icon") diff --git a/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py b/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py new file mode 100644 index 00000000000..2838260e890 --- /dev/null +++ b/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py @@ -0,0 +1,126 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +import pytest +from unittest.mock import patch, MagicMock +from plane.bgtasks.work_item_link_task import safe_get, validate_url_ip + + +def _make_response(status_code=200, headers=None, is_redirect=False, content=b""): + """Create a mock requests.Response.""" + resp = MagicMock() + resp.status_code = status_code + resp.is_redirect = is_redirect + resp.headers = headers or {} + resp.content = content + return resp + + +@pytest.mark.unit +class TestValidateUrlIp: + """Test validate_url_ip blocks private/internal IPs.""" + + def test_rejects_private_ip(self): + with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("192.168.1.1", 0))] + with pytest.raises(ValueError, match="private/internal"): + validate_url_ip("http://example.com") + + def test_rejects_loopback(self): + with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("127.0.0.1", 0))] + with pytest.raises(ValueError, match="private/internal"): + validate_url_ip("http://example.com") + + def test_rejects_non_http_scheme(self): + with pytest.raises(ValueError, match="Only HTTP and HTTPS"): + validate_url_ip("file:///etc/passwd") + + def test_allows_public_ip(self): + with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(None, None, None, None, ("93.184.216.34", 0))] + validate_url_ip("https://example.com") # Should not raise + + +@pytest.mark.unit +class TestSafeGet: + """Test safe_get follows redirects safely and blocks SSRF.""" + + @patch("plane.bgtasks.work_item_link_task.requests.get") + @patch("plane.bgtasks.work_item_link_task.validate_url_ip") + def test_returns_response_for_non_redirect(self, mock_validate, mock_get): + final_resp = _make_response(status_code=200, content=b"OK") + mock_get.return_value = final_resp + + response, final_url = safe_get("https://example.com") + + assert response is final_resp + assert final_url == "https://example.com" + mock_validate.assert_called_once_with("https://example.com") + + @patch("plane.bgtasks.work_item_link_task.requests.get") + @patch("plane.bgtasks.work_item_link_task.validate_url_ip") + def test_follows_redirect_and_validates_each_hop(self, mock_validate, mock_get): + redirect_resp = _make_response( + status_code=301, + is_redirect=True, + headers={"Location": "https://other.com/page"}, + ) + final_resp = _make_response(status_code=200, content=b"OK") + mock_get.side_effect = [redirect_resp, final_resp] + + response, final_url = safe_get("https://example.com") + + assert response is final_resp + assert final_url == "https://other.com/page" + # Should validate both the initial URL and the redirect target + assert mock_validate.call_count == 2 + mock_validate.assert_any_call("https://example.com") + mock_validate.assert_any_call("https://other.com/page") + + @patch("plane.bgtasks.work_item_link_task.requests.get") + @patch("plane.bgtasks.work_item_link_task.validate_url_ip") + def test_blocks_redirect_to_private_ip(self, mock_validate, mock_get): + redirect_resp = _make_response( + status_code=302, + is_redirect=True, + headers={"Location": "http://192.168.1.1:8080"}, + ) + mock_get.return_value = redirect_resp + # First call (initial URL) succeeds, second call (redirect target) fails + mock_validate.side_effect = [None, ValueError("Access to private/internal networks is not allowed")] + + with pytest.raises(ValueError, match="private/internal"): + safe_get("https://evil.com/redirect") + + @patch("plane.bgtasks.work_item_link_task.requests.get") + @patch("plane.bgtasks.work_item_link_task.validate_url_ip") + def test_raises_on_too_many_redirects(self, mock_validate, mock_get): + redirect_resp = _make_response( + status_code=302, + is_redirect=True, + headers={"Location": "https://example.com/loop"}, + ) + mock_get.return_value = redirect_resp + + with pytest.raises(RuntimeError, match="Too many redirects"): + safe_get("https://example.com/start") + + @patch("plane.bgtasks.work_item_link_task.requests.get") + @patch("plane.bgtasks.work_item_link_task.validate_url_ip") + def test_succeeds_at_exact_max_redirects(self, mock_validate, mock_get): + """After exactly MAX_REDIRECTS hops, if the final response is 200, it should succeed.""" + redirect_resp = _make_response( + status_code=302, + is_redirect=True, + headers={"Location": "https://example.com/next"}, + ) + final_resp = _make_response(status_code=200, content=b"OK") + # 5 redirects then a 200 + mock_get.side_effect = [redirect_resp] * 5 + [final_resp] + + response, final_url = safe_get("https://example.com/start") + + assert response is final_resp + assert not response.is_redirect