diff --git a/src/burnchains/burnchain.rs b/src/burnchains/burnchain.rs
index ae7dbcdc5a2..8d8c60b81d9 100644
--- a/src/burnchains/burnchain.rs
+++ b/src/burnchains/burnchain.rs
@@ -1291,32 +1291,36 @@ impl Burnchain {
         // TODO: don't re-process blocks. See if the block hash is already present in the burn db,
         // and if so, do nothing.
         let download_thread: thread::JoinHandle<Result<(), burnchain_error>> =
-            thread::spawn(move || {
-                while let Ok(Some(ipc_header)) = downloader_recv.recv() {
-                    debug!("Try recv next header");
-
-                    let download_start = get_epoch_time_ms();
-                    let ipc_block = downloader.download(&ipc_header)?;
-                    let download_end = get_epoch_time_ms();
-
-                    debug!(
-                        "Downloaded block {} in {}ms",
-                        ipc_block.height(),
-                        download_end.saturating_sub(download_start)
-                    );
+            thread::Builder::new()
+                .name("burnchain-downloader".to_string())
+                .spawn(move || {
+                    while let Ok(Some(ipc_header)) = downloader_recv.recv() {
+                        debug!("Try recv next header");
+
+                        let download_start = get_epoch_time_ms();
+                        let ipc_block = downloader.download(&ipc_header)?;
+                        let download_end = get_epoch_time_ms();
+
+                        debug!(
+                            "Downloaded block {} in {}ms",
+                            ipc_block.height(),
+                            download_end.saturating_sub(download_start)
+                        );
+                        parser_send
+                            .send(Some(ipc_block))
+                            .map_err(|_e| burnchain_error::ThreadChannelError)?;
+                    }
                     parser_send
-                        .send(Some(ipc_block))
+                        .send(None)
                         .map_err(|_e| burnchain_error::ThreadChannelError)?;
-                }
-                parser_send
-                    .send(None)
-                    .map_err(|_e| burnchain_error::ThreadChannelError)?;
-                Ok(())
-            });
+                    Ok(())
+                })
+                .unwrap();
 
-        let parse_thread: thread::JoinHandle<Result<(), burnchain_error>> =
-            thread::spawn(move || {
+        let parse_thread: thread::JoinHandle<Result<(), burnchain_error>> = thread::Builder::new()
+            .name("burnchain-parser".to_string())
+            .spawn(move || {
                 while let Ok(Some(ipc_block)) = parser_recv.recv() {
                     debug!("Try recv next block");
 
@@ -1338,34 +1342,38 @@ impl Burnchain {
                     .send(None)
                     .map_err(|_e| burnchain_error::ThreadChannelError)?;
                 Ok(())
-            });
+            })
+            .unwrap();
 
         let db_thread: thread::JoinHandle<Result<BurnchainBlockHeader, burnchain_error>> =
-            thread::spawn(move || {
-                let mut last_processed = burn_chain_tip;
-                while let Ok(Some(burnchain_block)) = db_recv.recv() {
-                    debug!("Try recv next parsed block");
+            thread::Builder::new()
+                .name("burnchain-db".to_string())
+                .spawn(move || {
+                    let mut last_processed = burn_chain_tip;
+                    while let Ok(Some(burnchain_block)) = db_recv.recv() {
+                        debug!("Try recv next parsed block");
+
+                        if burnchain_block.block_height() == 0 {
+                            continue;
+                        }
 
-                    if burnchain_block.block_height() == 0 {
-                        continue;
-                    }
+                        let insert_start = get_epoch_time_ms();
+                        last_processed =
+                            Burnchain::process_block(&myself, &mut burnchain_db, &burnchain_block)?;
+                        if !coord_comm.announce_new_burn_block() {
+                            return Err(burnchain_error::CoordinatorClosed);
+                        }
+                        let insert_end = get_epoch_time_ms();
 
-                    let insert_start = get_epoch_time_ms();
-                    last_processed =
-                        Burnchain::process_block(&myself, &mut burnchain_db, &burnchain_block)?;
-                    if !coord_comm.announce_new_burn_block() {
-                        return Err(burnchain_error::CoordinatorClosed);
+                        debug!(
+                            "Inserted block {} in {}ms",
+                            burnchain_block.block_height(),
+                            insert_end.saturating_sub(insert_start)
+                        );
                     }
-                    let insert_end = get_epoch_time_ms();
-
-                    debug!(
-                        "Inserted block {} in {}ms",
-                        burnchain_block.block_height(),
-                        insert_end.saturating_sub(insert_start)
-                    );
-                }
-                Ok(last_processed)
-            });
+                    Ok(last_processed)
+                })
+                .unwrap();
 
         // feed the pipeline!
         let input_headers = indexer.read_headers(start_block + 1, end_block + 1)?;
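Note: every spawn site in this patch moves from thread::spawn to std::thread::Builder, whose .name() attaches a human-readable label to the thread before it starts; .spawn() then returns io::Result, hence the added .unwrap() calls. Below is a minimal, self-contained sketch of the same producer/consumer pipeline shape as the downloader/parser threads above -- the channel capacity, thread names, and payload type are illustrative, not the ones used in burnchain.rs:

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Bounded channel between pipeline stages, with Option<T> as an
    // end-of-stream marker, mirroring the downloader -> parser hand-off.
    let (tx, rx) = sync_channel::<Option<u64>>(16);

    let producer = thread::Builder::new()
        .name("demo-downloader".to_string()) // the label that shows up in logs
        .spawn(move || {
            for height in 0..3u64 {
                tx.send(Some(height)).expect("consumer hung up");
            }
            tx.send(None).expect("consumer hung up"); // signal end-of-stream
        })
        .unwrap(); // spawn() only errs if the OS cannot create the thread

    let consumer = thread::Builder::new()
        .name("demo-parser".to_string())
        .spawn(move || {
            // Stop on either a closed channel or the None marker.
            while let Ok(Some(height)) = rx.recv() {
                println!("[{}] got block {}", thread::current().name().unwrap(), height);
            }
        })
        .unwrap();

    producer.join().unwrap();
    consumer.join().unwrap();
}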
diff --git a/src/util/log.rs b/src/util/log.rs
index a853933a76d..29e24040fb5 100644
--- a/src/util/log.rs
+++ b/src/util/log.rs
@@ -50,7 +50,10 @@ fn print_msg_header(mut rd: &mut dyn RecordDecorator, record: &Record) -> io::Re
     write!(rd, " ")?;
     write!(rd, "[{}:{}]", record.file(), record.line())?;
     write!(rd, " ")?;
-    write!(rd, "[{:?}]", thread::current().id())?;
+    match thread::current().name() {
+        None => write!(rd, "[{:?}]", thread::current().id())?,
+        Some(name) => write!(rd, "[{}]", name)?,
+    }
 
     rd.start_whitespace()?;
     write!(rd, " ")?;
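Note: this log.rs change is what makes the thread names visible: the header formatter now prefers thread::current().name() and only falls back to the Debug form of the opaque ThreadId. The same fallback can be exercised with nothing but std -- thread_label below is a hypothetical helper for illustration, not code from this patch:

use std::thread;

// Prefer the thread's name; fall back to its ThreadId, whose Debug
// form prints as e.g. "ThreadId(2)".
fn thread_label() -> String {
    let current = thread::current();
    match current.name() {
        Some(name) => format!("[{}]", name),
        None => format!("[{:?}]", current.id()),
    }
}

fn main() {
    println!("{}", thread_label()); // the main thread is named "main"

    thread::Builder::new()
        .name("p2p".to_string())
        .spawn(|| println!("{}", thread_label())) // prints "[p2p]"
        .unwrap()
        .join()
        .unwrap();
}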
diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs
index 0b838b0df9b..2ed12f0725b 100644
--- a/testnet/stacks-node/src/neon_node.rs
+++ b/testnet/stacks-node/src/neon_node.rs
@@ -492,150 +492,156 @@ fn spawn_peer(
     // microblock miner state
     let mut microblock_miner_state = None;
 
-    let server_thread = thread::spawn(move || {
-        let handler_args = RPCHandlerArgs {
-            exit_at_block_height: exit_at_block_height.as_ref(),
-            genesis_chainstate_hash: Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH)
-                .unwrap(),
-            ..RPCHandlerArgs::default()
-        };
+    let server_thread = thread::Builder::new()
+        .name("p2p".to_string())
+        .spawn(move || {
+            let handler_args = RPCHandlerArgs {
+                exit_at_block_height: exit_at_block_height.as_ref(),
+                genesis_chainstate_hash: Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH)
+                    .unwrap(),
+                ..RPCHandlerArgs::default()
+            };
 
-        let mut disconnected = false;
-        let mut num_p2p_state_machine_passes = 0;
-        let mut num_inv_sync_passes = 0;
+            let mut disconnected = false;
+            let mut num_p2p_state_machine_passes = 0;
+            let mut num_inv_sync_passes = 0;
 
-        while !disconnected {
-            let download_backpressure = results_with_data.len() > 0;
-            let poll_ms = if !download_backpressure && this.has_more_downloads() {
-                // keep getting those blocks -- drive the downloader state-machine
-                debug!(
-                    "P2P: backpressure: {}, more downloads: {}",
-                    download_backpressure,
-                    this.has_more_downloads()
-                );
-                100
-            } else {
-                cmp::min(poll_timeout, config.node.microblock_frequency)
-            };
+            while !disconnected {
+                let download_backpressure = results_with_data.len() > 0;
+                let poll_ms = if !download_backpressure && this.has_more_downloads() {
+                    // keep getting those blocks -- drive the downloader state-machine
+                    debug!(
+                        "P2P: backpressure: {}, more downloads: {}",
+                        download_backpressure,
+                        this.has_more_downloads()
+                    );
+                    100
+                } else {
+                    cmp::min(poll_timeout, config.node.microblock_frequency)
+                };
 
-            // Mine microblocks.
-            // NOTE: this has to go *here* because control over who can refresh the unconfirmed
-            // state (or mutate it in general) *must* reside within the same thread as the p2p
-            // thread, so that the p2p thread's in-RAM MARF state stays in-sync with the DB.
-            //
-            // If you don't do this, you'll get runtime panics in the RPC code due to the network
-            // thread's in-RAM view of the unconfirmed chain state trie (namely, the rowid of the
-            // trie) being out-of-sync with what's actually in the DB. An attempt to read from
-            // the MARF may lead to a panic, because there may not be a trie on-disk with the
-            // network code's rowid any longer in the case where *some other thread* modifies the
-            // unconfirmed state trie and invalidates the network thread's in-RAM copy of the trie
-            // rowid.
-            //
-            // Fortunately, microblock-mining isn't a very CPU or I/O-intensive process, and the
-            // node operator can bound how expensive each microblock can be in order to limit the
-            // amount of time the microblock miner spends mining.
-            //
-            // Once the Clarity DB has been refactored to be safe to write to by multiple threads
-            // concurrently, then microblock mining can be moved to the relayer thread (where it
-            // really ought to occur, since microblock mining can be both CPU- and I/O-intensive).
-            let next_microblock = match try_mine_microblock(
-                &config,
-                &mut microblock_miner_state,
-                &mut chainstate,
-                &sortdb,
-                &mem_pool,
-                &coord_comms,
-                miner_tip_arc.clone(),
-            ) {
-                Ok(x) => x,
-                Err(e) => {
-                    warn!("Failed to mine next microblock: {:?}", &e);
-                    None
-                }
-            };
+                // Mine microblocks.
+                // NOTE: this has to go *here* because control over who can refresh the unconfirmed
+                // state (or mutate it in general) *must* reside within the same thread as the p2p
+                // thread, so that the p2p thread's in-RAM MARF state stays in-sync with the DB.
+                //
+                // If you don't do this, you'll get runtime panics in the RPC code due to the network
+                // thread's in-RAM view of the unconfirmed chain state trie (namely, the rowid of the
+                // trie) being out-of-sync with what's actually in the DB. An attempt to read from
+                // the MARF may lead to a panic, because there may not be a trie on-disk with the
+                // network code's rowid any longer in the case where *some other thread* modifies the
+                // unconfirmed state trie and invalidates the network thread's in-RAM copy of the trie
+                // rowid.
+                //
+                // Fortunately, microblock-mining isn't a very CPU or I/O-intensive process, and the
+                // node operator can bound how expensive each microblock can be in order to limit the
+                // amount of time the microblock miner spends mining.
+                //
+                // Once the Clarity DB has been refactored to be safe to write to by multiple threads
+                // concurrently, then microblock mining can be moved to the relayer thread (where it
+                // really ought to occur, since microblock mining can be both CPU- and I/O-intensive).
+                let next_microblock = match try_mine_microblock(
+                    &config,
+                    &mut microblock_miner_state,
+                    &mut chainstate,
+                    &sortdb,
+                    &mem_pool,
+                    &coord_comms,
+                    miner_tip_arc.clone(),
+                ) {
+                    Ok(x) => x,
+                    Err(e) => {
+                        warn!("Failed to mine next microblock: {:?}", &e);
+                        None
+                    }
+                };
 
-            let (canonical_consensus_tip, canonical_block_tip) =
-                SortitionDB::get_canonical_stacks_chain_tip_hash(sortdb.conn())
-                    .expect("Failed to read canonical stacks chain tip");
+                let (canonical_consensus_tip, canonical_block_tip) =
+                    SortitionDB::get_canonical_stacks_chain_tip_hash(sortdb.conn())
+                        .expect("Failed to read canonical stacks chain tip");
 
-            let mut expected_attachments = match attachments_rx.try_recv() {
-                Ok(expected_attachments) => expected_attachments,
-                _ => {
-                    debug!("Atlas: attachment channel is empty");
-                    HashSet::new()
-                }
-            };
+                let mut expected_attachments = match attachments_rx.try_recv() {
+                    Ok(expected_attachments) => expected_attachments,
+                    _ => {
+                        debug!("Atlas: attachment channel is empty");
+                        HashSet::new()
+                    }
+                };
 
-            let network_result = match this.run(
-                &sortdb,
-                &mut chainstate,
-                &mut mem_pool,
-                Some(&mut dns_client),
-                download_backpressure,
-                poll_ms,
-                &handler_args,
-                &mut expected_attachments,
-            ) {
-                Ok(res) => res,
-                Err(e) => {
-                    error!("P2P: Failed to process network dispatch: {:?}", &e);
-                    panic!();
-                }
-            };
+                let network_result = match this.run(
+                    &sortdb,
+                    &mut chainstate,
+                    &mut mem_pool,
+                    Some(&mut dns_client),
+                    download_backpressure,
+                    poll_ms,
+                    &handler_args,
+                    &mut expected_attachments,
+                ) {
+                    Ok(res) => res,
+                    Err(e) => {
+                        error!("P2P: Failed to process network dispatch: {:?}", &e);
+                        panic!();
+                    }
+                };
 
-            if num_p2p_state_machine_passes < network_result.num_state_machine_passes {
-                // p2p state-machine did a full pass. Notify anyone listening.
-                sync_comms.notify_p2p_state_pass();
-                num_p2p_state_machine_passes = network_result.num_state_machine_passes;
-            }
+                if num_p2p_state_machine_passes < network_result.num_state_machine_passes {
+                    // p2p state-machine did a full pass. Notify anyone listening.
+                    sync_comms.notify_p2p_state_pass();
+                    num_p2p_state_machine_passes = network_result.num_state_machine_passes;
+                }
 
-            if num_inv_sync_passes < network_result.num_inv_sync_passes {
-                // inv-sync state-machine did a full pass. Notify anyone listening.
-                sync_comms.notify_inv_sync_pass();
-                num_inv_sync_passes = network_result.num_inv_sync_passes;
-            }
+                if num_inv_sync_passes < network_result.num_inv_sync_passes {
+                    // inv-sync state-machine did a full pass. Notify anyone listening.
+                    sync_comms.notify_inv_sync_pass();
+                    num_inv_sync_passes = network_result.num_inv_sync_passes;
+                }
 
-            if network_result.has_data_to_store() {
-                results_with_data.push_back(RelayerDirective::HandleNetResult(network_result));
-            }
-            if let Some(microblock) = next_microblock {
-                results_with_data.push_back(RelayerDirective::BroadcastMicroblock(
-                    canonical_consensus_tip,
-                    canonical_block_tip,
-                    microblock,
-                ));
-            }
+                if network_result.has_data_to_store() {
+                    results_with_data.push_back(RelayerDirective::HandleNetResult(network_result));
+                }
+                if let Some(microblock) = next_microblock {
+                    results_with_data.push_back(RelayerDirective::BroadcastMicroblock(
+                        canonical_consensus_tip,
+                        canonical_block_tip,
+                        microblock,
+                    ));
+                }
 
-            while let Some(next_result) = results_with_data.pop_front() {
-                // have blocks, microblocks, and/or transactions (don't care about anything else),
-                if let Err(e) = relay_channel.try_send(next_result) {
-                    debug!(
-                        "P2P: {:?}: download backpressure detected",
-                        &this.local_peer
-                    );
-                    match e {
-                        TrySendError::Full(directive) => {
-                            // don't lose this data -- just try it again
-                            results_with_data.push_front(directive);
-                            break;
-                        }
-                        TrySendError::Disconnected(_) => {
-                            info!("P2P: Relayer hang up with p2p channel");
-                            disconnected = true;
-                            break;
-                        }
-                    }
-                } else {
-                    debug!("P2P: Dispatched result to Relayer!");
-                }
-            }
-        }
-        debug!("P2P thread exit!");
-    });
+                while let Some(next_result) = results_with_data.pop_front() {
+                    // have blocks, microblocks, and/or transactions (don't care about anything else),
+                    if let Err(e) = relay_channel.try_send(next_result) {
+                        debug!(
+                            "P2P: {:?}: download backpressure detected",
+                            &this.local_peer
+                        );
+                        match e {
+                            TrySendError::Full(directive) => {
+                                // don't lose this data -- just try it again
+                                results_with_data.push_front(directive);
+                                break;
+                            }
+                            TrySendError::Disconnected(_) => {
+                                info!("P2P: Relayer hang up with p2p channel");
+                                disconnected = true;
+                                break;
+                            }
+                        }
+                    } else {
+                        debug!("P2P: Dispatched result to Relayer!");
+                    }
+                }
+            }
+            debug!("P2P thread exit!");
+        })
+        .unwrap();
 
-    let _jh = thread::spawn(move || {
-        dns_resolver.thread_main();
-    });
+    let _jh = thread::Builder::new()
+        .name("dns-resolver".to_string())
+        .spawn(move || {
+            dns_resolver.thread_main();
+        })
+        .unwrap();
 
     Ok(server_thread)
 }
@@ -683,7 +689,7 @@ fn spawn_miner_relayer(
 
     let mut bitcoin_controller = BitcoinRegtestController::new_dummy(config.clone());
 
-    let _relayer_handle = thread::spawn(move || {
+    let _relayer_handle = thread::Builder::new().name("relayer".to_string()).spawn(move || {
         let mut did_register_key = false;
        let mut key_registered_at_block = 0;
         while let Ok(mut directive) = relay_channel.recv() {
@@ -899,7 +905,7 @@ fn spawn_miner_relayer(
             }
         }
         debug!("Relayer exit!");
-    });
+    }).unwrap();
 
     Ok(())
 }
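Note: the spawn_peer changes above are re-indentation plus the thread name; the behavior, including the backpressure loop, is untouched. That loop's logic reduces to the stand-alone sketch below (the capacity-1 channel and variable names are illustrative): try_send into the bounded relayer channel, put the directive back on TrySendError::Full so it is retried on the next pass, and treat TrySendError::Disconnected as shutdown:

use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    let (tx, rx) = sync_channel::<u32>(1); // tiny capacity to force backpressure
    let mut pending: VecDeque<u32> = VecDeque::from(vec![1, 2, 3]);

    while let Some(directive) = pending.pop_front() {
        match tx.try_send(directive) {
            Ok(()) => {}
            Err(TrySendError::Full(directive)) => {
                // Don't lose this data -- put it back and retry on the next pass.
                pending.push_front(directive);
                break;
            }
            Err(TrySendError::Disconnected(_)) => break, // receiver is gone: shut down
        }
    }

    drop(tx); // close the channel so the drain below terminates
    while let Ok(directive) = rx.recv() {
        println!("relayer got {}", directive);
    }
    assert_eq!(pending, VecDeque::from(vec![2, 3])); // 2 was re-queued, 3 never sent
}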
diff --git a/testnet/stacks-node/src/run_loop/neon.rs b/testnet/stacks-node/src/run_loop/neon.rs
index a3920e0df22..ab04b3dbc65 100644
--- a/testnet/stacks-node/src/run_loop/neon.rs
+++ b/testnet/stacks-node/src/run_loop/neon.rs
@@ -227,16 +227,19 @@ impl RunLoop {
         let atlas_config = AtlasConfig::default();
         let moved_atlas_config = atlas_config.clone();
 
-        thread::spawn(move || {
-            ChainsCoordinator::run(
-                chain_state_db,
-                coordinator_burnchain_config,
-                attachments_tx,
-                &mut coordinator_dispatcher,
-                coordinator_receivers,
-                moved_atlas_config,
-            );
-        });
+        thread::Builder::new()
+            .name("chains-coordinator".to_string())
+            .spawn(move || {
+                ChainsCoordinator::run(
+                    chain_state_db,
+                    coordinator_burnchain_config,
+                    attachments_tx,
+                    &mut coordinator_dispatcher,
+                    coordinator_receivers,
+                    moved_atlas_config,
+                );
+            })
+            .unwrap();
 
         let mut burnchain_tip = burnchain.wait_for_sortitions(None);
@@ -287,9 +290,12 @@ impl RunLoop {
         let prometheus_bind = self.config.node.prometheus_bind.clone();
 
         if let Some(prometheus_bind) = prometheus_bind {
-            thread::spawn(move || {
-                start_serving_monitoring_metrics(prometheus_bind);
-            });
+            thread::Builder::new()
+                .name("prometheus".to_string())
+                .spawn(move || {
+                    start_serving_monitoring_metrics(prometheus_bind);
+                })
+                .unwrap();
         }
 
         let mut block_height = 1.max(burnchain_config.first_block_height);
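Note: unlike thread::spawn, which panics internally if the OS cannot create a thread, Builder::spawn surfaces that failure as io::Result<JoinHandle<T>>; the .unwrap() added at each converted call site therefore preserves the old fail-fast behavior. If a call site ever wanted to handle the error explicitly instead, it would look roughly like this sketch (the panic message is illustrative):

use std::thread;

fn main() {
    // Builder::spawn reports OS-level spawn failure as io::Error
    // instead of panicking inside std.
    let handle = thread::Builder::new()
        .name("prometheus".to_string())
        .spawn(|| {
            // start_serving_monitoring_metrics(prometheus_bind) would run here
        })
        .unwrap_or_else(|e| panic!("failed to spawn prometheus thread: {}", e));

    handle.join().expect("prometheus thread panicked");
}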