From c32f7c6234911ab2ce3760cf0f11a7f73d922313 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 10:51:31 -0400 Subject: [PATCH 01/19] feat: enhance otel implementation by setting more span info on request receipt --- Cargo.toml | 2 +- src/axum.rs | 84 +++++++++++++++++++++----- src/lib.rs | 10 ++-- src/pubsub/mod.rs | 1 + src/pubsub/shared.rs | 79 ++++++++++++++++++++----- src/router.rs | 28 ++++++++- src/routes/ctx.rs | 133 +++++++++++++++++++++++++++++++----------- src/routes/handler.rs | 15 ++--- src/routes/mod.rs | 5 +- src/types/batch.rs | 4 ++ 10 files changed, 280 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 27d779b..7de86da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ description = "Simple, modern, ergonomic JSON-RPC 2.0 router built with tower an keywords = ["json-rpc", "jsonrpc", "json"] categories = ["web-programming::http-server", "web-programming::websocket"] -version = "0.3.4" +version = "0.4.0" edition = "2021" rust-version = "1.81" authors = ["init4", "James Prestwich"] diff --git a/src/axum.rs b/src/axum.rs index 7bb337e..4fd1db1 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -1,6 +1,6 @@ use crate::{ types::{InboundData, Response}, - HandlerCtx, TaskSet, + HandlerCtx, TaskSet, TracingInfo, }; use axum::{ extract::FromRequest, @@ -8,8 +8,16 @@ use axum::{ response::IntoResponse, }; use bytes::Bytes; -use std::{future::Future, pin::Pin}; +use std::{ + future::Future, + pin::Pin, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; use tokio::runtime::Handle; +use tracing::{debug, debug_span}; /// A wrapper around an [`Router`] that implements the /// [`axum::handler::Handler`] trait. This struct is an implementation detail @@ -21,7 +29,13 @@ use tokio::runtime::Handle; #[derive(Debug, Clone)] pub(crate) struct IntoAxum { pub(crate) router: crate::Router, + pub(crate) task_set: TaskSet, + + /// Counter for OTEL messages received. + pub(crate) rx_msg_id: Arc, + /// Counter for OTEL messages sent. + pub(crate) tx_msg_id: Arc, } impl From> for IntoAxum { @@ -29,6 +43,8 @@ impl From> for IntoAxum { Self { router, task_set: Default::default(), + rx_msg_id: Arc::new(AtomicU32::new(0)), + tx_msg_id: Arc::new(AtomicU32::new(0)), } } } @@ -39,12 +55,35 @@ impl IntoAxum { Self { router, task_set: handle.into(), + rx_msg_id: Arc::new(AtomicU32::new(0)), + tx_msg_id: Arc::new(AtomicU32::new(0)), } } +} - /// Get a new context, built from the task set. +impl IntoAxum +where + S: Clone + Send + Sync + 'static, +{ fn ctx(&self) -> HandlerCtx { - self.task_set.clone().into() + let request_span = debug_span!( + "ajj.IntoAxum::call", + "otel.kind" = "server", + "rpc.system" = "jsonrpc", + "rpc.jsonrpc.version" = "2.0", + "rpc.service" = self.router.service_name(), + notifications_enabled = false, + params = tracing::field::Empty + ); + + HandlerCtx::new( + None, + self.task_set.clone(), + TracingInfo { + service: self.router.service_name(), + request_span, + }, + ) } } @@ -56,25 +95,44 @@ where fn call(self, req: axum::extract::Request, state: S) -> Self::Future { Box::pin(async move { + let ctx = self.ctx(); + let Ok(bytes) = Bytes::from_request(req, &state).await else { return Box::::from(Response::parse_error()).into_response(); }; - // If the inbound data is not currently parsable, we - // send an empty one it to the router, as the router enforces - // the specification. - let req = InboundData::try_from(bytes).unwrap_or_default(); + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md#message-event + let req = ctx.span().in_scope(|| { + debug!( + "rpc.message.id" = self.rx_msg_id.fetch_add(1, Ordering::Relaxed), + "rpc.message.type" = "received", + "rpc.message.uncompressed_size" = bytes.len(), + "Received request" + ); + + // If the inbound data is not currently parsable, we + // send an empty one it to the router, as the router enforces + // the specification. + InboundData::try_from(bytes).unwrap_or_default() + }); - if let Some(response) = self - .router - .call_batch_with_state(self.ctx(), req, state) - .await - { + let span = ctx.span().clone(); + if let Some(response) = self.router.call_batch_with_state(ctx, req, state).await { let headers = [( header::CONTENT_TYPE, HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), )]; + let body = Box::::from(response); + + span.in_scope(|| { + debug!( + "rpc.message.id" = self.tx_msg_id.fetch_add(1, Ordering::Relaxed), + "rpc.message.type" = "received", + "Received request" + ); + }); + (headers, body).into_response() } else { ().into_response() diff --git a/src/lib.rs b/src/lib.rs index a5da1e6..e84dbec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,7 @@ //! }) //! // Routes get a ctx, which can be used to send notifications. //! .route("notify", |ctx: HandlerCtx| async move { -//! if ctx.notifications().is_none() { +//! if !ctx.notifications_enabled() { //! // This error will appear in the ResponsePayload's `data` field. //! return Err("notifications are disabled"); //! } @@ -171,6 +171,7 @@ pub use pubsub::ReadJsonStream; mod routes; pub use routes::{ BatchFuture, Handler, HandlerArgs, HandlerCtx, NotifyError, Params, RouteFuture, State, + TracingInfo, }; pub(crate) use routes::{BoxedIntoRoute, ErasedIntoRoute, Method, Route}; @@ -206,7 +207,8 @@ pub(crate) mod test_utils { mod test { use crate::{ - router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, ResponsePayload, + router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, HandlerCtx, + ResponsePayload, }; use bytes::Bytes; use serde_json::value::RawValue; @@ -232,7 +234,7 @@ mod test { let res = router .call_with_state( HandlerArgs { - ctx: Default::default(), + ctx: HandlerCtx::mock(), req: req.try_into().unwrap(), }, (), @@ -251,7 +253,7 @@ mod test { let res2 = router .call_with_state( HandlerArgs { - ctx: Default::default(), + ctx: HandlerCtx::mock(), req: req2.try_into().unwrap(), }, (), diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index 64dfb79..51aa896 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -95,6 +95,7 @@ mod ipc; pub use ipc::ReadJsonStream; mod shared; +pub(crate) use shared::WriteItem; pub use shared::{ConnectionId, DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT}; mod shutdown; diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index 74ed4b3..b11b91e 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -1,11 +1,14 @@ use crate::{ pubsub::{In, JsonSink, Listener, Out}, types::InboundData, - HandlerCtx, TaskSet, + HandlerCtx, TaskSet, TracingInfo, }; use core::fmt; use serde_json::value::RawValue; -use std::sync::{atomic::AtomicU64, Arc}; +use std::sync::{ + atomic::{AtomicU32, AtomicU64, Ordering}, + Arc, +}; use tokio::{pin, runtime::Handle, select, sync::mpsc, task::JoinHandle}; use tokio_stream::StreamExt; use tokio_util::sync::WaitForCancellationFutureOwned; @@ -105,8 +108,7 @@ impl ConnectionManager { /// Increment the connection ID counter and return an unused ID. fn next_id(&self) -> ConnectionId { - self.next_id - .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + self.next_id.fetch_add(1, Ordering::Relaxed) } /// Get a clone of the router. @@ -131,13 +133,15 @@ impl ConnectionManager { write_task: tx, requests, tasks: tasks.clone(), + rx_msg_id: Arc::new(AtomicU32::new(0)), }; let wt = WriteTask { tasks, conn_id, - json: rx, + items: rx, connection, + tx_msg_id: Arc::new(AtomicU32::new(0)), }; (rt, wt) @@ -168,11 +172,14 @@ struct RouteTask { /// Connection ID for the connection serviced by this task. pub(crate) conn_id: ConnectionId, /// Sender to the write task. - pub(crate) write_task: mpsc::Sender>, + pub(crate) write_task: mpsc::Sender, /// Stream of requests. pub(crate) requests: In, /// The task set for this connection pub(crate) tasks: TaskSet, + + /// Counter for OTEL messages received. + pub(crate) rx_msg_id: Arc, } impl fmt::Debug for RouteTask { @@ -199,6 +206,7 @@ where mut requests, write_task, tasks, + rx_msg_id, .. } = self; @@ -224,6 +232,8 @@ where break; }; + let item_bytes = item.len(); + // If the inbound data is not currently parsable, we // send an empty one it to the router, as the router // enforces the specification. @@ -234,16 +244,38 @@ where // if the client stops accepting responses, we do not keep // handling inbound requests. let Ok(permit) = write_task.clone().reserve_owned().await else { - tracing::error!("write task dropped while waiting for permit"); + error!("write task dropped while waiting for permit"); break; }; + let tracing = TracingInfo { service: router.service_name(), request_span: debug_span!( + parent: None, + "ajj.pubsub.RouteTask::call", + "otel.kind" = "server", + "rpc.system" = "jsonrpc", + "rpc.jsonrpc.version" = "2.0", + "rpc.service" = router.service_name(), + notifications_enabled = true, + params = tracing::field::Empty + ) }; + let ctx = HandlerCtx::new( Some(write_task.clone()), children.clone(), + tracing, ); + let span = ctx.span().clone(); + span.in_scope(|| { + debug!( + "rpc.message.id" = rx_msg_id.fetch_add(1, Ordering::Relaxed), + "rpc.message.type" = "received", + "rpc.message.uncompressed_size" = item_bytes, + "Received request" + ); + }); + // Run the future in a new task. let fut = router.handle_request_batch(ctx, reqs); @@ -252,9 +284,9 @@ where // Send the response to the write task. // we don't care if the receiver has gone away, // as the task is done regardless. - if let Some(rv) = fut.await { + if let Some(json) = fut.await { let _ = permit.send( - rv + WriteItem { span, json } ); } } @@ -275,6 +307,13 @@ where } } +/// An item to be written to an outbound JSON pubsub stream. +#[derive(Debug, Clone)] +pub(crate) struct WriteItem { + pub(crate) span: tracing::Span, + pub(crate) json: Box, +} + /// The Write Task is responsible for writing JSON to the outbound connection. struct WriteTask { /// Task set @@ -287,10 +326,13 @@ struct WriteTask { /// /// Dropping this channel will cause the associated [`RouteTask`] to /// shutdown. - pub(crate) json: mpsc::Receiver>, + pub(crate) items: mpsc::Receiver, /// Outbound connections. pub(crate) connection: Out, + + /// Counter for OTEL messages sent. + pub(crate) tx_msg_id: Arc, } impl WriteTask { @@ -305,8 +347,9 @@ impl WriteTask { pub(crate) async fn task_future(self) { let WriteTask { tasks, - mut json, + mut items, mut connection, + tx_msg_id, .. } = self; @@ -318,12 +361,20 @@ impl WriteTask { debug!("Shutdown signal received"); break; } - json = json.recv() => { - let Some(json) = json else { + item = items.recv() => { + let Some(WriteItem { span, json }) = item else { tracing::error!("Json stream has closed"); break; }; - let span = debug_span!("WriteTask", conn_id = self.conn_id); + span.record("conn_id", self.conn_id); + span.in_scope(|| { + debug!( + "rpc.message.id" = tx_msg_id.fetch_add(1, Ordering::Relaxed), + "rpc.message.type" = "sent", + "Sending response" + ); + }); + if let Err(err) = connection.send_json(json).instrument(span).await { debug!(%err, conn_id = self.conn_id, "Failed to send json"); break; diff --git a/src/router.rs b/src/router.rs index a589896..2846a7d 100644 --- a/src/router.rs +++ b/src/router.rs @@ -95,6 +95,22 @@ where } } + /// Create a new, empty router with the specified OpenTelemetry service + /// name. + pub fn new_named(service_name: &'static str) -> Self { + Self { + inner: Arc::new(RouterInner { + service_name: Some(service_name), + ..RouterInner::new() + }), + } + } + + /// Get the OpenTelemetry service name for this router. + pub fn service_name(&self) -> &'static str { + self.inner.service_name.unwrap_or("ajj") + } + /// If this router is the only reference to its inner state, return the /// inner state. Otherwise, clone the inner state and return the clone. fn into_inner(self) -> RouterInner { @@ -104,6 +120,7 @@ where routes: arc.routes.clone(), last_id: arc.last_id, fallback: arc.fallback.clone(), + service_name: arc.service_name, name_to_id: arc.name_to_id.clone(), id_to_name: arc.id_to_name.clone(), }, @@ -301,7 +318,7 @@ where pub fn call_with_state(&self, args: HandlerArgs, state: S) -> RouteFuture { let id = args.req().id_owned(); let method = args.req().method(); - let span = debug_span!(parent: None, "Router::call_with_state", %method, ?id); + let span = debug_span!(parent: args.span(), "Router::call_with_state", %method, ?id); self.inner.call_with_state(args, state).with_span(span) } @@ -316,7 +333,7 @@ where let mut fut = BatchFuture::new_with_capacity(inbound.single(), inbound.len()); // According to spec, non-parsable requests should still receive a // response. - let span = debug_span!(parent: None, "BatchFuture::poll", reqs = inbound.len(), futs = tracing::field::Empty); + let span = debug_span!(parent: ctx.span(), "BatchFuture::poll", reqs = inbound.len(), futs = tracing::field::Empty); for (batch_idx, req) in inbound.iter().enumerate() { let req = req.map(|req| { @@ -473,6 +490,10 @@ pub(crate) struct RouterInner { /// The handler to call when no method is found. fallback: Method, + /// An optional service name for OpenTelemetry tracing. This is not + /// set by default. + service_name: Option<&'static str>, + // next 2 fields are used for reverse lookup of method names /// A map from method names to their IDs. name_to_id: BTreeMap, MethodId>, @@ -502,6 +523,8 @@ impl RouterInner { fallback: Method::Ready(Route::default_fallback()), + service_name: None, + name_to_id: BTreeMap::new(), id_to_name: BTreeMap::new(), } @@ -523,6 +546,7 @@ impl RouterInner { .collect(), fallback: self.fallback.with_state(state), last_id: self.last_id, + service_name: self.service_name, name_to_id: self.name_to_id, id_to_name: self.id_to_name, } diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index 3a1553a..30b1aa8 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -1,9 +1,12 @@ -use crate::{types::Request, RpcSend, TaskSet}; +use crate::{pubsub::WriteItem, types::Request, RpcSend, TaskSet}; use serde_json::value::RawValue; use std::future::Future; -use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::mpsc::{self, error::SendError}, + task::JoinHandle, +}; use tokio_util::sync::WaitForCancellationFutureOwned; -use tracing::error; +use tracing::{enabled, error, Level}; /// Errors that can occur when sending notifications. #[derive(thiserror::Error, Debug)] @@ -13,7 +16,37 @@ pub enum NotifyError { Serde(#[from] serde_json::Error), /// The notification channel was closed. #[error("notification channel closed")] - Send(#[from] mpsc::error::SendError>), + Send(#[from] SendError>), +} + +impl From> for NotifyError { + fn from(value: SendError) -> Self { + SendError(value.0.json).into() + } +} + +/// Tracing information for OpenTelemetry. This struct is used to store +/// information about the current request that can be used for tracing. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct TracingInfo { + /// The OpenTelemetry service name. + pub service: &'static str, + + /// The request span. + pub request_span: tracing::Span, +} + +impl TracingInfo { + /// Create a mock tracing info for testing. + #[cfg(test)] + pub fn mock() -> Self { + use tracing::debug_span; + Self { + service: "test", + request_span: debug_span!("test"), + } + } } /// A context for handler requests that allow the handler to send notifications @@ -25,52 +58,62 @@ pub enum NotifyError { /// - Sending notifications to pubsub clients via [`HandlerCtx::notify`]. /// Notifcations SHOULD be valid JSON-RPC objects, but this is /// not enforced by the type system. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct HandlerCtx { - pub(crate) notifications: Option>>, + pub(crate) notifications: Option>, /// A task set on which to spawn tasks. This is used to coordinate pub(crate) tasks: TaskSet, -} -impl From for HandlerCtx { - fn from(tasks: TaskSet) -> Self { - Self { - notifications: None, - tasks, - } - } -} - -impl From for HandlerCtx { - fn from(handle: Handle) -> Self { - Self { - notifications: None, - tasks: handle.into(), - } - } + /// Tracing information for OpenTelemetry. + pub(crate) tracing: TracingInfo, } impl HandlerCtx { /// Create a new handler context. - #[allow(dead_code)] // used in pubsub and axum features pub(crate) const fn new( - notifications: Option>>, + notifications: Option>, tasks: TaskSet, + tracing: TracingInfo, ) -> Self { Self { notifications, tasks, + tracing, } } - /// Get a reference to the notification sender. This is used to - /// send notifications over pubsub transports. - pub const fn notifications(&self) -> Option<&mpsc::Sender>> { - self.notifications.as_ref() + /// Create a mock handler context for testing. + #[cfg(test)] + pub fn mock() -> Self { + Self { + notifications: None, + tasks: TaskSet::default(), + tracing: TracingInfo::mock(), + } } - /// Check if notiifcations can be sent to the client. This will be false + /// Get a reference to the tracing information for this handler context. + pub const fn tracing_info(&self) -> &TracingInfo { + &self.tracing + } + + /// Get the OpenTelemetry service name for this handler context. + pub const fn otel_service_name(&self) -> &'static str { + self.tracing.service + } + + /// Get a reference to the tracing span for this handler context. + pub const fn span(&self) -> &tracing::Span { + &self.tracing.request_span + } + + /// Set the tracing information for this handler context. + pub fn set_tracing_info(&mut self, tracing: TracingInfo) { + self.tracing = tracing; + } + + /// Check if notifications can be sent to the client. This will be false /// when either the transport does not support notifications, or the /// notification channel has been closed (due the the client going away). pub fn notifications_enabled(&self) -> bool { @@ -84,7 +127,12 @@ impl HandlerCtx { pub async fn notify(&self, t: &T) -> Result<(), NotifyError> { if let Some(notifications) = self.notifications.as_ref() { let rv = serde_json::value::to_raw_value(t)?; - notifications.send(rv).await?; + notifications + .send(WriteItem { + span: self.span().clone(), + json: rv, + }) + .await?; } Ok(()) @@ -218,8 +266,17 @@ pub struct HandlerArgs { impl HandlerArgs { /// Create new handler arguments. - pub const fn new(ctx: HandlerCtx, req: Request) -> Self { - Self { ctx, req } + pub fn new(ctx: HandlerCtx, req: Request) -> Self { + let this = Self { ctx, req }; + + let span = this.span(); + span.record("otel.name", this.otel_span_name()); + span.record("rpc.method", this.req.method()); + if enabled!(Level::TRACE) { + span.record("params", this.req.params()); + } + + this } /// Get a reference to the handler context. @@ -227,8 +284,18 @@ impl HandlerArgs { &self.ctx } + /// Get a reference to the tracing span for this handler invocation. + pub const fn span(&self) -> &tracing::Span { + self.ctx.span() + } + /// Get a reference to the JSON-RPC request. pub const fn req(&self) -> &Request { &self.req } + + /// Get the OpenTelemetry span name for this handler invocation. + pub fn otel_span_name(&self) -> String { + format!("{}/{}", self.ctx.otel_service_name(), self.req.method()) + } } diff --git a/src/routes/handler.rs b/src/routes/handler.rs index dda25da..980a919 100644 --- a/src/routes/handler.rs +++ b/src/routes/handler.rs @@ -3,7 +3,7 @@ use crate::{ }; use serde_json::value::RawValue; use std::{convert::Infallible, future::Future, marker::PhantomData, pin::Pin, task}; -use tracing::{debug_span, enabled, trace, Instrument, Level}; +use tracing::{trace, Instrument}; macro_rules! convert_result { ($res:expr) => {{ @@ -412,16 +412,9 @@ where fn call(&mut self, args: HandlerArgs) -> Self::Future { let this = self.clone(); Box::pin(async move { - let notifications_enabled = args.ctx.notifications_enabled(); - - let span = debug_span!( - "HandlerService::call", - notifications_enabled, - params = tracing::field::Empty - ); - if enabled!(Level::TRACE) { - span.record("params", args.req.params()); - } + // This span captures standard OpenTelemetry attributes for + // JSON-RPC according to OTEL semantic conventions. + let span = args.span().clone(); Ok(this .handler diff --git a/src/routes/mod.rs b/src/routes/mod.rs index ecafb38..2fee397 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,5 +1,5 @@ mod ctx; -pub use ctx::{HandlerArgs, HandlerCtx, NotifyError}; +pub use ctx::{HandlerArgs, HandlerCtx, NotifyError, TracingInfo}; mod erased; pub(crate) use erased::{BoxedIntoRoute, ErasedIntoRoute, MakeErasedHandler}; @@ -14,6 +14,7 @@ pub use handler::{Handler, Params, State}; mod method; pub(crate) use method::Method; +use crate::types::Response; use serde_json::value::RawValue; use std::{ convert::Infallible, @@ -22,8 +23,6 @@ use std::{ use tower::{util::BoxCloneSyncService, Service, ServiceExt}; use tracing::{debug_span, enabled, Level}; -use crate::types::Response; - /// A JSON-RPC handler for a specific method. /// /// A route is a [`BoxCloneSyncService`] that takes JSON parameters and may diff --git a/src/types/batch.rs b/src/types/batch.rs index 9ee745e..1b650ff 100644 --- a/src/types/batch.rs +++ b/src/types/batch.rs @@ -58,6 +58,10 @@ impl TryFrom for InboundData { if enabled!(Level::TRACE) { tracing::span::Span::current().record("bytes", format!("0x{:x}", bytes)); } + + // This event exists only so that people who use default console + // logging setups still see the span details. Without this event, the + // span would not show up in logs. debug!("Parsing inbound data"); // We set up the deserializer to read from the byte buffer. From f67601f4ece458af9db7947bddf6a6961b2d2825 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 12:05:48 -0400 Subject: [PATCH 02/19] refactor: simplify conn_id --- src/pubsub/shared.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index b11b91e..830262e 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -255,6 +255,7 @@ where "rpc.system" = "jsonrpc", "rpc.jsonrpc.version" = "2.0", "rpc.service" = router.service_name(), + conn_id = self.conn_id, notifications_enabled = true, params = tracing::field::Empty ) }; @@ -366,7 +367,6 @@ impl WriteTask { tracing::error!("Json stream has closed"); break; }; - span.record("conn_id", self.conn_id); span.in_scope(|| { debug!( "rpc.message.id" = tx_msg_id.fetch_add(1, Ordering::Relaxed), From 5182f6b085947c3a0ef2a771539707ee986c7e76 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 12:07:20 -0400 Subject: [PATCH 03/19] fix: share counters --- src/pubsub/axum.rs | 2 ++ src/pubsub/shared.rs | 10 ++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/pubsub/axum.rs b/src/pubsub/axum.rs index 338b099..ad305d1 100644 --- a/src/pubsub/axum.rs +++ b/src/pubsub/axum.rs @@ -126,6 +126,8 @@ impl AxumWsCfg { next_id: arc.next_id.clone(), router: arc.router.clone(), notification_buffer_per_task: arc.notification_buffer_per_task, + tx_msg_id: arc.tx_msg_id.clone(), + rx_msg_id: arc.rx_msg_id.clone(), }, } } diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index 830262e..5140b1d 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -70,6 +70,10 @@ pub(crate) struct ConnectionManager { pub(crate) router: crate::Router<()>, pub(crate) notification_buffer_per_task: usize, + + // OTEL message counters + pub(crate) tx_msg_id: Arc, + pub(crate) rx_msg_id: Arc, } impl ConnectionManager { @@ -80,6 +84,8 @@ impl ConnectionManager { next_id: AtomicU64::new(0).into(), router, notification_buffer_per_task: DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT, + tx_msg_id: Arc::new(AtomicU32::new(0)), + rx_msg_id: Arc::new(AtomicU32::new(0)), } } @@ -133,7 +139,7 @@ impl ConnectionManager { write_task: tx, requests, tasks: tasks.clone(), - rx_msg_id: Arc::new(AtomicU32::new(0)), + rx_msg_id: self.rx_msg_id.clone(), }; let wt = WriteTask { @@ -141,7 +147,7 @@ impl ConnectionManager { conn_id, items: rx, connection, - tx_msg_id: Arc::new(AtomicU32::new(0)), + tx_msg_id: self.tx_msg_id.clone(), }; (rt, wt) From 30ea8e7d61372d4bbd008ca5b7d38cab0d8c9fa4 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 12:21:59 -0400 Subject: [PATCH 04/19] refactor: make ctx private to prevent misuse --- src/lib.rs | 10 ++------- src/routes/ctx.rs | 23 +++++++++++++++++--- src/routes/handler.rs | 50 +++++++++++++++++++++---------------------- src/routes/mod.rs | 7 +++--- 4 files changed, 50 insertions(+), 40 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e84dbec..e83fd11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -233,10 +233,7 @@ mod test { let res = router .call_with_state( - HandlerArgs { - ctx: HandlerCtx::mock(), - req: req.try_into().unwrap(), - }, + HandlerArgs::new(HandlerCtx::mock(), req.try_into().unwrap()), (), ) .await @@ -252,10 +249,7 @@ mod test { let res2 = router .call_with_state( - HandlerArgs { - ctx: HandlerCtx::mock(), - req: req2.try_into().unwrap(), - }, + HandlerArgs::new(HandlerCtx::mock(), req2.try_into().unwrap()), (), ) .await diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index 30b1aa8..c2d591a 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -259,15 +259,22 @@ impl HandlerCtx { #[derive(Debug, Clone)] pub struct HandlerArgs { /// The handler context. - pub(crate) ctx: HandlerCtx, + ctx: HandlerCtx, /// The JSON-RPC request. - pub(crate) req: Request, + req: Request, + + /// prevent instantation outside of this module + _seal: (), } impl HandlerArgs { /// Create new handler arguments. pub fn new(ctx: HandlerCtx, req: Request) -> Self { - let this = Self { ctx, req }; + let this = Self { + ctx, + req, + _seal: (), + }; let span = this.span(); span.record("otel.name", this.otel_span_name()); @@ -279,6 +286,11 @@ impl HandlerArgs { this } + /// Decompose the handler arguments into its parts. + pub fn into_parts(self) -> (HandlerCtx, Request) { + (self.ctx, self.req) + } + /// Get a reference to the handler context. pub const fn ctx(&self) -> &HandlerCtx { &self.ctx @@ -294,6 +306,11 @@ impl HandlerArgs { &self.req } + /// Get the ID of the JSON-RPC request, if any. + pub fn id_owned(&self) -> Option> { + self.req.id_owned() + } + /// Get the OpenTelemetry span name for this handler invocation. pub fn otel_span_name(&self) -> String { format!("{}/{}", self.ctx.otel_service_name(), self.req.method()) diff --git a/src/routes/handler.rs b/src/routes/handler.rs index 980a919..1761177 100644 --- a/src/routes/handler.rs +++ b/src/routes/handler.rs @@ -453,7 +453,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = self().await; @@ -473,8 +473,8 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let id = args.req.id_owned(); - let ctx = args.ctx; + let id = args.id_owned(); + let (ctx, _) = args.into_parts(); Box::pin(async move { let payload = self(ctx).await; @@ -495,7 +495,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let HandlerArgs { req, .. } = args; + let (_, req) = args.into_parts(); Box::pin(async move { let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -519,7 +519,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let HandlerArgs { req, .. } = args; + let (_, req) = args.into_parts(); Box::pin(async move { let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -543,7 +543,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = self(state).await; Response::maybe(id.as_deref(), &payload) @@ -562,7 +562,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = self(State(state)).await; Response::maybe(id.as_deref(), &payload) @@ -583,7 +583,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -611,7 +611,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -639,7 +639,7 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. } = args; + let (_, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -667,7 +667,7 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); @@ -692,7 +692,7 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); @@ -719,7 +719,7 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -744,7 +744,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); drop(args); Box::pin(async move { let payload = self().await; @@ -763,7 +763,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); @@ -788,7 +788,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. } = args; + let (_, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -816,7 +816,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. } = args; + let (_, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -843,7 +843,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = convert_result!(self(state).await); Response::maybe(id.as_deref(), &payload) @@ -862,7 +862,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let id = args.req.id_owned(); + let id = args.id_owned(); Box::pin(async move { let payload = convert_result!(self(State(state)).await); Response::maybe(id.as_deref(), &payload) @@ -883,7 +883,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -911,7 +911,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -940,7 +940,7 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { req, .. } = args; + let (_, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -967,7 +967,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); @@ -992,7 +992,7 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); @@ -1019,7 +1019,7 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let HandlerArgs { ctx, req } = args; + let (ctx, req) = args.into_parts(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 2fee397..07af8e8 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -51,9 +51,8 @@ impl Route { /// Create a default fallback route that returns a method not found error. pub(crate) fn default_fallback() -> Self { Self::new(tower::service_fn(|args: HandlerArgs| async { - let HandlerArgs { req, .. } = args; - let id = req.id_owned(); - drop(req); + let id = args.id_owned(); + drop(args); // no longer needed Ok(Response::maybe_method_not_found(id.as_deref())) })) @@ -101,7 +100,7 @@ impl Service for Route { params = tracing::field::Empty, ); if enabled!(Level::TRACE) { - span.record("params", args.req.params()); + span.record("params", args.req().params()); } self.oneshot_inner(args) } From 3a27abe3eb76be29443659b42b2a328fcb5c39a0 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 14:36:16 -0400 Subject: [PATCH 05/19] feat: comply with error_code requirement --- src/routes/handler.rs | 95 +++++++++++++++++++++++-------------------- src/types/resp.rs | 5 +++ 2 files changed, 57 insertions(+), 43 deletions(-) diff --git a/src/routes/handler.rs b/src/routes/handler.rs index 1761177..a370f01 100644 --- a/src/routes/handler.rs +++ b/src/routes/handler.rs @@ -457,8 +457,7 @@ where Box::pin(async move { let payload = self().await; - - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -475,10 +474,11 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { let id = args.id_owned(); let (ctx, _) = args.into_parts(); + let span = ctx.span().clone(); Box::pin(async move { let payload = self(ctx).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -495,7 +495,8 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let (_, req) = args.into_parts(); + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); Box::pin(async move { let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -503,7 +504,7 @@ where }; let payload = self(params).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -519,7 +520,8 @@ where type Future = Pin>> + Send>>; fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { - let (_, req) = args.into_parts(); + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); Box::pin(async move { let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -527,7 +529,7 @@ where }; let payload = self(Params(params)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -546,7 +548,7 @@ where let id = args.id_owned(); Box::pin(async move { let payload = self(state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -565,7 +567,7 @@ where let id = args.id_owned(); Box::pin(async move { let payload = self(State(state)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -584,6 +586,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -593,7 +596,7 @@ where drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, params).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -612,6 +615,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -621,7 +625,7 @@ where drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, Params(params)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -639,7 +643,8 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let (_, req) = args.into_parts(); + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -649,7 +654,7 @@ where let payload = self(params, state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -668,14 +673,14 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -693,14 +698,14 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, State(state)).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -720,8 +725,9 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); + let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); }; @@ -729,7 +735,7 @@ where drop(req); // deallocate explicitly. No funny business. let payload = self(ctx, params, state).await; - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -745,10 +751,11 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { let id = args.id_owned(); + let span = args.span().clone(); drop(args); Box::pin(async move { let payload = self().await; - Response::maybe(id.as_deref(), &convert_result!(payload)) + Response::maybe(&span, id.as_deref(), &convert_result!(payload)) }) } } @@ -764,14 +771,14 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); drop(req); Box::pin(async move { let payload = convert_result!(self(ctx).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -788,8 +795,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let (_, req) = args.into_parts(); - + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -799,7 +806,7 @@ where let payload = convert_result!(self(params).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -816,7 +823,8 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { - let (_, req) = args.into_parts(); + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -827,7 +835,7 @@ where let payload = convert_result!(self(Params(params)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -846,7 +854,7 @@ where let id = args.id_owned(); Box::pin(async move { let payload = convert_result!(self(state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -865,7 +873,7 @@ where let id = args.id_owned(); Box::pin(async move { let payload = convert_result!(self(State(state)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(args.span(), id.as_deref(), &payload) }) } } @@ -884,7 +892,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -894,7 +902,7 @@ where let payload = convert_result!(self(ctx, params).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -912,7 +920,7 @@ where fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -922,7 +930,7 @@ where let payload = convert_result!(self(ctx, Params(params)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -940,7 +948,8 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { - let (_, req) = args.into_parts(); + let (ctx, req) = args.into_parts(); + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { @@ -951,7 +960,7 @@ where let payload = convert_result!(self(params, state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -968,15 +977,15 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); - drop(req); + drop(req); // deallocate explicitly. No funny business. Box::pin(async move { let payload = convert_result!(self(ctx, state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -993,15 +1002,15 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); - drop(req); + drop(req); // deallocate explicitly. No funny business. Box::pin(async move { let payload = convert_result!(self(ctx, State(state)).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } @@ -1020,7 +1029,7 @@ where fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { Box::pin(async move { let (ctx, req) = args.into_parts(); - + let span = ctx.span().clone(); let id = req.id_owned(); let Ok(params) = req.deser_params() else { return Response::maybe_invalid_params(id.as_deref()); @@ -1030,7 +1039,7 @@ where let payload = convert_result!(self(ctx, params, state).await); - Response::maybe(id.as_deref(), &payload) + Response::maybe(&span, id.as_deref(), &payload) }) } } diff --git a/src/types/resp.rs b/src/types/resp.rs index 3b2fe4e..a021a62 100644 --- a/src/types/resp.rs +++ b/src/types/resp.rs @@ -73,9 +73,14 @@ where E: Serialize, { pub(crate) fn maybe( + span: &tracing::Span, id: Option<&'b RawValue>, payload: &'a ResponsePayload, ) -> Option> { + if let Some(err_code) = payload.as_error().map(|e| e.code) { + span.record("rpc.jsonrpc.error_code", err_code); + } + id.map(|id| Self { id, payload }.to_json()) } From e53097d110351efbe17bc0b0c7a607902a434555 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 14:45:09 -0400 Subject: [PATCH 06/19] feat: comply with request_id on server span --- src/axum.rs | 13 +++++++++---- src/pubsub/shared.rs | 14 ++++++++++---- src/routes/ctx.rs | 1 + 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/axum.rs b/src/axum.rs index 4fd1db1..ed1a834 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -66,6 +66,9 @@ where S: Clone + Send + Sync + 'static, { fn ctx(&self) -> HandlerCtx { + // This span is populated with as much detail as possible, and then + // given to the Handler ctx. It will be populated with request-specific + // details (e.g. method) during ctx instantiation. let request_span = debug_span!( "ajj.IntoAxum::call", "otel.kind" = "server", @@ -103,11 +106,12 @@ where // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md#message-event let req = ctx.span().in_scope(|| { + //// https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events debug!( "rpc.message.id" = self.rx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "received", + "rpc.message.type" = "RECEIVED", "rpc.message.uncompressed_size" = bytes.len(), - "Received request" + "rpc.message" ); // If the inbound data is not currently parsable, we @@ -126,10 +130,11 @@ where let body = Box::::from(response); span.in_scope(|| { + // https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events debug!( "rpc.message.id" = self.tx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "received", - "Received request" + "rpc.message.type" = "SENT", + "rpc.message.uncompressed_size" = body.len(), ); }); diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index 5140b1d..d5c2ccf 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -254,6 +254,10 @@ where break; }; + // This span is populated with as much detail as + // possible, and then given to the Handler ctx. It + // will be populated with request-specific details + // (e.g. method) during ctx instantiation. let tracing = TracingInfo { service: router.service_name(), request_span: debug_span!( parent: None, "ajj.pubsub.RouteTask::call", @@ -275,11 +279,12 @@ where let span = ctx.span().clone(); span.in_scope(|| { + // https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events debug!( "rpc.message.id" = rx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "received", + "rpc.message.type" = "RECEIVED", "rpc.message.uncompressed_size" = item_bytes, - "Received request" + "rpc.message" ); }); @@ -374,10 +379,11 @@ impl WriteTask { break; }; span.in_scope(|| { + // https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events debug!( "rpc.message.id" = tx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "sent", - "Sending response" + "rpc.message.type" = "SENT", + "rpc.message" ); }); diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index c2d591a..9b0b4f6 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -279,6 +279,7 @@ impl HandlerArgs { let span = this.span(); span.record("otel.name", this.otel_span_name()); span.record("rpc.method", this.req.method()); + span.record("rpc.jsonrpc.request_id", this.req.id()); if enabled!(Level::TRACE) { span.record("params", this.req.params()); } From a3a622e683aee99995d1d3825baf9b2c97242baa Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 14:51:21 -0400 Subject: [PATCH 07/19] chore: readme update --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index dfaa536..553deff 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,17 @@ implementations. See the [crate documentation on docs.rs] for more detailed examples. +## Specification Complinace + +`ajj` aims to be fully compliant with the [JSON-RPC 2.0] specification. If any +issues are found, please [open an issue]! + +`ajj` produces [`tracing`] spans that meet the [OpenTelemetry semantic +conventions] for JSON-RPC servers with the following exception: + +- The `server.address` attribute is NOT set, as the server address is not always + known to the ajj system. + ## Note on code provenance Some code in this project has been reproduced or adapted from other projects. @@ -94,3 +105,6 @@ reproduced from the following projects, and we are grateful for their work: [`interprocess::local_socket::ListenerOptions`]: https://docs.rs/interprocess/latest/interprocess/local_socket/struct.ListenerOptions.html [std::net::SocketAddr]: https://doc.rust-lang.org/std/net/enum.SocketAddr.html [alloy]: https://docs.rs/alloy/latest/alloy/ +[open an issue]: https://github.com/init4tech/ajj/issues/new +[OpenTelemetry semantic conventions]: https://opentelemetry.io/docs/specs/semconv/rpc/json-rpc/ +[`tracing`]: https://docs.rs/tracing/latest/tracing/ From 18bc4735dbac30b3faf9d4f29452f65ab329ad3d Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 14:53:54 -0400 Subject: [PATCH 08/19] fix: add the error_message prop as well --- README.md | 5 +++-- src/types/resp.rs | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 553deff..d6662e2 100644 --- a/README.md +++ b/README.md @@ -76,11 +76,12 @@ See the [crate documentation on docs.rs] for more detailed examples. `ajj` aims to be fully compliant with the [JSON-RPC 2.0] specification. If any issues are found, please [open an issue]! -`ajj` produces [`tracing`] spans that meet the [OpenTelemetry semantic -conventions] for JSON-RPC servers with the following exception: +`ajj` produces [`tracing`] spans and events that meet the [OpenTelemetry +semantic conventions] for JSON-RPC servers with the following exception: - The `server.address` attribute is NOT set, as the server address is not always known to the ajj system. +- ## Note on code provenance diff --git a/src/types/resp.rs b/src/types/resp.rs index a021a62..4ac36fd 100644 --- a/src/types/resp.rs +++ b/src/types/resp.rs @@ -77,8 +77,9 @@ where id: Option<&'b RawValue>, payload: &'a ResponsePayload, ) -> Option> { - if let Some(err_code) = payload.as_error().map(|e| e.code) { - span.record("rpc.jsonrpc.error_code", err_code); + if let Some(e) = payload.as_error() { + span.record("rpc.jsonrpc.error_code", e.code); + span.record("rpc.jsonrpc.error_message", &e.message.as_ref()); } id.map(|id| Self { id, payload }.to_json()) From 6e9afdb5972af85d98e2b375e670f43e6d3b33b6 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 3 Oct 2025 15:05:27 -0400 Subject: [PATCH 09/19] lint: clippy --- src/types/resp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/types/resp.rs b/src/types/resp.rs index 4ac36fd..4eadfa8 100644 --- a/src/types/resp.rs +++ b/src/types/resp.rs @@ -79,7 +79,7 @@ where ) -> Option> { if let Some(e) = payload.as_error() { span.record("rpc.jsonrpc.error_code", e.code); - span.record("rpc.jsonrpc.error_message", &e.message.as_ref()); + span.record("rpc.jsonrpc.error_message", e.message.as_ref()); } id.map(|id| Self { id, payload }.to_json()) From 5e9d055d06eebe189ac7665f6fc9cb424dbacd45 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 09:52:21 -0400 Subject: [PATCH 10/19] nit: formatting --- src/axum.rs | 7 +++++++ src/pubsub/shared.rs | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/src/axum.rs b/src/axum.rs index ed1a834..5bd7385 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -70,12 +70,19 @@ where // given to the Handler ctx. It will be populated with request-specific // details (e.g. method) during ctx instantiation. let request_span = debug_span!( + // We could erase the parent here, however, axum or tower layers + // may be creating per-request spans that we want to be children of. "ajj.IntoAxum::call", "otel.kind" = "server", "rpc.system" = "jsonrpc", "rpc.jsonrpc.version" = "2.0", "rpc.service" = self.router.service_name(), notifications_enabled = false, + "otel.name" = tracing::field::Empty, + "rpc.jsonrpc.request_id" = tracing::field::Empty, + "rpc.jsonrpc.error_code" = tracing::field::Empty, + "rpc.jsonrpc.error_message" = tracing::field::Empty, + "rpc.method" = tracing::field::Empty, params = tracing::field::Empty ); diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index d5c2ccf..1530883 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -267,6 +267,11 @@ where "rpc.service" = router.service_name(), conn_id = self.conn_id, notifications_enabled = true, + "otel.name" = tracing::field::Empty, + "rpc.jsonrpc.request_id" = tracing::field::Empty, + "rpc.jsonrpc.error_code" = tracing::field::Empty, + "rpc.jsonrpc.error_message" = tracing::field::Empty, + "rpc.method" = tracing::field::Empty, params = tracing::field::Empty ) }; From ade71a0ec39d56ea7b15bcbdf1c4f2cb65f7341a Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 10:22:53 -0400 Subject: [PATCH 11/19] feat: use traceparent via otel http header extractor --- Cargo.toml | 5 ++++- src/axum.rs | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7de86da..2073eb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ repository = "https://github.com/init4tech/ajj" [dependencies] bytes = "1.9.0" +opentelemetry = "0.31.0" pin-project = "1.1.8" serde = { version = "1.0.217", features = ["derive"] } serde_json = { version = "1.0.135", features = ["raw_value"] } @@ -23,10 +24,12 @@ tokio = { version = "1.43.0", features = ["sync", "rt", "macros"] } tokio-util = { version = "0.7.13", features = ["io", "rt"] } tower = { version = "0.5.2", features = ["util"] } tracing = "0.1.41" +tracing-opentelemetry = "0.32.0" # axum axum = { version = "0.8.1", optional = true } mime = { version = "0.3.17", optional = true } +opentelemetry-http = { version = "0.31.0", optional = true } # pubsub tokio-stream = { version = "0.1.17", optional = true } @@ -51,7 +54,7 @@ eyre = "0.6.12" [features] default = ["axum", "ws", "ipc"] -axum = ["dep:axum", "dep:mime"] +axum = ["dep:axum", "dep:mime", "dep:opentelemetry-http"] pubsub = ["dep:tokio-stream", "axum?/ws"] ipc = ["pubsub", "dep:interprocess"] ws = ["pubsub", "dep:tokio-tungstenite", "dep:futures-util"] diff --git a/src/axum.rs b/src/axum.rs index 5bd7385..99c8448 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -18,6 +18,7 @@ use std::{ }; use tokio::runtime::Handle; use tracing::{debug, debug_span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; /// A wrapper around an [`Router`] that implements the /// [`axum::handler::Handler`] trait. This struct is an implementation detail @@ -107,6 +108,11 @@ where Box::pin(async move { let ctx = self.ctx(); + let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&opentelemetry_http::HeaderExtractor(req.headers())) + }); + ctx.span().set_parent(parent_context).unwrap(); + let Ok(bytes) = Bytes::from_request(req, &state).await else { return Box::::from(Response::parse_error()).into_response(); }; From 7620c7c9fcb25e04a91e60153ebf58de22079471 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 10:27:38 -0400 Subject: [PATCH 12/19] feat: set otel status during response construction --- src/types/resp.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/types/resp.rs b/src/types/resp.rs index 4eadfa8..acf0e77 100644 --- a/src/types/resp.rs +++ b/src/types/resp.rs @@ -1,8 +1,10 @@ use crate::RpcSend; +use opentelemetry::trace::Status; use serde::{ser::SerializeMap, Serialize, Serializer}; use serde_json::value::{to_raw_value, RawValue}; use std::borrow::Cow; use std::fmt; +use tracing_opentelemetry::OpenTelemetrySpanExt; const INTERNAL_ERROR: Cow<'_, str> = Cow::Borrowed("Internal error"); @@ -80,9 +82,12 @@ where if let Some(e) = payload.as_error() { span.record("rpc.jsonrpc.error_code", e.code); span.record("rpc.jsonrpc.error_message", e.message.as_ref()); + span.set_status(Status::Error { + description: e.message.clone(), + }); } - id.map(|id| Self { id, payload }.to_json()) + id.map(move |id| Self { id, payload }.to_json()) } pub(crate) fn to_json(&self) -> Box { From 714832dce08c17147f4ec40da0affaef7ee1867d Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 10:41:16 -0400 Subject: [PATCH 13/19] refactor: DRY with macros --- src/axum.rs | 41 +++++++---------------- src/macros.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++ src/pubsub/shared.rs | 38 ++++++--------------- 3 files changed, 100 insertions(+), 57 deletions(-) diff --git a/src/axum.rs b/src/axum.rs index 99c8448..e635d09 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -11,13 +11,9 @@ use bytes::Bytes; use std::{ future::Future, pin::Pin, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, - }, + sync::{atomic::AtomicU32, Arc}, }; use tokio::runtime::Handle; -use tracing::{debug, debug_span}; use tracing_opentelemetry::OpenTelemetrySpanExt; /// A wrapper around an [`Router`] that implements the @@ -70,21 +66,11 @@ where // This span is populated with as much detail as possible, and then // given to the Handler ctx. It will be populated with request-specific // details (e.g. method) during ctx instantiation. - let request_span = debug_span!( + let request_span = request_span!( // We could erase the parent here, however, axum or tower layers // may be creating per-request spans that we want to be children of. - "ajj.IntoAxum::call", - "otel.kind" = "server", - "rpc.system" = "jsonrpc", - "rpc.jsonrpc.version" = "2.0", - "rpc.service" = self.router.service_name(), - notifications_enabled = false, - "otel.name" = tracing::field::Empty, - "rpc.jsonrpc.request_id" = tracing::field::Empty, - "rpc.jsonrpc.error_code" = tracing::field::Empty, - "rpc.jsonrpc.error_message" = tracing::field::Empty, - "rpc.method" = tracing::field::Empty, - params = tracing::field::Empty + name: "ajj.IntoAxum::call", + router: &self.router, ); HandlerCtx::new( @@ -119,12 +105,10 @@ where // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md#message-event let req = ctx.span().in_scope(|| { - //// https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events - debug!( - "rpc.message.id" = self.rx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "RECEIVED", - "rpc.message.uncompressed_size" = bytes.len(), - "rpc.message" + message_event!( + @received, + counter: &self.rx_msg_id, + bytes: bytes.len(), ); // If the inbound data is not currently parsable, we @@ -143,11 +127,10 @@ where let body = Box::::from(response); span.in_scope(|| { - // https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events - debug!( - "rpc.message.id" = self.tx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "SENT", - "rpc.message.uncompressed_size" = body.len(), + message_event!( + @sent, + counter: &self.tx_msg_id, + bytes: body.len(), ); }); diff --git a/src/macros.rs b/src/macros.rs index 105eaa4..d18bfd8 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -54,6 +54,84 @@ macro_rules! unwrap_infallible { }; } +/// Set up an initial tracing span for a request handler. +macro_rules! request_span { + (@noparent, $name:literal, $router:expr, notifications: $notifications:literal,) => { + ::tracing::debug_span!( + parent: None, + $name, + "otel.kind" = "server", + "rpc.system" = "jsonrpc", + "rpc.jsonrpc.version" = "2.0", + "rpc.service" = $router.service_name(), + notifications_enabled = $notifications, + "otel.name" = ::tracing::field::Empty, + "rpc.jsonrpc.request_id" = ::tracing::field::Empty, + "rpc.jsonrpc.error_code" = ::tracing::field::Empty, + "rpc.jsonrpc.error_message" = ::tracing::field::Empty, + "rpc.method" = ::tracing::field::Empty, + params = ::tracing::field::Empty, + ) + }; + + (parent: $parent:expr, $name:literal, $router:expr, notifications: $notifications:literal,) => { + ::tracing::debug_span!( + parent: $parent, + $name, + "otel.kind" = "server", + "rpc.system" = "jsonrpc", + "rpc.jsonrpc.version" = "2.0", + "rpc.service" = $router.service_name(), + notifications_enabled = $notifications, + "otel.name" = ::tracing::field::Empty, + "rpc.jsonrpc.request_id" = ::tracing::field::Empty, + "rpc.jsonrpc.error_code" = ::tracing::field::Empty, + "rpc.jsonrpc.error_message" = ::tracing::field::Empty, + "rpc.method" = ::tracing::field::Empty, + params = ::tracing::field::Empty, + ) + }; + + + (parent: $parent:expr, name: $name:literal, router: $router:expr,) => { + request_span!($parent, $name, $router, notifications: false,) + }; + + (parent: $parent:expr, name: $name:literal, router: $router:expr, with_notifications) => { + request_span!(parent: $parent, $name, $router, notifications: true,) + }; + + (name: $name:literal, router: $router:expr,) => { + request_span!(@noparent, $name, $router, notifications: false,) + }; + + (name: $name:literal, router: $router:expr, with_notifications) => { + request_span!(@noparent, $name, $router, notifications: true,) + }; +} + +/// Log a message event to the current span. +/// +/// See +macro_rules! message_event { + ($type:literal, counter: $counter:expr, bytes: $bytes:expr,) => {{ + ::tracing::debug!( + "rpc.message.id" = $counter.fetch_add(1, ::std::sync::atomic::Ordering::Relaxed), + "rpc.message.type" = $type, + "rpc.message.uncompressed_size" = $bytes, + "rpc.message" + ); + }}; + + (@received, counter: $counter:expr, bytes: $bytes:expr, ) => { + message_event!("RECEIVED", counter: $counter, bytes: $bytes,); + }; + + (@sent, counter: $counter:expr, bytes: $bytes:expr, ) => { + message_event!("SENT", counter: $counter, bytes: $bytes,); + }; +} + // Some code is this file is reproduced under the terms of the MIT license. It // originates from the `axum` crate. The original source code can be found at // the following URL, and the original license is included below. diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index 1530883..3b0278f 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -12,7 +12,7 @@ use std::sync::{ use tokio::{pin, runtime::Handle, select, sync::mpsc, task::JoinHandle}; use tokio_stream::StreamExt; use tokio_util::sync::WaitForCancellationFutureOwned; -use tracing::{debug, debug_span, error, trace, Instrument}; +use tracing::{debug, error, trace, Instrument}; /// Default notification buffer size per task. pub const DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT: usize = 16; @@ -258,22 +258,7 @@ where // possible, and then given to the Handler ctx. It // will be populated with request-specific details // (e.g. method) during ctx instantiation. - let tracing = TracingInfo { service: router.service_name(), request_span: debug_span!( - parent: None, - "ajj.pubsub.RouteTask::call", - "otel.kind" = "server", - "rpc.system" = "jsonrpc", - "rpc.jsonrpc.version" = "2.0", - "rpc.service" = router.service_name(), - conn_id = self.conn_id, - notifications_enabled = true, - "otel.name" = tracing::field::Empty, - "rpc.jsonrpc.request_id" = tracing::field::Empty, - "rpc.jsonrpc.error_code" = tracing::field::Empty, - "rpc.jsonrpc.error_message" = tracing::field::Empty, - "rpc.method" = tracing::field::Empty, - params = tracing::field::Empty - ) }; + let tracing = TracingInfo { service: router.service_name(), request_span: request_span!(name: "ajj.pubsub.RouteTask::call", router: &router, with_notifications) }; let ctx = HandlerCtx::new( @@ -284,12 +269,10 @@ where let span = ctx.span().clone(); span.in_scope(|| { - // https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events - debug!( - "rpc.message.id" = rx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "RECEIVED", - "rpc.message.uncompressed_size" = item_bytes, - "rpc.message" + message_event!( + @received, + counter: &rx_msg_id, + bytes: item_bytes, ); }); @@ -384,11 +367,10 @@ impl WriteTask { break; }; span.in_scope(|| { - // https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events - debug!( - "rpc.message.id" = tx_msg_id.fetch_add(1, Ordering::Relaxed), - "rpc.message.type" = "SENT", - "rpc.message" + message_event!( + @sent, + counter: &tx_msg_id, + bytes: json.get().len(), ); }); From 0458f0cde5336707d628cb4ce0b31d949020e659 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 11:03:09 -0400 Subject: [PATCH 14/19] refactor: improve code quality and DRY --- src/axum.rs | 28 ++++++++++++++++++++-------- src/macros.rs | 30 +++++++----------------------- src/pubsub/shared.rs | 2 +- src/types/batch.rs | 4 ++-- 4 files changed, 30 insertions(+), 34 deletions(-) diff --git a/src/axum.rs b/src/axum.rs index e635d09..b3cf84a 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -8,12 +8,14 @@ use axum::{ response::IntoResponse, }; use bytes::Bytes; +use opentelemetry::trace::TraceContextExt; use std::{ future::Future, pin::Pin, sync::{atomic::AtomicU32, Arc}, }; use tokio::runtime::Handle; +use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; /// A wrapper around an [`Router`] that implements the @@ -62,17 +64,32 @@ impl IntoAxum where S: Clone + Send + Sync + 'static, { - fn ctx(&self) -> HandlerCtx { + fn ctx(&self, req: &axum::extract::Request) -> HandlerCtx { // This span is populated with as much detail as possible, and then // given to the Handler ctx. It will be populated with request-specific // details (e.g. method) during ctx instantiation. let request_span = request_span!( + parent: Span::current(), // We could erase the parent here, however, axum or tower layers // may be creating per-request spans that we want to be children of. - name: "ajj.IntoAxum::call", + name: "ajj.IntoAxum::request", router: &self.router, ); + let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&opentelemetry_http::HeaderExtractor(req.headers())) + }); + request_span.set_parent(parent_context).unwrap(); + request_span.record( + "trace_id", + request_span + .context() + .span() + .span_context() + .trace_id() + .to_string(), + ); + HandlerCtx::new( None, self.task_set.clone(), @@ -92,12 +109,7 @@ where fn call(self, req: axum::extract::Request, state: S) -> Self::Future { Box::pin(async move { - let ctx = self.ctx(); - - let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| { - propagator.extract(&opentelemetry_http::HeaderExtractor(req.headers())) - }); - ctx.span().set_parent(parent_context).unwrap(); + let ctx = self.ctx(&req); let Ok(bytes) = Bytes::from_request(req, &state).await else { return Box::::from(Response::parse_error()).into_response(); diff --git a/src/macros.rs b/src/macros.rs index d18bfd8..a436337 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -56,24 +56,6 @@ macro_rules! unwrap_infallible { /// Set up an initial tracing span for a request handler. macro_rules! request_span { - (@noparent, $name:literal, $router:expr, notifications: $notifications:literal,) => { - ::tracing::debug_span!( - parent: None, - $name, - "otel.kind" = "server", - "rpc.system" = "jsonrpc", - "rpc.jsonrpc.version" = "2.0", - "rpc.service" = $router.service_name(), - notifications_enabled = $notifications, - "otel.name" = ::tracing::field::Empty, - "rpc.jsonrpc.request_id" = ::tracing::field::Empty, - "rpc.jsonrpc.error_code" = ::tracing::field::Empty, - "rpc.jsonrpc.error_message" = ::tracing::field::Empty, - "rpc.method" = ::tracing::field::Empty, - params = ::tracing::field::Empty, - ) - }; - (parent: $parent:expr, $name:literal, $router:expr, notifications: $notifications:literal,) => { ::tracing::debug_span!( parent: $parent, @@ -83,7 +65,9 @@ macro_rules! request_span { "rpc.jsonrpc.version" = "2.0", "rpc.service" = $router.service_name(), notifications_enabled = $notifications, + "trace_id" = ::tracing::field::Empty, "otel.name" = ::tracing::field::Empty, + "otel.status" = ::tracing::field::Empty, "rpc.jsonrpc.request_id" = ::tracing::field::Empty, "rpc.jsonrpc.error_code" = ::tracing::field::Empty, "rpc.jsonrpc.error_message" = ::tracing::field::Empty, @@ -94,19 +78,19 @@ macro_rules! request_span { (parent: $parent:expr, name: $name:literal, router: $router:expr,) => { - request_span!($parent, $name, $router, notifications: false,) + request_span!(parent: $parent, $name, $router, notifications: false,) }; (parent: $parent:expr, name: $name:literal, router: $router:expr, with_notifications) => { request_span!(parent: $parent, $name, $router, notifications: true,) }; - (name: $name:literal, router: $router:expr,) => { - request_span!(@noparent, $name, $router, notifications: false,) + (@noparent, name: $name:literal, router: $router:expr,) => { + request_span!(parent: None,, $name, $router, notifications: false,) }; - (name: $name:literal, router: $router:expr, with_notifications) => { - request_span!(@noparent, $name, $router, notifications: true,) + (@noparent, name: $name:literal, router: $router:expr, with_notifications) => { + request_span!(parent: None, $name, $router, notifications: true,) }; } diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index 3b0278f..b9d5690 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -258,7 +258,7 @@ where // possible, and then given to the Handler ctx. It // will be populated with request-specific details // (e.g. method) during ctx instantiation. - let tracing = TracingInfo { service: router.service_name(), request_span: request_span!(name: "ajj.pubsub.RouteTask::call", router: &router, with_notifications) }; + let tracing = TracingInfo { service: router.service_name(), request_span: request_span!(@noparent, name: "ajj.pubsub.RouteTask::call", router: &router, with_notifications) }; let ctx = HandlerCtx::new( diff --git a/src/types/batch.rs b/src/types/batch.rs index 1b650ff..cdbabc1 100644 --- a/src/types/batch.rs +++ b/src/types/batch.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use serde::Deserialize; use serde_json::value::RawValue; use std::ops::Range; -use tracing::{debug, enabled, instrument, Level}; +use tracing::{debug, enabled, instrument, span::Span, Level}; /// UTF-8, partially deserialized JSON-RPC request batch. #[derive(Default)] @@ -56,7 +56,7 @@ impl TryFrom for InboundData { #[instrument(level = "debug", skip(bytes), fields(buf_len = bytes.len(), bytes = tracing::field::Empty))] fn try_from(bytes: Bytes) -> Result { if enabled!(Level::TRACE) { - tracing::span::Span::current().record("bytes", format!("0x{:x}", bytes)); + Span::current().record("bytes", format!("0x{:x}", bytes)); } // This event exists only so that people who use default console From a0d7bd897f51117a11194777483c34153010991a Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 11:53:42 -0400 Subject: [PATCH 15/19] feat: appropiately associate spans with names --- README.md | 1 - src/axum.rs | 38 +++---------- src/macros.rs | 42 +------------- src/pubsub/shared.rs | 3 +- src/router.rs | 19 ++++--- src/routes/ctx.rs | 128 +++++++++++++++++++++++++++++++++++++++---- 6 files changed, 142 insertions(+), 89 deletions(-) diff --git a/README.md b/README.md index d6662e2..ca0a743 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,6 @@ semantic conventions] for JSON-RPC servers with the following exception: - The `server.address` attribute is NOT set, as the server address is not always known to the ajj system. -- ## Note on code provenance diff --git a/src/axum.rs b/src/axum.rs index b3cf84a..e8d9671 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -8,15 +8,13 @@ use axum::{ response::IntoResponse, }; use bytes::Bytes; -use opentelemetry::trace::TraceContextExt; use std::{ future::Future, pin::Pin, sync::{atomic::AtomicU32, Arc}, }; use tokio::runtime::Handle; -use tracing::Span; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, Span}; /// A wrapper around an [`Router`] that implements the /// [`axum::handler::Handler`] trait. This struct is an implementation detail @@ -65,38 +63,14 @@ where S: Clone + Send + Sync + 'static, { fn ctx(&self, req: &axum::extract::Request) -> HandlerCtx { - // This span is populated with as much detail as possible, and then - // given to the Handler ctx. It will be populated with request-specific - // details (e.g. method) during ctx instantiation. - let request_span = request_span!( - parent: Span::current(), - // We could erase the parent here, however, axum or tower layers - // may be creating per-request spans that we want to be children of. - name: "ajj.IntoAxum::request", - router: &self.router, - ); - let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| { propagator.extract(&opentelemetry_http::HeaderExtractor(req.headers())) }); - request_span.set_parent(parent_context).unwrap(); - request_span.record( - "trace_id", - request_span - .context() - .span() - .span_context() - .trace_id() - .to_string(), - ); HandlerCtx::new( None, self.task_set.clone(), - TracingInfo { - service: self.router.service_name(), - request_span, - }, + TracingInfo::new_with_context(self.router.service_name(), parent_context), ) } } @@ -110,6 +84,7 @@ where fn call(self, req: axum::extract::Request, state: S) -> Self::Future { Box::pin(async move { let ctx = self.ctx(&req); + ctx.init_request_span(&self.router, Some(&Span::current())); let Ok(bytes) = Bytes::from_request(req, &state).await else { return Box::::from(Response::parse_error()).into_response(); @@ -130,7 +105,12 @@ where }); let span = ctx.span().clone(); - if let Some(response) = self.router.call_batch_with_state(ctx, req, state).await { + if let Some(response) = self + .router + .call_batch_with_state(ctx, req, state) + .instrument(span.clone()) + .await + { let headers = [( header::CONTENT_TYPE, HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), diff --git a/src/macros.rs b/src/macros.rs index a436337..db9cbd6 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -54,52 +54,12 @@ macro_rules! unwrap_infallible { }; } -/// Set up an initial tracing span for a request handler. -macro_rules! request_span { - (parent: $parent:expr, $name:literal, $router:expr, notifications: $notifications:literal,) => { - ::tracing::debug_span!( - parent: $parent, - $name, - "otel.kind" = "server", - "rpc.system" = "jsonrpc", - "rpc.jsonrpc.version" = "2.0", - "rpc.service" = $router.service_name(), - notifications_enabled = $notifications, - "trace_id" = ::tracing::field::Empty, - "otel.name" = ::tracing::field::Empty, - "otel.status" = ::tracing::field::Empty, - "rpc.jsonrpc.request_id" = ::tracing::field::Empty, - "rpc.jsonrpc.error_code" = ::tracing::field::Empty, - "rpc.jsonrpc.error_message" = ::tracing::field::Empty, - "rpc.method" = ::tracing::field::Empty, - params = ::tracing::field::Empty, - ) - }; - - - (parent: $parent:expr, name: $name:literal, router: $router:expr,) => { - request_span!(parent: $parent, $name, $router, notifications: false,) - }; - - (parent: $parent:expr, name: $name:literal, router: $router:expr, with_notifications) => { - request_span!(parent: $parent, $name, $router, notifications: true,) - }; - - (@noparent, name: $name:literal, router: $router:expr,) => { - request_span!(parent: None,, $name, $router, notifications: false,) - }; - - (@noparent, name: $name:literal, router: $router:expr, with_notifications) => { - request_span!(parent: None, $name, $router, notifications: true,) - }; -} - /// Log a message event to the current span. /// /// See macro_rules! message_event { ($type:literal, counter: $counter:expr, bytes: $bytes:expr,) => {{ - ::tracing::debug!( + ::tracing::info!( "rpc.message.id" = $counter.fetch_add(1, ::std::sync::atomic::Ordering::Relaxed), "rpc.message.type" = $type, "rpc.message.uncompressed_size" = $bytes, diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index b9d5690..a1dcb3f 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -258,7 +258,7 @@ where // possible, and then given to the Handler ctx. It // will be populated with request-specific details // (e.g. method) during ctx instantiation. - let tracing = TracingInfo { service: router.service_name(), request_span: request_span!(@noparent, name: "ajj.pubsub.RouteTask::call", router: &router, with_notifications) }; + let tracing = TracingInfo::new(router.service_name()); let ctx = HandlerCtx::new( @@ -266,6 +266,7 @@ where children.clone(), tracing, ); + ctx.init_request_span(&router, None); let span = ctx.span().clone(); span.in_scope(|| { diff --git a/src/router.rs b/src/router.rs index 2846a7d..98e6593 100644 --- a/src/router.rs +++ b/src/router.rs @@ -333,20 +333,25 @@ where let mut fut = BatchFuture::new_with_capacity(inbound.single(), inbound.len()); // According to spec, non-parsable requests should still receive a // response. - let span = debug_span!(parent: ctx.span(), "BatchFuture::poll", reqs = inbound.len(), futs = tracing::field::Empty); + let batch_span = debug_span!(parent: ctx.span(), "BatchFuture::poll", reqs = inbound.len(), futs = tracing::field::Empty); - for (batch_idx, req) in inbound.iter().enumerate() { + for req in inbound.iter() { let req = req.map(|req| { - let span = debug_span!(parent: &span, "RouteFuture::poll", batch_idx, method = req.method(), id = ?req.id()); - let args = HandlerArgs::new(ctx.clone(), req); + // Cloning here resets the `TracingInfo`, which means each + // ctx has a separate span with similar metadata. + let ctx = ctx.clone(); + let request_span = ctx.init_request_span(&self, Some(&batch_span)).clone(); + + // Several span fields are populated in `HandlerArgs::new`. + let args = HandlerArgs::new(ctx, req); self.inner .call_with_state(args, state.clone()) - .with_span(span) + .with_span(request_span) }); fut.push_parse_result(req); } - span.record("futs", fut.len()); - fut.with_span(span) + batch_span.record("futs", fut.len()); + fut.with_span(batch_span) } /// Nest this router into a new Axum router, with the specified path and the currently-running diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index 9b0b4f6..2df9bcb 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -1,12 +1,15 @@ -use crate::{pubsub::WriteItem, types::Request, RpcSend, TaskSet}; +use crate::{pubsub::WriteItem, types::Request, Router, RpcSend, TaskSet}; +use ::tracing::info_span; +use opentelemetry::trace::TraceContextExt; use serde_json::value::RawValue; -use std::future::Future; +use std::{future::Future, sync::OnceLock}; use tokio::{ sync::mpsc::{self, error::SendError}, task::JoinHandle, }; use tokio_util::sync::WaitForCancellationFutureOwned; use tracing::{enabled, error, Level}; +use tracing_opentelemetry::OpenTelemetrySpanExt; /// Errors that can occur when sending notifications. #[derive(thiserror::Error, Debug)] @@ -27,24 +30,115 @@ impl From> for NotifyError { /// Tracing information for OpenTelemetry. This struct is used to store /// information about the current request that can be used for tracing. -#[derive(Debug, Clone)] +#[derive(Debug)] #[non_exhaustive] pub struct TracingInfo { /// The OpenTelemetry service name. pub service: &'static str, - /// The request span. - pub request_span: tracing::Span, + /// The open telemetry Context, + pub context: Option, + + /// The tracing span for this request. + span: OnceLock, +} + +impl Clone for TracingInfo { + fn clone(&self) -> Self { + Self { + service: self.service, + context: self.context.clone(), + span: OnceLock::new(), + } + } } impl TracingInfo { + /// Create a new tracing info with the given service name and no context. + #[allow(dead_code)] // used in some features + pub fn new(service: &'static str) -> Self { + Self { + service, + context: None, + span: OnceLock::new(), + } + } + + /// Create a new tracing info with the given service name and context. + pub fn new_with_context( + service: &'static str, + context: opentelemetry::context::Context, + ) -> Self { + Self { + service, + context: Some(context), + span: OnceLock::new(), + } + } + + /// Create a request span for a handler invocation. + fn init_request_span( + &self, + router: &Router, + with_notifications: bool, + parent: Option<&tracing::Span>, + ) -> &tracing::Span + where + S: Clone + Send + Sync + 'static, + { + // This span is populated with as much detail as possible, and then + // given to the Request. It will be populated with request-specific + // details (e.g. method) during request setup. + self.span.get_or_init(|| { + let span = info_span!( + parent: parent.and_then(|p| p.id()), + "AjjRequest", + "otel.kind" = "server", + "rpc.system" = "jsonrpc", + "rpc.jsonrpc.version" = "2.0", + "rpc.service" = router.service_name(), + notifications_enabled = with_notifications, + "trace_id" = ::tracing::field::Empty, + "otel.name" = ::tracing::field::Empty, + "otel.status_code" = ::tracing::field::Empty, + "rpc.jsonrpc.request_id" = ::tracing::field::Empty, + "rpc.jsonrpc.error_code" = ::tracing::field::Empty, + "rpc.jsonrpc.error_message" = ::tracing::field::Empty, + "rpc.method" = ::tracing::field::Empty, + params = ::tracing::field::Empty, + ); + if let Some(context) = &self.context { + let _ = span.set_parent(context.clone()); + + span.record( + "trace_id", + context.span().span_context().trace_id().to_string(), + ); + } + + span + }) + } + + /// Get a reference to the tracing span for this request. + /// + /// ## Panics + /// + /// Panics if the span has not been initialized via + /// [`Self::init_request_span`]. + #[track_caller] + fn request_span(&self) -> &tracing::Span { + self.span.get().expect("span not initialized") + } + /// Create a mock tracing info for testing. #[cfg(test)] pub fn mock() -> Self { - use tracing::debug_span; + use tracing::info_span; Self { service: "test", - request_span: debug_span!("test"), + context: None, + span: OnceLock::new(), } } } @@ -104,8 +198,9 @@ impl HandlerCtx { } /// Get a reference to the tracing span for this handler context. - pub const fn span(&self) -> &tracing::Span { - &self.tracing.request_span + #[track_caller] + pub fn span(&self) -> &tracing::Span { + &self.tracing.request_span() } /// Set the tracing information for this handler context. @@ -123,6 +218,19 @@ impl HandlerCtx { .unwrap_or_default() } + /// Create a request span for a handler invocation. + pub fn init_request_span( + &self, + router: &Router, + parent: Option<&tracing::Span>, + ) -> &tracing::Span + where + S: Clone + Send + Sync + 'static, + { + self.tracing_info() + .init_request_span(router, self.notifications_enabled(), parent) + } + /// Notify a client of an event. pub async fn notify(&self, t: &T) -> Result<(), NotifyError> { if let Some(notifications) = self.notifications.as_ref() { @@ -298,7 +406,7 @@ impl HandlerArgs { } /// Get a reference to the tracing span for this handler invocation. - pub const fn span(&self) -> &tracing::Span { + pub fn span(&self) -> &tracing::Span { self.ctx.span() } From 3e00517b9470f1ecc7fbc850e467303086815420 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 11:54:12 -0400 Subject: [PATCH 16/19] lint: clippy --- src/router.rs | 2 +- src/routes/ctx.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/router.rs b/src/router.rs index 98e6593..3939b1a 100644 --- a/src/router.rs +++ b/src/router.rs @@ -340,7 +340,7 @@ where // Cloning here resets the `TracingInfo`, which means each // ctx has a separate span with similar metadata. let ctx = ctx.clone(); - let request_span = ctx.init_request_span(&self, Some(&batch_span)).clone(); + let request_span = ctx.init_request_span(self, Some(&batch_span)).clone(); // Several span fields are populated in `HandlerArgs::new`. let args = HandlerArgs::new(ctx, req); diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index 2df9bcb..5be7762 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -56,7 +56,7 @@ impl Clone for TracingInfo { impl TracingInfo { /// Create a new tracing info with the given service name and no context. #[allow(dead_code)] // used in some features - pub fn new(service: &'static str) -> Self { + pub const fn new(service: &'static str) -> Self { Self { service, context: None, @@ -65,7 +65,7 @@ impl TracingInfo { } /// Create a new tracing info with the given service name and context. - pub fn new_with_context( + pub const fn new_with_context( service: &'static str, context: opentelemetry::context::Context, ) -> Self { @@ -133,8 +133,8 @@ impl TracingInfo { /// Create a mock tracing info for testing. #[cfg(test)] - pub fn mock() -> Self { - use tracing::info_span; + pub const fn mock() -> Self { + Self { service: "test", context: None, @@ -200,7 +200,7 @@ impl HandlerCtx { /// Get a reference to the tracing span for this handler context. #[track_caller] pub fn span(&self) -> &tracing::Span { - &self.tracing.request_span() + self.tracing.request_span() } /// Set the tracing information for this handler context. From 172c8ffe4dd6988d3142b22c9708a82221207bbc Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 11:59:21 -0400 Subject: [PATCH 17/19] fix: start message counters at 1 --- src/axum.rs | 8 ++++---- src/pubsub/shared.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/axum.rs b/src/axum.rs index e8d9671..d1327e8 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -40,8 +40,8 @@ impl From> for IntoAxum { Self { router, task_set: Default::default(), - rx_msg_id: Arc::new(AtomicU32::new(0)), - tx_msg_id: Arc::new(AtomicU32::new(0)), + rx_msg_id: Arc::new(AtomicU32::new(1)), + tx_msg_id: Arc::new(AtomicU32::new(1)), } } } @@ -52,8 +52,8 @@ impl IntoAxum { Self { router, task_set: handle.into(), - rx_msg_id: Arc::new(AtomicU32::new(0)), - tx_msg_id: Arc::new(AtomicU32::new(0)), + rx_msg_id: Arc::new(AtomicU32::new(1)), + tx_msg_id: Arc::new(AtomicU32::new(1)), } } } diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs index a1dcb3f..26c3900 100644 --- a/src/pubsub/shared.rs +++ b/src/pubsub/shared.rs @@ -84,8 +84,8 @@ impl ConnectionManager { next_id: AtomicU64::new(0).into(), router, notification_buffer_per_task: DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT, - tx_msg_id: Arc::new(AtomicU32::new(0)), - rx_msg_id: Arc::new(AtomicU32::new(0)), + tx_msg_id: Arc::new(AtomicU32::new(1)), + rx_msg_id: Arc::new(AtomicU32::new(1)), } } From e538f822b2bd918940ed6388785f53fa252175ff Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 12:46:22 -0400 Subject: [PATCH 18/19] chore: document non-compliance --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ca0a743..22ae42e 100644 --- a/README.md +++ b/README.md @@ -77,10 +77,16 @@ See the [crate documentation on docs.rs] for more detailed examples. issues are found, please [open an issue]! `ajj` produces [`tracing`] spans and events that meet the [OpenTelemetry -semantic conventions] for JSON-RPC servers with the following exception: +semantic conventions] for JSON-RPC servers with the following exceptions: - The `server.address` attribute is NOT set, as the server address is not always known to the ajj system. +- `rpc.message` events are included in AJJ system spans for the batch request, + which technically does not comply with semantic conventions. The semantic + conventions do not specify how to handle batch requests, and assume that each + message corresponds to a separate request. In AJJ, batch requests are a single + message, and result in a single `rpc.message` event at receipt and at + response. ## Note on code provenance From c72f7213c2358dcadd99489c598ed08239751236 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 6 Oct 2025 13:12:29 -0400 Subject: [PATCH 19/19] fix: mock --- src/routes/ctx.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs index 5be7762..d3733db 100644 --- a/src/routes/ctx.rs +++ b/src/routes/ctx.rs @@ -133,12 +133,11 @@ impl TracingInfo { /// Create a mock tracing info for testing. #[cfg(test)] - pub const fn mock() -> Self { - + pub fn mock() -> Self { Self { service: "test", context: None, - span: OnceLock::new(), + span: OnceLock::from(info_span!("")), } } } @@ -377,6 +376,12 @@ pub struct HandlerArgs { impl HandlerArgs { /// Create new handler arguments. + /// + /// ## Panics + /// + /// If the ctx tracing span has not been initialized via + /// [`HandlerCtx::init_request_span`]. + #[track_caller] pub fn new(ctx: HandlerCtx, req: Request) -> Self { let this = Self { ctx, @@ -406,6 +411,12 @@ impl HandlerArgs { } /// Get a reference to the tracing span for this handler invocation. + /// + /// ## Panics + /// + /// If the span has not been initialized via + /// [`HandlerCtx::init_request_span`]. + #[track_caller] pub fn span(&self) -> &tracing::Span { self.ctx.span() }