Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat(accumulation): add support for accumlation at dest node
Browse files Browse the repository at this point in the history
BREAKING CHANGE: this uses a new version of sn_messaging with a breaking
change
  • Loading branch information
lionel-faber committed Feb 25, 2021
1 parent a183e08 commit f892838
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ rand_chacha = "~0.2.2"
thiserror = "1.0.23"
xor_name = "1.1.0"
resource_proof = "0.8.0"
sn_messaging = "~5.0.0"
sn_messaging = { git = "https://github.com/lionel1704/sn_messaging", branch = "acc-at-dest" }
sn_data_types = "~0.15.0"

[dependencies.bls]
Expand Down
1 change: 1 addition & 0 deletions examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ impl Network {
let dst = match dst {
DstLocation::Section(name) => name,
DstLocation::Node(name) => name,
DstLocation::AccumulatingNode(name) => name,
DstLocation::Direct | DstLocation::EndUser(_) => {
return Err(format_err!("unexpected probe message dst: {:?}", dst))
}
Expand Down
3 changes: 2 additions & 1 deletion src/delivery_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub(crate) fn delivery_targets(
let target_name = user.name();
section_candidates(&target_name, our_name, section, network)?
}
DstLocation::Node(target_name) => {
DstLocation::Node(target_name) | DstLocation::AccumulatingNode(target_name) => {
if target_name == our_name {
return Ok((Vec::new(), 0));
}
Expand Down Expand Up @@ -167,6 +167,7 @@ where
{
let dst_name = match dst {
DstLocation::Node(name) => *name,
DstLocation::AccumulatingNode(name) => *name,
DstLocation::Section(name) => *name,
DstLocation::EndUser(_) | DstLocation::Direct => {
error!("Invalid destination for signature targets: {:?}", dst);
Expand Down
62 changes: 61 additions & 1 deletion src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use crate::{
crypto::{self, name, Verifier},
error::{Error, Result},
node::Node,
section::{ExtendError, SectionProofChain, TrustStatus},
section::{ExtendError, SectionKeyShare, SectionProofChain, TrustStatus},
};
use bls_signature_aggregator::ProofShare;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use sn_messaging::DstLocation;
Expand Down Expand Up @@ -76,6 +77,16 @@ impl Message {
return Err(CreateError::FailedSignature);
}
}
SrcAuthority::BlsShare { proof_share, .. } => {
if !proof_share
.public_key_set
.public_key_share(proof_share.index)
.verify(&proof_share.signature_share, &signed_bytes)
{
error!("Failed signature: {:?}", msg);
return Err(CreateError::FailedSignature);
}
}
SrcAuthority::Section { signature, .. } => {
if let Some(proof_chain) = msg.proof_chain.as_ref() {
// FIXME Assumes the nodes proof last key is the one signing this message
Expand Down Expand Up @@ -122,6 +133,44 @@ impl Message {
Ok(msg)
}

/// Creates a message signed using a BLS KeyShare for
/// destination accumulation
pub(crate) fn for_dst_accumulation(
node: &Node,
key_share: &SectionKeyShare,
dst: DstLocation,
variant: Variant,
proof_chain: Option<SectionProofChain>,
dst_key: Option<bls::PublicKey>,
) -> Result<Self, CreateError> {
let serialized = bincode::serialize(&SignableView {
dst: &dst,
dst_key: dst_key.as_ref(),
variant: &variant,
})?;
let signature_share = key_share.secret_key_share.sign(&serialized);
let proof_share = ProofShare {
public_key_set: key_share.public_key_set.clone(),
index: key_share.index,
signature_share,
};
let src = SrcAuthority::BlsShare {
proof_share,
public_key: node.keypair.public,
age: node.age,
};

Self::new_signed(src, dst, variant, proof_chain, dst_key)
}

pub(crate) fn signable_view(&self) -> SignableView {
SignableView {
dst: &self.dst,
dst_key: self.dst_key.as_ref(),
variant: &self.variant,
}
}

/// Creates a signed message from single node.
pub(crate) fn single_src(
node: &Node,
Expand Down Expand Up @@ -192,6 +241,17 @@ impl Message {
.map(|(_, key)| key);
self.variant.verify(self.proof_chain.as_ref(), trusted_keys)
}
SrcAuthority::BlsShare { proof_share, .. } => {
if proof_share
.public_key_set
.public_key_share(proof_share.index)
.verify(&proof_share.signature_share, &bytes)
{
Ok(VerifyStatus::Full)
} else {
Err(Error::FailedSignature)
}
}
SrcAuthority::Section { prefix, signature } => {
// Proof chain is required for section-src messages.
let proof_chain = if let Some(proof_chain) = self.proof_chain.as_ref() {
Expand Down
23 changes: 20 additions & 3 deletions src/messages/src_authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
error::{Error, Result},
peer::Peer,
};
use bls_signature_aggregator::ProofShare;
use serde::{Deserialize, Serialize};
use sn_messaging::SrcLocation;
use std::net::SocketAddr;
Expand All @@ -19,8 +20,9 @@ use xor_name::{Prefix, XorName};
/// Source authority of a message.
/// Src of message and authority to send it. Authority is validated by the signature.
/// Messages do not need to sign this field as it is all verifiable (i.e. if the sig validates
/// agains the pub key and we know th epub key then we are good. If the proof is not recodnised we
/// ask for a longer chain that can be recodnised). Therefor we don't need to sign this field.
/// agains the pub key and we know th epub key then we are good. If the proof is not recognised we
/// ask for a longer chain that can be recognised). Therefore we don't need to sign this field.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SrcAuthority {
/// Authority of a single peer.
Expand All @@ -32,6 +34,16 @@ pub enum SrcAuthority {
/// ed-25519 signature of the message corresponding to the public key of the source peer.
signature: SimpleSignature,
},
/// Authority of a single peer that used
/// it's BLS Keyshare to sign the message.
BlsShare {
/// Public key of the source peer.
public_key: PublicKey,
/// Age of the source peer.
age: u8,
/// Proof Share signed by the peer's BLS KeyShare
proof_share: ProofShare,
},
/// Authority of a whole section.
Section {
/// Prefix of the source section.
Expand All @@ -45,6 +57,7 @@ impl SrcAuthority {
pub(crate) fn src_location(&self) -> SrcLocation {
match self {
Self::Node { public_key, .. } => SrcLocation::Node(name(public_key)),
Self::BlsShare { public_key, .. } => SrcLocation::Node(name(public_key)),
Self::Section { prefix, .. } => SrcLocation::Section(*prefix),
}
}
Expand All @@ -56,6 +69,7 @@ impl SrcAuthority {
pub(crate) fn to_node_name(&self) -> Result<XorName> {
match self {
Self::Node { public_key, .. } => Ok(name(public_key)),
Self::BlsShare { public_key, .. } => Ok(name(public_key)),
Self::Section { .. } => Err(Error::InvalidSrcLocation),
}
}
Expand All @@ -66,6 +80,9 @@ impl SrcAuthority {
Self::Section { .. } => Err(Error::InvalidSrcLocation),
Self::Node {
public_key, age, ..
}
| Self::BlsShare {
public_key, age, ..
} => Ok(Peer::new(name(public_key), addr, *age)),
}
}
Expand All @@ -74,7 +91,7 @@ impl SrcAuthority {
pub(crate) fn as_section_prefix(&self) -> Result<&Prefix> {
match self {
Self::Section { prefix, .. } => Ok(prefix),
Self::Node { .. } => Err(Error::InvalidSrcLocation),
_ => Err(Error::InvalidSrcLocation),
}
}
}
53 changes: 49 additions & 4 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use super::{
enduser_registry::{EndUserRegistry, SocketId},
message_accumulator::MessageAccumulator,
Command, SplitBarrier,
};
use crate::{
Expand All @@ -21,7 +22,7 @@ use crate::{
message_filter::MessageFilter,
messages::{
JoinRequest, Message, MessageHash, MessageStatus, PlainMessage, ResourceProofResponse,
Variant, VerifyStatus,
SrcAuthority, Variant, VerifyStatus,
},
network::Network,
node::Node,
Expand Down Expand Up @@ -65,6 +66,7 @@ pub(crate) struct Approved {
section: Section,
network: Network,
section_keys_provider: SectionKeysProvider,
message_accumulator: MessageAccumulator,
vote_accumulator: VoteAccumulator,
split_barrier: SplitBarrier,
// Voter for DKG
Expand Down Expand Up @@ -99,6 +101,7 @@ impl Approved {
network: Network::new(),
section_keys_provider,
vote_accumulator: Default::default(),
message_accumulator: Default::default(),
split_barrier: Default::default(),
dkg_voter: Default::default(),
relocate_state: None,
Expand Down Expand Up @@ -665,7 +668,31 @@ impl Approved {
self.handle_join_request(msg.src().to_node_peer(sender)?, *join_request.clone())
}
Variant::UserMessage(content) => {
self.handle_user_message(msg.src().src_location(), *msg.dst(), content.clone())
if msg.dst() == &DstLocation::AccumulatingNode(self.node().name()) {
if let SrcAuthority::BlsShare { proof_share, .. } = msg.src() {
match self.message_accumulator.add(
&bincode::serialize(&msg.signable_view())?,
proof_share.clone(),
) {
Ok(_) => {
trace!("Successfully accumulated message: {:?}", msg);
self.handle_user_message(
msg.src().src_location(),
*msg.dst(),
content.clone(),
)
}
Err(err) => {
trace!("Error accumulating message at destination: {:?}", err);
Ok(vec![])
}
}
} else {
Err(Error::InvalidSrcLocation)
}
} else {
self.handle_user_message(msg.src().src_location(), *msg.dst(), content.clone())
}
}
Variant::BouncedUntrustedMessage(message) => {
let sender = sender.ok_or(Error::InvalidSrcLocation)?;
Expand Down Expand Up @@ -748,7 +775,9 @@ impl Approved {
// If elder, always handle UserMessage, otherwise handle it only if addressed directly to us
// as a node.
fn should_handle_user_message(&self, dst: &DstLocation) -> bool {
self.is_elder() || dst == &DstLocation::Node(self.node.name())
self.is_elder()
|| dst == &DstLocation::Node(self.node.name())
|| dst == &DstLocation::AccumulatingNode(self.node.name())
}

// Decide how to handle a `Vote` message.
Expand Down Expand Up @@ -2016,12 +2045,28 @@ impl Approved {
return Err(Error::InvalidDstLocation);
}

if matches!(dst, DstLocation::AccumulatingNode(_)) && !matches!(src, SrcLocation::Node(_)) {
error!("Not sending user message {:?} -> {:?}: src should be a single node for dst accumulation", src, dst);
return Err(Error::InvalidSrcLocation);
}

let variant = Variant::UserMessage(content);

match src {
SrcLocation::Node(_) => {
// If the source is a single node, we don't even need to vote, so let's cut this short.
let msg = Message::single_src(&self.node, dst, variant, None, None)?;
let msg = if matches!(dst, DstLocation::AccumulatingNode(_)) {
Message::for_dst_accumulation(
&self.node,
self.section_keys_provider.key_share()?,
dst,
variant,
None,
None,
)?
} else {
Message::single_src(&self.node, dst, variant, None, None)?
};
let mut commands = vec![];

if dst.contains(&self.node.name(), self.section.prefix()) {
Expand Down
10 changes: 10 additions & 0 deletions src/routing/message_accumulator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use bls_signature_aggregator::{Error, Proof, ProofShare, SignatureAggregator};

#[derive(Default)]
pub(crate) struct MessageAccumulator(SignatureAggregator);

impl MessageAccumulator {
pub fn add(&mut self, payload: &[u8], proof_share: ProofShare) -> Result<Proof, Error> {
self.0.add(payload, proof_share)
}
}
1 change: 1 addition & 0 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod bootstrap;
mod comm;
mod enduser_registry;
mod event_stream;
mod message_accumulator;
mod split_barrier;
mod stage;
#[cfg(test)]
Expand Down

0 comments on commit f892838

Please sign in to comment.