Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send structured fields on the wire #26

Merged
merged 5 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions console-api/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata {
location: Some(location),
kind: kind as i32,
level: metadata::Level::from(*meta.level()) as i32,
field_names: Vec::new(),
..Default::default()
}
}
Expand All @@ -61,6 +62,31 @@ impl<'a> From<&'a std::panic::Location<'a>> for Location {
}
}

impl fmt::Display for field::Value {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
field::Value::BoolVal(v) => fmt::Display::fmt(v, f)?,
field::Value::StrVal(v) => fmt::Display::fmt(v, f)?,
field::Value::U64Val(v) => fmt::Display::fmt(v, f)?,
field::Value::DebugVal(v) => fmt::Display::fmt(v, f)?,
field::Value::I64Val(v) => fmt::Display::fmt(v, f)?,
}

Ok(())
}
}

impl fmt::Display for Field {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let name_val = (self.name.as_ref(), self.value.as_ref());
if let (Some(field::Name::StrName(name)), Some(val)) = name_val {
write!(f, "{}={}", name, val)?;
}

Ok(())
}
}

impl fmt::Display for Location {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match (self.module_path.as_ref(), self.file.as_ref()) {
Expand Down Expand Up @@ -109,3 +135,39 @@ impl From<&'static tracing_core::Metadata<'static>> for register_metadata::NewMe
}
}
}

impl From<i64> for field::Value {
fn from(val: i64) -> Self {
field::Value::I64Val(val)
}
}

impl From<u64> for field::Value {
fn from(val: u64) -> Self {
field::Value::U64Val(val)
}
}

impl From<bool> for field::Value {
fn from(val: bool) -> Self {
field::Value::BoolVal(val)
}
}

impl From<&str> for field::Value {
fn from(val: &str) -> Self {
field::Value::StrVal(val.into())
}
}

impl From<&str> for field::Name {
fn from(val: &str) -> Self {
field::Name::StrName(val.into())
}
}

impl From<&dyn std::fmt::Debug> for field::Value {
fn from(val: &dyn std::fmt::Debug) -> Self {
field::Value::DebugVal(format!("{:?}", val))
}
}
4 changes: 2 additions & 2 deletions console-subscriber/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct TaskData<T> {

struct Task {
metadata: &'static Metadata<'static>,
fields: String,
fields: Vec<proto::Field>,
}

impl Aggregator {
Expand Down Expand Up @@ -385,9 +385,9 @@ impl Task {
id: Some(id.into()),
// TODO: more kinds of tasks...
kind: proto::tasks::task::Kind::Spawn as i32,
string_fields: self.fields.clone(),
metadata: Some(self.metadata.into()),
parents: Vec::new(), // TODO: implement parents nicely
fields: self.fields.clone(),
}
}
}
92 changes: 60 additions & 32 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,21 @@ use std::{
time::{Duration, SystemTime},
};
use tracing_core::{
field::{self, Visit},
span,
subscriber::{self, Subscriber},
Metadata,
};
use tracing_subscriber::{
fmt::{
format::{DefaultFields, FormatFields},
FormattedFields,
},
layer::Context,
registry::LookupSpan,
Layer,
};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

mod aggregator;
use aggregator::Aggregator;

