uses Duration for epoch duration type
behzadnouri committed May 18, 2021
1 parent 2b90e04 commit 840e6cf
Showing 4 changed files with 28 additions and 27 deletions.
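In short, the commit threads std::time::Duration through the gossip timeout plumbing instead of a bare u64 of milliseconds, converting back to millis only where the timeout arithmetic happens. A minimal, self-contained sketch of that convention; the helper below is illustrative and not a function from the repo:

    use std::time::Duration;

    // Illustrative helper: accept a typed Duration rather than raw milliseconds,
    // and convert at the point of comparison, as make_timeouts now does.
    fn extended_timeout_ms(crds_timeout_ms: u64, epoch_duration: Duration) -> u64 {
        crds_timeout_ms.max(epoch_duration.as_millis() as u64)
    }

    fn main() {
        let epoch_duration = Duration::from_millis(172_800_000); // ~48 hours
        assert_eq!(extended_timeout_ms(15_000, epoch_duration), 172_800_000);
    }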
38 changes: 19 additions & 19 deletions core/src/cluster_info.rs
@@ -1740,8 +1740,11 @@ impl ClusterInfo {
         bank_forks: Option<&RwLock<BankForks>>,
         stakes: &HashMap<Pubkey, u64>,
     ) {
-        let epoch_ms = get_epoch_millis(bank_forks);
-        let timeouts = self.gossip.read().unwrap().make_timeouts(stakes, epoch_ms);
+        let epoch_duration = get_epoch_duration(bank_forks);
+        let timeouts = {
+            let gossip = self.gossip.read().unwrap();
+            gossip.make_timeouts(stakes, epoch_duration)
+        };
         let num_purged = self
             .time_gossip_write_lock("purge", &self.stats.purge)
             .purge(thread_pool, timestamp(), &timeouts);
@@ -1857,7 +1860,6 @@ impl ClusterInfo {
             }

             self.handle_purge(&thread_pool, bank_forks.as_deref(), &stakes);
-
             self.process_entrypoints(&mut entrypoints_processed);

             //TODO: possibly tune this parameter
@@ -2160,7 +2162,7 @@ impl ClusterInfo {
         responses: Vec<(Pubkey, Vec<CrdsValue>)>,
         thread_pool: &ThreadPool,
         stakes: &HashMap<Pubkey, u64>,
-        epoch_time_ms: u64,
+        epoch_duration: Duration,
     ) {
         let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time);
         if responses.is_empty() {
@@ -2209,11 +2211,10 @@ impl ClusterInfo {
                 .reduce(HashMap::new, merge)
         });
         if !responses.is_empty() {
-            let timeouts = self
-                .gossip
-                .read()
-                .unwrap()
-                .make_timeouts(&stakes, epoch_time_ms);
+            let timeouts = {
+                let gossip = self.gossip.read().unwrap();
+                gossip.make_timeouts(&stakes, epoch_duration)
+            };
             for (from, data) in responses {
                 self.handle_pull_response(&from, data, &timeouts);
             }
@@ -2533,7 +2534,7 @@ impl ClusterInfo {
         response_sender: &PacketSender,
         stakes: HashMap<Pubkey, u64>,
         feature_set: Option<&FeatureSet>,
-        epoch_time_ms: u64,
+        epoch_duration: Duration,
         should_check_duplicate_instance: bool,
     ) -> Result<()> {
         let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
@@ -2625,7 +2626,7 @@ impl ClusterInfo {
             response_sender,
             require_stake_for_gossip,
         );
-        self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
+        self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_duration);
         self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
         self.handle_batch_pong_messages(pong_messages, Instant::now());
         self.handle_batch_pull_requests(
@@ -2663,7 +2664,6 @@ impl ClusterInfo {
                     .add_relaxed(excess_count as u64);
             }
         }
-        let epoch_time_ms = get_epoch_millis(bank_forks);
         // Using root_bank instead of working_bank here so that an enabled
         // feature does not roll back (if the feature happens to get enabled in
         // a minority fork).
@@ -2682,7 +2682,7 @@ impl ClusterInfo {
             response_sender,
             stakes,
             feature_set.as_deref(),
-            epoch_time_ms,
+            get_epoch_duration(bank_forks),
             should_check_duplicate_instance,
         )?;
         if last_print.elapsed().as_millis() > 2000 {
@@ -2786,10 +2786,10 @@ impl ClusterInfo {
     }
 }

-// Returns root bank's epoch duration in millis. Falls back on
+// Returns root bank's epoch duration. Falls back on
 // DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT
 // if there are no working banks.
-fn get_epoch_millis(bank_forks: Option<&RwLock<BankForks>>) -> u64 {
+fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>) -> Duration {
     let num_slots = match bank_forks {
         None => {
             inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
@@ -2800,7 +2800,7 @@ fn get_epoch_millis(bank_forks: Option<&RwLock<BankForks>>) -> u64 {
             bank.get_slots_in_epoch(bank.epoch())
         }
     };
-    num_slots * DEFAULT_MS_PER_SLOT
+    Duration::from_millis(num_slots * DEFAULT_MS_PER_SLOT)
 }

 /// Turbine logic
@@ -3855,7 +3855,7 @@ mod tests {
             let gossip = cluster_info.gossip.read().unwrap();
             gossip.make_timeouts(
                 &HashMap::default(), // stakes,
-                gossip.pull.crds_timeout,
+                Duration::from_millis(gossip.pull.crds_timeout),
             )
         };
         ClusterInfo::handle_pull_response(
@@ -4510,8 +4510,8 @@ mod tests {
     #[test]
     fn test_get_epoch_millis_no_bank() {
         assert_eq!(
-            get_epoch_millis(/*bank_forks=*/ None), // 48 hours
-            DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT
+            get_epoch_duration(/*bank_forks=*/ None).as_millis() as u64,
+            DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours
         );
     }
 }
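For context, a self-contained sketch of what the fallback arm of get_epoch_duration evaluates to when no working bank is available. The constant values below are assumptions taken from solana_sdk's clock defaults (432_000 slots per epoch, 400 ms per slot), consistent with the "48 hours" note in the test above:

    use std::time::Duration;

    // Assumed values of the SDK constants referenced in the diff.
    const DEFAULT_SLOTS_PER_EPOCH: u64 = 432_000;
    const DEFAULT_MS_PER_SLOT: u64 = 400;

    // Mirrors the fallback arm of get_epoch_duration (no working bank).
    fn fallback_epoch_duration() -> Duration {
        Duration::from_millis(DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT)
    }

    fn main() {
        let d = fallback_epoch_duration();
        // 432_000 slots * 400 ms/slot = 172_800_000 ms = 48 hours.
        assert_eq!(d.as_secs(), 48 * 60 * 60);
        println!("fallback epoch duration: {:?}", d);
    }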
5 changes: 3 additions & 2 deletions core/src/crds_gossip.rs
@@ -27,6 +27,7 @@ use std::{
     collections::{HashMap, HashSet},
     net::SocketAddr,
     sync::Mutex,
+    time::Duration,
 };

 ///The min size for bloom filters
@@ -303,9 +304,9 @@ impl CrdsGossip {
     pub fn make_timeouts(
         &self,
         stakes: &HashMap<Pubkey, u64>,
-        epoch_ms: u64,
+        epoch_duration: Duration,
     ) -> HashMap<Pubkey, u64> {
-        self.pull.make_timeouts(self.id, stakes, epoch_ms)
+        self.pull.make_timeouts(self.id, stakes, epoch_duration)
     }

     pub fn purge(
10 changes: 5 additions & 5 deletions core/src/crds_gossip_pull.rs
@@ -35,7 +35,7 @@ use std::{
     convert::TryInto,
     net::SocketAddr,
     sync::Mutex,
-    time::Instant,
+    time::{Duration, Instant},
 };

 pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
@@ -554,9 +554,9 @@ impl CrdsGossipPull {
         &self,
         self_pubkey: Pubkey,
         stakes: &HashMap<Pubkey, u64>,
-        epoch_ms: u64,
+        epoch_duration: Duration,
     ) -> HashMap<Pubkey, u64> {
-        let extended_timeout = self.crds_timeout.max(epoch_ms);
+        let extended_timeout = self.crds_timeout.max(epoch_duration.as_millis() as u64);
         let default_timeout = if stakes.values().all(|stake| *stake == 0) {
             extended_timeout
         } else {
@@ -1339,7 +1339,7 @@ mod test {
             .process_pull_response(
                 &mut node_crds,
                 &node_pubkey,
-                &node.make_timeouts(node_pubkey, &HashMap::new(), 0),
+                &node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()),
                 rsp.pop().unwrap(),
                 1,
             )
@@ -1388,7 +1388,7 @@ mod test {
         assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

         // purge
-        let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), 0);
+        let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default());
         node.purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts);

         //verify self is still valid after purge
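The make_timeouts hunk above is truncated, but the visible portion clamps the extended timeout to at least one epoch and picks a default based on whether any stake is known. Below is a self-contained sketch of that selection logic; the map-building tail (staked nodes and the local node keeping the extended timeout, everyone else falling back to the default) is an assumption about the truncated remainder, and the string keys are stand-ins for Pubkey:

    use std::collections::HashMap;
    use std::time::Duration;

    const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15_000;

    fn make_timeouts(
        self_key: &'static str,
        stakes: &HashMap<&'static str, u64>,
        epoch_duration: Duration,
    ) -> HashMap<&'static str, u64> {
        // As in the diff: milliseconds appear only at this comparison.
        let crds_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
        let extended_timeout = crds_timeout.max(epoch_duration.as_millis() as u64);
        let default_timeout = if stakes.values().all(|stake| *stake == 0) {
            extended_timeout
        } else {
            crds_timeout
        };
        // Assumed continuation of the truncated hunk: staked nodes and the local
        // node keep entries at the extended timeout; a sentinel entry carries the
        // default for everyone else.
        let mut timeouts: HashMap<&'static str, u64> = stakes
            .iter()
            .filter(|(_, stake)| **stake > 0)
            .map(|(pubkey, _)| (*pubkey, extended_timeout))
            .collect();
        timeouts.insert(self_key, u64::MAX);
        timeouts.insert("default", default_timeout);
        timeouts
    }

    fn main() {
        let stakes = HashMap::from([("validator-a", 42u64), ("validator-b", 0u64)]);
        let timeouts = make_timeouts("me", &stakes, Duration::from_millis(172_800_000));
        println!("{:?}", timeouts);
    }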
2 changes: 1 addition & 1 deletion core/tests/crds_gossip.rs
@@ -335,7 +335,7 @@ fn network_run_push(
             let mut node_lock = node.lock().unwrap();
             let timeouts = node_lock.make_timeouts(
                 &HashMap::default(), // stakes
-                node_lock.pull.crds_timeout,
+                Duration::from_millis(node_lock.pull.crds_timeout),
             );
             node_lock.purge(thread_pool, now, &timeouts);
             (node_lock.id, node_lock.new_push_messages(vec![], now))
