1
1
use crate :: {
2
2
pubsub:: { In , JsonSink , Listener , Out } ,
3
3
types:: InboundData ,
4
- HandlerCtx , TaskSet ,
4
+ HandlerCtx , TaskSet , TracingInfo ,
5
5
} ;
6
6
use core:: fmt;
7
7
use serde_json:: value:: RawValue ;
8
- use std:: sync:: { atomic:: AtomicU64 , Arc } ;
8
+ use std:: sync:: {
9
+ atomic:: { AtomicU32 , AtomicU64 , Ordering } ,
10
+ Arc ,
11
+ } ;
9
12
use tokio:: { pin, runtime:: Handle , select, sync:: mpsc, task:: JoinHandle } ;
10
13
use tokio_stream:: StreamExt ;
11
14
use tokio_util:: sync:: WaitForCancellationFutureOwned ;
@@ -105,8 +108,7 @@ impl ConnectionManager {
105
108
106
109
/// Increment the connection ID counter and return an unused ID.
107
110
fn next_id ( & self ) -> ConnectionId {
108
- self . next_id
109
- . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: Relaxed )
111
+ self . next_id . fetch_add ( 1 , Ordering :: Relaxed )
110
112
}
111
113
112
114
/// Get a clone of the router.
@@ -131,13 +133,15 @@ impl ConnectionManager {
131
133
write_task : tx,
132
134
requests,
133
135
tasks : tasks. clone ( ) ,
136
+ rx_msg_id : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
134
137
} ;
135
138
136
139
let wt = WriteTask {
137
140
tasks,
138
141
conn_id,
139
- json : rx,
142
+ items : rx,
140
143
connection,
144
+ tx_msg_id : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
141
145
} ;
142
146
143
147
( rt, wt)
@@ -168,11 +172,14 @@ struct RouteTask<T: crate::pubsub::Listener> {
168
172
/// Connection ID for the connection serviced by this task.
169
173
pub ( crate ) conn_id : ConnectionId ,
170
174
/// Sender to the write task.
171
- pub ( crate ) write_task : mpsc:: Sender < Box < RawValue > > ,
175
+ pub ( crate ) write_task : mpsc:: Sender < WriteItem > ,
172
176
/// Stream of requests.
173
177
pub ( crate ) requests : In < T > ,
174
178
/// The task set for this connection
175
179
pub ( crate ) tasks : TaskSet ,
180
+
181
+ /// Counter for OTEL messages received.
182
+ pub ( crate ) rx_msg_id : Arc < AtomicU32 > ,
176
183
}
177
184
178
185
impl < T : crate :: pubsub:: Listener > fmt:: Debug for RouteTask < T > {
@@ -199,6 +206,7 @@ where
199
206
mut requests,
200
207
write_task,
201
208
tasks,
209
+ rx_msg_id,
202
210
..
203
211
} = self ;
204
212
@@ -224,6 +232,8 @@ where
224
232
break ;
225
233
} ;
226
234
235
+ let item_bytes = item. len( ) ;
236
+
227
237
// If the inbound data is not currently parsable, we
228
238
// send an empty one it to the router, as the router
229
239
// enforces the specification.
@@ -234,16 +244,38 @@ where
234
244
// if the client stops accepting responses, we do not keep
235
245
// handling inbound requests.
236
246
let Ok ( permit) = write_task. clone( ) . reserve_owned( ) . await else {
237
- tracing :: error!( "write task dropped while waiting for permit" ) ;
247
+ error!( "write task dropped while waiting for permit" ) ;
238
248
break ;
239
249
} ;
240
250
251
+ let tracing = TracingInfo { service: router. service_name( ) , request_span: debug_span!(
252
+ parent: None ,
253
+ "ajj.pubsub.RouteTask::call" ,
254
+ "otel.kind" = "server" ,
255
+ "rpc.system" = "jsonrpc" ,
256
+ "rpc.jsonrpc.version" = "2.0" ,
257
+ "rpc.service" = router. service_name( ) ,
258
+ notifications_enabled = true ,
259
+ params = tracing:: field:: Empty
260
+ ) } ;
261
+
241
262
let ctx =
242
263
HandlerCtx :: new(
243
264
Some ( write_task. clone( ) ) ,
244
265
children. clone( ) ,
266
+ tracing,
245
267
) ;
246
268
269
+ let span = ctx. span( ) . clone( ) ;
270
+ span. in_scope( || {
271
+ debug!(
272
+ "rpc.message.id" = rx_msg_id. fetch_add( 1 , Ordering :: Relaxed ) ,
273
+ "rpc.message.type" = "received" ,
274
+ "rpc.message.uncompressed_size" = item_bytes,
275
+ "Received request"
276
+ ) ;
277
+ } ) ;
278
+
247
279
// Run the future in a new task.
248
280
let fut = router. handle_request_batch( ctx, reqs) ;
249
281
@@ -252,9 +284,9 @@ where
252
284
// Send the response to the write task.
253
285
// we don't care if the receiver has gone away,
254
286
// as the task is done regardless.
255
- if let Some ( rv ) = fut. await {
287
+ if let Some ( json ) = fut. await {
256
288
let _ = permit. send(
257
- rv
289
+ WriteItem { span , json }
258
290
) ;
259
291
}
260
292
}
@@ -275,6 +307,13 @@ where
275
307
}
276
308
}
277
309
310
+ /// An item to be written to an outbound JSON pubsub stream.
311
+ #[ derive( Debug , Clone ) ]
312
+ pub ( crate ) struct WriteItem {
313
+ pub ( crate ) span : tracing:: Span ,
314
+ pub ( crate ) json : Box < RawValue > ,
315
+ }
316
+
278
317
/// The Write Task is responsible for writing JSON to the outbound connection.
279
318
struct WriteTask < T : Listener > {
280
319
/// Task set
@@ -287,10 +326,13 @@ struct WriteTask<T: Listener> {
287
326
///
288
327
/// Dropping this channel will cause the associated [`RouteTask`] to
289
328
/// shutdown.
290
- pub ( crate ) json : mpsc:: Receiver < Box < RawValue > > ,
329
+ pub ( crate ) items : mpsc:: Receiver < WriteItem > ,
291
330
292
331
/// Outbound connections.
293
332
pub ( crate ) connection : Out < T > ,
333
+
334
+ /// Counter for OTEL messages sent.
335
+ pub ( crate ) tx_msg_id : Arc < AtomicU32 > ,
294
336
}
295
337
296
338
impl < T : Listener > WriteTask < T > {
@@ -305,8 +347,9 @@ impl<T: Listener> WriteTask<T> {
305
347
pub ( crate ) async fn task_future ( self ) {
306
348
let WriteTask {
307
349
tasks,
308
- mut json ,
350
+ mut items ,
309
351
mut connection,
352
+ tx_msg_id,
310
353
..
311
354
} = self ;
312
355
@@ -318,12 +361,20 @@ impl<T: Listener> WriteTask<T> {
318
361
debug!( "Shutdown signal received" ) ;
319
362
break ;
320
363
}
321
- json = json . recv( ) => {
322
- let Some ( json) = json else {
364
+ item = items . recv( ) => {
365
+ let Some ( WriteItem { span , json } ) = item else {
323
366
tracing:: error!( "Json stream has closed" ) ;
324
367
break ;
325
368
} ;
326
- let span = debug_span!( "WriteTask" , conn_id = self . conn_id) ;
369
+ span. record( "conn_id" , self . conn_id) ;
370
+ span. in_scope( || {
371
+ debug!(
372
+ "rpc.message.id" = tx_msg_id. fetch_add( 1 , Ordering :: Relaxed ) ,
373
+ "rpc.message.type" = "sent" ,
374
+ "Sending response"
375
+ ) ;
376
+ } ) ;
377
+
327
378
if let Err ( err) = connection. send_json( json) . instrument( span) . await {
328
379
debug!( %err, conn_id = self . conn_id, "Failed to send json" ) ;
329
380
break ;
0 commit comments