Skip to content
Permalink
Browse files

Support multiprocess in sampling profiler.

  • Loading branch information...
jdm committed Mar 25, 2019
1 parent 90f67c1 commit 8b7244f0d1d259971547e76aa299f404b5baedfd

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
@@ -16,7 +16,6 @@ doctest = false
backtrace = "0.3"
bitflags = "1.0"
ipc-channel = "0.11"
lazy_static = "1"
libc = "0.2"
log = "0.4"
msg = {path = "../msg"}
@@ -4,15 +4,15 @@

use crate::sampler::{NativeStack, Sampler};
use crossbeam_channel::{after, unbounded, Receiver, Sender};
use ipc_channel::ipc::IpcSender;
use ipc_channel::ipc::{IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use msg::constellation_msg::MonitoredComponentId;
use msg::constellation_msg::{
BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorRegister,
};
use msg::constellation_msg::{HangAlert, HangAnnotation, HangMonitorAlert};
use msg::constellation_msg::{HangAlert, HangAnnotation, HangMonitorAlert, SamplerControlMsg};
use std::cell::Cell;
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};

@@ -21,46 +21,22 @@ pub struct HangMonitorRegister {
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
}

#[derive(Copy, Clone, PartialEq)]
enum SamplerState {
NotSampling,
StartSampling(Duration),
Sampling,
Resolving,
}

lazy_static! {
static ref SAMPLING_STATE: Mutex<SamplerState> = Mutex::new(SamplerState::NotSampling);
}
impl HangMonitorRegister {
/// Start a new hang monitor worker, and return a handle to register components for monitoring.
pub fn init(
constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<SamplerControlMsg>,
) -> Box<BackgroundHangMonitorRegister> {
let (sender, port) = unbounded();
let _ = thread::Builder::new().spawn(move || {
let mut monitor = BackgroundHangMonitorWorker::new(constellation_chan, port);
let mut monitor =
BackgroundHangMonitorWorker::new(constellation_chan, control_port, port);
while monitor.run() {
// Monitoring until all senders have been dropped...
}
});
Box::new(HangMonitorRegister { sender })
}

pub fn toggle(rate: Duration) {
let state = *SAMPLING_STATE.lock().unwrap();
match state {
SamplerState::NotSampling => {
println!("Starting profiler.");
*SAMPLING_STATE.lock().unwrap() = SamplerState::StartSampling(rate);
},
SamplerState::Sampling => {
println!("Stopping profiler.");
*SAMPLING_STATE.lock().unwrap() = SamplerState::Resolving;
},
_ => (),
}
}
}

