Skip to content

Commit

Permalink
Merge #1536
Browse files Browse the repository at this point in the history
1536: Ensure adults are probing for AE updates before first split r=joshuef a=davidrusu

Adults were skipping AE probes before splits for some reason

Also included in this PR:
- move AE probe building to network knowledge
- remove the unused `stopped` state from `CmdCtrl`, which eliminates some error cases for us.

Co-authored-by: David Rusu <davidrusu.me@gmail.com>
  • Loading branch information
bors[bot] and davidrusu committed Sep 1, 2022
2 parents 5c54848 + d20f4e0 commit 5194123
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 87 deletions.
4 changes: 4 additions & 0 deletions sn_interface/src/network_knowledge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ impl NetworkKnowledge {
true
}

/// Builds an anti-entropy probe message.
///
/// The probe wraps the key returned by `self.section_key()` in
/// `SystemMsg::AntiEntropyProbe`.
pub fn anti_entropy_probe(&self) -> SystemMsg {
    let section_key = self.section_key();
    SystemMsg::AntiEntropyProbe(section_key)
}

/// Update our network knowledge if the provided SAP is valid and can be verified
/// with the provided proof chain.
/// If the '`update_sap`' flag is set to 'true', the provided SAP and chain will be
Expand Down
34 changes: 4 additions & 30 deletions sn_node/src/node/flow_ctrl/cmd_ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::node::{
dispatcher::Dispatcher,
event_channel::EventSender,
},
CmdProcessEvent, Error, Event, RateLimits, Result,
CmdProcessEvent, Error, Event, RateLimits,
};

use custom_debug::Debug;
Expand Down Expand Up @@ -48,7 +48,6 @@ pub(crate) struct CmdCtrl {
cmd_queue: PriorityQueue<CmdJob, Priority>,
attempted: CmdThroughput,
monitoring: RateLimits,
stopped: bool,
pub(crate) dispatcher: Arc<Dispatcher>,
id_counter: usize,
}
Expand All @@ -59,7 +58,6 @@ impl CmdCtrl {
cmd_queue: PriorityQueue::new(),
attempted: CmdThroughput::default(),
monitoring,
stopped: false,
dispatcher: Arc::new(dispatcher),
id_counter: 0,
}
Expand All @@ -69,7 +67,7 @@ impl CmdCtrl {
self.dispatcher.node()
}

