"""
Domain whitelist management for email access control.

Loads domain whitelist definitions from domain-whitelist.json and provides
validation for user email domains.
"""

import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Set

logger = logging.getLogger(__name__)


@dataclass
class DomainWhitelistConfig:
    """Configuration for domain whitelist."""
    # Whether the whitelist is enforced at all.
    enabled: bool
    # Normalized (lower-case, non-empty) allowed domains.
    domains: Set[str]
    # When True, "mail.example.com" is accepted if "example.com" is listed.
    subdomain_matching: bool
    version: str
    description: str


class DomainWhitelistManager:
    """Manages domain whitelist configuration and validation."""

    def __init__(self, config_path: Optional[Path] = None):
        """Initialize the domain whitelist manager.

        Args:
            config_path: Path to domain-whitelist.json. If None, the standard
                locations are searched (overrides first, then defaults, then
                the legacy backend config directories).
        """
        self.config: Optional[DomainWhitelistConfig] = None

        if config_path is None:
            config_path = self._find_default_config()
        else:
            # Accept plain strings as well as Path objects.
            config_path = Path(config_path)

        if config_path and config_path.exists():
            self._load_config(config_path)
        else:
            logger.warning("No domain-whitelist.json found, domain whitelist disabled")
            self.config = self._disabled_config("No config loaded")

    @staticmethod
    def _find_default_config() -> Optional[Path]:
        """Return the first existing config file from the standard locations."""
        backend_root = Path(__file__).parent.parent
        project_root = backend_root.parent

        # Order matters: an override always wins over the shipped default.
        search_paths = (
            project_root / "config" / "overrides" / "domain-whitelist.json",
            project_root / "config" / "defaults" / "domain-whitelist.json",
            backend_root / "configfilesadmin" / "domain-whitelist.json",
            backend_root / "configfiles" / "domain-whitelist.json",
        )
        return next((path for path in search_paths if path.exists()), None)

    @staticmethod
    def _disabled_config(description: str) -> DomainWhitelistConfig:
        """Build the fallback configuration used when nothing usable was loaded."""
        return DomainWhitelistConfig(
            enabled=False,
            domains=set(),
            subdomain_matching=True,
            version="1.0",
            description=description,
        )

    @staticmethod
    def _parse_domains(entries) -> Set[str]:
        """Normalize the ``domains`` list into a set of lower-case domain names.

        Each entry may be a dict with a ``domain`` key or a bare string.
        Entries that do not yield a non-empty string are skipped with a
        warning. They must never reach the set: an empty string in the set
        would make an address such as ``user@`` match, and a single bad entry
        must not invalidate the remaining, valid ones.
        """
        domains: Set[str] = set()
        for entry in entries:
            raw = entry.get('domain') if isinstance(entry, dict) else entry
            if not isinstance(raw, str):
                logger.warning("Ignoring domain whitelist entry without a string domain: %r", entry)
                continue
            # Tolerate stray whitespace and the fully-qualified "example.com." form.
            domain = raw.strip().rstrip(".").lower()
            if not domain:
                logger.warning("Ignoring empty domain whitelist entry: %r", entry)
                continue
            domains.add(domain)
        return domains

    def _load_config(self, config_path: Path):
        """Load domain whitelist configuration from JSON file.

        On an unreadable file, invalid JSON, or an unexpected document shape
        the error is logged and a disabled configuration is installed, so the
        application still starts (with the whitelist not enforced).
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config_data = json.load(f)

            if not isinstance(config_data, dict):
                raise ValueError("top-level JSON value must be an object")

            entries = config_data.get('domains', [])
            if not isinstance(entries, list):
                # A bare string here would otherwise be iterated char by char.
                raise ValueError("'domains' must be a list")

            self.config = DomainWhitelistConfig(
                enabled=bool(config_data.get('enabled', False)),
                domains=self._parse_domains(entries),
                subdomain_matching=bool(config_data.get('subdomain_matching', True)),
                version=config_data.get('version', '1.0'),
                description=config_data.get('description', ''),
            )
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            logger.error("Error loading domain-whitelist.json: %s", e)
            self.config = self._disabled_config("Error loading config")
            return

        logger.info("Loaded %d domains from %s", len(self.config.domains), config_path)
        logger.debug("Domain whitelist enabled: %s", self.config.enabled)
        if self.config.enabled and not self.config.domains:
            logger.warning("Domain whitelist is enabled but contains no domains; all users will be rejected")

    def is_enabled(self) -> bool:
        """Check if domain whitelist is enabled.

        Returns:
            True if enabled, False otherwise
        """
        return self.config is not None and bool(self.config.enabled)

    def is_domain_allowed(self, email: str) -> bool:
        """Check if an email address is from an allowed domain.

        Args:
            email: Email address to validate

        Returns:
            True if the whitelist is not enforced, or the address's domain
            (or, with subdomain matching, one of its parent domains) is
            whitelisted. False for non-string values, addresses without
            ``@``, addresses with an empty domain, and all other domains.
        """
        if not self.config or not self.config.enabled:
            # If not enabled or no config, allow all
            return True

        if not isinstance(email, str) or "@" not in email:
            return False

        # Everything after the first "@": an address with a second "@" keeps
        # it in the domain part and therefore never matches a whitelist entry.
        domain = email.split("@", 1)[1].lower()
        if not domain:
            return False

        allowed = self.config.domains
        if domain in allowed:
            return True

        if self.config.subdomain_matching:
            # For "mail.dept.sandia.gov" check "dept.sandia.gov", "sandia.gov", "gov".
            labels = domain.split(".")
            return any(".".join(labels[i:]) in allowed for i in range(1, len(labels)))

        return False

    def get_domains(self) -> Set[str]:
        """Get the set of whitelisted domains.

        Returns:
            Set of allowed domains
        """
        return self.config.domains if self.config else set()
"""Email domain whitelist validation middleware.

