@@ -46,7 +46,6 @@ use anyhow::anyhow;
46
46
use futures_lite:: { future:: BoxedLocal , Stream , StreamExt } ;
47
47
use hashlink:: LinkedHashSet ;
48
48
use iroh:: { endpoint, Endpoint , NodeAddr , NodeId } ;
49
- use iroh_metrics:: inc;
50
49
use tokio:: {
51
50
sync:: { mpsc, oneshot} ,
52
51
task:: JoinSet ,
@@ -55,7 +54,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
55
54
use tracing:: { debug, error, error_span, trace, warn, Instrument } ;
56
55
57
56
use crate :: {
58
- get:: { db:: DownloadProgress , Stats } ,
57
+ get:: { db:: DownloadProgress , error :: GetError , Stats } ,
59
58
metrics:: Metrics ,
60
59
store:: Store ,
61
60
util:: { local_pool:: LocalPoolHandle , progress:: ProgressSender } ,
@@ -98,7 +97,7 @@ pub enum FailureAction {
98
97
/// The request was cancelled by us.
99
98
AllIntentsDropped ,
100
99
/// An error occurred that prevents the request from being retried at all.
101
- AbortRequest ( anyhow :: Error ) ,
100
+ AbortRequest ( GetError ) ,
102
101
/// An error occurred that suggests the node should not be used in general.
103
102
DropPeer ( anyhow:: Error ) ,
104
103
/// An error occurred in which neither the node nor the request are at fault.
@@ -332,6 +331,7 @@ pub struct Downloader {
332
331
next_id : Arc < AtomicU64 > ,
333
332
/// Channel to communicate with the service.
334
333
msg_tx : mpsc:: Sender < Message > ,
334
+ metrics : Arc < Metrics > ,
335
335
}
336
336
337
337
impl Downloader {
@@ -354,23 +354,33 @@ impl Downloader {
354
354
where
355
355
S : Store ,
356
356
{
357
+ let metrics = Arc :: new ( Metrics :: default ( ) ) ;
357
358
let me = endpoint. node_id ( ) . fmt_short ( ) ;
358
359
let ( msg_tx, msg_rx) = mpsc:: channel ( SERVICE_CHANNEL_CAPACITY ) ;
359
360
let dialer = Dialer :: new ( endpoint) ;
360
361
362
+ let metrics_clone = metrics. clone ( ) ;
361
363
let create_future = move || {
362
364
let getter = get:: IoGetter {
363
365
store : store. clone ( ) ,
364
366
} ;
365
367
366
- let service = Service :: new ( getter, dialer, concurrency_limits, retry_config, msg_rx) ;
368
+ let service = Service :: new (
369
+ getter,
370
+ dialer,
371
+ concurrency_limits,
372
+ retry_config,
373
+ msg_rx,
374
+ metrics_clone,
375
+ ) ;
367
376
368
377
service. run ( ) . instrument ( error_span ! ( "downloader" , %me) )
369
378
} ;
370
379
rt. spawn_detached ( create_future) ;
371
380
Self {
372
381
next_id : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
373
382
msg_tx,
383
+ metrics,
374
384
}
375
385
}
376
386
@@ -424,6 +434,11 @@ impl Downloader {
424
434
debug ! ( ?msg, "nodes have not been sent" )
425
435
}
426
436
}
437
+
438
+ /// Returns the metrics collected for this downloader.
439
+ pub fn metrics ( & self ) -> & Arc < Metrics > {
440
+ & self . metrics
441
+ }
427
442
}
428
443
429
444
/// Messages the service can receive.
@@ -565,6 +580,7 @@ struct Service<G: Getter, D: DialerT> {
565
580
in_progress_downloads : JoinSet < ( DownloadKind , InternalDownloadResult ) > ,
566
581
/// Progress tracker
567
582
progress_tracker : ProgressTracker ,
583
+ metrics : Arc < Metrics > ,
568
584
}
569
585
impl < G : Getter < Connection = D :: Connection > , D : DialerT > Service < G , D > {
570
586
fn new (
@@ -573,6 +589,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
573
589
concurrency_limits : ConcurrencyLimits ,
574
590
retry_config : RetryConfig ,
575
591
msg_rx : mpsc:: Receiver < Message > ,
592
+ metrics : Arc < Metrics > ,
576
593
) -> Self {
577
594
Service {
578
595
getter,
@@ -590,23 +607,24 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
590
607
in_progress_downloads : Default :: default ( ) ,
591
608
progress_tracker : ProgressTracker :: new ( ) ,
592
609
queue : Default :: default ( ) ,
610
+ metrics,
593
611
}
594
612
}
595
613
596
614
/// Main loop for the service.
597
615
async fn run ( mut self ) {
598
616
loop {
599
617
trace ! ( "wait for tick" ) ;
600
- inc ! ( Metrics , downloader_tick_main ) ;
618
+ self . metrics . downloader_tick_main . inc ( ) ;
601
619
tokio:: select! {
602
620
Some ( ( node, conn_result) ) = self . dialer. next( ) => {
603
621
trace!( node=%node. fmt_short( ) , "tick: connection ready" ) ;
604
- inc! ( Metrics , downloader_tick_connection_ready ) ;
622
+ self . metrics . downloader_tick_connection_ready . inc( ) ;
605
623
self . on_connection_ready( node, conn_result) ;
606
624
}
607
625
maybe_msg = self . msg_rx. recv( ) => {
608
626
trace!( msg=?maybe_msg, "tick: message received" ) ;
609
- inc! ( Metrics , downloader_tick_message_received ) ;
627
+ self . metrics . downloader_tick_message_received . inc( ) ;
610
628
match maybe_msg {
611
629
Some ( msg) => self . handle_message( msg) . await ,
612
630
None => return self . shutdown( ) . await ,
@@ -616,25 +634,26 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
616
634
match res {
617
635
Ok ( ( kind, result) ) => {
618
636
trace!( %kind, "tick: transfer completed" ) ;
619
- inc!( Metrics , downloader_tick_transfer_completed) ;
637
+ self :: get:: track_metrics( & result, & self . metrics) ;
638
+ self . metrics. downloader_tick_transfer_completed. inc( ) ;
620
639
self . on_download_completed( kind, result) ;
621
640
}
622
641
Err ( err) => {
623
642
warn!( ?err, "transfer task panicked" ) ;
624
- inc! ( Metrics , downloader_tick_transfer_failed ) ;
643
+ self . metrics . downloader_tick_transfer_failed . inc( ) ;
625
644
}
626
645
}
627
646
}
628
647
Some ( expired) = self . retry_nodes_queue. next( ) => {
629
648
let node = expired. into_inner( ) ;
630
649
trace!( node=%node. fmt_short( ) , "tick: retry node" ) ;
631
- inc! ( Metrics , downloader_tick_retry_node ) ;
650
+ self . metrics . downloader_tick_retry_node . inc( ) ;
632
651
self . on_retry_wait_elapsed( node) ;
633
652
}
634
653
Some ( expired) = self . goodbye_nodes_queue. next( ) => {
635
654
let node = expired. into_inner( ) ;
636
655
trace!( node=%node. fmt_short( ) , "tick: goodbye node" ) ;
637
- inc! ( Metrics , downloader_tick_goodbye_node ) ;
656
+ self . metrics . downloader_tick_goodbye_node . inc( ) ;
638
657
self . disconnect_idle_node( node, "idle expired" ) ;
639
658
}
640
659
}
0 commit comments