Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libshpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ strip-ansi-escapes = "0.2.0" # cleaning up strings for pager display
notify = { version = "7", features = ["crossbeam-channel"] } # watch config file for updates
libproc = "0.14.8" # sniffing shells by examining the subprocess
daemonize = "0.5" # autodaemonization
parking_lot = { version = "0.12", features = ["arc_lock"] } # faster more featureful sync primitives
shpool-protocol = { version = "0.3.5", path = "../shpool-protocol" } # client-server protocol

# rusty wrapper for unix apis
Expand Down
26 changes: 11 additions & 15 deletions libshpool/src/daemon/exit_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
sync::{Condvar, Mutex},
time::Duration,
};
use std::time::Duration;

use parking_lot::{Condvar, Mutex};

#[derive(Debug)]
pub struct ExitNotifier {
Expand All @@ -30,15 +29,15 @@ impl ExitNotifier {

/// Notify all waiters that the process has exited.
pub fn notify_exit(&self, status: i32) {
let mut slot = self.slot.lock().unwrap();
let mut slot = self.slot.lock();
*slot = Some(status);
self.cond.notify_all();
}

/// Wait for the process to exit, with an optional timeout
/// to allow the caller to wake up periodically.
pub fn wait(&self, timeout: Option<Duration>) -> Option<i32> {
let slot = self.slot.lock().unwrap();
let mut slot = self.slot.lock();

// If a thread waits on the exit status when the child has already
// exited, we just want to immediately return.
Expand All @@ -48,19 +47,16 @@ impl ExitNotifier {

match timeout {
Some(t) => {
// returns a lock result, so we want to unwrap
// to propagate the lock poisoning
let (exit_status, wait_res) = self
.cond
.wait_timeout_while(slot, t, |exit_status| exit_status.is_none())
.unwrap();
if wait_res.timed_out() {
if self.cond.wait_for(&mut slot, t).timed_out() {
None
} else {
*exit_status
*slot
}
}
None => *self.cond.wait_while(slot, |exit_status| exit_status.is_none()).unwrap(),
None => {
self.cond.wait(&mut slot);
*slot
}
}
}
}
15 changes: 8 additions & 7 deletions libshpool/src/daemon/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ use std::{
},
process,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, Mutex},
sync::Arc,
thread,
time::{Duration, Instant},
};

use anyhow::{anyhow, Context};
use nix::{poll, sys::signal, unistd};
use parking_lot::Mutex;
use shpool_protocol::{Chunk, ChunkKind, TtySize};
use tracing::{error, info, instrument, span, trace, warn, Level};

Expand Down Expand Up @@ -111,7 +112,7 @@ impl Pager {
let (tty_size_change_tx, tty_size_change_rx) = crossbeam_channel::bounded(0);
let (tty_size_change_ack_tx, tty_size_change_ack_rx) = crossbeam_channel::bounded(0);
{
let mut ctl_handle = ctl_slot.lock().unwrap();
let mut ctl_handle = ctl_slot.lock();
if ctl_handle.is_some() {
return Err(anyhow!("only one pager per session at a time allowed"));
}
Expand Down Expand Up @@ -195,7 +196,7 @@ impl Pager {

{
// register the new size so it will get returned
let mut tty_size = tty_size_ref.lock().unwrap();
let mut tty_size = tty_size_ref.lock();
*tty_size = size;
}

Expand All @@ -220,7 +221,7 @@ impl Pager {
];
let nready = poll::poll(&mut poll_fds, POLL_MS).context("polling both streams")?;
if pager_exited.load(Ordering::Relaxed) {
let tty_size = tty_size.lock().unwrap();
let tty_size = tty_size.lock();
return Ok(tty_size.clone());
}
if nready == 0 {
Expand Down Expand Up @@ -289,13 +290,13 @@ impl Pager {
// assume the pager proc just quit normally and the
// timing was such that we didn't pick it up with our
// exit watcher thread.
let tty_size = tty_size.lock().unwrap();
let tty_size = tty_size.lock();
return Ok(tty_size.clone());
}
if let Err(e) = pty_master.flush() {
info!("Error flushing pager pty, nbd though: {:?}", e);
// same logic as above
let tty_size = tty_size.lock().unwrap();
let tty_size = tty_size.lock();
return Ok(tty_size.clone());
}
}
Expand All @@ -312,7 +313,7 @@ struct PagerCltGuard {

impl std::ops::Drop for PagerCltGuard {
fn drop(&mut self) {
let mut pager_ctl = self.ctl_slot.lock().unwrap();
let mut pager_ctl = self.ctl_slot.lock();
// N.B. clobbering the handles here will cause the listening
// thread to exit because it drops the senders. This ensures
// that no callers can grab the lock on the ctl handles and
Expand Down
Loading
Loading