Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

caches reed-solomon encoder/decoder instance #27510

Merged
merged 1 commit into from
Sep 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {
solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule_cache::LeaderScheduleCache,
shred::{ProcessShredsStats, Shredder},
shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_measure::measure::Measure,
solana_runtime::{bank::Bank, bank_forks::BankForks},
Expand Down Expand Up @@ -107,6 +107,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);

Expand Down
16 changes: 13 additions & 3 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use {
raptorq::{Decoder, Encoder},
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache,
Shred, ShredFlags, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
Expand Down Expand Up @@ -53,6 +53,7 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
0, // next_shred_index
0, // next_code_index
false, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
assert!(data_shreds.len() >= num_shreds);
Expand All @@ -78,6 +79,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
// ~1Mb
let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
shredder.entries_to_shreds(
Expand All @@ -87,6 +89,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
Expand All @@ -104,6 +107,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
Some(shred_size),
);
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
let reed_solomon_cache = ReedSolomonCache::default();
// 1Mb
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
Expand All @@ -114,6 +118,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
Expand All @@ -135,6 +140,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
bencher.iter(|| {
Expand All @@ -159,10 +165,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
)
.len();
})
Expand All @@ -172,12 +180,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
let coding_shreds = Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
);
bencher.iter(|| {
Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
Shredder::try_recovery(coding_shreds[..].to_vec(), &reed_solomon_cache).unwrap();
})
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ pub mod test {
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder},
shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_runtime::bank::Bank,
solana_sdk::{
Expand Down Expand Up @@ -482,6 +482,7 @@ pub mod test {
0, // next_shred_index,
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
(
Expand Down
7 changes: 6 additions & 1 deletion core/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
itertools::Itertools,
solana_entry::entry::Entry,
solana_gossip::contact_info::ContactInfo,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signature, Signer},
Expand Down Expand Up @@ -36,6 +36,7 @@ pub(super) struct BroadcastDuplicatesRun {
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
original_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
partition_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl BroadcastDuplicatesRun {
Expand All @@ -56,6 +57,7 @@ impl BroadcastDuplicatesRun {
cluster_nodes_cache,
original_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
partition_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
Expand Down Expand Up @@ -164,6 +166,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand All @@ -180,6 +183,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't
Expand All @@ -192,6 +196,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let sigs: Vec<_> = partition_last_data_shred
Expand Down
6 changes: 5 additions & 1 deletion core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::*,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
};

Expand All @@ -11,6 +11,7 @@ pub(super) struct BroadcastFakeShredsRun {
partition: usize,
shred_version: u16,
next_code_index: u32,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl BroadcastFakeShredsRun {
Expand All @@ -20,6 +21,7 @@ impl BroadcastFakeShredsRun {
partition,
shred_version,
next_code_index: 0,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
Expand Down Expand Up @@ -61,6 +63,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand All @@ -81,6 +84,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::*,
crate::cluster_nodes::ClusterNodesCache,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
std::{thread::sleep, time::Duration},
};
Expand All @@ -17,6 +17,7 @@ pub(super) struct FailEntryVerificationBroadcastRun {
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl FailEntryVerificationBroadcastRun {
Expand All @@ -32,6 +33,7 @@ impl FailEntryVerificationBroadcastRun {
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
Expand Down Expand Up @@ -92,6 +94,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

Expand All @@ -107,6 +110,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't know
Expand All @@ -119,6 +123,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
self.next_shred_index += 1;
Expand Down
6 changes: 5 additions & 1 deletion core/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
},
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
solana_sdk::{
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
Expand All @@ -29,6 +29,7 @@ pub struct StandardBroadcastRun {
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl StandardBroadcastRun {
Expand All @@ -48,6 +49,7 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}

Expand Down Expand Up @@ -77,6 +79,7 @@ impl StandardBroadcastRun {
state.next_shred_index,
state.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
stats,
);
self.report_and_reset_stats(true);
Expand Down Expand Up @@ -126,6 +129,7 @@ impl StandardBroadcastRun {
next_shred_index,
next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
process_stats,
);
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Expand Down
3 changes: 2 additions & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ mod tests {
super::*,
solana_ledger::{
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{Shred, ShredFlags},
shred::{ReedSolomonCache, Shred, ShredFlags},
},
};

Expand Down Expand Up @@ -294,6 +294,7 @@ mod tests {
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
3, // next_code_index
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
Expand Down
7 changes: 6 additions & 1 deletion core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, Shred},
shred::{self, Nonce, ReedSolomonCache, Shred},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
Expand Down Expand Up @@ -220,6 +220,7 @@ fn run_insert<F>(
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<ShredPayload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
F: Fn(Shred),
Expand Down Expand Up @@ -282,6 +283,7 @@ where
false, // is_trusted
Some(retransmit_sender),
&handle_duplicate,
reed_solomon_cache,
metrics,
)?;
for index in inserted_indices {
Expand Down Expand Up @@ -411,6 +413,7 @@ impl WindowService {
.thread_name(|i| format!("solWinInsert{:02}", i))
.build()
.unwrap();
let reed_solomon_cache = ReedSolomonCache::default();
Builder::new()
.name("solWinInsert".to_string())
.spawn(move || {
Expand All @@ -432,6 +435,7 @@ impl WindowService {
&completed_data_sets_sender,
&retransmit_sender,
&outstanding_requests,
&reed_solomon_cache,
) {
ws_metrics.record_error(&e);
if Self::should_exit_on_error(e, &handle_error) {
Expand Down Expand Up @@ -507,6 +511,7 @@ mod test {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub(crate) mod tests {
super::*,
rand::Rng,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -343,6 +343,7 @@ pub(crate) mod tests {
next_shred_index,
next_shred_index, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds.swap_remove(0)
Expand Down
Loading