Skip to content

Commit

Permalink
instrument server and client functions
Browse files Browse the repository at this point in the history
  • Loading branch information
zeegomo committed Sep 24, 2021
1 parent 04ff288 commit a51d018
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 143 deletions.
77 changes: 16 additions & 61 deletions jormungandr/src/network/client/mod.rs
Expand Up @@ -23,7 +23,7 @@ use chain_network::data::block::{BlockEvent, BlockIds, ChainPullRequest};

use futures::prelude::*;
use futures::ready;
use tracing::{span, Level, Span};
use tracing::{instrument, span, Level, Span};
use tracing_futures::Instrument;

use std::pin::Pin;
Expand Down Expand Up @@ -73,37 +73,16 @@ impl Client {
builder.channels.block_box,
inbound.peer_address,
global_state.clone(),
span!(
parent: &parent_span,
Level::TRACE,
"block_announcement_processor",
stream = "block_events",
direction = "in"
),
);
let fragment_sink = FragmentProcessor::new(
builder.channels.transaction_box,
inbound.peer_address,
global_state.clone(),
span!(
parent: &parent_span,
Level::TRACE,
"fragment_processor",
stream = "fragments",
direction = "in"
),
);
let gossip_sink = GossipProcessor::new(
builder.channels.topology_box,
inbound.peer_address,
global_state.clone(),
span!(
parent: &parent_span,
Level::TRACE,
"gossip_processor",
stream = "gossip",
direction = "in"
),
);

Client {
Expand Down Expand Up @@ -172,13 +151,12 @@ impl Progress {
}

impl Client {
#[instrument(level = "debug", skip(self, cx))]
fn process_block_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<ProcessingOutcome, ()>> {
use self::ProcessingOutcome::*;
// Drive sending of a message to block task to clear the buffered
// announcement before polling more events from the block subscription
// stream.
let span = self.span().clone();
let _enter = span.enter();
let mut block_sink = Pin::new(&mut self.block_sink);
ready!(block_sink.as_mut().poll_ready(cx))
.map_err(|e| tracing::debug!(reason = %e, "failed getting block sink"))?;
Expand Down Expand Up @@ -264,13 +242,9 @@ impl Client {
Ok(Continue).into()
}

#[instrument(level = "debug", skip(self))]
fn upload_blocks(&mut self, block_ids: BlockIds) -> Result<(), ()> {
let span = span!(
parent: &self.span,
Level::TRACE,
"solicitation",
kind = "UploadBlocks"
);
let span = span!(Level::DEBUG, "upload_blocks", kind = "UploadBlocks");
let _enter = span.enter();

if block_ids.is_empty() {
Expand Down Expand Up @@ -322,15 +296,8 @@ impl Client {
Ok(())
}

#[instrument(level = "debug", skip(self, req))]
fn push_missing_headers(&mut self, req: ChainPullRequest) -> Result<(), ()> {
let span = span!(
parent: &self.span,
Level::TRACE,
"solicitation",
kind = "PushHeaders"
);
let _enter = span.enter();

let from = req.from.decode().map_err(|e| {
tracing::info!(
reason = %e,
Expand All @@ -349,7 +316,7 @@ impl Client {
"peer requests missing part of the chain"
);
let (reply_handle, future) = intercom::stream_reply(buffer_sizes::outbound::HEADERS);
let future = future.instrument(span.clone());
let future = future.in_current_span();
debug_assert!(self.incoming_solicitation.is_none());
self.incoming_solicitation = Some(ClientMsg::PullHeaders(from, to, reply_handle));
let mut client = self.inner.clone();
Expand Down Expand Up @@ -377,20 +344,14 @@ impl Client {
}
}
}
.instrument(span.clone()),
.in_current_span(),
);
Ok(())
}

#[instrument(level = "debug", skip(self, req))]
fn pull_headers(&mut self, req: ChainPullRequest) {
let mut block_box = self.block_sink.message_box();
let span = span!(
parent: &self.span,
Level::TRACE,
"request",
kind = "PullHeaders"
);
let _enter = span.enter();

let (handle, sink, _) = intercom::stream_request(buffer_sizes::inbound::HEADERS);
// TODO: make sure that back pressure on the number of requests
Expand All @@ -406,7 +367,7 @@ impl Client {
);
}
}
.instrument(span.clone()),
.in_current_span(),
);
let mut client = self.inner.clone();
self.global_state.spawn(
Expand All @@ -430,18 +391,13 @@ impl Client {
}
}
}
.instrument(span.clone()),
.in_current_span(),
);
}

#[instrument(level = "debug", skip(self))]
fn solicit_blocks(&mut self, block_ids: BlockIds) {
let mut block_box = self.block_sink.message_box();
let span = span!(
parent: self.span(),
Level::TRACE,
"request",
kind = "GetBlocks"
);
let (handle, sink, _) = intercom::stream_request(buffer_sizes::inbound::BLOCKS);
// TODO: make sure that back pressure on the number of requests
// in flight prevents unlimited spawning of these tasks.
Expand All @@ -456,7 +412,7 @@ impl Client {
);
}
}
.instrument(span.clone()),
.in_current_span(),
);
let mut client = self.inner.clone();
self.global_state.spawn(
Expand All @@ -480,14 +436,13 @@ impl Client {
}
}
}
.instrument(span),
.in_current_span(),
);
}

#[instrument(level = "debug", skip(self, cx), fields(direction = "in"))]
fn process_fragments(&mut self, cx: &mut Context<'_>) -> Poll<Result<ProcessingOutcome, ()>> {
use self::ProcessingOutcome::*;
let span = self.span().clone();
let _enter = span.enter();
let mut fragment_sink = Pin::new(&mut self.fragment_sink);
ready!(fragment_sink.as_mut().poll_ready(cx)).map_err(|_| ())?;

Expand Down Expand Up @@ -519,10 +474,9 @@ impl Client {
}
}

#[instrument(level = "debug", skip(self, cx), fields(direction = "in"))]
fn process_gossip(&mut self, cx: &mut Context<'_>) -> Poll<Result<ProcessingOutcome, ()>> {
use self::ProcessingOutcome::*;
let span = self.span().clone();
let _enter = span.enter();
let mut gossip_sink = Pin::new(&mut self.gossip_sink);
ready!(gossip_sink.as_mut().poll_ready(cx)).map_err(|_| ())?;

Expand All @@ -534,6 +488,7 @@ impl Client {
Poll::Pending
}
Poll::Ready(Some(Ok(gossip))) => {
tracing::debug!("client");
gossip_sink.as_mut().start_send(gossip).map_err(|_| ())?;
Ok(Continue).into()
}
Expand Down

0 comments on commit a51d018

Please sign in to comment.