-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(trace): host-controlled tracing #92
Changes from all commits
00b9de1
8bfe61a
0b7c9a8
7ebd5fb
5701c02
d3205c7
32bbfd9
7852e5e
d803182
b81ee40
89c78f4
13db994
318c479
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,8 @@ | ||
use crate::{comms::bbq, drivers::serial_mux}; | ||
use level_filters::LevelFilter; | ||
use mnemos_trace_proto::TraceEvent; | ||
use mnemos_trace_proto::{HostRequest, TraceEvent}; | ||
use mycelium_util::sync::InitOnce; | ||
use portable_atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering}; | ||
use portable_atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicU8, AtomicUsize, Ordering}; | ||
|
||
pub use tracing_02::*; | ||
use tracing_core_02::span::Current; | ||
|
@@ -25,7 +25,11 @@ pub struct SerialCollector { | |
// TODO(eliza): Currently, this is recorded but not actually consumed... | ||
dropped_events: AtomicUsize, | ||
|
||
max_level: LevelFilter, | ||
max_level: AtomicU8, | ||
|
||
/// Tracks whether we are inside of the collector's `send_event` method, so | ||
/// that BBQueue tracing can be disabled. | ||
in_send: AtomicBool, | ||
} | ||
|
||
// === impl SerialCollector === | ||
|
@@ -34,35 +38,48 @@ impl SerialCollector { | |
pub const PORT: u16 = 3; | ||
const CAPACITY: usize = 1024 * 4; | ||
|
||
pub const fn new(max_level: LevelFilter) -> Self { | ||
/// Returns a new `SerialCollector` whose max level starts out as | ||
/// `LevelFilter::OFF`. | ||
/// | ||
/// The level can later be changed at runtime by the worker task when it | ||
/// receives a `HostRequest::SetMaxLevel` from the host. Use | ||
/// `with_max_level` to start with a different level instead. | ||
pub const fn new() -> Self { | ||
Self::with_max_level(LevelFilter::OFF) | ||
} | ||
|
||
pub const fn with_max_level(max_level: LevelFilter) -> Self { | ||
Self { | ||
tx: InitOnce::uninitialized(), | ||
current_span: AtomicU64::new(0), | ||
current_meta: AtomicPtr::new(core::ptr::null_mut()), | ||
next_id: AtomicU64::new(1), | ||
dropped_events: AtomicUsize::new(0), | ||
max_level, | ||
max_level: AtomicU8::new(level_to_u8(max_level)), | ||
in_send: AtomicBool::new(false), | ||
} | ||
} | ||
|
||
pub async fn start(&'static self, k: &'static crate::Kernel) { | ||
let mut mux = serial_mux::SerialMuxClient::from_registry(k) | ||
.await | ||
.expect("cannot initialize serial tracing, no serial mux exists!"); | ||
let port = mux | ||
.open_port(3, 1024) | ||
.await | ||
.expect("cannot initialize serial tracing, cannot open port 3!"); | ||
// acquire sermux port 3 | ||
let port = { | ||
let mut mux = serial_mux::SerialMuxClient::from_registry(k) | ||
.await | ||
.expect("cannot initialize serial tracing, no serial mux exists!"); | ||
mux.open_port(3, 1024) | ||
.await | ||
.expect("cannot initialize serial tracing, cannot open port 3!") | ||
}; | ||
|
||
let (tx, rx) = bbq::new_spsc_channel(k.heap(), Self::CAPACITY).await; | ||
self.tx.init(tx); | ||
k.spawn(Self::worker(rx, port)).await; | ||
|
||
// set the default tracing collector | ||
let dispatch = tracing_02::Dispatch::from_static(self); | ||
tracing_02::dispatch::set_global_default(dispatch) | ||
.expect("cannot set global default tracing dispatcher"); | ||
|
||
// spawn a worker to read from the channel and write to the serial port. | ||
k.spawn(Self::worker(self, rx, port, k)).await; | ||
} | ||
|
||
/// Serialize a `TraceEvent`, returning `true` if the event was correctly serialized. | ||
fn send_event<'a>(&self, sz: usize, event: impl FnOnce() -> TraceEvent<'a>) -> bool { | ||
self.in_send.store(true, Ordering::Release); | ||
let Some(mut wgr) = self.tx.get().send_grant_exact_sync(sz) else { | ||
self.dropped_events.fetch_add(1, Ordering::Relaxed); | ||
return false; | ||
|
@@ -82,29 +99,118 @@ impl SerialCollector { | |
} | ||
}; | ||
wgr.commit(len); | ||
self.in_send.store(false, Ordering::Release); | ||
|
||
// return true if we committed a non-zero number of bytes. | ||
len > 0 | ||
} | ||
|
||
async fn worker(rx: bbq::Consumer, port: serial_mux::PortHandle) { | ||
loop { | ||
let rgr = rx.read_grant().await; | ||
async fn worker( | ||
&'static self, | ||
rx: bbq::Consumer, | ||
port: serial_mux::PortHandle, | ||
k: &'static crate::Kernel, | ||
) { | ||
use futures::FutureExt; | ||
use maitake::time; | ||
use postcard::accumulator::{CobsAccumulator, FeedResult}; | ||
|
||
// we probably won't need 256 whole bytes of cobs buffer yet, since all the | ||
// host -> target messages are quite small, so use a 16-byte accumulator | ||
let mut cobs_buf: CobsAccumulator<16> = CobsAccumulator::new(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We're going to want some kind of config soon. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Definitely, yeah. |
||
let mut read_level = |rgr: bbq::GrantR| { | ||
let mut window = &rgr[..]; | ||
let len = rgr.len(); | ||
port.send(&rgr[..]).await; | ||
'cobs: while !window.is_empty() { | ||
window = match cobs_buf.feed_ref::<HostRequest>(window) { | ||
FeedResult::Consumed => break 'cobs, | ||
FeedResult::OverFull(new_wind) => new_wind, | ||
FeedResult::DeserError(new_wind) => new_wind, | ||
FeedResult::Success { data, remaining } => { | ||
match data { | ||
HostRequest::SetMaxLevel(lvl) => { | ||
let level = lvl | ||
.map(|lvl| lvl as u8) | ||
.unwrap_or(level_to_u8(LevelFilter::OFF)); | ||
let prev = self.max_level.swap(level, Ordering::AcqRel); | ||
if prev != level { | ||
tracing_core_02::callsite::rebuild_interest_cache(); | ||
} | ||
} | ||
} | ||
|
||
remaining | ||
} | ||
}; | ||
} | ||
rgr.release(len); | ||
}; | ||
|
||
loop { | ||
'idle: loop { | ||
let mut heartbeat = [0u8; 8]; | ||
let heartbeat = { | ||
let level = u8_to_level(self.max_level.load(Ordering::Acquire)) | ||
.into_level() | ||
.as_ref() | ||
.map(AsSerde::as_serde); | ||
postcard::to_slice_cobs(&TraceEvent::Heartbeat(level), &mut heartbeat[..]) | ||
.expect("failed to encode heartbeat msg") | ||
}; | ||
port.send(heartbeat).await; | ||
if let Ok(rgr) = k | ||
.timer() | ||
.timeout(time::Duration::from_secs(2), port.consumer().read_grant()) | ||
.await | ||
{ | ||
read_level(rgr); | ||
|
||
// ack the new max level | ||
let mut ack = [0u8; 8]; | ||
let ack = { | ||
let level = u8_to_level(self.max_level.load(Ordering::Acquire)) | ||
.into_level() | ||
.as_ref() | ||
.map(AsSerde::as_serde); | ||
postcard::to_slice_cobs(&TraceEvent::Heartbeat(level), &mut ack[..]) | ||
.expect("failed to encode heartbeat msg") | ||
}; | ||
port.send(ack).await; | ||
break 'idle; | ||
} | ||
} | ||
|
||
loop { | ||
futures::select_biased! { | ||
rgr = rx.read_grant().fuse() => { | ||
let len = rgr.len(); | ||
port.send(&rgr[..]).await; | ||
rgr.release(len) | ||
}, | ||
rgr = port.consumer().read_grant().fuse() => { | ||
read_level(rgr); | ||
}, | ||
// TODO(eliza): make the host also send a heartbeat, and | ||
// if we don't get it, break back to the idle loop... | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Returns `true` if `metadata`'s level passes the current max level filter. | ||
/// | ||
/// This checks only the level and does not consult the `in_send` flag, so it | ||
/// is used by `register_callsite` as well as by `enabled` (which additionally | ||
/// checks `in_send`). | ||
#[inline] | ||
fn level_enabled(&self, metadata: &Metadata<'_>) -> bool { | ||
// TODO(eliza): more sophisticated filtering | ||
// The filter is re-read from the `max_level` atomic on every call, since | ||
// the worker task may swap in a new level when the host requests one. | ||
metadata.level() <= &u8_to_level(self.max_level.load(Ordering::Relaxed)) | ||
} | ||
} | ||
|
||
impl Collect for SerialCollector { | ||
fn enabled(&self, metadata: &Metadata<'_>) -> bool { | ||
// TODO(eliza): more sophisticated filtering | ||
metadata.level() <= &self.max_level | ||
self.level_enabled(metadata) && !self.in_send.load(Ordering::Acquire) | ||
} | ||
|
||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core_02::Interest { | ||
if !self.enabled(metadata) { | ||
if !self.level_enabled(metadata) { | ||
return tracing_core_02::Interest::never(); | ||
} | ||
|
||
|
@@ -116,17 +222,28 @@ impl Collect for SerialCollector { | |
meta: metadata.as_serde(), | ||
}); | ||
|
||
if sent { | ||
tracing_core_02::Interest::always() | ||
} else { | ||
// if we couldn't send the metadata, skip this callsite, because the | ||
// consumer will not be able to understand it without its metadata. | ||
tracing_core_02::Interest::never() | ||
// If we couldn't send the metadata, skip this callsite, because the | ||
// consumer will not be able to understand it without its metadata. | ||
if !sent { | ||
return tracing_core_02::Interest::never(); | ||
} | ||
|
||
// Due to the fact that the collector uses `bbq` internally, we must | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This feels a little icky, because this is a platform-specific impl detail - melpo doesn't use bbqueue for the tracing collection. It might be worth instead to:
I don't think this is a blocker for today, but if we merge this as-is it might be good to open an issue to fix this someday. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Well, this collector does use |
||
// return `Interest::sometimes` rather than `Interest::always` for | ||
// `bbq` callsites, so that they can be dynamically enabled/disabled | ||
// by the `enabled` method based on whether or not we are inside the | ||
// collector. This avoids an infinite loop that previously occurred | ||
// when enabling the `TRACE` level. | ||
if metadata.target().starts_with("kernel::comms::bbq") { | ||
return tracing_core_02::Interest::sometimes(); | ||
} | ||
|
||
// Otherwise, always enable this callsite. | ||
tracing_core_02::Interest::always() | ||
} | ||
|
||
fn max_level_hint(&self) -> Option<LevelFilter> { | ||
Some(self.max_level) | ||
Some(u8_to_level(self.max_level.load(Ordering::Relaxed))) | ||
} | ||
|
||
fn new_span(&self, span: &span::Attributes<'_>) -> span::Id { | ||
|
@@ -195,3 +312,25 @@ impl Collect for SerialCollector { | |
false | ||
} | ||
} | ||
|
||
/// Encodes a `LevelFilter` as a `u8`, so that the collector's max level can | ||
/// be stored in (and swapped through) an `AtomicU8`. | ||
/// | ||
/// The encoding runs from most verbose (`TRACE` = 0) to disabled (`OFF` = 5); | ||
/// `u8_to_level` performs the reverse mapping. | ||
// NOTE(review): the worker task stores `lvl as u8` from a host request | ||
// directly into `max_level` without going through this function, which | ||
// presumably relies on `SerializeLevel`'s discriminants matching this | ||
// table -- TODO confirm. | ||
const fn level_to_u8(level: LevelFilter) -> u8 { | ||
match level { | ||
LevelFilter::TRACE => 0, | ||
LevelFilter::DEBUG => 1, | ||
LevelFilter::INFO => 2, | ||
LevelFilter::WARN => 3, | ||
LevelFilter::ERROR => 4, | ||
LevelFilter::OFF => 5, | ||
} | ||
} | ||
|
||
const fn u8_to_level(level: u8) -> LevelFilter { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It might be good to have an invalid level be an error (and ignore the "change log level" command instead), otherwise if we receive a garbage message that HAPPENS to look like a command, we have a 251/256 chance of just shutting off all logging. Though I guess we don't really need to log at all to a connection that is sending us garbage. Hm. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These functions are used internally for converting the |
||
match level { | ||
0 => LevelFilter::TRACE, | ||
1 => LevelFilter::DEBUG, | ||
2 => LevelFilter::INFO, | ||
3 => LevelFilter::WARN, | ||
4 => LevelFilter::ERROR, | ||
_ => LevelFilter::OFF, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,14 @@ | |
|
||
use core::{fmt, num::NonZeroU64}; | ||
use tracing_serde_structured::{ | ||
SerializeId, SerializeMetadata, SerializeRecordFields, SerializeSpanFields, | ||
SerializeId, SerializeLevel, SerializeMetadata, SerializeRecordFields, SerializeSpanFields, | ||
}; | ||
|
||
#[derive(serde::Serialize, serde::Deserialize)] | ||
pub enum TraceEvent<'a> { | ||
/// Sent by the target periodically when not actively tracing, to indicate | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We might want to mark this `#[non_exhaustive]` and add new variants at the end instead, to avoid breaking the wire format, once we care about that. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I figured we didn't currently care about stability for the crate API or for the wire format, but we can do that in the future. |
||
/// liveness, or to ack a [`HostRequest::SetMaxLevel`]. | ||
Heartbeat(Option<SerializeLevel>), | ||
RegisterMeta { | ||
id: MetaId, | ||
|
||
|
@@ -36,6 +39,16 @@ pub enum TraceEvent<'a> { | |
DropSpan(SerializeId), | ||
} | ||
|
||
/// Requests sent from a host to a trace target. | ||
#[derive(serde::Serialize, serde::Deserialize)] | ||
pub enum HostRequest { | ||
/// Sets the maximum tracing level. Traces above this verbosity level will | ||
/// be discarded. | ||
/// | ||
/// A level of `None` turns tracing off: the serial collector maps it to | ||
/// `LevelFilter::OFF`. | ||
/// | ||
/// This may cause the trace target to send new metadata to the host. | ||
/// | ||
/// While idle, the serial collector acks this request by sending a | ||
/// `TraceEvent::Heartbeat` carrying the max level now in effect. | ||
SetMaxLevel(Option<SerializeLevel>), // TODO(eliza): add a keepalive? | ||
} | ||
|
||
#[derive(Copy, Clone, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize)] | ||
pub struct MetaId(NonZeroU64); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love it