From 518755f3d6fda8d692b062e3f9b82be78dfc9248 Mon Sep 17 00:00:00 2001 From: Tatsuyuki Ishi Date: Mon, 27 Feb 2023 17:38:38 +0900 Subject: [PATCH] perf_event: Remove the size limit from EventIter. To sort perf events on the fly, all available events from one stream must be examined at once, which is not possible with the current EventIter restricted to up to 31 event at once. Rewrite the out-of-order commit logic so that pending commits are tracked in a binary heap, which will remain small as long as events are dropped in-order but can also comfortably accommodate an unbounded amount of out-of-order drop. This changes the event iteration order. Previously, we iterated over 31 events from each buffer before going on to the next. Now, all available events in each buffer will be consumed before going on to the next. This incurs more buffering than before, but live event sorting will incur even more buffering, so seeing this as a transition step it should be acceptable. --- samply/src/linux/perf_event.rs | 126 +++++++++++++++------------------ 1 file changed, 58 insertions(+), 68 deletions(-) diff --git a/samply/src/linux/perf_event.rs b/samply/src/linux/perf_event.rs index 3ee375e2e..51fa7a0da 100644 --- a/samply/src/linux/perf_event.rs +++ b/samply/src/linux/perf_event.rs @@ -1,5 +1,5 @@ use std::cmp::max; -use std::fmt; +use std::collections::BinaryHeap; use std::io; use std::mem; use std::ops::Range; @@ -9,6 +9,7 @@ use std::slice; use std::sync::atomic::fence; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::{cmp, fmt}; use libc::{self, c_void, pid_t}; use linux_perf_data::linux_perf_event_reader; @@ -69,6 +70,13 @@ unsafe fn read_head(pointer: *const u8) -> u64 { head } +unsafe fn read_tail(pointer: *const u8) -> u64 { + let page = &*(pointer as *const PerfEventMmapPage); + // No memory fence required because we're just reading a value previously + // written by us. + ptr::read_volatile(&page.data_tail) +} + unsafe fn write_tail(pointer: *mut u8, value: u64) { let page = &mut *(pointer as *mut PerfEventMmapPage); fence(Ordering::AcqRel); @@ -462,8 +470,7 @@ impl Perf { struct EventRefState { buffer: *mut u8, size: u64, - done: u32, - positions: [u64; 32], + pending_commits: BinaryHeap>, } impl EventRefState { @@ -471,8 +478,28 @@ impl EventRefState { EventRefState { buffer, size, - done: !0, - positions: [0; 32], + pending_commits: BinaryHeap::new(), + } + } + + /// Mark the read of [from, to) as complete. + /// If reads are completed in-order, then this will advance the tail pointer to `to` immediately. + /// Otherwise, it will remain in the "pending commit" queue, and committed once all previous + /// reads are also committed. + fn try_commit(&mut self, from: u64, to: u64) { + self.pending_commits.push(cmp::Reverse((from, to))); + + let mut position = unsafe { read_tail(self.buffer) }; + while let Some(&cmp::Reverse((from, to))) = self.pending_commits.peek() { + if from == position { + unsafe { + write_tail(self.buffer, to); + } + position = to; + self.pending_commits.pop(); + } else { + break; + } } } } @@ -485,13 +512,21 @@ impl Drop for EventRefState { } } +/// Handle to a single event in the perf ring buffer. +/// +/// On Drop, the event will be "consumed" and the read pointer will be advanced. +/// +/// If events are dropped out of order, then it will be added to a list of pending commits and +/// committed when all prior events are also dropped. For this reason, events should be dropped +/// in-order to achieve the lowest overhead. #[derive(Clone)] pub struct EventRef { buffer: *mut u8, buffer_size: usize, - event_location: RawRecordLocation, - mask: u32, state: Arc>, + event_location: RawRecordLocation, + prev_position: u64, + position: u64, parse_info: RecordParseInfo, } @@ -499,7 +534,8 @@ impl fmt::Debug for EventRef { fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { fmt.debug_map() .entry(&"location", &self.event_location) - .entry(&"mask", &format!("{:032b}", self.mask)) + .entry(&"prev_position", &self.prev_position) + .entry(&"position", &self.position) .finish() } } @@ -507,18 +543,9 @@ impl fmt::Debug for EventRef { impl Drop for EventRef { #[inline] fn drop(&mut self) { - let mut state = self.state.lock(); - let last_empty_spaces = state.done.leading_zeros(); - state.done &= self.mask; - let empty_spaces = state.done.leading_zeros(); - - debug_assert!(empty_spaces >= last_empty_spaces); - if empty_spaces != last_empty_spaces { - let position = state.positions[empty_spaces as usize]; - unsafe { - write_tail(self.buffer, position); - } - } + self.state + .lock() + .try_commit(self.prev_position, self.position); } } @@ -532,45 +559,12 @@ impl EventRef { pub struct EventIter<'a> { perf: &'a mut Perf, - index: usize, - locations: Vec, - state: Arc>, } impl<'a> EventIter<'a> { #[inline] fn new(perf: &'a mut Perf) -> Self { - let mut locations = Vec::with_capacity(32); - - { - let state = Arc::get_mut(&mut perf.event_ref_state) - .expect("Perf::iter called while the previous iterator hasn't finished processing"); - let state = state.get_mut(); - - for _ in 0..31 { - state.positions[locations.len()] = perf.position; - let raw_event_location = - match next_raw_event(perf.buffer, perf.size, &mut perf.position) { - Some(location) => location, - None => break, - }; - - locations.push(raw_event_location); - } - - state.positions[locations.len()] = perf.position; - state.done = !0; - } - - // trace!("Batched {} events for PID {}", count, perf.pid); - - let state = perf.event_ref_state.clone(); - EventIter { - perf, - index: 0, - locations, - state, - } + EventIter { perf } } } @@ -579,21 +573,17 @@ impl<'a> Iterator for EventIter<'a> { #[inline] fn next(&mut self) -> Option { - if self.index == self.locations.len() { - return None; - } - - let event_location = self.locations[self.index].clone(); - let event = EventRef { - buffer: self.perf.buffer, - buffer_size: self.perf.size as usize, + let perf = &mut self.perf; + let prev_position = perf.position; + let event_location = next_raw_event(perf.buffer, perf.size, &mut perf.position)?; + Some(EventRef { + buffer: perf.buffer, + buffer_size: perf.size as usize, + state: perf.event_ref_state.clone(), event_location, - mask: !(1 << (31 - self.index)), - state: self.state.clone(), + prev_position, + position: perf.position, parse_info: self.perf.parse_info, - }; - - self.index += 1; - Some(event) + }) } }