Skip to content

Commit

Permalink
feat: Improve content propagation in sync (#1480)
Browse files Browse the repository at this point in the history
## Description

This improves content propagation for document sync by implementing the
following features.

* feat: allow to send gossip to neighbors only
* feat: provider and candidate roles for downloader
* feat: transfer content status during set reconciliation
* feat: inform neighbors about finished content downloads

## Notes & open questions

na

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.

---------

Co-authored-by: Diva M <divma@protonmail.com>
  • Loading branch information
Frando and divagant-martian committed Sep 21, 2023
1 parent 1bf55db commit 49bde4f
Show file tree
Hide file tree
Showing 14 changed files with 730 additions and 266 deletions.
2 changes: 1 addition & 1 deletion iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ anyhow = { version = "1", features = ["backtrace"] }
blake3 = { package = "iroh-blake3", version = "1.4.3"}
bytes = { version = "1.4.0", features = ["serde"] }
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["add", "debug", "display", "from", "try_into"] }
derive_more = { version = "1.0.0-beta.1", features = ["add", "debug", "display", "from", "try_into", "into"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
indexmap = "2.0"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
Expand Down
4 changes: 2 additions & 2 deletions iroh-gossip/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ async fn subscribe_loop(gossip: Gossip, topic: TopicId) -> anyhow::Result<()> {
let mut stream = gossip.subscribe(topic).await?;
loop {
let event = stream.recv().await?;
if let Event::Received(data, _prev_peer) = event {
let (from, message) = SignedMessage::verify_and_decode(&data)?;
if let Event::Received(msg) = event {
let (from, message) = SignedMessage::verify_and_decode(&msg.content)?;
match message {
Message::AboutMe { name } => {
names.insert(from, name.clone());
Expand Down
94 changes: 53 additions & 41 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use genawaiter::sync::{Co, Gen};
use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, AddrInfo, MagicEndpoint, PeerAddr};
use rand::rngs::StdRng;
use rand_core::SeedableRng;
use std::{collections::HashMap, fmt, future::Future, sync::Arc, task::Poll, time::Instant};
use std::{collections::HashMap, future::Future, sync::Arc, task::Poll, time::Instant};
use tokio::{
sync::{broadcast, mpsc, oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, trace, warn};

use self::util::{read_message, write_message, Dialer, Timers};
use crate::proto::{self, TopicId};
use crate::proto::{self, Scope, TopicId};

pub mod util;

Expand Down Expand Up @@ -50,16 +50,16 @@ type ProtoMessage = proto::Message<PublicKey>;
/// Each topic is a separate broadcast tree with separate memberships.
///
/// A topic has to be joined before you can publish or subscribe on the topic.
/// To join the swarm for a topic, you have to know the [PublicKey] of at least one peer that also joined the topic.
/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic.
///
/// Messages published on the swarm will be delivered to all peers that joined the swarm for that
/// topic. You will also be relaying (gossiping) messages published by other peers.
///
/// With the default settings, the protocol will maintain up to 5 peer connections per topic.
///
/// Even though the [`Gossip`] is created from a [MagicEndpoint], it does not accept connections
/// Even though the [`Gossip`] is created from a [`MagicEndpoint`], it does not accept connections
/// itself. You should run an accept loop on the MagicEndpoint yourself, check the ALPN protocol of incoming
/// connections, and if the ALPN protocol equals [GOSSIP_ALPN], forward the connection to the
/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the
/// gossip actor through [Self::handle_connection].
///
/// The gossip actor will, however, initiate new connections to other peers by itself.
Expand Down Expand Up @@ -149,20 +149,33 @@ impl Gossip {
Ok(())
}

/// Broadcast a message on a topic.
/// Broadcast a message on a topic to all peers in the swarm.
///
/// This does not join the topic automatically, so you have to call [Self::join] yourself
/// This does not join the topic automatically, so you have to call [`Self::join`] yourself
/// for messages to be broadcast to peers.
pub async fn broadcast(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
self.send(ToActor::Broadcast(topic, message, tx)).await?;
self.send(ToActor::Broadcast(topic, message, Scope::Swarm, tx))
.await?;
rx.await??;
Ok(())
}

/// Broadcast a message on a topic to the immediate neighbors.
///
/// This does not join the topic automatically, so you have to call [`Self::join`] yourself
/// for messages to be broadcast to peers.
pub async fn broadcast_neighbors(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
self.send(ToActor::Broadcast(topic, message, Scope::Neighbors, tx))
.await?;
rx.await??;
Ok(())
}

/// Subscribe to messages and event notifications for a topic.
///
/// Does not join the topic automatically, so you have to call [Self::join] yourself
/// Does not join the topic automatically, so you have to call [`Self::join`] yourself
/// to actually receive messages.
pub async fn subscribe(&self, topic: TopicId) -> anyhow::Result<broadcast::Receiver<Event>> {
let (tx, rx) = oneshot::channel();
Expand All @@ -173,7 +186,7 @@ impl Gossip {

/// Subscribe to all events published on topics that you joined.
///
/// Note that this method takes self by value. Usually you would clone the [Gossip] handle.
/// Note that this method takes self by value. Usually you would clone the [`Gossip`] handle.
/// before.
pub fn subscribe_all(self) -> impl Stream<Item = anyhow::Result<(TopicId, Event)>> {
Gen::new(|co| async move {
Expand All @@ -196,7 +209,7 @@ impl Gossip {
}
}

/// Pass an incoming [quinn::Connection] to the gossip actor.
/// Handle an incoming [`quinn::Connection`].
///
/// Make sure to check the ALPN protocol yourself before passing the connection.
pub async fn handle_connection(&self, conn: quinn::Connection) -> anyhow::Result<()> {
Expand Down Expand Up @@ -256,42 +269,37 @@ enum ConnOrigin {
}

/// Input messages for the gossip [`Actor`].
#[derive(derive_more::Debug)]
enum ToActor {
/// Handle a new QUIC connection, either from accept (external to the actor) or from connect
/// (happens internally in the actor).
ConnIncoming(PublicKey, ConnOrigin, quinn::Connection),
ConnIncoming(PublicKey, ConnOrigin, #[debug(skip)] quinn::Connection),
/// Join a topic with a list of peers. Reply with oneshot once at least one peer joined.
Join(TopicId, Vec<PublicKey>, oneshot::Sender<anyhow::Result<()>>),
Join(
TopicId,
Vec<PublicKey>,
#[debug(skip)] oneshot::Sender<anyhow::Result<()>>,
),
/// Leave a topic, send disconnect messages and drop all state.
Quit(TopicId),
/// Broadcast a message on a topic.
Broadcast(TopicId, Bytes, oneshot::Sender<anyhow::Result<()>>),
Broadcast(
TopicId,
#[debug("<{}b>", _1.len())] Bytes,
Scope,
#[debug(skip)] oneshot::Sender<anyhow::Result<()>>,
),
/// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a
/// topic.
Subscribe(
TopicId,
oneshot::Sender<anyhow::Result<broadcast::Receiver<Event>>>,
#[debug(skip)] oneshot::Sender<anyhow::Result<broadcast::Receiver<Event>>>,
),
/// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a
/// topic.
SubscribeAll(oneshot::Sender<anyhow::Result<broadcast::Receiver<(TopicId, Event)>>>),
}

impl fmt::Debug for ToActor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ToActor::ConnIncoming(peer_id, origin, _conn) => {
write!(f, "ConnIncoming({peer_id:?}, {origin:?})")
}
ToActor::Join(topic, peers, _reply) => write!(f, "Join({topic:?}, {peers:?})"),
ToActor::Quit(topic) => write!(f, "Quit({topic:?})"),
ToActor::Broadcast(topic, message, _reply) => {
write!(f, "Broadcast({topic:?}, bytes<{}>)", message.len())
}
ToActor::Subscribe(topic, _reply) => write!(f, "Subscribe({topic:?})"),
ToActor::SubscribeAll(_reply) => write!(f, "SubscribeAll"),
}
}
SubscribeAll(
#[debug(skip)] oneshot::Sender<anyhow::Result<broadcast::Receiver<(TopicId, Event)>>>,
),
}

/// Actor that sends and handles messages between the connection and main state loops
Expand Down Expand Up @@ -340,7 +348,8 @@ impl Actor {
},
_ = self.on_endpoints_rx.changed() => {
let info = self.endpoint.my_addr().await?;
let peer_data = postcard::to_stdvec(&info)?;
let peer_data = Bytes::from(postcard::to_stdvec(&info)?);

self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?;
}
(peer_id, res) = self.dialer.next_conn() => {
Expand Down Expand Up @@ -429,9 +438,12 @@ impl Actor {
.await?;
self.subscribers_topic.remove(&topic_id);
}
ToActor::Broadcast(topic_id, message, reply) => {
self.handle_in_event(InEvent::Command(topic_id, Command::Broadcast(message)), now)
.await?;
ToActor::Broadcast(topic_id, message, scope, reply) => {
self.handle_in_event(
InEvent::Command(topic_id, Command::Broadcast(message, scope)),
now,
)
.await?;
reply.send(Ok(())).ok();
}
ToActor::Subscribe(topic_id, reply) => {
Expand Down Expand Up @@ -687,8 +699,8 @@ mod test {
loop {
let ev = stream2.recv().await.unwrap();
info!("go2 event: {ev:?}");
if let Event::Received(msg, _prev_peer) = ev {
recv.push(msg);
if let Event::Received(msg) = ev {
recv.push(msg.content);
}
if recv.len() == len {
return recv;
Expand All @@ -702,8 +714,8 @@ mod test {
loop {
let ev = stream3.recv().await.unwrap();
info!("go3 event: {ev:?}");
if let Event::Received(msg, _prev_peer) = ev {
recv.push(msg);
if let Event::Received(msg) = ev {
recv.push(msg.content);
}
if recv.len() == len {
return recv;
Expand Down
34 changes: 28 additions & 6 deletions iroh-gossip/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod util;
#[cfg(test)]
mod tests;

pub use plumtree::Scope;
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};

Expand All @@ -79,7 +80,20 @@ impl<T> PeerIdentity for T where T: Hash + Eq + Copy + fmt::Debug + Serialize +
///
/// Implementations may use these bytes to supply addresses or other information needed to connect
/// to a peer that is not included in the peer's [`PeerIdentity`].
pub type PeerData = bytes::Bytes;
#[derive(
derive_more::Debug,
Serialize,
Deserialize,
Clone,
PartialEq,
Eq,
derive_more::From,
derive_more::Into,
derive_more::Deref,
Default,
)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(bytes::Bytes);

/// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
Expand All @@ -106,7 +120,7 @@ mod test {
assert_synchronous_active, report_round_distribution, sort, Network, Simulator,
SimulatorConfig,
},
TopicId,
Scope, TopicId,
};

#[test]
Expand Down Expand Up @@ -215,10 +229,14 @@ mod test {
assert!(assert_synchronous_active(&network));

// now broadcast a first message
network.command(1, t, Command::Broadcast(b"hi1".to_vec().into()));
network.command(
1,
t,
Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
);
network.ticks(broadcast_ticks);
let events = network.events();
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_, _))));
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
// message should be received by two other nodes
assert_eq!(received.count(), 2);
assert!(assert_synchronous_active(&network));
Expand All @@ -230,10 +248,14 @@ mod test {
report_round_distribution(&network);

// now broadcast again
network.command(1, t, Command::Broadcast(b"hi2".to_vec().into()));
network.command(
1,
t,
Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
);
network.ticks(broadcast_ticks);
let events = network.events();
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_, _))));
let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
// message should be received by all 5 other nodes
assert_eq!(received.count(), 5);
assert!(assert_synchronous_active(&network));
Expand Down
Loading

0 comments on commit 49bde4f

Please sign in to comment.