Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
ensure clean shutdown of all threads running JS
  • Loading branch information
gterzian committed Jun 30, 2020
1 parent 0b61cfc commit 44ebca7
Show file tree
Hide file tree
Showing 25 changed files with 565 additions and 232 deletions.
107 changes: 82 additions & 25 deletions components/background_hang_monitor/background_hang_monitor.rs
Expand Up @@ -3,14 +3,17 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

use crate::sampler::{NativeStack, Sampler};
use crossbeam_channel::{after, unbounded, Receiver, Sender};
use crossbeam_channel::{after, never, unbounded, Receiver, Sender};
use ipc_channel::ipc::{IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use msg::constellation_msg::MonitoredComponentId;
use msg::constellation_msg::{
BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorRegister,
BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorExitSignal,
BackgroundHangMonitorRegister,
};
use msg::constellation_msg::{
BackgroundHangMonitorControlMsg, HangAlert, HangAnnotation, HangMonitorAlert,
};
use msg::constellation_msg::{HangAlert, HangAnnotation, HangMonitorAlert, SamplerControlMsg};
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::thread;
Expand All @@ -19,23 +22,32 @@ use std::time::{Duration, Instant};
#[derive(Clone)]
pub struct HangMonitorRegister {
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
monitoring_enabled: bool,
}

impl HangMonitorRegister {
/// Start a new hang monitor worker, and return a handle to register components for monitoring.
pub fn init(
constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<SamplerControlMsg>,
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
monitoring_enabled: bool,
) -> Box<dyn BackgroundHangMonitorRegister> {
let (sender, port) = unbounded();
let _ = thread::Builder::new().spawn(move || {
let mut monitor =
BackgroundHangMonitorWorker::new(constellation_chan, control_port, port);
let mut monitor = BackgroundHangMonitorWorker::new(
constellation_chan,
control_port,
port,
monitoring_enabled,
);
while monitor.run() {
// Monitoring until all senders have been dropped...
}
});
Box::new(HangMonitorRegister { sender })
Box::new(HangMonitorRegister {
sender,
monitoring_enabled,
})
}
}

Expand All @@ -48,8 +60,13 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
component_id: MonitoredComponentId,
transient_hang_timeout: Duration,
permanent_hang_timeout: Duration,
exit_signal: Option<Box<dyn BackgroundHangMonitorExitSignal>>,
) -> Box<dyn BackgroundHangMonitor> {
let bhm_chan = BackgroundHangMonitorChan::new(self.sender.clone(), component_id);
let bhm_chan = BackgroundHangMonitorChan::new(
self.sender.clone(),
component_id,
self.monitoring_enabled,
);

#[cfg(all(
target_os = "windows",
Expand All @@ -71,6 +88,7 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
thread::current().name().map(str::to_owned),
transient_hang_timeout,
permanent_hang_timeout,
exit_signal,
));
Box::new(bhm_chan)
}
Expand All @@ -85,7 +103,13 @@ impl BackgroundHangMonitorClone for HangMonitorRegister {
/// Messages sent from monitored components to the monitor.
pub enum MonitoredComponentMsg {
/// Register component for monitoring,
Register(Box<dyn Sampler>, Option<String>, Duration, Duration),
Register(
Box<dyn Sampler>,
Option<String>,
Duration,
Duration,
Option<Box<dyn BackgroundHangMonitorExitSignal>>,
),
/// Unregister component for monitoring.
Unregister,
/// Notify start of new activity for a given component,
Expand All @@ -101,17 +125,20 @@ pub struct BackgroundHangMonitorChan {
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
component_id: MonitoredComponentId,
disconnected: Cell<bool>,
monitoring_enabled: bool,
}

impl BackgroundHangMonitorChan {
pub fn new(
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
component_id: MonitoredComponentId,
monitoring_enabled: bool,
) -> Self {
BackgroundHangMonitorChan {
sender,
component_id: component_id,
disconnected: Default::default(),
monitoring_enabled,
}
}

Expand All @@ -128,12 +155,16 @@ impl BackgroundHangMonitorChan {

impl BackgroundHangMonitor for BackgroundHangMonitorChan {
fn notify_activity(&self, annotation: HangAnnotation) {
let msg = MonitoredComponentMsg::NotifyActivity(annotation);
self.send(msg);
if self.monitoring_enabled {
let msg = MonitoredComponentMsg::NotifyActivity(annotation);
self.send(msg);
}
}
fn notify_wait(&self) {
let msg = MonitoredComponentMsg::NotifyWait;
self.send(msg);
if self.monitoring_enabled {
let msg = MonitoredComponentMsg::NotifyWait;
self.send(msg);
}
}
fn unregister(&self) {
let msg = MonitoredComponentMsg::Unregister;
Expand All @@ -150,6 +181,7 @@ struct MonitoredComponent {
sent_transient_alert: bool,
sent_permanent_alert: bool,
is_waiting: bool,
exit_signal: Option<Box<dyn BackgroundHangMonitorExitSignal>>,
}

struct Sample(MonitoredComponentId, Instant, NativeStack);
Expand All @@ -159,20 +191,22 @@ pub struct BackgroundHangMonitorWorker {
monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>,
constellation_chan: IpcSender<HangMonitorAlert>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
control_port: Receiver<SamplerControlMsg>,
control_port: Receiver<BackgroundHangMonitorControlMsg>,
sampling_duration: Option<Duration>,
sampling_max_duration: Option<Duration>,
last_sample: Instant,
creation: Instant,
sampling_baseline: Instant,
samples: VecDeque<Sample>,
monitoring_enabled: bool,
}

impl BackgroundHangMonitorWorker {
pub fn new(
constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<SamplerControlMsg>,
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
monitoring_enabled: bool,
) -> Self {
let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port);
Self {
Expand All @@ -187,6 +221,7 @@ impl BackgroundHangMonitorWorker {
sampling_baseline: Instant::now(),
creation: Instant::now(),
samples: Default::default(),
monitoring_enabled,
}
}

Expand Down Expand Up @@ -232,13 +267,19 @@ impl BackgroundHangMonitorWorker {
}

pub fn run(&mut self) -> bool {
let timeout = if let Some(duration) = self.sampling_duration {
duration
let tick = if let Some(duration) = self.sampling_duration {
let duration = duration
.checked_sub(Instant::now() - self.last_sample)
.unwrap_or_else(|| Duration::from_millis(0))
.unwrap_or_else(|| Duration::from_millis(0));
after(duration)
} else {
Duration::from_millis(100)
if self.monitoring_enabled {
after(Duration::from_millis(100))
} else {
never()
}
};

let received = select! {
recv(self.port) -> event => {
match event {
Expand All @@ -249,24 +290,38 @@ impl BackgroundHangMonitorWorker {
},
recv(self.control_port) -> event => {
match event {
Ok(SamplerControlMsg::Enable(rate, max_duration)) => {
Ok(BackgroundHangMonitorControlMsg::EnableSampler(rate, max_duration)) => {
println!("Enabling profiler.");
self.sampling_duration = Some(rate);
self.sampling_max_duration = Some(max_duration);
self.sampling_baseline = Instant::now();
None
}
Ok(SamplerControlMsg::Disable) => {
},
Ok(BackgroundHangMonitorControlMsg::DisableSampler) => {
println!("Disabling profiler.");
self.finish_sampled_profile();
self.sampling_duration = None;
None
}
return true;
},
Ok(BackgroundHangMonitorControlMsg::Exit(sender)) => {
for component in self.monitored_components.values() {
if let Some(signal) = component.exit_signal.as_ref() {
signal.signal_to_exit();
}
}

// Confirm exit with to the constellation.
let _ = sender.send(());

// Also exit the BHM.
return false;
},
Err(_) => return false,
}
}
recv(after(timeout)) -> _ => None,
recv(tick) -> _ => None,
};

if let Some(msg) = received {
self.handle_msg(msg);
while let Ok(another_msg) = self.port.try_recv() {
Expand Down Expand Up @@ -297,6 +352,7 @@ impl BackgroundHangMonitorWorker {
name,
transient_hang_timeout,
permanent_hang_timeout,
exit_signal,
),
) => {
let component = MonitoredComponent {
Expand All @@ -308,6 +364,7 @@ impl BackgroundHangMonitorWorker {
sent_transient_alert: false,
sent_permanent_alert: false,
is_waiting: true,
exit_signal,
};
if let Some(name) = name {
self.component_names.insert(component_id.clone(), name);
Expand Down
70 changes: 65 additions & 5 deletions components/background_hang_monitor/tests/hang_monitor_tests.rs
Expand Up @@ -9,8 +9,13 @@ use background_hang_monitor::HangMonitorRegister;
use ipc_channel::ipc;
use msg::constellation_msg::ScriptHangAnnotation;
use msg::constellation_msg::TEST_PIPELINE_ID;
use msg::constellation_msg::{HangAlert, HangAnnotation, HangMonitorAlert};
use msg::constellation_msg::{
BackgroundHangMonitorControlMsg, BackgroundHangMonitorExitSignal, HangAlert, HangAnnotation,
HangMonitorAlert,
};
use msg::constellation_msg::{MonitoredComponentId, MonitoredComponentType};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
Expand All @@ -27,12 +32,16 @@ fn test_hang_monitoring() {
ipc::channel().expect("ipc channel failure");
let (_sampler_sender, sampler_receiver) = ipc::channel().expect("ipc channel failure");

let background_hang_monitor_register =
HangMonitorRegister::init(background_hang_monitor_ipc_sender.clone(), sampler_receiver);
let background_hang_monitor_register = HangMonitorRegister::init(
background_hang_monitor_ipc_sender.clone(),
sampler_receiver,
true,
);
let background_hang_monitor = background_hang_monitor_register.register_component(
MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
Duration::from_millis(10),
Duration::from_millis(1000),
None,
);

// Start an activity.
Expand Down Expand Up @@ -125,12 +134,16 @@ fn test_hang_monitoring_unregister() {
ipc::channel().expect("ipc channel failure");
let (_sampler_sender, sampler_receiver) = ipc::channel().expect("ipc channel failure");

let background_hang_monitor_register =
HangMonitorRegister::init(background_hang_monitor_ipc_sender.clone(), sampler_receiver);
let background_hang_monitor_register = HangMonitorRegister::init(
background_hang_monitor_ipc_sender.clone(),
sampler_receiver,
true,
);
let background_hang_monitor = background_hang_monitor_register.register_component(
MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
Duration::from_millis(10),
Duration::from_millis(1000),
None,
);

// Start an activity.
Expand All @@ -146,3 +159,50 @@ fn test_hang_monitoring_unregister() {
// No new alert yet
assert!(background_hang_monitor_receiver.try_recv().is_err());
}

#[test]
fn test_hang_monitoring_exit_signal() {
let _lock = SERIAL.lock().unwrap();

let (background_hang_monitor_ipc_sender, _background_hang_monitor_receiver) =
ipc::channel().expect("ipc channel failure");
let (control_sender, control_receiver) = ipc::channel().expect("ipc channel failure");

struct BHMExitSignal {
closing: Arc<AtomicBool>,
}

impl BackgroundHangMonitorExitSignal for BHMExitSignal {
fn signal_to_exit(&self) {
self.closing.store(true, Ordering::SeqCst);
}
}

let closing = Arc::new(AtomicBool::new(false));
let signal = BHMExitSignal {
closing: closing.clone(),
};

// Init a worker, without active monitoring.
let background_hang_monitor_register = HangMonitorRegister::init(
background_hang_monitor_ipc_sender.clone(),
control_receiver,
false,
);
let _background_hang_monitor = background_hang_monitor_register.register_component(
MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
Duration::from_millis(10),
Duration::from_millis(1000),
Some(Box::new(signal)),
);

let (exit_sender, exit_receiver) = ipc::channel().expect("Failed to create IPC channel!");
// Send the exit message.
let _ = control_sender.send(BackgroundHangMonitorControlMsg::Exit(exit_sender));

// Assert we receive a confirmation back.
assert!(exit_receiver.recv().is_ok());

// Assert we get the exit signal.
while !closing.load(Ordering::SeqCst) {}
}

0 comments on commit 44ebca7

Please sign in to comment.