Skip to content

Commit

Permalink
add debug spans to every major unit of work
Browse files Browse the repository at this point in the history
  • Loading branch information
zeegomo committed Sep 24, 2021
1 parent 8a967bd commit 2846e1d
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 118 deletions.
26 changes: 18 additions & 8 deletions jormungandr/src/blockchain/process.rs
Expand Up @@ -201,12 +201,12 @@ impl Process {
let network_msg_box = self.network_msgbox.clone();
let explorer_msg_box = self.explorer_msgbox.clone();
let stats_counter = self.stats_counter.clone();

tracing::trace!("handling new blockchain task item");
match input {
BlockMsg::LeadershipBlock(leadership_block) => {
let span = span!(
parent: self.service_info.span(),
Level::TRACE,
Level::DEBUG,
"process_leadership_block",
hash = %leadership_block.block.header.hash().to_string(),
parent = %leadership_block.block.header.parent_id().to_string(),
Expand All @@ -231,7 +231,7 @@ impl Process {
BlockMsg::AnnouncedBlock(header, node_id) => {
let span = span!(
parent: self.service_info.span(),
Level::TRACE,
Level::DEBUG,
"process_announced_block",
hash = %header.hash().to_string(),
parent = %header.parent_id().to_string(),
Expand All @@ -251,10 +251,17 @@ impl Process {
node_id,
self.pull_headers_scheduler.clone(),
self.get_next_block_scheduler.clone(),
),
)
.instrument(span.clone()),
)
}
BlockMsg::NetworkBlocks(handle) => {
let span = span!(
parent: self.service_info.span(),
Level::DEBUG,
"process_network_blocks",
);
let _guard = span.enter();
tracing::info!("receiving block stream from network");

self.service_info.timeout_spawn_fallible(
Expand All @@ -268,13 +275,14 @@ impl Process {
self.get_next_block_scheduler.clone(),
handle,
stats_counter,
),
)
.instrument(span.clone()),
);
}
BlockMsg::ChainHeaders(handle) => {
tracing::info!("receiving header stream from network");
let span = span!(parent: self.service_info.span(), Level::TRACE, "process_chain_headers", sub_task = "chain_pull");
let span = span!(parent: self.service_info.span(), Level::DEBUG, "process_chain_headers", sub_task = "chain_pull");
let _enter = span.enter();
tracing::info!("receiving header stream from network");

self.service_info.timeout_spawn(
"process network headers",
Expand All @@ -284,10 +292,12 @@ impl Process {
handle,
self.pull_headers_scheduler.clone(),
network_msg_box,
),
)
.instrument(span.clone()),
);
}
}
tracing::trace!("item handling finished");
}

