diff --git a/Cargo.lock b/Cargo.lock index 16a873a..99244cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1997,6 +1997,7 @@ dependencies = [ "plotters", "rmcp", "schemars", + "semver", "serde", "serde_json", "sqlformat", diff --git a/hyperdb-mcp/CHANGELOG.md b/hyperdb-mcp/CHANGELOG.md index d5dea10..6936ff4 100644 --- a/hyperdb-mcp/CHANGELOG.md +++ b/hyperdb-mcp/CHANGELOG.md @@ -18,12 +18,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/). ### Added +- The `status` tool now reports an `engine` block with the backing `hyperd` + connection: `mode` (`daemon` or `local`), `hyperd_endpoint` (the libpq + endpoint queries run against), and `daemon_health_port` (the shared daemon's + control/lock port, `null` in local mode). - **Single-instance `hyperd` daemon** — by default, all MCP clients now share one `hyperd` process per user instead of each spawning their own. Multiple AI clients (Claude Code, Cursor, VS Code Copilot, etc.) can access the same persistent databases simultaneously with reduced resource overhead. The daemon auto-spawns on first client connect and - shuts down after 30 minutes idle. Pass `--no-daemon` to opt out. + stays resident (idle shutdown is opt-in — see below). Pass `--no-daemon` + to opt out. +- **Identity-checked daemon discovery.** Clients verify a daemon by sending + `PING` and requiring a `PONG hyperdb-mcp ` reply (matched on exact + tokens, not a string prefix) before trusting it — a TCP connection alone is + no longer sufficient, so an unrelated process occupying the port is no + longer mistaken for the daemon. +- **Port scanning.** The daemon health/lock port now defaults to scanning + upward from **7485** (16 ports), using the first free one; the old fixed + default 7484 collided with `hyperd`'s conventional gRPC port. Set + `HYPERDB_DAEMON_PORT` to pin an exact port (disables scanning). `daemon + status` / `daemon stop` locate the daemon via discovery + scan, so they + work regardless of which port it landed on. +- **Newer-client version takeover.** A starting client built from a strictly + newer `hyperdb-mcp` version stops and replaces an older running daemon + (and its `hyperd`), so upgrades take effect immediately instead of waiting + for the old daemon to exit. Equal or older versions reuse the daemon. +- **Daemon stays resident by default.** Idle shutdown is now opt-in via + `--idle-timeout ` or `HYPERDB_DAEMON_IDLE_TIMEOUT`; with neither set + the daemon (and `hyperd`) stay warm, eliminating the connection error and + "hyper restarting, please retry" round-trip a client previously hit after + a 30-minute idle shutdown. - New `daemon` subcommand: `hyperdb-mcp daemon status` / `daemon stop`. - New environment variables: `HYPERDB_STATE_DIR`, `HYPERDB_DAEMON_PORT`, `HYPERDB_DAEMON_IDLE_TIMEOUT`. diff --git a/hyperdb-mcp/Cargo.toml b/hyperdb-mcp/Cargo.toml index d565630..c2a9891 100644 --- a/hyperdb-mcp/Cargo.toml +++ b/hyperdb-mcp/Cargo.toml @@ -27,6 +27,7 @@ rmcp = { version = "1.7", features = ["server", "transport-io"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-std", "signal", "time"] } serde = { workspace = true } serde_json = { workspace = true, features = ["preserve_order"] } +semver = "1" clap = { version = "4", features = ["derive"] } tracing = { workspace = true } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/hyperdb-mcp/DEVELOPMENT.md b/hyperdb-mcp/DEVELOPMENT.md index ea1f494..c8649c7 100644 --- a/hyperdb-mcp/DEVELOPMENT.md +++ b/hyperdb-mcp/DEVELOPMENT.md @@ -209,13 +209,15 @@ Logs land next to the persistent file when one is supplied (so users find them i ## Daemon Mode Internals -`Engine::new` defaults to *daemon mode* — it tries `daemon::spawn::ensure_daemon()` first, which discovers an existing daemon via `~/.hyperdb/daemon.json` (overridable via `HYPERDB_STATE_DIR`) or auto-spawns one as a detached background process. The Engine then connects via TCP (`Connection::connect(endpoint, …)`) without owning any `HyperProcess`. +`Engine::new` defaults to *daemon mode* — it tries `daemon::spawn::ensure_daemon(resolve_port_scan())` first, which discovers an existing daemon via `~/.hyperdb/daemon.json` (overridable via `HYPERDB_STATE_DIR`), else scans the port range for a running daemon, else auto-spawns one on the first free port as a detached background process. The Engine then connects via TCP (`Connection::connect(endpoint, …)`) without owning any `HyperProcess`, and records the daemon's `health_port` so the server's debounced `HEARTBEAT` targets the actual discovered port rather than re-resolving. -Falls back to local mode (per-session `hyperd` via `HyperProcess::new`) when the daemon can't be reached, or always when `--no-daemon` is passed. +Falls back to local mode (per-session `hyperd` via `HyperProcess::new`) when the daemon can't be reached (including `AllOccupied` — the whole scan range is held by foreign processes), or always when `--no-daemon` is passed. -Cross-platform single-instance lock is the daemon's TCP health port — bind succeeds for exactly one process per user. Liveness is validated by the discovery flow before trusting the file: a stale `daemon.json` (daemon crashed) is detected and removed. +**Port resolution + identity.** `resolve_port_scan()` returns a `PortScan { base, span }`: when `HYPERDB_DAEMON_PORT` is set it pins that exact port (`span = 1`); otherwise it scans `span = DAEMON_PORT_SCAN_SPAN` (16) ports up from `DEFAULT_DAEMON_BASE_PORT` (7485 — deliberately *not* 7484, which is hyperd's conventional gRPC port). `probe_port` classifies each port as `OurDaemon` / `Camped` / `Refused`: liveness is no longer a bare TCP connect but an identity handshake — `health::ping_identified` sends `PING` and requires the reply's first two tokens to be exactly `PONG` and `hyperdb-mcp` (the third token is the daemon version). A foreign process that merely accepts TCP is `Camped` and skipped; only a `Refused` (connection-refused) port is treated as free to spawn on. `discover()` applies the same identity check before trusting `daemon.json`, so a stale or foreign-owned file is detected and removed. -The daemon's main loop tracks idle time via `DaemonState::last_activity`. `HEARTBEAT` commands from active clients reset the timer; clients debounce these to once per 60 seconds in `HyperMcpServer::with_engine`. Idle timeout (default 30 min) triggers graceful shutdown: discovery file removed → `hyperd` dropped → health listener exits. +**Version takeover.** When discovery finds a running daemon, `maybe_take_over` compares the client's `version::MCP_VERSION` against the daemon's reported version via the pure `client_should_take_over` helper (`semver`). If the client is *strictly newer* it sends `STOP` (which drops the daemon's `HyperProcess`, stopping `hyperd`), waits for the health port to stop answering the identity ping, then respawns a fresh daemon on the same port. Equal/older/unparseable versions reuse the daemon — never a downgrade-kill. This makes upgrades take effect immediately instead of waiting for the old daemon to disappear. + +**Idle shutdown is opt-in.** `DaemonConfig.idle_timeout` is `Option`, set only when `--idle-timeout` or `HYPERDB_DAEMON_IDLE_TIMEOUT` is provided (flag wins over env). With neither set the idle-monitor branch of the `run_daemon` `tokio::select!` is replaced by `std::future::pending()` and never fires — the daemon (and `hyperd`) stay resident indefinitely so clients never pay the cold-start "restarting, please retry" round-trip. `DaemonState::last_activity` and the debounced `HEARTBEAT` plumbing still exist and only matter when the timeout is enabled. The hyperd restart-limit shutdown (below) is independent and always active. ### hyperd liveness monitoring and restart @@ -242,7 +244,7 @@ Two new code paths fire `report_hyperd_error_to_daemon` (best-effort, 200ms time ### Known limitations -- **Hung-but-alive `hyperd`** (TCP listening, but unresponsive to queries) is NOT detected. The monitor's `try_wait()` returns `None` for a hung process; client tool calls hang on the read side without producing a `ConnectionLost` error. Operator recovery is `hyperdb-mcp daemon stop` followed by reconnect. +- **Hung-but-alive `hyperd`** (TCP listening, but unresponsive to queries) is NOT detected. The monitor's `try_wait()` returns `None` for a hung process; client tool calls hang on the read side without producing a `ConnectionLost` error. Operator recovery is `hyperdb-mcp daemon stop` followed by reconnect. Note the tradeoff introduced by resident-by-default: the idle timeout used to be an implicit backstop that reaped a wedged daemon after 30 min, after which the next client respawned a fresh one. With idle shutdown now opt-in (off by default), a hung-but-alive `hyperd` stays wedged until a client reports an error (fast-path `REPORT_HYPERD_ERROR`, which fires on a client-side `ConnectionLost`) or an operator runs `daemon stop`. This is an accepted tradeoff — keeping `hyperd` warm avoids the cold-start "restarting, please retry" round-trip on every active session, and genuine hyperd hangs are rare. A future enhancement could add a daemon-side liveness probe (a periodic trivial query with a timeout) to close the "all clients idle + hyperd hung" gap without reintroducing cold-start latency. - **Watchers** auto-recover from hyperd restarts: when an ingest fails with a connection-lost error, the watcher rebuilds its connection pool against the engine's current endpoint and retries the file once. Persistent failures (the second attempt also fails) fall through to the standard `failed/` move so a single broken file can't keep the watcher pinned in retry loops. See `src/daemon/{mod,discovery,health,run,spawn}.rs` for the full implementation. diff --git a/hyperdb-mcp/README.md b/hyperdb-mcp/README.md index cd0d476..7136483 100644 --- a/hyperdb-mcp/README.md +++ b/hyperdb-mcp/README.md @@ -185,7 +185,7 @@ Each session has **two databases**: an ephemeral primary (scratch space — alwa | Mode | Flag | Behavior | |---|---|---| -| **Shared daemon** *(default)* | *(none)* | One `hyperd` process per user, shared across all MCP clients. The first client auto-spawns the daemon; subsequent clients discover and reuse it. Idle for 30 minutes → daemon shuts itself down; the next client spawns a fresh one. | +| **Shared daemon** *(default)* | *(none)* | One `hyperd` process per user, shared across all MCP clients. The first client auto-spawns the daemon; subsequent clients discover and reuse it. The daemon stays resident by default (idle shutdown is opt-in — see below), so the next client connects instantly instead of waiting for a fresh `hyperd` to start. A client built from a newer `hyperdb-mcp` version transparently takes over (stops and replaces) an older running daemon. | | **Private hyperd** | `--no-daemon` | Each MCP client spawns its own `hyperd` (legacy behavior, one per session). | The shared daemon is the bigger win for users running multiple AI clients (Claude Code + Cursor + VS Code) — they all share one Hyper engine instead of spawning three. @@ -240,7 +240,7 @@ CREATE TABLE "persistent"."public"."revenue_2026" AS ### Daemon management -The daemon is normally invisible — it auto-spawns and idle-times-out on its own. For diagnostics: +The daemon is normally invisible — it auto-spawns on first use and stays resident. For diagnostics: ```bash hyperdb-mcp daemon status # Show running daemon (PID, endpoint, started_at, version) @@ -248,8 +248,14 @@ hyperdb-mcp daemon stop # Gracefully shut down the daemon hyperdb-mcp daemon # Run as a daemon explicitly (rarely needed) ``` +`status` and `stop` locate the running daemon automatically (reading `daemon.json`, then scanning the port range), so they work even if the daemon scanned onto a non-default port. Pass `--port ` to target a specific port explicitly. + State files live at `~/.hyperdb/` by default (override with `HYPERDB_STATE_DIR`). +**Port discovery.** The daemon binds a TCP health/lock port — by default it scans upward from **7485** (16 ports) and uses the first free one; set `HYPERDB_DAEMON_PORT` to pin an exact port (no scan). The health port doubles as a single-instance lock and an identity check: clients send `PING` and require a `PONG hyperdb-mcp ` reply before trusting a daemon, so an unrelated process occupying the port is skipped rather than mistaken for the daemon. + +**Staying resident.** By default the daemon never idle-shuts-down — keeping `hyperd` warm means the next tool call connects immediately instead of triggering a "restarting, please retry" round-trip. To opt into auto-shutdown (e.g. on CI), pass `--idle-timeout ` or set `HYPERDB_DAEMON_IDLE_TIMEOUT`. + ### Recovery from hyperd crashes The daemon polls `hyperd` every 5 seconds. If the process has exited (crashed, OOM, killed), the daemon spawns a replacement, atomically updates `~/.hyperdb/daemon.json` with the new endpoint, and continues serving clients. Clients see one failed tool call (the request that was in flight when hyperd died); the next tool call transparently reconnects to the new hyperd via the same recovery path used for normal connection drops. @@ -813,15 +819,17 @@ Daemon subcommand: hyperdb-mcp daemon Start the daemon (usually auto-spawned) hyperdb-mcp daemon stop Gracefully stop the running daemon hyperdb-mcp daemon status Show running daemon info - hyperdb-mcp daemon --port Override the health/lock port (default 7484) - hyperdb-mcp daemon --idle-timeout Override idle timeout (default 1800 = 30 min) + hyperdb-mcp daemon --port Pin the health/lock port. When omitted, + scans upward from 7485 for a free port. + hyperdb-mcp daemon --idle-timeout Opt into idle shutdown after SECS idle. + When omitted, the daemon stays resident. Environment: HYPERD_PATH Path to hyperd binary (auto-detected if on PATH) HYPERDB_PERSISTENT_DB Override the default persistent-db path HYPERDB_STATE_DIR Override daemon state directory (default ~/.hyperdb/) - HYPERDB_DAEMON_PORT Override daemon health/lock port (default 7484) - HYPERDB_DAEMON_IDLE_TIMEOUT Override daemon idle timeout in seconds (default 1800) + HYPERDB_DAEMON_PORT Pin daemon health/lock port (default: scan from 7485) + HYPERDB_DAEMON_IDLE_TIMEOUT Opt into idle shutdown (seconds); default: stay resident ``` --- diff --git a/hyperdb-mcp/src/daemon/discovery.rs b/hyperdb-mcp/src/daemon/discovery.rs index 002ffb9..27c647d 100644 --- a/hyperdb-mcp/src/daemon/discovery.rs +++ b/hyperdb-mcp/src/daemon/discovery.rs @@ -8,16 +8,15 @@ //! daemon, validating liveness via a TCP health check before trusting it. use std::io; -use std::net::TcpStream; use std::path::PathBuf; use std::time::Duration; use serde::{Deserialize, Serialize}; -use super::DEFAULT_DAEMON_PORT; +use super::{DAEMON_PORT_SCAN_SPAN, DEFAULT_DAEMON_BASE_PORT}; /// Information written by the daemon so clients can discover and connect. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DaemonInfo { /// OS process ID of the daemon. pub pid: u32, @@ -99,21 +98,47 @@ pub fn remove_discovery_file() { } } -/// Check if the daemon is alive by attempting a TCP connection to its health port. +/// Check if the daemon is alive by sending PING and verifying the identifying token. +/// No longer accepts a bare TCP connect (prevents collisions with foreign services). fn is_daemon_alive(port: u16) -> bool { - TcpStream::connect_timeout( - &std::net::SocketAddr::from(([127, 0, 0, 1], port)), - Duration::from_secs(2), - ) - .is_ok() + super::health::ping_identified(port, Duration::from_millis(300), Duration::from_millis(300)) + .is_some() } -/// Resolve the daemon health port from environment or default. -pub fn resolve_port() -> u16 { - std::env::var(super::ENV_DAEMON_PORT) +/// Port scan configuration: a base port and the number of ports to scan. +/// When `span == 1`, the port is pinned (no scan). Used by the later +/// port-scanning stage to discover or spawn a daemon across a range. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PortScan { + pub base: u16, + pub span: u16, +} + +/// Resolve the daemon health port scan configuration from environment or default. +/// If `HYPERDB_DAEMON_PORT` is set and valid, returns a pinned scan (span=1) at +/// that exact port. Otherwise, returns the default base port with the full scan span. +pub fn resolve_port_scan() -> PortScan { + if let Some(port) = std::env::var(super::ENV_DAEMON_PORT) .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(DEFAULT_DAEMON_PORT) + .and_then(|v| v.parse::().ok()) + { + PortScan { + base: port, + span: 1, + } + } else { + PortScan { + base: DEFAULT_DAEMON_BASE_PORT, + span: DAEMON_PORT_SCAN_SPAN, + } + } +} + +/// Resolve the daemon health port from environment or default. Back-compat +/// wrapper for single-port callers; returns the base port from [`resolve_port_scan`]. +/// New code that needs scan-aware logic should call [`resolve_port_scan`] directly. +pub fn resolve_port() -> u16 { + resolve_port_scan().base } /// Cross-platform home directory resolution. @@ -123,3 +148,105 @@ fn home_dir() -> Option { .or_else(|| std::env::var_os("USERPROFILE")) .map(PathBuf::from) } + +/// Result of probing a single port: either our daemon, something else, or refused. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProbeResult { + /// A hyperdb-mcp daemon answered with valid STATUS. + OurDaemon(Box), + /// The port accepted TCP but isn't our daemon (foreign service or broken STATUS). + Camped, + /// Connection refused (port is free). + Refused, +} + +/// Probe a single port to determine if it's occupied by our daemon, a foreign service, or free. +fn probe_port(port: u16) -> ProbeResult { + let ping_timeout = Duration::from_millis(300); + + if let Some(_version) = super::health::ping_identified(port, ping_timeout, ping_timeout) { + // PING succeeded — something is answering with our token. Now send STATUS + // to retrieve the full daemon info. If STATUS fails we can't trust this + // process (might be a test stub or a broken daemon), so treat it as Camped. + match super::health::send_command_with_timeout(port, "STATUS", ping_timeout, ping_timeout) { + Ok(response) => { + if let Ok(info) = serde_json::from_str::(response.trim()) { + ProbeResult::OurDaemon(Box::new(info)) + } else { + // Parsed PING but STATUS is malformed — treat as Camped. + ProbeResult::Camped + } + } + Err(_) => ProbeResult::Camped, + } + } else { + // PING failed or returned no identifying token. Distinguish "refused" + // from "camped non-daemon" via a raw TCP connect attempt. + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port)); + match std::net::TcpStream::connect_timeout(&addr, ping_timeout) { + Ok(_) => ProbeResult::Camped, // TCP accepted but PING failed → foreign + Err(_) => ProbeResult::Refused, // Connection refused → port is free + } + } +} + +/// The outcome of scanning a port range for a running daemon or a free port to spawn on. +#[derive(Debug)] +pub enum ScanOutcome { + /// Found a running hyperdb-mcp daemon. + Found(Box), + /// No daemon found, but this port is free (can spawn here). + FreePort(u16), + /// All ports in the range are occupied (either by our daemon, foreign services, or both). + AllOccupied, +} + +/// Scan the configured port range to find a running daemon or identify a free port. +/// If any port in the range answers identified-PING and returns valid STATUS, we return +/// `Found` immediately (first wins). Otherwise, we return `FreePort` with the first +/// refused port encountered, or `AllOccupied` if everything is in use. +/// +/// Product decision: prefer finding an existing daemon anywhere in range over +/// spawning a new one. Only spawn if no daemon exists. +pub fn scan_for_daemon(scan: PortScan) -> ScanOutcome { + let mut first_free: Option = None; + + for offset in 0..scan.span { + let Some(port) = scan.base.checked_add(offset) else { + break; // Overflow guard: stop at u16::MAX + }; + + match probe_port(port) { + ProbeResult::OurDaemon(info) => { + // Found a running daemon — return immediately. + return ScanOutcome::Found(info); + } + ProbeResult::Refused => { + // Port is free. Remember the first one we see. + if first_free.is_none() { + first_free = Some(port); + } + } + ProbeResult::Camped => { + // Port is occupied by something else. Keep scanning. + } + } + } + + // No daemon found. Return the first free port, or AllOccupied if none. + match first_free { + Some(port) => ScanOutcome::FreePort(port), + None => ScanOutcome::AllOccupied, + } +} + +/// Discover a running daemon via the discovery file, or by scanning the configured +/// port range. Returns `None` if no daemon is found in either place. +/// +/// Used by CLI commands (status/stop) that want to find a daemon but not spawn one. +pub fn find_running_daemon() -> Option { + discover().or_else(|| match scan_for_daemon(resolve_port_scan()) { + ScanOutcome::Found(info) => Some(*info), + _ => None, + }) +} diff --git a/hyperdb-mcp/src/daemon/health.rs b/hyperdb-mcp/src/daemon/health.rs index 15e1148..88ad060 100644 --- a/hyperdb-mcp/src/daemon/health.rs +++ b/hyperdb-mcp/src/daemon/health.rs @@ -8,7 +8,8 @@ //! 2. **Liveness probe + heartbeat** — clients connect and send simple text commands. //! //! Protocol (line-based, newline-terminated): -//! - `PING\n` → `PONG\n` (liveness check) +//! - `PING\n` → `PONG hyperdb-mcp \n` (liveness check; the identifying +//! token proves it's a hyperdb-mcp daemon, not a foreign process on the same port) //! - `HEARTBEAT\n` → `OK\n` (resets idle timer) //! - `STOP\n` → `STOPPING\n` (triggers graceful shutdown) //! - `STATUS\n` → JSON line with daemon info (reports the *current* hyperd @@ -26,6 +27,15 @@ use tracing::{debug, warn}; use super::discovery::DaemonInfo; +/// Identifying token included in PONG responses. Used to verify that a bound +/// port is owned by a hyperdb-mcp daemon (not a foreign service). +pub const PONG_TOKEN: &str = "hyperdb-mcp"; + +/// Construct the PONG response with the identifying token and version. +fn pong_response() -> String { + format!("PONG {PONG_TOKEN} {}\n", crate::version::MCP_VERSION) +} + /// Handle to the health listener, used to check binding success and manage lifecycle. #[derive(Debug)] pub struct HealthListener { @@ -164,7 +174,7 @@ fn handle_client(stream: TcpStream, state: &DaemonState, info: &Mutex { let cmd = line.trim(); let response = match cmd { - "PING" => "PONG\n".to_string(), + "PING" => pong_response(), "HEARTBEAT" => { state.touch(); "OK\n".to_string() @@ -252,3 +262,31 @@ pub fn send_command_with_timeout( reader.read_line(&mut response)?; Ok(response) } + +/// Send PING and verify the response contains the identifying token. Returns +/// `Some(version)` if the responding daemon is a hyperdb-mcp daemon (the version +/// string is the daemon's `MCP_VERSION`), or `None` if connection fails, the +/// response lacks the expected token, or read times out. An empty version string +/// (`Some(String::new())`) is returned if the PONG prefix matches but no version +/// token is present (graceful degradation for forward/backward compat). +/// +/// This is the primitive for liveness checks now that bare TCP connect is +/// insufficient (a foreign service on the same port would cause collisions). +pub fn ping_identified( + port: u16, + connect_timeout: Duration, + read_timeout: Duration, +) -> Option { + let response = send_command_with_timeout(port, "PING", connect_timeout, read_timeout).ok()?; + // Validate by exact tokens, not a string prefix: a prefix check on + // "PONG hyperdb-mcp" would also match a foreign reply like + // "PONG hyperdb-mcpEVIL 1.0.0". Require the first two whitespace-separated + // tokens to be exactly "PONG" and the token, so only our daemon passes. + let mut tokens = response.split_whitespace(); + if tokens.next() != Some("PONG") || tokens.next() != Some(PONG_TOKEN) { + return None; + } + // The 3rd token is the daemon's version; absent ⇒ accept with empty + // version (future-proofing for a token-only reply). + Some(tokens.next().unwrap_or("").to_string()) +} diff --git a/hyperdb-mcp/src/daemon/mod.rs b/hyperdb-mcp/src/daemon/mod.rs index 8f870a7..bdc2a31 100644 --- a/hyperdb-mcp/src/daemon/mod.rs +++ b/hyperdb-mcp/src/daemon/mod.rs @@ -8,11 +8,19 @@ pub mod health; pub mod run; pub mod spawn; -/// Default TCP port the daemon binds for health checks and single-instance locking. -pub const DEFAULT_DAEMON_PORT: u16 = 7484; +/// Default base TCP port for the daemon health listener. When no env var is set, +/// the daemon scans `[base, base + DAEMON_PORT_SCAN_SPAN)` to find a free port. +/// Previously 7484; changed to 7485 to avoid collision with hyperd's default gRPC port. +pub const DEFAULT_DAEMON_BASE_PORT: u16 = 7485; -/// Default idle timeout in seconds before the daemon shuts down. -pub const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30 * 60; // 30 minutes +/// Number of ports to scan starting from the base port when discovering or spawning +/// a daemon. Used by the later port-scanning stage (not yet implemented). +pub const DAEMON_PORT_SCAN_SPAN: u16 = 16; + +/// Suggested idle timeout value (30 minutes) for use with the `--idle-timeout` flag +/// or `HYPERDB_DAEMON_IDLE_TIMEOUT` env var. By default (when neither is set), the +/// daemon never auto-shuts down due to inactivity. +pub const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30 * 60; /// Environment variable to override the daemon port. pub const ENV_DAEMON_PORT: &str = "HYPERDB_DAEMON_PORT"; diff --git a/hyperdb-mcp/src/daemon/run.rs b/hyperdb-mcp/src/daemon/run.rs index 5224b06..f61518d 100644 --- a/hyperdb-mcp/src/daemon/run.rs +++ b/hyperdb-mcp/src/daemon/run.rs @@ -1,8 +1,12 @@ // Copyright (c) 2026, Salesforce, Inc. All rights reserved. // SPDX-License-Identifier: Apache-2.0 OR MIT -//! Daemon main loop: spawns `hyperd`, runs health listener, monitors idle timeout -//! and hyperd liveness, restarts hyperd if it dies. +//! Daemon main loop: spawns `hyperd`, runs health listener, monitors hyperd liveness +//! and optional idle timeout, restarts hyperd if it dies. +//! +//! By default, the daemon never auto-shuts down due to inactivity (opt-in via +//! `--idle-timeout` flag or `HYPERDB_DAEMON_IDLE_TIMEOUT` env var). When enabled, +//! client HEARTBEAT commands reset the idle timer (see [`DaemonState`]). use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -14,29 +18,35 @@ use hyperdb_api::{HyperProcess, Parameters, TransportMode}; use super::discovery::{self, DaemonInfo}; use super::health::{DaemonState, HealthListener}; -use super::{DEFAULT_IDLE_TIMEOUT_SECS, ENV_IDLE_TIMEOUT}; +use super::ENV_IDLE_TIMEOUT; /// Configuration for the daemon process. #[derive(Debug)] pub struct DaemonConfig { pub port: u16, - pub idle_timeout: Duration, + /// Idle timeout duration. When `None`, the daemon never auto-shuts down due to + /// inactivity (default behavior). When `Some`, the daemon shuts down after the + /// specified duration without client activity. + pub idle_timeout: Option, } impl DaemonConfig { + /// Construct a `DaemonConfig` from CLI args and environment variables. + /// + /// Idle-timeout resolution: + /// 1. If `idle_timeout_secs` is `Some`, use that value. + /// 2. Otherwise, if `HYPERDB_DAEMON_IDLE_TIMEOUT` env var is set and parseable, use it. + /// 3. Otherwise, `None` (never auto-shutdown). pub fn from_args(port: u16, idle_timeout_secs: Option) -> Self { - let idle_timeout_secs = idle_timeout_secs + let idle_timeout = idle_timeout_secs .or_else(|| { std::env::var(ENV_IDLE_TIMEOUT) .ok() .and_then(|v| v.parse().ok()) }) - .unwrap_or(DEFAULT_IDLE_TIMEOUT_SECS); + .map(Duration::from_secs); - Self { - port, - idle_timeout: Duration::from_secs(idle_timeout_secs), - } + Self { port, idle_timeout } } } @@ -121,10 +131,23 @@ pub async fn run_daemon(config: DaemonConfig) -> Result<(), Box idle_monitor(Arc::clone(&state), d).await, + None => std::future::pending::<()>().await, + } + }; tokio::select! { - () = idle_monitor(Arc::clone(&state), config.idle_timeout) => {} + () = idle_fut => {} () = hyperd_monitor(Arc::clone(&state), Arc::clone(&hyper_state), Arc::clone(&info_arc)) => {} () = shutdown_signal() => { info!("received shutdown signal"); diff --git a/hyperdb-mcp/src/daemon/spawn.rs b/hyperdb-mcp/src/daemon/spawn.rs index deb3de4..bda2a43 100644 --- a/hyperdb-mcp/src/daemon/spawn.rs +++ b/hyperdb-mcp/src/daemon/spawn.rs @@ -11,9 +11,9 @@ use std::io; use std::process::Command; use std::time::{Duration, Instant}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; -use super::discovery::{self, DaemonInfo}; +use super::discovery::{self, DaemonInfo, PortScan, ScanOutcome}; /// Maximum time to wait for the daemon to write its discovery file after spawning. const SPAWN_TIMEOUT: Duration = Duration::from_secs(10); @@ -21,22 +21,36 @@ const SPAWN_TIMEOUT: Duration = Duration::from_secs(10); /// Polling interval while waiting for the discovery file. const POLL_INTERVAL: Duration = Duration::from_millis(100); -/// Ensure a daemon is running and return its info. -/// If no daemon is detected, spawn one and wait for it to become ready. +/// Ensure a daemon is running and return its info. If a daemon is found (via +/// discovery file or port scan), we MAY take it over if the client is newer. +/// Otherwise we spawn a fresh daemon on the first free port in the scan range. /// /// # Errors -/// Returns an error if the daemon cannot be spawned or does not become ready -/// within the timeout period. -pub fn ensure_daemon(port: u16) -> io::Result { - // Check if already running +/// Returns an error if no free port is available, the daemon cannot be spawned, +/// or it does not become ready within the timeout period. +pub fn ensure_daemon(scan: PortScan) -> io::Result { + // Check discovery file first (fast path). if let Some(info) = discovery::discover() { - debug!(endpoint = %info.hyperd_endpoint, "daemon already running"); - return Ok(info); + return maybe_take_over(info, scan); } - info!("no running daemon detected, spawning one"); - spawn_detached(port)?; - wait_for_daemon() + // Scan the port range for a running daemon or a free port. + match discovery::scan_for_daemon(scan) { + ScanOutcome::Found(info) => maybe_take_over(*info, scan), + ScanOutcome::FreePort(port) => { + info!(port, "no running daemon detected, spawning on free port"); + spawn_detached(port)?; + wait_for_daemon() + } + ScanOutcome::AllOccupied => Err(io::Error::new( + io::ErrorKind::AddrInUse, + format!( + "no free hyperdb daemon port in {}..{}", + scan.base, + scan.base.saturating_add(scan.span) + ), + )), + } } /// Spawn `hyperdb-mcp daemon` as a fully detached background process. @@ -102,3 +116,107 @@ fn wait_for_daemon() -> io::Result { std::thread::sleep(POLL_INTERVAL); } } + +/// Pure version comparison: returns `true` if the client should take over the daemon. +/// Only returns `true` when both versions parse successfully AND client > daemon. +/// Unparseable versions or equal/older client always return `false` (reuse daemon). +pub fn client_should_take_over(client_ver: &str, daemon_ver: &str) -> bool { + let Ok(client) = semver::Version::parse(client_ver) else { + return false; + }; + let Ok(daemon) = semver::Version::parse(daemon_ver) else { + return false; + }; + client > daemon +} + +/// Decide whether to reuse the running daemon or take it over with a newer version. +/// If the client is newer, we send STOP to the old daemon, wait for it to release +/// the port, then spawn a fresh daemon on the same port. Otherwise we reuse the +/// existing daemon. +/// +/// The `scan` argument is intentionally unused for *where* to respawn: a takeover +/// always reuses the port the discovered daemon already holds (`info.health_port`), +/// because that is the port guaranteed to free up when the old daemon stops. A +/// mid-session change to `HYPERDB_DAEMON_PORT` (so the pinned `scan.base` differs +/// from `info.health_port`) is not honored here — spawning on a *different* port +/// would leave the old daemon alive and create two daemons rather than replace one. +/// That edge case is pathological (operators don't repin a live daemon) and the +/// daemon found via discovery is authoritative for its own port. +fn maybe_take_over(info: DaemonInfo, _scan: PortScan) -> io::Result { + let client_ver = crate::version::MCP_VERSION; + + if !client_should_take_over(client_ver, &info.version) { + // Client is older or equal, or one/both versions failed to parse → reuse. + // Distinguish the two reasons so an unexpected unparseable daemon version + // (corrupt daemon.json, foreign writer) is visible when debugging. + let parse_failed = semver::Version::parse(client_ver).is_err() + || semver::Version::parse(&info.version).is_err(); + debug!( + daemon_version = %info.version, + client_version = %client_ver, + port = info.health_port, + reason = if parse_failed { "version unparseable" } else { "client not newer" }, + "reusing existing daemon" + ); + return Ok(info); + } + + // Client is newer — take over. + info!( + daemon_version = %info.version, + client_version = %client_ver, + port = info.health_port, + "newer MCP client taking over older daemon" + ); + + // Send STOP (best-effort; ignore error if daemon is already dying). + let _ = super::health::send_command(info.health_port, "STOP"); + + // Wait for the old daemon to release the port (confirmed by ping_identified returning None). + let deadline = Instant::now() + SPAWN_TIMEOUT; + while Instant::now() < deadline { + if super::health::ping_identified( + info.health_port, + Duration::from_millis(200), + Duration::from_millis(200), + ) + .is_none() + { + // Port is free — spawn the new daemon on the same port. + // + // There is a benign TOCTOU window here: another client could also + // observe the freed port and spawn concurrently. That is safe by the + // same argument as the FreePort path — `spawn_detached` is + // fire-and-forget (it does not itself bind the port; the spawned + // daemon's `HealthListener::bind` is the real single-instance lock). + // The OS grants the bind to exactly one daemon; the loser exits at + // step 1 of `run_daemon` (before spawning hyperd or writing the + // discovery file), and `wait_for_daemon` (which polls `discover()`) + // converges on whichever daemon won. No duplicate daemon survives + // and no AddrInUse surfaces to the client. + // + // Defensive narrowing of that window: if a concurrent takeover has + // already published a fresh, identity-verified daemon on this port, + // adopt it instead of spawning — this avoids returning the stale + // `info` (old endpoint) we were carrying and skips a redundant spawn. + if let Some(fresh) = discovery::discover() { + if fresh.health_port == info.health_port { + return Ok(fresh); + } + } + spawn_detached(info.health_port)?; + return wait_for_daemon(); + } + std::thread::sleep(POLL_INTERVAL); + } + + // Old daemon didn't die within the deadline — log a warning and reuse it + // rather than fail the client. + warn!( + port = info.health_port, + timeout_secs = SPAWN_TIMEOUT.as_secs(), + "old daemon did not stop within timeout, reusing it" + ); + Ok(info) +} diff --git a/hyperdb-mcp/src/engine.rs b/hyperdb-mcp/src/engine.rs index b89001d..deb6ba1 100644 --- a/hyperdb-mcp/src/engine.rs +++ b/hyperdb-mcp/src/engine.rs @@ -183,6 +183,10 @@ pub struct Engine { hyper: Option, /// Stored endpoint for daemon mode (the daemon advertises this). daemon_endpoint: Option, + /// The daemon's health port, if connected via daemon mode. `None` in local mode. + /// Used by the server's heartbeat logic to target the correct port (not a re-resolve, + /// which would break when scanning is enabled). + daemon_health_port: Option, connection: Connection, /// The primary database for this session. Lives in a temp dir and is /// deleted on `Drop`. @@ -337,6 +341,7 @@ impl Engine { Ok(Self { hyper: Some(hyper), daemon_endpoint: None, + daemon_health_port: None, connection, ephemeral_path, persistent_path, @@ -369,8 +374,7 @@ impl Engine { persistent_path: Option, log_dir: &Path, ) -> Result, McpError> { - let port = daemon::discovery::resolve_port(); - let info = match daemon::spawn::ensure_daemon(port) { + let info = match daemon::spawn::ensure_daemon(daemon::discovery::resolve_port_scan()) { Ok(info) => info, Err(e) => { tracing::debug!(error = %e, "daemon unavailable, falling back to local mode"); @@ -412,6 +416,7 @@ impl Engine { Ok(Some(Self { hyper: None, daemon_endpoint: Some(info.hyperd_endpoint), + daemon_health_port: Some(info.health_port), connection, ephemeral_path: ephemeral_path.to_path_buf(), persistent_path, @@ -451,6 +456,12 @@ impl Engine { .map_err(|e| McpError::new(ErrorCode::InternalError, e.to_string())) } + /// The daemon's health port, if this engine is connected via daemon mode. + /// Returns `None` in local mode (when this engine owns a private `HyperProcess`). + pub fn daemon_health_port(&self) -> Option { + self.daemon_health_port + } + /// Absolute path to the ephemeral primary `.hyper` file on disk. pub fn ephemeral_path(&self) -> &Path { &self.ephemeral_path @@ -1425,6 +1436,18 @@ impl Engine { Value::String(p.to_string_lossy().into_owned()) }); + // Connection details for the backing `hyperd`. In daemon mode the + // endpoint and health port come from the shared daemon's discovery + // file; in local mode (`--no-daemon`) this engine owns a private + // `hyperd` and there is no health port. `hyperd_endpoint()` only errors + // if no endpoint is available at all, which `is_running` already + // reflects — surface it as null rather than failing the whole status. + let in_daemon_mode = self.daemon_endpoint.is_some(); + let endpoint_value = self.hyperd_endpoint().map_or(Value::Null, Value::String); + let health_port_value = self + .daemon_health_port + .map_or(Value::Null, |p| Value::Number(p.into())); + Ok(json!({ "hyperd_running": self.is_running(), "ephemeral_path": self.ephemeral_path.to_string_lossy(), @@ -1433,6 +1456,14 @@ impl Engine { "table_count": table_count, "total_rows": total_rows, "disk_usage_bytes": disk_bytes, + // Where this engine is talking to hyperd. `hyperd_endpoint` is the + // libpq endpoint queries run against; `daemon_health_port` is the + // shared daemon's control/lock port (null in local mode). + "engine": { + "mode": if in_daemon_mode { "daemon" } else { "local" }, + "hyperd_endpoint": endpoint_value, + "daemon_health_port": health_port_value, + }, // The MCP server and the `hyperdb-api` crate it's built on live in // the same Cargo workspace and ship from the same commit, so a // single version string identifies both. Label it by the diff --git a/hyperdb-mcp/src/main.rs b/hyperdb-mcp/src/main.rs index a6d9925..4c80d1d 100644 --- a/hyperdb-mcp/src/main.rs +++ b/hyperdb-mcp/src/main.rs @@ -110,9 +110,11 @@ enum Commands { #[command(subcommand)] action: Option, - /// TCP port for health listener and single-instance lock - #[arg(long, default_value_t = daemon::DEFAULT_DAEMON_PORT)] - port: u16, + /// TCP port for health listener and single-instance lock. When omitted, + /// the daemon scans from the base port to find a free port. For stop/status + /// commands, omitting the port uses discovery + scanning to find the running daemon. + #[arg(long)] + port: Option, /// Idle timeout in seconds before the daemon shuts down #[arg(long)] @@ -152,7 +154,11 @@ async fn main() -> Result<(), Box> { action: None, port, idle_timeout, - }) => run_daemon_mode(port, idle_timeout).await, + }) => { + // Resolve the effective port for daemon startup + let effective_port = port.unwrap_or_else(|| discovery::resolve_port_scan().base); + run_daemon_mode(effective_port, idle_timeout).await + } None => run_mcp_mode(cli).await, } } @@ -230,20 +236,33 @@ async fn run_mcp_mode(cli: Cli) -> Result<(), Box> { Ok(()) } -fn daemon_stop(port: u16) { - match health::send_command(port, "STOP") { +fn daemon_stop(port: Option) { + let target_port = match port { + Some(p) => p, + None => { + // No explicit port — discover the running daemon + if let Some(info) = discovery::find_running_daemon() { + info.health_port + } else { + eprintln!("No daemon is currently running."); + std::process::exit(1); + } + } + }; + + match health::send_command(target_port, "STOP") { Ok(response) => { println!("Daemon responded: {}", response.trim()); } Err(e) => { - eprintln!("No daemon running on port {port} (or cannot connect): {e}"); + eprintln!("No daemon running on port {target_port} (or cannot connect): {e}"); std::process::exit(1); } } } fn daemon_status() { - if let Some(info) = discovery::discover() { + if let Some(info) = discovery::find_running_daemon() { println!("Daemon is running:"); println!(" PID: {}", info.pid); println!(" Hyperd endpoint: {}", info.hyperd_endpoint); diff --git a/hyperdb-mcp/src/server.rs b/hyperdb-mcp/src/server.rs index f8c7dd4..dbdca33 100644 --- a/hyperdb-mcp/src/server.rs +++ b/hyperdb-mcp/src/server.rs @@ -1281,10 +1281,22 @@ impl HyperMcpServer { .lock() .is_ok_and(|guard| guard.elapsed() >= HEARTBEAT_INTERVAL); if should_send { - let port = crate::daemon::discovery::resolve_port(); - let _ = crate::daemon::health::send_command(port, "HEARTBEAT"); - if let Ok(mut guard) = self.last_heartbeat.lock() { - *guard = std::time::Instant::now(); + // Use the daemon's discovered health port (recorded on the engine at + // connect time), NOT `resolve_port()`: with port scanning the daemon + // may have bound a non-base port (e.g. 7492 when 7485 was taken), and + // re-resolving would return the base port — the heartbeat would then + // target the wrong address and silently fail to keep the daemon warm. + // Skip if local mode (no daemon) or if the engine lock is poisoned. + let port = self + .engine + .lock() + .ok() + .and_then(|guard| guard.as_ref().and_then(Engine::daemon_health_port)); + if let Some(port) = port { + let _ = crate::daemon::health::send_command(port, "HEARTBEAT"); + if let Ok(mut guard) = self.last_heartbeat.lock() { + *guard = std::time::Instant::now(); + } } } } @@ -2868,9 +2880,10 @@ impl HyperMcpServer { } /// Returns plugin health, workspace info, table count, total rows, disk - /// usage, and the list of active directory watchers with their stats. + /// usage, the backing `hyperd` connection (mode, endpoint, daemon health + /// port), and the list of active directory watchers with their stats. #[tool( - description = "Returns plugin health, workspace info, table count, total rows, disk usage, and active directory watchers." + description = "Returns plugin health, workspace info, table count, total rows, disk usage, the backing hyperd connection (engine.mode, engine.hyperd_endpoint, engine.daemon_health_port), and active directory watchers." )] fn status(&self) -> Result { let result = self.with_engine(super::engine::Engine::status); diff --git a/hyperdb-mcp/tests/daemon_tests.rs b/hyperdb-mcp/tests/daemon_tests.rs index 24a760e..60c0331 100644 --- a/hyperdb-mcp/tests/daemon_tests.rs +++ b/hyperdb-mcp/tests/daemon_tests.rs @@ -9,10 +9,11 @@ //! are process-global, these tests MUST run sequentially. We enforce this via a //! shared mutex — every test that touches env vars acquires `ENV_LOCK` first. +use std::net::TcpListener; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use hyperdb_mcp::daemon::discovery::{self, DaemonInfo}; +use hyperdb_mcp::daemon::discovery::{self, DaemonInfo, PortScan}; use hyperdb_mcp::daemon::health::{self, DaemonState, HealthListener}; use tempfile::TempDir; @@ -158,6 +159,59 @@ fn restart_history_prunes_entries_older_than_window() { ); } +// ─── Unit tests: DaemonConfig (require ENV_LOCK) ───────────────────────────── + +#[test] +fn daemon_config_from_args_none_when_unset() { + let _lock = acquire_env_lock(); + let _guard = EnvGuard::remove("HYPERDB_DAEMON_IDLE_TIMEOUT"); + + let config = hyperdb_mcp::daemon::run::DaemonConfig::from_args(0, None); + assert!( + config.idle_timeout.is_none(), + "idle_timeout should be None when neither flag nor env is set" + ); +} + +#[test] +fn daemon_config_from_args_some_when_flag() { + let _lock = acquire_env_lock(); + let _guard = EnvGuard::remove("HYPERDB_DAEMON_IDLE_TIMEOUT"); + + let config = hyperdb_mcp::daemon::run::DaemonConfig::from_args(0, Some(120)); + assert_eq!( + config.idle_timeout, + Some(Duration::from_secs(120)), + "idle_timeout should match the provided flag value" + ); +} + +#[test] +fn daemon_config_from_args_some_when_env() { + let _lock = acquire_env_lock(); + let _guard = EnvGuard::set("HYPERDB_DAEMON_IDLE_TIMEOUT", "90"); + + let config = hyperdb_mcp::daemon::run::DaemonConfig::from_args(0, None); + assert_eq!( + config.idle_timeout, + Some(Duration::from_secs(90)), + "idle_timeout should match the env var value when no flag is provided" + ); +} + +#[test] +fn daemon_config_from_args_flag_takes_precedence() { + let _lock = acquire_env_lock(); + let _guard = EnvGuard::set("HYPERDB_DAEMON_IDLE_TIMEOUT", "90"); + + let config = hyperdb_mcp::daemon::run::DaemonConfig::from_args(0, Some(120)); + assert_eq!( + config.idle_timeout, + Some(Duration::from_secs(120)), + "flag value should take precedence over env var" + ); +} + // ─── Unit tests: Health protocol (no env vars, safe to run in parallel) ─────── #[test] @@ -181,7 +235,7 @@ fn health_protocol_ping_pong() { let (port, _handle, _state) = start_health_listener(); let response = health::send_command(port, "PING").unwrap(); - assert_eq!(response.trim(), "PONG"); + assert!(response.trim().starts_with("PONG hyperdb-mcp ")); } #[test] @@ -253,7 +307,7 @@ fn health_protocol_multi_command_session() { let (port, _handle, _state) = start_health_listener(); let response1 = health::send_command(port, "PING").unwrap(); - assert_eq!(response1.trim(), "PONG"); + assert!(response1.trim().starts_with("PONG hyperdb-mcp ")); let response2 = health::send_command(port, "STATUS").unwrap(); let parsed: serde_json::Value = serde_json::from_str(response2.trim()).unwrap(); @@ -263,6 +317,100 @@ fn health_protocol_multi_command_session() { assert_eq!(response3.trim(), "OK"); } +#[test] +fn health_protocol_ping_identity_accept() { + let (port, _handle, _state) = start_health_listener(); + + let version = + health::ping_identified(port, Duration::from_millis(300), Duration::from_millis(300)) + .expect("should return Some for a valid hyperdb-mcp daemon"); + assert_eq!(version, hyperdb_mcp::version::MCP_VERSION); +} + +#[test] +fn health_protocol_ping_identity_reject_foreign() { + use std::io::Write; + + // Bind a raw listener that returns a bare "PONG\n" without the identifying token + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + + std::thread::spawn(move || { + if let Ok((mut stream, _)) = listener.accept() { + let _ = stream.write_all(b"PONG\n"); + } + }); + + // Give the thread a moment to start accepting + std::thread::sleep(Duration::from_millis(50)); + + let result = + health::ping_identified(port, Duration::from_millis(300), Duration::from_millis(300)); + assert_eq!(result, None, "should reject foreign PONG without token"); +} + +#[test] +fn health_protocol_ping_identity_reject_token_lookalike() { + use std::io::Write; + + // A foreign service whose token *starts with* "hyperdb-mcp" must NOT pass. + // Guards against a naive `starts_with("PONG hyperdb-mcp")` prefix check. + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + + std::thread::spawn(move || { + if let Ok((mut stream, _)) = listener.accept() { + let _ = stream.write_all(b"PONG hyperdb-mcpEVIL 9.9.9\n"); + } + }); + + std::thread::sleep(Duration::from_millis(50)); + + let result = + health::ping_identified(port, Duration::from_millis(300), Duration::from_millis(300)); + assert_eq!( + result, None, + "must reject a token that only shares a prefix" + ); +} + +#[test] +fn health_protocol_ping_identity_reject_refused() { + // Bind a listener to get a port, then drop it so the port is closed + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + drop(listener); + + // Brief sleep to reduce the race (the OS may not have fully released the port) + std::thread::sleep(Duration::from_millis(10)); + + let result = + health::ping_identified(port, Duration::from_millis(300), Duration::from_millis(300)); + assert_eq!(result, None, "should return None for connection refused"); +} + +#[test] +fn resolve_port_scan_pins_when_env_set() { + let _lock = acquire_env_lock(); + let _guard = EnvGuard::set("HYPERDB_DAEMON_PORT", "9001"); + assert_eq!( + discovery::resolve_port_scan(), + PortScan { + base: 9001, + span: 1 + } + ); +} + +#[test] +fn resolve_port_scan_scans_when_env_unset() { + let _lock = acquire_env_lock(); + let _guard = EnvGuard::remove("HYPERDB_DAEMON_PORT"); + let scan = discovery::resolve_port_scan(); + assert_eq!(scan.base, hyperdb_mcp::daemon::DEFAULT_DAEMON_BASE_PORT); + assert_eq!(scan.span, hyperdb_mcp::daemon::DAEMON_PORT_SCAN_SPAN); +} + // ─── Unit tests: idle timeout logic (no env vars) ───────────────────────────── #[test] @@ -455,10 +603,134 @@ fn resolve_port_uses_default_when_env_unset() { let _guard = EnvGuard::remove("HYPERDB_DAEMON_PORT"); assert_eq!( discovery::resolve_port(), - hyperdb_mcp::daemon::DEFAULT_DAEMON_PORT + hyperdb_mcp::daemon::DEFAULT_DAEMON_BASE_PORT ); } +// ─── Unit tests: Port scanning (require ENV_LOCK + sandbox OFF) ────────────── + +#[test] +fn scan_finds_our_daemon_via_status() { + let _lock = acquire_env_lock(); + let (port, _handle, _state) = start_health_listener(); + + let scan = PortScan { + base: port, + span: 1, + }; + match discovery::scan_for_daemon(scan) { + discovery::ScanOutcome::Found(info) => { + assert_eq!(info.hyperd_endpoint, "127.0.0.1:54321"); + } + other => panic!("expected Found, got {other:?}"), + } +} + +#[test] +fn scan_skips_camped_returns_free() { + let _lock = acquire_env_lock(); + + // Find a camped port `base` whose immediate successor `base + 1` is free, + // so the scan range is exactly the two adjacent ports {base, base+1}. + // + // We deliberately keep the range to two ports rather than scanning the + // (potentially wide) gap between two arbitrary OS-assigned ports: other + // tests' `start_health_listener` helpers leak real identity-answering + // `HealthListener`s on random high ports for the lifetime of the test + // process, and a wide scan can land on one and return `Found` instead of + // `FreePort`. With a 2-port window, a leaked listener would have to occupy + // exactly `base+1` — which we confirm is free immediately before scanning. + let (camped_listener, base) = loop { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + if port < u16::MAX { + // `base + 1` is free iff we can bind it right now. Drop the probe + // so the port is released for the scan to find as `Refused`. + if let Ok(probe) = TcpListener::bind(("127.0.0.1", port + 1)) { + drop(probe); + break (listener, port); + } + } + // `base+1` was occupied (or base == u16::MAX) — retry with a fresh port. + drop(listener); + }; + let expected_free = base + 1; + + // Spawn a thread that keeps the camped listener alive and accepts + // connections, answering with non-protocol garbage so the identity check + // classifies it as `Camped`, not `OurDaemon`. + std::thread::spawn(move || loop { + if let Ok((mut stream, _)) = camped_listener.accept() { + use std::io::Write; + let _ = stream.write_all(b"NOPE\n"); + } + }); + + // Give the thread a moment to start accepting. + std::thread::sleep(Duration::from_millis(50)); + + // Scan exactly {base (camped), base+1 (free)}. + let scan = PortScan { base, span: 2 }; + + match discovery::scan_for_daemon(scan) { + discovery::ScanOutcome::FreePort(port) => { + assert_eq!( + port, expected_free, + "scan should skip the camped base port and return base+1" + ); + } + other => panic!("expected FreePort, got {other:?}"), + } +} + +#[test] +fn scan_all_refused_returns_freeport_base() { + // Pick a high port that's almost certainly free. + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let base = listener.local_addr().unwrap().port(); + drop(listener); // Release the port so it's free again. + + std::thread::sleep(Duration::from_millis(10)); + + // Span of 1: probe only the single known-free `base` port. A wider span + // would risk colliding with a leaked health listener from another parallel + // test (they bind random high ports and leak for the test-process lifetime), + // which would be reported as `Found` rather than `FreePort`. + let scan = PortScan { base, span: 1 }; + match discovery::scan_for_daemon(scan) { + discovery::ScanOutcome::FreePort(port) => { + assert_eq!(port, base, "should return the first free port (base)"); + } + other => panic!("expected FreePort(base), got {other:?}"), + } +} + +#[test] +fn probe_refused_when_closed() { + // Bind a listener to get a port, then drop it so the port is closed. + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + drop(listener); + + std::thread::sleep(Duration::from_millis(10)); + + // Access the private probe_port via the public scan_for_daemon wrapper. + // We know that if the scan returns FreePort, then probe_port returned Refused. + let scan = PortScan { + base: port, + span: 1, + }; + match discovery::scan_for_daemon(scan) { + discovery::ScanOutcome::FreePort(p) => { + assert_eq!( + p, port, + "probe_port should have returned Refused for closed port" + ); + } + other => panic!("expected FreePort (Refused), got {other:?}"), + } +} + #[test] fn discover_finds_live_daemon() { let _lock = acquire_env_lock(); @@ -481,6 +753,56 @@ fn discover_finds_live_daemon() { assert_eq!(discovered.health_port, port); } +// ─── Unit tests: Version takeover decision (no env vars, safe parallel) ───────── + +#[test] +fn takeover_decision_newer_client_takes_over() { + assert!( + hyperdb_mcp::daemon::spawn::client_should_take_over("0.5.0", "0.4.0"), + "0.5.0 client should take over 0.4.0 daemon" + ); +} + +#[test] +fn takeover_decision_equal_version_reuses() { + assert!( + !hyperdb_mcp::daemon::spawn::client_should_take_over("0.4.0", "0.4.0"), + "equal versions should reuse daemon" + ); +} + +#[test] +fn takeover_decision_older_client_reuses() { + assert!( + !hyperdb_mcp::daemon::spawn::client_should_take_over("0.4.0", "0.5.0"), + "older client should reuse newer daemon" + ); +} + +#[test] +fn takeover_decision_client_unparseable_reuses() { + assert!( + !hyperdb_mcp::daemon::spawn::client_should_take_over("garbage", "0.4.0"), + "unparseable client version should reuse daemon" + ); +} + +#[test] +fn takeover_decision_daemon_unparseable_reuses() { + assert!( + !hyperdb_mcp::daemon::spawn::client_should_take_over("0.4.0", "garbage"), + "unparseable daemon version should reuse daemon" + ); +} + +#[test] +fn takeover_decision_both_unparseable_reuses() { + assert!( + !hyperdb_mcp::daemon::spawn::client_should_take_over("garbage", "junk"), + "both unparseable should reuse daemon" + ); +} + // ─── Integration tests: full daemon lifecycle with real hyperd ───────────────── #[test] @@ -531,7 +853,7 @@ fn daemon_mode_two_engines_share_same_hyperd() { ); // Verify the daemon is the one we started (health port reachable) let status = health::send_command(daemon.info.health_port, "PING").unwrap(); - assert_eq!(status.trim(), "PONG"); + assert!(status.trim().starts_with("PONG hyperdb-mcp ")); engine1.execute_command("CREATE TABLE foo (x INT)").unwrap(); engine1 @@ -595,7 +917,7 @@ fn daemon_mode_persistent_engine_data_is_queryable() { assert_eq!(rows[1]["name"], "beta"); let resp = health::send_command(daemon.info.health_port, "PING").unwrap(); - assert_eq!(resp.trim(), "PONG"); + assert!(resp.trim().starts_with("PONG hyperdb-mcp ")); } #[cfg(unix)] @@ -793,7 +1115,7 @@ impl TestDaemon { rt.block_on(async { let config = hyperdb_mcp::daemon::run::DaemonConfig { port: 0, - idle_timeout: Duration::from_secs(300), + idle_timeout: Some(Duration::from_secs(300)), }; hyperdb_mcp::daemon::run::run_daemon(config) .await diff --git a/hyperdb-mcp/tests/engine_tests.rs b/hyperdb-mcp/tests/engine_tests.rs index 2c6d606..3f5f66b 100644 --- a/hyperdb-mcp/tests/engine_tests.rs +++ b/hyperdb-mcp/tests/engine_tests.rs @@ -159,6 +159,21 @@ fn engine_status() { version.contains(".r"), "hyper_rust_api_version should carry .r: {version}" ); + + // Engine connection block. This is a `--no-daemon` engine (TestEngine uses + // `Engine::new_no_daemon`), so mode is "local", there's a private hyperd + // endpoint, and no daemon health port. + let engine_info = &status["engine"]; + assert_eq!(engine_info["mode"], "local"); + assert!( + engine_info["hyperd_endpoint"].is_string(), + "local engine should report a private hyperd endpoint, got {:?}", + engine_info["hyperd_endpoint"] + ); + assert!( + engine_info["daemon_health_port"].is_null(), + "local engine has no daemon health port" + ); } /// Regression test: calling `create_table` twice in append mode must be