diff --git a/jormungandr/src/network/subscription.rs b/jormungandr/src/network/subscription.rs index 2ddcd573b6..26fbd28255 100644 --- a/jormungandr/src/network/subscription.rs +++ b/jormungandr/src/network/subscription.rs @@ -13,7 +13,6 @@ use jormungandr_lib::interfaces::FragmentOrigin; use futures::future::BoxFuture; use futures::prelude::*; use futures::ready; -use tracing::Span; use std::error::Error as _; use std::mem; @@ -42,11 +41,10 @@ pub async fn process_block_announcements( mbox: MessageBox, node_id: Address, global_state: GlobalStateR, - span: Span, ) where S: TryStream, { - let sink = BlockAnnouncementProcessor::new(mbox, node_id, global_state, span); + let sink = BlockAnnouncementProcessor::new(mbox, node_id, global_state); stream .into_stream() .forward(sink) @@ -61,11 +59,10 @@ pub async fn process_gossip( mbox: MessageBox, node_id: Address, global_state: GlobalStateR, - span: Span, ) where S: TryStream, { - let processor = GossipProcessor::new(mbox, node_id, global_state, span); + let processor = GossipProcessor::new(mbox, node_id, global_state); stream .into_stream() .forward(processor) @@ -83,11 +80,10 @@ pub async fn process_fragments( mbox: MessageBox, node_id: Address, global_state: GlobalStateR, - span: Span, ) where S: TryStream, { - let sink = FragmentProcessor::new(mbox, node_id, global_state, span); + let sink = FragmentProcessor::new(mbox, node_id, global_state); stream .into_stream() .forward(sink) @@ -103,7 +99,6 @@ pub struct BlockAnnouncementProcessor { node_id: Address, global_state: GlobalStateR, pending_processing: PendingProcessing, - span: Span, } impl BlockAnnouncementProcessor { @@ -111,14 +106,12 @@ impl BlockAnnouncementProcessor { mbox: MessageBox, node_id: Address, global_state: GlobalStateR, - span: Span, ) -> Self { BlockAnnouncementProcessor { mbox, node_id, global_state, pending_processing: PendingProcessing::default(), - span, } } @@ -135,7 +128,7 @@ impl BlockAnnouncementProcessor { tracing::debug!("received block from node that is not in the peer map"); } } - .instrument(self.span.clone()); + .in_current_span(); // It's OK to overwrite a pending future because only the latest // timestamp matters. self.pending_processing.start(fut); @@ -155,7 +148,6 @@ pub struct FragmentProcessor { global_state: GlobalStateR, buffered_fragments: Vec, pending_processing: PendingProcessing, - span: Span, } impl FragmentProcessor { @@ -163,7 +155,6 @@ impl FragmentProcessor { mbox: MessageBox, node_id: Address, global_state: GlobalStateR, - span: Span, ) -> Self { FragmentProcessor { mbox, @@ -171,12 +162,10 @@ impl FragmentProcessor { global_state, buffered_fragments: Vec::with_capacity(buffer_sizes::inbound::FRAGMENTS), pending_processing: PendingProcessing::default(), - span, } } fn refresh_stat(&mut self) { - let refresh_span = self.span.clone(); let state = self.global_state.clone(); let node_id = self.node_id; let fut = async move { @@ -185,7 +174,7 @@ impl FragmentProcessor { tracing::debug!("received fragment from node that is not in the peer map",); } } - .instrument(refresh_span); + .in_current_span(); // It's OK to overwrite a pending future because only the latest // timestamp matters. self.pending_processing.start(fut); @@ -196,7 +185,6 @@ pub struct GossipProcessor { mbox: MessageBox, node_id: Address, global_state: GlobalStateR, - span: Span, pending_processing: PendingProcessing, } @@ -205,13 +193,11 @@ impl GossipProcessor { mbox: MessageBox, node_id: Address, global_state: GlobalStateR, - span: Span, ) -> Self { GossipProcessor { mbox, node_id, global_state, - span, pending_processing: Default::default(), } } @@ -337,8 +323,6 @@ impl Sink for FragmentProcessor { impl FragmentProcessor { fn poll_send_fragments(&mut self, cx: &mut Context<'_>) -> Poll> { - let span = self.span.clone(); - let _enter = span.enter(); ready!(self.mbox.poll_ready(cx)).map_err(|e| { tracing::debug!(reason = %e, "error sending fragments for processing"); Error::new(Code::Internal, e) @@ -367,7 +351,6 @@ impl FragmentProcessor { } fn poll_flush_mbox(&mut self, cx: &mut Context<'_>) -> Poll> { - let _enter = self.span.enter(); Pin::new(&mut self.mbox).poll_flush(cx).map_err(|e| { tracing::error!( reason = %e, @@ -378,7 +361,6 @@ impl FragmentProcessor { } fn poll_complete_refresh_stat(&mut self, cx: &mut Context<'_>) -> Poll<()> { - let _enter = self.span.enter(); self.pending_processing.poll_complete(cx) } } @@ -392,8 +374,6 @@ impl Sink for GossipProcessor { } fn start_send(mut self: Pin<&mut Self>, gossip: net_data::Gossip) -> Result<(), Error> { - let span = self.span.clone(); - let _enter = span.enter(); let nodes = gossip.nodes.decode().map_err(|e| { tracing::info!( reason = %e.source().unwrap(), @@ -426,7 +406,7 @@ impl Sink for GossipProcessor { }); }, ) - .instrument(span.clone()) + .in_current_span() .map(|_| ()); self.pending_processing.start(fut); Ok(())