Skip to content

Commit

Permalink
remove span from stream processors
Browse files Browse the repository at this point in the history
  • Loading branch information
zeegomo committed Sep 24, 2021
1 parent 8a967bd commit 04ff288
Showing 1 changed file with 6 additions and 26 deletions.
32 changes: 6 additions & 26 deletions jormungandr/src/network/subscription.rs
Expand Up @@ -13,7 +13,6 @@ use jormungandr_lib::interfaces::FragmentOrigin;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::ready;
use tracing::Span;

use std::error::Error as _;
use std::mem;
Expand Down Expand Up @@ -42,11 +41,10 @@ pub async fn process_block_announcements<S>(
mbox: MessageBox<BlockMsg>,
node_id: Address,
global_state: GlobalStateR,
span: Span,
) where
S: TryStream<Ok = net_data::Header, Error = Error>,
{
let sink = BlockAnnouncementProcessor::new(mbox, node_id, global_state, span);
let sink = BlockAnnouncementProcessor::new(mbox, node_id, global_state);
stream
.into_stream()
.forward(sink)
Expand All @@ -61,11 +59,10 @@ pub async fn process_gossip<S>(
mbox: MessageBox<TopologyMsg>,
node_id: Address,
global_state: GlobalStateR,
span: Span,
) where
S: TryStream<Ok = net_data::Gossip, Error = Error>,
{
let processor = GossipProcessor::new(mbox, node_id, global_state, span);
let processor = GossipProcessor::new(mbox, node_id, global_state);
stream
.into_stream()
.forward(processor)
Expand All @@ -83,11 +80,10 @@ pub async fn process_fragments<S>(
mbox: MessageBox<TransactionMsg>,
node_id: Address,
global_state: GlobalStateR,
span: Span,
) where
S: TryStream<Ok = net_data::Fragment, Error = Error>,
{
let sink = FragmentProcessor::new(mbox, node_id, global_state, span);
let sink = FragmentProcessor::new(mbox, node_id, global_state);
stream
.into_stream()
.forward(sink)
Expand All @@ -103,22 +99,19 @@ pub struct BlockAnnouncementProcessor {
node_id: Address,
global_state: GlobalStateR,
pending_processing: PendingProcessing,
span: Span,
}

impl BlockAnnouncementProcessor {
pub(super) fn new(
mbox: MessageBox<BlockMsg>,
node_id: Address,
global_state: GlobalStateR,
span: Span,
) -> Self {
BlockAnnouncementProcessor {
mbox,
node_id,
global_state,
pending_processing: PendingProcessing::default(),
span,
}
}

Expand All @@ -135,7 +128,7 @@ impl BlockAnnouncementProcessor {
tracing::debug!("received block from node that is not in the peer map");
}
}
.instrument(self.span.clone());
.in_current_span();
// It's OK to overwrite a pending future because only the latest
// timestamp matters.
self.pending_processing.start(fut);
Expand All @@ -155,28 +148,24 @@ pub struct FragmentProcessor {
global_state: GlobalStateR,
buffered_fragments: Vec<Fragment>,
pending_processing: PendingProcessing,
span: Span,
}

impl FragmentProcessor {
pub(super) fn new(
mbox: MessageBox<TransactionMsg>,
node_id: Address,
global_state: GlobalStateR,
span: Span,
) -> Self {
FragmentProcessor {
mbox,
node_id,
global_state,
buffered_fragments: Vec::with_capacity(buffer_sizes::inbound::FRAGMENTS),
pending_processing: PendingProcessing::default(),
span,
}
}

fn refresh_stat(&mut self) {
let refresh_span = self.span.clone();
let state = self.global_state.clone();
let node_id = self.node_id;
let fut = async move {
Expand All @@ -185,7 +174,7 @@ impl FragmentProcessor {
tracing::debug!("received fragment from node that is not in the peer map",);
}
}
.instrument(refresh_span);
.in_current_span();
// It's OK to overwrite a pending future because only the latest
// timestamp matters.
self.pending_processing.start(fut);
Expand All @@ -196,7 +185,6 @@ pub struct GossipProcessor {
mbox: MessageBox<TopologyMsg>,
node_id: Address,
global_state: GlobalStateR,
span: Span,
pending_processing: PendingProcessing,
}

Expand All @@ -205,13 +193,11 @@ impl GossipProcessor {
mbox: MessageBox<TopologyMsg>,
node_id: Address,
global_state: GlobalStateR,
span: Span,
) -> Self {
GossipProcessor {
mbox,
node_id,
global_state,
span,
pending_processing: Default::default(),
}
}
Expand Down Expand Up @@ -337,8 +323,6 @@ impl Sink<net_data::Fragment> for FragmentProcessor {

impl FragmentProcessor {
fn poll_send_fragments(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
let span = self.span.clone();
let _enter = span.enter();
ready!(self.mbox.poll_ready(cx)).map_err(|e| {
tracing::debug!(reason = %e, "error sending fragments for processing");
Error::new(Code::Internal, e)
Expand Down Expand Up @@ -367,7 +351,6 @@ impl FragmentProcessor {
}

fn poll_flush_mbox(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
let _enter = self.span.enter();
Pin::new(&mut self.mbox).poll_flush(cx).map_err(|e| {
tracing::error!(
reason = %e,
Expand All @@ -378,7 +361,6 @@ impl FragmentProcessor {
}

fn poll_complete_refresh_stat(&mut self, cx: &mut Context<'_>) -> Poll<()> {
let _enter = self.span.enter();
self.pending_processing.poll_complete(cx)
}
}
Expand All @@ -392,8 +374,6 @@ impl Sink<net_data::Gossip> for GossipProcessor {
}

fn start_send(mut self: Pin<&mut Self>, gossip: net_data::Gossip) -> Result<(), Error> {
let span = self.span.clone();
let _enter = span.enter();
let nodes = gossip.nodes.decode().map_err(|e| {
tracing::info!(
reason = %e.source().unwrap(),
Expand Down Expand Up @@ -426,7 +406,7 @@ impl Sink<net_data::Gossip> for GossipProcessor {
});
},
)
.instrument(span.clone())
.in_current_span()
.map(|_| ());
self.pending_processing.start(fut);
Ok(())
Expand Down

0 comments on commit 04ff288

Please sign in to comment.