Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: updates for section key response changes
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef authored and maqi committed Feb 16, 2021
1 parent c54f034 commit 71f89d8
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 88 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ lru_time_cache = "~0.11.0"
qp2p = "~0.9.16"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
sn_messaging = { path= "../sn_messaging" }
sn_messaging = { git= "https://github.com/joshuef/sn_messaging/", branch = "Feat-SectionKeyOnMessage" }
thiserror = "1.0.23"
xor_name = "1.1.0"
resource_proof = "0.8.0"
Expand Down
75 changes: 49 additions & 26 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ use ed25519_dalek::Verifier;
use itertools::Itertools;
use resource_proof::ResourceProof;
use sn_messaging::{
client::Error as ClientError,
infrastructure::{GetSectionResponse, Query},
infrastructure::{
Error as InfrastructureError, GetSectionResponse, InfrastructureInformation,
Message as InfrastructureMessage,
},
node::NodeMessage,
MessageType,
};
Expand Down Expand Up @@ -198,25 +200,29 @@ impl Approved {
Ok(commands)
}

pub async fn handle_infrastructure_query(
pub async fn handle_infrastructure_message(
&mut self,
sender: SocketAddr,
message: Query,
message: InfrastructureMessage,
) -> Vec<Command> {
match message {
Query::GetSectionRequest(name) => {
InfrastructureMessage::GetSectionRequest(name) => {
debug!("Received GetSectionRequest({}) from {}", name, sender);

let response = if self.section.prefix().matches(&name) {
GetSectionResponse::Success {
prefix: self.section.elders_info().prefix,
key: *self.section.chain().last_key(),
elders: self
.section
.elders_info()
.peers()
.map(|peer| (*peer.name(), *peer.addr()))
.collect(),
if let Ok(pk_set) = self.public_key_set() {
GetSectionResponse::Success(InfrastructureInformation {
prefix: self.section.elders_info().prefix,
pk_set,
elders: self
.section
.elders_info()
.peers()
.map(|peer| (*peer.name(), *peer.addr()))
.collect(),
})
} else {
GetSectionResponse::SectionInfrastructureError(InfrastructureError::NoSectionPkSet)
}
} else {
// If we are elder, we should know a section that is closer to `name` that us.
Expand All @@ -228,28 +234,28 @@ impl Approved {
let addrs = section.peers().map(Peer::addr).copied().collect();
GetSectionResponse::Redirect(addrs)
};
let response = Query::GetSectionResponse(response);
let response = InfrastructureMessage::GetSectionResponse(response);
debug!("Sending {:?} to {}", response, sender);

vec![Command::SendMessage {
recipients: vec![sender],
delivery_group_size: 1,
message: MessageType::InfrastructureQuery(response),
message: MessageType::InfrastructureMessage(response),
}]
}
Query::GetSectionResponse(_) => {
InfrastructureMessage::GetSectionResponse(_) => {
if let Some(RelocateState::InProgress(tx)) = &mut self.relocate_state {
trace!("Forwarding {:?} to the bootstrap task", message);
let _ = tx
.send((MessageType::InfrastructureQuery(message), sender))
.send((MessageType::InfrastructureMessage(message), sender))
.await;
}

vec![]
}
Query::SectionKeyResponse(_) => {
error!("Shall not receive an error response to client");
vec![]
InfrastructureMessage::InfrastructureError(_) => {
// TODO handle this...
unimplemented!()
}
}
}
Expand Down Expand Up @@ -1839,18 +1845,35 @@ impl Approved {
Ok(Some(command))
}

pub fn check_key_status(&self, bls_pk: &bls::PublicKey) -> Result<(), ClientError> {
pub fn check_key_status(&self, bls_pk: &bls::PublicKey) -> Result<(), InfrastructureError> {
if self.dkg_voter.has_ongoing_dkg() {
return Err(ClientError::DkgInProgress);
return Err(
InfrastructureError::DkgInProgress,
);
}
if !self.section.chain().has_key(bls_pk) {
return Err(ClientError::UnrecognizedSectionKey);
return Err(
InfrastructureError::UnrecognizedSectionKey,
);
}
if bls_pk != self.section.chain().last_key() {
if let Ok(public_key_set) = self.public_key_set() {
return Err(ClientError::TargetSectionKeyIsNotCurrent(public_key_set));
return Err(
InfrastructureError::TargetSectionInfoOutdated(InfrastructureInformation {
prefix: *self.section.prefix(),
pk_set: public_key_set,
elders: self
.section
.elders_info()
.peers()
.map(|peer| (*peer.name(), *peer.addr()))
.collect(),
}),
);
} else {
return Err(ClientError::DkgInProgress);
return Err(
InfrastructureError::DkgInProgress,
);
}
}
Ok(())
Expand Down
95 changes: 53 additions & 42 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use bytes::Bytes;
use futures::future;
use resource_proof::ResourceProof;
use sn_messaging::{
infrastructure::{GetSectionResponse, Query},
infrastructure::{
GetSectionResponse, InfrastructureInformation, Message as InfrastructureMessage,
},
node::NodeMessage,
MessageType, WireMsg,
};
Expand Down Expand Up @@ -149,11 +151,12 @@ impl<'a> State<'a> {
let (response, sender) = self.receive_get_section_response().await?;

match response {
GetSectionResponse::Success {
GetSectionResponse::Success(InfrastructureInformation {
prefix,
key,
pk_set,
elders,
} => {
}) => {
let key = pk_set.public_key();
info!(
"Joining a section ({:b}), key: {:?}, elders: {:?} (given by {:?})",
prefix, key, elders, sender
Expand All @@ -167,6 +170,10 @@ impl<'a> State<'a> {
);
bootstrap_addrs = new_bootstrap_addrs.to_vec();
}
GetSectionResponse::SectionInfrastructureError(error) => {
error!("Handle infrastructure error: {:?}", error);
// TODO: handle
}
}
}
}
Expand All @@ -186,11 +193,11 @@ impl<'a> State<'a> {
None => self.node.name(),
};

let message = Query::GetSectionRequest(destination);
let message = InfrastructureMessage::GetSectionRequest(destination);

let _ = self
.send_tx
.send((MessageType::InfrastructureQuery(message), recipients))
.send((MessageType::InfrastructureMessage(message), recipients))
.await;

Ok(())
Expand All @@ -199,28 +206,30 @@ impl<'a> State<'a> {
async fn receive_get_section_response(&mut self) -> Result<(GetSectionResponse, SocketAddr)> {
while let Some((message, sender)) = self.recv_rx.next().await {
match message {
MessageType::InfrastructureQuery(Query::GetSectionResponse(response)) => {
match response {
GetSectionResponse::Redirect(addrs) if addrs.is_empty() => {
error!("Invalid GetSectionResponse::Redirect: missing peers");
continue;
}
GetSectionResponse::Success { prefix, .. }
if !prefix.matches(&self.node.name()) =>
{
error!("Invalid GetSectionResponse::Success: bad prefix");
continue;
}
GetSectionResponse::Redirect(_) | GetSectionResponse::Success { .. } => {
return Ok((response, sender))
}
MessageType::InfrastructureMessage(InfrastructureMessage::GetSectionResponse(
response,
)) => match response {
GetSectionResponse::Redirect(addrs) if addrs.is_empty() => {
error!("Invalid GetSectionResponse::Redirect: missing peers");
continue;
}
}
GetSectionResponse::Success(InfrastructureInformation { prefix, .. })
if !prefix.matches(&self.node.name()) =>
{
error!("Invalid GetSectionResponse::Success: bad prefix");
continue;
}
GetSectionResponse::Redirect(_)
| GetSectionResponse::Success { .. }
| GetSectionResponse::SectionInfrastructureError(_) => {
return Ok((response, sender))
}
},
MessageType::NodeMessage(NodeMessage(msg_bytes)) => {
let message = Message::from_bytes(Bytes::from(msg_bytes))?;
self.backlog_message(message, sender)
}
MessageType::InfrastructureQuery(_)
MessageType::InfrastructureMessage(_)
| MessageType::ClientMessage(_)
| MessageType::Ping => {}
}
Expand Down Expand Up @@ -377,7 +386,7 @@ impl<'a> State<'a> {
}
MessageType::Ping
| MessageType::ClientMessage(_)
| MessageType::InfrastructureQuery(_) => continue,
| MessageType::InfrastructureMessage(_) => continue,
};

match message.variant() {
Expand Down Expand Up @@ -605,20 +614,20 @@ mod tests {
let (message, recipients) = send_rx.try_recv()?;

assert_eq!(recipients, [bootstrap_addr]);
assert_matches!(message, MessageType::InfrastructureQuery(Query::GetSectionRequest(name)) => {
assert_matches!(message, MessageType::InfrastructureMessage(InfrastructureMessage::GetSectionRequest(name)) => {
assert_eq!(name, *peer.name());
});

// Send GetSectionResponse::Success
let message = Query::GetSectionResponse(GetSectionResponse::Success {
let message = InfrastructureMessage::GetSectionResponse(GetSectionResponse::Success {
prefix: elders_info.prefix,
key: pk,
elders: elders_info
.peers()
.map(|peer| (*peer.name(), *peer.addr()))
.collect(),
});
recv_tx.try_send((MessageType::InfrastructureQuery(message), bootstrap_addr))?;
recv_tx.try_send((MessageType::InfrastructureMessage(message), bootstrap_addr))?;
task::yield_now().await;

// Receive JoinRequest
Expand Down Expand Up @@ -685,17 +694,17 @@ mod tests {
assert_eq!(recipients, vec![bootstrap_node.addr]);
assert_matches!(
message,
MessageType::InfrastructureQuery(Query::GetSectionRequest(_))
MessageType::InfrastructureMessage(InfrastructureMessage::GetSectionRequest(_))
);

// Send GetSectionResponse::Redirect
let new_bootstrap_addrs: Vec<_> = (0..ELDER_SIZE).map(|_| gen_addr()).collect();
let message = Query::GetSectionResponse(GetSectionResponse::Redirect(
let message = InfrastructureMessage::GetSectionResponse(GetSectionResponse::Redirect(
new_bootstrap_addrs.clone(),
));

recv_tx.try_send((
MessageType::InfrastructureQuery(message),
MessageType::InfrastructureMessage(message),
bootstrap_node.addr,
))?;
task::yield_now().await;
Expand All @@ -706,7 +715,7 @@ mod tests {
assert_eq!(recipients, new_bootstrap_addrs);
assert_matches!(
message,
MessageType::InfrastructureQuery(Query::GetSectionRequest(_))
MessageType::InfrastructureMessage(InfrastructureMessage::GetSectionRequest(_))
);

Ok(())
Expand Down Expand Up @@ -739,31 +748,33 @@ mod tests {
let (message, _) = send_rx.try_recv()?;
assert_matches!(
message,
MessageType::InfrastructureQuery(Query::GetSectionRequest(_))
MessageType::InfrastructureMessage(InfrastructureMessage::GetSectionRequest(_))
);

let message = Query::GetSectionResponse(GetSectionResponse::Redirect(vec![]));
let message =
InfrastructureMessage::GetSectionResponse(GetSectionResponse::Redirect(vec![]));

recv_tx.try_send((
MessageType::InfrastructureQuery(message),
MessageType::InfrastructureMessage(message),
bootstrap_node.addr,
))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));

let addrs = (0..ELDER_SIZE).map(|_| gen_addr()).collect();
let message = Query::GetSectionResponse(GetSectionResponse::Redirect(addrs));
let message =
InfrastructureMessage::GetSectionResponse(GetSectionResponse::Redirect(addrs));

recv_tx.try_send((
MessageType::InfrastructureQuery(message),
MessageType::InfrastructureMessage(message),
bootstrap_node.addr,
))?;
task::yield_now().await;

let (message, _) = send_rx.try_recv()?;
assert_matches!(
message,
MessageType::InfrastructureQuery(Query::GetSectionRequest(_))
MessageType::InfrastructureMessage(InfrastructureMessage::GetSectionRequest(_))
);

Ok(())
Expand Down Expand Up @@ -810,10 +821,10 @@ mod tests {
let (message, _) = send_rx.try_recv()?;
assert_matches!(
message,
MessageType::InfrastructureQuery(Query::GetSectionRequest(_))
MessageType::InfrastructureMessage(InfrastructureMessage::GetSectionRequest(_))
);

let message = Query::GetSectionResponse(GetSectionResponse::Success {
let message = InfrastructureMessage::GetSectionResponse(GetSectionResponse::Success {
prefix: bad_prefix,
key: bls::SecretKey::random().public_key(),
elders: (0..ELDER_SIZE)
Expand All @@ -822,13 +833,13 @@ mod tests {
});

recv_tx.try_send((
MessageType::InfrastructureQuery(message),
MessageType::InfrastructureMessage(message),
bootstrap_node.addr,
))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));

let message = Query::GetSectionResponse(GetSectionResponse::Success {
let message = InfrastructureMessage::GetSectionResponse(GetSectionResponse::Success {
prefix: good_prefix,
key: bls::SecretKey::random().public_key(),
elders: (0..ELDER_SIZE)
Expand All @@ -837,7 +848,7 @@ mod tests {
});

recv_tx.try_send((
MessageType::InfrastructureQuery(message),
MessageType::InfrastructureMessage(message),
bootstrap_node.addr,
))?;

Expand Down

0 comments on commit 71f89d8

Please sign in to comment.