fn start_garbage_collector(&self, info: &TokioServiceInfo) {
Expand Down
10 changes: 7 additions & 3 deletions jormungandr/src/fragment/pool.rs
Expand Up @@ -119,7 +119,7 @@ impl Pool {
for fragment in fragments.by_ref() {
let id = fragment.id();

let span = tracing::trace_span!("pool_incoming_fragment", fragment_id=?id);
let span = tracing::debug_span!("fragment", %id);
let _enter = span.enter();

if self.logs.exists(id) {
Expand Down Expand Up @@ -201,7 +201,7 @@ impl Pool {
if fail_fast {
for fragment in fragments {
let id = fragment.id();
let span = tracing::trace_span!("pool_incoming_fragment", fragment_id=?id);
let span = tracing::debug_span!("fragment", %id);
let _enter = span.enter();
tracing::error!(
"rejected due to fail_fast and one of previous fragments being invalid"
Expand All @@ -222,7 +222,11 @@ impl Pool {
tracing::debug!("{} of the received fragments were added to the pool", count);
let fragment_logs: Vec<_> = new_fragments
.iter()
.map(move |fragment| FragmentLog::new(fragment.id(), origin))
.map(move |fragment| {
let id = fragment.id();
tracing::debug!(fragment_id=?id, "inserted fragment to the pool");
FragmentLog::new(id, origin)
})
.collect();
self.logs.insert_all_pending(fragment_logs);

Expand Down
75 changes: 49 additions & 26 deletions jormungandr/src/fragment/process.rs
Expand Up @@ -18,7 +18,7 @@ use chrono::{Duration, DurationRound, Utc};
use futures::{future, TryFutureExt};
use thiserror::Error;
use tokio_stream::StreamExt;
use tracing::{span, Level};
use tracing::{debug_span, span, Level};
use tracing_futures::Instrument;

pub struct Process {
Expand Down Expand Up @@ -115,6 +115,7 @@ impl Process {
loop {
tokio::select! {
maybe_msg = input.next() => {
tracing::trace!("handling new fragment task item");
match maybe_msg {
None => break,
Some(msg) => match msg {
Expand All @@ -129,25 +130,32 @@ impl Process {
// This interface only makes sense for messages coming from arbitrary users (like transaction, certificates),
// for other message we don't want to receive them through this interface, and possibly
// put them in another pool.

let stats_counter = stats_counter.clone();

let summary = pool
.insert_and_propagate_all(origin, fragments, fail_fast)
.await?;

stats_counter.add_tx_recv_cnt(summary.accepted.len());

reply_handle.reply_ok(summary);
let span = debug_span!("incoming_fragments");
async {
let stats_counter = stats_counter.clone();
let summary = pool
.insert_and_propagate_all(origin, fragments, fail_fast)
.await?;

stats_counter.add_tx_recv_cnt(summary.accepted.len());

reply_handle.reply_ok(summary);
Ok::<(), Error>(())
}
.instrument(span)
.await?;
}
TransactionMsg::RemoveTransactions(fragment_ids, status) => {
tracing::debug!(
"removing fragments added to block {:?}: {:?}",
status,
fragment_ids
);
pool.remove_added_to_block(fragment_ids, status);
pool.remove_expired_txs().await;
let span = debug_span!("remove_transactions_in_block");
async {
tracing::debug!(
"removing fragments added to block {:?}: {:?}",
status,
fragment_ids
);
pool.remove_added_to_block(fragment_ids, status);
pool.remove_expired_txs().await;
}.instrument(span).await
}
TransactionMsg::GetLogs(reply_handle) => {
let logs = pool.logs().logs().cloned().collect();
Expand All @@ -163,6 +171,7 @@ impl Process {
reply_handle.reply_ok(statuses);
}
TransactionMsg::BranchSwitch(fork_date) => {
tracing::debug!(%fork_date, "pruning logs after branch switch");
pool.prune_after_ledger_branch(fork_date);
}
TransactionMsg::SelectTransactions {
Expand All @@ -173,7 +182,13 @@ impl Process {
soft_deadline_future,
hard_deadline_future,
} => {
let contents = pool
let span = span!(
Level::DEBUG,
"fragment_selection",
kind = "older_first",
);
async {
let contents = pool
.select(
ledger,
ledger_params,
Expand All @@ -182,17 +197,25 @@ impl Process {
hard_deadline_future,
)
.await;
reply_handle.reply_ok(contents);
reply_handle.reply_ok(contents);
}
.instrument(span)
.await
}
}
}
};
tracing::trace!("item handling finished");
}
_ = &mut wakeup => {
pool.close_persistent_log().await;
let dir = persistent_log_dir.as_ref().unwrap();
let file = open_log_file(dir.as_ref()).await?;
pool.set_persistent_log(file);
wakeup = Box::pin(hourly_wakeup(true));
async {
pool.close_persistent_log().await;
let dir = persistent_log_dir.as_ref().unwrap();
let file = open_log_file(dir.as_ref()).await?;
pool.set_persistent_log(file);
wakeup = Box::pin(hourly_wakeup(true));
Ok::<_, Error>(())
}
.instrument(debug_span!("persistent_log_rotation")).await?;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions jormungandr/src/leadership/process.rs
Expand Up @@ -323,7 +323,7 @@ impl Module {

let span = span!(
parent: self.service_info.span(),
Level::TRACE,
Level::DEBUG,
"action_run_entry",
event_date = %entry.event.date.to_string(),
event_start = %event_start.to_string(),
Expand Down Expand Up @@ -605,7 +605,7 @@ impl Module {
let parent_span = self.service_info.span();
let span = tracing::span!(
parent: parent_span,
Level::TRACE,
Level::DEBUG,
"action_schedule",
epoch_tip = epoch_tip.0,
current_epoch = current_slot_position.epoch.0,
Expand Down

0 comments on commit 2846e1d

Please sign in to comment.