diff --git a/README.md b/README.md index 6697b3b..60b1cbe 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ sudo baudbot deploy # start the service sudo baudbot start -# check health +# check health (includes deployed version + broker connection/health status) sudo baudbot status sudo baudbot doctor ``` diff --git a/bin/baudbot b/bin/baudbot index f01cb5f..a70512d 100755 --- a/bin/baudbot +++ b/bin/baudbot @@ -103,7 +103,7 @@ usage() { echo " start Start the agent (systemd, or --direct for foreground)" echo " stop Stop the agent" echo " restart Restart the agent" - echo " status Show agent status + deployed version" + echo " status Show agent status + deployed version + broker connection" echo " logs Tail agent logs" echo " attach Attach to control-agent by default; supports --pi/--tmux" echo " sessions List agent tmux and pi sessions (name → id)" @@ -326,6 +326,134 @@ print_deployed_version() { echo -e "${BOLD}deployed version:${RESET} $line" } +broker_mode_configured() { + local env_file="/home/${1:-baudbot_agent}/.config/.env" + [ -r "$env_file" ] || return 1 + grep -Eq '^SLACK_BROKER_URL=[^[:space:]].*$' "$env_file" || return 1 + grep -Eq '^SLACK_BROKER_WORKSPACE_ID=[^[:space:]].*$' "$env_file" || return 1 +} + +print_broker_connection_status() { + local agent_user="${BAUDBOT_AGENT_USER:-baudbot_agent}" + local health_file="/home/$agent_user/.pi/agent/broker-health.json" + local health_summary="" + local connection_state="" + local components_line="" + + if ! broker_mode_configured "$agent_user"; then + echo -e "${BOLD}broker connection:${RESET} not configured" + return 0 + fi + + if [ "$(id -u)" -eq 0 ]; then + sudo -u "$agent_user" tmux has-session -t slack-bridge 2>/dev/null || { + echo -e "${BOLD}broker connection:${RESET} disconnected (bridge tmux session not running)" + return 0 + } + elif [ "$(id -un)" = "$agent_user" ]; then + tmux has-session -t slack-bridge 2>/dev/null || { + echo -e "${BOLD}broker connection:${RESET} disconnected (bridge tmux session not running)" + return 0 + } + else + echo -e "${BOLD}broker connection:${RESET} configured (run with sudo for runtime status)" + return 0 + fi + + if [ ! -r "$health_file" ]; then + echo -e "${BOLD}broker connection:${RESET} starting" + echo -e "${BOLD}broker health:${RESET} unavailable (waiting for bridge health file)" + return 0 + fi + + health_summary="$(python3 - "$health_file" <<'PY' +import json +import sys +from datetime import datetime, timezone + +path = sys.argv[1] +with open(path, 'r', encoding='utf-8') as f: + h = json.load(f) + +def parse_iso(s): + if not s: + return None + try: + if s.endswith('Z'): + s = s[:-1] + '+00:00' + dt = datetime.fromisoformat(s) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except Exception: + return None + +def age_seconds(ts): + dt = parse_iso(ts) + if not dt: + return None + return (datetime.now(timezone.utc) - dt).total_seconds() + +def status(ok_ts, err_ts): + ok_dt = parse_iso(ok_ts) + err_dt = parse_iso(err_ts) + if err_dt and (not ok_dt or err_dt >= ok_dt): + return 'error' + if ok_dt: + return 'ok' + return 'unknown' + +poll = h.get('poll', {}) +inbound = h.get('inbound', {}) +ack = h.get('ack', {}) +outbound = h.get('outbound', {}) + +poll_age = age_seconds(poll.get('last_ok_at')) +poll_failures = int(poll.get('consecutive_failures') or 0) +poll_state = status(poll.get('last_ok_at'), poll.get('last_error_at')) + +if poll_state == 'error' and poll_failures > 0: + connection = 'reconnecting' +elif poll_age is not None and poll_age <= 120: + connection = 'connected' +elif poll_age is not None: + connection = 'stale' +else: + connection = 'starting' + +inbound_state = status(inbound.get('last_process_ok_at'), inbound.get('last_process_error_at')) +ack_state = status(ack.get('last_ok_at'), ack.get('last_error_at')) +outbound_state = status(outbound.get('last_ok_at'), outbound.get('last_error_at')) + +print(connection) +print(f'poll={poll_state} inbound={inbound_state} ack={ack_state} outbound={outbound_state}') +PY + )" + + connection_state="$(printf '%s\n' "$health_summary" | sed -n '1p')" + components_line="$(printf '%s\n' "$health_summary" | sed -n '2p')" + + case "$connection_state" in + connected) + echo -e "${BOLD}broker connection:${RESET} connected" + ;; + reconnecting) + echo -e "${BOLD}broker connection:${RESET} reconnecting" + ;; + stale) + echo -e "${BOLD}broker connection:${RESET} stale (no recent successful poll)" + ;; + starting) + echo -e "${BOLD}broker connection:${RESET} starting" + ;; + *) + echo -e "${BOLD}broker connection:${RESET} unknown" + ;; + esac + + [ -n "$components_line" ] && echo -e "${BOLD}broker health:${RESET} $components_line" +} + pi_control_dir() { local agent_user="${1:-baudbot_agent}" echo "/home/$agent_user/.pi/session-control" @@ -473,6 +601,7 @@ case "${1:-}" in systemctl status baudbot "$@" || status_rc=$? echo "" print_deployed_version + print_broker_connection_status exit "$status_rc" else # Fallback: check if baudbot_agent has pi running @@ -484,6 +613,7 @@ case "${1:-}" in fi echo "" print_deployed_version + print_broker_connection_status fi ;; diff --git a/docs/operations.md b/docs/operations.md index a565400..fd58fb8 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -10,7 +10,7 @@ sudo baudbot start sudo baudbot stop sudo baudbot restart -# Status and logs +# Status and logs (status includes deployed version + broker connection/health state) sudo baudbot status sudo baudbot logs diff --git a/slack-bridge/broker-bridge.mjs b/slack-bridge/broker-bridge.mjs index c362ed9..bff9f13 100755 --- a/slack-bridge/broker-bridge.mjs +++ b/slack-bridge/broker-bridge.mjs @@ -36,6 +36,7 @@ const POLL_INTERVAL_MS = parseInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || " const MAX_MESSAGES = parseInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 10); const DEDUPE_TTL_MS = parseInt(process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000), 10); const MAX_BACKOFF_MS = 30_000; +const BROKER_HEALTH_PATH = path.join(homedir(), ".pi", "agent", "broker-health.json"); function ts() { return new Date().toISOString(); @@ -95,6 +96,114 @@ let cryptoState = null; const dedupe = new Map(); +const brokerHealth = { + started_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + outbound_mode: outboundMode, + broker_url: brokerBaseUrl, + workspace_id: workspaceId, + poll: { + last_ok_at: null, + last_error_at: null, + consecutive_failures: 0, + last_error: null, + }, + inbound: { + last_decrypt_ok_at: null, + last_decrypt_error_at: null, + last_process_ok_at: null, + last_process_error_at: null, + last_error: null, + }, + ack: { + last_ok_at: null, + last_error_at: null, + last_error: null, + }, + outbound: { + last_ok_at: null, + last_error_at: null, + last_error: null, + }, +}; + +function trimError(err) { + const msg = err instanceof Error ? err.message : String(err || "unknown error"); + return msg.slice(0, 400); +} + +function persistBrokerHealth() { + brokerHealth.updated_at = new Date().toISOString(); + const dir = path.dirname(BROKER_HEALTH_PATH); + const tmp = `${BROKER_HEALTH_PATH}.tmp`; + fs.mkdirSync(dir, { recursive: true }); + fs.writeFileSync(tmp, `${JSON.stringify(brokerHealth, null, 2)}\n`, { mode: 0o600 }); + fs.renameSync(tmp, BROKER_HEALTH_PATH); +} + +function markHealth(section, ok, err = null) { + const now = new Date().toISOString(); + + if (section === "poll") { + if (ok) { + brokerHealth.poll.last_ok_at = now; + brokerHealth.poll.consecutive_failures = 0; + brokerHealth.poll.last_error = null; + } else { + brokerHealth.poll.last_error_at = now; + brokerHealth.poll.consecutive_failures += 1; + brokerHealth.poll.last_error = trimError(err); + } + persistBrokerHealth(); + return; + } + + if (section === "inbound_decrypt") { + if (ok) { + brokerHealth.inbound.last_decrypt_ok_at = now; + } else { + brokerHealth.inbound.last_decrypt_error_at = now; + brokerHealth.inbound.last_error = trimError(err); + } + persistBrokerHealth(); + return; + } + + if (section === "inbound_process") { + if (ok) { + brokerHealth.inbound.last_process_ok_at = now; + } else { + brokerHealth.inbound.last_process_error_at = now; + brokerHealth.inbound.last_error = trimError(err); + } + persistBrokerHealth(); + return; + } + + if (section === "ack") { + if (ok) { + brokerHealth.ack.last_ok_at = now; + brokerHealth.ack.last_error = null; + } else { + brokerHealth.ack.last_error_at = now; + brokerHealth.ack.last_error = trimError(err); + } + persistBrokerHealth(); + return; + } + + if (section === "outbound") { + if (ok) { + brokerHealth.outbound.last_ok_at = now; + brokerHealth.outbound.last_error = null; + } else { + brokerHealth.outbound.last_error_at = now; + brokerHealth.outbound.last_error = trimError(err); + } + persistBrokerHealth(); + } +} + function toBase64(bytes) { return Buffer.from(bytes).toString("base64"); } @@ -329,15 +438,22 @@ async function sendViaBroker({ action, routing, body }) { const sig = sodium.crypto_sign_detached(canonical, cryptoState.serverSignSecretKey); const signature = toBase64(sig); - return brokerFetch("/api/send", { - workspace_id: workspaceId, - action, - routing, - encrypted_body: encryptedBody, - nonce: nonceB64, - timestamp, - signature, - }); + try { + const result = await brokerFetch("/api/send", { + workspace_id: workspaceId, + action, + routing, + encrypted_body: encryptedBody, + nonce: nonceB64, + timestamp, + signature, + }); + markHealth("outbound", true); + return result; + } catch (err) { + markHealth("outbound", false, err); + throw err; + } } /** @@ -386,11 +502,13 @@ async function sendDirectToSlack(apiMethod, params) { const error = data.error || response.statusText; throw new Error(`Slack API ${apiMethod} failed: ${sanitizeError(error)}`); } - + + markHealth("outbound", true); return data; } catch (err) { // Sanitize any error messages to prevent token leakage const sanitizedMessage = sanitizeError(err.message || String(err)); + markHealth("outbound", false, sanitizedMessage); throw new Error(sanitizedMessage); } } @@ -519,7 +637,15 @@ async function processPulledMessage(message) { throw new Error("invalid broker envelope signature"); } - const payload = decryptEnvelope(message); + let payload; + try { + payload = decryptEnvelope(message); + markHealth("inbound_decrypt", true); + } catch (err) { + markHealth("inbound_decrypt", false, err); + throw err; + } + logInfo(`📦 decrypted envelope — type: ${payload?.type || "unknown"}`); if (payload?.type !== "event_callback") { @@ -719,10 +845,13 @@ async function startPollLoop() { const STATUS_LOG_INTERVAL_MS = 60_000; // log a status line every 60s even when idle while (true) { + let pollSucceeded = false; try { pruneDedupe(); const messages = await pullInbox(); + pollSucceeded = true; + markHealth("poll", true); pollCount++; const ackIds = []; @@ -756,6 +885,7 @@ async function startPollLoop() { logInfo(`📩 processing message ${message.message_id}`); const ok = await processPulledMessage(message); if (ok) { + markHealth("inbound_process", true); dedupe.set(message.message_id, Date.now() + DEDUPE_TTL_MS); ackIds.push(message.message_id); logInfo(`✅ processed & acked message ${message.message_id}`); @@ -763,6 +893,7 @@ async function startPollLoop() { logWarn(`⚠️ message ${message.message_id} returned not-ok, will retry next poll`); } } catch (err) { + markHealth("inbound_process", false, err); const errMsg = err instanceof Error ? err.message : "unknown error"; const errStack = err instanceof Error ? err.stack : ""; logError(`❌ message processing failed (${message.message_id}): ${errMsg}`); @@ -777,17 +908,31 @@ async function startPollLoop() { } if (ackIds.length > 0) { - await ackInbox(ackIds); - logInfo(`📤 acked ${ackIds.length} message(s)`); + try { + await ackInbox(ackIds); + markHealth("ack", true); + logInfo(`📤 acked ${ackIds.length} message(s)`); + } catch (err) { + markHealth("ack", false, err); + throw err; + } } backoffMs = POLL_INTERVAL_MS; await sleep(POLL_INTERVAL_MS); } catch (err) { - const errMsg = err instanceof Error ? err.message : "unknown error"; - const errStack = err instanceof Error ? err.stack : ""; - logError(`❌ inbox poll failed: ${errMsg}`); - if (errStack) logError(` stack: ${errStack}`); + if (!pollSucceeded) { + markHealth("poll", false, err); + const errMsg = err instanceof Error ? err.message : "unknown error"; + const errStack = err instanceof Error ? err.stack : ""; + logError(`❌ inbox poll failed: ${errMsg}`); + if (errStack) logError(` stack: ${errStack}`); + } else { + const errMsg = err instanceof Error ? err.message : "unknown error"; + const errStack = err instanceof Error ? err.stack : ""; + logError(`❌ broker cycle failed after successful poll: ${errMsg}`); + if (errStack) logError(` stack: ${errStack}`); + } logError(` ↳ backing off ${backoffMs}ms before next attempt`); await sleep(backoffMs); backoffMs = Math.min(MAX_BACKOFF_MS, Math.max(POLL_INTERVAL_MS, backoffMs * 2)); @@ -811,6 +956,7 @@ async function startPollLoop() { refreshSocket(); startApiServer(); + persistBrokerHealth(); logInfo("⚡ Slack broker pull bridge is running!"); logInfo(` outbound mode: ${outboundMode} ${outboundMode === "direct" ? "(using SLACK_BOT_TOKEN)" : "(via broker)"}`); logInfo(` broker: ${brokerBaseUrl}`);