Skip to content

Commit

Permalink
fix: avoid double conns, better state tracking (#1505)
Browse files Browse the repository at this point in the history
## Description

On `main` the test `sync_full_basic` is flakey. This PR (hopefully)
fixes it.

The reason was: We have the situation that two peers initiate
connections to each other at (roughly) the same time. In #1491 this was
sometimes prevented, but not reliably.

This PR fixes it by:
* When connecting, we set a `SyncState::Dialing` for the `(namespace,
peer)`
* When accepting, if our own state is `Dialing` for the incoming request
for `(namespace, peer)` we compare our peer id with that of the incoming
request, and abort if ours is higher (doesn't matter which way, we care
about a predictable outcome only
* Through this, only one of the two simoultanoues connections will
survive
* Also added a `Abort` frame to the wire protocol to transfer to inform
the dialer about the reason of the declined request, which is either "we
do double sync, and will take the other conn" (`AlreadySyncing`) or
"this replica is not here" (`NotAvailable`)

This PR also:
* Further improves logging
* Improves errors

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.

---------

Co-authored-by: Diva M <divma@protonmail.com>
  • Loading branch information
Frando and divagant-martian committed Sep 21, 2023
1 parent f16e439 commit d8cc9df
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 183 deletions.
2 changes: 1 addition & 1 deletion iroh-net/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl From<VerifyingKey> for PublicKey {

impl Debug for PublicKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut text = data_encoding::BASE32_NOPAD.encode(self.as_bytes());
let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]);
text.make_ascii_lowercase();
write!(f, "PublicKey({text})")
}
Expand Down
167 changes: 118 additions & 49 deletions iroh-sync/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use std::{future::Future, net::SocketAddr};

use anyhow::{Context, Result};
use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::{
Expand All @@ -30,18 +30,23 @@ pub async fn connect_and_sync<S: store::Store>(
peer: PublicKey,
derp_region: Option<u16>,
addrs: &[SocketAddr],
) -> Result<()> {
debug!(peer = ?peer, "sync (via connect): start");
) -> Result<(), ConnectError> {
debug!(?peer, "sync[dial]: connect");
let namespace = doc.namespace();
let connection = endpoint
.connect((peer, derp_region, addrs).into(), SYNC_ALPN)
.await
.context("failed to establish connection")?;
debug!(?peer, ?namespace, "sync (via connect): connected");
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
.map_err(ConnectError::connect)?;
debug!(?peer, ?namespace, "sync[dial]: connected");
let (mut send_stream, mut recv_stream) =
connection.open_bi().await.map_err(ConnectError::connect)?;
let res = run_alice::<S, _, _>(&mut send_stream, &mut recv_stream, doc, peer).await;
send_stream.finish().await?;
recv_stream.read_to_end(0).await?;

send_stream.finish().await.map_err(ConnectError::close)?;
recv_stream
.read_to_end(0)
.await
.map_err(ConnectError::close)?;

#[cfg(feature = "metrics")]
if res.is_ok() {
Expand All @@ -50,47 +55,32 @@ pub async fn connect_and_sync<S: store::Store>(
inc!(Metrics, sync_via_connect_failure);
}

debug!(peer = ?peer, ?res, "sync (via connect): done");
debug!(?peer, ?namespace, ?res, "sync[dial]: done");
res
}

/// What to do with incoming sync requests
#[derive(Debug)]
pub enum AcceptOutcome<S: store::Store> {
/// This namespace is not available for sync.
NotAvailable,
/// This namespace is already syncing, therefore abort.
AlreadySyncing,
/// Accept the sync request.
Accept(Replica<S::Instance>),
}

impl<S: store::Store> From<Option<Replica<S::Instance>>> for AcceptOutcome<S> {
fn from(replica: Option<Replica<S::Instance>>) -> Self {
match replica {
Some(replica) => AcceptOutcome::Accept(replica),
None => AcceptOutcome::NotAvailable,
}
}
}
pub type AcceptOutcome<S> = Result<Replica<<S as store::Store>::Instance>, AbortReason>;

/// Handle an iroh-sync connection and sync all shared documents in the replica store.
pub async fn handle_connection<S, F, Fut>(
connecting: quinn::Connecting,
accept_cb: F,
) -> std::result::Result<(NamespaceId, PublicKey), SyncError>
) -> Result<(NamespaceId, PublicKey), AcceptError>
where
S: store::Store,
F: Fn(NamespaceId, PublicKey) -> Fut,
Fut: Future<Output = anyhow::Result<AcceptOutcome<S>>>,
{
let connection = connecting.await.map_err(SyncError::connect)?;
let peer = get_peer_id(&connection).await.map_err(SyncError::connect)?;
let connection = connecting.await.map_err(AcceptError::connect)?;
let peer = get_peer_id(&connection)
.await
.map_err(AcceptError::connect)?;
let (mut send_stream, mut recv_stream) = connection
.accept_bi()
.await
.map_err(|error| SyncError::open(peer, error))?;
debug!(peer = ?peer, "sync (via accept): start");
.map_err(|e| AcceptError::open(peer, e))?;
debug!(?peer, "sync[accept]: handle");

let res = run_bob::<S, _, _, _, _>(&mut send_stream, &mut recv_stream, accept_cb, peer).await;

Expand All @@ -101,24 +91,30 @@ where
inc!(Metrics, sync_via_accept_failure);
}

debug!(peer = ?peer, ?res, "sync (via accept): done");
let namespace = match &res {
Ok(namespace) => Some(*namespace),
Err(err) => err.namespace(),
};

let namespace = res?;
send_stream
.finish()
.await
.map_err(|error| SyncError::close(peer, namespace, error))?;
.map_err(|error| AcceptError::close(peer, namespace, error))?;
recv_stream
.read_to_end(0)
.await
.map_err(|error| SyncError::close(peer, namespace, error))?;
.map_err(|error| AcceptError::close(peer, namespace, error))?;
let namespace = res?;

debug!(?peer, ?namespace, "sync[accept]: done");

Ok((namespace, peer))
}

