Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: remove join timeout - to be handled by the upper layers instead
Browse files Browse the repository at this point in the history
Also removed the obsolete test for losing proxy connection. That situation should be also handled by the upper layers using a timeout.
  • Loading branch information
madadam authored and dirvine committed Oct 8, 2020
1 parent a36e1eb commit cb4f6fe
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 178 deletions.
10 changes: 2 additions & 8 deletions src/node/stage/bootstrapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ use crate::{
use std::{iter, net::SocketAddr, sync::Arc};
use xor_name::Prefix;

// TODO: review if we still need to set a timeout for joining
/// Time after which bootstrap is cancelled (and possibly returnried).
// pub const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(20);

// The bootstrapping stage - node is trying to find the section to join.
pub(crate) struct Bootstrapping {
pub node_info: NodeInfo,
Expand Down Expand Up @@ -113,17 +109,15 @@ impl Bootstrapping {
);

let relocate_payload = self.join_section(&elders_info)?;
let (state, commands) = Joining::new(
let (state, command) = Joining::new(
elders_info,
section_key,
relocate_payload,
self.node_info.clone(),
)?;
let state = State::Joining(state);

Ok(iter::once(Command::Transition(Box::new(state)))
.chain(commands)
.collect())
Ok(vec![Command::Transition(Box::new(state)), command])
}
BootstrapResponse::Rebootstrap(new_conn_infos) => {
info!(
Expand Down
66 changes: 25 additions & 41 deletions src/node/stage/joining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::{approved::Approved, command, Command, NodeInfo, State};
use super::{approved::Approved, Command, NodeInfo, State};
use crate::{
error::{Error, Result},
event::{Connected, Event},
Expand All @@ -16,12 +16,9 @@ use crate::{
section::{EldersInfo, SharedState},
DstLocation, MIN_AGE,
};
use std::{net::SocketAddr, time::Duration};
use std::net::SocketAddr;
use xor_name::Prefix;

/// Time after which an attempt to joining a section is cancelled (and possibly retried).
pub const JOIN_TIMEOUT: Duration = Duration::from_secs(60);

// The joining stage - node is waiting to be approved by the section.
pub(crate) struct Joining {
pub node_info: NodeInfo,
Expand All @@ -31,7 +28,6 @@ pub(crate) struct Joining {
section_key: bls::PublicKey,
// Whether we are joining as infant or relocating.
join_type: JoinType,
timer_token: u64,
}

impl Joining {
Expand All @@ -40,22 +36,20 @@ impl Joining {
section_key: bls::PublicKey,
relocate_payload: Option<RelocatePayload>,
node_info: NodeInfo,
) -> Result<(Self, Vec<Command>)> {
) -> Result<(Self, Command)> {
let join_type = match relocate_payload {
Some(payload) => JoinType::Relocate(payload),
None => JoinType::First,
};

let mut stage = Self {
let state = Self {
node_info,
elders_info,
section_key,
join_type,
timer_token: 0,
};
let commands = stage.send_join_requests()?;
let command = state.send_join_requests()?;

Ok((stage, commands))
Ok((state, command))
}

pub fn handle_message(
Expand All @@ -75,11 +69,14 @@ impl Joining {
section_key,
}) => {
verify_message(&msg, None)?;
self.handle_bootstrap_response(
msg.src().to_sender_node(sender)?,
elders_info.clone(),
*section_key,
)
Ok(self
.handle_bootstrap_response(
msg.src().to_sender_node(sender)?,
elders_info.clone(),
*section_key,
)?
.into_iter()
.collect())
}
Variant::NodeApproval(payload) => {
let trusted_key = match &self.join_type {
Expand Down Expand Up @@ -114,24 +111,14 @@ impl Joining {
}
}

pub fn handle_timeout(&mut self, token: u64) -> Result<Vec<Command>> {
if token == self.timer_token {
debug!("Timeout when trying to join a section");
// Try again
self.send_join_requests()
} else {
Ok(vec![])
}
}

fn handle_bootstrap_response(
&mut self,
sender: Peer,
new_elders_info: EldersInfo,
new_section_key: bls::PublicKey,
) -> Result<Vec<Command>> {
) -> Result<Option<Command>> {
if new_section_key == self.section_key {
return Ok(vec![]);
return Ok(None);
}

if new_elders_info.prefix.matches(&self.node_info.name()) {
Expand All @@ -141,13 +128,14 @@ impl Joining {
);
self.elders_info = new_elders_info;
self.section_key = new_section_key;
self.send_join_requests()

Ok(Some(self.send_join_requests()?))
} else {
warn!(
"Newer Join response not for our prefix {:?} from {:?}",
new_elders_info, sender,
);
Ok(vec![])
Ok(None)
}
}

Expand All @@ -166,7 +154,7 @@ impl Joining {
}
}

fn send_join_requests(&mut self) -> Result<Vec<Command>> {
fn send_join_requests(&self) -> Result<Command> {
let (relocate_payload, age) = match &self.join_type {
JoinType::First { .. } => (None, MIN_AGE),
JoinType::Relocate(payload) => (Some(payload), payload.relocate_details().age),
Expand Down Expand Up @@ -197,15 +185,11 @@ impl Joining {
None,
)?;

self.timer_token = command::next_timer_token();

Ok(vec![
Command::send_message_to_targets(&recipients, recipients.len(), message.to_bytes()),
Command::ScheduleTimeout {
duration: JOIN_TIMEOUT,
token: self.timer_token,
},
])
Ok(Command::send_message_to_targets(
&recipients,
recipients.len(),
message.to_bytes(),
))
}

fn in_dst_location(&self, dst: &DstLocation) -> bool {
Expand Down
11 changes: 5 additions & 6 deletions src/node/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,16 @@ impl Stage {
msg: Message,
) -> Result<Vec<Command>> {
match &mut *self.state.lock().await {
State::Bootstrapping(stage) => stage.handle_message(sender, msg),
State::Joining(stage) => stage.handle_message(sender, msg),
State::Approved(stage) => stage.handle_message(sender, msg),
State::Bootstrapping(state) => state.handle_message(sender, msg),
State::Joining(state) => state.handle_message(sender, msg),
State::Approved(state) => state.handle_message(sender, msg),
}
}

async fn handle_timeout(&self, token: u64) -> Result<Vec<Command>> {
match &mut *self.state.lock().await {
State::Bootstrapping(_) => Ok(vec![]),
State::Joining(stage) => stage.handle_timeout(token),
State::Approved(stage) => stage.handle_timeout(token),
State::Approved(state) => state.handle_timeout(token),
_ => Ok(vec![]),
}
}

Expand Down
121 changes: 0 additions & 121 deletions src/node/tests/bootstrapping.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/node/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#[cfg(feature = "mock")]
mod adult;
#[cfg(feature = "mock")]
mod bootstrapping;
#[cfg(feature = "mock")]
mod elder;
#[cfg(feature = "mock")]
mod utils;
Expand Down

0 comments on commit cb4f6fe

Please sign in to comment.