Skip to content

Commit

Permalink
[feat] Utilize peers in join
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Apr 19, 2024
1 parent 2792513 commit 82d0e33
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions iroh/src/sync_engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
}
ToLiveActor::NeighborUp { namespace, peer } => {
debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
self.sync_with_peer(namespace, NodeAddr::from(peer), SyncReason::NewNeighbor);
self.subscribers
.send(&namespace, Event::NeighborUp(peer))
.await;
Expand Down Expand Up @@ -313,15 +313,16 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
Ok(true)
}

#[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
#[instrument("connect", skip_all, fields(node = %node.node_id.fmt_short(), namespace = %namespace.fmt_short()))]
fn sync_with_peer(&mut self, namespace: NamespaceId, node: NodeAddr, reason: SyncReason) {
let peer = node.node_id;
if !self.state.start_connect(&namespace, peer, reason) {
return;
}
let endpoint = self.endpoint.clone();
let sync = self.sync.clone();
let fut = async move {
let res = connect_and_sync(&endpoint, &sync, namespace, NodeAddr::new(peer)).await;
let res = connect_and_sync(&endpoint, &sync, namespace, node).await;
(namespace, peer, reason, res)
}
.instrument(Span::current());
Expand Down Expand Up @@ -409,9 +410,9 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
let peer_ids: Vec<PublicKey> = peers.iter().map(|p| p.node_id).collect();

// add addresses of peers to our endpoint address book
for peer in peers.into_iter() {
for peer in peers.iter() {
let peer_id = peer.node_id;
if let Err(err) = self.endpoint.add_node_addr(peer) {
if let Err(err) = self.endpoint.add_node_addr(peer.clone()) {
warn!(peer = %peer_id.fmt_short(), "failed to add known addrs: {err:?}");
}
}
Expand All @@ -425,7 +426,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
.await?;

// trigger initial sync with initial peers
for peer in peer_ids {
for peer in peers {
self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
}
Ok(())
Expand Down Expand Up @@ -551,7 +552,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
.await;

if resync {
self.sync_with_peer(namespace, peer, SyncReason::Resync);
self.sync_with_peer(namespace, NodeAddr::from(peer), SyncReason::Resync);
}
}

Expand Down Expand Up @@ -598,7 +599,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
match self.sync.has_news_for_us(report.namespace, heads).await {
Ok(Some(updated_authors)) => {
info!(%updated_authors, "news reported: sync now");
self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
self.sync_with_peer(report.namespace, NodeAddr::from(from), SyncReason::SyncReport);
}
Ok(None) => {
debug!("no news reported: nothing to do");
Expand Down

0 comments on commit 82d0e33

Please sign in to comment.