/// Failure reasons for sync.
/// Errors that may occur on handling incoming sync connections.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum SyncError {
pub enum AcceptError {
/// Failed to establish connection
#[error("Failed to establish connection")]
Connect {
Expand All @@ -132,6 +128,13 @@ pub enum SyncError {
#[source]
error: anyhow::Error,
},
/// We aborted the sync request.
#[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
Abort {
peer: PublicKey,
namespace: NamespaceId,
reason: AbortReason,
},
/// Failed to run sync
#[error("Failed to sync {namespace:?} with {peer:?}")]
Sync {
Expand All @@ -144,13 +147,52 @@ pub enum SyncError {
#[error("Failed to close {namespace:?} with {peer:?}")]
Close {
peer: PublicKey,
namespace: NamespaceId,
namespace: Option<NamespaceId>,
#[source]
error: anyhow::Error,
},
}

impl SyncError {
/// Errors that may occur on outgoing sync requests.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum ConnectError {
/// Failed to establish connection
#[error("Failed to establish connection")]
Connect {
#[source]
error: anyhow::Error,
},
/// The remote peer aborted the sync request.
#[error("Remote peer aborted sync: {0:?}")]
RemoteAbort(AbortReason),
/// We cancelled the operation
#[error("Cancelled")]
Cancelled,
/// Failed to run sync
#[error("Failed to sync")]
Sync {
#[source]
error: anyhow::Error,
},
/// Failed to close
#[error("Failed to close connection1")]
Close {
#[source]
error: anyhow::Error,
},
}

/// Reason why we aborted an incoming sync request.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum AbortReason {
/// Namespace is not avaiable.
NotAvailable,
/// We are already syncing this namespace.
AlreadySyncing,
}

impl AcceptError {
fn connect(error: impl Into<anyhow::Error>) -> Self {
Self::Connect {
error: error.into(),
Expand All @@ -173,7 +215,11 @@ impl SyncError {
error: error.into(),
}
}
fn close(peer: PublicKey, namespace: NamespaceId, error: impl Into<anyhow::Error>) -> Self {
fn close(
peer: PublicKey,
namespace: Option<NamespaceId>,
error: impl Into<anyhow::Error>,
) -> Self {
Self::Close {
peer,
namespace,
Expand All @@ -183,20 +229,43 @@ impl SyncError {
/// Get the peer's node ID (if available)
pub fn peer(&self) -> Option<PublicKey> {
match self {
SyncError::Connect { .. } => None,
SyncError::Open { peer, .. } => Some(*peer),
SyncError::Sync { peer, .. } => Some(*peer),
SyncError::Close { peer, .. } => Some(*peer),
AcceptError::Connect { .. } => None,
AcceptError::Open { peer, .. } => Some(*peer),
AcceptError::Sync { peer, .. } => Some(*peer),
AcceptError::Close { peer, .. } => Some(*peer),
AcceptError::Abort { peer, .. } => Some(*peer),
}
}

/// Get the namespace (if available)
pub fn namespace(&self) -> Option<NamespaceId> {
match self {
SyncError::Connect { .. } => None,
SyncError::Open { .. } => None,
SyncError::Sync { namespace, .. } => namespace.to_owned(),
SyncError::Close { namespace, .. } => Some(*namespace),
AcceptError::Connect { .. } => None,
AcceptError::Open { .. } => None,
AcceptError::Sync { namespace, .. } => namespace.to_owned(),
AcceptError::Close { namespace, .. } => namespace.to_owned(),
AcceptError::Abort { namespace, .. } => Some(*namespace),
}
}
}

impl ConnectError {
fn connect(error: impl Into<anyhow::Error>) -> Self {
Self::Connect {
error: error.into(),
}
}
fn close(error: impl Into<anyhow::Error>) -> Self {
Self::Close {
error: error.into(),
}
}
pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
Self::Sync {
error: error.into(),
}
}
pub(crate) fn remote_abort(reason: AbortReason) -> Self {
Self::RemoteAbort(reason)
}
}

0 comments on commit d8cc9df

Please sign in to comment.