Skip to content

Commit

Permalink
core/src/connection/manager: Fully close a task on disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 18, 2021
1 parent 705842f commit d744b4a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 61 deletions.
23 changes: 12 additions & 11 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl<'a, I> EstablishedEntry<'a, I> {
}

/// Sends a close command to the associated background task,
/// thus initiating a graceful active close of the connection.
/// thus initiating a graceful active close of the connectione
///
/// Has no effect if the connection is already closing.
///
Expand Down Expand Up @@ -496,16 +496,17 @@ impl<'a, I> EstablishedEntry<'a, I> {
}
}

/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()"),
}
}
// TODO: Needed?
// /// Instantly removes the entry from the manager, dropping
// /// the command channel to the background task of the connection,
// /// which will thus drop the connection asap without an orderly
// /// close or emitting another event.
// pub fn remove(self) -> Connected {
// match self.task.remove().state {
// TaskState::Established(c) => c,
// TaskState::Pending => unreachable!("By Entry::new()"),
// }
// }

/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
Expand Down
2 changes: 2 additions & 0 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ where
Poll::Pending => {}
Poll::Ready(None) => {
// The manager has dropped the task; abort.
// TODO: Should we return the handler in this case?
return Poll::Ready(());
}
Poll::Ready(Some(_)) => {
Expand Down Expand Up @@ -275,6 +276,7 @@ where
}
Poll::Ready(None) => {
// The manager has dropped the task or disappeared; abort.
// TODO: Should we return the handler in this case?
return Poll::Ready(());
}
}
Expand Down
54 changes: 4 additions & 50 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ pub struct Pool<THandler: IntoConnectionHandler, TTransErr> {

/// The pending connections that are currently being negotiated.
pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,

/// Established connections that have been closed in the context of
/// a [`Pool::disconnect`] in order to emit a `ConnectionClosed`
/// event for each. Every `ConnectionEstablished` event must be
/// paired with (eventually) a `ConnectionClosed`.
disconnected: Vec<Disconnected>,
}

impl<THandler: IntoConnectionHandler, TTransErr> fmt::Debug for Pool<THandler, TTransErr> {
Expand Down Expand Up @@ -201,7 +195,6 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
manager: Manager::new(manager_config),
established: Default::default(),
pending: Default::default(),
disconnected: Vec::new(),
}
}

Expand Down Expand Up @@ -421,16 +414,12 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
pub fn disconnect(&mut self, peer: &PeerId) {
if let Some(conns) = self.established.get(peer) {
// Count upwards because we push to / pop from the end. See also `Pool::poll`.
let mut num_established = 0;
for (&id, endpoint) in conns.iter() {
if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
let connected = e.remove();
self.disconnected.push(Disconnected {
id,
connected,
num_established,
});
num_established += 1;
e.start_close();
// TODO: I removed the disconnected logic, thus depending on start_close to
// eventually trigger a ConnectionClosed event. Make sure that is the case and
// also that the num_established counters are kept consistent.
}
self.counters.dec_established(endpoint);
}
Expand Down Expand Up @@ -536,29 +525,6 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
&'a mut self,
cx: &mut Context<'_>,
) -> Poll<PoolEvent<'a, THandler, TTransErr>> {
// Drain events resulting from forced disconnections.
//
// Note: The `Disconnected` entries in `self.disconnected`
// are inserted in ascending order of the remaining `num_established`
// connections. Thus we `pop()` them off from the end to emit the
// events in an order that properly counts down `num_established`.
// See also `Pool::disconnect`.
if let Some(Disconnected {
id,
connected,
num_established,
}) = self.disconnected.pop()
{
return Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
num_established,
error: None,
pool: self,
handler: todo!(),
});
}

// Poll the connection `Manager`.
loop {
let item = match self.manager.poll(cx) {
Expand Down Expand Up @@ -1108,15 +1074,3 @@ impl ConnectionLimits {
self
}
}

/// Information about a former established connection to a peer
/// that was dropped via [`Pool::disconnect`].
struct Disconnected {
/// The unique identifier of the dropped connection.
id: ConnectionId,
/// Information about the dropped connection.
connected: Connected,
/// The remaining number of established connections
/// to the same peer.
num_established: u32,
}

0 comments on commit d744b4a

Please sign in to comment.