Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions libshpool/src/daemon/prompt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,31 @@
// This file contains the logic for injecting the `prompt_annotation`
// config option into a user's prompt for known shells.

use std::io::{Read, Write};
use std::{
io::{Read, Write},
time,
};

use anyhow::{anyhow, Context};
use tracing::{debug, info, instrument, warn};
use nix::{poll, poll::PollFlags};
use tracing::{debug, error, info, instrument, warn};

use crate::{
consts::{SENTINEL_FLAG_VAR, STARTUP_SENTINEL},
daemon::trie::{Trie, TrieCursor},
test_hooks,
};

// We don't need an agressive poll cadence because the normal case is
// that we exit the startup sentinal loop after some data comes in and
// we scan the sentinal successfully. We only need to wake up every so
// often to check if we've hit our timeout, which is long.
const SENTINEL_POLL_MS: u16 = 500;

// Even the most sluggish dotfile setup ought to be done within
// 90 seconds.
const SENTINEL_POLL_TIMEOUT: time::Duration = time::Duration::from_secs(90);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it could be worth mentioning this change in the commit, my first thought when I saw this after reading the title was that the solution was to add a timeout.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.


#[derive(Debug, Clone)]
enum KnownShell {
Bash,
Expand Down Expand Up @@ -120,6 +135,7 @@ pub fn maybe_inject_prefix(

#[instrument(skip_all)]
fn wait_for_startup(pty_master: &mut shpool_pty::fork::Master) -> anyhow::Result<()> {
test_hooks::emit("wait-for-startup-enter");
let mut startup_sentinel_scanner = SentinelScanner::new(STARTUP_SENTINEL);
let exe_path =
std::env::current_exe().context("getting current exe path")?.to_string_lossy().into_owned();
Expand All @@ -129,8 +145,33 @@ fn wait_for_startup(pty_master: &mut shpool_pty::fork::Master) -> anyhow::Result
.write_all(startup_sentinel_cmd.as_bytes())
.context("running startup sentinel script")?;

let watchable_master = *pty_master;
let mut poll_fds = [poll::PollFd::new(
watchable_master.borrow_fd().ok_or(anyhow!("no master fd"))?,
PollFlags::POLLIN | PollFlags::POLLHUP | PollFlags::POLLERR,
)];

let deadline = time::Instant::now() + SENTINEL_POLL_TIMEOUT;
let mut buf: [u8; 2048] = [0; 2048];
loop {
if time::Instant::now() > deadline {
return Err(anyhow!("timed out waiting for shell startup"));
}
let nready = match poll::poll(&mut poll_fds, SENTINEL_POLL_MS) {
Ok(n) => n,
Err(e) => {
error!("polling pty master: {:?}", e);
return Err(e)?;
}
};
if nready == 0 {
// if timeout
continue;
}
if nready != 1 {
return Err(anyhow!("sentinal scan: expected exactly 1 ready fd"));
}

let len = pty_master.read(&mut buf).context("reading chunk to scan for startup")?;
if len == 0 {
continue;
Expand Down
153 changes: 86 additions & 67 deletions libshpool/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ impl Server {
}
Err(err) => return Err(err)?,
};
info!("released lock on shells table");

self.link_ssh_auth_sock(&header).context("linking SSH_AUTH_SOCK")?;
self.populate_session_env_file(&header).context("populating session env file")?;
Expand Down Expand Up @@ -362,84 +361,95 @@ impl Server {
AttachStatus,
)> {
let warnings = vec![];
let mut status = AttachStatus::Attached { warnings: warnings.clone() };

// we unwrap to propagate the poison as an unwind
let _s = span!(Level::INFO, "1_lock(shells)").entered();
let mut shells = self.shells.lock().unwrap();
// Critical section for the global shells lock. We only hold it
// while grubbing about for an existing session, then release it
// so that we do not hold it during shell startup, which might
// fail.
{
// we unwrap to propagate the poison as an unwind
let _s = span!(Level::INFO, "select_shell_lock_1(shells)").entered();
let shells = self.shells.lock().unwrap();

let mut status = AttachStatus::Attached { warnings: warnings.clone() };
if let Some(session) = shells.get(&header.name) {
info!("found entry for '{}'", header.name);
if let Ok(mut inner) = session.inner.try_lock() {
let _s =
span!(Level::INFO, "aquired_lock(session.inner)", s = header.name).entered();
// We have an existing session in our table, but the subshell
// proc might have exited in the meantime, for example if the
// user typed `exit` right before the connection dropped there
// could be a zombie entry in our session table. We need to
// re-check whether the subshell has exited before taking this over.
//
// N.B. this is still technically a race, but in practice it does
// not ever cause problems, and there is no real way to avoid some
// sort of race without just always creating a new session when
// a shell exits, which would break `exit` typed at the shell prompt.
match session.child_exit_notifier.wait(Some(time::Duration::from_millis(0))) {
None => {
// the channel is still open so the subshell is still running
info!("taking over existing session inner");
inner.client_stream = Some(stream.try_clone()?);
session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());
if let Some(session) = shells.get(&header.name) {
info!("found entry for '{}'", header.name);
if let Ok(mut inner) = session.inner.try_lock() {
let _s = span!(Level::INFO, "aquired_lock(session.inner)", s = header.name)
.entered();
// We have an existing session in our table, but the subshell
// proc might have exited in the meantime, for example if the
// user typed `exit` right before the connection dropped there
// could be a zombie entry in our session table. We need to
// re-check whether the subshell has exited before taking this over.
//
// N.B. this is still technically a race, but in practice it does
// not ever cause problems, and there is no real way to avoid some
// sort of race without just always creating a new session when
// a shell exits, which would break `exit` typed at the shell prompt.
match session.child_exit_notifier.wait(Some(time::Duration::from_millis(0))) {
None => {
// the channel is still open so the subshell is still running
info!("taking over existing session inner");
inner.client_stream = Some(stream.try_clone()?);
session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());

if inner
.shell_to_client_join_h
.as_ref()
.map(|h| h.is_finished())
.unwrap_or(false)
{
warn!(
"child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
);
status = AttachStatus::Created { warnings };
}

if inner
.shell_to_client_join_h
.as_ref()
.map(|h| h.is_finished())
.unwrap_or(false)
{
warn!(
"child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
// status is already attached
}
Some(exit_status) => {
// the channel is closed so we know the subshell exited
info!(
"stale inner, (child exited with status {}) clobbering with new subshell",
exit_status
);
status = AttachStatus::Created { warnings };
}

// status is already attached
}
Some(exit_status) => {
// the channel is closed so we know the subshell exited
info!(
"stale inner, (child exited with status {}) clobbering with new subshell",
exit_status
);
status = AttachStatus::Created { warnings };

if inner
.shell_to_client_join_h
.as_ref()
.map(|h| h.is_finished())
.unwrap_or(false)
{
info!("shell->client thread finished, joining");
if let Some(h) = inner.shell_to_client_join_h.take() {
h.join()
.map_err(|e| anyhow!("joining shell->client on reattach: {:?}", e))?
.context("within shell->client thread on reattach")?;
}
assert!(matches!(status, AttachStatus::Created { .. }));
}
}

if inner.shell_to_client_join_h.as_ref().map(|h| h.is_finished()).unwrap_or(false) {
info!("shell->client thread finished, joining");
if let Some(h) = inner.shell_to_client_join_h.take() {
h.join()
.map_err(|e| anyhow!("joining shell->client on reattach: {:?}", e))?
.context("within shell->client thread on reattach")?;
// fallthrough to bidi streaming
} else {
info!("busy shell session, doing nothing");
// The stream is busy, so we just inform the client and close the stream.
write_reply(&mut stream, AttachReplyHeader { status: AttachStatus::Busy })?;
stream.shutdown(net::Shutdown::Both).context("closing stream")?;
if let Err(err) = self.hooks.on_busy(&header.name) {
warn!("busy hook: {:?}", err);
}
assert!(matches!(status, AttachStatus::Created { .. }));
return Err(ShellSelectionError::BusyShellSession)?;
}

// fallthrough to bidi streaming
} else {
info!("busy shell session, doing nothing");
// The stream is busy, so we just inform the client and close the stream.
write_reply(&mut stream, AttachReplyHeader { status: AttachStatus::Busy })?;
stream.shutdown(net::Shutdown::Both).context("closing stream")?;
if let Err(err) = self.hooks.on_busy(&header.name) {
warn!("busy hook: {:?}", err);
}
return Err(ShellSelectionError::BusyShellSession)?;
info!("no existing '{}' session, creating new one", &header.name);
status = AttachStatus::Created { warnings };
}
} else {
info!("no existing '{}' session, creating new one", &header.name);
status = AttachStatus::Created { warnings };
}
};

if matches!(status, AttachStatus::Created { .. }) {
info!("creating new subshell");
Expand All @@ -458,12 +468,21 @@ impl Server {

session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());
shells.insert(header.name.clone(), Box::new(session));
{
// we unwrap to propagate the poison as an unwind
let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
let mut shells = self.shells.lock().unwrap();
shells.insert(header.name.clone(), Box::new(session));
}
// fallthrough to bidi streaming
} else if let Err(err) = self.hooks.on_reattach(&header.name) {
warn!("reattach hook: {:?}", err);
}

// we unwrap to propagate the poison as an unwind
let _s = span!(Level::INFO, "select_shell_lock_3(shells)").entered();
let shells = self.shells.lock().unwrap();

// return a reference to the inner session so that
// we can work with it without the global session
// table lock held
Expand Down
4 changes: 1 addition & 3 deletions libshpool/src/daemon/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
};

use anyhow::{anyhow, Context};
use nix::{poll::PollFlags, sys::signal, unistd::Pid};
use nix::{poll, poll::PollFlags, sys::signal, unistd::Pid};
use shpool_protocol::{Chunk, ChunkKind, TtySize};
use tracing::{debug, error, info, instrument, span, trace, warn, Level};

Expand Down Expand Up @@ -221,8 +221,6 @@ impl SessionInner {
&self,
args: ShellToClientArgs,
) -> anyhow::Result<thread::JoinHandle<anyhow::Result<()>>> {
use nix::poll;

let term_db = Arc::clone(&self.term_db);
let mut prompt_sentinel_scanner = prompt::SentinelScanner::new(consts::PROMPT_SENTINEL);

Expand Down
7 changes: 6 additions & 1 deletion shpool/tests/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,9 +1185,12 @@ fn prompt_prefix_zsh() -> anyhow::Result<()> {

// This has stopped working in CI. Probably due to a fish version
// change or something.
//
// fails on mac due to hard-coded /usr/bin/fish path, not sure why it fails on
// linux, but it's started doing so at head. I suspect something environmental.
#[test]
#[timeout(30000)]
#[cfg_attr(target_os = "macos", ignore)] // hard-coded /usr/bin/fish path
#[ignore]
fn prompt_prefix_fish() -> anyhow::Result<()> {
let daemon_proc = support::daemon::Proc::new("prompt_prefix_fish.toml", DaemonArgs::default())
.context("starting daemon proc")?;
Expand All @@ -1213,12 +1216,14 @@ fn prompt_prefix_fish() -> anyhow::Result<()> {
let mut stderr = child.stderr.take().context("missing stderr")?;
let mut stderr_str = String::from("");
stderr.read_to_string(&mut stderr_str).context("slurping stderr")?;
eprintln!("stderr: {stderr_str}");
assert!(stderr_str.is_empty());

let mut stdout = child.stdout.take().context("missing stdout")?;
let mut stdout_str = String::from("");
stdout.read_to_string(&mut stdout_str).context("slurping stdout")?;
let stdout_re = Regex::new(".*session_name=sh1.*")?;
eprintln!("stdout: {stdout_str}");
assert!(stdout_re.is_match(&stdout_str));

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions shpool/tests/data/hang_shell.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

# A shell that never starts up properly.
# It reads stdin to avoid SIGPIPE but never executes commands.
exec cat > /dev/null
9 changes: 9 additions & 0 deletions shpool/tests/data/hang_shell.toml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
norc = true
noecho = true
shell = "SHELL"
session_restore_mode = "simple"
prompt_prefix = "test> "

[env]
PS1 = "prompt> "
TERM = ""
Loading
Loading