diff --git a/iroh-net/src/key.rs b/iroh-net/src/key.rs index 0b2b7cc656..a2dbe158ee 100644 --- a/iroh-net/src/key.rs +++ b/iroh-net/src/key.rs @@ -121,7 +121,7 @@ impl From for PublicKey { impl Debug for PublicKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut text = data_encoding::BASE32_NOPAD.encode(self.as_bytes()); + let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]); text.make_ascii_lowercase(); write!(f, "PublicKey({text})") } diff --git a/iroh-sync/src/net.rs b/iroh-sync/src/net.rs index 41b126c487..d6eb3d2384 100644 --- a/iroh-sync/src/net.rs +++ b/iroh-sync/src/net.rs @@ -2,8 +2,8 @@ use std::{future::Future, net::SocketAddr}; -use anyhow::{Context, Result}; use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint}; +use serde::{Deserialize, Serialize}; use tracing::debug; use crate::{ @@ -30,18 +30,23 @@ pub async fn connect_and_sync( peer: PublicKey, derp_region: Option, addrs: &[SocketAddr], -) -> Result<()> { - debug!(peer = ?peer, "sync (via connect): start"); +) -> Result<(), ConnectError> { + debug!(?peer, "sync[dial]: connect"); let namespace = doc.namespace(); let connection = endpoint .connect((peer, derp_region, addrs).into(), SYNC_ALPN) .await - .context("failed to establish connection")?; - debug!(?peer, ?namespace, "sync (via connect): connected"); - let (mut send_stream, mut recv_stream) = connection.open_bi().await?; + .map_err(ConnectError::connect)?; + debug!(?peer, ?namespace, "sync[dial]: connected"); + let (mut send_stream, mut recv_stream) = + connection.open_bi().await.map_err(ConnectError::connect)?; let res = run_alice::(&mut send_stream, &mut recv_stream, doc, peer).await; - send_stream.finish().await?; - recv_stream.read_to_end(0).await?; + + send_stream.finish().await.map_err(ConnectError::close)?; + recv_stream + .read_to_end(0) + .await + .map_err(ConnectError::close)?; #[cfg(feature = "metrics")] if res.is_ok() { @@ -50,47 +55,32 @@ pub async fn connect_and_sync( inc!(Metrics, sync_via_connect_failure); } - debug!(peer = ?peer, ?res, "sync (via connect): done"); + debug!(?peer, ?namespace, ?res, "sync[dial]: done"); res } /// What to do with incoming sync requests -#[derive(Debug)] -pub enum AcceptOutcome { - /// This namespace is not available for sync. - NotAvailable, - /// This namespace is already syncing, therefore abort. - AlreadySyncing, - /// Accept the sync request. - Accept(Replica), -} - -impl From>> for AcceptOutcome { - fn from(replica: Option>) -> Self { - match replica { - Some(replica) => AcceptOutcome::Accept(replica), - None => AcceptOutcome::NotAvailable, - } - } -} +pub type AcceptOutcome = Result::Instance>, AbortReason>; /// Handle an iroh-sync connection and sync all shared documents in the replica store. pub async fn handle_connection( connecting: quinn::Connecting, accept_cb: F, -) -> std::result::Result<(NamespaceId, PublicKey), SyncError> +) -> Result<(NamespaceId, PublicKey), AcceptError> where S: store::Store, F: Fn(NamespaceId, PublicKey) -> Fut, Fut: Future>>, { - let connection = connecting.await.map_err(SyncError::connect)?; - let peer = get_peer_id(&connection).await.map_err(SyncError::connect)?; + let connection = connecting.await.map_err(AcceptError::connect)?; + let peer = get_peer_id(&connection) + .await + .map_err(AcceptError::connect)?; let (mut send_stream, mut recv_stream) = connection .accept_bi() .await - .map_err(|error| SyncError::open(peer, error))?; - debug!(peer = ?peer, "sync (via accept): start"); + .map_err(|e| AcceptError::open(peer, e))?; + debug!(?peer, "sync[accept]: handle"); let res = run_bob::(&mut send_stream, &mut recv_stream, accept_cb, peer).await; @@ -101,24 +91,30 @@ where inc!(Metrics, sync_via_accept_failure); } - debug!(peer = ?peer, ?res, "sync (via accept): done"); + let namespace = match &res { + Ok(namespace) => Some(*namespace), + Err(err) => err.namespace(), + }; - let namespace = res?; send_stream .finish() .await - .map_err(|error| SyncError::close(peer, namespace, error))?; + .map_err(|error| AcceptError::close(peer, namespace, error))?; recv_stream .read_to_end(0) .await - .map_err(|error| SyncError::close(peer, namespace, error))?; + .map_err(|error| AcceptError::close(peer, namespace, error))?; + let namespace = res?; + + debug!(?peer, ?namespace, "sync[accept]: done"); + Ok((namespace, peer)) } -/// Failure reasons for sync. +/// Errors that may occur on handling incoming sync connections. #[derive(thiserror::Error, Debug)] #[allow(missing_docs)] -pub enum SyncError { +pub enum AcceptError { /// Failed to establish connection #[error("Failed to establish connection")] Connect { @@ -132,6 +128,13 @@ pub enum SyncError { #[source] error: anyhow::Error, }, + /// We aborted the sync request. + #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")] + Abort { + peer: PublicKey, + namespace: NamespaceId, + reason: AbortReason, + }, /// Failed to run sync #[error("Failed to sync {namespace:?} with {peer:?}")] Sync { @@ -144,13 +147,52 @@ pub enum SyncError { #[error("Failed to close {namespace:?} with {peer:?}")] Close { peer: PublicKey, - namespace: NamespaceId, + namespace: Option, #[source] error: anyhow::Error, }, } -impl SyncError { +/// Errors that may occur on outgoing sync requests. +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum ConnectError { + /// Failed to establish connection + #[error("Failed to establish connection")] + Connect { + #[source] + error: anyhow::Error, + }, + /// The remote peer aborted the sync request. + #[error("Remote peer aborted sync: {0:?}")] + RemoteAbort(AbortReason), + /// We cancelled the operation + #[error("Cancelled")] + Cancelled, + /// Failed to run sync + #[error("Failed to sync")] + Sync { + #[source] + error: anyhow::Error, + }, + /// Failed to close + #[error("Failed to close connection1")] + Close { + #[source] + error: anyhow::Error, + }, +} + +/// Reason why we aborted an incoming sync request. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum AbortReason { + /// Namespace is not avaiable. + NotAvailable, + /// We are already syncing this namespace. + AlreadySyncing, +} + +impl AcceptError { fn connect(error: impl Into) -> Self { Self::Connect { error: error.into(), @@ -173,7 +215,11 @@ impl SyncError { error: error.into(), } } - fn close(peer: PublicKey, namespace: NamespaceId, error: impl Into) -> Self { + fn close( + peer: PublicKey, + namespace: Option, + error: impl Into, + ) -> Self { Self::Close { peer, namespace, @@ -183,20 +229,43 @@ impl SyncError { /// Get the peer's node ID (if available) pub fn peer(&self) -> Option { match self { - SyncError::Connect { .. } => None, - SyncError::Open { peer, .. } => Some(*peer), - SyncError::Sync { peer, .. } => Some(*peer), - SyncError::Close { peer, .. } => Some(*peer), + AcceptError::Connect { .. } => None, + AcceptError::Open { peer, .. } => Some(*peer), + AcceptError::Sync { peer, .. } => Some(*peer), + AcceptError::Close { peer, .. } => Some(*peer), + AcceptError::Abort { peer, .. } => Some(*peer), } } /// Get the namespace (if available) pub fn namespace(&self) -> Option { match self { - SyncError::Connect { .. } => None, - SyncError::Open { .. } => None, - SyncError::Sync { namespace, .. } => namespace.to_owned(), - SyncError::Close { namespace, .. } => Some(*namespace), + AcceptError::Connect { .. } => None, + AcceptError::Open { .. } => None, + AcceptError::Sync { namespace, .. } => namespace.to_owned(), + AcceptError::Close { namespace, .. } => namespace.to_owned(), + AcceptError::Abort { namespace, .. } => Some(*namespace), } } } + +impl ConnectError { + fn connect(error: impl Into) -> Self { + Self::Connect { + error: error.into(), + } + } + fn close(error: impl Into) -> Self { + Self::Close { + error: error.into(), + } + } + pub(crate) fn sync(error: impl Into) -> Self { + Self::Sync { + error: error.into(), + } + } + pub(crate) fn remote_abort(reason: AbortReason) -> Self { + Self::RemoteAbort(reason) + } +} diff --git a/iroh-sync/src/net/codec.rs b/iroh-sync/src/net/codec.rs index d2b7dc4391..d085b71652 100644 --- a/iroh-sync/src/net/codec.rs +++ b/iroh-sync/src/net/codec.rs @@ -1,6 +1,6 @@ use std::future::Future; -use anyhow::{anyhow, bail, ensure, Result}; +use anyhow::{anyhow, ensure}; use bytes::{Buf, BufMut, BytesMut}; use futures::SinkExt; use iroh_net::key::PublicKey; @@ -11,7 +11,7 @@ use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite}; use tracing::trace; use crate::{ - net::{AcceptOutcome, SyncError}, + net::{AbortReason, AcceptError, AcceptOutcome, ConnectError}, store, NamespaceId, Replica, }; @@ -23,10 +23,7 @@ const MAX_MESSAGE_SIZE: usize = 1024 * 1024 * 1024; // This is likely too large, impl Decoder for SyncCodec { type Item = Message; type Error = anyhow::Error; - fn decode( - &mut self, - src: &mut BytesMut, - ) -> std::result::Result, Self::Error> { + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 4 { return Ok(None); } @@ -50,11 +47,7 @@ impl Decoder for SyncCodec { impl Encoder for SyncCodec { type Error = anyhow::Error; - fn encode( - &mut self, - item: Message, - dst: &mut BytesMut, - ) -> std::result::Result<(), Self::Error> { + fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { let len = postcard::serialize_with_flavor(&item, postcard::ser_flavors::Size::default()).unwrap(); ensure!( @@ -81,13 +74,17 @@ impl Encoder for SyncCodec { /// On any error and on success the substream is closed. #[derive(Debug, Clone, Serialize, Deserialize)] enum Message { + /// Init message (sent by the dialing peer) Init { /// Namespace to sync namespace: NamespaceId, /// Initial message message: crate::sync::ProtocolMessage, }, + /// Sync messages (sent by both peers) Sync(crate::sync::ProtocolMessage), + /// Abort message (sent by the accepting peer to decline a request) + Abort { reason: AbortReason }, } /// Runs the initiator side of the sync protocol. @@ -96,7 +93,7 @@ pub(super) async fn run_alice, other_peer_id: PublicKey, -) -> Result<()> { +) -> Result<(), ConnectError> { let other_peer_id = *other_peer_id.as_bytes(); let mut reader = FramedRead::new(reader, SyncCodec); let mut writer = FramedWrite::new(writer, SyncCodec); @@ -105,29 +102,38 @@ pub(super) async fn run_alice bob: {:#?}", init_message); - writer.send(init_message).await?; + writer + .send(init_message) + .await + .map_err(ConnectError::sync)?; // Sync message loop - while let Some(msg) = reader.next().await { - match msg? { + let msg = msg.map_err(ConnectError::sync)?; + match msg { Message::Init { .. } => { - bail!("unexpected message: init"); + return Err(ConnectError::sync(anyhow!("unexpected init message"))); } Message::Sync(msg) => { if let Some(msg) = alice .sync_process_message(msg, other_peer_id) - .map_err(Into::into)? + .map_err(ConnectError::sync)? { trace!("alice -> bob: {:#?}", msg); - writer.send(Message::Sync(msg)).await?; + writer + .send(Message::Sync(msg)) + .await + .map_err(ConnectError::sync)?; } else { break; } } + Message::Abort { reason } => { + return Err(ConnectError::remote_abort(reason)); + } } } @@ -140,7 +146,7 @@ pub(super) async fn run_bob( reader: &mut R, accept_cb: F, other_peer_id: PublicKey, -) -> std::result::Result +) -> Result where S: store::Store, R: AsyncRead + Unpin, @@ -149,10 +155,7 @@ where Fut: Future>>, { let mut state = BobState::::new(other_peer_id); - state - .run(writer, reader, accept_cb) - .await - .map_err(|err| SyncError::sync(state.peer, state.namespace(), err)) + state.run(writer, reader, accept_cb).await } struct BobState { @@ -168,7 +171,16 @@ impl BobState { } } - async fn run(&mut self, writer: W, reader: R, accept_cb: F) -> Result + pub fn fail(&self, reason: impl Into) -> AcceptError { + AcceptError::sync(self.peer, self.namespace(), reason.into()) + } + + async fn run( + &mut self, + writer: W, + reader: R, + accept_cb: F, + ) -> Result where R: AsyncRead + Unpin, W: AsyncWrite + Unpin, @@ -178,15 +190,23 @@ impl BobState { let mut reader = FramedRead::new(reader, SyncCodec); let mut writer = FramedWrite::new(writer, SyncCodec); while let Some(msg) = reader.next().await { - let next = match (msg?, self.replica.as_ref()) { + let msg = msg.map_err(|e| self.fail(e))?; + let next = match (msg, self.replica.as_ref()) { (Message::Init { namespace, message }, None) => { - let replica = match accept_cb(namespace, self.peer).await? { - AcceptOutcome::Accept(replica) => replica, - AcceptOutcome::NotAvailable => { - bail!("abort sync: {namespace:?} not available") - } - AcceptOutcome::AlreadySyncing => { - bail!("abort sync: {namespace:?} already syncing") + let accept = accept_cb(namespace, self.peer).await; + let accept = accept.map_err(|e| self.fail(e))?; + let replica = match accept { + Ok(replica) => replica, + Err(reason) => { + writer + .send(Message::Abort { reason }) + .await + .map_err(|e| self.fail(e))?; + return Err(AcceptError::Abort { + namespace, + peer: self.peer, + reason, + }); } }; trace!(?namespace, peer = ?self.peer, "run_bob: recv initial message {message:#?}"); @@ -198,13 +218,24 @@ impl BobState { trace!(namespace = ?replica.namespace(), peer = ?self.peer, "run_bob: recv {msg:#?}"); replica.sync_process_message(msg, *self.peer.as_bytes()) } - (Message::Init { .. }, Some(_)) => bail!("double init message"), - (Message::Sync(_), None) => bail!("unexpected sync message before init"), + (Message::Init { .. }, Some(_)) => { + return Err(self.fail(anyhow!("double init message"))) + } + (Message::Sync(_), None) => { + return Err(self.fail(anyhow!("unexpected sync message before init"))) + } + (Message::Abort { reason }, _) => { + return Err(self.fail(anyhow!("unexpected abort message ({reason:?})"))) + } }; - match next.map_err(Into::into)? { + let next = next.map_err(|e| self.fail(e))?; + match next { Some(msg) => { trace!(namespace = ?self.namespace(), peer = ?self.peer, "run_bob: send {msg:#?}"); - writer.send(Message::Sync(msg)).await?; + writer + .send(Message::Sync(msg)) + .await + .map_err(|e| self.fail(e))?; } None => break, } @@ -213,7 +244,7 @@ impl BobState { trace!(namespace = ?self.namespace().unwrap(), peer = ?self.peer, "run_bob: finished"); self.namespace() - .ok_or_else(|| anyhow!("Stream closed before init message")) + .ok_or_else(|| self.fail(anyhow!("Stream closed before init message"))) } fn namespace(&self) -> Option { @@ -228,6 +259,7 @@ mod tests { sync::Namespace, AuthorId, }; + use anyhow::Result; use iroh_bytes::Hash; use iroh_net::key::SecretKey; use rand_core::{CryptoRngCore, SeedableRng}; @@ -300,7 +332,7 @@ mod tests { futures::future::ready( bob_replica_store_task .open_replica(&namespace) - .map(Into::into), + .map(|r| r.ok_or(AbortReason::NotAvailable)), ) }, alice_peer_id, @@ -502,7 +534,11 @@ mod tests { &mut bob_writer, &mut bob_reader, |namespace, _| { - futures::future::ready(bob_store.open_replica(&namespace).map(Into::into)) + futures::future::ready( + bob_store + .open_replica(&namespace) + .map(|r| r.ok_or(AbortReason::NotAvailable)), + ) }, alice_node_pubkey, ) diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index ecde915bd2..4e93443dc2 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -570,9 +570,15 @@ const AUTHOR_BYTES: std::ops::Range = 32..64; const KEY_BYTES: std::ops::RangeFrom = 64..; /// The indentifier of a record. -#[derive(Default, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct RecordIdentifier(Bytes); +impl Default for RecordIdentifier { + fn default() -> Self { + Self::new(NamespaceId::default(), AuthorId::default(), b"") + } +} + impl Debug for RecordIdentifier { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RecordIdentifier") diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 65ef33fc9f..28a2cc95d1 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -26,7 +26,9 @@ use iroh_gossip::{ }; use iroh_net::{key::PublicKey, MagicEndpoint}; use iroh_sync::{ - net::{connect_and_sync, handle_connection, AcceptOutcome, SyncError}, + net::{ + connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError, + }, store, sync::{Entry, InsertOrigin, NamespaceId, Replica, SignedEntry}, }; @@ -35,7 +37,8 @@ use tokio::{ sync::{self, mpsc, oneshot}, task::JoinError, }; -use tracing::{debug, error, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, debug_span, error, warn, Instrument}; const CHANNEL_CAP: usize = 8; @@ -105,9 +108,11 @@ pub enum Op { Put(SignedEntry), } -#[derive(Debug)] +#[derive(Debug, Clone)] enum SyncState { - Running, + None, + Dialing(CancellationToken), + Accepting, Finished, Failed, } @@ -247,6 +252,7 @@ impl LiveSync { downloader: Downloader, ) -> Self { let (to_actor_tx, to_actor_rx) = mpsc::channel(CHANNEL_CAP); + let me = base32::fmt_short(endpoint.peer_id()); let mut actor = Actor::new( endpoint, gossip, @@ -256,8 +262,9 @@ impl LiveSync { to_actor_rx, to_actor_tx.clone(), ); + let span = debug_span!("sync", %me); let task = rt.main().spawn(async move { - if let Err(err) = actor.run().await { + if let Err(err) = actor.run().instrument(span).await { error!("live sync failed: {err:?}"); } }); @@ -389,12 +396,12 @@ struct Actor { /// Running sync futures (from connect). #[allow(clippy::type_complexity)] - running_sync_connect: - FuturesUnordered)>>, - /// Running sync futures (from accept). - running_sync_accept: FuturesUnordered< - BoxFuture<'static, std::result::Result<(NamespaceId, PublicKey), SyncError>>, + running_sync_connect: FuturesUnordered< + BoxFuture<'static, (NamespaceId, PublicKey, SyncReason, Result<(), ConnectError>)>, >, + /// Running sync futures (from accept). + running_sync_accept: + FuturesUnordered>>, /// Runnning download futures. pending_downloads: FuturesUnordered>>, /// Running gossip join futures. @@ -508,9 +515,9 @@ impl Actor { } Some((namespace, res)) = self.pending_joins.next() => { if let Err(err) = res { - error!("failed to join gossip for {namespace:?}: {err:?}"); + error!(?namespace, %err, "failed to join gossip"); } else { - debug!("joined gossip for {namespace:?}"); + debug!(?namespace, "joined gossip"); } // TODO: maintain some join state } @@ -528,6 +535,16 @@ impl Actor { Ok(()) } + fn set_sync_state(&mut self, namespace: NamespaceId, peer: PublicKey, state: SyncState) { + self.sync_state.insert((namespace, peer), state); + } + fn get_sync_state(&self, namespace: NamespaceId, peer: PublicKey) -> SyncState { + self.sync_state + .get(&(namespace, peer)) + .cloned() + .unwrap_or(SyncState::None) + } + fn get_replica_if_syncing(&self, namespace: &NamespaceId) -> Option> { if !self.syncing_replicas.contains(namespace) { None @@ -546,27 +563,34 @@ impl Actor { let Some(replica) = self.get_replica_if_syncing(&namespace) else { return; }; - // Check if we synced and only start sync if not yet synced - // sync_with_peer is triggered on NeighborUp events, so might trigger repeatedly for the - // same peers. - // TODO: Track finished time and potentially re-run sync - if let Some(_state) = self.sync_state.get(&(namespace, peer)) { - return; + // Do not initiate the sync if we are already syncing or did previously sync successfully. + // TODO: Track finished time and potentially re-run sync on finished state if enough time + // passed. + match self.get_sync_state(namespace, peer) { + SyncState::Accepting | SyncState::Dialing(_) | SyncState::Finished => { + return; + } + SyncState::Failed | SyncState::None => {} }; - debug!(?peer, ?namespace, "start sync (reason: {reason:?})"); - self.sync_state - .insert((namespace, peer), SyncState::Running); - let task = { + + let cancel = CancellationToken::new(); + self.set_sync_state(namespace, peer, SyncState::Dialing(cancel.clone())); + let fut = { let endpoint = self.endpoint.clone(); let replica = replica.clone(); async move { - // TODO: Make sure that the peer is dialable. - let res = connect_and_sync::(&endpoint, &replica, peer, None, &[]).await; + debug!(?peer, ?namespace, ?reason, "sync[dial]: start"); + let fut = connect_and_sync::(&endpoint, &replica, peer, None, &[]); + let res = tokio::select! { + biased; + _ = cancel.cancelled() => Err(ConnectError::Cancelled), + res = fut => res + }; (namespace, peer, reason, res) } .boxed() }; - self.running_sync_connect.push(task); + self.running_sync_connect.push(fut); } async fn shutdown(&mut self) -> anyhow::Result<()> { @@ -603,13 +627,14 @@ impl Actor { } async fn start_sync(&mut self, namespace: NamespaceId, peers: Vec) -> Result<()> { - self.ensure_subscription(namespace)?; + self.open_replica(namespace)?; self.syncing_replicas.insert(namespace); self.join_peers(namespace, peers).await?; Ok(()) } - fn ensure_subscription(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { + /// Open a replica, if not yet in our set of open replicas. + fn open_replica(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { if !self.open_replicas.contains(&namespace) { let Some(replica) = self.replica_store.open_replica(&namespace)? else { bail!("Replica not found"); @@ -646,7 +671,7 @@ impl Actor { namespace: NamespaceId, cb: OnLiveEventCallback, ) -> anyhow::Result { - self.ensure_subscription(namespace)?; + self.open_replica(namespace)?; let subs = self.event_subscriptions.entry(namespace).or_default(); let removal_id = self .event_removal_id @@ -672,6 +697,7 @@ impl Actor { async fn stop_sync(&mut self, namespace: NamespaceId) -> anyhow::Result<()> { if self.syncing_replicas.remove(&namespace) { self.gossip.quit(namespace.into()).await?; + self.sync_state.retain(|(n, _peer), _value| *n != namespace); self.maybe_close_replica(namespace); } Ok(()) @@ -731,21 +757,54 @@ impl Actor { namespace: NamespaceId, peer: PublicKey, reason: SyncReason, - result: Result<()>, + result: Result<(), ConnectError>, ) { - self.on_sync_finished(namespace, peer, Origin::Connect(reason), result) - .await; + match result { + Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => { + debug!( + ?peer, + ?namespace, + ?reason, + "sync[dial]: remote abort, already syncing" + ); + } + Err(ConnectError::Cancelled) => { + // In case the remote aborted with already running: do nothing + debug!( + ?peer, + ?namespace, + ?reason, + "sync[dial]: cancelled, already syncing" + ); + } + Err(err) => { + self.on_sync_finished(namespace, peer, Origin::Connect(reason), Err(err.into())) + .await; + } + Ok(()) => { + self.on_sync_finished(namespace, peer, Origin::Connect(reason), Ok(())) + .await; + } + } } async fn on_sync_via_accept_finished( &mut self, - res: std::result::Result<(NamespaceId, PublicKey), SyncError>, + res: Result<(NamespaceId, PublicKey), AcceptError>, ) { match res { Ok((namespace, peer)) => { self.on_sync_finished(namespace, peer, Origin::Accept, Ok(())) .await; } + Err(AcceptError::Abort { + peer, + namespace, + reason, + }) if reason == AbortReason::AlreadySyncing => { + // In case we aborted the sync: do nothing (our outgoing sync is in progress) + debug!(?peer, ?namespace, ?reason, "sync[accept]: aborted by us"); + } Err(err) => { if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) { self.on_sync_finished( @@ -756,7 +815,7 @@ impl Actor { ) .await; } else { - debug!("sync failed (via accept): {err:?}"); + debug!("sync[accept]: failed {err:?}"); } } } @@ -769,12 +828,22 @@ impl Actor { origin: Origin, result: anyhow::Result<()>, ) { - debug!(?peer, ?namespace, "sync done (via {origin:?}): {result:?}"); + // debug log the result, warn in case of errors + match (&origin, &result) { + (Origin::Accept, Ok(())) => debug!(?peer, ?namespace, "sync[accept]: done"), + (Origin::Connect(reason), Ok(())) => { + debug!(?peer, ?namespace, ?reason, "sync[dial]: done") + } + (Origin::Accept, Err(err)) => warn!(?peer, ?namespace, ?err, "sync[accept]: failed"), + (Origin::Connect(reason), Err(err)) => { + warn!(?peer, ?namespace, ?err, ?reason, "sync[dial]: failed") + } + } let state = match result { Ok(_) => SyncState::Finished, Err(_) => SyncState::Failed, }; - self.sync_state.insert((namespace, peer), state); + self.set_sync_state(namespace, peer, state); let event = SyncEvent { namespace, peer, @@ -896,6 +965,7 @@ impl Actor { } .boxed() }; + debug!("sync[accept] incoming connection"); let fut = async move { handle_connection::(conn, request_replica_cb).await }.boxed(); self.running_sync_accept.push(fut); @@ -907,22 +977,32 @@ impl Actor { peer: PublicKey, ) -> AcceptOutcome { let Some(replica) = self.get_replica_if_syncing(&namespace) else { - return AcceptOutcome::NotAvailable; + return Err(AbortReason::NotAvailable); }; - let state = self.sync_state.get(&(namespace, peer)); - match state { - None | Some(SyncState::Failed | SyncState::Finished) => { - self.sync_state - .insert((namespace, peer), SyncState::Running); - AcceptOutcome::Accept(replica.clone()) + match self.get_sync_state(namespace, peer) { + SyncState::None | SyncState::Failed | SyncState::Finished => { + self.set_sync_state(namespace, peer, SyncState::Accepting); + Ok(replica.clone()) + } + SyncState::Accepting => Err(AbortReason::AlreadySyncing), + // Incoming sync request while we are dialing ourselves. + // In this case, compare the binary representations of our and the other node's peer id + // to deterministically decide which of the two concurrent connections will succeed. + SyncState::Dialing(cancel) => { + if peer.as_bytes() > self.endpoint.peer_id().as_bytes() { + cancel.cancel(); + self.set_sync_state(namespace, peer, SyncState::Accepting); + Ok(replica.clone()) + } else { + Err(AbortReason::AlreadySyncing) + } } - Some(SyncState::Running) => AcceptOutcome::AlreadySyncing, } } } /// Outcome of a sync operation -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct SyncEvent { /// Namespace that was synced pub namespace: NamespaceId, @@ -968,3 +1048,16 @@ async fn notify_all(subs: &mut HashMap, event: LiveEve } } } + +/// Utilities for working with byte array identifiers +// TODO: copy-pasted from iroh-gossip/src/proto/util.rs +// Unify into iroh-common crate or similar +pub(super) mod base32 { + /// Convert to a base32 string limited to the first 10 bytes + pub fn fmt_short(bytes: impl AsRef<[u8]>) -> String { + let len = bytes.as_ref().len().min(10); + let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]); + text.make_ascii_lowercase(); + text + } +} diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 5da0adecfb..e1096cb2b2 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -9,7 +9,7 @@ use iroh::{ collection::IrohCollectionParser, node::{Builder, Node}, rpc_protocol::ShareMode, - sync_engine::{LiveEvent, Origin, SyncReason}, + sync_engine::{LiveEvent, Origin, SyncEvent}, }; use quic_rpc::transport::misc::DummyServerEndpoint; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -43,9 +43,11 @@ fn test_node( async fn spawn_node( rt: runtime::Handle, + i: usize, ) -> anyhow::Result> { let node = test_node(rt, "127.0.0.1:0".parse()?); let node = node.spawn().await?; + tracing::info!("spawned node {i} {:?}", node.peer_id()); Ok(node) } @@ -53,12 +55,10 @@ async fn spawn_nodes( rt: runtime::Handle, n: usize, ) -> anyhow::Result>> { - let mut nodes = vec![]; - for _i in 0..n { - let node = spawn_node(rt.clone()).await?; - nodes.push(node); - } - Ok(nodes) + futures::future::join_all((0..n).map(|i| spawn_node(rt.clone(), i))) + .await + .into_iter() + .collect() } /// This tests the simplest scenario: A node connects to another node, and performs sync. @@ -95,7 +95,6 @@ async fn sync_simple() -> Result<()> { }; assert_eq!(event.namespace, doc.id()); assert_eq!(event.peer, nodes[0].peer_id()); - assert_eq!(event.origin, Origin::Connect(SyncReason::DirectJoin)); assert_eq!(event.result, Ok(())); let event = events.try_next().await?.unwrap(); assert!(matches!(event, LiveEvent::ContentReady { .. })); @@ -108,7 +107,6 @@ async fn sync_simple() -> Result<()> { }; assert_eq!(event.namespace, doc0.id()); assert_eq!(event.peer, nodes[1].peer_id()); - assert_eq!(event.origin, Origin::Accept); assert_eq!(event.result, Ok(())); for node in nodes { @@ -122,7 +120,7 @@ async fn sync_simple() -> Result<()> { async fn sync_subscribe() -> Result<()> { setup_logging(); let rt = test_runtime(); - let node = spawn_node(rt).await?; + let node = spawn_node(rt, 0).await?; let client = node.client(); let doc = client.docs.create().await?; let mut sub = doc.subscribe().await?; @@ -141,8 +139,8 @@ async fn sync_subscribe() -> Result<()> { async fn sync_full_basic() -> Result<()> { setup_logging(); let rt = test_runtime(); - let nodes = spawn_nodes(rt, 3).await?; - let clients = nodes.iter().map(|node| node.client()).collect::>(); + let mut nodes = spawn_nodes(rt.clone(), 2).await?; + let mut clients = nodes.iter().map(|node| node.client()).collect::>(); // node1: create doc and ticket let (ticket, doc1) = { @@ -208,47 +206,61 @@ async fn sync_full_basic() -> Result<()> { }; // node 3 joins & imports the doc from peer 1 - let _doc3 = { - let iroh = &clients[2]; - let doc = iroh.docs.import(ticket).await?; + nodes.push(spawn_node(rt.clone(), nodes.len()).await?); + clients.push(nodes.last().unwrap().client()); + let iroh = &clients[2]; + let doc = iroh.docs.import(ticket).await?; - // wait for 2 remote inserts - let mut events = doc.subscribe().await?; - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::InsertRemote { .. }), - "expected InsertRemote but got {event:?}" - ); - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::InsertRemote { .. }), - "expected InsertRemote but got {event:?}" - ); - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::SyncFinished(_)), - "expected SyncFinished but got {event:?}" - ); - let event = events.try_next().await?.unwrap(); - assert!( - matches!(event, LiveEvent::ContentReady { .. }), - "expected ContentReady but got {event:?}" - ); + // expect 2 times InsertRemote + let mut events = doc.subscribe().await?; + let event = events.try_next().await?.unwrap(); + assert!( + matches!(event, LiveEvent::InsertRemote { .. }), + "expected InsertRemote but got {event:?}" + ); + let event = events.try_next().await?.unwrap(); + assert!( + matches!(event, LiveEvent::InsertRemote { .. }), + "expected InsertRemote but got {event:?}" + ); + + // now expect SyncFinished + let event = events.try_next().await?.unwrap(); + let LiveEvent::SyncFinished(event) = event else { + panic!("expected SyncFinished but got {event:?}"); + }; + let expected = SyncEvent { + peer: nodes[0].peer_id(), + namespace: doc.id(), + result: Ok(()), + origin: event.origin.clone(), + finished: event.finished, + }; + assert_eq!(event, expected, "expected {expected:?} but got {event:?}"); + + // expect 2 times ContentReady + // potentically a `SyncFinished` event with `Origin::Accept` is inbetween, + // as node1 or node2 could connect to us. + let mut i = 0; + while i < 2 { let event = events.try_next().await?.unwrap(); + if matches!( + event, + LiveEvent::SyncFinished(SyncEvent { + origin: Origin::Accept, + .. + }) + ) { + continue; + } assert!( matches!(event, LiveEvent::ContentReady { .. }), "expected ContentReady but got {event:?}" ); - - assert_latest(&doc, b"k1", b"v1").await; - assert_latest(&doc, b"k2", b"v2").await; - doc - }; - - // TODO: - // - gossiping between multiple peers - // - better test utils - // - ... + i += 1; + } + assert_latest(&doc, b"k1", b"v1").await; + assert_latest(&doc, b"k2", b"v2").await; for node in nodes { node.shutdown(); @@ -261,7 +273,7 @@ async fn sync_full_basic() -> Result<()> { async fn sync_subscribe_stop() -> Result<()> { setup_logging(); let rt = test_runtime(); - let node = spawn_node(rt).await?; + let node = spawn_node(rt, 0).await?; let client = node.client(); let doc = client.docs.create().await?;