Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions galaxy/webui/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
and WebSocket endpoints for the Web UI.
"""

from galaxy.webui.routers.auth import router as auth_router
from galaxy.webui.routers.health import router as health_router
from galaxy.webui.routers.devices import router as devices_router
from galaxy.webui.routers.websocket import router as websocket_router

__all__ = [
"auth_router",
"health_router",
"devices_router",
"websocket_router",
Expand Down
35 changes: 35 additions & 0 deletions galaxy/webui/routers/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Authentication router for Galaxy Web UI.

Provides an endpoint for the frontend to verify the API key (displayed
in the server console) and receive an authentication confirmation,
without exposing the API key in HTML responses.
"""

import secrets
from typing import Dict, Any

from fastapi import APIRouter, Depends

from galaxy.webui.dependencies import verify_api_key

router = APIRouter(tags=["auth"])


@router.post("/api/authenticate", dependencies=[Depends(verify_api_key)])
async def authenticate() -> Dict[str, Any]:
    """
    Confirm that the caller presented a valid API key.

    Key checking is delegated to the ``verify_api_key`` dependency attached
    to the route; this handler itself only builds the confirmation payload.
    The client must provide the API key via the X-API-Key header. The key is
    displayed in the server console on startup.

    :return: Dictionary confirming successful authentication
    """
    confirmation: Dict[str, Any] = {"authenticated": True}
    return confirmation
15 changes: 6 additions & 9 deletions galaxy/webui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from galaxy.core.events import get_event_bus
from galaxy.webui.dependencies import get_app_state
from galaxy.webui.routers import health_router, devices_router, websocket_router
from galaxy.webui.routers import auth_router, health_router, devices_router, websocket_router
from galaxy.webui.websocket_observer import WebSocketObserver

if TYPE_CHECKING:
Expand Down Expand Up @@ -104,6 +104,7 @@ async def lifespan(app: FastAPI):
)

# Include routers for different endpoint groups
app.include_router(auth_router)
app.include_router(health_router)
app.include_router(devices_router)
app.include_router(websocket_router)
Expand Down Expand Up @@ -137,6 +138,10 @@ async def root() -> HTMLResponse:
Attempts to serve the built React application if available,
otherwise returns a placeholder HTML page from templates.

The API key is NOT embedded in the HTML response for security.
The frontend must obtain authentication via the /api/authenticate
endpoint using the API key displayed in the server console.

