62 changes: 62 additions & 0 deletions EVENTS.md
@@ -0,0 +1,62 @@
# Events

`shpool` exposes an event stream so that external programs can react to changes
without polling. A program (e.g. a TUI) can call `shpool list` (or the
equivalent `ConnectHeader::List` request over the main socket; see the
[`shpool-protocol`](./shpool-protocol) crate) after each event, keeping its
model consistent with shpool's state.

## The events socket

The daemon binds a sibling Unix socket next to the main shpool socket:

```bash
<runtime_dir>/shpool/shpool.socket # main socket
<runtime_dir>/shpool/events.socket # events socket (this protocol)
```

A subscriber connects to `events.socket` and reads events. The daemon ignores anything written to the events socket, so for subscribers it's effectively read-only.

## Event types

| `type` | Meaning |
| ------------------ | -------------------------------------------------------- |
| `session.created` | A new session was added to the table. |
| `session.attached` | A client attached or reattached to a session. |
| `session.detached` | A client disconnected from a still-running session. |
| `session.removed` | A session was removed (shell exited, killed, or reaped). |

Subscribers should ignore unknown `type` values so that future event types do
not break older consumers.
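A forward-compatible consumer boils down to a match with a silent catch-all. A
minimal sketch (the function and its return value are illustrative, not part of
shpool's API):

```rust
// Sketch of forward-compatible dispatch: every known event triggers a
// refresh, and unknown event types are silently ignored rather than
// treated as errors.
fn react(event_type: &str) -> bool {
    match event_type {
        "session.created" | "session.attached" | "session.detached" | "session.removed" => {
            true // refresh the model, e.g. by calling `shpool list`
        }
        _ => false, // future event type: ignore, don't fail
    }
}
```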

## Wire format

The daemon writes one JSON object per line (JSONL). Each event looks like:

```json
{"type":"<event-type>"}
```

There are no other fields. To learn what the event refers to (which session,
when, etc.), call `shpool list` (or use `ConnectHeader::List`).
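Because the payload is a single known field, a subscriber can extract the type
without a full JSON library. A std-only sketch (real consumers would normally
use a proper JSON parser; `event_type` is a hypothetical helper):

```rust
// Extract the value of the sole "type" field from one event line.
// Relies on the documented one-field shape {"type":"..."}; returns None
// for anything else, which doubles as "ignore unrecognized input".
fn event_type(line: &str) -> Option<&str> {
    let rest = line.trim().strip_prefix("{\"type\":\"")?;
    let end = rest.find('"')?;
    Some(&rest[..end])
}
```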

## Subscribing

For ad-hoc use, `shpool events` connects to the events socket and prints each
event line to stdout, flushing after each line:

```bash
shpool events | while read -r ev; do
echo "got: $ev"
shpool list
done

shpool events | jq .
```
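From a program, subscribing is just line-buffered reads from the socket until
EOF. A minimal sketch, written generic over any byte stream so it can be
exercised without a live daemon (`on_event` is a hypothetical callback):

```rust
use std::io::{BufRead, BufReader, Read};

// Read newline-delimited events until the daemon closes the connection
// (EOF), invoking the callback once per non-empty line.
fn subscribe<R: Read>(stream: R, mut on_event: impl FnMut(&str)) -> std::io::Result<()> {
    for line in BufReader::new(stream).lines() {
        let line = line?;
        if !line.is_empty() {
            on_event(&line);
        }
    }
    Ok(()) // EOF: daemon shut down, or dropped us as a slow subscriber
}
```

Against a live daemon this would be `subscribe(UnixStream::connect(path)?, ...)`,
reconnecting and calling `shpool list` to resync whenever the loop returns.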

## Slow subscribers

Each subscriber has a bounded outbound queue. A subscriber that falls too far
behind is dropped by the daemon (in which case the subscriber can always reconnect).
There is no replay, so events that fired while a subscriber was disconnected are
lost.
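The drop policy can be sketched with a bounded channel per subscriber. The
depth and types below are illustrative, not the daemon's actual internals:

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Each subscriber owns a bounded queue; a publish that finds the queue
// full drops that subscriber instead of ever blocking the daemon.
const QUEUE_DEPTH: usize = 2; // illustrative; the real depth is a daemon constant

fn publish(subscribers: &mut Vec<SyncSender<String>>, event: &str) {
    subscribers.retain(|tx| match tx.try_send(event.to_string()) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => false,         // too far behind: drop
        Err(TrySendError::Disconnected(_)) => false, // subscriber went away
    });
}
```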
Contributor:
Perhaps we should document a knob in the config.toml that tunes how large this queue is.

Author:
Do you want the config knob added in this PR?

The queues currently use `std::sync::mpsc::sync_channel` here, pre-allocating all `SUBSCRIBER_QUEUE_DEPTH` slots.

Would you want to use something slightly slower that allocates on demand and has something like `shrink_to_fit` to return memory after a spike? I'm just imagining someone setting their queue depth to 1 billion, not expecting it to pre-allocate for each subscriber. If it instead allocates on demand, you could maybe set the queue depth to something large like 100K by default anyway, at which point it might not be worth offering a config knob.

14 changes: 11 additions & 3 deletions libshpool/src/daemon/mod.rs
@@ -17,7 +17,7 @@ use std::{env, os::unix::net::UnixListener, path::PathBuf};
use anyhow::Context;
use tracing::{info, instrument};

use crate::{config, consts, hooks};
use crate::{config, consts, events, hooks};

mod etc_environment;
mod exit_notify;
@@ -81,8 +81,16 @@ pub fn run(
(Some(socket.clone()), UnixListener::bind(&socket).context("binding to socket")?)
}
};
// spawn the signal handler thread in the background
signals::Handler::new(cleanup_socket.clone()).spawn()?;
let events_socket = events::socket_path(&socket);

// spawn the signal handler thread in the background. Both sockets need
// explicit cleanup on signal exit because the process exits before any
// RAII guard can run.
let mut socks_to_clean: Vec<PathBuf> = cleanup_socket.iter().cloned().collect();
socks_to_clean.push(events_socket.clone());
signals::Handler::new(socks_to_clean).spawn()?;

let _events_guard = server.start_events_listener(events_socket)?;

server::Server::serve(server, listener)?;

70 changes: 54 additions & 16 deletions libshpool/src/daemon/server.rs
@@ -49,7 +49,7 @@ use crate::{
etc_environment, exit_notify::ExitNotifier, hooks, pager, pager::PagerError, prompt, shell,
show_motd, ttl_reaper,
},
protocol, test_hooks, tty, user,
events, protocol, test_hooks, tty, user,
};

const DEFAULT_INITIAL_SHELL_PATH: &str = "/usr/bin:/bin:/usr/sbin:/sbin";
@@ -74,6 +74,7 @@ pub struct Server {
runtime_dir: PathBuf,
register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>,
hooks: Box<dyn hooks::Hooks + Send + Sync>,
events_bus: Arc<events::EventBus>,
daily_messenger: Arc<show_motd::DailyMessenger>,
log_level_handle: tracing_subscriber::reload::Handle<
tracing_subscriber::filter::LevelFilter,
@@ -93,12 +94,14 @@ impl Server {
>,
) -> anyhow::Result<Arc<Self>> {
let shells = Arc::new(Mutex::new(HashMap::new()));
let events_bus = events::EventBus::new();
// buffered so that we are unlikely to block when setting up a
// new session
let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10);
let shells_tab = Arc::clone(&shells);
let reaper_bus = Arc::clone(&events_bus);
thread::spawn(move || {
if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab) {
if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab, reaper_bus) {
warn!("ttl reaper exited with error: {:?}", e);
}
});
@@ -110,11 +113,27 @@ impl Server {
runtime_dir,
register_new_reapable_session: new_sess_tx,
hooks,
events_bus,
daily_messenger,
log_level_handle,
}))
}

/// Bind the events socket and spawn the accept thread. The returned
/// guard unlinks the socket file on drop.
pub fn start_events_listener(
self: &Arc<Self>,
socket_path: PathBuf,
) -> anyhow::Result<events::ListenerGuard> {
let server = Arc::clone(self);
events::start_listener(socket_path, move |stream| server.handle_events_subscriber(stream))
}

fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
let receiver = self.events_bus.register();
events::spawn_writer(stream, receiver)
}

#[instrument(skip_all)]
pub fn serve(server: Arc<Self>, listener: UnixListener) -> anyhow::Result<()> {
test_hooks::emit("daemon-about-to-listen");
@@ -310,7 +329,11 @@ impl Server {
{
let _s = span!(Level::INFO, "2_lock(shells)").entered();
let mut shells = self.shells.lock().unwrap();
shells.remove(&header.name);
// Gated: a concurrent kill or reaper may have already
// removed the entry and published its own removal.
if shells.remove(&header.name).is_some() {
self.events_bus.publish(&events::Event::SessionRemoved);
}
}

// The child shell has exited, so the shell->client thread should
@@ -329,8 +352,10 @@
let _s = span!(Level::INFO, "disconnect_lock(shells)").entered();
let shells = self.shells.lock().unwrap();
if let Some(session) = shells.get(&header.name) {
let now = time::SystemTime::now();
session.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
Some(time::SystemTime::now());
Some(now);
self.events_bus.publish(&events::Event::SessionDetached);
}
}
if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
@@ -392,8 +417,9 @@
// the channel is still open so the subshell is still running
info!("taking over existing session inner");
inner.client_stream = Some(stream.try_clone()?);
let now = time::SystemTime::now();
session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());
Some(now);

if inner
.shell_to_client_join_h
@@ -405,9 +431,11 @@
"child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
);
status = AttachStatus::Created { warnings };
} else {
// Reattach confirmed; the create path won't run
// and clobber the entry, so it's safe to publish.
self.events_bus.publish(&events::Event::SessionAttached);
}

// status is already attached
}
Some(exit_status) => {
// the channel is closed so we know the subshell exited
@@ -466,13 +494,22 @@
matches!(motd, MotdDisplayMode::Dump),
)?;

session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());
let now = time::SystemTime::now();
session.lifecycle_timestamps.lock().unwrap().last_connected_at = Some(now);
{
// we unwrap to propagate the poison as an unwind
let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
let mut shells = self.shells.lock().unwrap();
// If we're replacing a stale entry whose shell process is
// gone, surface that to subscribers before announcing the
// replacement.
let clobbered = shells.contains_key(&header.name);
shells.insert(header.name.clone(), Box::new(session));
if clobbered {
self.events_bus.publish(&events::Event::SessionRemoved);
}
self.events_bus.publish(&events::Event::SessionCreated);
self.events_bus.publish(&events::Event::SessionAttached);
}
// fallthrough to bidi streaming
} else if let Err(err) = self.hooks.on_reattach(&header.name) {
@@ -572,6 +609,10 @@ impl Server {
if let shell::ClientConnectionStatus::DetachNone = status {
not_attached_sessions.push(session);
} else {
// The bidi-loop unwind in handle_attach owns the
// SessionDetached publish; we just update
// last_disconnected_at eagerly so a concurrent list()
// reflects the detach immediately.
s.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
Some(time::SystemTime::now());
}
@@ -631,6 +672,7 @@

for session in to_remove.iter() {
shells.remove(session);
self.events_bus.publish(&events::Event::SessionRemoved);
}
if !to_remove.is_empty() {
test_hooks::emit("daemon-handle-kill-removed-shells");
@@ -646,8 +688,7 @@
fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
let _s = span!(Level::INFO, "lock(shells)").entered();
let shells = self.shells.lock().unwrap();

let sessions: anyhow::Result<Vec<Session>> = shells
let sessions = shells
.iter()
.map(|(k, v)| {
let status = match v.inner.try_lock() {
@@ -660,7 +701,6 @@
.last_connected_at
.map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
.transpose()?;

let last_disconnected_at_unix_ms = timestamps
.last_disconnected_at
.map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
@@ -675,11 +715,9 @@
status,
})
})
.collect();
let sessions = sessions.context("collecting running session metadata")?;

.collect::<anyhow::Result<Vec<Session>>>()
.context("collecting running session metadata")?;
write_reply(&mut stream, ListReply { sessions })?;

Ok(())
}

16 changes: 9 additions & 7 deletions libshpool/src/daemon/signals.rs
@@ -23,11 +23,11 @@ use signal_hook::{consts::TERM_SIGNALS, flag, iterator::Signals};
use tracing::{error, info};

pub struct Handler {
sock: Option<PathBuf>,
socks: Vec<PathBuf>,
}
impl Handler {
pub fn new(sock: Option<PathBuf>) -> Self {
Handler { sock }
pub fn new(socks: Vec<PathBuf>) -> Self {
Handler { socks }
}

pub fn spawn(self) -> anyhow::Result<()> {
@@ -57,10 +57,12 @@ impl Handler {
for signal in &mut signals {
assert!(TERM_SIGNALS.contains(&signal));

info!("term sig handler: cleaning up socket");
if let Some(sock) = self.sock {
if let Err(e) = std::fs::remove_file(sock).context("cleaning up socket") {
error!("error cleaning up socket file: {}", e);
info!("term sig handler: cleaning up sockets");
for sock in &self.socks {
match std::fs::remove_file(sock) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => error!("error cleaning up socket {:?}: {}", sock, e),
}
}

3 changes: 3 additions & 0 deletions libshpool/src/daemon/ttl_reaper.rs
@@ -30,12 +30,14 @@ use std::{
use tracing::{info, span, warn, Level};

use super::shell;
use crate::events;

/// Run the reaper thread loop. Should be invoked in a dedicated
/// thread.
pub fn run(
new_sess: crossbeam_channel::Receiver<(String, Instant)>,
shells: Arc<Mutex<HashMap<String, Box<shell::Session>>>>,
events_bus: Arc<events::EventBus>,
) -> anyhow::Result<()> {
let _s = span!(Level::INFO, "ttl_reaper").entered();

@@ -115,6 +117,7 @@ pub fn run(
continue;
}
shells.remove(&reapable.session_name);
events_bus.publish(&events::Event::SessionRemoved);
}
}
}