This middleware enforces that users must have email addresses from whitelisted
domains. Configuration is loaded from domain-whitelist.json and can be
enabled/disabled via the FEATURE_DOMAIN_WHITELIST_ENABLED feature flag.
"""

import logging

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, RedirectResponse, Response

from core.domain_whitelist import DomainWhitelistManager

logger = logging.getLogger(__name__)


class DomainWhitelistMiddleware(BaseHTTPMiddleware):
    """Middleware to enforce email domain whitelist restrictions."""

    def __init__(self, app, auth_redirect_url: str = "/auth"):
        """Initialize domain whitelist middleware.

        Args:
            app: ASGI application
            auth_redirect_url: URL to redirect to on auth failure (default: /auth)
        """
        super().__init__(app)
        self.auth_redirect_url = auth_redirect_url
        self.whitelist_manager = DomainWhitelistManager()

        if self.whitelist_manager.is_enabled():
            logger.info(
                "Domain whitelist enabled with %d domains",
                len(self.whitelist_manager.get_domains()),
            )
        else:
            # The middleware is installed but the loaded config does not
            # enforce anything. Surface that, since it means every domain is
            # let through.
            logger.warning(
                "Domain whitelist middleware installed but whitelist is disabled "
                "or not configured; all domains are allowed"
            )

    async def dispatch(self, request: Request, call_next) -> Response:
        """Check if user email is from a whitelisted domain.

        Args:
            request: Incoming HTTP request
            call_next: Next middleware/handler in chain

        Returns:
            Response from next handler if authorized, or 403/redirect if not
        """
        path = request.url.path

        # Skip check for health endpoint and auth redirect endpoint
        if path == '/api/health' or path == self.auth_redirect_url:
            return await call_next(request)

        # If whitelist is not enabled in config, allow all
        if not self.whitelist_manager.is_enabled():
            return await call_next(request)

        # Get email from request state (set by AuthMiddleware)
        email = getattr(request.state, "user_email", None)

        # The isinstance guard keeps a non-string state value from raising
        # TypeError on the "@" membership test.
        if not isinstance(email, str) or "@" not in email:
            logger.warning("Domain whitelist check failed: missing or invalid email")
            return self._unauthorized_response(request, "User email required")

        # Check if domain is allowed
        if not self.whitelist_manager.is_domain_allowed(email):
            domain = email.split("@", 1)[1].lower()
            # The domain comes from the request; %r escapes CR/LF and other
            # control characters so it cannot forge extra log lines.
            logger.warning("Domain whitelist check failed: unauthorized domain %r", domain)
            return self._unauthorized_response(
                request,
                "Access restricted to whitelisted domains"
            )

        return await call_next(request)

    def _unauthorized_response(self, request: Request, detail: str) -> Response:
        """Return appropriate unauthorized response based on endpoint type.

        Args:
            request: Incoming HTTP request
            detail: Error detail message

        Returns:
            JSONResponse (403) for paths under /api/, RedirectResponse (302)
            to the auth URL for everything else
        """
        if request.url.path.startswith('/api/'):
            return JSONResponse(
                status_code=403,
                content={"detail": detail}
            )
        return RedirectResponse(url=self.auth_redirect_url, status_code=302)
"""Tests for domain whitelist manager and middleware."""

import asyncio
import json

import pytest
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response

from core.domain_whitelist import DomainWhitelistManager
from core.domain_whitelist_middleware import DomainWhitelistMiddleware


def _write_config(path, *, enabled, domains, subdomain_matching=True):
    """Write a whitelist config file to *path* and return the path."""
    config_data = {
        "version": "1.0",
        "description": "Test config",
        "enabled": enabled,
        "domains": domains,
        "subdomain_matching": subdomain_matching,
    }
    path.write_text(json.dumps(config_data), encoding="utf-8")
    return path


def _dispatch(middleware, path, email=None):
    """Run one GET request for *path* through the middleware and return the response.

    ``email`` is stored on ``request.state.user_email`` when given; when it is
    None the attribute is left unset, as for an unauthenticated request.
    """
    async def call_next(request):
        return Response("OK", status_code=200)

    scope = {
        "type": "http",
        "method": "GET",
        "path": path,
        "query_string": b"",
        "headers": [],
        "state": {},
    }
    request = Request(scope)
    if email is not None:
        request.state.user_email = email

    return asyncio.run(middleware.dispatch(request, call_next))


@pytest.fixture
def temp_config(tmp_path):
    """Create a temporary enabled config file for testing."""
    # tmp_path is removed by pytest, so no manual cleanup is needed even if
    # writing the file fails.
    return _write_config(
        tmp_path / "enabled-whitelist.json",
        enabled=True,
        domains=[
            {"domain": "sandia.gov", "description": "Sandia National Labs"},
            {"domain": "doe.gov", "description": "DOE"},
            {"domain": "example.org", "description": "Example"},
        ],
    )


@pytest.fixture
def disabled_config(tmp_path):
    """Create a config file with whitelist disabled."""
    return _write_config(
        tmp_path / "disabled-whitelist.json",
        enabled=False,
        domains=[
            {"domain": "sandia.gov", "description": "Sandia National Labs"},
        ],
    )


class TestDomainWhitelistManager:
    """Test the domain whitelist manager."""

    def test_load_config(self, temp_config):
        """Test loading configuration from file."""
        manager = DomainWhitelistManager(config_path=temp_config)

        assert manager.is_enabled() is True
        assert "sandia.gov" in manager.get_domains()
        assert "doe.gov" in manager.get_domains()
        assert "example.org" in manager.get_domains()
        assert len(manager.get_domains()) == 3

    def test_disabled_config(self, disabled_config):
        """Test that disabled config doesn't enforce whitelist."""
        manager = DomainWhitelistManager(config_path=disabled_config)

        assert manager.is_enabled() is False
        # Even though disabled, should allow all
        assert manager.is_domain_allowed("user@gmail.com") is True

    def test_domain_matching(self, temp_config):
        """Test domain matching logic."""
        manager = DomainWhitelistManager(config_path=temp_config)

        # Exact matches
        assert manager.is_domain_allowed("user@sandia.gov") is True
        assert manager.is_domain_allowed("user@doe.gov") is True

        # Subdomain matches
        assert manager.is_domain_allowed("user@mail.sandia.gov") is True
        assert manager.is_domain_allowed("user@sub.doe.gov") is True

        # Invalid domains
        assert manager.is_domain_allowed("user@gmail.com") is False
        assert manager.is_domain_allowed("user@sandia.com") is False  # Wrong TLD

    def test_subdomain_matching_disabled(self, tmp_path):
        """Test that subdomains are rejected when subdomain matching is off."""
        config = _write_config(
            tmp_path / "exact-only.json",
            enabled=True,
            domains=[{"domain": "example.org"}],
            subdomain_matching=False,
        )
        manager = DomainWhitelistManager(config_path=config)

        assert manager.is_domain_allowed("user@example.org") is True
        assert manager.is_domain_allowed("user@mail.example.org") is False

    def test_invalid_email(self, temp_config):
        """Test handling of invalid email addresses."""
        manager = DomainWhitelistManager(config_path=temp_config)

        assert manager.is_domain_allowed("notanemail") is False
        assert manager.is_domain_allowed("") is False
        assert manager.is_domain_allowed("no-at-sign.com") is False


