diff --git a/jormungandr/src/blockchain/process.rs b/jormungandr/src/blockchain/process.rs index 72099f4216..951328ce08 100644 --- a/jormungandr/src/blockchain/process.rs +++ b/jormungandr/src/blockchain/process.rs @@ -201,16 +201,16 @@ impl Process { let network_msg_box = self.network_msgbox.clone(); let explorer_msg_box = self.explorer_msgbox.clone(); let stats_counter = self.stats_counter.clone(); - + tracing::trace!("handling new blockchain task item"); match input { BlockMsg::LeadershipBlock(leadership_block) => { let span = span!( parent: self.service_info.span(), - Level::TRACE, + Level::DEBUG, "process_leadership_block", - hash = %leadership_block.block.header.hash().to_string(), - parent = %leadership_block.block.header.parent_id().to_string(), - date = %leadership_block.block.header.block_date().to_string() + hash = %leadership_block.block.header.hash(), + parent = %leadership_block.block.header.parent_id(), + date = %leadership_block.block.header.block_date() ); let _enter = span.enter(); tracing::info!("receiving block from leadership service"); @@ -231,12 +231,12 @@ impl Process { BlockMsg::AnnouncedBlock(header, node_id) => { let span = span!( parent: self.service_info.span(), - Level::TRACE, + Level::DEBUG, "process_announced_block", - hash = %header.hash().to_string(), - parent = %header.parent_id().to_string(), - date = %header.block_date().to_string(), - peer = %node_id.to_string() + hash = %header.hash(), + parent = %header.parent_id(), + date = %header.block_date(), + peer = %node_id ); let _enter = span.enter(); tracing::info!("received block announcement from network"); @@ -251,10 +251,17 @@ impl Process { node_id, self.pull_headers_scheduler.clone(), self.get_next_block_scheduler.clone(), - ), + ) + .instrument(span.clone()), ) } BlockMsg::NetworkBlocks(handle) => { + let span = span!( + parent: self.service_info.span(), + Level::DEBUG, + "process_network_blocks", + ); + let _guard = span.enter(); tracing::info!("receiving block stream from network"); self.service_info.timeout_spawn_fallible( @@ -268,13 +275,14 @@ impl Process { self.get_next_block_scheduler.clone(), handle, stats_counter, - ), + ) + .instrument(span.clone()), ); } BlockMsg::ChainHeaders(handle) => { - tracing::info!("receiving header stream from network"); - let span = span!(parent: self.service_info.span(), Level::TRACE, "process_chain_headers", sub_task = "chain_pull"); + let span = span!(parent: self.service_info.span(), Level::DEBUG, "process_chain_headers", sub_task = "chain_pull"); let _enter = span.enter(); + tracing::info!("receiving header stream from network"); self.service_info.timeout_spawn( "process network headers", @@ -284,10 +292,12 @@ impl Process { handle, self.pull_headers_scheduler.clone(), network_msg_box, - ), + ) + .instrument(span.clone()), ); } } + tracing::trace!("item handling finished"); } fn start_garbage_collector(&self, info: &TokioServiceInfo) { @@ -312,22 +322,29 @@ async fn process_and_propagate_new_ref( mut network_msg_box: MessageBox, ) -> chain::Result<()> { let header = new_block_ref.header().clone(); - tracing::debug!("processing the new block and propagating"); - // Even if this fails because the queue is full we periodically recompute the tip - tip_update_mbox - .try_send(new_block_ref) - .unwrap_or_else(|err| { - tracing::error!( - "cannot send new ref to be evaluated as candidate tip: {}", - err - ) - }); + let span = span!(Level::DEBUG, "process_and_propagate_new_ref", block = %header.hash()); + + async { + tracing::debug!("processing the new block and propagating"); + // Even if this fails because the queue is full we periodically recompute the tip + tip_update_mbox + .try_send(new_block_ref) + .unwrap_or_else(|err| { + tracing::error!( + "cannot send new ref to be evaluated as candidate tip: {}", + err + ) + }); - tracing::debug!("propagating block to the network"); + tracing::debug!("propagating block to the network"); - network_msg_box - .send(NetworkMsg::Propagate(PropagateMsg::Block(header))) - .await?; + network_msg_box + .send(NetworkMsg::Propagate(PropagateMsg::Block(header))) + .await?; + Ok::<(), Error>(()) + } + .instrument(span) + .await?; Ok(()) } @@ -482,38 +499,41 @@ async fn process_network_block( explorer_msg_box: Option<&mut MessageBox>, get_next_block_scheduler: &mut GetNextBlockScheduler, ) -> Result>, chain::Error> { - get_next_block_scheduler - .declare_completed(block.id()) - .unwrap_or_else( - |e| tracing::error!(reason = ?e, "get next block schedule completion failed"), - ); let header = block.header(); - let pre_checked = blockchain.pre_check_header(header, false).await?; - match pre_checked { - PreCheckedHeader::AlreadyPresent { header, .. } => { - tracing::debug!( - hash = %header.hash(), - parent = %header.parent_id(), - date = %header.block_date(), - "block is already present" - ); - Ok(None) - } - PreCheckedHeader::MissingParent { header, .. } => { - let parent_hash = header.parent_id(); - tracing::debug!( - hash = %header.hash(), - parent = %parent_hash, - date = %header.block_date(), - "block is missing a locally stored parent" + let span = tracing::span!( + Level::DEBUG, + "network_block", + block = %header.hash(), + parent = %header.parent_id(), + date = %header.block_date(), + ); + + async { + get_next_block_scheduler + .declare_completed(block.id()) + .unwrap_or_else( + |e| tracing::error!(reason = ?e, "get next block schedule completion failed"), ); - Err(Error::MissingParentBlock(parent_hash)) - } - PreCheckedHeader::HeaderWithCache { parent_ref, .. } => { - let r = check_and_apply_block(blockchain, parent_ref, block, explorer_msg_box).await; - r + let pre_checked = blockchain.pre_check_header(header, false).await?; + match pre_checked { + PreCheckedHeader::AlreadyPresent { .. } => { + tracing::debug!("block is already present"); + Ok(None) + } + PreCheckedHeader::MissingParent { header } => { + let parent_hash = header.parent_id(); + tracing::debug!("block is missing a locally stored parent"); + Err(Error::MissingParentBlock(parent_hash)) + } + PreCheckedHeader::HeaderWithCache { parent_ref, .. } => { + let r = + check_and_apply_block(blockchain, parent_ref, block, explorer_msg_box).await; + r + } } } + .instrument(span) + .await } async fn check_and_apply_block( @@ -526,14 +546,7 @@ async fn check_and_apply_block( let post_checked = blockchain .post_check_header(block.header(), parent_ref, CheckHeaderProof::Enabled) .await?; - let header = post_checked.header(); - let block_hash = header.hash(); - tracing::debug!( - hash = %block_hash, - parent = %header.parent_id(), - date = %header.block_date(), - "applying block to storage" - ); + tracing::debug!("applying block to storage"); let mut block_for_explorer = if explorer_enabled { Some(block.clone()) } else { @@ -543,13 +556,7 @@ async fn check_and_apply_block( .apply_and_store_block(post_checked, block) .await?; if let AppliedBlock::New(block_ref) = applied_block { - let header = block_ref.header(); - tracing::debug!( - hash = %block_hash, - parent = %header.parent_id(), - date = %header.block_date(), - "applied block to storage" - ); + tracing::debug!("applied block to storage"); if let Some(msg_box) = explorer_msg_box { msg_box .try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap())) @@ -557,10 +564,7 @@ async fn check_and_apply_block( } Ok(Some(block_ref)) } else { - tracing::debug!( - hash = %block_hash, - "block is already present in storage, not applied" - ); + tracing::debug!("block is already present in storage, not applied"); Ok(None) } } diff --git a/jormungandr/src/fragment/process.rs b/jormungandr/src/fragment/process.rs index 627f854e90..82430fa283 100644 --- a/jormungandr/src/fragment/process.rs +++ b/jormungandr/src/fragment/process.rs @@ -18,7 +18,7 @@ use chrono::{Duration, DurationRound, Utc}; use futures::{future, TryFutureExt}; use thiserror::Error; use tokio_stream::StreamExt; -use tracing::{span, Level}; +use tracing::{debug_span, span, Level}; use tracing_futures::Instrument; pub struct Process { @@ -115,6 +115,7 @@ impl Process { loop { tokio::select! { maybe_msg = input.next() => { + tracing::trace!("handling new fragment task item"); match maybe_msg { None => break, Some(msg) => match msg { @@ -129,25 +130,32 @@ impl Process { // This interface only makes sense for messages coming from arbitrary users (like transaction, certificates), // for other message we don't want to receive them through this interface, and possibly // put them in another pool. - - let stats_counter = stats_counter.clone(); - - let summary = pool - .insert_and_propagate_all(origin, fragments, fail_fast) - .await?; - - stats_counter.add_tx_recv_cnt(summary.accepted.len()); - - reply_handle.reply_ok(summary); + let span = debug_span!("incoming_fragments"); + async { + let stats_counter = stats_counter.clone(); + let summary = pool + .insert_and_propagate_all(origin, fragments, fail_fast) + .await?; + + stats_counter.add_tx_recv_cnt(summary.accepted.len()); + + reply_handle.reply_ok(summary); + Ok::<(), Error>(()) + } + .instrument(span) + .await?; } TransactionMsg::RemoveTransactions(fragment_ids, status) => { - tracing::debug!( - "removing fragments added to block {:?}: {:?}", - status, - fragment_ids - ); - pool.remove_added_to_block(fragment_ids, status); - pool.remove_expired_txs().await; + let span = debug_span!("remove_transactions_in_block"); + async { + tracing::debug!( + "removing fragments added to block {:?}: {:?}", + status, + fragment_ids + ); + pool.remove_added_to_block(fragment_ids, status); + pool.remove_expired_txs().await; + }.instrument(span).await } TransactionMsg::GetLogs(reply_handle) => { let logs = pool.logs().logs().cloned().collect(); @@ -163,6 +171,7 @@ impl Process { reply_handle.reply_ok(statuses); } TransactionMsg::BranchSwitch(fork_date) => { + tracing::debug!(%fork_date, "pruning logs after branch switch"); pool.prune_after_ledger_branch(fork_date); } TransactionMsg::SelectTransactions { @@ -173,7 +182,13 @@ impl Process { soft_deadline_future, hard_deadline_future, } => { - let contents = pool + let span = span!( + Level::DEBUG, + "fragment_selection", + kind = "older_first", + ); + async { + let contents = pool .select( ledger, ledger_params, @@ -182,17 +197,25 @@ impl Process { hard_deadline_future, ) .await; - reply_handle.reply_ok(contents); + reply_handle.reply_ok(contents); + } + .instrument(span) + .await } } - } + }; + tracing::trace!("item handling finished"); } _ = &mut wakeup => { - pool.close_persistent_log().await; - let dir = persistent_log_dir.as_ref().unwrap(); - let file = open_log_file(dir.as_ref()).await?; - pool.set_persistent_log(file); - wakeup = Box::pin(hourly_wakeup(true)); + async { + pool.close_persistent_log().await; + let dir = persistent_log_dir.as_ref().unwrap(); + let file = open_log_file(dir.as_ref()).await?; + pool.set_persistent_log(file); + wakeup = Box::pin(hourly_wakeup(true)); + Ok::<_, Error>(()) + } + .instrument(debug_span!("persistent_log_rotation")).await?; } } } diff --git a/jormungandr/src/leadership/process.rs b/jormungandr/src/leadership/process.rs index 89746add00..a5ec8836e3 100644 --- a/jormungandr/src/leadership/process.rs +++ b/jormungandr/src/leadership/process.rs @@ -323,7 +323,7 @@ impl Module { let span = span!( parent: self.service_info.span(), - Level::TRACE, + Level::DEBUG, "action_run_entry", event_date = %entry.event.date.to_string(), event_start = %event_start.to_string(), @@ -605,7 +605,7 @@ impl Module { let parent_span = self.service_info.span(); let span = tracing::span!( parent: parent_span, - Level::TRACE, + Level::DEBUG, "action_schedule", epoch_tip = epoch_tip.0, current_epoch = current_slot_position.epoch.0, diff --git a/jormungandr/src/network/mod.rs b/jormungandr/src/network/mod.rs index 145531a6c3..a2598c9002 100644 --- a/jormungandr/src/network/mod.rs +++ b/jormungandr/src/network/mod.rs @@ -83,7 +83,7 @@ use crate::utils::async_msg::{MessageBox, MessageQueue}; use chain_network::data::NodeKeyPair; use rand::seq::SliceRandom; use tonic::transport; -use tracing::{span, Level, Span}; +use tracing::{field, instrument, span, Level, Span}; use tracing_futures::Instrument; use std::collections::HashSet; @@ -310,6 +310,7 @@ async fn handle_network_input( channels: Channels, ) { while let Some(msg) = input.next().await { + tracing::trace!("handling new network task item"); match msg { NetworkMsg::Propagate(msg) => { handle_propagation_msg(msg, state.clone(), channels.clone()) @@ -337,7 +338,8 @@ async fn handle_network_input( NetworkMsg::PeerInfo(reply) => { state.peers.infos().map(|infos| reply.reply_ok(infos)).await; } - } + }; + tracing::trace!("item handling finished"); } } @@ -362,90 +364,94 @@ where // and ids. let mut res = Vec::new(); for peer in peers { - if f(peer.address(), arg.clone()).await.is_err() { + if f(peer.address(), arg.clone()) + .instrument(span!(Level::DEBUG, "p2p_comm", peer = %peer.address())) + .await + .is_err() + { res.push(peer); } } Ok(res) } +#[instrument(level = "debug", skip(msg, state, channels), fields(peer = field::Empty, hash = field::Empty))] async fn handle_propagation_msg( msg: PropagateMsg, state: GlobalStateR, mut channels: Channels, ) -> Result<(), PropagateError> { use poldercast::layer::Selection; - async { - let prop_state = state.clone(); - let unreached_nodes = match &msg { - PropagateMsg::Block(header) => { - tracing::debug!(hash = %header.hash(), "block to propagate"); - let header = header.encode(); - propagate_message( - |addr, header| prop_state.peers.propagate_block(addr, header), - Selection::Topic { - topic: crate::topology::topic::BLOCKS, - }, - header, - &mut channels.topology_box, - ) - .await? - } - PropagateMsg::Fragment(fragment) => { - tracing::debug!(hash = %fragment.hash(), "fragment to propagate"); - let fragment = fragment.encode(); - propagate_message( - |addr, fragment| prop_state.peers.propagate_fragment(addr, fragment), - Selection::Topic { - topic: crate::topology::topic::MESSAGES, - }, - fragment, - &mut channels.topology_box, - ) - .await? + let prop_state = state.clone(); + let unreached_nodes = match &msg { + PropagateMsg::Block(header) => { + Span::current().record("hash", &format_args!("{}", header.description())); + tracing::debug!("received new block to propagate"); + let header = header.encode(); + propagate_message( + |addr, header| prop_state.peers.propagate_block(addr, header), + Selection::Topic { + topic: crate::topology::topic::BLOCKS, + }, + header, + &mut channels.topology_box, + ) + .await? + } + PropagateMsg::Fragment(fragment) => { + Span::current().record("hash", &format_args!("{}", fragment.hash())); + tracing::debug!(hash = %fragment.hash(), "fragment to propagate"); + let fragment = fragment.encode(); + propagate_message( + |addr, fragment| prop_state.peers.propagate_fragment(addr, fragment), + Selection::Topic { + topic: crate::topology::topic::MESSAGES, + }, + fragment, + &mut channels.topology_box, + ) + .await? + } + PropagateMsg::Gossip(peer, gossips) => { + Span::current().record("peer", &peer.address().to_string().as_str()); + tracing::debug!("gossip to propagate"); + let gossip = gossips.encode(); + match prop_state + .peers + .propagate_gossip_to(peer.address(), gossip) + .await + { + Err(_) => vec![peer.clone()], + Ok(_) => Vec::new(), } - PropagateMsg::Gossip(peer, gossips) => { - tracing::debug!("gossip to propagate"); - let gossip = gossips.encode(); - match prop_state - .peers - .propagate_gossip_to(peer.address(), gossip) - .await - { - Err(_) => vec![peer.clone()], - Ok(_) => Vec::new(), + } + }; + // If any nodes selected for propagation are not in the + // active subscriptions map, connect to them and deliver + // the item. + if !unreached_nodes.is_empty() { + tracing::debug!( + "will try to connect to the peers not immediately reachable for propagation: {:?}", + unreached_nodes, + ); + for peer in unreached_nodes { + let mut options = p2p::comm::ConnectOptions::default(); + match &msg { + PropagateMsg::Block(header) => { + options.pending_block_announcement = Some(header.encode()); } - } - }; - // If any nodes selected for propagation are not in the - // active subscriptions map, connect to them and deliver - // the item. - if !unreached_nodes.is_empty() { - tracing::debug!( - "will try to connect to the peers not immediately reachable for propagation: {:?}", - unreached_nodes, - ); - for peer in unreached_nodes { - let mut options = p2p::comm::ConnectOptions::default(); - match &msg { - PropagateMsg::Block(header) => { - options.pending_block_announcement = Some(header.encode()); - } - PropagateMsg::Fragment(fragment) => { - options.pending_fragment = Some(fragment.encode()); - } - PropagateMsg::Gossip(_, gossip) => { - options.pending_gossip = Some(gossip.encode()); - } - }; - let (addr, id) = (peer.address(), peer.id()); - connect_and_propagate(addr, id, state.clone(), channels.clone(), options); - } + PropagateMsg::Fragment(fragment) => { + options.pending_fragment = Some(fragment.encode()); + } + PropagateMsg::Gossip(_, gossip) => { + options.pending_gossip = Some(gossip.encode()); + } + }; + let (addr, id) = (peer.address(), peer.id()); + connect_and_propagate(addr, id, state.clone(), channels.clone(), options); } - Ok(()) } - .instrument(state.span.clone()) - .await + Ok(()) } // node_id should be missing only for trusted peer for which we do not know @@ -467,13 +473,12 @@ fn connect_and_propagate( } drop(_enter); let peer = Peer::new(node_addr); - let conn_span = span!(parent: &state.span, Level::TRACE, "peer", node = %node_addr); - let _enter = conn_span.enter(); - let conn_state = ConnectionState::new(state.clone(), &peer, conn_span.clone()); - tracing::info!("connecting to peer"); - let (handle, connecting) = client::connect(conn_state, channels.clone()); + let conn_span = span!(parent: &state.span, Level::DEBUG, "client", addr = %node_addr); let spawn_state = state.clone(); let cf = async move { + let conn_state = ConnectionState::new(state.clone(), &peer, Span::current()); + tracing::info!("connecting to peer"); + let (handle, connecting) = client::connect(conn_state, channels.clone()); state.peers.add_connecting(node_addr, handle, options).await; match connecting.await { Err(e) => { @@ -526,7 +531,7 @@ fn connect_and_propagate( } } } - .instrument(conn_span.clone()); + .instrument(conn_span); spawn_state.spawn(cf); } @@ -561,7 +566,7 @@ async fn netboot_peers(config: &Configuration, parent_span: &Span) -> BootstrapP for tpeer in &trusted_peers_addrs { let span = span!( parent: parent_span, - Level::TRACE, + Level::DEBUG, "netboot_peers", peer_addr = %tpeer.to_string() ); @@ -647,13 +652,14 @@ pub async fn bootstrap( for peer in &bootstrap_peers { let span = - span!(parent: span, Level::TRACE, "bootstrap", peer_addr = %peer.address().to_string()); + span!(parent: span, Level::DEBUG, "bootstrap", peer_addr = %peer.address().to_string()); let res = bootstrap::bootstrap_from_peer( &Peer::new(peer.address()), blockchain.clone(), branch.clone(), cancellation_token.clone(), ) + .instrument(span.clone()) .await; match res { @@ -724,7 +730,7 @@ pub async fn fetch_block( let mut block = None; - let span = span!(Level::TRACE, "fetch_block", block = %hash.to_string()); + let span = span!(Level::DEBUG, "fetch_block", block = %hash.to_string()); async { for address in trusted_peers_shuffled(config) { let peer_span = span!(Level::TRACE, "peer_address", address = %address.to_string()); diff --git a/jormungandr/src/topology/process.rs b/jormungandr/src/topology/process.rs index 4044b268e2..659006cf05 100644 --- a/jormungandr/src/topology/process.rs +++ b/jormungandr/src/topology/process.rs @@ -66,6 +66,7 @@ impl Process { loop { tokio::select! { Some(input) = self.input.next() => { + tracing::trace!("handling new topology task item"); match input { TopologyMsg::AcceptGossip(gossip) => { self.topology.accept_gossips(gossip); @@ -86,8 +87,11 @@ impl Process { handle.reply_ok(self.topology.list_quarantined()) } } + tracing::trace!("item handling finished"); }, _ = self.gossip_interval.tick() => { + let span = tracing::debug_span!("generating_gossip", task = "topology"); + let _guard = span.enter(); self.topology.update_gossip(); let view = self.topology.view(poldercast::layer::Selection::Any); if view.peers.is_empty() { @@ -97,6 +101,8 @@ impl Process { self.send_gossip_messages(view.peers) } _ = quarantine_check.tick() => { + let span = tracing::debug_span!("quarantine_check", task = "topology"); + let _guard = span.enter(); // Even if lifted from quarantine, peers will be re-added to the topology // only after we receive a gossip about them. let mut nodes_to_contact = self.topology.lift_reports();