pub(crate) async fn push(&mut self, cmd: Cmd) -> Result<()> {
pub(crate) async fn push(&mut self, cmd: Cmd) {
self.push_internal(cmd, None).await
}

Expand All @@ -78,32 +76,13 @@ impl CmdCtrl {
!self.cmd_queue.is_empty()
}

// consume self
// NB that clones could still exist, however they would be in the disconnected state
#[allow(unused)]
pub(crate) async fn stop(mut self) {
self.stopped = true;
self.cmd_queue.clear();
}

async fn push_internal(&mut self, cmd: Cmd, parent_id: Option<usize>) -> Result<()> {
if self.stopped().await {
// should not happen (be reachable)
return Err(Error::InvalidState);
}

async fn push_internal(&mut self, cmd: Cmd, parent_id: Option<usize>) {
self.id_counter += 1;

let job = CmdJob::new(self.id_counter, parent_id, cmd, SystemTime::now());

let prio = job.priority();
let _ = self.cmd_queue.push(job, prio);
Ok(())
}

// could be accessed via a clone
async fn stopped(&self) -> bool {
self.stopped
}

/// Get the next Cmd going off of priority
Expand Down Expand Up @@ -131,11 +110,7 @@ impl CmdCtrl {
job: CmdJob,
cmd_process_api: tokio::sync::mpsc::Sender<(Cmd, Option<usize>)>,
node_event_sender: EventSender,
) -> Result<()> {
if self.stopped().await {
return Err(Error::CmdCtrlStopped);
}

) {
#[cfg(feature = "test-utils")]
{
debug!("Cmd queue length: {}", self.cmd_queue.len());
Expand Down Expand Up @@ -226,7 +201,6 @@ impl CmdCtrl {
.await
.statemap_log_state(statemap::State::Idle);
});
Ok(())
}
}

Expand Down
21 changes: 5 additions & 16 deletions sn_node/src/node/flow_ctrl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,21 @@ impl FlowCtrl {
/// Process the next pending cmds
async fn process_next_cmd(&mut self) {
if let Some(next_cmd_job) = self.cmd_ctrl.next_cmd() {
if let Err(error) = self
.cmd_ctrl
self.cmd_ctrl
.process_cmd_job(
next_cmd_job,
self.cmd_sender_channel.clone(),
self.outgoing_node_event_sender.clone(),
)
.await
{
error!("Error during cmd processing: {error:?}");
}
}
}

/// Pull and queue up all pending cmds from the CmdChannel
async fn enqeue_new_cmds_from_channel(&mut self) -> Result<()> {
loop {
match self.incoming_cmds_from_apis.try_recv() {
Ok((cmd, _id)) => {
if let Err(error) = self.fire_and_forget(cmd).await {
error!("Error pushing node cmd from CmdChannel to controller: {error:?}");
}
}
Ok((cmd, _id)) => self.fire_and_forget(cmd).await,
Err(TryRecvError::Empty) => {
// do nothing else
return Ok(());
Expand All @@ -114,9 +106,7 @@ impl FlowCtrl {
let cmd = self.handle_new_msg_event(msg).await;

// don't use sender here in case the channel gets full
if let Err(error) = self.fire_and_forget(cmd).await {
error!("Error pushing node msg as cmd to controller: {error:?}");
}
self.fire_and_forget(cmd).await;
}
Err(TryRecvError::Empty) => {
// do nothing else
Expand Down Expand Up @@ -211,9 +201,8 @@ impl FlowCtrl {
}

/// Does not await the completion of the cmd.
pub(crate) async fn fire_and_forget(&mut self, cmd: Cmd) -> Result<()> {
self.cmd_ctrl.push(cmd).await?;
Ok(())
pub(crate) async fn fire_and_forget(&mut self, cmd: Cmd) {
self.cmd_ctrl.push(cmd).await
}

// Listen for a new incoming connection event and handle it.
Expand Down
31 changes: 7 additions & 24 deletions sn_node/src/node/flow_ctrl/periodic_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ impl FlowCtrl {

for cmd in cmds {
// don't use sender here in case the channel gets full
if let Err(error) = self.fire_and_forget(cmd).await {
error!("Error pushing node periodic cmd to controller: {error:?}");
}
self.fire_and_forget(cmd).await;
}
}

Expand All @@ -96,22 +94,17 @@ impl FlowCtrl {
&mut self,
last_section_probe: &mut Instant,
) {
let now = Instant::now();
let mut cmds = vec![];

// if we've passed enough time, section probe
if last_section_probe.elapsed() > SECTION_PROBE_INTERVAL {
*last_section_probe = now;
if let Some(cmd) = Self::probe_the_section(self.node.clone()).await {
cmds.push(cmd);
}
*last_section_probe = Instant::now();
cmds.push(Self::probe_the_section(self.node.clone()).await);
}

for cmd in cmds {
// don't use sender here in case the channel gets full
if let Err(error) = self.fire_and_forget(cmd).await {
error!("Error pushing adult node periodic cmd to controller: {error:?}");
}
self.fire_and_forget(cmd).await;
}
}

Expand Down Expand Up @@ -171,9 +164,7 @@ impl FlowCtrl {

for cmd in cmds {
// don't use sender here in case the channel gets full
if let Err(error) = self.fire_and_forget(cmd).await {
error!("Error pushing adult node periodic cmd to controller: {error:?}");
}
self.fire_and_forget(cmd).await;
}
}

Expand Down Expand Up @@ -235,20 +226,12 @@ impl FlowCtrl {

/// Generates a probe msg, which goes to our elders in order to
/// passively maintain network knowledge over time
async fn probe_the_section(node: Arc<RwLock<Node>>) -> Option<Cmd> {
async fn probe_the_section(node: Arc<RwLock<Node>>) -> Cmd {
let node = node.read().await;

// Send a probe message to an elder
info!("Starting to probe section");

let prefix = node.network_knowledge().prefix();

// Send a probe message to an elder
if !prefix.is_empty() {
Some(node.generate_section_probe_msg())
} else {
None
}
node.generate_section_probe_msg()
}

/// Generates a probe msg, which goes to all section elders in order to
Expand Down
24 changes: 7 additions & 17 deletions sn_node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ mod core {
messaging::{
data::OperationId,
signature_aggregator::SignatureAggregator,
system::{DkgSessionId, NodeState, SystemMsg},
system::{DkgSessionId, NodeState},
AuthorityProof, SectionAuth, SectionAuthorityProvider,
},
network_knowledge::{
Expand Down Expand Up @@ -319,20 +319,13 @@ mod core {
}

let matching_section = self.network_knowledge.section_by_name(&dst)?;

let section_key = matching_section.section_key();
let recipients = matching_section.elders_set();

info!(
"ProbeMsg target {:?} w/key {:?}",
matching_section.prefix(),
section_key
);
let probe = self.network_knowledge.anti_entropy_probe();

info!("ProbeMsg target {:?}: {probe:?}", matching_section.prefix());

Ok(self.send_system_msg(
SystemMsg::AntiEntropyProbe(section_key),
Peers::Multiple(recipients),
))
Ok(self.send_system_msg(probe, Peers::Multiple(recipients)))
}

/// Generates a SectionProbeMsg with our current knowledge,
Expand All @@ -341,18 +334,15 @@ mod core {
pub(crate) fn generate_section_probe_msg(&self) -> Cmd {
let our_section = self.network_knowledge.authority_provider();
let recipients = our_section.elders_set();
let our_key = our_section.section_key();

info!(
"ProbeMsg target our section {:?} recipients {:?}",
our_section.prefix(),
recipients,
);

self.send_system_msg(
SystemMsg::AntiEntropyProbe(our_key),
Peers::Multiple(recipients),
)
let probe = self.network_knowledge.anti_entropy_probe();
self.send_system_msg(probe, Peers::Multiple(recipients))
}

/// returns names that are relatively dysfunctional
Expand Down

0 comments on commit 5194123

Please sign in to comment.