Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Anti entropy #105

Merged
merged 19 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ rand_core = "~0.5.1"
rmp-serde = "~0.15.1"
serde_bytes = "0.11.5"
signature = "1.1.0"
sn_data_types = "~0.18"
sn_data_types = "~0.18.3"
thiserror = "1.0.23"
threshold_crypto = "~0.4.0"
xor_name = "1.1.10"
Expand Down
12 changes: 12 additions & 0 deletions src/client/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ pub enum Error {
/// Node failed to delete the requested data for some reason.
#[error("Failed to delete requested data")]
FailedToDelete,
/// Node does not manage any section funds.
#[error("Node does not currently manage any section funds")]
NoSectionFunds,
/// Node does not manage any metadata, so is likely not a fully prepared elder yet.
#[error("Node does not currently manage any section metadata")]
NoSectionMetaData,
/// Node does not manage any immutable chunks.
#[error("Node does not currently manage any immutable chunks")]
NoImmutableChunks,
/// Node is currently churning so cannot perform the request.
#[error("Cannot complete request due to churning of funds")]
NodeChurningFunds,
/// The node hasn't left the section, and was not marked for relocation during reward operations
#[error("Node is not being relocated")]
NodeWasNotRelocated,
Expand Down
281 changes: 216 additions & 65 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod data_exchange;
mod duty;
mod errors;
mod map;
mod network;
mod query;
mod register;
mod sender;
Expand All @@ -32,11 +31,6 @@ pub use self::{
duty::{AdultDuties, Duty, ElderDuties, NodeDuties},
errors::{Error, Result},
map::{MapRead, MapWrite},
network::{
NodeCmd, NodeCmdError, NodeDataError, NodeEvent, NodeQuery, NodeQueryResponse,
NodeRewardQuery, NodeSystemCmd, NodeSystemQuery, NodeSystemQueryResponse, NodeTransferCmd,
NodeTransferError, NodeTransferQuery, NodeTransferQueryResponse,
},
query::Query,
register::{RegisterRead, RegisterWrite},
sender::{Address, MsgSender, TransientElderKey, TransientSectionKey},
Expand All @@ -57,16 +51,118 @@ use std::{
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
};
use threshold_crypto::PublicKey as BlsPublicKey;
use xor_name::XorName;

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ClientMsg {
Process(ProcessMsg),
ProcessingError(ProcessingError),
SupportingInfo(SupportingInfo),
}

/// Our response to a processing error. Anti entropy in that it updates the erroring node
/// with any relevant information, and includes the original message, which should hereafter
/// be actionable
// #[allow(clippy::large_enum_variant)]
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub struct SupportingInfo {
/// Supporting information for the source_message process
pub info: SupportingInfoFor,
/// The original message that triggered the error this update should be correcting
pub source_message: ProcessMsg,
/// MessageId
pub id: MessageId,
/// Correlates to a ProcessingError
pub correlation_id: MessageId,
}

/// Various types of supporting information that can be received and acted upon by a node.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SupportingInfoFor {}

impl SupportingInfo {
pub fn new(
info: SupportingInfoFor,
source_message: ProcessMsg,
correlation_id: MessageId,
id: MessageId,
) -> Self {
Self {
info,
source_message,
id,
correlation_id,
}
}

/// Get msg id
pub fn id(&self) -> MessageId {
self.id
}

/// Get source message that originally triggered a ProcessingError. This should usually be replayed at source after applying supporting information
pub fn source_message(&self) -> &ProcessMsg {
&self.source_message
}

/// Get the supporting information of this message
pub fn info(&self) -> &SupportingInfoFor {
&self.info
}

/// MessageId of the ProcessingError that triggered this InformationUpdate
pub fn correlation_id(&self) -> MessageId {
self.correlation_id
}
}

/// Our LazyMesssage error. Recipient was unable to process this message for some reason.
/// The original message should be returned in full, and context can optionally be added via
/// reason.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub struct ProcessingError {
/// Optional reason for the error. This should help the receiving node handle the error
reason: Option<Error>,
/// Message that triggered this error
source_message: Option<ProcessMsg>,
/// MessageId
id: MessageId,
}

impl ProcessingError {
pub fn new(reason: Option<Error>, source_message: Option<ProcessMsg>, id: MessageId) -> Self {
Self {
reason,
source_message,
id,
}
}

pub fn id(&self) -> MessageId {
self.id
}

pub fn source_message(&self) -> &Option<ProcessMsg> {
&self.source_message
}

pub fn reason(&self) -> &Option<Error> {
&self.reason
}
}

/// Message envelope containing a Safe message payload,
/// This struct also provides utilities to obtain the serialized bytes
/// ready to send them over the wire.
impl Message {
impl ClientMsg {
/// Convenience function to deserialize a 'Message' from bytes received over the wire.
/// It returns an error if the bytes don't correspond to a client message.
pub fn from(bytes: Bytes) -> crate::Result<Self> {
let deserialized = WireMsg::deserialize(bytes)?;
if let MessageType::ClientMessage(msg) = deserialized {
if let MessageType::Client { msg, .. } = deserialized {
Ok(msg)
} else {
Err(crate::Error::FailedToParse(
Expand All @@ -75,16 +171,47 @@ impl Message {
}
}

/// serialize this Message into bytes ready to be sent over the wire.
pub fn serialize(&self) -> crate::Result<Bytes> {
WireMsg::serialize_client_msg(self)
/// Serialize this Message into bytes ready to be sent over the wire.
pub fn serialize(&self, dest: XorName, dest_section_pk: BlsPublicKey) -> crate::Result<Bytes> {
WireMsg::serialize_client_msg(self, dest, dest_section_pk)
}

/// Gets the message ID.
pub fn id(&self) -> MessageId {
match self {
Self::Process(ProcessMsg::Cmd { id, .. })
| Self::Process(ProcessMsg::Query { id, .. })
| Self::Process(ProcessMsg::Event { id, .. })
| Self::Process(ProcessMsg::QueryResponse { id, .. })
| Self::Process(ProcessMsg::CmdError { id, .. })
| Self::ProcessingError(ProcessingError { id, .. }) => *id,
Self::SupportingInfo(SupportingInfo { id, .. }) => *id,
}
}

/// return ProcessMessage if any
pub fn get_process(&self) -> Option<&ProcessMsg> {
match self {
Self::Process(msg) => Some(msg),
Self::ProcessingError(_) => None,
Self::SupportingInfo(msg) => Some(&msg.source_message()),
}
}

/// return ProcessMessage if any
pub fn get_processing_error(&self) -> Option<&ProcessingError> {
match self {
Self::Process(_) => None,
Self::SupportingInfo(_) => None,
Self::ProcessingError(error) => Some(error),
}
}
}

///
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum Message {
pub enum ProcessMsg {
/// A Cmd is leads to a write / change of state.
/// We expect them to be successful, and only return a msg
/// if something went wrong.
Expand Down Expand Up @@ -128,63 +255,27 @@ pub enum Message {
/// ID of causing cmd.
correlation_id: MessageId,
},
/// Cmds only sent internally in the network.
NodeCmd {
/// NodeCmd.
cmd: NodeCmd,
/// Message ID.
id: MessageId,
},
/// An error of a NodeCmd.
NodeCmdError {
/// The error.
error: NodeCmdError,
/// Message ID.
id: MessageId,
/// ID of causing cmd.
correlation_id: MessageId,
},
/// Events only sent internally in the network.
NodeEvent {
/// Request.
event: NodeEvent,
/// Message ID.
id: MessageId,
/// ID of causing cmd.
correlation_id: MessageId,
},
/// Queries is a read-only operation.
NodeQuery {
/// Query.
query: NodeQuery,
/// Message ID.
id: MessageId,
},
/// The response to a query, containing the query result.
NodeQueryResponse {
/// QueryResponse.
response: NodeQueryResponse,
/// Message ID.
id: MessageId,
/// ID of causing query.
correlation_id: MessageId,
},
}

impl Message {
impl ProcessMsg {
pub fn create_processing_error(&self, reason: Option<Error>) -> ProcessingError {
ProcessingError {
source_message: Some(self.clone()),
id: MessageId::new(),
reason,
}
}
}

impl ProcessMsg {
/// Gets the message ID.
pub fn id(&self) -> MessageId {
match self {
Self::Cmd { id, .. }
| Self::Query { id, .. }
| Self::Event { id, .. }
| Self::QueryResponse { id, .. }
| Self::CmdError { id, .. }
| Self::NodeCmd { id, .. }
| Self::NodeEvent { id, .. }
| Self::NodeQuery { id, .. }
| Self::NodeCmdError { id, .. }
| Self::NodeQueryResponse { id, .. } => *id,
| Self::CmdError { id, .. } => *id,
}
}
}
Expand Down Expand Up @@ -386,7 +477,7 @@ try_from!(ActorHistory, GetHistory);
mod tests {
use super::*;
use anyhow::{anyhow, Result};
use sn_data_types::{Keypair, PublicBlob, UnseqMap};
use sn_data_types::{BlobAddress, DataAddress, Keypair, PublicBlob, UnseqMap};
use std::convert::{TryFrom, TryInto};

fn gen_keypairs() -> Vec<Keypair> {
Expand All @@ -406,6 +497,64 @@ mod tests {
gen_keypairs().iter().map(PublicKey::from).collect()
}

#[test]
fn debug_format_functional() -> Result<()> {
if let Some(key) = gen_keys().first() {
let errored_response = QueryResponse::GetSequence(Err(Error::AccessDenied(*key)));
assert!(format!("{:?}", errored_response)
.contains("GetSequence(Err(AccessDenied(PublicKey::"));
Ok(())
} else {
Err(anyhow!("Could not generate public key"))
}
}

#[test]
fn generate_processing_error() -> Result<()> {
if let Some(key) = gen_keys().first() {
let msg = ProcessMsg::Query {
query: Query::Transfer(TransferQuery::GetBalance(*key)),
id: MessageId::new(),
};
let random_addr = DataAddress::Blob(BlobAddress::Public(XorName::random()));
let lazy_error =
msg.create_processing_error(Some(Error::DataNotFound(random_addr.clone())));

assert!(format!("{:?}", lazy_error).contains("TransferQuery::GetBalance"));
assert!(format!("{:?}", lazy_error).contains("ProcessingError"));
assert!(
format!("{:?}", lazy_error).contains(&format!("DataNotFound({:?})", random_addr))
);

Ok(())
} else {
Err(anyhow!("Could not generate public key"))
}
}

#[test]
fn debug_format_processing_error() -> Result<()> {
if let Some(key) = gen_keys().first() {
let random_addr = DataAddress::Blob(BlobAddress::Public(XorName::random()));
let errored_response = ProcessingError {
reason: Some(Error::DataNotFound(random_addr.clone())),
source_message: Some(ProcessMsg::Query {
id: MessageId::new(),
query: Query::Transfer(TransferQuery::GetBalance(*key)),
}),
id: MessageId::new(),
};

assert!(format!("{:?}", errored_response).contains("TransferQuery::GetBalance"));
assert!(format!("{:?}", errored_response).contains("ProcessingError"));
assert!(format!("{:?}", errored_response)
.contains(&format!("DataNotFound({:?})", random_addr)));
Ok(())
} else {
Err(anyhow!("Could not generate public key"))
}
}

#[test]
fn try_from() -> Result<()> {
use QueryResponse::*;
Expand Down Expand Up @@ -457,14 +606,16 @@ mod tests {

let random_xor = xor_name::XorName::random();
let id = MessageId(random_xor);
let message = Message::Query {
let message = ClientMsg::Process(ProcessMsg::Query {
query: Query::Transfer(TransferQuery::GetBalance(pk)),
id,
};
});

// test msgpack serialization
let serialized = message.serialize()?;
let deserialized = Message::from(serialized)?;
let dest = XorName::random();
let dest_section_pk = threshold_crypto::SecretKey::random().public_key();
let serialized = message.serialize(dest, dest_section_pk)?;
let deserialized = ClientMsg::from(serialized)?;
assert_eq!(deserialized, message);

Ok(())
Expand Down