Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Resend messages on failure #1705

Merged
merged 15 commits into from Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion appveyor.yml
Expand Up @@ -24,7 +24,7 @@ configuration:
- Release

build_script:
- cargo check --verbose --release --lib --tests
- cargo check --verbose --release --features=mock_base --lib --tests

test_script:
- cargo test --verbose --release --features=mock -- --nocapture
2 changes: 1 addition & 1 deletion scripts/clippy
Expand Up @@ -4,7 +4,7 @@ set -x -e
export RUSTFLAGS="-C opt-level=2 -C codegen-units=8"

cargo fmt -- --check
cargo clippy $@ --all-targets
#cargo clippy $@ --all-targets
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this to be commented out?
if does not required any more?
shall it be removed, instead of comment out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's temporary - this is because the non-mock code no longer compiles until quic-p2p's API is updated to match what we did here in mock-quic-p2p. Once quic-p2p is updated, this will be restored.

cargo clippy $@ --all-targets --features=mock_base
cargo clippy $@ --all-targets --features=mock_parsec
cargo clippy $@ --all-targets --features=mock_serialise
Expand Down
33 changes: 17 additions & 16 deletions src/chain/chain.rs
Expand Up @@ -1091,7 +1091,7 @@ impl Chain {
&self,
dst: &Authority<XorName>,
connected_peers: &[&XorName],
) -> Result<BTreeSet<XorName>, Error> {
) -> Result<(Vec<XorName>, usize), Error> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better update the comment to explain what this usize about?

// FIXME: only filtering for now to match RT.
// should confirm if needed esp after msg_relay changes.
let is_connected = |target_name: &XorName| connected_peers.contains(&target_name);
Expand Down Expand Up @@ -1119,17 +1119,17 @@ impl Chain {
})
};

let (best_section_len, best_section) = match *dst {
let (best_section_len, mut best_section) = match *dst {
Authority::ManagedNode(ref target_name)
| Authority::Client {
proxy_node_name: ref target_name,
..
} => {
if target_name == self.our_id().name() {
return Ok(BTreeSet::new());
return Ok((Vec::new(), 0));
}
if self.has(target_name) && is_connected(&target_name) {
return Ok(iter::once(*target_name).collect());
return Ok((vec![*target_name], 1));
}
candidates(target_name)?
}
Expand All @@ -1139,7 +1139,8 @@ impl Chain {
if let Some(group) =
self.other_closest_names(target_name, self.min_sec_size, &connected_peers)
{
return Ok(group.into_iter().collect());
let group_len = group.len();
return Ok((group, group_len));
}
candidates(target_name)?
}
Expand All @@ -1152,8 +1153,9 @@ impl Chain {

// FIXME: only doing this for now to match RT.
// should confirm if needed esp after msg_relay changes.
section = section.into_iter().filter(is_connected).collect();
return Ok(section);
let section: Vec<_> = section.into_iter().filter(is_connected).collect();
let dg_size = section.len();
return Ok((section, dg_size));
}
candidates(target_name)?
}
Expand All @@ -1174,24 +1176,23 @@ impl Chain {
}
};

let mut targets = Iterator::flatten(
let targets = Iterator::flatten(
self.all_sections()
.filter_map(is_compatible)
.map(SectionInfo::member_names),
)
.filter(is_connected)
.collect::<BTreeSet<_>>();
let _ = targets.remove(&self.our_id().name());
return Ok(targets);
.filter(|name| name != self.our_id().name())
.collect::<Vec<_>>();
let dg_size = targets.len();
return Ok((targets, dg_size));
}
candidates(&prefix.lower_bound())?
}
};
Ok(best_section
.into_iter()
.filter(|&x| x != *self.our_id().name())
.take(delivery_group_size(best_section_len))
.collect())

best_section.retain(|&x| x != *self.our_id().name());
Ok((best_section, delivery_group_size(best_section_len)))
}

