diff --git a/samply/src/linux/perf_event.rs b/samply/src/linux/perf_event.rs
index 3ee375e2e..51fa7a0da 100644
--- a/samply/src/linux/perf_event.rs
+++ b/samply/src/linux/perf_event.rs
@@ -1,5 +1,5 @@
 use std::cmp::max;
-use std::fmt;
+use std::collections::BinaryHeap;
 use std::io;
 use std::mem;
 use std::ops::Range;
@@ -9,6 +9,7 @@ use std::slice;
 use std::sync::atomic::fence;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
+use std::{cmp, fmt};
 
 use libc::{self, c_void, pid_t};
 use linux_perf_data::linux_perf_event_reader;
@@ -69,6 +70,13 @@ unsafe fn read_head(pointer: *const u8) -> u64 {
     head
 }
 
+unsafe fn read_tail(pointer: *const u8) -> u64 {
+    let page = &*(pointer as *const PerfEventMmapPage);
+    // No memory fence required because we're just reading a value previously
+    // written by us.
+    ptr::read_volatile(&page.data_tail)
+}
+
 unsafe fn write_tail(pointer: *mut u8, value: u64) {
     let page = &mut *(pointer as *mut PerfEventMmapPage);
     fence(Ordering::AcqRel);
@@ -462,8 +470,7 @@ impl Perf {
 struct EventRefState {
     buffer: *mut u8,
     size: u64,
-    done: u32,
-    positions: [u64; 32],
+    pending_commits: BinaryHeap<cmp::Reverse<(u64, u64)>>,
 }
 
 impl EventRefState {
@@ -471,8 +478,28 @@ impl EventRefState {
         EventRefState {
             buffer,
             size,
-            done: !0,
-            positions: [0; 32],
+            pending_commits: BinaryHeap::new(),
+        }
+    }
+
+    /// Mark the read of [from, to) as complete.
+    /// If reads are completed in order, this will advance the tail pointer to `to` immediately.
+    /// Otherwise, the range remains in the "pending commit" queue and is committed once all
+    /// previous reads have also been committed.
+    fn try_commit(&mut self, from: u64, to: u64) {
+        self.pending_commits.push(cmp::Reverse((from, to)));
+
+        let mut position = unsafe { read_tail(self.buffer) };
+        while let Some(&cmp::Reverse((from, to))) = self.pending_commits.peek() {
+            if from == position {
+                unsafe {
+                    write_tail(self.buffer, to);
+                }
+                position = to;
+                self.pending_commits.pop();
+            } else {
+                break;
+            }
         }
     }
 }
@@ -485,13 +512,21 @@ impl Drop for EventRefState {
     }
 }
 
+/// Handle to a single event in the perf ring buffer.
+///
+/// On Drop, the event will be "consumed" and the read pointer will be advanced.
+///
+/// If events are dropped out of order, they will be added to a list of pending commits and
+/// committed once all prior events have also been dropped. For this reason, events should be
+/// dropped in order to achieve the lowest overhead.
 #[derive(Clone)]
 pub struct EventRef {
     buffer: *mut u8,
     buffer_size: usize,
-    event_location: RawRecordLocation,
-    mask: u32,
     state: Arc<Mutex<EventRefState>>,
+    event_location: RawRecordLocation,
+    prev_position: u64,
+    position: u64,
     parse_info: RecordParseInfo,
 }
 
@@ -499,7 +534,8 @@ impl fmt::Debug for EventRef {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
         fmt.debug_map()
             .entry(&"location", &self.event_location)
-            .entry(&"mask", &format!("{:032b}", self.mask))
+            .entry(&"prev_position", &self.prev_position)
+            .entry(&"position", &self.position)
             .finish()
     }
 }
@@ -507,18 +543,9 @@ impl Drop for EventRef {
     #[inline]
     fn drop(&mut self) {
-        let mut state = self.state.lock();
-        let last_empty_spaces = state.done.leading_zeros();
-        state.done &= self.mask;
-        let empty_spaces = state.done.leading_zeros();
-
-        debug_assert!(empty_spaces >= last_empty_spaces);
-        if empty_spaces != last_empty_spaces {
-            let position = state.positions[empty_spaces as usize];
-            unsafe {
-                write_tail(self.buffer, position);
-            }
-        }
+        self.state
+            .lock()
+            .try_commit(self.prev_position, self.position);
     }
 }
@@ -532,45 +559,12 @@ impl EventRef {
 
 pub struct EventIter<'a> {
     perf: &'a mut Perf,
-    index: usize,
-    locations: Vec<RawRecordLocation>,
-    state: Arc<Mutex<EventRefState>>,
 }
 
 impl<'a> EventIter<'a> {
     #[inline]
     fn new(perf: &'a mut Perf) -> Self {
-        let mut locations = Vec::with_capacity(32);
-
-        {
-            let state = Arc::get_mut(&mut perf.event_ref_state)
-                .expect("Perf::iter called while the previous iterator hasn't finished processing");
-            let state = state.get_mut();
-
-            for _ in 0..31 {
-                state.positions[locations.len()] = perf.position;
-                let raw_event_location =
-                    match next_raw_event(perf.buffer, perf.size, &mut perf.position) {
-                        Some(location) => location,
-                        None => break,
-                    };
-
-                locations.push(raw_event_location);
-            }
-
-            state.positions[locations.len()] = perf.position;
-            state.done = !0;
-        }
-
-        // trace!("Batched {} events for PID {}", count, perf.pid);
-
-        let state = perf.event_ref_state.clone();
-        EventIter {
-            perf,
-            index: 0,
-            locations,
-            state,
-        }
+        EventIter { perf }
     }
 }
 
@@ -579,21 +573,17 @@ impl<'a> Iterator for EventIter<'a> {
 
     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
-        if self.index == self.locations.len() {
-            return None;
-        }
-
-        let event_location = self.locations[self.index].clone();
-        let event = EventRef {
-            buffer: self.perf.buffer,
-            buffer_size: self.perf.size as usize,
+        let perf = &mut self.perf;
+        let prev_position = perf.position;
+        let event_location = next_raw_event(perf.buffer, perf.size, &mut perf.position)?;
+        Some(EventRef {
+            buffer: perf.buffer,
+            buffer_size: perf.size as usize,
+            state: perf.event_ref_state.clone(),
            event_location,
-            mask: !(1 << (31 - self.index)),
-            state: self.state.clone(),
+            prev_position,
+            position: perf.position,
             parse_info: self.perf.parse_info,
-        };
-
-        self.index += 1;
-        Some(event)
+        })
     }
 }
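
For reviewers, a minimal, self-contained sketch of the ordering logic that try_commit implements, with the mmap'd ring-buffer tail replaced by a plain u64 so it can be compiled and run on its own. CommitTracker and its field names are illustrative only and are not part of the patch.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Simplified stand-in for EventRefState: the perf ring-buffer tail is modeled
/// as a plain u64 instead of a pointer to the mmap'd PerfEventMmapPage.
struct CommitTracker {
    tail: u64,
    pending: BinaryHeap<Reverse<(u64, u64)>>,
}

impl CommitTracker {
    fn new() -> Self {
        CommitTracker {
            tail: 0,
            pending: BinaryHeap::new(),
        }
    }

    /// Mark the read of [from, to) as finished. The tail only advances once every
    /// earlier range has also been committed, mirroring the patch's try_commit.
    fn try_commit(&mut self, from: u64, to: u64) {
        self.pending.push(Reverse((from, to)));
        while let Some(&Reverse((from, to))) = self.pending.peek() {
            if from == self.tail {
                self.tail = to;
                self.pending.pop();
            } else {
                break;
            }
        }
    }
}

fn main() {
    let mut tracker = CommitTracker::new();
    tracker.try_commit(16, 32); // dropped out of order: tail stays at 0
    assert_eq!(tracker.tail, 0);
    tracker.try_commit(0, 16); // the gap is filled: tail jumps straight to 32
    assert_eq!(tracker.tail, 32);
}

The min-heap (BinaryHeap of Reverse) keeps the lowest outstanding `from` at the top, so the common in-order case commits and pops immediately, while an out-of-order drop costs only a heap insertion until the gap before it is filled.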