impl BackgroundHangMonitorRegister for HangMonitorRegister {
@@ -175,6 +151,7 @@ pub struct BackgroundHangMonitorWorker {
monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>,
constellation_chan: IpcSender<HangMonitorAlert>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
control_port: Receiver<SamplerControlMsg>,
sampling_duration: Option<Duration>,
last_sample: Instant,
creation: Instant,
@@ -185,13 +162,16 @@ pub struct BackgroundHangMonitorWorker {
impl BackgroundHangMonitorWorker {
pub fn new(
constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<SamplerControlMsg>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
) -> Self {
let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port);
Self {
component_names: Default::default(),
monitored_components: Default::default(),
constellation_chan,
port,
control_port,
sampling_duration: None,
last_sample: Instant::now(),
sampling_baseline: Instant::now(),
@@ -200,64 +180,48 @@ impl BackgroundHangMonitorWorker {
}
}

fn handle_sampling(&mut self) {
let state = *SAMPLING_STATE.lock().unwrap();
match state {
SamplerState::StartSampling(rate) => {
*SAMPLING_STATE.lock().unwrap() = SamplerState::Sampling;
self.sampling_duration = Some(rate);
self.sampling_baseline = Instant::now();
},
SamplerState::Resolving => {
let mut bytes = vec![];
bytes.extend(
format!(
"{{ \"rate\": {}, \"start\": {}, \"data\": [\n",
self.sampling_duration.unwrap().as_millis(),
(self.sampling_baseline - self.creation).as_millis(),
)
.as_bytes(),
);

let mut first = true;
let to_resolve = self.samples.len();
for (i, Sample(id, instant, stack)) in self.samples.drain(..).enumerate() {
println!("Resolving {}/{}", i + 1, to_resolve);
let profile = stack.to_hangprofile();
let name = match self.component_names.get(&id) {
Some(ref s) => format!("\"{}\"", s),
None => format!("null"),
};
let json = format!(
"{}{{ \"name\": {}, \"namespace\": {}, \"index\": {}, \"type\": \"{:?}\", \
\"time\": {}, \"frames\": {} }}",
if !first { ",\n" } else { "" },
name,
id.0.namespace_id.0,
id.0.index.0.get(),
id.1,
(instant - self.sampling_baseline).as_millis(),
serde_json::to_string(&profile.backtrace).unwrap(),
);
bytes.extend(json.as_bytes());
first = false;
}

bytes.extend(b"\n] }");
let _ = self
.constellation_chan
.send(HangMonitorAlert::Profile(bytes));
fn finish_sampled_profile(&mut self) {
let mut bytes = vec![];
bytes.extend(
format!(
"{{ \"rate\": {}, \"start\": {}, \"data\": [\n",
self.sampling_duration.unwrap().as_millis(),
(self.sampling_baseline - self.creation).as_millis(),
)
.as_bytes(),
);

*SAMPLING_STATE.lock().unwrap() = SamplerState::NotSampling;
self.sampling_duration = None;
},
_ => (),
let mut first = true;
let to_resolve = self.samples.len();
for (i, Sample(id, instant, stack)) in self.samples.drain(..).enumerate() {
println!("Resolving {}/{}", i + 1, to_resolve);
let profile = stack.to_hangprofile();
let name = match self.component_names.get(&id) {
Some(ref s) => format!("\"{}\"", s),
None => format!("null"),
};
let json = format!(
"{}{{ \"name\": {}, \"namespace\": {}, \"index\": {}, \"type\": \"{:?}\", \
\"time\": {}, \"frames\": {} }}",
if !first { ",\n" } else { "" },
name,
id.0.namespace_id.0,
id.0.index.0.get(),
id.1,
(instant - self.sampling_baseline).as_millis(),
serde_json::to_string(&profile.backtrace).unwrap(),
);
bytes.extend(json.as_bytes());
first = false;
}

bytes.extend(b"\n] }");
let _ = self
.constellation_chan
.send(HangMonitorAlert::Profile(bytes));
}

pub fn run(&mut self) -> bool {
self.handle_sampling();

let timeout = if let Some(duration) = self.sampling_duration {
duration
.checked_sub(Instant::now() - self.last_sample)
@@ -273,6 +237,23 @@ impl BackgroundHangMonitorWorker {
Err(_) => return false,
}
},
recv(self.control_port) -> event => {
match event {
Ok(SamplerControlMsg::Enable(rate)) => {
println!("Enabling profiler.");
self.sampling_duration = Some(rate);
self.sampling_baseline = Instant::now();
None
}
Ok(SamplerControlMsg::Disable) => {
println!("Disabling profiler.");
self.finish_sampled_profile();
self.sampling_duration = None;
None
}
Err(_) => return false,
}
}
recv(after(timeout)) -> _ => None,
};
if let Some(msg) = received {
@@ -285,9 +266,10 @@ impl BackgroundHangMonitorWorker {
}

if let Some(duration) = self.sampling_duration {
if Instant::now() - self.last_sample > duration {
let now = Instant::now();
if now - self.last_sample > duration {
self.sample();
self.last_sample = Instant::now();
self.last_sample = now;
}
} else {
self.perform_a_hang_monitor_checkpoint();
@@ -7,8 +7,6 @@
#[macro_use]
extern crate crossbeam_channel;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate log;

pub mod background_hang_monitor;
@@ -124,7 +124,7 @@ use keyboard_types::webdriver::Event as WebDriverInputEvent;
use keyboard_types::KeyboardEvent;
use layout_traits::LayoutThreadFactory;
use log::{Level, LevelFilter, Log, Metadata, Record};
use msg::constellation_msg::{BackgroundHangMonitorRegister, HangMonitorAlert};
use msg::constellation_msg::{BackgroundHangMonitorRegister, HangMonitorAlert, SamplerControlMsg};
use msg::constellation_msg::{
BrowsingContextId, HistoryStateId, PipelineId, TopLevelBrowsingContextId,
};
@@ -208,6 +208,9 @@ pub struct Constellation<Message, LTF, STF> {
/// None when in multiprocess mode.
background_monitor_register: Option<Box<BackgroundHangMonitorRegister>>,

/// Channels to control all sampling profilers.
sampling_profiler_control: Vec<IpcSender<SamplerControlMsg>>,

/// A channel for the background hang monitor to send messages
/// to the constellation.
background_hang_monitor_sender: IpcSender<HangMonitorAlert>,
@@ -614,12 +617,19 @@ where
// If we are in multiprocess mode,
// a dedicated per-process hang monitor will be initialized later inside the content process.
// See run_content_process in servo/lib.rs
let background_monitor_register = if opts::multiprocess() {
None
let (background_monitor_register, sampler_chan) = if opts::multiprocess() {
(None, vec![])
} else {
Some(HangMonitorRegister::init(
background_hang_monitor_sender.clone(),
))
let (sampling_profiler_control, sampling_profiler_port) =
ipc::channel().expect("ipc channel failure");

(
Some(HangMonitorRegister::init(
background_hang_monitor_sender.clone(),
sampling_profiler_port,
)),
vec![sampling_profiler_control],
)
};

let (ipc_layout_sender, ipc_layout_receiver) =
@@ -640,6 +650,7 @@ where
background_hang_monitor_sender,
background_hang_monitor_receiver,
background_monitor_register,
sampling_profiler_control: sampler_chan,
layout_sender: ipc_layout_sender,
script_receiver: script_receiver,
compositor_receiver: compositor_receiver,
@@ -841,18 +852,22 @@ where
Err(e) => return self.handle_send_error(pipeline_id, e),
};

if let Some(sampler_chan) = pipeline.sampler_control_chan {
self.sampling_profiler_control.push(sampler_chan);
}

if let Some(host) = host {
debug!(
"Adding new host entry {} for top-level browsing context {}.",
host, top_level_browsing_context_id
);
let _ = self
.event_loops
.insert(host, Rc::downgrade(&pipeline.event_loop));
.insert(host, Rc::downgrade(&pipeline.pipeline.event_loop));
}

assert!(!self.pipelines.contains_key(&pipeline_id));
self.pipelines.insert(pipeline_id, pipeline);
self.pipelines.insert(pipeline_id, pipeline.pipeline);
}

/// Get an iterator for the fully active browsing contexts in a subtree.
@@ -1202,6 +1217,20 @@ where
self.forward_event(destination_pipeline_id, event);
},
FromCompositorMsg::SetCursor(cursor) => self.handle_set_cursor_msg(cursor),
FromCompositorMsg::EnableProfiler(rate) => {
for chan in &self.sampling_profiler_control {
if let Err(e) = chan.send(SamplerControlMsg::Enable(rate)) {
warn!("error communicating with sampling profiler: {}", e);
}
}
},
FromCompositorMsg::DisableProfiler => {
for chan in &self.sampling_profiler_control {
if let Err(e) = chan.send(SamplerControlMsg::Disable) {
warn!("error communicating with sampling profiler: {}", e);
}
}
},
}
}

Oops, something went wrong.

0 comments on commit 8b7244f

Please sign in to comment.
You can’t perform that action at this time.