diff --git a/Cargo.toml b/Cargo.toml index 27d779b..2073eb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ description = "Simple, modern, ergonomic JSON-RPC 2.0 router built with tower an keywords = ["json-rpc", "jsonrpc", "json"] categories = ["web-programming::http-server", "web-programming::websocket"] -version = "0.3.4" +version = "0.4.0" edition = "2021" rust-version = "1.81" authors = ["init4", "James Prestwich"] @@ -15,6 +15,7 @@ repository = "https://github.com/init4tech/ajj" [dependencies] bytes = "1.9.0" +opentelemetry = "0.31.0" pin-project = "1.1.8" serde = { version = "1.0.217", features = ["derive"] } serde_json = { version = "1.0.135", features = ["raw_value"] } @@ -23,10 +24,12 @@ tokio = { version = "1.43.0", features = ["sync", "rt", "macros"] } tokio-util = { version = "0.7.13", features = ["io", "rt"] } tower = { version = "0.5.2", features = ["util"] } tracing = "0.1.41" +tracing-opentelemetry = "0.32.0" # axum axum = { version = "0.8.1", optional = true } mime = { version = "0.3.17", optional = true } +opentelemetry-http = { version = "0.31.0", optional = true } # pubsub tokio-stream = { version = "0.1.17", optional = true } @@ -51,7 +54,7 @@ eyre = "0.6.12" [features] default = ["axum", "ws", "ipc"] -axum = ["dep:axum", "dep:mime"] +axum = ["dep:axum", "dep:mime", "dep:opentelemetry-http"] pubsub = ["dep:tokio-stream", "axum?/ws"] ipc = ["pubsub", "dep:interprocess"] ws = ["pubsub", "dep:tokio-tungstenite", "dep:futures-util"] diff --git a/README.md b/README.md index dfaa536..22ae42e 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,23 @@ implementations. See the [crate documentation on docs.rs] for more detailed examples. +## Specification Compliance + +`ajj` aims to be fully compliant with the [JSON-RPC 2.0] specification. If any +issues are found, please [open an issue]! 
+ +`ajj` produces [`tracing`] spans and events that meet the [OpenTelemetry +semantic conventions] for JSON-RPC servers with the following exceptions: + +- The `server.address` attribute is NOT set, as the server address is not always + known to the ajj system. +- `rpc.message` events are included in AJJ system spans for the batch request, + which technically does not comply with semantic conventions. The semantic + conventions do not specify how to handle batch requests, and assume that each + message corresponds to a separate request. In AJJ, batch requests are a single + message, and result in a single `rpc.message` event at receipt and at + response. + ## Note on code provenance Some code in this project has been reproduced or adapted from other projects. @@ -94,3 +111,6 @@ reproduced from the following projects, and we are grateful for their work: [`interprocess::local_socket::ListenerOptions`]: https://docs.rs/interprocess/latest/interprocess/local_socket/struct.ListenerOptions.html [std::net::SocketAddr]: https://doc.rust-lang.org/std/net/enum.SocketAddr.html [alloy]: https://docs.rs/alloy/latest/alloy/ +[open an issue]: https://github.com/init4tech/ajj/issues/new +[OpenTelemetry semantic conventions]: https://opentelemetry.io/docs/specs/semconv/rpc/json-rpc/ +[`tracing`]: https://docs.rs/tracing/latest/tracing/ diff --git a/src/axum.rs b/src/axum.rs index 7bb337e..d1327e8 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -1,6 +1,6 @@ use crate::{ types::{InboundData, Response}, - HandlerCtx, TaskSet, + HandlerCtx, TaskSet, TracingInfo, }; use axum::{ extract::FromRequest, @@ -8,8 +8,13 @@ use axum::{ response::IntoResponse, }; use bytes::Bytes; -use std::{future::Future, pin::Pin}; +use std::{ + future::Future, + pin::Pin, + sync::{atomic::AtomicU32, Arc}, +}; use tokio::runtime::Handle; +use tracing::{Instrument, Span}; /// A wrapper around an [`Router`] that implements the /// [`axum::handler::Handler`] trait. 
This struct is an implementation detail @@ -21,7 +26,13 @@ use tokio::runtime::Handle; #[derive(Debug, Clone)] pub(crate) struct IntoAxum { pub(crate) router: crate::Router, + pub(crate) task_set: TaskSet, + + /// Counter for OTEL messages received. + pub(crate) rx_msg_id: Arc, + /// Counter for OTEL messages sent. + pub(crate) tx_msg_id: Arc, } impl From> for IntoAxum { @@ -29,6 +40,8 @@ impl From> for IntoAxum { Self { router, task_set: Default::default(), + rx_msg_id: Arc::new(AtomicU32::new(1)), + tx_msg_id: Arc::new(AtomicU32::new(1)), } } } @@ -39,12 +52,26 @@ impl IntoAxum { Self { router, task_set: handle.into(), + rx_msg_id: Arc::new(AtomicU32::new(1)), + tx_msg_id: Arc::new(AtomicU32::new(1)), } } +} - /// Get a new context, built from the task set. - fn ctx(&self) -> HandlerCtx { - self.task_set.clone().into() +impl IntoAxum +where + S: Clone + Send + Sync + 'static, +{ + fn ctx(&self, req: &axum::extract::Request) -> HandlerCtx { + let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&opentelemetry_http::HeaderExtractor(req.headers())) + }); + + HandlerCtx::new( + None, + self.task_set.clone(), + TracingInfo::new_with_context(self.router.service_name(), parent_context), + ) } } @@ -56,25 +83,49 @@ where fn call(self, req: axum::extract::Request, state: S) -> Self::Future { Box::pin(async move { + let ctx = self.ctx(&req); + ctx.init_request_span(&self.router, Some(&Span::current())); + let Ok(bytes) = Bytes::from_request(req, &state).await else { return Box::::from(Response::parse_error()).into_response(); }; - // If the inbound data is not currently parsable, we - // send an empty one it to the router, as the router enforces - // the specification. 
- let req = InboundData::try_from(bytes).unwrap_or_default(); + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md#message-event + let req = ctx.span().in_scope(|| { + message_event!( + @received, + counter: &self.rx_msg_id, + bytes: bytes.len(), + ); + + // If the inbound data is not currently parsable, we + // send an empty one it to the router, as the router enforces + // the specification. + InboundData::try_from(bytes).unwrap_or_default() + }); + let span = ctx.span().clone(); if let Some(response) = self .router - .call_batch_with_state(self.ctx(), req, state) + .call_batch_with_state(ctx, req, state) + .instrument(span.clone()) .await { let headers = [( header::CONTENT_TYPE, HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), )]; + let body = Box::::from(response); + + span.in_scope(|| { + message_event!( + @sent, + counter: &self.tx_msg_id, + bytes: body.len(), + ); + }); + (headers, body).into_response() } else { ().into_response() diff --git a/src/lib.rs b/src/lib.rs index a5da1e6..e83fd11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,7 @@ //! }) //! // Routes get a ctx, which can be used to send notifications. //! .route("notify", |ctx: HandlerCtx| async move { -//! if ctx.notifications().is_none() { +//! if !ctx.notifications_enabled() { //! // This error will appear in the ResponsePayload's `data` field. //! return Err("notifications are disabled"); //! 
} @@ -171,6 +171,7 @@ pub use pubsub::ReadJsonStream; mod routes; pub use routes::{ BatchFuture, Handler, HandlerArgs, HandlerCtx, NotifyError, Params, RouteFuture, State, + TracingInfo, }; pub(crate) use routes::{BoxedIntoRoute, ErasedIntoRoute, Method, Route}; @@ -206,7 +207,8 @@ pub(crate) mod test_utils { mod test { use crate::{ - router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, ResponsePayload, + router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, HandlerCtx, + ResponsePayload, }; use bytes::Bytes; use serde_json::value::RawValue; @@ -231,10 +233,7 @@ mod test { let res = router .call_with_state( - HandlerArgs { - ctx: Default::default(), - req: req.try_into().unwrap(), - }, + HandlerArgs::new(HandlerCtx::mock(), req.try_into().unwrap()), (), ) .await @@ -250,10 +249,7 @@ mod test { let res2 = router .call_with_state( - HandlerArgs { - ctx: Default::default(), - req: req2.try_into().unwrap(), - }, + HandlerArgs::new(HandlerCtx::mock(), req2.try_into().unwrap()), (), ) .await diff --git a/src/macros.rs b/src/macros.rs index 105eaa4..db9cbd6 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -54,6 +54,28 @@ macro_rules! unwrap_infallible { }; } +/// Log a message event to the current span. +/// +/// See +macro_rules! message_event { + ($type:literal, counter: $counter:expr, bytes: $bytes:expr,) => {{ + ::tracing::info!( + "rpc.message.id" = $counter.fetch_add(1, ::std::sync::atomic::Ordering::Relaxed), + "rpc.message.type" = $type, + "rpc.message.uncompressed_size" = $bytes, + "rpc.message" + ); + }}; + + (@received, counter: $counter:expr, bytes: $bytes:expr, ) => { + message_event!("RECEIVED", counter: $counter, bytes: $bytes,); + }; + + (@sent, counter: $counter:expr, bytes: $bytes:expr, ) => { + message_event!("SENT", counter: $counter, bytes: $bytes,); + }; +} + // Some code is this file is reproduced under the terms of the MIT license. It // originates from the `axum` crate. 
The original source code can be found at // the following URL, and the original license is included below. diff --git a/src/pubsub/axum.rs b/src/pubsub/axum.rs index 338b099..ad305d1 100644 --- a/src/pubsub/axum.rs +++ b/src/pubsub/axum.rs @@ -126,6 +126,8 @@ impl AxumWsCfg { next_id: arc.next_id.clone(), router: arc.router.clone(), notification_buffer_per_task: arc.notification_buffer_per_task, + tx_msg_id: arc.tx_msg_id.clone(), + rx_msg_id: arc.rx_msg_id.clone(), }, } } diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index 64dfb79..51aa896 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -95,6 +95,7 @@ mod ipc; pub use ipc::ReadJsonStream; mod shared; +pub(crate) use shared::WriteItem; pub use shared::{ConnectionId, DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT}; mod shutdown; diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index 74ed4b3..26c3900 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -1,15 +1,18 @@ use crate::{ pubsub::{In, JsonSink, Listener, Out}, types::InboundData, - HandlerCtx, TaskSet, + HandlerCtx, TaskSet, TracingInfo, }; use core::fmt; use serde_json::value::RawValue; -use std::sync::{atomic::AtomicU64, Arc}; +use std::sync::{ + atomic::{AtomicU32, AtomicU64, Ordering}, + Arc, +}; use tokio::{pin, runtime::Handle, select, sync::mpsc, task::JoinHandle}; use tokio_stream::StreamExt; use tokio_util::sync::WaitForCancellationFutureOwned; -use tracing::{debug, debug_span, error, trace, Instrument}; +use tracing::{debug, error, trace, Instrument}; /// Default notification buffer size per task. 
pub const DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT: usize = 16; @@ -67,6 +70,10 @@ pub(crate) struct ConnectionManager { pub(crate) router: crate::Router<()>, pub(crate) notification_buffer_per_task: usize, + + // OTEL message counters + pub(crate) tx_msg_id: Arc, + pub(crate) rx_msg_id: Arc, } impl ConnectionManager { @@ -77,6 +84,8 @@ impl ConnectionManager { next_id: AtomicU64::new(0).into(), router, notification_buffer_per_task: DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT, + tx_msg_id: Arc::new(AtomicU32::new(1)), + rx_msg_id: Arc::new(AtomicU32::new(1)), } } @@ -105,8 +114,7 @@ impl ConnectionManager { /// Increment the connection ID counter and return an unused ID. fn next_id(&self) -> ConnectionId { - self.next_id - .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + self.next_id.fetch_add(1, Ordering::Relaxed) } /// Get a clone of the router. @@ -131,13 +139,15 @@ impl ConnectionManager { write_task: tx, requests, tasks: tasks.clone(), + rx_msg_id: self.rx_msg_id.clone(), }; let wt = WriteTask { tasks, conn_id, - json: rx, + items: rx, connection, + tx_msg_id: self.tx_msg_id.clone(), }; (rt, wt) @@ -168,11 +178,14 @@ struct RouteTask { /// Connection ID for the connection serviced by this task. pub(crate) conn_id: ConnectionId, /// Sender to the write task. - pub(crate) write_task: mpsc::Sender>, + pub(crate) write_task: mpsc::Sender, /// Stream of requests. pub(crate) requests: In, /// The task set for this connection pub(crate) tasks: TaskSet, + + /// Counter for OTEL messages received. + pub(crate) rx_msg_id: Arc, } impl fmt::Debug for RouteTask { @@ -199,6 +212,7 @@ where mut requests, write_task, tasks, + rx_msg_id, .. } = self; @@ -224,6 +238,8 @@ where break; }; + let item_bytes = item.len(); + // If the inbound data is not currently parsable, we // send an empty one it to the router, as the router // enforces the specification. @@ -234,15 +250,32 @@ where // if the client stops accepting responses, we do not keep // handling inbound requests. 
let Ok(permit) = write_task.clone().reserve_owned().await else { - tracing::error!("write task dropped while waiting for permit"); + error!("write task dropped while waiting for permit"); break; }; + // This span is populated with as much detail as + // possible, and then given to the Handler ctx. It + // will be populated with request-specific details + // (e.g. method) during ctx instantiation. + let tracing = TracingInfo::new(router.service_name()); + let ctx = HandlerCtx::new( Some(write_task.clone()), children.clone(), + tracing, ); + ctx.init_request_span(&router, None); + + let span = ctx.span().clone(); + span.in_scope(|| { + message_event!( + @received, + counter: &rx_msg_id, + bytes: item_bytes, + ); + }); // Run the future in a new task. let fut = router.handle_request_batch(ctx, reqs); @@ -252,9 +285,9 @@ where // Send the response to the write task. // we don't care if the receiver has gone away, // as the task is done regardless. - if let Some(rv) = fut.await { + if let Some(json) = fut.await { let _ = permit.send( - rv + WriteItem { span, json } ); } } @@ -275,6 +308,13 @@ where } } +/// An item to be written to an outbound JSON pubsub stream. +#[derive(Debug, Clone)] +pub(crate) struct WriteItem { + pub(crate) span: tracing::Span, + pub(crate) json: Box, +} + /// The Write Task is responsible for writing JSON to the outbound connection. struct WriteTask { /// Task set @@ -287,10 +327,13 @@ struct WriteTask { /// /// Dropping this channel will cause the associated [`RouteTask`] to /// shutdown. - pub(crate) json: mpsc::Receiver>, + pub(crate) items: mpsc::Receiver, /// Outbound connections. pub(crate) connection: Out, + + /// Counter for OTEL messages sent. + pub(crate) tx_msg_id: Arc, } impl WriteTask { @@ -305,8 +348,9 @@ impl WriteTask { pub(crate) async fn task_future(self) { let WriteTask { tasks, - mut json, + mut items, mut connection, + tx_msg_id, .. 
} = self; @@ -318,12 +362,19 @@ impl WriteTask { debug!("Shutdown signal received"); break; } - json = json.recv() => { - let Some(json) = json else { + item = items.recv() => { + let Some(WriteItem { span, json }) = item else { tracing::error!("Json stream has closed"); break; }; - let span = debug_span!("WriteTask", conn_id = self.conn_id); + span.in_scope(|| { + message_event!( + @sent, + counter: &tx_msg_id, + bytes: json.get().len(), + ); + }); + if let Err(err) = connection.send_json(json).instrument(span).await { debug!(%err, conn_id = self.conn_id, "Failed to send json"); break; diff --git a/src/router.rs b/src/router.rs index a589896..3939b1a 100644 --- a/src/router.rs +++ b/src/router.rs @@ -95,6 +95,22 @@ where } } + /// Create a new, empty router with the specified OpenTelemetry service + /// name. + pub fn new_named(service_name: &'static str) -> Self { + Self { + inner: Arc::new(RouterInner { + service_name: Some(service_name), + ..RouterInner::new() + }), + } + } + + /// Get the OpenTelemetry service name for this router. + pub fn service_name(&self) -> &'static str { + self.inner.service_name.unwrap_or("ajj") + } + /// If this router is the only reference to its inner state, return the /// inner state. Otherwise, clone the inner state and return the clone. 
fn into_inner(self) -> RouterInner { @@ -104,6 +120,7 @@ where routes: arc.routes.clone(), last_id: arc.last_id, fallback: arc.fallback.clone(), + service_name: arc.service_name, name_to_id: arc.name_to_id.clone(), id_to_name: arc.id_to_name.clone(), }, @@ -301,7 +318,7 @@ where pub fn call_with_state(&self, args: HandlerArgs, state: S) -> RouteFuture { let id = args.req().id_owned(); let method = args.req().method(); - let span = debug_span!(parent: None, "Router::call_with_state", %method, ?id); + let span = debug_span!(parent: args.span(), "Router::call_with_state", %method, ?id); self.inner.call_with_state(args, state).with_span(span) } @@ -316,20 +333,25 @@ where let mut fut = BatchFuture::new_with_capacity(inbound.single(), inbound.len()); // According to spec, non-parsable requests should still receive a // response. - let span = debug_span!(parent: None, "BatchFuture::poll", reqs = inbound.len(), futs = tracing::field::Empty); + let batch_span = debug_span!(parent: ctx.span(), "BatchFuture::poll", reqs = inbound.len(), futs = tracing::field::Empty); - for (batch_idx, req) in inbound.iter().enumerate() { + for req in inbound.iter() { let req = req.map(|req| { - let span = debug_span!(parent: &span, "RouteFuture::poll", batch_idx, method = req.method(), id = ?req.id()); - let args = HandlerArgs::new(ctx.clone(), req); + // Cloning here resets the `TracingInfo`, which means each + // ctx has a separate span with similar metadata. + let ctx = ctx.clone(); + let request_span = ctx.init_request_span(self, Some(&batch_span)).clone(); + + // Several span fields are populated in `HandlerArgs::new`. 
+ let args = HandlerArgs::new(ctx, req); self.inner .call_with_state(args, state.clone()) - .with_span(span) + .with_span(request_span) }); fut.push_parse_result(req); } - span.record("futs", fut.len()); - fut.with_span(span) + batch_span.record("futs", fut.len()); + fut.with_span(batch_span) } /// Nest this router into a new Axum router, with the specified path and the currently-running @@ -473,6 +495,10 @@ pub(crate) struct RouterInner { /// The handler to call when no method is found. fallback: Method, + /// An optional service name for OpenTelemetry tracing. This is not + /// set by default. + service_name: Option<&'static str>, + // next 2 fields are used for reverse lookup of method names /// A map from method names to their IDs. name_to_id: BTreeMap, MethodId>, @@ -502,6 +528,8 @@ impl RouterInner { fallback: Method::Ready(Route::default_fallback()), + service_name: None, + name_to_id: BTreeMap::new(), id_to_name: BTreeMap::new(), } @@ -523,6 +551,7 @@ impl RouterInner { .collect(), fallback: self.fallback.with_state(state), last_id: self.last_id, + service_name: self.service_name, name_to_id: self.name_to_id, id_to_name: self.id_to_name, } diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index 3a1553a..d3733db 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -1,9 +1,15 @@ -use crate::{types::Request, RpcSend, TaskSet}; +use crate::{pubsub::WriteItem, types::Request, Router, RpcSend, TaskSet}; +use ::tracing::info_span; +use opentelemetry::trace::TraceContextExt; use serde_json::value::RawValue; -use std::future::Future; -use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle}; +use std::{future::Future, sync::OnceLock}; +use tokio::{ + sync::mpsc::{self, error::SendError}, + task::JoinHandle, +}; use tokio_util::sync::WaitForCancellationFutureOwned; -use tracing::error; +use tracing::{enabled, error, Level}; +use tracing_opentelemetry::OpenTelemetrySpanExt; /// Errors that can occur when sending notifications. 
#[derive(thiserror::Error, Debug)] @@ -13,7 +19,127 @@ pub enum NotifyError { Serde(#[from] serde_json::Error), /// The notification channel was closed. #[error("notification channel closed")] - Send(#[from] mpsc::error::SendError>), + Send(#[from] SendError>), +} + +impl From> for NotifyError { + fn from(value: SendError) -> Self { + SendError(value.0.json).into() + } +} + +/// Tracing information for OpenTelemetry. This struct is used to store +/// information about the current request that can be used for tracing. +#[derive(Debug)] +#[non_exhaustive] +pub struct TracingInfo { + /// The OpenTelemetry service name. + pub service: &'static str, + + /// The open telemetry Context, + pub context: Option, + + /// The tracing span for this request. + span: OnceLock, +} + +impl Clone for TracingInfo { + fn clone(&self) -> Self { + Self { + service: self.service, + context: self.context.clone(), + span: OnceLock::new(), + } + } +} + +impl TracingInfo { + /// Create a new tracing info with the given service name and no context. + #[allow(dead_code)] // used in some features + pub const fn new(service: &'static str) -> Self { + Self { + service, + context: None, + span: OnceLock::new(), + } + } + + /// Create a new tracing info with the given service name and context. + pub const fn new_with_context( + service: &'static str, + context: opentelemetry::context::Context, + ) -> Self { + Self { + service, + context: Some(context), + span: OnceLock::new(), + } + } + + /// Create a request span for a handler invocation. + fn init_request_span( + &self, + router: &Router, + with_notifications: bool, + parent: Option<&tracing::Span>, + ) -> &tracing::Span + where + S: Clone + Send + Sync + 'static, + { + // This span is populated with as much detail as possible, and then + // given to the Request. It will be populated with request-specific + // details (e.g. method) during request setup. 
+ self.span.get_or_init(|| { + let span = info_span!( + parent: parent.and_then(|p| p.id()), + "AjjRequest", + "otel.kind" = "server", + "rpc.system" = "jsonrpc", + "rpc.jsonrpc.version" = "2.0", + "rpc.service" = router.service_name(), + notifications_enabled = with_notifications, + "trace_id" = ::tracing::field::Empty, + "otel.name" = ::tracing::field::Empty, + "otel.status_code" = ::tracing::field::Empty, + "rpc.jsonrpc.request_id" = ::tracing::field::Empty, + "rpc.jsonrpc.error_code" = ::tracing::field::Empty, + "rpc.jsonrpc.error_message" = ::tracing::field::Empty, + "rpc.method" = ::tracing::field::Empty, + params = ::tracing::field::Empty, + ); + if let Some(context) = &self.context { + let _ = span.set_parent(context.clone()); + + span.record( + "trace_id", + context.span().span_context().trace_id().to_string(), + ); + } + + span + }) + } + + /// Get a reference to the tracing span for this request. + /// + /// ## Panics + /// + /// Panics if the span has not been initialized via + /// [`Self::init_request_span`]. + #[track_caller] + fn request_span(&self) -> &tracing::Span { + self.span.get().expect("span not initialized") + } + + /// Create a mock tracing info for testing. + #[cfg(test)] + pub fn mock() -> Self { + Self { + service: "test", + context: None, + span: OnceLock::from(info_span!("")), + } + } } /// A context for handler requests that allow the handler to send notifications @@ -25,52 +151,63 @@ pub enum NotifyError { /// - Sending notifications to pubsub clients via [`HandlerCtx::notify`]. /// Notifcations SHOULD be valid JSON-RPC objects, but this is /// not enforced by the type system. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct HandlerCtx { - pub(crate) notifications: Option>>, + pub(crate) notifications: Option>, /// A task set on which to spawn tasks. 
This is used to coordinate pub(crate) tasks: TaskSet, -} - -impl From for HandlerCtx { - fn from(tasks: TaskSet) -> Self { - Self { - notifications: None, - tasks, - } - } -} -impl From for HandlerCtx { - fn from(handle: Handle) -> Self { - Self { - notifications: None, - tasks: handle.into(), - } - } + /// Tracing information for OpenTelemetry. + pub(crate) tracing: TracingInfo, } impl HandlerCtx { /// Create a new handler context. - #[allow(dead_code)] // used in pubsub and axum features pub(crate) const fn new( - notifications: Option>>, + notifications: Option>, tasks: TaskSet, + tracing: TracingInfo, ) -> Self { Self { notifications, tasks, + tracing, + } + } + + /// Create a mock handler context for testing. + #[cfg(test)] + pub fn mock() -> Self { + Self { + notifications: None, + tasks: TaskSet::default(), + tracing: TracingInfo::mock(), } } - /// Get a reference to the notification sender. This is used to - /// send notifications over pubsub transports. - pub const fn notifications(&self) -> Option<&mpsc::Sender>> { - self.notifications.as_ref() + /// Get a reference to the tracing information for this handler context. + pub const fn tracing_info(&self) -> &TracingInfo { + &self.tracing + } + + /// Get the OpenTelemetry service name for this handler context. + pub const fn otel_service_name(&self) -> &'static str { + self.tracing.service + } + + /// Get a reference to the tracing span for this handler context. + #[track_caller] + pub fn span(&self) -> &tracing::Span { + self.tracing.request_span() + } + + /// Set the tracing information for this handler context. + pub fn set_tracing_info(&mut self, tracing: TracingInfo) { + self.tracing = tracing; } - /// Check if notiifcations can be sent to the client. This will be false + /// Check if notifications can be sent to the client. This will be false /// when either the transport does not support notifications, or the /// notification channel has been closed (due the the client going away). 
pub fn notifications_enabled(&self) -> bool { @@ -80,11 +217,29 @@ impl HandlerCtx { .unwrap_or_default() } + /// Create a request span for a handler invocation. + pub fn init_request_span( + &self, + router: &Router, + parent: Option<&tracing::Span>, + ) -> &tracing::Span + where + S: Clone + Send + Sync + 'static, + { + self.tracing_info() + .init_request_span(router, self.notifications_enabled(), parent) + } + /// Notify a client of an event. pub async fn notify(&self, t: &T) -> Result<(), NotifyError> { if let Some(notifications) = self.notifications.as_ref() { let rv = serde_json::value::to_raw_value(t)?; - notifications.send(rv).await?; + notifications + .send(WriteItem { + span: self.span().clone(), + json: rv, + }) + .await?; } Ok(()) @@ -211,15 +366,43 @@ impl HandlerCtx { #[derive(Debug, Clone)] pub struct HandlerArgs { /// The handler context. - pub(crate) ctx: HandlerCtx, + ctx: HandlerCtx, /// The JSON-RPC request. - pub(crate) req: Request, + req: Request, + + /// prevent instantiation outside of this module + _seal: (), } impl HandlerArgs { /// Create new handler arguments. - pub const fn new(ctx: HandlerCtx, req: Request) -> Self { - Self { ctx, req } + /// + /// ## Panics + /// + /// If the ctx tracing span has not been initialized via + /// [`HandlerCtx::init_request_span`]. + #[track_caller] + pub fn new(ctx: HandlerCtx, req: Request) -> Self { + let this = Self { + ctx, + req, + _seal: (), + }; + + let span = this.span(); + span.record("otel.name", this.otel_span_name()); + span.record("rpc.method", this.req.method()); + span.record("rpc.jsonrpc.request_id", this.req.id()); + if enabled!(Level::TRACE) { + span.record("params", this.req.params()); + } + + this + } + + /// Decompose the handler arguments into its parts. + pub fn into_parts(self) -> (HandlerCtx, Request) { + (self.ctx, self.req) } /// Get a reference to the handler context. 
@@ -227,8 +410,29 @@ impl HandlerArgs { &self.ctx } + /// Get a reference to the tracing span for this handler invocation. + /// + /// ## Panics + /// + /// If the span has not been initialized via + /// [`HandlerCtx::init_request_span`]. + #[track_caller] + pub fn span(&self) -> &tracing::Span { + self.ctx.span() + } + /// Get a reference to the JSON-RPC request. pub const fn req(&self) -> &Request { &self.req } + + /// Get the ID of the JSON-RPC request, if any. + pub fn id_owned(&self) -> Option> { + self.req.id_owned() + } + + /// Get the OpenTelemetry span name for this handler invocation. + pub fn otel_span_name(&self) -> String { + format!("{}/{}", self.ctx.otel_service_name(), self.req.method()) + } } diff --git a/src/routes/handler.rs b/src/routes/handler.rs index dda25da..a370f01 100644 --- a/src/routes/handler.rs +++ b/src/routes/handler.rs @@ -3,7 +3,7 @@ use crate::{ }; use serde_json::value::RawValue; use std::{convert::Infallible, future::Future, marker::PhantomData, pin::Pin, task}; -use tracing::{debug_span, enabled, trace, Instrument, Level}; +use tracing::{trace, Instrument}; macro_rules! convert_result { ($res:expr) => {{ @@ -412,16 +412,9 @@ where fn call(&mut self, args: HandlerArgs) -> Self::Future { let this = self.clone(); Box::pin(async move { - let notifications_enabled = args.ctx.notifications_enabled(); - - let span = debug_span!( - "HandlerService::call", - notifications_enabled, - params = tracing::field::Empty - ); - if enabled!(Level::TRACE) { - span.record("params", args.req.params()); - } + // This span captures standard OpenTelemetry attributes for + // JSON-RPC according to OTEL semantic conventions. 
+ let span = args.span().clone(); Ok(this .handler @@ -460,12 +453,11 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = self().await; - - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -480,12 +472,13 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let id = args.req.id_owned(); - let ctx = args.ctx; + let id = args.id_owned(); + let (ctx, _) = args.into_parts(); + let span = ctx.span().clone(); Box::pin(async move { let payload = self(ctx).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -502,7 +495,8 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let HandlerArgs { req, .. } = args; + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); Box::pin(async move { let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -510,7 +504,7 @@ where }; let payload = self(params).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -526,7 +520,8 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let HandlerArgs { req, .. 
} = args; + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); Box::pin(async move { let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -534,7 +529,7 @@ where }; let payload = self(Params(params)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -550,10 +545,10 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = self(state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -569,10 +564,10 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = self(State(state)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -590,7 +585,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -600,7 +596,7 @@ where drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, params).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -618,7 +614,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -628,7 +625,7 @@ where drop(req); // deallocate explicitly. No funny business. 
let payload = self(ctx, Params(params)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -646,7 +643,8 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. } = args; + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -656,7 +654,7 @@ where let payload = self(params, state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -674,15 +672,15 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -699,15 +697,15 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, State(state)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -726,9 +724,10 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); + let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); }; @@ -736,7 +735,7 @@ where drop(req); // deallocate explicitly. No funny business. 
let payload = self(ctx, params, state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -751,11 +750,12 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); + let span = args.span().clone(); drop(args); Box::pin(async move { let payload = self().await; - Response::maybe(id.as_deref(), &convert_result!(payload)) + Response::maybe(&span, id.as_deref(), &convert_result!(payload)) }) } } @@ -770,15 +770,15 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); drop(req); Box::pin(async move { let payload = convert_result!(self(ctx).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -795,8 +795,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -806,7 +806,7 @@ where let payload = convert_result!(self(params).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -823,7 +823,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. 
} = args; + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -834,7 +835,7 @@ where let payload = convert_result!(self(Params(params)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -850,10 +851,10 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = convert_result!(self(state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -869,10 +870,10 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = convert_result!(self(State(state)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -890,8 +891,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -901,7 +902,7 @@ where let payload = convert_result!(self(ctx, params).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -918,8 +919,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -929,7 +930,7 @@ where let 
payload = convert_result!(self(ctx, Params(params)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -947,7 +948,8 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. } = args; + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -958,7 +960,7 @@ where let payload = convert_result!(self(params, state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -974,16 +976,16 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); - drop(req); + drop(req); // deallocate explicitly. No funny business. Box::pin(async move { let payload = convert_result!(self(ctx, state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -999,16 +1001,16 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); - drop(req); + drop(req); // deallocate explicitly. No funny business. 
Box::pin(async move { let payload = convert_result!(self(ctx, State(state)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -1026,8 +1028,8 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -1037,7 +1039,7 @@ where let payload = convert_result!(self(ctx, params, state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } diff --git a/src/routes/mod.rs b/src/routes/mod.rs index ecafb38..07af8e8 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,5 +1,5 @@ mod ctx; -pub use ctx::{HandlerArgs, HandlerCtx, NotifyError}; +pub use ctx::{HandlerArgs, HandlerCtx, NotifyError, TracingInfo}; mod erased; pub(crate) use erased::{BoxedIntoRoute, ErasedIntoRoute, MakeErasedHandler}; @@ -14,6 +14,7 @@ pub use handler::{Handler, Params, State}; mod method; pub(crate) use method::Method; +use crate::types::Response; use serde_json::value::RawValue; use std::{ convert::Infallible, @@ -22,8 +23,6 @@ use std::{ use tower::{util::BoxCloneSyncService, Service, ServiceExt}; use tracing::{debug_span, enabled, Level}; -use crate::types::Response; - /// A JSON-RPC handler for a specific method. /// /// A route is a [`BoxCloneSyncService`] that takes JSON parameters and may @@ -52,9 +51,8 @@ impl Route { /// Create a default fallback route that returns a method not found error. pub(crate) fn default_fallback() -> Self { Self::new(tower::service_fn(|args: HandlerArgs| async { - let HandlerArgs { req, .. 
} = args; - let id = req.id_owned(); - drop(req); + let id = args.id_owned(); + drop(args); // no longer needed Ok(Response::maybe_method_not_found(id.as_deref())) })) @@ -102,7 +100,7 @@ impl Service for Route { params = tracing::field::Empty, ); if enabled!(Level::TRACE) { - span.record("params", args.req.params()); + span.record("params", args.req().params()); } self.oneshot_inner(args) } diff --git a/src/types/batch.rs b/src/types/batch.rs index 9ee745e..cdbabc1 100644 --- a/src/types/batch.rs +++ b/src/types/batch.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use serde::Deserialize; use serde_json::value::RawValue; use std::ops::Range; -use tracing::{debug, enabled, instrument, Level}; +use tracing::{debug, enabled, instrument, span::Span, Level}; /// UTF-8, partially deserialized JSON-RPC request batch. #[derive(Default)] @@ -56,8 +56,12 @@ impl TryFrom for InboundData { #[instrument(level = "debug", skip(bytes), fields(buf_len = bytes.len(), bytes = tracing::field::Empty))] fn try_from(bytes: Bytes) -> Result { if enabled!(Level::TRACE) { - tracing::span::Span::current().record("bytes", format!("0x{:x}", bytes)); + Span::current().record("bytes", format!("0x{:x}", bytes)); } + + // This event exists only so that people who use default console + // logging setups still see the span details. Without this event, the + // span would not show up in logs. debug!("Parsing inbound data"); // We set up the deserializer to read from the byte buffer. 
diff --git a/src/types/resp.rs b/src/types/resp.rs index 3b2fe4e..acf0e77 100644 --- a/src/types/resp.rs +++ b/src/types/resp.rs @@ -1,8 +1,10 @@ use crate::RpcSend; +use opentelemetry::trace::Status; use serde::{ser::SerializeMap, Serialize, Serializer}; use serde_json::value::{to_raw_value, RawValue}; use std::borrow::Cow; use std::fmt; +use tracing_opentelemetry::OpenTelemetrySpanExt; const INTERNAL_ERROR: Cow<'_, str> = Cow::Borrowed("Internal error"); @@ -73,10 +75,19 @@ where E: Serialize, { pub(crate) fn maybe( + span: &tracing::Span, id: Option<&'b RawValue>, payload: &'a ResponsePayload, ) -> Option> { - id.map(|id| Self { id, payload }.to_json()) + if let Some(e) = payload.as_error() { + span.record("rpc.jsonrpc.error_code", e.code); + span.record("rpc.jsonrpc.error_message", e.message.as_ref()); + span.set_status(Status::Error { + description: e.message.clone(), + }); + } + + id.map(move |id| Self { id, payload }.to_json()) } pub(crate) fn to_json(&self) -> Box {