retransmits shreds recovered from erasure codes
Shreds recovered from erasure codes have not been received from turbine
and therefore have not been retransmitted to other nodes downstream. This
results in more repairs across the cluster, which is slower.

This commit channels recovered shreds through to the retransmit stage so
that they are further broadcast to downstream nodes in the tree.
behzadnouri committed Aug 17, 2021
1 parent 3c71670 commit 7a8807b
Showing 3 changed files with 111 additions and 113 deletions.
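The change is easiest to see end to end as a small standalone sketch (this is not the actual solana-ledger API; `Shred`, `insert_recovered_shreds`, and `spawn_retransmitter` are placeholder names): the insert path collects the data shreds it managed to recover and insert, and forwards them over an `mpsc` channel to a retransmit thread, which re-broadcasts them to downstream nodes instead of leaving those nodes to repair the shreds themselves.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Placeholder shred type; the real one lives in ledger/src/shred.rs.
#[derive(Clone, Debug)]
struct Shred {
    slot: u64,
    index: u32,
}

// Insert side: shreds recovered from erasure coding are verified/inserted,
// and the ones that made it into the store are forwarded for retransmit.
fn insert_recovered_shreds(recovered: Vec<Shred>, retransmit_sender: Option<&Sender<Vec<Shred>>>) {
    let inserted: Vec<Shred> = recovered
        .into_iter()
        .filter(|shred| {
            // Stand-in for signature verification + blockstore insertion.
            println!("inserted recovered shred {}:{}", shred.slot, shred.index);
            true
        })
        .collect();
    if !inserted.is_empty() {
        if let Some(sender) = retransmit_sender {
            // If the retransmit stage has already shut down, dropping the
            // shreds here is fine, so the send error is ignored.
            let _ = sender.send(inserted);
        }
    }
}

// Retransmit side: a thread drains the channel and re-broadcasts the shreds.
fn spawn_retransmitter(receiver: Receiver<Vec<Shred>>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for shreds in receiver {
            println!("retransmitting {} recovered shreds downstream", shreds.len());
        }
    })
}

fn main() {
    let (retransmit_sender, retransmit_receiver) = channel();
    let retransmitter = spawn_retransmitter(retransmit_receiver);
    insert_recovered_shreds(vec![Shred { slot: 42, index: 7 }], Some(&retransmit_sender));
    drop(retransmit_sender); // close the channel so the retransmit thread exits
    retransmitter.join().unwrap();
}
```

The sender is optional on the insert side so that code paths with no retransmit stage (for example, plain blockstore inserts in tests) can pass `None`, as the diff below does in the `insert_shreds` wrapper.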
core/src/retransmit_stage.rs (3 changes: 3 additions & 0 deletions)
@@ -460,6 +460,8 @@ impl RetransmitStage {
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
// https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
let _retransmit_sender = retransmit_sender.clone();

let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
let thread_hdls = retransmitter(
@@ -598,6 +600,7 @@ mod tests {
let cluster_info = Arc::new(cluster_info);

let (retransmit_sender, retransmit_receiver) = channel();
let _retransmit_sender = retransmit_sender.clone();
let _t_retransmit = retransmitter(
retransmit_socket,
bank_forks,
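A note on the seemingly unused `_retransmit_sender` clones above: the linked rust-lang issue (#39364) describes a panic in `mpsc::Receiver::recv_timeout` that could fire once every `Sender` had been dropped, and keeping one clone alive avoids ever reaching that disconnected state. The sketch below only demonstrates the disconnected-vs-timeout distinction the workaround relies on (the panic itself depended on the std version), and assumes the receiving side polls with `recv_timeout`:

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

fn main() {
    // Without a kept-alive sender: dropping the only Sender disconnects the
    // channel, and recv_timeout reports Disconnected immediately.
    let (sender, receiver) = channel::<u32>();
    drop(sender);
    assert_eq!(
        receiver.recv_timeout(Duration::from_millis(10)),
        Err(RecvTimeoutError::Disconnected)
    );

    // With a kept-alive clone: the channel stays connected, so the receiver
    // just times out and can keep polling.
    let (sender, receiver) = channel::<u32>();
    let _keep_alive = sender.clone();
    drop(sender);
    assert_eq!(
        receiver.recv_timeout(Duration::from_millis(10)),
        Err(RecvTimeoutError::Timeout)
    );
}
```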
core/src/window_service.rs (7 changes: 6 additions & 1 deletion)
@@ -260,6 +260,7 @@ fn run_insert<F>(
metrics: &mut BlockstoreInsertionMetrics,
ws_metrics: &mut WindowServiceMetrics,
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<Shred>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
) -> Result<()>
where
@@ -287,7 +288,8 @@ where
shreds,
repairs,
Some(leader_schedule_cache),
false,
false, // is_trusted
Some(retransmit_sender),
&handle_duplicate,
metrics,
)?;
@@ -467,6 +469,7 @@ impl WindowService {
insert_receiver,
duplicate_sender,
completed_data_sets_sender,
retransmit_sender.clone(),
outstanding_requests,
);

@@ -528,6 +531,7 @@ impl WindowService {
insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
check_duplicate_sender: CrossbeamSender<Shred>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<Shred>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> JoinHandle<()> {
let mut handle_timeout = || {};
@@ -557,6 +561,7 @@ impl WindowService {
&mut metrics,
&mut ws_metrics,
&completed_data_sets_sender,
&retransmit_sender,
&outstanding_requests,
) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
ledger/src/blockstore.rs (214 changes: 102 additions & 112 deletions)
@@ -54,7 +54,7 @@ use {
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
mpsc::{sync_channel, Receiver, Sender, SyncSender, TrySendError},
Arc, Mutex, RwLock, RwLockWriteGuard,
},
time::Instant,
@@ -800,6 +800,7 @@ impl Blockstore {
is_repaired: Vec<bool>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec<Shred>>>,
handle_duplicate: &F,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
@@ -811,7 +812,7 @@
let mut start = Measure::start("Blockstore lock");
let _lock = self.insert_shreds_lock.lock().unwrap();
start.stop();
let insert_lock_elapsed = start.as_us();
metrics.insert_lock_elapsed += start.as_us();

let db = &*self.db;
let mut write_batch = db.batch()?;
Expand All @@ -822,129 +823,130 @@ impl Blockstore {
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();

let num_shreds = shreds.len();
metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
let mut num_inserted = 0;
let mut index_meta_time = 0;
let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
let mut inserted_indices = Vec::new();
shreds
.into_iter()
.zip(is_repaired.into_iter())
.enumerate()
.for_each(|(i, (shred, is_repaired))| {
if shred.is_data() {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
ShredSource::Turbine
};
if let Ok(completed_data_sets) = self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
handle_duplicate,
leader_schedule,
shred_source,
) {
newly_completed_data_sets.extend(completed_data_sets);
inserted_indices.push(i);
num_inserted += 1;
}
} else if shred.is_code() {
self.check_cache_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
handle_duplicate,
is_trusted,
is_repaired,
);
for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
if shred.is_data() {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
panic!("There should be no other case");
ShredSource::Turbine
};
if let Ok(completed_data_sets) = self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
handle_duplicate,
leader_schedule,
shred_source,
) {
newly_completed_data_sets.extend(completed_data_sets);
inserted_indices.push(i);
metrics.num_inserted += 1;
}
});
} else if shred.is_code() {
self.check_cache_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
handle_duplicate,
is_trusted,
is_repaired,
);
} else {
panic!("There should be no other case");
}
}
start.stop();

let insert_shreds_elapsed = start.as_us();
metrics.insert_shreds_elapsed += start.as_us();
let mut start = Measure::start("Shred recovery");
let mut num_recovered = 0;
let mut num_recovered_inserted = 0;
let mut num_recovered_failed_sig = 0;
let mut num_recovered_failed_invalid = 0;
let mut num_recovered_exists = 0;
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data = Self::try_shred_recovery(
let recovered_data_shreds = Self::try_shred_recovery(
db,
&erasure_metas,
&mut index_working_set,
&mut just_inserted_data_shreds,
&mut just_inserted_coding_shreds,
);

num_recovered = recovered_data.len();
recovered_data.into_iter().for_each(|shred| {
if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
if shred.verify(&leader) {
match self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
&handle_duplicate,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => {
num_recovered_exists += 1;
}
Err(InsertDataShredError::InvalidShred) => {
num_recovered_failed_invalid += 1;
}
Err(InsertDataShredError::BlockstoreError(_)) => {}
Ok(completed_data_sets) => {
newly_completed_data_sets.extend(completed_data_sets);
num_recovered_inserted += 1;
}
metrics.num_recovered += recovered_data_shreds.len();
let recovered_data_shreds: Vec<_> = recovered_data_shreds
.into_iter()
.filter_map(|shred| {
let leader =
leader_schedule_cache.slot_leader_at(shred.slot(), /*bank=*/ None)?;
if !shred.verify(&leader) {
metrics.num_recovered_failed_sig += 1;
return None;
}
match self.check_insert_data_shred(
shred.clone(),
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
&handle_duplicate,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => {
metrics.num_recovered_exists += 1;
None
}
Err(InsertDataShredError::InvalidShred) => {
metrics.num_recovered_failed_invalid += 1;
None
}
Err(InsertDataShredError::BlockstoreError(_)) => None,
Ok(completed_data_sets) => {
newly_completed_data_sets.extend(completed_data_sets);
metrics.num_recovered_inserted += 1;
Some(shred)
}
} else {
num_recovered_failed_sig += 1;
}
})
// Always collect recovered-shreds so that above insert code is
// executed even if retransmit-sender is None.
.collect();
if !recovered_data_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(recovered_data_shreds);
}
});
}
}
start.stop();
let shred_recovery_elapsed = start.as_us();
metrics.shred_recovery_elapsed += start.as_us();

just_inserted_coding_shreds
.into_iter()
.for_each(|((_, _), shred)| {
self.check_insert_coding_shred(
shred,
&mut index_working_set,
&mut write_batch,
&mut index_meta_time,
);
num_inserted += 1;
});
metrics.num_inserted += just_inserted_coding_shreds.len() as u64;
for shred in just_inserted_coding_shreds.into_values() {
self.check_insert_coding_shred(
shred,
&mut index_working_set,
&mut write_batch,
&mut index_meta_time,
);
}

let mut start = Measure::start("Shred recovery");
// Handle chaining for the members of the slot_meta_working_set that were inserted into,
// drop the others
handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
start.stop();
let chaining_elapsed = start.as_us();
metrics.chaining_elapsed += start.as_us();

let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
@@ -963,12 +965,12 @@ impl Blockstore {
}
}
start.stop();
let commit_working_sets_elapsed = start.as_us();
metrics.commit_working_sets_elapsed += start.as_us();

let mut start = Measure::start("Write Batch");
self.db.write(write_batch)?;
start.stop();
let write_batch_elapsed = start.as_us();
metrics.write_batch_elapsed += start.as_us();

send_signals(
&self.new_shreds_signals,
Expand All @@ -979,20 +981,7 @@ impl Blockstore {

total_start.stop();

metrics.num_shreds += num_shreds;
metrics.total_elapsed += total_start.as_us();
metrics.insert_lock_elapsed += insert_lock_elapsed;
metrics.insert_shreds_elapsed += insert_shreds_elapsed;
metrics.shred_recovery_elapsed += shred_recovery_elapsed;
metrics.chaining_elapsed += chaining_elapsed;
metrics.commit_working_sets_elapsed += commit_working_sets_elapsed;
metrics.write_batch_elapsed += write_batch_elapsed;
metrics.num_inserted += num_inserted;
metrics.num_recovered += num_recovered;
metrics.num_recovered_inserted += num_recovered_inserted;
metrics.num_recovered_failed_sig += num_recovered_failed_sig;
metrics.num_recovered_failed_invalid = num_recovered_failed_invalid;
metrics.num_recovered_exists = num_recovered_exists;
metrics.index_meta_time += index_meta_time;

Ok((newly_completed_data_sets, inserted_indices))
@@ -1034,7 +1023,8 @@ impl Blockstore {
vec![false; shreds_len],
leader_schedule,
is_trusted,
&|_| {},
None, // retransmit-sender
&|_| {}, // handle-duplicates
&mut BlockstoreInsertionMetrics::default(),
)
}
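One subtlety in the new blockstore loop, flagged by the `// Always collect recovered-shreds ...` comment: the recovered shreds are always driven through `collect()`, even when `retransmit_sender` is `None`, because the `filter_map` closure is what performs the insertion. If the iterator were only consumed inside the `if let Some(retransmit_sender)` branch, the inserts would silently be skipped whenever no sender is supplied. A toy illustration of that hazard, with a counter standing in for the blockstore writes:

```rust
use std::cell::Cell;

fn main() {
    // Eager: collect() drives the closure, so the "insert" side effect always runs.
    let inserted = Cell::new(0);
    let kept: Vec<i32> = (0..5)
        .filter_map(|x| {
            inserted.set(inserted.get() + 1); // stand-in for check_insert_data_shred()
            if x % 2 == 0 {
                Some(x)
            } else {
                None
            }
        })
        .collect();
    assert_eq!(inserted.get(), 5);
    assert_eq!(kept, vec![0, 2, 4]);

    // Lazy: an iterator that is never consumed runs the closure zero times, so
    // gating the whole pipeline behind `if let Some(sender)` would silently
    // skip the inserts whenever the sender is None.
    let skipped = Cell::new(0);
    let lazy = (0..5).filter_map(|x| {
        skipped.set(skipped.get() + 1);
        Some(x)
    });
    drop(lazy); // never consumed
    assert_eq!(skipped.get(), 0);
}
```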
