Skip to content

Commit

Permalink
experiments different turbine fanouts for propagating shreds (#29393)
Browse files Browse the repository at this point in the history
The commit allocates 2% of slots to running experiments with different
turbine fanouts based on the slot number.
The experiment is feature gated with an additional feature to disable
the experiment.

(cherry picked from commit 456d067)

# Conflicts:
#	core/src/retransmit_stage.rs
  • Loading branch information
behzadnouri authored and mergify[bot] committed Dec 26, 2022
1 parent 9f1a9bb commit 3d30f19
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 12 deletions.
56 changes: 49 additions & 7 deletions core/src/cluster_nodes.rs
Expand Up @@ -5,7 +5,7 @@ use {
rand::{seq::SliceRandom, Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo},
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
contact_info::ContactInfo,
crds::GossipRoute,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
Expand Down Expand Up @@ -35,6 +35,8 @@ use {
},
};

pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;

#[allow(clippy::large_enum_variant)]
enum NodeId {
// TVU node obtained through gossip (staked or not).
Expand Down Expand Up @@ -233,8 +235,10 @@ impl ClusterNodes<RetransmitStage> {
0
} else if self_index <= fanout {
1
} else {
} else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) {
2
} else {
3 // If changed, update MAX_NUM_TURBINE_HOPS.
};
let peers = get_retransmit_peers(fanout, self_index, &nodes);
return RetransmitPeers {
Expand All @@ -249,8 +253,10 @@ impl ClusterNodes<RetransmitStage> {
0
} else if self_index < fanout {
1
} else {
} else if self_index < fanout.saturating_add(1).saturating_mul(fanout) {
2
} else {
3 // If changed, update MAX_NUM_TURBINE_HOPS.
};
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
// Assert that the node itself is included in the set of neighbors, at
Expand Down Expand Up @@ -480,11 +486,47 @@ pub fn make_test_cluster<R: Rng>(
(nodes, stakes, cluster_info)
}

pub(crate) fn get_data_plane_fanout(shred_slot: Slot, root_bank: &Bank) -> usize {
if enable_turbine_fanout_experiments(shred_slot, root_bank) {
// Allocate ~2% of slots to turbine fanout experiments.
match shred_slot % 359 {
11 => 64,
61 => 768,
111 => 128,
161 => 640,
211 => 256,
261 => 512,
311 => 384,
_ => DATA_PLANE_FANOUT,
}
} else {
DATA_PLANE_FANOUT
}
}

fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool {
let feature_slot = root_bank
.feature_set
.activated_slot(&feature_set::drop_redundant_turbine_path::id());
match feature_slot {
check_feature_activation(
&feature_set::drop_redundant_turbine_path::id(),
shred_slot,
root_bank,
)
}

fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
&feature_set::enable_turbine_fanout_experiments::id(),
shred_slot,
root_bank,
) && !check_feature_activation(
&feature_set::disable_turbine_fanout_experiments::id(),
shred_slot,
root_bank,
)
}

// Returns true if the feature is effective for the shred slot.
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
match root_bank.feature_set.activated_slot(feature) {
None => false,
Some(feature_slot) => {
let epoch_schedule = root_bank.epoch_schedule();
Expand Down
21 changes: 16 additions & 5 deletions core/src/retransmit_stage.rs
Expand Up @@ -3,18 +3,22 @@

use {
crate::{
cluster_nodes::{ClusterNodes, ClusterNodesCache},
cluster_nodes::{self, ClusterNodes, ClusterNodesCache, MAX_NUM_TURBINE_HOPS},
packet_hasher::PacketHasher,
},
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::{izip, Itertools},
lru::LruCache,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
<<<<<<< HEAD
solana_client::rpc_response::SlotUpdate,
solana_gossip::{
cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
contact_info::ContactInfo,
},
=======
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
>>>>>>> 456d06785 (experiments different turbine fanouts for propagating shreds (#29393))
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache,
shred::{self, ShredId},
Expand Down Expand Up @@ -56,8 +60,8 @@ struct RetransmitSlotStats {
outset: u64, // 1st shred retransmit timestamp.
// Number of shreds sent and received at different
// distances from the turbine broadcast root.
num_shreds_received: [usize; 3],
num_shreds_sent: [usize; 3],
num_shreds_received: [usize; MAX_NUM_TURBINE_HOPS],
num_shreds_sent: [usize; MAX_NUM_TURBINE_HOPS],
}

struct RetransmitStats {
Expand Down Expand Up @@ -300,8 +304,9 @@ fn retransmit_shred(
stats: &RetransmitStats,
) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
let mut compute_turbine_peers = Measure::start("turbine_start");
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, DATA_PLANE_FANOUT);
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout);
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
Expand Down Expand Up @@ -440,7 +445,7 @@ impl AddAssign for RetransmitSlotStats {
} else {
self.outset.min(outset)
};
for k in 0..3 {
for k in 0..MAX_NUM_TURBINE_HOPS {
self.num_shreds_received[k] += num_shreds_received[k];
self.num_shreds_sent[k] += num_shreds_sent[k];
}
Expand Down Expand Up @@ -554,9 +559,15 @@ impl RetransmitSlotStats {
self.num_shreds_received[2],
i64
),
(
"num_shreds_received_3rd_layer",
self.num_shreds_received[3],
i64
),
("num_shreds_sent_root", self.num_shreds_sent[0], i64),
("num_shreds_sent_1st_layer", self.num_shreds_sent[1], i64),
("num_shreds_sent_2nd_layer", self.num_shreds_sent[2], i64),
("num_shreds_sent_3rd_layer", self.num_shreds_sent[3], i64),
);
}
}
Expand Down
10 changes: 10 additions & 0 deletions sdk/src/feature_set.rs
Expand Up @@ -520,6 +520,14 @@ pub mod commission_updates_only_allowed_in_first_half_of_epoch {
solana_sdk::declare_id!("noRuG2kzACwgaY7TVmLRnUNPLKNVQE1fb7X55YWBehp");
}

pub mod enable_turbine_fanout_experiments {
solana_sdk::declare_id!("D31EFnLgdiysi84Woo3of4JMu7VmasUS3Z7j9HYXCeLY");
}

pub mod disable_turbine_fanout_experiments {
solana_sdk::declare_id!("Gz1aLrbeQ4Q6PTSafCZcGWZXz91yVRi7ASFzFEr1U4sa");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
Expand Down Expand Up @@ -644,6 +652,8 @@ lazy_static! {
(increase_tx_account_lock_limit::id(), "increase tx account lock limit to 128 #27241"),
(check_syscall_outputs_do_not_overlap::id(), "check syscall outputs do_not overlap #28600"),
(commission_updates_only_allowed_in_first_half_of_epoch::id(), "validator commission updates are only allowed in the first half of an epoch #29362"),
(enable_turbine_fanout_experiments::id(), "enable turbine fanout experiments #29393"),
(disable_turbine_fanout_experiments::id(), "disable turbine fanout experiments #29393"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
Expand Down

0 comments on commit 3d30f19

Please sign in to comment.