Skip to content

Commit

Permalink
Add span to every major unit of work
Browse files Browse the repository at this point in the history
  • Loading branch information
zeegomo committed Sep 24, 2021
1 parent 8a967bd commit d4820a0
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 183 deletions.
156 changes: 80 additions & 76 deletions jormungandr/src/blockchain/process.rs
Expand Up @@ -201,16 +201,16 @@ impl Process {
let network_msg_box = self.network_msgbox.clone();
let explorer_msg_box = self.explorer_msgbox.clone();
let stats_counter = self.stats_counter.clone();

tracing::trace!("handling new blockchain task item");
match input {
BlockMsg::LeadershipBlock(leadership_block) => {
let span = span!(
parent: self.service_info.span(),
Level::TRACE,
Level::DEBUG,
"process_leadership_block",
hash = %leadership_block.block.header.hash().to_string(),
parent = %leadership_block.block.header.parent_id().to_string(),
date = %leadership_block.block.header.block_date().to_string()
hash = %leadership_block.block.header.hash(),
parent = %leadership_block.block.header.parent_id(),
date = %leadership_block.block.header.block_date()
);
let _enter = span.enter();
tracing::info!("receiving block from leadership service");
Expand All @@ -231,12 +231,12 @@ impl Process {
BlockMsg::AnnouncedBlock(header, node_id) => {
let span = span!(
parent: self.service_info.span(),
Level::TRACE,
Level::DEBUG,
"process_announced_block",
hash = %header.hash().to_string(),
parent = %header.parent_id().to_string(),
date = %header.block_date().to_string(),
peer = %node_id.to_string()
hash = %header.hash(),
parent = %header.parent_id(),
date = %header.block_date(),
peer = %node_id
);
let _enter = span.enter();
tracing::info!("received block announcement from network");
Expand All @@ -251,10 +251,17 @@ impl Process {
node_id,
self.pull_headers_scheduler.clone(),
self.get_next_block_scheduler.clone(),
),
)
.instrument(span.clone()),
)
}
BlockMsg::NetworkBlocks(handle) => {
let span = span!(
parent: self.service_info.span(),
Level::DEBUG,
"process_network_blocks",
);
let _guard = span.enter();
tracing::info!("receiving block stream from network");

self.service_info.timeout_spawn_fallible(
Expand All @@ -268,13 +275,14 @@ impl Process {
self.get_next_block_scheduler.clone(),
handle,
stats_counter,
),
)
.instrument(span.clone()),
);
}
BlockMsg::ChainHeaders(handle) => {
tracing::info!("receiving header stream from network");
let span = span!(parent: self.service_info.span(), Level::TRACE, "process_chain_headers", sub_task = "chain_pull");
let span = span!(parent: self.service_info.span(), Level::DEBUG, "process_chain_headers", sub_task = "chain_pull");
let _enter = span.enter();
tracing::info!("receiving header stream from network");

self.service_info.timeout_spawn(
"process network headers",
Expand All @@ -284,10 +292,12 @@ impl Process {
handle,
self.pull_headers_scheduler.clone(),
network_msg_box,
),
)
.instrument(span.clone()),
);
}
}
tracing::trace!("item handling finished");
}

