From d8cc9df6262f805f9cfd205973e67cb960879472 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Thu, 21 Sep 2023 18:13:03 +0200 Subject: [PATCH] fix: avoid double conns, better state tracking (#1505) ## Description On `main` the test `sync_full_basic` is flaky. This PR (hopefully) fixes it. The reason was: We have the situation that two peers initiate connections to each other at (roughly) the same time. In #1491 this was sometimes prevented, but not reliably. This PR fixes it by: * When connecting, we set a `SyncState::Dialing` for the `(namespace, peer)` * When accepting, if our own state is `Dialing` for the incoming request for `(namespace, peer)` we compare our peer id with that of the incoming request, and abort if ours is higher (doesn't matter which way, we only care about a predictable outcome) * Through this, only one of the two simultaneous connections will survive * Also added an `Abort` frame to the wire protocol to inform the dialer about the reason for the declined request, which is either "we do double sync, and will take the other conn" (`AlreadySyncing`) or "this replica is not here" (`NotAvailable`) This PR also: * Further improves logging * Improves errors ## Notes & open questions ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. 
--------- Co-authored-by: Diva M --- iroh-net/src/key.rs | 2 +- iroh-sync/src/net.rs | 167 ++++++++++++++++++++++---------- iroh-sync/src/net/codec.rs | 116 ++++++++++++++-------- iroh-sync/src/sync.rs | 8 +- iroh/src/sync_engine/live.rs | 181 ++++++++++++++++++++++++++--------- iroh/tests/sync.rs | 108 +++++++++++---------- 6 files changed, 399 insertions(+), 183 deletions(-) diff --git a/iroh-net/src/key.rs b/iroh-net/src/key.rs index 0b2b7cc656..a2dbe158ee 100644 --- a/iroh-net/src/key.rs +++ b/iroh-net/src/key.rs @@ -121,7 +121,7 @@ impl From for PublicKey { impl Debug for PublicKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut text = data_encoding::BASE32_NOPAD.encode(self.as_bytes()); + let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]); text.make_ascii_lowercase(); write!(f, "PublicKey({text})") } diff --git a/iroh-sync/src/net.rs b/iroh-sync/src/net.rs index 41b126c487..d6eb3d2384 100644 --- a/iroh-sync/src/net.rs +++ b/iroh-sync/src/net.rs @@ -2,8 +2,8 @@ use std::{future::Future, net::SocketAddr}; -use anyhow::{Context, Result}; use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint}; +use serde::{Deserialize, Serialize}; use tracing::debug; use crate::{ @@ -30,18 +30,23 @@ pub async fn connect_and_sync( peer: PublicKey, derp_region: Option, addrs: &[SocketAddr], -) -> Result<()> { - debug!(peer = ?peer, "sync (via connect): start"); +) -> Result<(), ConnectError> { + debug!(?peer, "sync[dial]: connect"); let namespace = doc.namespace(); let connection = endpoint .connect((peer, derp_region, addrs).into(), SYNC_ALPN) .await - .context("failed to establish connection")?; - debug!(?peer, ?namespace, "sync (via connect): connected"); - let (mut send_stream, mut recv_stream) = connection.open_bi().await?; + .map_err(ConnectError::connect)?; + debug!(?peer, ?namespace, "sync[dial]: connected"); + let (mut send_stream, mut recv_stream) = + 
connection.open_bi().await.map_err(ConnectError::connect)?; let res = run_alice::(&mut send_stream, &mut recv_stream, doc, peer).await; - send_stream.finish().await?; - recv_stream.read_to_end(0).await?; + + send_stream.finish().await.map_err(ConnectError::close)?; + recv_stream + .read_to_end(0) + .await + .map_err(ConnectError::close)?; #[cfg(feature = "metrics")] if res.is_ok() { @@ -50,47 +55,32 @@ pub async fn connect_and_sync( inc!(Metrics, sync_via_connect_failure); } - debug!(peer = ?peer, ?res, "sync (via connect): done"); + debug!(?peer, ?namespace, ?res, "sync[dial]: done"); res } /// What to do with incoming sync requests -#[derive(Debug)] -pub enum AcceptOutcome { - /// This namespace is not available for sync. - NotAvailable, - /// This namespace is already syncing, therefore abort. - AlreadySyncing, - /// Accept the sync request. - Accept(Replica), -} - -impl From>> for AcceptOutcome { - fn from(replica: Option>) -> Self { - match replica { - Some(replica) => AcceptOutcome::Accept(replica), - None => AcceptOutcome::NotAvailable, - } - } -} +pub type AcceptOutcome = Result::Instance>, AbortReason>; /// Handle an iroh-sync connection and sync all shared documents in the replica store. 
pub async fn handle_connection( connecting: quinn::Connecting, accept_cb: F, -) -> std::result::Result<(NamespaceId, PublicKey), SyncError> +) -> Result<(NamespaceId, PublicKey), AcceptError> where S: store::Store, F: Fn(NamespaceId, PublicKey) -> Fut, Fut: Future>>, { - let connection = connecting.await.map_err(SyncError::connect)?; - let peer = get_peer_id(&connection).await.map_err(SyncError::connect)?; + let connection = connecting.await.map_err(AcceptError::connect)?; + let peer = get_peer_id(&connection) + .await + .map_err(AcceptError::connect)?; let (mut send_stream, mut recv_stream) = connection .accept_bi() .await - .map_err(|error| SyncError::open(peer, error))?; - debug!(peer = ?peer, "sync (via accept): start"); + .map_err(|e| AcceptError::open(peer, e))?; + debug!(?peer, "sync[accept]: handle"); let res = run_bob::(&mut send_stream, &mut recv_stream, accept_cb, peer).await; @@ -101,24 +91,30 @@ where inc!(Metrics, sync_via_accept_failure); } - debug!(peer = ?peer, ?res, "sync (via accept): done"); + let namespace = match &res { + Ok(namespace) => Some(*namespace), + Err(err) => err.namespace(), + }; - let namespace = res?; send_stream .finish() .await - .map_err(|error| SyncError::close(peer, namespace, error))?; + .map_err(|error| AcceptError::close(peer, namespace, error))?; recv_stream .read_to_end(0) .await - .map_err(|error| SyncError::close(peer, namespace, error))?; + .map_err(|error| AcceptError::close(peer, namespace, error))?; + let namespace = res?; + + debug!(?peer, ?namespace, "sync[accept]: done"); + Ok((namespace, peer)) } -/// Failure reasons for sync. +/// Errors that may occur on handling incoming sync connections. #[derive(thiserror::Error, Debug)] #[allow(missing_docs)] -pub enum SyncError { +pub enum AcceptError { /// Failed to establish connection #[error("Failed to establish connection")] Connect { @@ -132,6 +128,13 @@ pub enum SyncError { #[source] error: anyhow::Error, }, + /// We aborted the sync request. 
+ #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")] + Abort { + peer: PublicKey, + namespace: NamespaceId, + reason: AbortReason, + }, /// Failed to run sync #[error("Failed to sync {namespace:?} with {peer:?}")] Sync { @@ -144,13 +147,52 @@ pub enum SyncError { #[error("Failed to close {namespace:?} with {peer:?}")] Close { peer: PublicKey, - namespace: NamespaceId, + namespace: Option, #[source] error: anyhow::Error, }, } -impl SyncError { +/// Errors that may occur on outgoing sync requests. +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum ConnectError { + /// Failed to establish connection + #[error("Failed to establish connection")] + Connect { + #[source] + error: anyhow::Error, + }, + /// The remote peer aborted the sync request. + #[error("Remote peer aborted sync: {0:?}")] + RemoteAbort(AbortReason), + /// We cancelled the operation + #[error("Cancelled")] + Cancelled, + /// Failed to run sync + #[error("Failed to sync")] + Sync { + #[source] + error: anyhow::Error, + }, + /// Failed to close + #[error("Failed to close connection1")] + Close { + #[source] + error: anyhow::Error, + }, +} + +/// Reason why we aborted an incoming sync request. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum AbortReason { + /// Namespace is not avaiable. + NotAvailable, + /// We are already syncing this namespace. + AlreadySyncing, +} + +impl AcceptError { fn connect(error: impl Into) -> Self { Self::Connect { error: error.into(), @@ -173,7 +215,11 @@ impl SyncError { error: error.into(), } } - fn close(peer: PublicKey, namespace: NamespaceId, error: impl Into) -> Self { + fn close( + peer: PublicKey, + namespace: Option, + error: impl Into, + ) -> Self { Self::Close { peer, namespace, @@ -183,20 +229,43 @@ impl SyncError { /// Get the peer's node ID (if available) pub fn peer(&self) -> Option { match self { - SyncError::Connect { .. } => None, - SyncError::Open { peer, .. 
} => Some(*peer), - SyncError::Sync { peer, .. } => Some(*peer), - SyncError::Close { peer, .. } => Some(*peer), + AcceptError::Connect { .. } => None, + AcceptError::Open { peer, .. } => Some(*peer), + AcceptError::Sync { peer, .. } => Some(*peer), + AcceptError::Close { peer, .. } => Some(*peer), + AcceptError::Abort { peer, .. } => Some(*peer), } } /// Get the namespace (if available) pub fn namespace(&self) -> Option { match self { - SyncError::Connect { .. } => None, - SyncError::Open { .. } => None, - SyncError::Sync { namespace, .. } => namespace.to_owned(), - SyncError::Close { namespace, .. } => Some(*namespace), + AcceptError::Connect { .. } => None, + AcceptError::Open { .. } => None, + AcceptError::Sync { namespace, .. } => namespace.to_owned(), + AcceptError::Close { namespace, .. } => namespace.to_owned(), + AcceptError::Abort { namespace, .. } => Some(*namespace), } } } + +impl ConnectError { + fn connect(error: impl Into) -> Self { + Self::Connect { + error: error.into(), + } + } + fn close(error: impl Into) -> Self { + Self::Close { + error: error.into(), + } + } + pub(crate) fn sync(error: impl Into) -> Self { + Self::Sync { + error: error.into(), + } + } + pub(crate) fn remote_abort(reason: AbortReason) -> Self { + Self::RemoteAbort(reason) + } +} diff --git a/iroh-sync/src/net/codec.rs b/iroh-sync/src/net/codec.rs index d2b7dc4391..d085b71652 100644 --- a/iroh-sync/src/net/codec.rs +++ b/iroh-sync/src/net/codec.rs @@ -1,6 +1,6 @@ use std::future::Future; -use anyhow::{anyhow, bail, ensure, Result}; +use anyhow::{anyhow, ensure}; use bytes::{Buf, BufMut, BytesMut}; use futures::SinkExt; use iroh_net::key::PublicKey; @@ -11,7 +11,7 @@ use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite}; use tracing::trace; use crate::{ - net::{AcceptOutcome, SyncError}, + net::{AbortReason, AcceptError, AcceptOutcome, ConnectError}, store, NamespaceId, Replica, }; @@ -23,10 +23,7 @@ const MAX_MESSAGE_SIZE: usize = 1024 * 1024 * 1024; // This is 
likely too large, impl Decoder for SyncCodec { type Item = Message; type Error = anyhow::Error; - fn decode( - &mut self, - src: &mut BytesMut, - ) -> std::result::Result, Self::Error> { + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 4 { return Ok(None); } @@ -50,11 +47,7 @@ impl Decoder for SyncCodec { impl Encoder for SyncCodec { type Error = anyhow::Error; - fn encode( - &mut self, - item: Message, - dst: &mut BytesMut, - ) -> std::result::Result<(), Self::Error> { + fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { let len = postcard::serialize_with_flavor(&item, postcard::ser_flavors::Size::default()).unwrap(); ensure!( @@ -81,13 +74,17 @@ impl Encoder for SyncCodec { /// On any error and on success the substream is closed. #[derive(Debug, Clone, Serialize, Deserialize)] enum Message { + /// Init message (sent by the dialing peer) Init { /// Namespace to sync namespace: NamespaceId, /// Initial message message: crate::sync::ProtocolMessage, }, + /// Sync messages (sent by both peers) Sync(crate::sync::ProtocolMessage), + /// Abort message (sent by the accepting peer to decline a request) + Abort { reason: AbortReason }, } /// Runs the initiator side of the sync protocol. @@ -96,7 +93,7 @@ pub(super) async fn run_alice, other_peer_id: PublicKey, -) -> Result<()> { +) -> Result<(), ConnectError> { let other_peer_id = *other_peer_id.as_bytes(); let mut reader = FramedRead::new(reader, SyncCodec); let mut writer = FramedWrite::new(writer, SyncCodec); @@ -105,29 +102,38 @@ pub(super) async fn run_alice bob: {:#?}", init_message); - writer.send(init_message).await?; + writer + .send(init_message) + .await + .map_err(ConnectError::sync)?; // Sync message loop - while let Some(msg) = reader.next().await { - match msg? { + let msg = msg.map_err(ConnectError::sync)?; + match msg { Message::Init { .. 
} => { - bail!("unexpected message: init"); + return Err(ConnectError::sync(anyhow!("unexpected init message"))); } Message::Sync(msg) => { if let Some(msg) = alice .sync_process_message(msg, other_peer_id) - .map_err(Into::into)? + .map_err(ConnectError::sync)? { trace!("alice -> bob: {:#?}", msg); - writer.send(Message::Sync(msg)).await?; + writer + .send(Message::Sync(msg)) + .await + .map_err(ConnectError::sync)?; } else { break; } } + Message::Abort { reason } => { + return Err(ConnectError::remote_abort(reason)); + } } } @@ -140,7 +146,7 @@ pub(super) async fn run_bob( reader: &mut R, accept_cb: F, other_peer_id: PublicKey, -) -> std::result::Result +) -> Result where S: store::Store, R: AsyncRead + Unpin, @@ -149,10 +155,7 @@ where Fut: Future>>, { let mut state = BobState::::new(other_peer_id); - state - .run(writer, reader, accept_cb) - .await - .map_err(|err| SyncError::sync(state.peer, state.namespace(), err)) + state.run(writer, reader, accept_cb).await } struct BobState { @@ -168,7 +171,16 @@ impl BobState { } } - async fn run(&mut self, writer: W, reader: R, accept_cb: F) -> Result + pub fn fail(&self, reason: impl Into) -> AcceptError { + AcceptError::sync(self.peer, self.namespace(), reason.into()) + } + + async fn run( + &mut self, + writer: W, + reader: R, + accept_cb: F, + ) -> Result where R: AsyncRead + Unpin, W: AsyncWrite + Unpin, @@ -178,15 +190,23 @@ impl BobState { let mut reader = FramedRead::new(reader, SyncCodec); let mut writer = FramedWrite::new(writer, SyncCodec); while let Some(msg) = reader.next().await { - let next = match (msg?, self.replica.as_ref()) { + let msg = msg.map_err(|e| self.fail(e))?; + let next = match (msg, self.replica.as_ref()) { (Message::Init { namespace, message }, None) => { - let replica = match accept_cb(namespace, self.peer).await? 
{ - AcceptOutcome::Accept(replica) => replica, - AcceptOutcome::NotAvailable => { - bail!("abort sync: {namespace:?} not available") - } - AcceptOutcome::AlreadySyncing => { - bail!("abort sync: {namespace:?} already syncing") + let accept = accept_cb(namespace, self.peer).await; + let accept = accept.map_err(|e| self.fail(e))?; + let replica = match accept { + Ok(replica) => replica, + Err(reason) => { + writer + .send(Message::Abort { reason }) + .await + .map_err(|e| self.fail(e))?; + return Err(AcceptError::Abort { + namespace, + peer: self.peer, + reason, + }); } }; trace!(?namespace, peer = ?self.peer, "run_bob: recv initial message {message:#?}"); @@ -198,13 +218,24 @@ impl BobState { trace!(namespace = ?replica.namespace(), peer = ?self.peer, "run_bob: recv {msg:#?}"); replica.sync_process_message(msg, *self.peer.as_bytes()) } - (Message::Init { .. }, Some(_)) => bail!("double init message"), - (Message::Sync(_), None) => bail!("unexpected sync message before init"), + (Message::Init { .. }, Some(_)) => { + return Err(self.fail(anyhow!("double init message"))) + } + (Message::Sync(_), None) => { + return Err(self.fail(anyhow!("unexpected sync message before init"))) + } + (Message::Abort { reason }, _) => { + return Err(self.fail(anyhow!("unexpected abort message ({reason:?})"))) + } }; - match next.map_err(Into::into)? 
{ + let next = next.map_err(|e| self.fail(e))?; + match next { Some(msg) => { trace!(namespace = ?self.namespace(), peer = ?self.peer, "run_bob: send {msg:#?}"); - writer.send(Message::Sync(msg)).await?; + writer + .send(Message::Sync(msg)) + .await + .map_err(|e| self.fail(e))?; } None => break, } @@ -213,7 +244,7 @@ impl BobState { trace!(namespace = ?self.namespace().unwrap(), peer = ?self.peer, "run_bob: finished"); self.namespace() - .ok_or_else(|| anyhow!("Stream closed before init message")) + .ok_or_else(|| self.fail(anyhow!("Stream closed before init message"))) } fn namespace(&self) -> Option { @@ -228,6 +259,7 @@ mod tests { sync::Namespace, AuthorId, }; + use anyhow::Result; use iroh_bytes::Hash; use iroh_net::key::SecretKey; use rand_core::{CryptoRngCore, SeedableRng}; @@ -300,7 +332,7 @@ mod tests { futures::future::ready( bob_replica_store_task .open_replica(&namespace) - .map(Into::into), + .map(|r| r.ok_or(AbortReason::NotAvailable)), ) }, alice_peer_id, @@ -502,7 +534,11 @@ mod tests { &mut bob_writer, &mut bob_reader, |namespace, _| { - futures::future::ready(bob_store.open_replica(&namespace).map(Into::into)) + futures::future::ready( + bob_store + .open_replica(&namespace) + .map(|r| r.ok_or(AbortReason::NotAvailable)), + ) }, alice_node_pubkey, ) diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index ecde915bd2..4e93443dc2 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -570,9 +570,15 @@ const AUTHOR_BYTES: std::ops::Range = 32..64; const KEY_BYTES: std::ops::RangeFrom = 64..; /// The indentifier of a record. 
-#[derive(Default, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct RecordIdentifier(Bytes); +impl Default for RecordIdentifier { + fn default() -> Self { + Self::new(NamespaceId::default(), AuthorId::default(), b"") + } +} + impl Debug for RecordIdentifier { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RecordIdentifier") diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 65ef33fc9f..28a2cc95d1 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -26,7 +26,9 @@ use iroh_gossip::{ }; use iroh_net::{key::PublicKey, MagicEndpoint}; use iroh_sync::{ - net::{connect_and_sync, handle_connection, AcceptOutcome, SyncError}, + net::{ + connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError, + }, store, sync::{Entry, InsertOrigin, NamespaceId, Replica, SignedEntry}, }; @@ -35,7 +37,8 @@ use tokio::{ sync::{self, mpsc, oneshot}, task::JoinError, }; -use tracing::{debug, error, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, debug_span, error, warn, Instrument}; const CHANNEL_CAP: usize = 8; @@ -105,9 +108,11 @@ pub enum Op { Put(SignedEntry), } -#[derive(Debug)] +#[derive(Debug, Clone)] enum SyncState { - Running, + None, + Dialing(CancellationToken), + Accepting, Finished, Failed, } @@ -247,6 +252,7 @@ impl LiveSync { downloader: Downloader, ) -> Self { let (to_actor_tx, to_actor_rx) = mpsc::channel(CHANNEL_CAP); + let me = base32::fmt_short(endpoint.peer_id()); let mut actor = Actor::new( endpoint, gossip, @@ -256,8 +262,9 @@ impl LiveSync { to_actor_rx, to_actor_tx.clone(), ); + let span = debug_span!("sync", %me); let task = rt.main().spawn(async move { - if let Err(err) = actor.run().await { + if let Err(err) = actor.run().instrument(span).await { error!("live sync failed: {err:?}"); } }); @@ -389,12 +396,12 @@ struct 
Actor { /// Running sync futures (from connect). #[allow(clippy::type_complexity)] - running_sync_connect: - FuturesUnordered)>>, - /// Running sync futures (from accept). - running_sync_accept: FuturesUnordered< - BoxFuture<'static, std::result::Result<(NamespaceId, PublicKey), SyncError>>, + running_sync_connect: FuturesUnordered< + BoxFuture<'static, (NamespaceId, PublicKey, SyncReason, Result<(), ConnectError>)>, >, + /// Running sync futures (from accept). + running_sync_accept: + FuturesUnordered>>, /// Runnning download futures. pending_downloads: FuturesUnordered>>, /// Running gossip join futures. @@ -508,9 +515,9 @@ impl Actor { } Some((namespace, res)) = self.pending_joins.next() => { if let Err(err) = res { - error!("failed to join gossip for {namespace:?}: {err:?}"); + error!(?namespace, %err, "failed to join gossip"); } else { - debug!("joined gossip for {namespace:?}"); + debug!(?namespace, "joined gossip"); } // TODO: maintain some join state } @@ -528,6 +535,16 @@ impl Actor { Ok(()) } + fn set_sync_state(&mut self, namespace: NamespaceId, peer: PublicKey, state: SyncState) { + self.sync_state.insert((namespace, peer), state); + } + fn get_sync_state(&self, namespace: NamespaceId, peer: PublicKey) -> SyncState { + self.sync_state + .get(&(namespace, peer)) + .cloned() + .unwrap_or(SyncState::None) + } + fn get_replica_if_syncing(&self, namespace: &NamespaceId) -> Option> { if !self.syncing_replicas.contains(namespace) { None @@ -546,27 +563,34 @@ impl Actor { let Some(replica) = self.get_replica_if_syncing(&namespace) else { return; }; - // Check if we synced and only start sync if not yet synced - // sync_with_peer is triggered on NeighborUp events, so might trigger repeatedly for the - // same peers. - // TODO: Track finished time and potentially re-run sync - if let Some(_state) = self.sync_state.get(&(namespace, peer)) { - return; + // Do not initiate the sync if we are already syncing or did previously sync successfully. 
+ // TODO: Track finished time and potentially re-run sync on finished state if enough time + // passed. + match self.get_sync_state(namespace, peer) { + SyncState::Accepting | SyncState::Dialing(_) | SyncState::Finished => { + return; + } + SyncState::Failed | SyncState::None => {} }; - debug!(?peer, ?namespace, "start sync (reason: {reason:?})"); - self.sync_state - .insert((namespace, peer), SyncState::Running); - let task = { + + let cancel = CancellationToken::new(); + self.set_sync_state(namespace, peer, SyncState::Dialing(cancel.clone())); + let fut = { let endpoint = self.endpoint.clone(); let replica = replica.clone(); async move { - // TODO: Make sure that the peer is dialable. - let res = connect_and_sync::(&endpoint, &replica, peer, None, &[]).await; + debug!(?peer, ?namespace, ?reason, "sync[dial]: start"); + let fut = connect_and_sync::(&endpoint, &replica, peer, None, &[]); + let res = tokio::select! { + biased; + _ = cancel.cancelled() => Err(ConnectError::Cancelled), + res = fut => res + }; (namespace, peer, reason, res) } .boxed() }; - self.running_sync_connect.push(task); + self.running_sync_connect.push(fut); } async fn shutdown(&mut self) -> anyhow::Result<()> { @@ -603,13 +627,14 @@ impl Actor { } async fn start_sync(&mut self, namespace: NamespaceId, peers: Vec) -> Result<()> { - self.ensure_subscription(namespace)?; + self.open_replica(namespace)?; self.syncing_replicas.insert(namespace); self.join_peers(namespace, peers).await?; Ok(()) } - fn ensure_subscription(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { + /// Open a replica, if not yet in our set of open replicas. + fn open_replica(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { if !self.open_replicas.contains(&namespace) { let Some(replica) = self.replica_store.open_replica(&namespace)? 
else { bail!("Replica not found"); @@ -646,7 +671,7 @@ impl Actor { namespace: NamespaceId, cb: OnLiveEventCallback, ) -> anyhow::Result { - self.ensure_subscription(namespace)?; + self.open_replica(namespace)?; let subs = self.event_subscriptions.entry(namespace).or_default(); let removal_id = self .event_removal_id @@ -672,6 +697,7 @@ impl Actor { async fn stop_sync(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { if self.syncing_replicas.remove(&namespace) { self.gossip.quit(namespace.into()).await?; + self.sync_state.retain(|(n, _peer), _value| *n != namespace); self.maybe_close_replica(namespace); } Ok(()) @@ -731,21 +757,54 @@ impl Actor { namespace: NamespaceId, peer: PublicKey, reason: SyncReason, - result: Result<()>, + result: Result<(), ConnectError>, ) { - self.on_sync_finished(namespace, peer, Origin::Connect(reason), result) - .await; + match result { + Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => { + debug!( + ?peer, + ?namespace, + ?reason, + "sync[dial]: remote abort, already syncing" + ); + } + Err(ConnectError::Cancelled) => { + // In case the remote aborted with already running: do nothing + debug!( + ?peer, + ?namespace, + ?reason, + "sync[dial]: cancelled, already syncing" + ); + } + Err(err) => { + self.on_sync_finished(namespace, peer, Origin::Connect(reason), Err(err.into())) + .await; + } + Ok(()) => { + self.on_sync_finished(namespace, peer, Origin::Connect(reason), Ok(())) + .await; + } + } } async fn on_sync_via_accept_finished( &mut self, - res: std::result::Result<(NamespaceId, PublicKey), SyncError>, + res: Result<(NamespaceId, PublicKey), AcceptError>, ) { match res { Ok((namespace, peer)) => { self.on_sync_finished(namespace, peer, Origin::Accept, Ok(())) .await; } + Err(AcceptError::Abort { + peer, + namespace, + reason, + }) if reason == AbortReason::AlreadySyncing => { + // In case we aborted the sync: do nothing (our outgoing sync is in progress) + debug!(?peer, ?namespace, ?reason, "sync[accept]: 
aborted by us"); + } Err(err) => { if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) { self.on_sync_finished( @@ -756,7 +815,7 @@ impl Actor { ) .await; } else { - debug!("sync failed (via accept): {err:?}"); + debug!("sync[accept]: failed {err:?}"); } } } @@ -769,12 +828,22 @@ impl Actor { origin: Origin, result: anyhow::Result<()>, ) { - debug!(?peer, ?namespace, "sync done (via {origin:?}): {result:?}"); + // debug log the result, warn in case of errors + match (&origin, &result) { + (Origin::Accept, Ok(())) => debug!(?peer, ?namespace, "sync[accept]: done"), + (Origin::Connect(reason), Ok(())) => { + debug!(?peer, ?namespace, ?reason, "sync[dial]: done") + } + (Origin::Accept, Err(err)) => warn!(?peer, ?namespace, ?err, "sync[accept]: failed"), + (Origin::Connect(reason), Err(err)) => { + warn!(?peer, ?namespace, ?err, ?reason, "sync[dial]: failed") + } + } let state = match result { Ok(_) => SyncState::Finished, Err(_) => SyncState::Failed, }; - self.sync_state.insert((namespace, peer), state); + self.set_sync_state(namespace, peer, state); let event = SyncEvent { namespace, peer, @@ -896,6 +965,7 @@ impl Actor { } .boxed() }; + debug!("sync[accept] incoming connection"); let fut = async move { handle_connection::(conn, request_replica_cb).await }.boxed(); self.running_sync_accept.push(fut); @@ -907,22 +977,32 @@ impl Actor { peer: PublicKey, ) -> AcceptOutcome { let Some(replica) = self.get_replica_if_syncing(&namespace) else { - return AcceptOutcome::NotAvailable; + return Err(AbortReason::NotAvailable); }; - let state = self.sync_state.get(&(namespace, peer)); - match state { - None | Some(SyncState::Failed | SyncState::Finished) => { - self.sync_state - .insert((namespace, peer), SyncState::Running); - AcceptOutcome::Accept(replica.clone()) + match self.get_sync_state(namespace, peer) { + SyncState::None | SyncState::Failed | SyncState::Finished => { + self.set_sync_state(namespace, peer, SyncState::Accepting); + Ok(replica.clone()) + } 
+ SyncState::Accepting => Err(AbortReason::AlreadySyncing), + // Incoming sync request while we are dialing ourselves. + // In this case, compare the binary representations of our and the other node's peer id + // to deterministically decide which of the two concurrent connections will succeed. + SyncState::Dialing(cancel) => { + if peer.as_bytes() > self.endpoint.peer_id().as_bytes() { + cancel.cancel(); + self.set_sync_state(namespace, peer, SyncState::Accepting); + Ok(replica.clone()) + } else { + Err(AbortReason::AlreadySyncing) + } } - Some(SyncState::Running) => AcceptOutcome::AlreadySyncing, } } } /// Outcome of a sync operation -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct SyncEvent { /// Namespace that was synced pub namespace: NamespaceId, @@ -968,3 +1048,16 @@ async fn notify_all(subs: &mut HashMap, event: LiveEve } } } + +/// Utilities for working with byte array identifiers +// TODO: copy-pasted from iroh-gossip/src/proto/util.rs +// Unify into iroh-common crate or similar +pub(super) mod base32 { + /// Convert to a base32 string limited to the first 10 bytes + pub fn fmt_short(bytes: impl AsRef<[u8]>) -> String { + let len = bytes.as_ref().len().min(10); + let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]); + text.make_ascii_lowercase(); + text + } +} diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 5da0adecfb..e1096cb2b2 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -9,7 +9,7 @@ use iroh::{ collection::IrohCollectionParser, node::{Builder, Node}, rpc_protocol::ShareMode, - sync_engine::{LiveEvent, Origin, SyncReason}, + sync_engine::{LiveEvent, Origin, SyncEvent}, }; use quic_rpc::transport::misc::DummyServerEndpoint; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -43,9 +43,11 @@ fn test_node( async fn spawn_node( rt: runtime::Handle, + i: usize, ) -> anyhow::Result> { let node = test_node(rt, "127.0.0.1:0".parse()?); let 
node = node.spawn().await?; + tracing::info!("spawned node {i} {:?}", node.peer_id()); Ok(node) } @@ -53,12 +55,10 @@ async fn spawn_nodes( rt: runtime::Handle, n: usize, ) -> anyhow::Result>> { - let mut nodes = vec![]; - for _i in 0..n { - let node = spawn_node(rt.clone()).await?; - nodes.push(node); - } - Ok(nodes) + futures::future::join_all((0..n).map(|i| spawn_node(rt.clone(), i))) + .await + .into_iter() + .collect() } /// This tests the simplest scenario: A node connects to another node, and performs sync. @@ -95,7 +95,6 @@ async fn sync_simple() -> Result<()> { }; assert_eq!(event.namespace, doc.id()); assert_eq!(event.peer, nodes[0].peer_id()); - assert_eq!(event.origin, Origin::Connect(SyncReason::DirectJoin)); assert_eq!(event.result, Ok(())); let event = events.try_next().await?.unwrap(); assert!(matches!(event, LiveEvent::ContentReady { .. })); @@ -108,7 +107,6 @@ async fn sync_simple() -> Result<()> { }; assert_eq!(event.namespace, doc0.id()); assert_eq!(event.peer, nodes[1].peer_id()); - assert_eq!(event.origin, Origin::Accept); assert_eq!(event.result, Ok(())); for node in nodes { @@ -122,7 +120,7 @@ async fn sync_simple() -> Result<()> { async fn sync_subscribe() -> Result<()> { setup_logging(); let rt = test_runtime(); - let node = spawn_node(rt).await?; + let node = spawn_node(rt, 0).await?; let client = node.client(); let doc = client.docs.create().await?; let mut sub = doc.subscribe().await?; @@ -141,8 +139,8 @@ async fn sync_subscribe() -> Result<()> { async fn sync_full_basic() -> Result<()> { setup_logging(); let rt = test_runtime(); - let nodes = spawn_nodes(rt, 3).await?; - let clients = nodes.iter().map(|node| node.client()).collect::>(); + let mut nodes = spawn_nodes(rt.clone(), 2).await?; + let mut clients = nodes.iter().map(|node| node.client()).collect::>(); // node1: create doc and ticket let (ticket, doc1) = { @@ -208,47 +206,61 @@ async fn sync_full_basic() -> Result<()> { }; // node 3 joins & imports the doc from peer 1 - let 
_doc3 = { - let iroh = &clients[2]; - let doc = iroh.docs.import(ticket).await?; + nodes.push(spawn_node(rt.clone(), nodes.len()).await?); + clients.push(nodes.last().unwrap().client()); + let iroh = &clients[2]; + let doc = iroh.docs.import(ticket).await?; - // wait for 2 remote inserts - let mut events = doc.subscribe().await?; - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::InsertRemote { .. }), - "expected InsertRemote but got {event:?}" - ); - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::InsertRemote { .. }), - "expected InsertRemote but got {event:?}" - ); - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::SyncFinished(_)), - "expected SyncFinished but got {event:?}" - ); - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::ContentReady { .. }), - "expected ContentReady but got {event:?}" - ); + // expect 2 times InsertRemote + let mut events = doc.subscribe().await?; + let event = events.try_next().await?.unwrap(); + assert!( + matches!(event, LiveEvent::InsertRemote { .. }), + "expected InsertRemote but got {event:?}" + ); + let event = events.try_next().await?.unwrap(); + assert!( + matches!(event, LiveEvent::InsertRemote { .. }), + "expected InsertRemote but got {event:?}" + ); + + // now expect SyncFinished + let event = events.try_next().await?.unwrap(); + let LiveEvent::SyncFinished(event) = event else { + panic!("expected SyncFinished but got {event:?}"); + }; + let expected = SyncEvent { + peer: nodes[0].peer_id(), + namespace: doc.id(), + result: Ok(()), + origin: event.origin.clone(), + finished: event.finished, + }; + assert_eq!(event, expected, "expected {expected:?} but got {event:?}"); + + // expect 2 times ContentReady + // potentically a `SyncFinished` event with `Origin::Accept` is inbetween, + // as node1 or node2 could connect to us. 
+ let mut i = 0; + while i < 2 { let event = events.try_next().await?.unwrap(); + if matches!( + event, + LiveEvent::SyncFinished(SyncEvent { + origin: Origin::Accept, + .. + }) + ) { + continue; + } assert!( matches!(event, LiveEvent::ContentReady { .. }), "expected ContentReady but got {event:?}" ); - - assert_latest(&doc, b"k1", b"v1").await; - assert_latest(&doc, b"k2", b"v2").await; - doc - }; - - // TODO: - // - gossiping between multiple peers - // - better test utils - // - ... + i += 1; + } + assert_latest(&doc, b"k1", b"v1").await; + assert_latest(&doc, b"k2", b"v2").await; for node in nodes { node.shutdown(); @@ -261,7 +273,7 @@ async fn sync_full_basic() -> Result<()> { async fn sync_subscribe_stop() -> Result<()> { setup_logging(); let rt = test_runtime(); - let node = spawn_node(rt).await?; + let node = spawn_node(rt, 0).await?; let client = node.client(); let doc = client.docs.create().await?;