Skip to content

Commit

Permalink
Optimize packet dedup (#22571)
Browse files Browse the repository at this point in the history
* Use bloom filter to dedup packets

* dedup first

* Update bloom/src/bloom.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* fixup

* fixup

* fixup

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
  • Loading branch information
aeyakovenko and t-nelson committed Jan 19, 2022
1 parent b448472 commit d343713
Show file tree
Hide file tree
Showing 25 changed files with 295 additions and 151 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -11,6 +11,7 @@ members = [
"banks-interface",
"banks-server",
"bucket_map",
"bloom",
"clap-utils",
"cli-config",
"cli-output",
Expand Down
5 changes: 1 addition & 4 deletions banking-bench/src/main.rs
Expand Up @@ -5,7 +5,7 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_core::{banking_stage::BankingStage, packet_deduper::PacketDeduper},
solana_core::banking_stage::BankingStage,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
blockstore::Blockstore,
Expand Down Expand Up @@ -226,7 +226,6 @@ fn main() {
SocketAddrSpace::Unspecified,
);
let cluster_info = Arc::new(cluster_info);
let packet_deduper = PacketDeduper::default();
let banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
Expand All @@ -236,7 +235,6 @@ fn main() {
None,
replay_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
packet_deduper.clone(),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down Expand Up @@ -351,7 +349,6 @@ fn main() {
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
packet_deduper.reset();
total_us += duration_as_us(&now.elapsed());
debug!(
"time: {} us checked: {} sent: {}",
Expand Down
32 changes: 32 additions & 0 deletions bloom/Cargo.toml
@@ -0,0 +1,32 @@
[package]
name = "solana-bloom"
version = "1.10.0"
description = "Solana bloom filter"
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-bloom"
edition = "2021"

[dependencies]
bv = { version = "0.11.1", features = ["serde"] }
fnv = "1.0.7"
rand = "0.7.0"
serde = { version = "1.0.133", features = ["rc"] }
rayon = "1.5.1"
serde_derive = "1.0.103"
solana-frozen-abi = { path = "../frozen-abi", version = "=1.10.0" }
solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.10.0" }
solana-sdk = { path = "../sdk", version = "=1.10.0" }
log = "0.4.14"

[lib]
crate-type = ["lib"]
name = "solana_bloom"

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

[build-dependencies]
rustc_version = "0.4"
2 changes: 1 addition & 1 deletion runtime/benches/bloom.rs → bloom/benches/bloom.rs
Expand Up @@ -5,7 +5,7 @@ use {
bv::BitVec,
fnv::FnvHasher,
rand::Rng,
solana_runtime::bloom::{AtomicBloom, Bloom, BloomHashIndex},
solana_bloom::bloom::{AtomicBloom, Bloom, BloomHashIndex},
solana_sdk::{
hash::{hash, Hash},
signature::Signature,
Expand Down
1 change: 1 addition & 0 deletions bloom/build.rs
10 changes: 5 additions & 5 deletions runtime/src/bloom.rs → bloom/src/bloom.rs
Expand Up @@ -101,7 +101,7 @@ impl<T: BloomHashIndex> Bloom<T> {
}
}
fn pos(&self, key: &T, k: u64) -> u64 {
key.hash_at_index(k) % self.bits.len()
key.hash_at_index(k).wrapping_rem(self.bits.len())
}
pub fn clear(&mut self) {
self.bits = BitVec::new_fill(false, self.bits.len());
Expand All @@ -111,7 +111,7 @@ impl<T: BloomHashIndex> Bloom<T> {
for k in &self.keys {
let pos = self.pos(key, *k);
if !self.bits.get(pos) {
self.num_bits_set += 1;
self.num_bits_set = self.num_bits_set.saturating_add(1);
self.bits.set(pos, true);
}
}
Expand Down Expand Up @@ -164,13 +164,13 @@ impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {

impl<T: BloomHashIndex> AtomicBloom<T> {
fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) {
let pos = key.hash_at_index(hash_index) % self.num_bits;
let pos = key.hash_at_index(hash_index).wrapping_rem(self.num_bits);
// Divide by 64 to figure out which of the
// AtomicU64 bit chunks we need to modify.
let index = pos >> 6;
let index = pos.wrapping_shr(6);
// (pos & 63) is equivalent to mod 64 so that we can find
// the index of the bit within the AtomicU64 to modify.
let mask = 1u64 << (pos & 63);
let mask = 1u64.wrapping_shl(u32::try_from(pos & 63).unwrap());
(index as usize, mask)
}

Expand Down
5 changes: 5 additions & 0 deletions bloom/src/lib.rs
@@ -0,0 +1,5 @@
#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
pub mod bloom;

#[macro_use]
extern crate solana_frozen_abi_macro;
1 change: 1 addition & 0 deletions core/Cargo.toml
Expand Up @@ -34,6 +34,7 @@ retain_mut = "0.1.5"
serde = "1.0.133"
serde_derive = "1.0.103"
solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.10.0" }
solana-bloom = { path = "../bloom", version = "=1.10.0" }
solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.10.0" }
solana-client = { path = "../client", version = "=1.10.0" }
solana-entry = { path = "../entry", version = "=1.10.0" }
Expand Down
4 changes: 0 additions & 4 deletions core/benches/banking_stage.rs
Expand Up @@ -10,7 +10,6 @@ use {
rayon::prelude::*,
solana_core::{
banking_stage::{BankingStage, BankingStageStats},
packet_deduper::PacketDeduper,
qos_service::QosService,
},
solana_entry::entry::{next_hash, Entry},
Expand Down Expand Up @@ -222,7 +221,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
);
let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let packet_deduper = PacketDeduper::default();
let _banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
Expand All @@ -232,7 +230,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
None,
s,
Arc::new(RwLock::new(CostModel::default())),
packet_deduper.clone(),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down Expand Up @@ -267,7 +264,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
packet_deduper.reset();
trace!(
"time: {} checked: {} sent: {}",
duration_as_us(&now.elapsed()),
Expand Down

0 comments on commit d343713

Please sign in to comment.