1
- use std:: fmt:: Debug ;
1
+ use std:: { fmt:: Debug , ops :: Deref } ;
2
2
3
3
use irpc:: {
4
4
channel:: { mpsc, none:: NoSender , oneshot} ,
@@ -143,12 +143,16 @@ impl EventMask {
143
143
observe : ObserveMode :: None ,
144
144
} ;
145
145
146
- /// You get asked for every single thing that is going on and can intervene/throttle.
147
- pub const ALL : Self = Self {
146
+ /// All event notifications for read-only requests are fully enabled.
147
+ ///
148
+ /// If you want to enable push requests, which can write to the local store, you
149
+ /// need to do it manually. Providing constants that have push enabled would
150
+ /// risk misuse.
151
+ pub const ALL_READONLY : Self = Self {
148
152
connected : ConnectMode :: Request ,
149
153
get : RequestMode :: RequestLog ,
150
154
get_many : RequestMode :: RequestLog ,
151
- push : RequestMode :: RequestLog ,
155
+ push : RequestMode :: Disabled ,
152
156
throttle : ThrottleMode :: Throttle ,
153
157
observe : ObserveMode :: Request ,
154
158
} ;
@@ -158,6 +162,14 @@ impl EventMask {
158
162
#[ derive( Debug , Serialize , Deserialize ) ]
159
163
pub struct Notify < T > ( T ) ;
160
164
165
+ impl < T > Deref for Notify < T > {
166
+ type Target = T ;
167
+
168
+ fn deref ( & self ) -> & Self :: Target {
169
+ & self . 0
170
+ }
171
+ }
172
+
161
173
#[ derive( Debug , Default , Clone ) ]
162
174
pub struct EventSender {
163
175
mask : EventMask ,
@@ -263,6 +275,80 @@ impl EventSender {
263
275
}
264
276
}
265
277
278
+ /// Log request events at trace level.
279
+ pub fn tracing ( & self , mask : EventMask ) -> Self {
280
+ use tracing:: trace;
281
+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
282
+ n0_future:: task:: spawn ( async move {
283
+ fn log_request_events (
284
+ mut rx : irpc:: channel:: mpsc:: Receiver < RequestUpdate > ,
285
+ connection_id : u64 ,
286
+ request_id : u64 ,
287
+ ) {
288
+ n0_future:: task:: spawn ( async move {
289
+ while let Ok ( Some ( update) ) = rx. recv ( ) . await {
290
+ trace ! ( %connection_id, %request_id, "{update:?}" ) ;
291
+ }
292
+ } ) ;
293
+ }
294
+ while let Some ( msg) = rx. recv ( ) . await {
295
+ match msg {
296
+ ProviderMessage :: ClientConnected ( _) => todo ! ( ) ,
297
+ ProviderMessage :: ClientConnectedNotify ( msg) => {
298
+ trace ! ( "{:?}" , msg. inner) ;
299
+ }
300
+ ProviderMessage :: ConnectionClosed ( msg) => {
301
+ trace ! ( "{:?}" , msg. inner) ;
302
+ }
303
+ ProviderMessage :: GetRequestReceived ( msg) => {
304
+ trace ! ( "{:?}" , msg. inner) ;
305
+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
306
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
307
+ }
308
+ ProviderMessage :: GetRequestReceivedNotify ( msg) => {
309
+ trace ! ( "{:?}" , msg. inner) ;
310
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
311
+ }
312
+ ProviderMessage :: GetManyRequestReceived ( msg) => {
313
+ trace ! ( "{:?}" , msg. inner) ;
314
+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
315
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
316
+ }
317
+ ProviderMessage :: GetManyRequestReceivedNotify ( msg) => {
318
+ trace ! ( "{:?}" , msg. inner) ;
319
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
320
+ }
321
+ ProviderMessage :: PushRequestReceived ( msg) => {
322
+ trace ! ( "{:?}" , msg. inner) ;
323
+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
324
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
325
+ }
326
+ ProviderMessage :: PushRequestReceivedNotify ( msg) => {
327
+ trace ! ( "{:?}" , msg. inner) ;
328
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
329
+ }
330
+ ProviderMessage :: ObserveRequestReceived ( msg) => {
331
+ trace ! ( "{:?}" , msg. inner) ;
332
+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
333
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
334
+ }
335
+ ProviderMessage :: ObserveRequestReceivedNotify ( msg) => {
336
+ trace ! ( "{:?}" , msg. inner) ;
337
+ log_request_events ( msg. rx , msg. inner . connection_id , msg. inner . request_id ) ;
338
+ }
339
+ ProviderMessage :: Throttle ( msg) => {
340
+ trace ! ( "{:?}" , msg. inner) ;
341
+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
342
+ }
343
+ }
344
+ }
345
+ } ) ;
346
+ Self {
347
+ mask,
348
+ inner : Some ( irpc:: Client :: from ( tx) ) ,
349
+ }
350
+ }
351
+
266
352
/// A new client has been connected.
267
353
pub async fn client_connected ( & self , f : impl Fn ( ) -> ClientConnected ) -> ClientResult {
268
354
if let Some ( client) = & self . inner {
0 commit comments