Skip to content

Commit

Permalink
caches reed-solomon encoder/decoder instance
Browse files Browse the repository at this point in the history
ReedSolomon::new(...) initializes a matrix and an inversion tree:
https://github.com/rust-rse/reed-solomon-erasure/blob/eb1f66f47/src/core.rs#L450-L458

To avoid repeating this initialization, this commit caches the Reed-Solomon
encoder/decoder instance for each (data_shards, parity_shards) pair.
  • Loading branch information
behzadnouri committed Aug 31, 2022
1 parent 12f9213 commit 57ec9e2
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 48 deletions.
3 changes: 2 additions & 1 deletion core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {
solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule_cache::LeaderScheduleCache,
shred::{ProcessShredsStats, Shredder},
shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_measure::measure::Measure,
solana_runtime::{bank::Bank, bank_forks::BankForks},
Expand Down Expand Up @@ -106,6 +106,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
true, // is_last_in_slot
0, // next_shred_index
0, // next_code_index
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);

Expand Down
16 changes: 13 additions & 3 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use {
raptorq::{Decoder, Encoder},
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache,
Shred, ShredFlags, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
Expand Down Expand Up @@ -52,6 +52,7 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
true, // is_last_in_slot
0, // next_shred_index
0, // next_code_index
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
assert!(data_shreds.len() >= num_shreds);
Expand All @@ -77,6 +78,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
// ~1Mb
let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
shredder.entries_to_shreds(
Expand All @@ -85,6 +87,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
true,
0,
0,
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
Expand All @@ -102,6 +105,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
Some(shred_size),
);
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
let reed_solomon_cache = ReedSolomonCache::default();
// 1Mb
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
Expand All @@ -111,6 +115,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
true,
0,
0,
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
Expand All @@ -131,6 +136,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
true,
0,
0,
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
bencher.iter(|| {
Expand All @@ -155,10 +161,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
)
.len();
})
Expand All @@ -168,12 +176,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
let coding_shreds = Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
);
bencher.iter(|| {
Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
Shredder::try_recovery(coding_shreds[..].to_vec(), &reed_solomon_cache).unwrap();
})
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ pub mod test {
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder},
shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_runtime::bank::Bank,
solana_sdk::{
Expand Down Expand Up @@ -488,6 +488,7 @@ pub mod test {
true, // is_last_in_slot
0, // next_shred_index,
0, // next_code_index
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
(
Expand Down
7 changes: 6 additions & 1 deletion core/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
itertools::Itertools,
solana_entry::entry::Entry,
solana_gossip::contact_info::ContactInfo,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signature, Signer},
Expand Down Expand Up @@ -36,6 +36,7 @@ pub(super) struct BroadcastDuplicatesRun {
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
original_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
partition_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl BroadcastDuplicatesRun {
Expand All @@ -56,6 +57,7 @@ impl BroadcastDuplicatesRun {
cluster_nodes_cache,
original_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
partition_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
Expand Down Expand Up @@ -163,6 +165,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
self.next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand All @@ -178,6 +181,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
true,
self.next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't
Expand All @@ -189,6 +193,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
true,
self.next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let sigs: Vec<_> = partition_last_data_shred
Expand Down
6 changes: 5 additions & 1 deletion core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::*,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
};

Expand All @@ -11,6 +11,7 @@ pub(super) struct BroadcastFakeShredsRun {
partition: usize,
shred_version: u16,
next_code_index: u32,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl BroadcastFakeShredsRun {
Expand All @@ -20,6 +21,7 @@ impl BroadcastFakeShredsRun {
partition,
shred_version,
next_code_index: 0,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
Expand Down Expand Up @@ -60,6 +62,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
last_tick_height == bank.max_tick_height(),
next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand All @@ -79,6 +82,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
last_tick_height == bank.max_tick_height(),
next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::*,
crate::cluster_nodes::ClusterNodesCache,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
std::{thread::sleep, time::Duration},
};
Expand All @@ -17,6 +17,7 @@ pub(super) struct FailEntryVerificationBroadcastRun {
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl FailEntryVerificationBroadcastRun {
Expand All @@ -32,6 +33,7 @@ impl FailEntryVerificationBroadcastRun {
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
Expand Down Expand Up @@ -91,6 +93,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
self.next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand All @@ -105,6 +108,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
true,
self.next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't know
Expand All @@ -116,6 +120,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
false,
self.next_shred_index,
self.next_code_index,
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
self.next_shred_index += 1;
Expand Down
6 changes: 5 additions & 1 deletion core/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
},
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
solana_sdk::{
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
Expand All @@ -29,6 +29,7 @@ pub struct StandardBroadcastRun {
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl StandardBroadcastRun {
Expand All @@ -48,6 +49,7 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}

Expand Down Expand Up @@ -76,6 +78,7 @@ impl StandardBroadcastRun {
true, // is_last_in_slot,
state.next_shred_index,
state.next_code_index,
&self.reed_solomon_cache,
stats,
);
self.report_and_reset_stats(true);
Expand Down Expand Up @@ -124,6 +127,7 @@ impl StandardBroadcastRun {
is_slot_end,
next_shred_index,
next_code_index,
&self.reed_solomon_cache,
process_stats,
);
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Expand Down
3 changes: 2 additions & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ mod tests {
super::*,
solana_ledger::{
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{Shred, ShredFlags},
shred::{ReedSolomonCache, Shred, ShredFlags},
},
};

Expand Down Expand Up @@ -294,6 +294,7 @@ mod tests {
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
3, // next_code_index
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
Expand Down
7 changes: 6 additions & 1 deletion core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, Shred},
shred::{self, Nonce, ReedSolomonCache, Shred},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
Expand Down Expand Up @@ -220,6 +220,7 @@ fn run_insert<F>(
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<ShredPayload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
F: Fn(Shred),
Expand Down Expand Up @@ -282,6 +283,7 @@ where
false, // is_trusted
Some(retransmit_sender),
&handle_duplicate,
reed_solomon_cache,
metrics,
)?;
for index in inserted_indices {
Expand Down Expand Up @@ -411,6 +413,7 @@ impl WindowService {
.thread_name(|i| format!("solWinInsert{:02}", i))
.build()
.unwrap();
let reed_solomon_cache = ReedSolomonCache::default();
Builder::new()
.name("solWinInsert".to_string())
.spawn(move || {
Expand All @@ -432,6 +435,7 @@ impl WindowService {
&completed_data_sets_sender,
&retransmit_sender,
&outstanding_requests,
&reed_solomon_cache,
) {
ws_metrics.record_error(&e);
if Self::should_exit_on_error(e, &handle_error) {
Expand Down Expand Up @@ -506,6 +510,7 @@ mod test {
true, // is_last_in_slot
0, // next_shred_index
0, // next_code_index
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub(crate) mod tests {
super::*,
rand::Rng,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -342,6 +342,7 @@ pub(crate) mod tests {
true, // is_last_in_slot
next_shred_index,
next_shred_index, // next_code_index
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds.swap_remove(0)
Expand Down
Loading

0 comments on commit 57ec9e2

Please sign in to comment.