diff --git a/EVENTS.md b/EVENTS.md
new file mode 100644
index 00000000..4d122561
--- /dev/null
+++ b/EVENTS.md
@@ -0,0 +1,62 @@
+# Events
+
+`shpool` exposes an event stream so that external programs can react to changes
+without polling. A program (e.g. a TUI) can call `shpool list` (or send the
+equivalent `ConnectHeader::List` request over the main socket; see the
+[`shpool-protocol`](./shpool-protocol) crate) after each event so that its
+model stays consistent with shpool's state.
+
+## The events socket
+
+The daemon binds a sibling Unix socket next to the main shpool socket:
+
+```bash
+<runtime_dir>/shpool/shpool.socket   # main socket
+<runtime_dir>/shpool/events.socket   # events socket (this protocol)
+```
+
+A subscriber connects to `events.socket` and reads events. The daemon ignores
+anything written to the events socket, so for subscribers it is effectively
+read-only.
+
+## Event types
+
+| `type`             | Meaning                                                  |
+| ------------------ | -------------------------------------------------------- |
+| `session.created`  | A new session was added to the table.                    |
+| `session.attached` | A client attached or reattached to a session.            |
+| `session.detached` | A client disconnected from a still-running session.      |
+| `session.removed`  | A session was removed (shell exited, killed, or reaped). |
+
+Subscribers should ignore unknown `type` values so that future event types do
+not break older consumers.
+
+## Wire format
+
+The daemon writes one JSON object per line (JSONL). Each event looks like:
+
+```json
+{"type":"<event-type>"}
+```
+
+There are no other fields. To learn what the event refers to (which session,
+when, etc.), call `shpool list` (or use `ConnectHeader::List`).
+
+## Subscribing
+
+For ad-hoc use, `shpool events` connects to the events socket and prints each
+event line to stdout, flushing after each line:
+
+```bash
+shpool events | while read -r ev; do
+  echo "got: $ev"
+  shpool list
+done
+
+shpool events | jq .
+```
+
+## Slow subscribers
+
+Each subscriber has a bounded outbound queue. A subscriber that falls too far
+behind is dropped by the daemon (in which case the subscriber can always
+reconnect). There is no replay, so events that fired while a subscriber was
+disconnected are lost.
diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs
index 7b52ec71..d23e8357 100644
--- a/libshpool/src/daemon/mod.rs
+++ b/libshpool/src/daemon/mod.rs
@@ -17,7 +17,7 @@ use std::{env, os::unix::net::UnixListener, path::PathBuf};
 use anyhow::Context;
 use tracing::{info, instrument};
 
-use crate::{config, consts, hooks};
+use crate::{config, consts, events, hooks};
 
 mod etc_environment;
 mod exit_notify;
@@ -81,8 +81,16 @@ pub fn run(
             (Some(socket.clone()), UnixListener::bind(&socket).context("binding to socket")?)
         }
    };
-    // spawn the signal handler thread in the background
-    signals::Handler::new(cleanup_socket.clone()).spawn()?;
+    let events_socket = events::socket_path(&socket);
+
+    // spawn the signal handler thread in the background. Both sockets need
+    // explicit cleanup on signal exit because the process exits before any
+    // RAII guard can run.
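+    // E.g. a main socket at <runtime_dir>/shpool/shpool.socket gets a
+    // sibling <runtime_dir>/shpool/events.socket; both are unlinked here.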
+    let mut socks_to_clean: Vec<PathBuf> = cleanup_socket.iter().cloned().collect();
+    socks_to_clean.push(events_socket.clone());
+    signals::Handler::new(socks_to_clean).spawn()?;
+
+    let _events_guard = server.start_events_listener(events_socket)?;
 
     server::Server::serve(server, listener)?;
 
diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index adbeb622..1739853c 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -49,7 +49,7 @@ use crate::{
         etc_environment, exit_notify::ExitNotifier, hooks, pager, pager::PagerError, prompt,
         shell, show_motd, ttl_reaper,
     },
-    protocol, test_hooks, tty, user,
+    events, protocol, test_hooks, tty, user,
 };
 
 const DEFAULT_INITIAL_SHELL_PATH: &str = "/usr/bin:/bin:/usr/sbin:/sbin";
 
@@ -74,6 +74,7 @@ pub struct Server {
     runtime_dir: PathBuf,
     register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>,
     hooks: Box<dyn hooks::Hooks>,
+    events_bus: Arc<events::EventBus>,
     daily_messenger: Arc<show_motd::DailyMessenger>,
     log_level_handle: tracing_subscriber::reload::Handle<
         tracing_subscriber::filter::LevelFilter,
@@ -93,12 +94,14 @@ impl Server {
         >,
     ) -> anyhow::Result<Arc<Self>> {
         let shells = Arc::new(Mutex::new(HashMap::new()));
+        let events_bus = events::EventBus::new();
 
         // buffered so that we are unlikely to block when setting up a
         // new session
         let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10);
 
         let shells_tab = Arc::clone(&shells);
+        let reaper_bus = Arc::clone(&events_bus);
         thread::spawn(move || {
-            if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab) {
+            if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab, reaper_bus) {
                 warn!("ttl reaper exited with error: {:?}", e);
             }
         });
@@ -110,11 +113,27 @@
             runtime_dir,
             register_new_reapable_session: new_sess_tx,
             hooks,
+            events_bus,
             daily_messenger,
             log_level_handle,
         }))
     }
 
+    /// Bind the events socket and spawn the accept thread. The returned
+    /// guard unlinks the socket file on drop.
+    pub fn start_events_listener(
+        self: &Arc<Self>,
+        socket_path: PathBuf,
+    ) -> anyhow::Result<events::ListenerGuard> {
+        let server = Arc::clone(self);
+        events::start_listener(socket_path, move |stream| server.handle_events_subscriber(stream))
+    }
+
+    fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
+        let receiver = self.events_bus.register();
+        events::spawn_writer(stream, receiver)
+    }
+
     #[instrument(skip_all)]
     pub fn serve(server: Arc<Self>, listener: UnixListener) -> anyhow::Result<()> {
         test_hooks::emit("daemon-about-to-listen");
@@ -310,7 +329,11 @@
         {
             let _s = span!(Level::INFO, "2_lock(shells)").entered();
             let mut shells = self.shells.lock().unwrap();
-            shells.remove(&header.name);
+            // Gated: a concurrent kill or reaper may have already
+            // removed the entry and published its own removal.
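+            // Gating on is_some() means at most one session.removed is
+            // published per table entry, whichever path wins the race.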
+            if shells.remove(&header.name).is_some() {
+                self.events_bus.publish(&events::Event::SessionRemoved);
+            }
         }
 
         // The child shell has exited, so the shell->client thread should
@@ -329,8 +352,10 @@
             let _s = span!(Level::INFO, "disconnect_lock(shells)").entered();
             let shells = self.shells.lock().unwrap();
             if let Some(session) = shells.get(&header.name) {
+                let now = time::SystemTime::now();
                 session.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
-                    Some(time::SystemTime::now());
+                    Some(now);
+                self.events_bus.publish(&events::Event::SessionDetached);
             }
         }
         if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
@@ -392,8 +417,9 @@
                     // the channel is still open so the subshell is still running
                     info!("taking over existing session inner");
                     inner.client_stream = Some(stream.try_clone()?);
+                    let now = time::SystemTime::now();
                     session.lifecycle_timestamps.lock().unwrap().last_connected_at =
-                        Some(time::SystemTime::now());
+                        Some(now);
 
                     if inner
                         .shell_to_client_join_h
                         .as_ref()
                         .map(|h| h.is_finished())
                         .unwrap_or(false)
                     {
                         warn!(
                             "child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
                         );
                         status = AttachStatus::Created { warnings };
+                    } else {
+                        // Reattach confirmed; the create path won't run
+                        // and clobber the entry, so it's safe to publish.
+                        self.events_bus.publish(&events::Event::SessionAttached);
+                    }
-
-                    // status is already attached
                 }
                 Some(exit_status) => {
                     // the channel is closed so we know the subshell exited
@@ -466,13 +494,22 @@
                     matches!(motd, MotdDisplayMode::Dump),
                 )?;
 
-                session.lifecycle_timestamps.lock().unwrap().last_connected_at =
-                    Some(time::SystemTime::now());
+                let now = time::SystemTime::now();
+                session.lifecycle_timestamps.lock().unwrap().last_connected_at = Some(now);
                 {
                     // we unwrap to propagate the poison as an unwind
                     let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
                     let mut shells = self.shells.lock().unwrap();
+                    // If we're replacing a stale entry whose shell process is
+                    // gone, surface that to subscribers before announcing the
+                    // replacement.
+                    let clobbered = shells.contains_key(&header.name);
                     shells.insert(header.name.clone(), Box::new(session));
+                    if clobbered {
+                        self.events_bus.publish(&events::Event::SessionRemoved);
+                    }
+                    self.events_bus.publish(&events::Event::SessionCreated);
+                    self.events_bus.publish(&events::Event::SessionAttached);
                 }
                 // fallthrough to bidi streaming
             } else if let Err(err) = self.hooks.on_reattach(&header.name) {
@@ -572,6 +609,10 @@
                 if let shell::ClientConnectionStatus::DetachNone = status {
                     not_attached_sessions.push(session);
                 } else {
+                    // The bidi-loop unwind in handle_attach owns the
+                    // SessionDetached publish; we just update
+                    // last_disconnected_at eagerly so a concurrent list()
+                    // reflects the detach immediately.
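+                    // (Assumption: the duplicate write of
+                    // last_disconnected_at here and in the unwind path is
+                    // harmless since it is only read for display.)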
                     s.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
                         Some(time::SystemTime::now());
                 }
@@ -631,6 +672,7 @@
 
             for session in to_remove.iter() {
                 shells.remove(session);
+                self.events_bus.publish(&events::Event::SessionRemoved);
             }
             if !to_remove.is_empty() {
                 test_hooks::emit("daemon-handle-kill-removed-shells");
@@ -646,8 +688,7 @@
     fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
         let _s = span!(Level::INFO, "lock(shells)").entered();
         let shells = self.shells.lock().unwrap();
-
-        let sessions: anyhow::Result<Vec<Session>> = shells
+        let sessions = shells
             .iter()
             .map(|(k, v)| {
                 let status = match v.inner.try_lock() {
@@ -660,7 +701,6 @@
                     .last_connected_at
                     .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
                     .transpose()?;
-
                 let last_disconnected_at_unix_ms = timestamps
                     .last_disconnected_at
                     .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
                     .transpose()?;
@@ -675,11 +715,9 @@
                     status,
                 })
             })
-            .collect();
-        let sessions = sessions.context("collecting running session metadata")?;
-
+            .collect::<anyhow::Result<Vec<_>>>()
+            .context("collecting running session metadata")?;
         write_reply(&mut stream, ListReply { sessions })?;
-
         Ok(())
     }
diff --git a/libshpool/src/daemon/signals.rs b/libshpool/src/daemon/signals.rs
index 3359883d..5e533f04 100644
--- a/libshpool/src/daemon/signals.rs
+++ b/libshpool/src/daemon/signals.rs
@@ -23,11 +23,11 @@ use signal_hook::{consts::TERM_SIGNALS, flag, iterator::Signals};
 use tracing::{error, info};
 
 pub struct Handler {
-    sock: Option<PathBuf>,
+    socks: Vec<PathBuf>,
 }
 
 impl Handler {
-    pub fn new(sock: Option<PathBuf>) -> Self {
-        Handler { sock }
+    pub fn new(socks: Vec<PathBuf>) -> Self {
+        Handler { socks }
     }
 
     pub fn spawn(self) -> anyhow::Result<()> {
@@ -57,10 +57,12 @@ impl Handler {
         for signal in &mut signals {
             assert!(TERM_SIGNALS.contains(&signal));
 
-            info!("term sig handler: cleaning up socket");
-            if let Some(sock) = self.sock {
-                if let Err(e) = std::fs::remove_file(sock).context("cleaning up socket") {
-                    error!("error cleaning up socket file: {}", e);
+            info!("term sig handler: cleaning up sockets");
+            for sock in &self.socks {
+                match std::fs::remove_file(sock) {
+                    Ok(()) => {}
+                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+                    Err(e) => error!("error cleaning up socket {:?}: {}", sock, e),
                 }
             }
diff --git a/libshpool/src/daemon/ttl_reaper.rs b/libshpool/src/daemon/ttl_reaper.rs
index f5664b75..5b4bfe4a 100644
--- a/libshpool/src/daemon/ttl_reaper.rs
+++ b/libshpool/src/daemon/ttl_reaper.rs
@@ -30,12 +30,14 @@ use std::{
 use tracing::{info, span, warn, Level};
 
 use super::shell;
+use crate::events;
 
 /// Run the reaper thread loop. Should be invoked in a dedicated
 /// thread.
 pub fn run(
     new_sess: crossbeam_channel::Receiver<(String, Instant)>,
     shells: Arc<Mutex<HashMap<String, Box<shell::Session>>>>,
+    events_bus: Arc<events::EventBus>,
 ) -> anyhow::Result<()> {
     let _s = span!(Level::INFO, "ttl_reaper").entered();
@@ -115,6 +117,7 @@
                     continue;
                 }
                 shells.remove(&reapable.session_name);
+                events_bus.publish(&events::Event::SessionRemoved);
             }
         }
     }
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
new file mode 100644
index 00000000..4d9a6077
--- /dev/null
+++ b/libshpool/src/events.rs
@@ -0,0 +1,316 @@
+//! Push-event protocol for the daemon.
+//!
+//! Events are published to subscribers connected to a sibling Unix socket
+//! next to the main shpool socket. The wire format is JSON, one event per
+//! line (newline-delimited; aka JSONL). Non-Rust clients only need a Unix
+//! socket and a JSON parser to consume the stream.
+//!
+//! Events carry no payload beyond their type: they signal that *something*
+//! changed in the session table. Subscribers learn the new state by calling
+//! `shpool list` (or the equivalent over the main socket). Subscribers that
+//! fall too far behind are dropped and may simply reconnect.
+
+use std::{
+    io::{BufRead, BufReader, Write},
+    os::unix::net::{UnixListener, UnixStream},
+    path::{Path, PathBuf},
+    sync::{
+        mpsc::{self, Receiver, SyncSender, TrySendError},
+        Arc, Mutex,
+    },
+    thread,
+    time::Duration,
+};
+
+use anyhow::Context;
+use serde_derive::Serialize;
+use tracing::{error, info, warn};
+
+/// Per-subscriber outbound queue depth. Subscribers that fall this far
+/// behind are dropped and must reconnect.
+const SUBSCRIBER_QUEUE_DEPTH: usize = 64;
+
+/// Write timeout for stuck subscribers (e.g. suspended via Ctrl-Z). After
+/// this elapses on a blocked write, the writer thread exits and the
+/// subscriber is implicitly dropped on the next publish.
+const WRITE_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// An event published on the events socket.
+#[derive(Serialize, Debug)]
+#[serde(tag = "type")]
+#[allow(clippy::enum_variant_names)]
+pub enum Event {
+    #[serde(rename = "session.created")]
+    SessionCreated,
+    #[serde(rename = "session.attached")]
+    SessionAttached,
+    #[serde(rename = "session.detached")]
+    SessionDetached,
+    #[serde(rename = "session.removed")]
+    SessionRemoved,
+}
+
+/// Fans out events to all connected subscribers.
+///
+/// Lock ordering: callers that publish while holding another lock (e.g. the
+/// session table) must take that lock before [`EventBus::publish`] takes its
+/// own internal lock. Publishing under the table lock keeps wire order
+/// consistent with causal order across mutators.
+pub struct EventBus {
+    subscribers: Mutex<Vec<SyncSender<Arc<str>>>>,
+}
+
+impl EventBus {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self { subscribers: Mutex::new(Vec::new()) })
+    }
+
+    /// Broadcast `event` to all current subscribers. Subscribers whose
+    /// queues are full or whose receivers have hung up are dropped.
+    pub fn publish(&self, event: &Event) {
+        let line = match serialize_line(event) {
+            Some(line) => line,
+            None => return,
+        };
+        let mut subs = self.subscribers.lock().unwrap();
+        subs.retain(|tx| match tx.try_send(line.clone()) {
+            Ok(()) => true,
+            Err(TrySendError::Full(_)) => {
+                warn!("dropping events subscriber: queue full");
+                false
+            }
+            Err(TrySendError::Disconnected(_)) => false,
+        });
+    }
+
+    /// Register a new subscriber. Returns the receiver to be handed to a
+    /// writer thread.
+    pub fn register(&self) -> Receiver<Arc<str>> {
+        let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
+        self.subscribers.lock().unwrap().push(tx);
+        rx
+    }
+}
+
+fn serialize_line(event: &Event) -> Option<Arc<str>> {
+    match serde_json::to_string(event) {
+        Ok(s) => Some(format!("{s}\n").into()),
+        Err(e) => {
+            error!("serializing event {:?}: {:?}", event, e);
+            None
+        }
+    }
+}
+
+/// Connect to the events socket, copy each line to stdout, and flush per
+/// line so the stream is usable in pipes (`shpool events | jq`). Returns
+/// when the daemon closes the connection.
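+///
+/// This is what backs `shpool events`; e.g. `shpool events | jq -r .type`
+/// prints one event name per line.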
+pub fn subscribe_to_stdout(socket_path: &Path) -> anyhow::Result<()> {
+    let stream = UnixStream::connect(socket_path)
+        .with_context(|| format!("connecting to events socket {:?}", socket_path))?;
+    let reader = BufReader::new(stream);
+    let stdout = std::io::stdout();
+    let mut out = stdout.lock();
+    for line in reader.lines() {
+        let line = line.context("reading event")?;
+        writeln!(out, "{line}").context("writing event")?;
+        out.flush().context("flushing stdout")?;
+    }
+    Ok(())
+}
+
+/// Sibling events socket path next to the main shpool socket.
+pub fn socket_path(main_socket: &Path) -> PathBuf {
+    let mut path = main_socket.to_path_buf();
+    path.set_file_name("events.socket");
+    path
+}
+
+/// Owns the events socket file. Dropping the guard unlinks the socket
+/// path so a fresh daemon doesn't trip on stale files. The accept thread
+/// is not stopped; daemon shutdown takes the process down.
+pub struct ListenerGuard {
+    path: PathBuf,
+}
+
+impl Drop for ListenerGuard {
+    fn drop(&mut self) {
+        match std::fs::remove_file(&self.path) {
+            Ok(()) => {}
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+            Err(e) => warn!("removing events socket {:?}: {:?}", self.path, e),
+        }
+    }
+}
+
+/// Bind the events socket and spawn the accept thread. For each accepted
+/// connection, `on_accept` is invoked with the stream; it is expected to
+/// register the subscriber with the bus and spawn a writer thread (see
+/// [`spawn_writer`]). The returned guard unlinks the socket file on drop.
+pub fn start_listener<F>(socket_path: PathBuf, on_accept: F) -> anyhow::Result<ListenerGuard>
+where
+    F: Fn(UnixStream) -> anyhow::Result<()> + Send + 'static,
+{
+    if socket_path.exists() {
+        std::fs::remove_file(&socket_path)
+            .with_context(|| format!("removing stale events socket {:?}", socket_path))?;
+    }
+    let listener = UnixListener::bind(&socket_path)
+        .with_context(|| format!("binding events socket {:?}", socket_path))?;
+    info!("events socket listening at {:?}", socket_path);
+    thread::Builder::new()
+        .name("events-accept".into())
+        .spawn(move || run_accept_loop(listener, on_accept))
+        .context("spawning events accept thread")?;
+    Ok(ListenerGuard { path: socket_path })
+}
+
+fn run_accept_loop<F>(listener: UnixListener, on_accept: F)
+where
+    F: Fn(UnixStream) -> anyhow::Result<()>,
+{
+    for stream in listener.incoming() {
+        match stream {
+            Ok(stream) => {
+                if let Err(e) = on_accept(stream) {
+                    warn!("accepting events subscriber: {:?}", e);
+                }
+            }
+            Err(e) => {
+                error!("events listener accept failed: {:?}", e);
+                break;
+            }
+        }
+    }
+}
+
+/// Set the write timeout and spawn a thread that drains `receiver` to
+/// `stream` until either side closes or a write times out.
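+///
+/// One writer thread exists per subscriber; it ends when the peer closes
+/// the stream, a write times out, or the bus drops its sender.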
+pub fn spawn_writer(stream: UnixStream, receiver: Receiver<Arc<str>>) -> anyhow::Result<()> {
+    stream.set_write_timeout(Some(WRITE_TIMEOUT)).context("setting write timeout")?;
+    thread::Builder::new()
+        .name("events-writer".into())
+        .spawn(move || run_writer(stream, receiver))
+        .context("spawning events writer thread")?;
+    Ok(())
+}
+
+fn run_writer(mut stream: UnixStream, receiver: Receiver<Arc<str>>) {
+    while let Ok(line) = receiver.recv() {
+        if let Err(e) = stream.write_all(line.as_bytes()) {
+            info!("events subscriber gone: {:?}", e);
+            break;
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn json(event: &Event) -> String {
+        serde_json::to_string(event).unwrap()
+    }
+
+    #[test]
+    fn session_created_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionCreated), r#"{"type":"session.created"}"#);
+    }
+
+    #[test]
+    fn session_attached_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionAttached), r#"{"type":"session.attached"}"#);
+    }
+
+    #[test]
+    fn session_detached_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionDetached), r#"{"type":"session.detached"}"#);
+    }
+
+    #[test]
+    fn session_removed_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionRemoved), r#"{"type":"session.removed"}"#);
+    }
+
+    #[test]
+    fn bus_publish_with_no_subscribers_is_a_noop() {
+        let bus = EventBus::new();
+        bus.publish(&Event::SessionCreated);
+    }
+
+    #[test]
+    fn bus_publish_reaches_subscriber() {
+        let bus = EventBus::new();
+        let rx = bus.register();
+        bus.publish(&Event::SessionCreated);
+        let line = rx.recv().unwrap();
+        assert_eq!(&*line, "{\"type\":\"session.created\"}\n");
+    }
+
+    #[test]
+    fn bus_drops_subscriber_whose_queue_is_full() {
+        let bus = EventBus::new();
+        let rx = bus.register();
+        for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
+            bus.publish(&Event::SessionCreated);
+        }
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 1);
+        bus.publish(&Event::SessionCreated);
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 0);
+        drop(rx);
+    }
+
+    #[test]
+    fn bus_drops_subscriber_whose_receiver_hung_up() {
+        let bus = EventBus::new();
+        let rx = bus.register();
+        drop(rx);
+        bus.publish(&Event::SessionCreated);
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 0);
+    }
+
+    #[test]
+    fn bus_publish_reaches_every_subscriber() {
+        let bus = EventBus::new();
+        let rx_a = bus.register();
+        let rx_b = bus.register();
+        bus.publish(&Event::SessionCreated);
+        for rx in [&rx_a, &rx_b] {
+            let line = rx.recv().unwrap();
+            assert_eq!(&*line, "{\"type\":\"session.created\"}\n");
+        }
+    }
+
+    #[test]
+    fn writer_exits_when_peer_closes_stream() {
+        let (a, b) = UnixStream::pair().unwrap();
+        let (tx, rx) = mpsc::sync_channel::<Arc<str>>(8);
+        let handle = thread::spawn(move || run_writer(a, rx));
+        drop(b);
+        // The send may succeed (kernel buffered) or fail; what matters is
+        // that closing the channel unblocks the writer thread on the next
+        // recv, regardless of write outcome.
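+        // (try_send rather than send so the test itself can never block.)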
+        let _ = tx.try_send("ignored\n".into());
+        drop(tx);
+        handle.join().unwrap();
+    }
+
+    #[test]
+    fn spawn_writer_sets_write_timeout() {
+        let (a, _b) = UnixStream::pair().unwrap();
+        let probe = a.try_clone().unwrap();
+        let (_tx, rx) = mpsc::sync_channel(1);
+        spawn_writer(a, rx).unwrap();
+        assert_eq!(probe.write_timeout().unwrap(), Some(WRITE_TIMEOUT));
+    }
+
+    #[test]
+    fn listener_guard_unlinks_socket_on_drop() {
+        let dir = tempfile::tempdir().unwrap();
+        let path = dir.path().join("events.socket");
+        let guard = start_listener(path.clone(), |_| Ok(())).unwrap();
+        assert!(path.exists(), "socket file should exist while guard is alive");
+        drop(guard);
+        assert!(!path.exists(), "socket file should be unlinked on guard drop");
+    }
+}
diff --git a/libshpool/src/lib.rs b/libshpool/src/lib.rs
index 354ba851..2b728d48 100644
--- a/libshpool/src/lib.rs
+++ b/libshpool/src/lib.rs
@@ -36,6 +36,7 @@ mod daemon;
 mod daemonize;
 mod detach;
 mod duration;
+mod events;
 mod hooks;
 mod kill;
 mod list;
@@ -209,6 +210,15 @@ needs debugging, but would be clobbered by a restart.")]
         #[clap(help = "new log level")]
         level: shpool_protocol::LogLevel,
     },
+
+    #[clap(about = "Subscribe to the daemon's push-event stream
+
+Connects to the events socket and writes each event (one JSON object
+per line) to stdout, flushing after every line so the stream is
+pipeline-friendly (e.g. `shpool events | jq`). Events carry only a
+`type` field; run `shpool list` after an event to see the new state.
+Events that fire while no subscriber is connected are not replayed.")]
+    Events,
 }
 
 impl Args {
@@ -381,6 +391,7 @@
             Commands::Kill { sessions } => kill::run(sessions, socket),
             Commands::List { json } => list::run(socket, json),
             Commands::SetLogLevel { level } => set_log_level::run(level, socket),
+            Commands::Events => events::subscribe_to_stdout(&events::socket_path(&socket)),
         };
 
         if let Err(err) = res {
diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs
new file mode 100644
index 00000000..e360d05c
--- /dev/null
+++ b/shpool/tests/events.rs
@@ -0,0 +1,194 @@
+use std::{
+    io::{BufRead, BufReader},
+    os::unix::net::UnixStream,
+    path::PathBuf,
+    time::Duration,
+};
+
+use anyhow::{anyhow, Context};
+use ntest::timeout;
+use serde_json::Value;
+
+mod support;
+
+use crate::support::daemon::{AttachArgs, DaemonArgs, Proc};
+
+fn events_socket_path(daemon: &Proc) -> PathBuf {
+    daemon.socket_path.with_file_name("events.socket")
+}
+
+fn connect_events(daemon: &Proc) -> anyhow::Result<BufReader<UnixStream>> {
+    let path = events_socket_path(daemon);
+    let mut sleep_dur = Duration::from_millis(5);
+    for _ in 0..12 {
+        if let Ok(stream) = UnixStream::connect(&path) {
+            return Ok(BufReader::new(stream));
+        }
+        std::thread::sleep(sleep_dur);
+        sleep_dur *= 2;
+    }
+    Err(anyhow!("events socket never became available at {:?}", path))
+}
+
+fn next_event(reader: &mut BufReader<UnixStream>) -> anyhow::Result<Value> {
+    let mut line = String::new();
+    let n = reader.read_line(&mut line).context("reading event line")?;
+    if n == 0 {
+        return Err(anyhow!("events socket closed unexpectedly"));
+    }
+    serde_json::from_str(&line).with_context(|| format!("parsing event JSON: {line:?}"))
+}
+
+#[test]
+#[timeout(30000)]
+fn lifecycle() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+
+    // Background attach: client connects, daemon publishes created+attached;
+    // the client immediately detaches, triggering the detached event.
+    let _attach = daemon
+        .attach("s1", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
+        .context("starting attach proc")?;
+
+    assert_eq!(next_event(&mut sub)?["type"], "session.created");
+    assert_eq!(next_event(&mut sub)?["type"], "session.attached");
+    assert_eq!(next_event(&mut sub)?["type"], "session.detached");
+
+    let kill_out = daemon.kill(vec!["s1".into()]).context("running kill")?;
+    assert!(kill_out.status.success(), "kill failed: {:?}", kill_out);
+
+    assert_eq!(next_event(&mut sub)?["type"], "session.removed");
+
+    Ok(())
+}
+
+// `shpool detach` triggers two code paths that both update the session:
+// the explicit handler and, asynchronously, the bidi-loop unwind in the
+// attach worker. An earlier version emitted SessionDetached from both,
+// producing a duplicate event. This test pins exactly one detached event
+// per detach by using a kill as a known-next-event fence: if a duplicate
+// detached were buffered, the next read would return it instead of
+// `session.removed`.
+#[test]
+#[timeout(30000)]
+fn explicit_detach_publishes_one_event() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+
+    // Foreground attach (no `background`) keeps the session attached.
+    let _attach = daemon
+        .attach("s", AttachArgs { null_stdin: true, ..AttachArgs::default() })
+        .context("starting attach proc")?;
+    assert_eq!(next_event(&mut sub)?["type"], "session.created");
+    assert_eq!(next_event(&mut sub)?["type"], "session.attached");
+
+    let detach_out = daemon.detach(vec!["s".into()]).context("running detach")?;
+    assert!(detach_out.status.success(), "detach failed: {:?}", detach_out);
+
+    assert_eq!(next_event(&mut sub)?["type"], "session.detached");
+
+    // Wait for the unwind path to complete (session shows disconnected in
+    // the list output) so any duplicate detached event would already be
+    // queued by the time we issue the kill below.
+    daemon.wait_until_list_matches(|out| out.contains("disconnected"))?;
+
+    let kill_out = daemon.kill(vec!["s".into()]).context("running kill")?;
+    assert!(kill_out.status.success(), "kill failed: {:?}", kill_out);
+
+    let next = next_event(&mut sub)?;
+    assert_eq!(
+        next["type"], "session.removed",
+        "expected next event to be removed, got {next} (possible duplicate detached)"
+    );
+
+    Ok(())
+}
+
+// Reattach should produce a single `session.attached` for the existing
+// session, with no `session.created`. Catches regressions where the
+// reattach path accidentally falls through to the create path.
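+// Expected stream: created, attached, detached from the first background
+// attach, then exactly one attached from the reattach (followed by its
+// detached when the background client exits).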
+#[test]
+#[timeout(30000)]
+fn reattach_emits_attached_only() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+
+    let _attach1 = daemon
+        .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
+        .context("first attach")?;
+    assert_eq!(next_event(&mut sub)?["type"], "session.created");
+    assert_eq!(next_event(&mut sub)?["type"], "session.attached");
+    assert_eq!(next_event(&mut sub)?["type"], "session.detached");
+
+    daemon.wait_until_list_matches(|out| out.contains("disconnected"))?;
+
+    let _attach2 = daemon
+        .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
+        .context("reattach")?;
+
+    let attached = next_event(&mut sub)?;
+    assert_eq!(
+        attached["type"], "session.attached",
+        "expected attached on reattach, got {attached}"
+    );
+    assert_eq!(next_event(&mut sub)?["type"], "session.detached");
+
+    Ok(())
+}
+
+// SIGTERM should clean up both sockets via the signal handler, since
+// process::exit bypasses any RAII guard.
+#[test]
+#[timeout(30000)]
+fn signal_exit_unlinks_sockets() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let main_sock = daemon.socket_path.clone();
+    let events_sock = events_socket_path(&daemon);
+    assert!(main_sock.exists(), "main socket should exist while daemon runs");
+    assert!(events_sock.exists(), "events socket should exist while daemon runs");
+
+    let pid = nix::unistd::Pid::from_raw(
+        daemon.proc.as_ref().expect("daemon process handle").id() as i32,
+    );
+    nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM).context("sending SIGTERM")?;
+    daemon.proc_wait().context("waiting for daemon to exit")?;
+
+    assert!(!main_sock.exists(), "main socket should be unlinked on signal exit");
+    assert!(!events_sock.exists(), "events socket should be unlinked on signal exit");
+
+    Ok(())
+}
+
+#[test]
+#[timeout(30000)]
+fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let mut sub_a = connect_events(&daemon)?;
+    let mut sub_b = connect_events(&daemon)?;
+
+    let _attach = daemon
+        .attach(
+            "shared",
+            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
+        )
+        .context("starting attach proc")?;
+
+    for sub in [&mut sub_a, &mut sub_b] {
+        assert_eq!(next_event(sub)?["type"], "session.created");
+        assert_eq!(next_event(sub)?["type"], "session.attached");
+        assert_eq!(next_event(sub)?["type"], "session.detached");
+    }
+
+    Ok(())
+}