Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: support dst accumulation with any message variant
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Mar 18, 2021
1 parent c19093a commit cc2f413
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 7 deletions.
60 changes: 56 additions & 4 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
node::Node,
section::{SectionChain, SectionChainError, SectionKeyShare},
};
use bls_signature_aggregator::ProofShare;
use bls_signature_aggregator::{Proof, ProofShare};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use sn_messaging::{Aggregation, DstLocation};
Expand Down Expand Up @@ -84,10 +84,18 @@ impl Message {
error!("Failed signature: {:?}", msg);
return Err(CreateError::FailedSignature);
}

if Some(&proof_share.public_key_set.public_key()) != msg.proof_chain_last_key().ok()
{
error!(
"Proof share public key doesn't match proof chain last key: {:?}",
msg
);
return Err(CreateError::PublicKeyMismatch);
}
}
SrcAuthority::Section { signature, .. } => {
if let Some(proof_chain) = msg.proof_chain.as_ref() {
// FIXME Assumes the nodes proof last key is the one signing this message
if !proof_chain.last_key().verify(signature, &signed_bytes) {
error!(
"Failed signature: {:?} (proof chain: {:?})",
Expand Down Expand Up @@ -140,11 +148,10 @@ impl Message {
key_share: &SectionKeyShare,
src_name: XorName,
dst: DstLocation,
content: Bytes,
variant: Variant,
proof_chain: SectionChain,
dst_key: Option<bls::PublicKey>,
) -> Result<Self, CreateError> {
let variant = Variant::UserMessage(content);
let serialized = bincode::serialize(&SignableView {
dst: &dst,
dst_key: dst_key.as_ref(),
Expand All @@ -164,6 +171,44 @@ impl Message {
Self::new_signed(src, dst, variant, Some(proof_chain), dst_key)
}

/// Converts the message src authority from `BlsShare` to `Section` on successful accumulation.
/// Returns errors if src is not `BlsShare` or if the proof is invalid.
pub(crate) fn into_dst_accumulated(mut self, proof: Proof) -> Result<Self> {
let (proof_share, src_name) = if let SrcAuthority::BlsShare {
proof_share,
src_name,
} = &self.src
{
(proof_share.clone(), *src_name)
} else {
error!("not a message for dst accumulation");
return Err(Error::InvalidMessage);
};

if proof_share.public_key_set.public_key() != proof.public_key {
error!("proof public key doesn't match proof share public key");
return Err(Error::InvalidMessage);
}

if Some(&proof.public_key) != self.proof_chain_last_key().ok() {
error!("proof public key doesn't match proof chain last key");
return Err(Error::InvalidMessage);
}

let bytes = bincode::serialize(&self.signable_view())?;

if !proof.verify(&bytes) {
return Err(Error::FailedSignature);
}

self.src = SrcAuthority::Section {
signature: proof.signature,
src_name,
};

Ok(self)
}

pub(crate) fn signable_view(&self) -> SignableView {
SignableView {
dst: &self.dst,
Expand Down Expand Up @@ -246,6 +291,10 @@ impl Message {
return Err(Error::InvalidMessage);
};

if proof_share.public_key_set.public_key() != *proof_chain.last_key() {
return Err(Error::InvalidMessage);
}

if !proof_share.verify(&bytes) {
return Err(Error::FailedSignature);
}
Expand Down Expand Up @@ -399,13 +448,16 @@ pub enum CreateError {
Bincode(#[from] bincode::Error),
#[error("signature check failed")]
FailedSignature,
#[error("public key mismatch")]
PublicKeyMismatch,
}

impl From<CreateError> for Error {
fn from(src: CreateError) -> Self {
match src {
CreateError::Bincode(inner) => Self::Bincode(inner),
CreateError::FailedSignature => Self::FailedSignature,
CreateError::PublicKeyMismatch => Self::InvalidMessage,
}
}
}
Expand Down
38 changes: 35 additions & 3 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,12 +649,44 @@ impl Approved {
}
}

fn accumulate_message(&mut self, msg: Message) -> Result<Option<Message>> {
let proof_share = if let SrcAuthority::BlsShare { proof_share, .. } = msg.src() {
proof_share
} else {
// Not an accumulating message, return unchanged.
return Ok(Some(msg));
};

let signed_bytes = bincode::serialize(&msg.signable_view())?;
match self
.message_accumulator
.add(&signed_bytes, proof_share.clone())
{
Ok(proof) => {
trace!("Successfully accumulated signatures for message: {:?}", msg);
Ok(Some(msg.into_dst_accumulated(proof)?))
}
Err(AggregatorError::NotEnoughShares) => Ok(None),
Err(err) => {
error!("Error accumulating message at destination: {:?}", err);
Err(Error::InvalidSignatureShare)
}
}
}

async fn handle_useful_message(
&mut self,
sender: Option<SocketAddr>,
msg: Message,
) -> Result<Vec<Command>> {
self.msg_filter.insert_incoming(&msg);

let msg = if let Some(msg) = self.accumulate_message(msg)? {
msg
} else {
return Ok(vec![]);
};

match msg.variant() {
Variant::NeighbourInfo { elders_info, .. } => {
if msg.dst().is_section() {
Expand Down Expand Up @@ -2095,26 +2127,26 @@ impl Approved {
return Err(Error::InvalidDstLocation);
}

let variant = Variant::UserMessage(content);

// If the msg is to be aggregated at dst, we don't vote among our peers, wemsimply send the msg as our vote to the dst.
let msg = if itinerary.aggregate_at_dst() {
Message::for_dst_accumulation(
self.section_keys_provider.key_share()?,
self.section().prefix().name(),
itinerary.dst,
content,
variant,
self.create_proof_chain(&itinerary.dst, None)?,
None,
)?
} else if itinerary.aggregate_at_src() {
let variant = Variant::UserMessage(content);
let vote = self.create_send_message_vote(itinerary.dst, variant, None)?;
let recipients = delivery_group::signature_targets(
&itinerary.dst,
self.section.elders_info().peers().copied(),
);
return self.send_vote(&recipients, vote);
} else {
let variant = Variant::UserMessage(content);
Message::single_src(&self.node, itinerary.dst, variant, None, None)?
};
let mut commands = vec![];
Expand Down

0 comments on commit cc2f413

Please sign in to comment.