Skip to content

Commit

Permalink
allow enabling and disabling the stall detector at runtime (DataDog#517)
Browse files Browse the repository at this point in the history
The original work left the method private because exposing it required
the stall detector to use interior mutability. This commit does the
trick.

Also fixes the cargo doc issues.
  • Loading branch information
HippoBaro authored and laa committed Feb 9, 2022
1 parent bef5461 commit 108f21c
Showing 1 changed file with 61 additions and 59 deletions.
120 changes: 61 additions & 59 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,27 @@

#![warn(missing_docs, missing_debug_implementations)]

mod latch;
mod multitask;
mod placement;
pub mod stall;

use crate::{
error::BuilderErrorKind,
executor::stall::StallDetector,
io::DmaBuffer,
parking,
reactor,
sys,
task::{self, waker_fn::dummy_waker},
GlommioError,
IoRequirements,
IoStats,
Latency,
Reactor,
Shares,
};
use ahash::AHashMap;
use futures_lite::pin;
use latch::{Latch, LatchState};
use log::warn;
pub use placement::{CpuSet, Placement, PoolPlacement};
use tracing::trace;

use scoped_tls::scoped_thread_local;
use std::{
cell::RefCell,
collections::{hash_map::Entry, BinaryHeap},
Expand All @@ -57,30 +69,12 @@ use std::{
thread::{Builder, JoinHandle},
time::{Duration, Instant},
};
use tracing::trace;

use futures_lite::pin;
use scoped_tls::scoped_thread_local;

use log::warn;

#[cfg(doc)]
use crate::executor::stall::DefaultStallDetectionHandler;
use crate::{
error::BuilderErrorKind,
executor::stall::StallDetector,
io::DmaBuffer,
parking,
reactor,
sys,
task::{self, waker_fn::dummy_waker},
GlommioError,
IoRequirements,
IoStats,
Latency,
Reactor,
Shares,
};
use ahash::AHashMap;
mod latch;
mod multitask;
mod placement;
pub mod stall;

pub(crate) const DEFAULT_EXECUTOR_NAME: &str = "unnamed";
pub(crate) const DEFAULT_PREEMPT_TIMER: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -476,7 +470,7 @@ pub struct LocalExecutorBuilder {
/// executor
blocking_thread_pool_placement: PoolPlacement,
/// Whether to detect stalls in unyielding tasks.
/// [`DefaultStallDetectionHandler`] installs a signal handler for
/// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
/// [`nix::libc::SIGUSR1`], so is disabled by default.
detect_stalls: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
}
Expand Down Expand Up @@ -579,7 +573,7 @@ impl LocalExecutorBuilder {
}

/// Whether to detect stalls in unyielding tasks.
/// [`DefaultStallDetectionHandler`] installs a signal handler for
/// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
/// [`nix::libc::SIGUSR1`], so is disabled by default.
/// # Examples
///
Expand Down Expand Up @@ -876,7 +870,7 @@ impl LocalExecutorPoolBuilder {
/// Whether to detect stalls in unyielding tasks.
/// This method takes a closure of `handler_gen`, which will be called on
/// each new thread to generate the stall detection handler to be used in
/// that executor. [`DefaultStallDetectionHandler`] installs a signal
/// that executor. [`stall::DefaultStallDetectionHandler`] installs a signal
/// handler for [`nix::libc::SIGUSR1`], so is disabled by default.
/// # Examples
///
Expand Down Expand Up @@ -1103,7 +1097,7 @@ pub struct LocalExecutor {
parker: parking::Parker,
id: usize,
reactor: Rc<reactor::Reactor>,
stall_detector: Option<StallDetector>,
stall_detector: RefCell<Option<StallDetector>>,
}

impl LocalExecutor {
Expand Down Expand Up @@ -1143,23 +1137,26 @@ impl LocalExecutor {
}
let p = parking::Parker::new();
let queues = ExecutorQueues::new(config.preempt_timer, config.spin_before_park);
trace!(id = notifier.id(), "Creating executor");
let mut exec = LocalExecutor {
let id = notifier.id();
trace!(id = id, "Creating executor");
Ok(LocalExecutor {
queues: Rc::new(RefCell::new(queues)),
parker: p,
id: notifier.id(),
id,
reactor: Rc::new(reactor::Reactor::new(
notifier,
config.io_memory,
config.ring_depth,
config.record_io_latencies,
config.thread_pool_placement,
)?),
stall_detector: None,
};
exec.detect_stalls(config.detect_stalls)?;

Ok(exec)
stall_detector: RefCell::new(
config
.detect_stalls
.map(|x| StallDetector::new(id, x))
.transpose()?,
),
})
}

/// Enable or disable task stall detection at runtime
Expand All @@ -1171,14 +1168,15 @@ impl LocalExecutor {
/// let local_ex =
/// LocalExecutor::default().detect_stalls(Some(Box::new(DefaultStallDetectionHandler {})));
/// ```
fn detect_stalls(
&mut self,
pub fn detect_stalls(
&self,
handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
) -> Result<()> {
self.stall_detector = match handler {
Some(handler) => Some(StallDetector::new(self.id, handler)?),
None => None,
};
self.stall_detector.replace(
handler
.map(|x| StallDetector::new(self.id, x))
.transpose()?,
);
Ok(())
}

Expand Down Expand Up @@ -1337,7 +1335,8 @@ impl LocalExecutor {
};

let (runtime, tasks_executed_this_loop) = {
let guard = self.stall_detector.as_ref().map(|x| {
let detector = self.stall_detector.borrow();
let guard = detector.as_ref().map(|x| {
let queue = queue.borrow_mut();
x.enter_task_queue(
queue.stats.index,
Expand Down Expand Up @@ -2640,17 +2639,7 @@ impl ExecutorProxy {

#[cfg(test)]
mod test {
use super::*;
use crate::{
enclose,
timer::{self, sleep, Timer},
SharesManager,
};
use core::mem::MaybeUninit;
use futures::{
future::{join, join_all, poll_fn},
join,
};
use std::{
cell::Cell,
collections::HashMap,
Expand All @@ -2662,6 +2651,19 @@ mod test {
task::Waker,
};

use futures::{
future::{join, join_all, poll_fn},
join,
};

use crate::{
enclose,
timer::{self, sleep, Timer},
SharesManager,
};

use super::*;

#[test]
fn create_and_destroy_executor() {
let mut var = Rc::new(RefCell::new(0));
Expand Down

0 comments on commit 108f21c

Please sign in to comment.