@pytest.fixture
def create_middleware():
    """Factory fixture to create middleware with custom config."""

    def _create(config_path):
        # Build the middleware through its real constructor, then swap in a
        # manager bound to the test config. This avoids patching the class,
        # which would leak into other tests if construction raised.
        middleware = DomainWhitelistMiddleware(FastAPI())
        middleware.whitelist_manager = DomainWhitelistManager(config_path=config_path)
        return middleware

    return _create


class TestDomainWhitelistMiddleware:
    """Test domain whitelist middleware."""

    def test_middleware_with_allowed_domain(self, temp_config, create_middleware):
        """Test that allowed domains pass through."""
        middleware = create_middleware(temp_config)

        response = _dispatch(middleware, "/api/test", email="test@sandia.gov")

        assert response.status_code == 200

    def test_middleware_with_disallowed_domain(self, temp_config, create_middleware):
        """Test that disallowed domains are blocked."""
        middleware = create_middleware(temp_config)

        response = _dispatch(middleware, "/api/test", email="test@gmail.com")

        assert response.status_code == 403

    def test_middleware_missing_email(self, temp_config, create_middleware):
        """Test that API requests without a user email are rejected."""
        middleware = create_middleware(temp_config)

        response = _dispatch(middleware, "/api/test")

        assert response.status_code == 403

    def test_middleware_redirects_non_api_path(self, temp_config, create_middleware):
        """Test that disallowed domains on non-API paths are redirected to auth."""
        middleware = create_middleware(temp_config)

        response = _dispatch(middleware, "/dashboard", email="test@gmail.com")

        assert response.status_code == 302
        assert response.headers["location"] == "/auth"

    def test_middleware_disabled(self, disabled_config, create_middleware):
        """Test that disabled config allows all domains."""
        middleware = create_middleware(disabled_config)

        # Should pass even though gmail.com is not in whitelist
        response = _dispatch(middleware, "/api/test", email="test@gmail.com")

        assert response.status_code == 200

    def test_health_endpoint_bypass(self, temp_config, create_middleware):
        """Test that health endpoint bypasses whitelist check."""
        middleware = create_middleware(temp_config)

        # No email - should still pass for health check
        response = _dispatch(middleware, "/api/health")

        assert response.status_code == 200

    def test_auth_endpoint_bypass(self, temp_config, create_middleware):
        """Test that the auth redirect endpoint bypasses whitelist check."""
        middleware = create_middleware(temp_config)

        # Without this bypass the redirect target itself would redirect.
        response = _dispatch(middleware, "/auth")

        assert response.status_code == 200