:return: HTMLResponse containing the web UI or placeholder
"""
# Try to serve built React app first
Expand All @@ -145,14 +150,6 @@ async def root() -> HTMLResponse:
with open(frontend_index, "r", encoding="utf-8") as f:
content = f.read()

# Inject API key so the frontend can authenticate WS and HTTP requests
app_state = get_app_state()
api_key = app_state.api_key or ""
api_key_script = (
f'<script>window.__GALAXY_API_KEY__="{api_key}";</script>'
)
content = content.replace("</head>", f"{api_key_script}</head>", 1)

return HTMLResponse(
content=content,
status_code=200,
Expand Down
5 changes: 5 additions & 0 deletions ufo/automator/app_apis/excel/excelclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ufo.automator.app_apis.basic import WinCOMCommand, WinCOMReceiverBasic
from ufo.automator.basic import CommandBasic
from ufo.automator.path_validator import validate_save_path


class ExcelWinCOMReceiver(WinCOMReceiverBasic):
Expand Down Expand Up @@ -310,6 +311,10 @@ def save_as(
if not file_ext:
file_ext = ".csv"

# Validate the save directory to prevent path traversal
document_dir = os.path.dirname(self.com_object.FullName)
file_dir = validate_save_path(file_dir, document_dir)

file_path = os.path.join(file_dir, file_name + file_ext)

try:
Expand Down
5 changes: 5 additions & 0 deletions ufo/automator/app_apis/powerpoint/powerpointclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ufo.automator.app_apis.basic import WinCOMCommand, WinCOMReceiverBasic
from ufo.automator.basic import CommandBasic
from ufo.automator.path_validator import validate_save_path


class PowerPointWinCOMReceiver(WinCOMReceiverBasic):
Expand Down Expand Up @@ -116,6 +117,10 @@ def save_as(
if not file_ext:
file_ext = ".pptx"

# Validate the save directory to prevent path traversal
document_dir = os.path.dirname(self.com_object.FullName)
file_dir = validate_save_path(file_dir, document_dir)

file_path = os.path.join(file_dir, file_name + file_ext)

try:
Expand Down
83 changes: 83 additions & 0 deletions ufo/automator/app_apis/web/webclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,90 @@

from __future__ import annotations

import ipaddress
import logging
import socket
from typing import Any, Dict, Type
from urllib.parse import urlparse

import html2text
import requests

from ufo.automator.basic import CommandBasic, ReceiverBasic

logger = logging.getLogger(__name__)

# Private/reserved IP networks that should be blocked for SSRF protection.
# IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) are unwrapped in _validate_url
# before matching, so the IPv4 entries below cover them as well.
_BLOCKED_IP_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("100.64.0.0/10"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local / cloud metadata
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.0.0.0/24"),
    ipaddress.ip_network("192.0.2.0/24"),
    ipaddress.ip_network("192.88.99.0/24"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("198.18.0.0/15"),
    ipaddress.ip_network("198.51.100.0/24"),
    ipaddress.ip_network("203.0.113.0/24"),
    ipaddress.ip_network("224.0.0.0/4"),
    ipaddress.ip_network("240.0.0.0/4"),
    ipaddress.ip_network("255.255.255.255/32"),
    # IPv6 private ranges
    ipaddress.ip_network("::/128"),  # Unspecified (IPv6 counterpart of 0.0.0.0)
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
    ipaddress.ip_network("ff00::/8"),  # Multicast (IPv6 counterpart of 224.0.0.0/4)
]

# Only allow http and https schemes
_ALLOWED_SCHEMES = {"http", "https"}


def _validate_url(url: str) -> None:
    """
    Validate a URL to prevent SSRF attacks.

    Blocks requests to:
    - Non-HTTP(S) schemes (e.g., file://, ftp://, gopher://)
    - Private/internal IP addresses
    - Cloud metadata endpoints (169.254.169.254)
    - Loopback addresses
    - IPv4-mapped IPv6 forms of any of the above (e.g., ::ffff:127.0.0.1)

    Every address the hostname resolves to is checked; a single blocked
    address rejects the URL.

    NOTE: this function only inspects the addresses returned by its own DNS
    lookup. It does not pin the connection made afterwards, so it cannot by
    itself guard against a hostname that resolves differently on the
    subsequent request or against HTTP redirects followed by the caller.

    :param url: The URL to validate
    :raises ValueError: If the URL is blocked for security reasons
    """
    if not url:
        raise ValueError("URL must not be empty")
    if not isinstance(url, str):
        # Keep the documented contract: every rejection is a ValueError.
        raise ValueError("URL must be a string")

    parsed = urlparse(url)

    # Block non-HTTP(S) schemes
    if parsed.scheme.lower() not in _ALLOWED_SCHEMES:
        raise ValueError(
            f"URL scheme '{parsed.scheme}' is not allowed. "
            f"Only {_ALLOWED_SCHEMES} are permitted."
        )

    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL must contain a valid hostname")

    # Resolve hostname to IP address and check against blocked networks
    try:
        addr_infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve hostname: {hostname}") from exc

    for addr_info in addr_infos:
        # sockaddr[0] may carry an IPv6 zone suffix ("fe80::1%eth0"); drop it
        # so parsing behaves the same on every supported Python version.
        literal = addr_info[4][0].split("%", 1)[0]
        ip = ipaddress.ip_address(literal)

        # An IPv6 address never matches an IPv4 network, so unwrap
        # IPv4-mapped addresses before consulting the block list.
        mapped_v4 = getattr(ip, "ipv4_mapped", None)
        if mapped_v4 is not None:
            ip = mapped_v4

        if any(ip in network for network in _BLOCKED_IP_NETWORKS):
            raise ValueError(
                f"Access to private/internal address {ip} is blocked"
            )


class WebReceiver(ReceiverBasic):
"""
Expand Down Expand Up @@ -37,6 +114,9 @@ def web_crawler(self, url: str, ignore_link: bool) -> str:
"""

try:
# Validate URL to prevent SSRF
_validate_url(url)

# Get the HTML content of the webpage
response = requests.get(url, headers=self._headers)
response.raise_for_status()
Expand All @@ -61,6 +141,9 @@ def navigate_to_url(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
url = params.get("url")
try:
# Validate URL to prevent SSRF
_validate_url(url)

# For now, use requests to fetch the page
response = requests.get(url, headers=self._headers)
response.raise_for_status()
Expand Down
5 changes: 5 additions & 0 deletions ufo/automator/app_apis/word/wordclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ufo.automator.app_apis.basic import WinCOMCommand, WinCOMReceiverBasic
from ufo.automator.basic import CommandBasic
from ufo.automator.path_validator import validate_save_path


class WordWinCOMReceiver(WinCOMReceiverBasic):
Expand Down Expand Up @@ -178,6 +179,10 @@ def save_as(
if not file_ext:
file_ext = ".pdf"

# Validate the save directory to prevent path traversal
document_dir = os.path.dirname(self.com_object.FullName)
file_dir = validate_save_path(file_dir, document_dir)

file_path = os.path.join(file_dir, file_name + file_ext)

try:
Expand Down
125 changes: 125 additions & 0 deletions ufo/automator/path_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Path validation utilities for preventing path traversal attacks (CWE-22).

Provides functions to validate and sanitize file paths, ensuring they
stay within allowed directories and don't traverse to sensitive locations.
"""

import os
import platform
from pathlib import Path
from typing import Optional, Sequence


# Directory roots that file-save operations must never write into.

# Consulted when platform.system() reports "Windows".
# Raw strings keep the backslashes literal.
_SENSITIVE_DIRS_WINDOWS = [
    r"C:\Windows",
    r"C:\Program Files",
    r"C:\Program Files (x86)",
    r"C:\ProgramData",
]

# Consulted on every non-Windows platform.
_SENSITIVE_DIRS_LINUX = [
    # Executables
    "/bin",
    "/sbin",
    "/usr/bin",
    "/usr/sbin",
    # System configuration and boot files
    "/etc",
    "/boot",
    # Kernel / device pseudo-filesystems and runtime state
    "/dev",
    "/proc",
    "/sys",
    "/var/run",
    # Shared libraries
    "/lib",
    "/lib64",
]


def validate_path_within_base(
    path_str: str,
    base_directory: str,
) -> str:
    """
    Resolve a path and ensure it stays within the base directory.

    Relative paths are interpreted relative to ``base_directory``; absolute
    paths are taken as given. Both sides are resolved (symlinks and ``..``
    collapsed) before the containment check, and the base directory itself
    counts as being "within" the base.

    :param path_str: The path to validate (absolute or relative)
    :param base_directory: The allowed base directory
    :return: The resolved absolute path as a string
    :raises ValueError: If the path resolves outside the base directory
    """
    base = Path(base_directory).resolve()
    candidate = Path(path_str)
    if candidate.is_absolute():
        resolved = candidate.resolve()
    else:
        resolved = (base / candidate).resolve()

    # Compare path components rather than string prefixes: a prefix test
    # breaks when the base is a filesystem root (the root already ends with
    # a separator) and ignores the platform's path-comparison rules.
    try:
        resolved.relative_to(base)
    except ValueError:
        raise ValueError(
            f"Path '{path_str}' resolves outside the allowed base directory '{base}'"
        ) from None
    return str(resolved)


def validate_path_not_sensitive(path_str: str) -> str:
    """
    Validate that a path does not point to a sensitive system directory.

    The path is resolved first, then compared case-insensitively against the
    resolved form of each entry in the platform's sensitive-directory list
    (the Windows list when ``platform.system()`` is ``"Windows"``, the Linux
    list otherwise). A path is rejected when it is a sensitive directory
    itself or lies beneath one.

    :param path_str: The path to validate
    :return: The resolved absolute path as a string
    :raises ValueError: If the path targets a sensitive directory
    """
    resolved = Path(path_str).resolve()
    resolved_key = str(resolved).lower()

    if platform.system() == "Windows":
        sensitive_dirs = _SENSITIVE_DIRS_WINDOWS
    else:
        sensitive_dirs = _SENSITIVE_DIRS_LINUX

    for sensitive_dir in sensitive_dirs:
        sensitive_key = str(Path(sensitive_dir).resolve()).lower()
        # Require a separator after the directory name so that unrelated
        # siblings sharing a textual prefix (e.g. "/etcetera" vs "/etc",
        # "/development" vs "/dev") are not rejected by mistake.
        beneath_prefix = sensitive_key.rstrip(os.sep) + os.sep
        if resolved_key == sensitive_key or resolved_key.startswith(beneath_prefix):
            raise ValueError(
                f"Path '{path_str}' targets a sensitive system directory: {sensitive_dir}"
            )

    return str(resolved)


def validate_save_path(
    file_dir: str,
    document_dir: Optional[str] = None,
) -> str:
    """
    Validate a directory path for file save operations.

    Behaviour:
    - An empty ``file_dir`` falls back to the resolved ``document_dir`` when
      one is given, otherwise to the current working directory.
    - A ``file_dir`` containing a ``..`` component is rejected.
    - A ``file_dir`` that resolves to a sensitive system directory is
      rejected (via ``validate_path_not_sensitive``).

    NOTE(review): ``document_dir`` is only used as the fallback for an empty
    ``file_dir``; a non-empty ``file_dir`` is not required to live under it,
    and a relative ``file_dir`` resolves against the process working
    directory. Confirm this is the intended policy.

    :param file_dir: The target directory for saving
    :param document_dir: The directory of the source document (optional)
    :return: The resolved absolute directory path
    :raises ValueError: If the path is not safe for saving
    """
    if file_dir:
        target = Path(file_dir).resolve()

        # Inspect the raw input, not the resolved path: resolution has
        # already collapsed any ".." components.
        has_parent_ref = any(part == ".." for part in Path(file_dir).parts)
        if has_parent_ref:
            raise ValueError(
                f"Path '{file_dir}' contains directory traversal sequences"
            )

        # Raises ValueError when the target is a sensitive system location.
        validate_path_not_sensitive(str(target))
        return str(target)

    # No explicit directory requested: save next to the document if known.
    if not document_dir:
        return os.getcwd()
    return str(Path(document_dir).resolve())
Loading