Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat!: support adding additional proof chain keys to user messages
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Added `additional_proof_chain_key` parameter to  `Routing::send_message`, added `proof_chain` field to `Event::MessageReceived`.
  • Loading branch information
madadam committed Mar 25, 2021
1 parent 2e10be6 commit 2275730
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 61 deletions.
4 changes: 3 additions & 1 deletion examples/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ async fn handle_event(index: usize, node: &mut Routing, event: Event) -> bool {
index, elders.prefix, elders.key, sibling_elders, elders.elders, self_status_change
);
}
Event::MessageReceived { content, src, dst } => info!(
Event::MessageReceived {
content, src, dst, ..
} => info!(
"Node #{} received message - src: {:?}, dst: {:?}, content: {}",
index,
src,
Expand Down
2 changes: 1 addition & 1 deletion examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ impl Network {
aggregation: Aggregation::None,
};

match node.send_message(itinerary, bytes).await {
match node.send_message(itinerary, bytes, None).await {
Ok(()) => Ok(true),
Err(RoutingError::InvalidSrcLocation) => Ok(false), // node name changed
Err(error) => {
Expand Down
7 changes: 6 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::section::SectionChain;
use bytes::Bytes;
use ed25519_dalek::Keypair;
use hex_fmt::HexFmt;
Expand Down Expand Up @@ -78,6 +79,8 @@ pub enum Event {
src: SrcLocation,
/// The destination location that receives the message.
dst: DstLocation,
/// The proof chain for the message.
proof_chain: SectionChain,
},
/// A new peer joined our section.
MemberJoined {
Expand Down Expand Up @@ -136,7 +139,9 @@ impl Debug for Event {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
match self {
Self::Genesis => write!(formatter, "Genesis"),
Self::MessageReceived { content, src, dst } => write!(
Self::MessageReceived {
content, src, dst, ..
} => write!(
formatter,
"MessageReceived {{ content: \"{:<8}\", src: {:?}, dst: {:?} }}",
HexFmt(content),
Expand Down
62 changes: 14 additions & 48 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,9 +1022,7 @@ impl Approved {
}

fn handle_user_message(&mut self, msg: &Message, content: Bytes) -> Result<Vec<Command>> {
let src = msg.src().clone();
let dst = *msg.dst();
if let DstLocation::EndUser(end_user) = &dst {
if let DstLocation::EndUser(end_user) = msg.dst() {
let recipients = match end_user {
EndUser::AllClients(public_key) => {
self.get_all_socket_addr(public_key).copied().collect()
Expand All @@ -1046,45 +1044,12 @@ impl Approved {
message: MessageType::ClientMessage(ClientMessage::from(content)?),
}]);
}
if let SrcAuthority::BlsShare {
proof_share,
src_name,
..
} = &src
{
let signed_bytes = bincode::serialize(&msg.signable_view())?;
match self
.message_accumulator
.add(&signed_bytes, proof_share.clone())
{
Ok(proof) => {
trace!("Successfully aggregated signatures for message: {:?}", msg);
let key = msg.proof_chain_last_key()?;
if key.verify(&proof.signature, signed_bytes) {
self.send_event(Event::MessageReceived {
content,
src: SrcLocation::Section(*src_name),
dst,
});
} else {
trace!(
"Aggregated signature is invalid. Handling message {:?} skipped",
msg
);
}
}
Err(AggregatorError::NotEnoughShares) => {}
Err(err) => {
trace!("Error accumulating message at destination: {:?}", err);
}
}
return Ok(vec![]);
}

self.send_event(Event::MessageReceived {
content,
src: src.src_location(),
dst,
src: msg.src().src_location(),
dst: *msg.dst(),
proof_chain: msg.proof_chain()?.clone(),
});
Ok(vec![])
}
Expand Down Expand Up @@ -2070,6 +2035,7 @@ impl Approved {
&mut self,
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<&bls::PublicKey>,
) -> Result<Vec<Command>> {
let are_we_src = itinerary.src.equals(&self.node.name())
|| itinerary.src.equals(&self.section().prefix().name());
Expand All @@ -2092,6 +2058,7 @@ impl Approved {
}

let variant = Variant::UserMessage(content);
let proof_chain = self.create_proof_chain(&itinerary.dst, additional_proof_chain_key)?;

// If the msg is to be aggregated at dst, we don't vote among our peers, we simply send the
// msg as our vote to the dst.
Expand All @@ -2101,18 +2068,18 @@ impl Approved {
itinerary.src.name(),
itinerary.dst,
variant,
self.create_proof_chain(&itinerary.dst, None)?,
proof_chain,
None,
)?
} else if itinerary.aggregate_at_src() {
let vote = self.create_accumulate_at_src_vote(itinerary.dst, variant, None)?;
let vote = self.create_accumulate_at_src_vote(itinerary.dst, variant, proof_chain);
let recipients = delivery_group::signature_targets(
&itinerary.dst,
self.section.elders_info().peers().copied(),
);
return self.send_vote(&recipients, vote);
} else {
Message::single_src(&self.node, itinerary.dst, variant, None, None)?
Message::single_src(&self.node, itinerary.dst, variant, Some(proof_chain), None)?
};
let mut commands = vec![];

Expand All @@ -2137,10 +2104,10 @@ impl Approved {
src: XorName,
dst: DstLocation,
variant: Variant,
proof_chain_first_key: Option<&bls::PublicKey>,
additional_proof_chain_key: Option<&bls::PublicKey>,
recipients: &[Peer],
) -> Result<Vec<Command>> {
let proof_chain = self.create_proof_chain(&dst, proof_chain_first_key)?;
let proof_chain = self.create_proof_chain(&dst, additional_proof_chain_key)?;
let dst_key = if let Some(name) = dst.name() {
*self.section_key_by_name(&name)
} else {
Expand Down Expand Up @@ -2215,9 +2182,8 @@ impl Approved {
&self,
dst: DstLocation,
variant: Variant,
proof_chain_first_key: Option<&bls::PublicKey>,
) -> Result<Vote> {
let proof_chain = self.create_proof_chain(&dst, proof_chain_first_key)?;
proof_chain: SectionChain,
) -> Vote {
let dst_key = if let Some(name) = dst.name() {
*self.section_key_by_name(&name)
} else {
Expand All @@ -2241,7 +2207,7 @@ impl Approved {

trace!("Create {:?}", vote);

Ok(vote)
vote
}

fn create_proof_chain(
Expand Down
8 changes: 7 additions & 1 deletion src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub(crate) enum Command {
SendUserMessage {
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<bls::PublicKey>,
},
/// Schedule a timeout after the given duration. When the timeout expires, a `HandleTimeout`
/// command is raised. The token is used to identify the timeout.
Expand Down Expand Up @@ -150,10 +151,15 @@ impl Debug for Command {
.field("delivery_group_size", delivery_group_size)
.field("message", message)
.finish(),
Self::SendUserMessage { itinerary, content } => f
Self::SendUserMessage {
itinerary,
content,
additional_proof_chain_key,
} => f
.debug_struct("SendUserMessage")
.field("itinerary", itinerary)
.field("content", &format_args!("{:10}", HexFmt(content)))
.field("additional_proof_chain_key", additional_proof_chain_key)
.finish(),
Self::ScheduleTimeout { duration, token } => f
.debug_struct("ScheduleTimeout")
Expand Down
17 changes: 15 additions & 2 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,16 @@ impl Routing {
/// Send a message.
/// Messages sent here, either section to section or node to node are signed
/// and validated upon receipt by routing itself.
pub async fn send_message(&self, itinerary: Itinerary, content: Bytes) -> Result<()> {
///
/// `additional_proof_chain_key` is a key to be included in the proof chain attached to the
/// message. This is useful when the message contains some data that is signed with a different
/// key than the whole message is so that the recipient can verify such key.
pub async fn send_message(
&self,
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<bls::PublicKey>,
) -> Result<()> {
if let DstLocation::EndUser(EndUser::Client {
socket_id,
public_key,
Expand All @@ -330,7 +339,11 @@ impl Routing {
debug!("Sending user message instead.. (Command::SendUserMessage)");
}
}
let command = Command::SendUserMessage { itinerary, content };
let command = Command::SendUserMessage {
itinerary,
content,
additional_proof_chain_key,
};
self.stage.clone().handle_commands(command).await
}

Expand Down
14 changes: 9 additions & 5 deletions src/routing/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,15 @@ impl Stage {
self.send_message(&recipients, delivery_group_size, message)
.await
}
Command::SendUserMessage { itinerary, content } => self
.state
.lock()
.await
.send_user_message(itinerary, content),
Command::SendUserMessage {
itinerary,
content,
additional_proof_chain_key,
} => self.state.lock().await.send_user_message(
itinerary,
content,
additional_proof_chain_key.as_ref(),
),
Command::ScheduleTimeout { duration, token } => Ok(self
.handle_schedule_timeout(duration, token)
.await
Expand Down
1 change: 1 addition & 0 deletions src/routing/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,7 @@ async fn message_to_self(dst: MessageDst) -> Result<()> {
aggregation: Aggregation::None,
},
content: content.clone(),
additional_proof_chain_key: None,
})
.await?;

Expand Down
5 changes: 3 additions & 2 deletions tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ async fn test_messages_client_node() -> Result<()> {
aggregation: Aggregation::None,
},
query_clone.clone().serialize()?,
None,
)
.await?;
break;
Expand Down Expand Up @@ -160,7 +161,7 @@ async fn test_messages_between_nodes() -> Result<()> {
};

node2
.send_message(itinerary, Bytes::from_static(msg))
.send_message(itinerary, Bytes::from_static(msg), None)
.await?;

println!("msg sent");
Expand All @@ -178,7 +179,7 @@ async fn test_messages_between_nodes() -> Result<()> {

// send response from node1 to node2
node1
.send_message(itinerary, Bytes::from_static(response))
.send_message(itinerary, Bytes::from_static(response), None)
.await?;

println!("checking response received..");
Expand Down

0 comments on commit 2275730

Please sign in to comment.