Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "Simple, modern, ergonomic JSON-RPC 2.0 router built with tower an
keywords = ["json-rpc", "jsonrpc", "json"]
categories = ["web-programming::http-server", "web-programming::websocket"]

version = "0.3.4"
version = "0.4.0"
edition = "2021"
rust-version = "1.81"
authors = ["init4", "James Prestwich"]
Expand All @@ -15,6 +15,7 @@ repository = "https://github.com/init4tech/ajj"

[dependencies]
bytes = "1.9.0"
opentelemetry = "0.31.0"
pin-project = "1.1.8"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = { version = "1.0.135", features = ["raw_value"] }
Expand All @@ -23,10 +24,12 @@ tokio = { version = "1.43.0", features = ["sync", "rt", "macros"] }
tokio-util = { version = "0.7.13", features = ["io", "rt"] }
tower = { version = "0.5.2", features = ["util"] }
tracing = "0.1.41"
tracing-opentelemetry = "0.32.0"

# axum
axum = { version = "0.8.1", optional = true }
mime = { version = "0.3.17", optional = true }
opentelemetry-http = { version = "0.31.0", optional = true }

# pubsub
tokio-stream = { version = "0.1.17", optional = true }
Expand All @@ -51,7 +54,7 @@ eyre = "0.6.12"

[features]
default = ["axum", "ws", "ipc"]
axum = ["dep:axum", "dep:mime"]
axum = ["dep:axum", "dep:mime", "dep:opentelemetry-http"]
pubsub = ["dep:tokio-stream", "axum?/ws"]
ipc = ["pubsub", "dep:interprocess"]
ws = ["pubsub", "dep:tokio-tungstenite", "dep:futures-util"]
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ implementations.

See the [crate documentation on docs.rs] for more detailed examples.

## Specification Complinace

`ajj` aims to be fully compliant with the [JSON-RPC 2.0] specification. If any
issues are found, please [open an issue]!

`ajj` produces [`tracing`] spans and events that meet the [OpenTelemetry
semantic conventions] for JSON-RPC servers with the following exceptions:

- The `server.address` attribute is NOT set, as the server address is not always
known to the ajj system.
- `rpc.message` events are included in AJJ system spans for the batch request,
which technically does not comply with semantic conventions. The semantic
conventions do not specify how to handle batch requests, and assume that each
message corresponds to a separate request. In AJJ, batch requests are a single
message, and result in a single `rpc.message` event at receipt and at
response.

## Note on code provenance

Some code in this project has been reproduced or adapted from other projects.
Expand All @@ -94,3 +111,6 @@ reproduced from the following projects, and we are grateful for their work:
[`interprocess::local_socket::ListenerOptions`]: https://docs.rs/interprocess/latest/interprocess/local_socket/struct.ListenerOptions.html
[std::net::SocketAddr]: https://doc.rust-lang.org/std/net/enum.SocketAddr.html
[alloy]: https://docs.rs/alloy/latest/alloy/
[open an issue]: https://github.com/init4tech/ajj/issues/new
[OpenTelemetry semantic conventions]: https://opentelemetry.io/docs/specs/semconv/rpc/json-rpc/
[`tracing`]: https://docs.rs/tracing/latest/tracing/
71 changes: 61 additions & 10 deletions src/axum.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use crate::{
types::{InboundData, Response},
HandlerCtx, TaskSet,
HandlerCtx, TaskSet, TracingInfo,
};
use axum::{
extract::FromRequest,
http::{header, HeaderValue},
response::IntoResponse,
};
use bytes::Bytes;
use std::{future::Future, pin::Pin};
use std::{
future::Future,
pin::Pin,
sync::{atomic::AtomicU32, Arc},
};
use tokio::runtime::Handle;
use tracing::{Instrument, Span};

/// A wrapper around an [`Router`] that implements the
/// [`axum::handler::Handler`] trait. This struct is an implementation detail
Expand All @@ -21,14 +26,22 @@ use tokio::runtime::Handle;
#[derive(Debug, Clone)]
pub(crate) struct IntoAxum<S> {
pub(crate) router: crate::Router<S>,

pub(crate) task_set: TaskSet,

/// Counter for OTEL messages received.
pub(crate) rx_msg_id: Arc<AtomicU32>,
/// Counter for OTEL messages sent.
pub(crate) tx_msg_id: Arc<AtomicU32>,
}

impl<S> From<crate::Router<S>> for IntoAxum<S> {
fn from(router: crate::Router<S>) -> Self {
Self {
router,
task_set: Default::default(),
rx_msg_id: Arc::new(AtomicU32::new(1)),
tx_msg_id: Arc::new(AtomicU32::new(1)),
}
}
}
Expand All @@ -39,12 +52,26 @@ impl<S> IntoAxum<S> {
Self {
router,
task_set: handle.into(),
rx_msg_id: Arc::new(AtomicU32::new(1)),
tx_msg_id: Arc::new(AtomicU32::new(1)),
}
}
}