When enabled, only users with email addresses from whitelisted domains can access the application.", + "enabled": false, + "domains": [ + { + "domain": "doe.gov", + "description": "Department of Energy", + "category": "Government - DOE HQ" + }, + { + "domain": "nnsa.doe.gov", + "description": "National Nuclear Security Administration", + "category": "Government - DOE HQ" + }, + { + "domain": "hq.doe.gov", + "description": "DOE Headquarters", + "category": "Government - DOE HQ" + }, + { + "domain": "anl.gov", + "description": "Argonne National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "bnl.gov", + "description": "Brookhaven National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "fnal.gov", + "description": "Fermi National Accelerator Laboratory", + "category": "National Laboratory" + }, + { + "domain": "inl.gov", + "description": "Idaho National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "lbl.gov", + "description": "Lawrence Berkeley National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "lanl.gov", + "description": "Los Alamos National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "llnl.gov", + "description": "Lawrence Livermore National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "ornl.gov", + "description": "Oak Ridge National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "pnnl.gov", + "description": "Pacific Northwest National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "sandia.gov", + "description": "Sandia National Laboratories", + "category": "National Laboratory" + }, + { + "domain": "srnl.doe.gov", + "description": "Savannah River National Laboratory", + "category": "National Laboratory" + }, + { + "domain": "ameslab.gov", + "description": "Ames Laboratory", + "category": "National Laboratory" + }, + { + "domain": "jlab.org", + "description": "Thomas 
Jefferson National Accelerator Facility", + "category": "National Laboratory" + }, + { + "domain": "princeton.edu", + "description": "Princeton University (PPPL host institution)", + "category": "University" + }, + { + "domain": "pppl.gov", + "description": "Princeton Plasma Physics Laboratory", + "category": "National Laboratory" + }, + { + "domain": "slac.stanford.edu", + "description": "SLAC National Accelerator Laboratory", + "category": "National Laboratory" + }, + { + "domain": "pppl.gov", + "description": "Princeton Plasma Physics Laboratory", + "category": "National Laboratory" + }, + { + "domain": "nrel.gov", + "description": "National Renewable Energy Laboratory", + "category": "National Laboratory" + }, + { + "domain": "netl.doe.gov", + "description": "National Energy Technology Laboratory", + "category": "National Laboratory" + }, + { + "domain": "stanford.edu", + "description": "Stanford University (SLAC host institution)", + "category": "University" + } + ], + "subdomain_matching": true, + "subdomain_matching_description": "When true, subdomains are automatically allowed (e.g., mail.sandia.gov matches sandia.gov)" +} diff --git a/docs/admin/domain-whitelist.md b/docs/admin/domain-whitelist.md new file mode 100644 index 0000000..8dff517 --- /dev/null +++ b/docs/admin/domain-whitelist.md @@ -0,0 +1,154 @@ +# Email Domain Whitelist Configuration + +This configuration controls which email domains are allowed to access the application. + +## Overview + +The domain whitelist feature allows you to restrict access to users with email addresses from specific domains. This is useful for: +- Restricting access to government organizations (DOE, NNSA, national labs) +- Limiting access to specific companies or institutions +- Implementing multi-tenant access control + +## Configuration Files + +### Default Configuration +Located at: `config/defaults/domain-whitelist.json` + +Contains DOE and national laboratory domains as an example. 
This file should not be modified directly. + +### Custom Configuration +To customize domains, create: `config/overrides/domain-whitelist.json` + +The override file takes precedence over the default configuration. + +## Configuration Format + +```json +{ + "version": "1.0", + "description": "Your description here", + "enabled": true, + "domains": [ + { + "domain": "example.com", + "description": "Example Corporation", + "category": "Enterprise" + }, + { + "domain": "another-domain.org", + "description": "Another Organization", + "category": "Partner" + } + ], + "subdomain_matching": true +} +``` + +### Fields + +- **version**: Configuration schema version (currently "1.0") +- **description**: Human-readable description of this configuration +- **enabled**: Whether the whitelist is enforced (true/false) + - Note: Even if true here, must also set `FEATURE_DOMAIN_WHITELIST_ENABLED=true` in environment +- **domains**: Array of domain objects + - **domain**: The email domain (e.g., "example.com") + - **description**: Optional description + - **category**: Optional category for organization +- **subdomain_matching**: If true, subdomains are automatically allowed + - Example: If "example.com" is whitelisted and subdomain_matching is true, then "user@mail.example.com" is also allowed + +## Enabling the Feature + +1. Create your custom configuration at `config/overrides/domain-whitelist.json` +2. Set `"enabled": true` in the config file +3. Set environment variable: `FEATURE_DOMAIN_WHITELIST_ENABLED=true` +4. 
Restart the application + +## Example Configurations + +### Example 1: DOE National Labs (Default) +```json +{ + "enabled": true, + "domains": [ + {"domain": "doe.gov", "description": "Department of Energy"}, + {"domain": "sandia.gov", "description": "Sandia National Labs"}, + {"domain": "lanl.gov", "description": "Los Alamos National Lab"} + ], + "subdomain_matching": true +} +``` + +### Example 2: Corporate Domains +```json +{ + "enabled": true, + "domains": [ + {"domain": "mycompany.com", "description": "My Company"}, + {"domain": "partner-company.org", "description": "Trusted Partner"} + ], + "subdomain_matching": true +} +``` + +### Example 3: Educational Institutions +```json +{ + "enabled": true, + "domains": [ + {"domain": "university.edu", "description": "University"}, + {"domain": "research-institute.org", "description": "Research Institute"} + ], + "subdomain_matching": true +} +``` + +## Behavior + +### When Enabled +- Users with email addresses from whitelisted domains can access the application +- Users with other email domains receive a 403 Forbidden error (API) or redirect (UI) +- Health check endpoint (`/api/health`) bypasses the check +- Authentication endpoint bypasses the check + +### When Disabled +- All authenticated users can access the application regardless of email domain +- No domain filtering is performed + +## Subdomain Matching + +When `subdomain_matching` is `true`: +- `user@example.com` matches `example.com` ✓ +- `user@mail.example.com` matches `example.com` ✓ +- `user@dept.mail.example.com` matches `example.com` ✓ + +When `subdomain_matching` is `false`: +- `user@example.com` matches `example.com` ✓ +- `user@mail.example.com` does NOT match `example.com` ✗ + +## Troubleshooting + +### Issue: Users are being blocked unexpectedly +- Check that `enabled` is set correctly in the config file +- Verify `FEATURE_DOMAIN_WHITELIST_ENABLED` environment variable +- Check domain spelling in the config file (case-insensitive) +- Check if 
subdomain_matching is set as needed + +### Issue: Configuration changes not taking effect +- Restart the application after changing config files +- Verify the override file is at `config/overrides/domain-whitelist.json` +- Check application logs for config loading errors + +### Issue: Everyone can access (no filtering) +- Verify `FEATURE_DOMAIN_WHITELIST_ENABLED=true` in environment +- Check that `"enabled": true` is set in the config file +- Check application logs for `No domain-whitelist.json found` or `Error loading domain-whitelist.json`: if the config file is missing or cannot be parsed, the whitelist is disabled and all users are allowed +- Restart the application after making changes + +## Logging + +The middleware logs helpful information: +- On startup: Number of domains loaded and enabled status +- On rejection: Domain that was rejected (for debugging) +- On error: Config loading errors + +Check application logs for domain whitelist messages.