Skip to content

Commit

Permalink
refactor: make the conn manager actually work as intended
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 22, 2024
1 parent ead60b3 commit 6229f8a
Show file tree
Hide file tree
Showing 3 changed files with 478 additions and 39 deletions.
76 changes: 37 additions & 39 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures_lite::{stream::Stream, StreamExt};
use futures_util::future::FutureExt;
use genawaiter::sync::{Co, Gen};
use iroh_net::{
dialer::{ConnDirection, ConnManager, NewConnection},
dialer::{ConnDirection, ConnInfo, ConnManager},
endpoint::Connection,
key::PublicKey,
AddrInfo, Endpoint, NodeAddr,
Expand Down Expand Up @@ -382,9 +382,14 @@ impl Actor {
}
}
}
Some(new_conn) = self.conn_manager.next() => {
Some(res) = self.conn_manager.next() => {
trace!(?i, "tick: conn_manager");
self.handle_new_connection(new_conn).await;
match res {
Ok(conn) => self.handle_new_connection(conn).await,
Err(err) => {
self.handle_in_event(InEvent::PeerDisconnected(err.node_id), Instant::now()).await?;
}
}
}
Some(res) = self.conn_tasks.join_next(), if !self.conn_tasks.is_empty() => {
match res {
Expand All @@ -393,7 +398,7 @@ impl Actor {
Ok((node_id, result)) => {
self.conn_manager.remove(&node_id);
self.conn_send_tx.remove(&node_id);
self.handle_in_event(InEvent::PeerDisconnected(node_id), Instant::now()).await ?;
self.handle_in_event(InEvent::PeerDisconnected(node_id), Instant::now()).await?;
match result {
Ok(()) => {
debug!(peer=%node_id.fmt_short(), "connection closed without error");
Expand Down Expand Up @@ -430,11 +435,11 @@ impl Actor {
async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> {
trace!("handle to_actor {msg:?}");
match msg {
ToActor::AcceptConn(conn) => match self.conn_manager.accept(conn) {
Err(err) => warn!(?err, "failed to accept connection"),
Ok(None) => {}
Ok(Some(conn)) => self.handle_new_connection(conn).await,
},
ToActor::AcceptConn(conn) => {
if let Err(err) = self.conn_manager.accept(conn) {
warn!(?err, "failed to accept connection");
}
}
ToActor::Join(topic_id, peers, reply) => {
self.handle_in_event(InEvent::Command(topic_id, Command::Join(peers)), now)
.await?;
Expand Down Expand Up @@ -498,7 +503,7 @@ impl Actor {
self.conn_manager.remove(&peer_id);
}
} else {
if !self.conn_manager.is_dialing(&peer_id) {
if !self.conn_manager.is_pending(&peer_id) {
debug!(peer = ?peer_id, "dial");
self.conn_manager.dial(peer_id);
}
Expand Down Expand Up @@ -545,38 +550,31 @@ impl Actor {
Ok(())
}

async fn handle_new_connection(&mut self, new_conn: NewConnection) {
let NewConnection {
async fn handle_new_connection(&mut self, new_conn: ConnInfo) {
let ConnInfo {
conn,
node_id: peer_id,
node_id,
direction,
} = new_conn;
match conn {
Ok(conn) => {
let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
self.conn_send_tx.insert(peer_id, send_tx.clone());

// Spawn a task for this connection
let pending_sends = self.pending_sends.remove(&peer_id);
let in_event_tx = self.in_event_tx.clone();
debug!(peer=%peer_id.fmt_short(), ?direction, "connection established");
self.conn_tasks.spawn(
connection_loop(
peer_id,
conn,
direction,
send_rx,
in_event_tx,
pending_sends,
)
.map(move |r| (peer_id, r))
.instrument(error_span!("gossip_conn", peer = %peer_id.fmt_short())),
);
}
Err(err) => {
warn!(peer=%peer_id.fmt_short(), "connecting to node failed: {err:?}");
}
}
let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
self.conn_send_tx.insert(node_id, send_tx.clone());

// Spawn a task for this connection
let pending_sends = self.pending_sends.remove(&node_id);
let in_event_tx = self.in_event_tx.clone();
debug!(peer=%node_id.fmt_short(), ?direction, "connection established");
self.conn_tasks.spawn(
connection_loop(
node_id,
conn,
direction,
send_rx,
in_event_tx,
pending_sends,
)
.map(move |r| (node_id, r))
.instrument(error_span!("gossip_conn", peer = %node_id.fmt_short())),
);
}

fn subscribe_all(&mut self) -> broadcast::Receiver<(TopicId, Event)> {
Expand Down

0 comments on commit 6229f8a

Please sign in to comment.