/// Returns our own section, including our own name.
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Expand Up @@ -170,6 +170,7 @@ mod event_stream;
mod id;
mod message_filter;
mod messages;
mod network_service;
mod node;
mod outbox;
mod peer_manager;
Expand Down Expand Up @@ -255,10 +256,9 @@ pub(crate) type NetworkBytes = bytes::Bytes;
#[cfg(feature = "mock_serialise")]
pub(crate) type NetworkBytes = Box<crate::messages::Message>;

pub(crate) use self::network_service::NetworkService;
pub use self::quic_p2p::Config as NetworkConfig;
pub(crate) use self::quic_p2p::{
Event as NetworkEvent, Peer as ConnectionInfo, QuicP2p as NetworkService,
};
pub(crate) use self::quic_p2p::{Event as NetworkEvent, Peer as ConnectionInfo, QuicP2p};

#[cfg(test)]
mod tests {
Expand Down
19 changes: 17 additions & 2 deletions src/mock/quic_p2p/mod.rs
Expand Up @@ -82,8 +82,8 @@ impl QuicP2p {
/// If the peer is not connected, it will attempt to connect to it first
/// and then send the message. This can be called multiple times while the peer is still being
/// connected to - all the sends will be buffered until the peer is connected to.
pub fn send(&mut self, peer: Peer, msg: NetworkBytes) {
self.inner.borrow_mut().send(peer.peer_addr(), msg)
pub fn send(&mut self, peer: Peer, msg: NetworkBytes, msg_id: u64) {
self.inner.borrow_mut().send(peer.peer_addr(), msg, msg_id)
}

/// Get our connection info to give to others for them to connect to us
Expand Down Expand Up @@ -204,6 +204,21 @@ pub enum Event {
peer_addr: SocketAddr,
/// Message content.
msg: NetworkBytes,
/// Message ID
msg_id: u64,
},
/// Message sent by us and we won't receive UnsentUserMessage for this one.
/// Either it was sent successfully or it will fail too late for the failure
/// to be detected.
/// In most cases, this should be synonimous with success. It is safe to consider
fizyk20 marked this conversation as resolved.
Show resolved Hide resolved
/// a failure beyond this point as a byzantine fault.
SentUserMessage {
/// Intended message recipient.
peer_addr: SocketAddr,
/// Message content.
msg: NetworkBytes,
/// Message ID
msg_id: u64,
},
/// Connection successfuly established.
ConnectedTo {
Expand Down
20 changes: 13 additions & 7 deletions src/mock/quic_p2p/network.rs
Expand Up @@ -142,13 +142,18 @@ impl Network {

fn process_packet(&self, connection: &Connection, packet: Packet) {
let response = if let Some(dst) = self.find_node(&connection.dst) {
let msg = if let Packet::Message(ref msg, msg_id) = packet {
Some(Packet::MessageSent(msg.clone(), msg_id))
} else {
None
};
dst.borrow_mut().receive_packet(connection.src, packet);
None
msg
} else {
match packet {
Packet::BootstrapRequest(_) => Some(Packet::BootstrapFailure),
Packet::ConnectRequest(_) => Some(Packet::ConnectFailure),
Packet::Message(msg) => Some(Packet::MessageFailure(msg)),
Packet::Message(msg, msg_id) => Some(Packet::MessageFailure(msg, msg_id)),
_ => None,
}
};
Expand Down Expand Up @@ -266,8 +271,9 @@ pub(super) enum Packet {
ConnectRequest(OurType),
ConnectSuccess,
ConnectFailure,
Message(NetworkBytes),
MessageFailure(NetworkBytes),
Message(NetworkBytes, u64),
MessageFailure(NetworkBytes, u64),
MessageSent(NetworkBytes, u64),
Disconnect,
}

Expand All @@ -276,7 +282,7 @@ impl Packet {
#[cfg(not(feature = "mock_serialise"))]
pub fn is_parsec_gossip(&self) -> bool {
match self {
Packet::Message(bytes) if bytes.len() >= 8 => {
Packet::Message(bytes, _) if bytes.len() >= 8 => {
&bytes[..8] == PARSEC_REQ_MSG_TAGS || &bytes[..8] == PARSEC_RSP_MSG_TAGS
}
_ => false,
Expand All @@ -288,7 +294,7 @@ impl Packet {
use crate::messages::{DirectMessage, Message};

match self {
Packet::Message(ref message) => match **message {
Packet::Message(ref message, _) => match **message {
Message::Direct(ref message) => match message.content() {
DirectMessage::ParsecRequest(..) | DirectMessage::ParsecResponse(..) => true,
_ => false,
Expand Down Expand Up @@ -321,7 +327,7 @@ impl Queue {
.0
.iter()
.position(|packet| {
if let Packet::Message(_) = packet {
if let Packet::Message(_, _) = packet {
false
} else {
true
Expand Down
41 changes: 29 additions & 12 deletions src/mock/quic_p2p/node.rs
Expand Up @@ -27,7 +27,7 @@ pub(super) struct Node {
peers: FxHashMap<SocketAddr, ConnectionType>,
bootstrap_cache: FxHashSet<NodeInfo>,
pending_bootstraps: FxHashSet<SocketAddr>,
pending_messages: FxHashMap<SocketAddr, Vec<NetworkBytes>>,
pending_messages: FxHashMap<SocketAddr, Vec<(NetworkBytes, u64)>>,
}

impl Node {
Expand Down Expand Up @@ -98,12 +98,12 @@ impl Node {
}
}

pub fn send(&mut self, dst: SocketAddr, msg: NetworkBytes) {
pub fn send(&mut self, dst: SocketAddr, msg: NetworkBytes, msg_id: u64) {
if self.peers.contains_key(&dst) {
self.send_message(dst, msg)
self.send_message(dst, msg, msg_id)
} else {
self.send_connect_request(dst);
self.add_pending_message(dst, msg)
self.add_pending_message(dst, msg, msg_id)
}
}

Expand Down Expand Up @@ -158,6 +158,7 @@ impl Node {
self.network
.borrow_mut()
.send(self.addr, src, Packet::ConnectSuccess);
self.send_pending_messages(src);

self.fire_event(Event::ConnectedTo {
peer: Peer::new(peer_type, src),
Expand All @@ -179,13 +180,29 @@ impl Node {
// attempts, only when a previously successfully established connection gets
// dropped.
}
Packet::Message(msg) => self.fire_event(Event::NewMessage {
Packet::Message(msg, msg_id) => {
if self.peers.contains_key(&src) {
self.fire_event(Event::NewMessage {
peer_addr: src,
msg,
})
} else {
self.network.borrow_mut().send(
self.addr,
src,
Packet::MessageFailure(msg, msg_id),
)
}
}
Packet::MessageFailure(msg, msg_id) => self.fire_event(Event::UnsentUserMessage {
peer_addr: src,
msg,
msg_id,
}),
Packet::MessageFailure(msg) => self.fire_event(Event::UnsentUserMessage {
Packet::MessageSent(msg, msg_id) => self.fire_event(Event::SentUserMessage {
peer_addr: src,
msg,
msg_id,
}),
Packet::Disconnect => {
if self.peers.remove(&src).is_some() {
Expand Down Expand Up @@ -223,17 +240,17 @@ impl Node {
.send(self.addr, dst, Packet::ConnectRequest(self.config.our_type))
}

fn send_message(&self, dst: SocketAddr, msg: NetworkBytes) {
fn send_message(&self, dst: SocketAddr, msg: NetworkBytes, msg_id: u64) {
self.network
.borrow_mut()
.send(self.addr, dst, Packet::Message(msg))
.send(self.addr, dst, Packet::Message(msg, msg_id))
}

fn add_pending_message(&mut self, addr: SocketAddr, msg: NetworkBytes) {
fn add_pending_message(&mut self, addr: SocketAddr, msg: NetworkBytes, msg_id: u64) {
self.pending_messages
.entry(addr)
.or_insert_with(Default::default)
.push(msg)
.push((msg, msg_id))
}

fn send_pending_messages(&mut self, addr: SocketAddr) {
Expand All @@ -243,8 +260,8 @@ impl Node {
return;
};

for msg in messages {
self.send_message(addr, msg)
for (msg, msg_id) in messages {
self.send_message(addr, msg, msg_id)
}
}
}
Expand Down