diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 0cfdcc71402778..2008e489c5b2e5 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -460,6 +460,8 @@ impl RetransmitStage {
         ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
+        // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
+        let _retransmit_sender = retransmit_sender.clone();
         let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
 
         let thread_hdls = retransmitter(
@@ -598,6 +600,7 @@ mod tests {
         let cluster_info = Arc::new(cluster_info);
 
         let (retransmit_sender, retransmit_receiver) = channel();
+        let _retransmit_sender = retransmit_sender.clone();
         let _t_retransmit = retransmitter(
             retransmit_socket,
             bank_forks,
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 983d01ffa1baf5..522eca0f8cea33 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -258,6 +258,7 @@ fn run_insert<F>(
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
+    retransmit_sender: &Sender<Vec<Shred>>,
     outstanding_requests: &RwLock,
 ) -> Result<()>
 where
@@ -285,7 +286,8 @@ where
         shreds,
         repairs,
         Some(leader_schedule_cache),
-        false,
+        false, // is_trusted
+        Some(retransmit_sender),
         &handle_duplicate,
         metrics,
     )?;
@@ -473,6 +475,7 @@ impl WindowService {
             insert_receiver,
             duplicate_sender,
             completed_data_sets_sender,
+            retransmit_sender.clone(),
             outstanding_requests,
         );
 
@@ -534,6 +537,7 @@ impl WindowService {
         insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<bool>)>,
         check_duplicate_sender: CrossbeamSender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
+        retransmit_sender: Sender<Vec<Shred>>,
         outstanding_requests: Arc>,
     ) -> JoinHandle<()> {
         let mut handle_timeout = || {};
@@ -563,6 +567,7 @@ impl WindowService {
                         &mut metrics,
                         &mut ws_metrics,
                         &completed_data_sets_sender,
+                        &retransmit_sender,
                         &outstanding_requests,
                     ) {
                         if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index e313a1f4f71208..a97359155756a7 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -54,7 +54,7 @@ use {
         rc::Rc,
         sync::{
             atomic::{AtomicBool, Ordering},
-            mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
+            mpsc::{sync_channel, Receiver, Sender, SyncSender, TrySendError},
             Arc, Mutex, RwLock, RwLockWriteGuard,
         },
         time::Instant,
@@ -800,6 +800,7 @@ impl Blockstore {
         is_repaired: Vec<bool>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
+        retransmit_sender: Option<&Sender<Vec<Shred>>>,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
@@ -811,7 +812,7 @@ impl Blockstore {
         let mut start = Measure::start("Blockstore lock");
         let _lock = self.insert_shreds_lock.lock().unwrap();
         start.stop();
-        let insert_lock_elapsed = start.as_us();
+        metrics.insert_lock_elapsed += start.as_us();
 
         let db = &*self.db;
         let mut write_batch = db.batch()?;
@@ -822,66 +823,56 @@ impl Blockstore {
         let mut slot_meta_working_set = HashMap::new();
         let mut index_working_set = HashMap::new();
 
-        let num_shreds = shreds.len();
+        metrics.num_shreds += shreds.len();
         let mut start = Measure::start("Shred insertion");
-        let mut num_inserted = 0;
         let mut index_meta_time = 0;
         let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
         let mut inserted_indices = Vec::new();
-        shreds
-            .into_iter()
-            .zip(is_repaired.into_iter())
-            .enumerate()
-            .for_each(|(i, (shred, is_repaired))| {
-                if shred.is_data() {
-                    let shred_source = if is_repaired {
-                        ShredSource::Repaired
-                    } else {
-                        ShredSource::Turbine
-                    };
-                    if let Ok(completed_data_sets) = self.check_insert_data_shred(
-                        shred,
-                        &mut erasure_metas,
-                        &mut index_working_set,
-                        &mut slot_meta_working_set,
-                        &mut write_batch,
-                        &mut just_inserted_data_shreds,
-                        &mut index_meta_time,
-                        is_trusted,
-                        handle_duplicate,
-                        leader_schedule,
-                        shred_source,
-                    ) {
-                        newly_completed_data_sets.extend(completed_data_sets);
-                        inserted_indices.push(i);
-                        num_inserted += 1;
-                    }
-                } else if shred.is_code() {
-                    self.check_cache_coding_shred(
-                        shred,
-                        &mut erasure_metas,
-                        &mut index_working_set,
-                        &mut just_inserted_coding_shreds,
-                        &mut index_meta_time,
-                        handle_duplicate,
-                        is_trusted,
-                        is_repaired,
-                    );
+        for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
+            if shred.is_data() {
+                let shred_source = if is_repaired {
+                    ShredSource::Repaired
                 } else {
-                    panic!("There should be no other case");
+                    ShredSource::Turbine
+                };
+                if let Ok(completed_data_sets) = self.check_insert_data_shred(
+                    shred,
+                    &mut erasure_metas,
+                    &mut index_working_set,
+                    &mut slot_meta_working_set,
+                    &mut write_batch,
+                    &mut just_inserted_data_shreds,
+                    &mut index_meta_time,
+                    is_trusted,
+                    handle_duplicate,
+                    leader_schedule,
+                    shred_source,
+                ) {
+                    newly_completed_data_sets.extend(completed_data_sets);
+                    inserted_indices.push(i);
+                    metrics.num_inserted += 1;
                 }
-            });
+            } else if shred.is_code() {
+                self.check_cache_coding_shred(
+                    shred,
+                    &mut erasure_metas,
+                    &mut index_working_set,
+                    &mut just_inserted_coding_shreds,
+                    &mut index_meta_time,
+                    handle_duplicate,
+                    is_trusted,
+                    is_repaired,
+                );
+            } else {
+                panic!("There should be no other case");
+            }
+        }
         start.stop();
-        let insert_shreds_elapsed = start.as_us();
+        metrics.insert_shreds_elapsed += start.as_us();
 
         let mut start = Measure::start("Shred recovery");
-        let mut num_recovered = 0;
-        let mut num_recovered_inserted = 0;
-        let mut num_recovered_failed_sig = 0;
-        let mut num_recovered_failed_invalid = 0;
-        let mut num_recovered_exists = 0;
         if let Some(leader_schedule_cache) = leader_schedule {
-            let recovered_data = Self::try_shred_recovery(
+            let recovered_shreds = Self::try_shred_recovery(
                 db,
                 &erasure_metas,
                 &mut index_working_set,
@@ -889,62 +880,73 @@ impl Blockstore {
                 &mut just_inserted_coding_shreds,
             );
 
-            num_recovered = recovered_data.len();
-            recovered_data.into_iter().for_each(|shred| {
-                if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
-                    if shred.verify(&leader) {
-                        match self.check_insert_data_shred(
-                            shred,
-                            &mut erasure_metas,
-                            &mut index_working_set,
-                            &mut slot_meta_working_set,
-                            &mut write_batch,
-                            &mut just_inserted_data_shreds,
-                            &mut index_meta_time,
-                            is_trusted,
-                            &handle_duplicate,
-                            leader_schedule,
-                            ShredSource::Recovered,
-                        ) {
-                            Err(InsertDataShredError::Exists) => {
-                                num_recovered_exists += 1;
-                            }
-                            Err(InsertDataShredError::InvalidShred) => {
-                                num_recovered_failed_invalid += 1;
-                            }
-                            Err(InsertDataShredError::BlockstoreError(_)) => {}
-                            Ok(completed_data_sets) => {
-                                newly_completed_data_sets.extend(completed_data_sets);
-                                num_recovered_inserted += 1;
-                            }
+            metrics.num_recovered += recovered_shreds.len();
+            let recovered_shreds: Vec<_> = recovered_shreds
+                .into_iter()
+                .filter_map(|shred| {
+                    let leader =
+                        leader_schedule_cache.slot_leader_at(shred.slot(), /*bank=*/ None)?;
+                    if !shred.verify(&leader) {
+                        metrics.num_recovered_failed_sig += 1;
+                        return None;
+                    }
+                    match self.check_insert_data_shred(
+                        shred.clone(),
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut slot_meta_working_set,
+                        &mut write_batch,
+                        &mut just_inserted_data_shreds,
+                        &mut index_meta_time,
+                        is_trusted,
+                        &handle_duplicate,
+                        leader_schedule,
+                        ShredSource::Recovered,
+                    ) {
+                        Err(InsertDataShredError::Exists) => {
+                            metrics.num_recovered_exists += 1;
+                            None
+                        }
+                        Err(InsertDataShredError::InvalidShred) => {
+                            metrics.num_recovered_failed_invalid += 1;
+                            None
+                        }
+                        Err(InsertDataShredError::BlockstoreError(_)) => None,
+                        Ok(completed_data_sets) => {
+                            newly_completed_data_sets.extend(completed_data_sets);
+                            metrics.num_recovered_inserted += 1;
+                            Some(shred)
                         }
-                    } else {
-                        num_recovered_failed_sig += 1;
                     }
+                })
+                // Always collect recovered-shreds so that above insert code is
+                // executed even if retransmit-sender is None.
+                .collect();
+            if !recovered_shreds.is_empty() {
+                if let Some(retransmit_sender) = retransmit_sender {
+                    let _ = retransmit_sender.send(recovered_shreds);
                 }
-            });
+            }
         }
         start.stop();
-        let shred_recovery_elapsed = start.as_us();
+        metrics.shred_recovery_elapsed += start.as_us();
 
-        just_inserted_coding_shreds
-            .into_iter()
-            .for_each(|((_, _), shred)| {
-                self.check_insert_coding_shred(
-                    shred,
-                    &mut index_working_set,
-                    &mut write_batch,
-                    &mut index_meta_time,
-                );
-                num_inserted += 1;
-            });
+        metrics.num_inserted += just_inserted_coding_shreds.len() as u64;
+        for ((_, _), shred) in just_inserted_coding_shreds {
+            self.check_insert_coding_shred(
+                shred,
+                &mut index_working_set,
+                &mut write_batch,
+                &mut index_meta_time,
+            );
+        }
 
         let mut start = Measure::start("Shred recovery");
         // Handle chaining for the members of the slot_meta_working_set that were inserted into,
         // drop the others
         handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
         start.stop();
-        let chaining_elapsed = start.as_us();
+        metrics.chaining_elapsed += start.as_us();
 
         let mut start = Measure::start("Commit Working Sets");
         let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
@@ -963,12 +965,12 @@ impl Blockstore {
             }
         }
         start.stop();
-        let commit_working_sets_elapsed = start.as_us();
+        metrics.commit_working_sets_elapsed += start.as_us();
 
         let mut start = Measure::start("Write Batch");
         self.db.write(write_batch)?;
         start.stop();
-        let write_batch_elapsed = start.as_us();
+        metrics.write_batch_elapsed += start.as_us();
 
         send_signals(
             &self.new_shreds_signals,
@@ -979,20 +981,7 @@ impl Blockstore {
 
         total_start.stop();
 
-        metrics.num_shreds += num_shreds;
         metrics.total_elapsed += total_start.as_us();
-        metrics.insert_lock_elapsed += insert_lock_elapsed;
-        metrics.insert_shreds_elapsed += insert_shreds_elapsed;
-        metrics.shred_recovery_elapsed += shred_recovery_elapsed;
-        metrics.chaining_elapsed += chaining_elapsed;
-        metrics.commit_working_sets_elapsed += commit_working_sets_elapsed;
-        metrics.write_batch_elapsed += write_batch_elapsed;
-        metrics.num_inserted += num_inserted;
-        metrics.num_recovered += num_recovered;
-        metrics.num_recovered_inserted += num_recovered_inserted;
-        metrics.num_recovered_failed_sig += num_recovered_failed_sig;
-        metrics.num_recovered_failed_invalid = num_recovered_failed_invalid;
-        metrics.num_recovered_exists = num_recovered_exists;
         metrics.index_meta_time += index_meta_time;
         Ok((newly_completed_data_sets, inserted_indices))
     }
@@ -1034,7 +1023,8 @@ impl Blockstore {
             vec![false; shreds_len],
             leader_schedule,
             is_trusted,
-            &|_| {},
+            None, // retransmit-sender
+            &|_| {}, // handle-duplicates
             &mut BlockstoreInsertionMetrics::default(),
         )
     }
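
Note (not part of the diff): a minimal, self-contained sketch of the calling convention the blockstore change introduces. The Shred struct and the insert_recovered helper below are stand-ins invented for illustration; the real path is verify() plus check_insert_data_shred() inside insert_shreds_handle_duplicate. The sketch shows the shape of the new retransmit_sender: Option<&Sender<Vec<Shred>>> argument: recovery and insertion always run, the recovered batch is forwarded only when a sender was supplied and the batch is non-empty, and send errors are ignored so a disconnected retransmit stage cannot fail insertion.

use std::sync::mpsc::{channel, Sender};

// Stand-in for solana_ledger::shred::Shred, only here to keep the sketch self-contained.
#[derive(Clone, Debug)]
struct Shred(u64);

// Hypothetical helper mirroring the new `retransmit_sender: Option<&Sender<Vec<Shred>>>`
// argument of Blockstore::insert_shreds_handle_duplicate.
fn insert_recovered(recovered: Vec<Shred>, retransmit_sender: Option<&Sender<Vec<Shred>>>) {
    // Placeholder for the real verify() + check_insert_data_shred() filtering.
    let inserted: Vec<Shred> = recovered.into_iter().filter(|s| s.0 % 2 == 0).collect();
    // Forward the batch only when it is non-empty and a sender was supplied; ignore send
    // errors so a disconnected retransmit stage cannot turn into an insertion failure.
    if !inserted.is_empty() {
        if let Some(retransmit_sender) = retransmit_sender {
            let _ = retransmit_sender.send(inserted);
        }
    }
}

fn main() {
    let (retransmit_sender, retransmit_receiver) = channel();
    // Window-service path: recovered shreds come back out on the retransmit channel.
    insert_recovered(vec![Shred(2), Shred(3), Shred(4)], Some(&retransmit_sender));
    println!("{:?}", retransmit_receiver.try_recv()); // Ok([Shred(2), Shred(4)])
    // Plain Blockstore::insert_shreds() path: no retransmit sender.
    insert_recovered(vec![Shred(5)], None);
}

The let _retransmit_sender = retransmit_sender.clone() lines in retransmit_stage.rs simply keep one extra sender handle alive alongside the channel, as a workaround for the upstream rust-lang issue linked in the code comment.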