fn start_garbage_collector(&self, info: &TokioServiceInfo) {
Expand All @@ -312,22 +322,29 @@ async fn process_and_propagate_new_ref(
mut network_msg_box: MessageBox<NetworkMsg>,
) -> chain::Result<()> {
let header = new_block_ref.header().clone();
tracing::debug!("processing the new block and propagating");
// Even if this fails because the queue is full we periodically recompute the tip
tip_update_mbox
.try_send(new_block_ref)
.unwrap_or_else(|err| {
tracing::error!(
"cannot send new ref to be evaluated as candidate tip: {}",
err
)
});
let span = span!(Level::DEBUG, "process_and_propagate_new_ref", block = %header.hash());

async {
tracing::debug!("processing the new block and propagating");
// Even if this fails because the queue is full we periodically recompute the tip
tip_update_mbox
.try_send(new_block_ref)
.unwrap_or_else(|err| {
tracing::error!(
"cannot send new ref to be evaluated as candidate tip: {}",
err
)
});

tracing::debug!("propagating block to the network");
tracing::debug!("propagating block to the network");

network_msg_box
.send(NetworkMsg::Propagate(PropagateMsg::Block(header)))
.await?;
network_msg_box
.send(NetworkMsg::Propagate(PropagateMsg::Block(header)))
.await?;
Ok::<(), Error>(())
}
.instrument(span)
.await?;

Ok(())
}
Expand Down Expand Up @@ -482,38 +499,41 @@ async fn process_network_block(
explorer_msg_box: Option<&mut MessageBox<ExplorerMsg>>,
get_next_block_scheduler: &mut GetNextBlockScheduler,
) -> Result<Option<Arc<Ref>>, chain::Error> {
get_next_block_scheduler
.declare_completed(block.id())
.unwrap_or_else(
|e| tracing::error!(reason = ?e, "get next block schedule completion failed"),
);
let header = block.header();
let pre_checked = blockchain.pre_check_header(header, false).await?;
match pre_checked {
PreCheckedHeader::AlreadyPresent { header, .. } => {
tracing::debug!(
hash = %header.hash(),
parent = %header.parent_id(),
date = %header.block_date(),
"block is already present"
);
Ok(None)
}
PreCheckedHeader::MissingParent { header, .. } => {
let parent_hash = header.parent_id();
tracing::debug!(
hash = %header.hash(),
parent = %parent_hash,
date = %header.block_date(),
"block is missing a locally stored parent"
let span = tracing::span!(
Level::DEBUG,
"network_block",
block = %header.hash(),
parent = %header.parent_id(),
date = %header.block_date(),
);

async {
get_next_block_scheduler
.declare_completed(block.id())
.unwrap_or_else(
|e| tracing::error!(reason = ?e, "get next block schedule completion failed"),
);
Err(Error::MissingParentBlock(parent_hash))
}
PreCheckedHeader::HeaderWithCache { parent_ref, .. } => {
let r = check_and_apply_block(blockchain, parent_ref, block, explorer_msg_box).await;
r
let pre_checked = blockchain.pre_check_header(header, false).await?;
match pre_checked {
PreCheckedHeader::AlreadyPresent { .. } => {
tracing::debug!("block is already present");
Ok(None)
}
PreCheckedHeader::MissingParent { header } => {
let parent_hash = header.parent_id();
tracing::debug!("block is missing a locally stored parent");
Err(Error::MissingParentBlock(parent_hash))
}
PreCheckedHeader::HeaderWithCache { parent_ref, .. } => {
let r =
check_and_apply_block(blockchain, parent_ref, block, explorer_msg_box).await;
r
}
}
}
.instrument(span)
.await
}

async fn check_and_apply_block(
Expand All @@ -526,14 +546,7 @@ async fn check_and_apply_block(
let post_checked = blockchain
.post_check_header(block.header(), parent_ref, CheckHeaderProof::Enabled)
.await?;
let header = post_checked.header();
let block_hash = header.hash();
tracing::debug!(
hash = %block_hash,
parent = %header.parent_id(),
date = %header.block_date(),
"applying block to storage"
);
tracing::debug!("applying block to storage");
let mut block_for_explorer = if explorer_enabled {
Some(block.clone())
} else {
Expand All @@ -543,24 +556,15 @@ async fn check_and_apply_block(
.apply_and_store_block(post_checked, block)
.await?;
if let AppliedBlock::New(block_ref) = applied_block {
let header = block_ref.header();
tracing::debug!(
hash = %block_hash,
parent = %header.parent_id(),
date = %header.block_date(),
"applied block to storage"
);
tracing::debug!("applied block to storage");
if let Some(msg_box) = explorer_msg_box {
msg_box
.try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap()))
.unwrap_or_else(|err| tracing::error!("cannot add block to explorer: {}", err));
}
Ok(Some(block_ref))
} else {
tracing::debug!(
hash = %block_hash,
"block is already present in storage, not applied"
);
tracing::debug!("block is already present in storage, not applied");
Ok(None)
}
}
Expand Down
75 changes: 49 additions & 26 deletions jormungandr/src/fragment/process.rs
Expand Up @@ -18,7 +18,7 @@ use chrono::{Duration, DurationRound, Utc};
use futures::{future, TryFutureExt};
use thiserror::Error;
use tokio_stream::StreamExt;
use tracing::{span, Level};
use tracing::{debug_span, span, Level};
use tracing_futures::Instrument;

pub struct Process {
Expand Down Expand Up @@ -115,6 +115,7 @@ impl Process {
loop {
tokio::select! {
maybe_msg = input.next() => {
tracing::trace!("handling new fragment task item");
match maybe_msg {
None => break,
Some(msg) => match msg {
Expand All @@ -129,25 +130,32 @@ impl Process {
// This interface only makes sense for messages coming from arbitrary users (like transaction, certificates),
// for other message we don't want to receive them through this interface, and possibly
// put them in another pool.

let stats_counter = stats_counter.clone();

let summary = pool
.insert_and_propagate_all(origin, fragments, fail_fast)
.await?;

stats_counter.add_tx_recv_cnt(summary.accepted.len());

reply_handle.reply_ok(summary);
let span = debug_span!("incoming_fragments");
async {
let stats_counter = stats_counter.clone();
let summary = pool
.insert_and_propagate_all(origin, fragments, fail_fast)
.await?;

stats_counter.add_tx_recv_cnt(summary.accepted.len());

reply_handle.reply_ok(summary);
Ok::<(), Error>(())
}
.instrument(span)
.await?;
}
TransactionMsg::RemoveTransactions(fragment_ids, status) => {
tracing::debug!(
"removing fragments added to block {:?}: {:?}",
status,
fragment_ids
);
pool.remove_added_to_block(fragment_ids, status);
pool.remove_expired_txs().await;
let span = debug_span!("remove_transactions_in_block");
async {
tracing::debug!(
"removing fragments added to block {:?}: {:?}",
status,
fragment_ids
);
pool.remove_added_to_block(fragment_ids, status);
pool.remove_expired_txs().await;
}.instrument(span).await
}
TransactionMsg::GetLogs(reply_handle) => {
let logs = pool.logs().logs().cloned().collect();
Expand All @@ -163,6 +171,7 @@ impl Process {
reply_handle.reply_ok(statuses);
}
TransactionMsg::BranchSwitch(fork_date) => {
tracing::debug!(%fork_date, "pruning logs after branch switch");
pool.prune_after_ledger_branch(fork_date);
}
TransactionMsg::SelectTransactions {
Expand All @@ -173,7 +182,13 @@ impl Process {
soft_deadline_future,
hard_deadline_future,
} => {
let contents = pool
let span = span!(
Level::DEBUG,
"fragment_selection",
kind = "older_first",
);
async {
let contents = pool
.select(
ledger,
ledger_params,
Expand All @@ -182,17 +197,25 @@ impl Process {
hard_deadline_future,
)
.await;
reply_handle.reply_ok(contents);
reply_handle.reply_ok(contents);
}
.instrument(span)
.await
}
}
}
};
tracing::trace!("item handling finished");
}
_ = &mut wakeup => {
pool.close_persistent_log().await;
let dir = persistent_log_dir.as_ref().unwrap();
let file = open_log_file(dir.as_ref()).await?;
pool.set_persistent_log(file);
wakeup = Box::pin(hourly_wakeup(true));
async {
pool.close_persistent_log().await;
let dir = persistent_log_dir.as_ref().unwrap();
let file = open_log_file(dir.as_ref()).await?;
pool.set_persistent_log(file);
wakeup = Box::pin(hourly_wakeup(true));
Ok::<_, Error>(())
}
.instrument(debug_span!("persistent_log_rotation")).await?;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions jormungandr/src/leadership/process.rs
Expand Up @@ -323,7 +323,7 @@ impl Module {

let span = span!(
parent: self.service_info.span(),
Level::TRACE,
Level::DEBUG,
"action_run_entry",
event_date = %entry.event.date.to_string(),
event_start = %event_start.to_string(),
Expand Down Expand Up @@ -605,7 +605,7 @@ impl Module {
let parent_span = self.service_info.span();
let span = tracing::span!(
parent: parent_span,
Level::TRACE,
Level::DEBUG,
"action_schedule",
epoch_tip = epoch_tip.0,
current_epoch = current_slot_position.epoch.0,
Expand Down

0 comments on commit d4820a0

Please sign in to comment.