caches reed-solomon encoder/decoder instance (#27510)
ReedSolomon::new(...) initializes a matrix and a data-decode-matrix cache:
https://github.com/rust-rse/reed-solomon-erasure/blob/273ebbced/src/core.rs#L460-L466

To avoid repeating that initialization, this commit caches the reed-solomon
encoder/decoder instance for each (data_shards, parity_shards) pair.
behzadnouri committed Sep 25, 2022
1 parent 9816c94 commit f49beb0
Showing 15 changed files with 233 additions and 54 deletions.
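
For context, the new ReedSolomonCache type is exported from solana_ledger::shred and threaded through every call site below; its definition is not part of this excerpt. The following is a minimal sketch of the idea only, assuming a Mutex-guarded HashMap keyed by (data_shards, parity_shards) and the reed-solomon-erasure crate's ReedSolomon type; the struct and method names are placeholders, not the actual implementation added by this commit.

```rust
// Sketch only: memoize one ReedSolomon instance per (data_shards, parity_shards)
// pair so the matrix and data-decode-matrix cache built by ReedSolomon::new are
// constructed once per shape instead of on every encode/decode.
use {
    reed_solomon_erasure::{galois_8::ReedSolomon, Error},
    std::{
        collections::HashMap,
        sync::{Arc, Mutex},
    },
};

#[derive(Default)]
struct ReedSolomonCacheSketch {
    // Keyed by (data_shards, parity_shards); the Arc lets callers keep using an
    // encoder/decoder without holding the lock while encoding or decoding.
    cache: Mutex<HashMap<(usize, usize), Arc<ReedSolomon>>>,
}

impl ReedSolomonCacheSketch {
    fn get(&self, data_shards: usize, parity_shards: usize) -> Result<Arc<ReedSolomon>, Error> {
        let key = (data_shards, parity_shards);
        if let Some(entry) = self.cache.lock().unwrap().get(&key) {
            return Ok(Arc::clone(entry));
        }
        // Cache miss: pay the construction cost once, then memoize the instance.
        let entry = Arc::new(ReedSolomon::new(data_shards, parity_shards)?);
        self.cache.lock().unwrap().insert(key, Arc::clone(&entry));
        Ok(entry)
    }
}
```

As the diffs show, callers such as Shredder::entries_to_shreds, Shredder::generate_coding_shreds, and Shredder::try_recovery now take a &ReedSolomonCache argument, so the benches, broadcast runs, and the window service construct the cache once and reuse it across calls.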
3 changes: 2 additions & 1 deletion core/benches/retransmit_stage.rs
@@ -15,7 +15,7 @@ use {
solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule_cache::LeaderScheduleCache,
shred::{ProcessShredsStats, Shredder},
shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_measure::measure::Measure,
solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -107,6 +107,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);

16 changes: 13 additions & 3 deletions core/benches/shredder.rs
@@ -8,8 +8,8 @@ use {
raptorq::{Decoder, Encoder},
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache,
Shred, ShredFlags, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
@@ -53,6 +53,7 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
0, // next_shred_index
0, // next_code_index
false, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
assert!(data_shreds.len() >= num_shreds);
@@ -78,6 +79,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
// ~1Mb
let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
shredder.entries_to_shreds(
@@ -87,6 +89,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
@@ -104,6 +107,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
Some(shred_size),
);
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
let reed_solomon_cache = ReedSolomonCache::default();
// 1Mb
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
@@ -114,6 +118,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
@@ -135,6 +140,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
bencher.iter(|| {
@@ -159,10 +165,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
)
.len();
})
@@ -172,12 +180,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
let coding_shreds = Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
);
bencher.iter(|| {
Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
Shredder::try_recovery(coding_shreds[..].to_vec(), &reed_solomon_cache).unwrap();
})
}

3 changes: 2 additions & 1 deletion core/src/broadcast_stage.rs
@@ -443,7 +443,7 @@ pub mod test {
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder},
shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_runtime::bank::Bank,
solana_sdk::{
@@ -482,6 +482,7 @@ pub mod test {
0, // next_shred_index,
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
(
7 changes: 6 additions & 1 deletion core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -4,7 +4,7 @@ use {
itertools::Itertools,
solana_entry::entry::Entry,
solana_gossip::contact_info::ContactInfo,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signature, Signer},
@@ -36,6 +36,7 @@ pub(super) struct BroadcastDuplicatesRun {
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
original_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
partition_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl BroadcastDuplicatesRun {
@@ -56,6 +57,7 @@ impl BroadcastDuplicatesRun {
cluster_nodes_cache,
original_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
partition_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
@@ -164,6 +166,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

@@ -180,6 +183,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't
@@ -192,6 +196,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let sigs: Vec<_> = partition_last_data_shred
6 changes: 5 additions & 1 deletion core/src/broadcast_stage/broadcast_fake_shreds_run.rs
@@ -1,7 +1,7 @@
use {
super::*,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
};

@@ -11,6 +11,7 @@ pub(super) struct BroadcastFakeShredsRun {
partition: usize,
shred_version: u16,
next_code_index: u32,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl BroadcastFakeShredsRun {
@@ -20,6 +21,7 @@ impl BroadcastFakeShredsRun {
partition,
shred_version,
next_code_index: 0,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
@@ -61,6 +63,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

@@ -81,6 +84,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

7 changes: 6 additions & 1 deletion core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -1,7 +1,7 @@
use {
super::*,
crate::cluster_nodes::ClusterNodesCache,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
std::{thread::sleep, time::Duration},
};
@@ -17,6 +17,7 @@ pub(super) struct FailEntryVerificationBroadcastRun {
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl FailEntryVerificationBroadcastRun {
@@ -32,6 +33,7 @@ impl FailEntryVerificationBroadcastRun {
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
@@ -92,6 +94,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

@@ -107,6 +110,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't know
@@ -119,6 +123,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
self.next_shred_index += 1;
6 changes: 5 additions & 1 deletion core/src/broadcast_stage/standard_broadcast_run.rs
@@ -9,7 +9,7 @@ use {
broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
},
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
solana_sdk::{
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
@@ -29,6 +29,7 @@ pub struct StandardBroadcastRun {
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl StandardBroadcastRun {
@@ -48,6 +49,7 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}

@@ -77,6 +79,7 @@ impl StandardBroadcastRun {
state.next_shred_index,
state.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
stats,
);
self.report_and_reset_stats(true);
@@ -126,6 +129,7 @@ impl StandardBroadcastRun {
next_shred_index,
next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
process_stats,
);
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
3 changes: 2 additions & 1 deletion core/src/shred_fetch_stage.rs
@@ -251,7 +251,7 @@ mod tests {
super::*,
solana_ledger::{
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{Shred, ShredFlags},
shred::{ReedSolomonCache, Shred, ShredFlags},
},
};

@@ -294,6 +294,7 @@ mod tests {
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
3, // next_code_index
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
7 changes: 6 additions & 1 deletion core/src/window_service.rs
@@ -16,7 +16,7 @@ use {
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, Shred},
shred::{self, Nonce, ReedSolomonCache, Shred},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
@@ -220,6 +220,7 @@ fn run_insert<F>(
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<ShredPayload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
F: Fn(Shred),
@@ -282,6 +283,7 @@ where
false, // is_trusted
Some(retransmit_sender),
&handle_duplicate,
reed_solomon_cache,
metrics,
)?;
for index in inserted_indices {
@@ -411,6 +413,7 @@ impl WindowService {
.thread_name(|i| format!("solWinInsert{:02}", i))
.build()
.unwrap();
let reed_solomon_cache = ReedSolomonCache::default();
Builder::new()
.name("solWinInsert".to_string())
.spawn(move || {
@@ -432,6 +435,7 @@ impl WindowService {
&completed_data_sets_sender,
&retransmit_sender,
&outstanding_requests,
&reed_solomon_cache,
) {
ws_metrics.record_error(&e);
if Self::should_exit_on_error(e, &handle_error) {
@@ -507,6 +511,7 @@ mod test {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds
3 changes: 2 additions & 1 deletion gossip/src/duplicate_shred.rs
@@ -284,7 +284,7 @@ pub(crate) mod tests {
super::*,
rand::Rng,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash,
signature::{Keypair, Signer},
@@ -343,6 +343,7 @@ pub(crate) mod tests {
next_shred_index,
next_shred_index, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds.swap_remove(0)