pub struct TasksLayer<F = DefaultFields> {
pub struct TasksLayer {
task_meta: AtomicPtr<Metadata<'static>>,
blocking_meta: AtomicPtr<Metadata<'static>>,
tx: mpsc::Sender<Event>,
flush: Arc<aggregator::Flush>,
format: F,
}

pub struct Server {
Expand All @@ -43,6 +35,11 @@ pub struct Server {
client_buffer: usize,
}

struct FieldVisitor {
fields: Vec<proto::Field>,
meta_id: proto::MetaId,
}

struct Watch(mpsc::Sender<Result<proto::tasks::TaskUpdate, tonic::Status>>);

enum Event {
Expand All @@ -51,7 +48,7 @@ enum Event {
id: span::Id,
metadata: &'static Metadata<'static>,
at: SystemTime,
fields: String,
fields: Vec<proto::Field>,
},
Enter {
id: span::Id,
Expand Down Expand Up @@ -94,13 +91,12 @@ impl TasksLayer {
flush,
task_meta: AtomicPtr::new(ptr::null_mut()),
blocking_meta: AtomicPtr::new(ptr::null_mut()),
format: Default::default(),
};
(layer, server)
}
}

impl<F> TasksLayer<F> {
impl TasksLayer {
pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 10;
pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;
pub const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -163,10 +159,9 @@ impl<F> TasksLayer<F> {
}
}

impl<S, F> Layer<S> for TasksLayer<F>
impl<S> Layer<S> for TasksLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
F: for<'writer> FormatFields<'writer> + 'static,
{
fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
if meta.target() == "tokio::task" && meta.name() == "task" {
Expand All @@ -192,30 +187,21 @@ where
subscriber::Interest::always()
}

fn new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, cx: Context<'_, S>) {
fn new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, _: Context<'_, S>) {
let metadata = attrs.metadata();
if self.is_spawn(metadata) {
let at = SystemTime::now();
let span = cx.span(id).expect("newly-created span should exist");
let mut exts = span.extensions_mut();
let fields = match exts.get_mut::<FormattedFields<F>>() {
Some(fields) => fields.fields.clone(),
None => {
let mut fields = String::new();
match self.format.format_fields(&mut fields, attrs) {
Ok(()) => exts.insert(FormattedFields::<F>::new(fields.clone())),
Err(_) => {
tracing::warn!(span.id = ?id, span.attrs = ?attrs, "error formatting fields for span")
}
}
fields
}
let mut fields_collector = FieldVisitor {
fields: Vec::default(),
meta_id: metadata.into(),
};
attrs.record(&mut fields_collector);

self.send(Event::Spawn {
id: id.clone(),
at,
metadata,
fields,
fields: fields_collector.fields,
});
}
}
Expand Down Expand Up @@ -304,3 +290,45 @@ impl proto::tasks::tasks_server::Tasks for Server {
Ok(tonic::Response::new(stream))
}
}

impl Visit for FieldVisitor {
fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that &dyn std::error::Error is also a potential primitive tracing field value.

this implementation will just record Errors with fmt::Debug, which is fine for now, but it might be nice to also allow recording Errors in a more structured way as a follow-up? we could have an Error message which includes the Error type's fmt::Display output and an optional source field, or maybe the entire chain of sources as a protobuf list?

right now this wouldn't be particularly useful, since we only look at the tokio::task spans which never record Error fields. but in the future, we could potentially support nice error reporting in the console, so it's something that could be worth thinking about as a follow-up change...

}
13 changes: 11 additions & 2 deletions console/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
cell::RefCell,
collections::HashMap,
convert::TryFrom,
fmt::Write,
rc::{Rc, Weak},
time::{Duration, SystemTime},
};
Expand Down Expand Up @@ -115,13 +116,21 @@ impl State {
proto::tasks::task::Kind::Spawn => "T",
proto::tasks::task::Kind::Blocking => "B",
};

let fields = task
.fields
.iter()
.fold(String::new(), |mut res, f| {
write!(&mut res, "{} ", f).unwrap();
res
})
.trim_end()
.into();
let id = task.id?.id;
let stats = stats_update.remove(&id)?.into();
let task = Task {
id,
id_hex: format!("{:x}", id),
fields: task.string_fields,
fields,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for now, since the purpose of this change was just to add structured fields to the wire format.

a good follow-up would be to change the console TUI to store task fields as structured values as well (maybe a hashmap of names -> values for starters?). that would enable some additional things we could do, such as sorting the task list by the values of various fields, filtering which fields are displayed, and nicer formatting (e.g. bolding field names or displaying them in a different color).

kind,
stats,
completed_for: 0,
Expand Down
22 changes: 19 additions & 3 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,28 @@ message SpanId {
uint64 id = 1;
}

// A message representing a key-value pair of data associated with a `Span`
message Field {
oneof field {
string debug = 1;
oneof name {
// The string representation of the name.
string str_name = 1;
// An index position into the `Metadata.field_names`.
uint64 name_idx = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in order to actually use this, we'll also want to include the list of field names in the metadata message, so that these indices are meaningful. but we don't have to do that in this PR, that can be added in a follow-up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, just a thought: it might be worth having this message also include the metadata ID that the index refers to, so that a Field message with a name index can always be hydrated? but, since the Field message is only ever sent in the Span and Task messages, which also include metadata IDs, this isn't strictly necessary...

}
oneof value {
string debug_val = 3;
string str_val = 4;
uint64 u64_val = 5;
sint64 i64_val = 6;
bool bool_val = 7;
}
MetaId metadata_id = 8;
}

message Span {
SpanId id = 1;
MetaId metadata_id = 2;
map<string, Field> fields = 3;
repeated Field fields = 3;
google.protobuf.Timestamp at = 4;
}

Expand All @@ -54,6 +66,10 @@ message Metadata {
Kind kind = 5;
Level level = 6;

// The names of the key-value fields attached to the
// span or event this metadata is associated with.
repeated string field_names = 7;
Comment on lines +69 to +71
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a follow-up, we should actually populate this field...


enum Kind {
SPAN = 0;
EVENT = 1;
Expand Down
9 changes: 3 additions & 6 deletions proto/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ message Task {
// When the task's stats change, or when the task completes, it will be
// identified by this ID; if the client requires additional information
// included in the `Task` message, it should store that data and access it
// by ID.
// by ID.
common.SpanId id = 1;
// The numeric ID of the task's `Metadata`.
//
Expand All @@ -74,11 +74,8 @@ message Task {
// The category of task this task belongs to.
Kind kind = 3;

// A string representation of any fields recorded about this task.
//
// NOTE: eventually, it would be nice to support structured fields in tasks;
// we can deprecate this when we add that.
string string_fields = 4;
// A list of `Field` objects attached to this task.
repeated common.Field fields = 4;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it would be nice to have a comment here


// An ordered list of span IDs corresponding to the `tracing` span context
// in which this task was spawned.
Expand Down