/// Get a new context, built from the task set.
fn ctx(&self) -> HandlerCtx {
self.task_set.clone().into()
impl<S> IntoAxum<S>
where
S: Clone + Send + Sync + 'static,
{
fn ctx(&self, req: &axum::extract::Request) -> HandlerCtx {
let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&opentelemetry_http::HeaderExtractor(req.headers()))
});

HandlerCtx::new(
None,
self.task_set.clone(),
TracingInfo::new_with_context(self.router.service_name(), parent_context),
)
}
}

Expand All @@ -56,25 +83,49 @@ where

fn call(self, req: axum::extract::Request, state: S) -> Self::Future {
Box::pin(async move {
let ctx = self.ctx(&req);
ctx.init_request_span(&self.router, Some(&Span::current()));

let Ok(bytes) = Bytes::from_request(req, &state).await else {
return Box::<str>::from(Response::parse_error()).into_response();
};

// If the inbound data is not currently parsable, we
// send an empty one it to the router, as the router enforces
// the specification.
let req = InboundData::try_from(bytes).unwrap_or_default();
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md#message-event
let req = ctx.span().in_scope(|| {
message_event!(
@received,
counter: &self.rx_msg_id,
bytes: bytes.len(),
);

// If the inbound data is not currently parsable, we
// send an empty one it to the router, as the router enforces
// the specification.
InboundData::try_from(bytes).unwrap_or_default()
});

let span = ctx.span().clone();
if let Some(response) = self
.router
.call_batch_with_state(self.ctx(), req, state)
.call_batch_with_state(ctx, req, state)
.instrument(span.clone())
.await
{
let headers = [(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()),
)];

let body = Box::<str>::from(response);

span.in_scope(|| {
message_event!(
@sent,
counter: &self.tx_msg_id,
bytes: body.len(),
);
});

(headers, body).into_response()
} else {
().into_response()
Expand Down
16 changes: 6 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! })
//! // Routes get a ctx, which can be used to send notifications.
//! .route("notify", |ctx: HandlerCtx| async move {
//! if ctx.notifications().is_none() {
//! if !ctx.notifications_enabled() {
//! // This error will appear in the ResponsePayload's `data` field.
//! return Err("notifications are disabled");
//! }
Expand Down Expand Up @@ -171,6 +171,7 @@ pub use pubsub::ReadJsonStream;
mod routes;
pub use routes::{
BatchFuture, Handler, HandlerArgs, HandlerCtx, NotifyError, Params, RouteFuture, State,
TracingInfo,
};
pub(crate) use routes::{BoxedIntoRoute, ErasedIntoRoute, Method, Route};

Expand Down Expand Up @@ -206,7 +207,8 @@ pub(crate) mod test_utils {
mod test {

use crate::{
router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, ResponsePayload,
router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, HandlerCtx,
ResponsePayload,
};
use bytes::Bytes;
use serde_json::value::RawValue;
Expand All @@ -231,10 +233,7 @@ mod test {

let res = router
.call_with_state(
HandlerArgs {
ctx: Default::default(),
req: req.try_into().unwrap(),
},
HandlerArgs::new(HandlerCtx::mock(), req.try_into().unwrap()),
(),
)
.await
Expand All @@ -250,10 +249,7 @@ mod test {

let res2 = router
.call_with_state(
HandlerArgs {
ctx: Default::default(),
req: req2.try_into().unwrap(),
},
HandlerArgs::new(HandlerCtx::mock(), req2.try_into().unwrap()),
(),
)
.await
Expand Down
22 changes: 22 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,28 @@ macro_rules! unwrap_infallible {
};
}

/// Log a message event to the current span.
///
/// See <https://github.com/open-telemetry/semantic-conventions/blob/d66109ff41e75f49587114e5bff9d101b87f40bd/docs/rpc/rpc-spans.md#events>
macro_rules! message_event {
($type:literal, counter: $counter:expr, bytes: $bytes:expr,) => {{
::tracing::info!(
"rpc.message.id" = $counter.fetch_add(1, ::std::sync::atomic::Ordering::Relaxed),
"rpc.message.type" = $type,
"rpc.message.uncompressed_size" = $bytes,
"rpc.message"
);
}};

(@received, counter: $counter:expr, bytes: $bytes:expr, ) => {
message_event!("RECEIVED", counter: $counter, bytes: $bytes,);
};

(@sent, counter: $counter:expr, bytes: $bytes:expr, ) => {
message_event!("SENT", counter: $counter, bytes: $bytes,);
};
}

// Some code is this file is reproduced under the terms of the MIT license. It
// originates from the `axum` crate. The original source code can be found at
// the following URL, and the original license is included below.
Expand Down
2 changes: 2 additions & 0 deletions src/pubsub/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ impl AxumWsCfg {
next_id: arc.next_id.clone(),
router: arc.router.clone(),
notification_buffer_per_task: arc.notification_buffer_per_task,
tx_msg_id: arc.tx_msg_id.clone(),
rx_msg_id: arc.rx_msg_id.clone(),
},
}
}
Expand Down
1 change: 1 addition & 0 deletions src/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ mod ipc;
pub use ipc::ReadJsonStream;

mod shared;
pub(crate) use shared::WriteItem;
pub use shared::{ConnectionId, DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT};

mod shutdown;
Expand Down
Loading
Loading