Skip to content

Commit

Permalink
Graceful shutdown for connections, networks and swarms.
Browse files Browse the repository at this point in the history
Building on the ability to wait for connection shutdown to
complete, introduced in libp2p#1619,
this commit extends the support for performing graceful
shutdowns in the following ways:

  1. The `ConnectionHandler` (and thus also `ProtocolsHandler`) can
  participate in the shutdown, via new `poll_close` methods. The
  muxer and underlying transport connection only start closing once
  the connection handler signals readiness to do so.

  2. A `Network` can be gracefully shut down, which involves a
  graceful shutdown of the underlying connection `Pool`. The `Pool`
  in turn proceeds with a shutdown by rejecting new connections
  while draining established connections.

  3. A `Swarm` can be gracefully shut down, which involves a
  graceful shutdown of the underlying `Network` followed by
  polling the `NetworkBehaviour` until it returns `Poll::Pending`,
  i.e. it has no more output.

In particular, the following are important details:

  * Analogous to the rejection of new inbound and outbound connections
  during shutdown, a single connection that is shutting down rejects
  new inbound substreams and, by virtue of the return type of
  `ConnectionHandler::poll_close`, the handler cannot request new
  outbound substreams.

  * The `NodeHandlerWrapper` managing the `ProtocolsHandler`
  always waits for already ongoing inbound and outbound substream
  upgrades to complete. Since the `NodeHandlerWrapper` is a
  `ConnectionHandler`, the previous point applies w.r.t. new inbound
  and outbound substreams.

  * When the `connection_keep_alive` expires, a graceful shutdown
  is initiated.
  • Loading branch information
Roman S. Borschel committed Jul 28, 2020
1 parent 9322433 commit 1251247
Show file tree
Hide file tree
Showing 23 changed files with 1,201 additions and 614 deletions.
122 changes: 96 additions & 26 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use error::{ConnectionError, PendingConnectionError};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
pub use manager::ConnectionId;
pub use substream::{Substream, SubstreamEndpoint, Close};
pub use substream::{Substream, SubstreamEndpoint};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};

use crate::muxing::StreamMuxer;
Expand Down Expand Up @@ -194,10 +194,12 @@ where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Node that handles the muxing.
/// The substream multiplexer over the connection I/O stream.
muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
/// Handler that processes substreams.
/// The connection handler for the substreams.
handler: THandler,
/// The operating state of the connection.
state: ConnectionState,
}

impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
Expand Down Expand Up @@ -231,68 +233,114 @@ where
Connection {
muxing: Muxing::new(muxer),
handler,
state: ConnectionState::Open,
}
}

/// Returns a shared reference to the `ConnectionHandler` that processes
/// the substreams of this connection.
pub fn handler(&self) -> &THandler {
&self.handler
}

/// Returns a mutable reference to the `ConnectionHandler` that processes
/// the substreams of this connection.
pub fn handler_mut(&mut self) -> &mut THandler {
&mut self.handler
}

/// Notifies the connection handler of an event.
///
/// Has no effect if the connection handler is already closed.
pub fn inject_event(&mut self, event: THandler::InEvent) {
self.handler.inject_event(event);
match self.state {
ConnectionState::Open | ConnectionState::CloseHandler
=> self.handler.inject_event(event),
_ => {
log::trace!("Ignoring handler event. Handler is closed.")
}
}
}

/// Begins an orderly shutdown of the connection, returning a
/// `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> Close<TMuxer> {
self.muxing.close().0
/// Begins a graceful shutdown of the connection.
///
/// If the connection is `Open`, it transitions to the `CloseHandler`
/// state. In any other state a shutdown is already in progress or
/// complete and the call has no effect, so it is safe to call this
/// method repeatedly.
///
/// The connection must continue to be `poll()`ed to drive the
/// shutdown process to completion. Once connection shutdown is
/// complete, `poll()` returns `Ok(None)`.
pub fn start_close(&mut self) {
// Only an open connection can start closing; this keeps the call idempotent.
if self.state == ConnectionState::Open {
self.state = ConnectionState::CloseHandler;
}
}

/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
///
/// > **Note**: A return value of `Ok(None)` signals successful
/// > connection shutdown, whereas an `Err` signals termination
/// > of the connection due to an error. In either case, the
/// > connection must be dropped; any further method calls
/// > result in unspecified behaviour.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>>
-> Poll<Result<Option<Event<THandler::OutEvent>>, ConnectionError<THandler::Error>>>
{
loop {
if let ConnectionState::Closed = self.state { // (1)
return Poll::Ready(Ok(None))
}

if let ConnectionState::CloseMuxer = self.state { // (2)
match futures::ready!(self.muxing.poll_close(cx)) {
Ok(()) => {
self.state = ConnectionState::Closed;
return Poll::Ready(Ok(None))
}
Err(e) => return Poll::Ready(Err(ConnectionError::IO(e)))
}
}

// At this point the connection is either open or in the process
// of a graceful shutdown by the connection handler.
let mut io_pending = false;

// Perform I/O on the connection through the muxer, informing the handler
// of new substreams.
// of new substreams or other muxer events.
match self.muxing.poll(cx) {
Poll::Pending => io_pending = true,
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
self.handler.inject_substream(substream, SubstreamEndpoint::Listener)
// Drop new inbound substreams when closing. This is analogous
// to rejecting new connections.
if self.state == ConnectionState::Open {
self.handler.inject_substream(substream, SubstreamEndpoint::Listener)
} else {
log::trace!("Inbound substream dropped. Connection is closing.")
}
}
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream })) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
return Poll::Ready(Ok(Some(Event::AddressChange(address))));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

// Poll the handler for new events.
match self.handler.poll(cx) {
let poll = match &self.state {
ConnectionState::Open => self.handler.poll(cx).map_ok(Some),
ConnectionState::CloseHandler => self.handler.poll_close(cx).map_ok(
|event| event.map(ConnectionHandlerEvent::Custom)),
s => panic!("Unexpected closing state: {:?}", s) // s.a. (1),(2)
};

match poll {
Poll::Pending => {
if io_pending {
return Poll::Pending // Nothing to do
}
}
Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
Poll::Ready(Ok(Some(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data)))) => {
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
Poll::Ready(Ok(Some(ConnectionHandlerEvent::Custom(event)))) => {
return Poll::Ready(Ok(Some(Event::Handler(event))));
}
Poll::Ready(Ok(Some(ConnectionHandlerEvent::Close))) => {
self.start_close()
}
Poll::Ready(Ok(None)) => {
// The handler is done, we can now close the muxer (i.e. connection).
self.state = ConnectionState::CloseMuxer;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
Expand Down Expand Up @@ -352,3 +400,25 @@ impl fmt::Display for ConnectionLimit {

/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}

/// The state of a [`Connection`] w.r.t. an active graceful close.
///
/// The states are traversed strictly in declaration order:
/// `Open` -> `CloseHandler` -> `CloseMuxer` -> `Closed`.
///
/// The type is a plain, field-less tag and therefore cheap to copy
/// and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// The connection is open, accepting new inbound and outbound
    /// substreams.
    Open,
    /// The connection is closing, rejecting new inbound substreams
    /// and not permitting new outbound substreams while the
    /// connection handler closes. [`ConnectionHandler::poll_close`]
    /// is called until completion, which results in transitioning to
    /// `CloseMuxer`.
    CloseHandler,
    /// The connection is closing, rejecting new inbound substreams
    /// and not permitting new outbound substreams while the
    /// muxer is closing the transport connection. `Muxing::poll_close`
    /// is called until completion, which results in transitioning
    /// to `Closed`.
    CloseMuxer,
    /// The connection is closed.
    Closed,
}
38 changes: 37 additions & 1 deletion core/src/connection/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{Multiaddr, PeerId};
use std::{task::Context, task::Poll};
use std::task::{Context, Poll};
use super::{Connected, SubstreamEndpoint};

/// The interface of a connection handler.
Expand Down Expand Up @@ -66,6 +66,37 @@ pub trait ConnectionHandler {
/// Returning an error will close the connection to the remote.
fn poll(&mut self, cx: &mut Context)
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>;

/// Polls the handler to make progress towards closing the connection.
///
/// When a connection is actively closed, the handler can perform
/// a graceful shutdown of the connection by draining the I/O
/// activity, e.g. allowing in-flight requests to complete without
/// accepting new ones, possibly signaling the remote that it
/// should direct further requests elsewhere.
///
/// The handler can also use the opportunity to flush any buffers
/// or clean up any other (asynchronous) resources before the
/// connection is ultimately dropped and closed on the transport
/// layer.
///
/// While closing, new inbound substreams are rejected and the
/// handler is unable to request new outbound substreams as
/// per the return type of `poll_close`.
///
/// The handler signals its readiness for the connection
/// to be closed by returning `Ready(Ok(None))`, which is the
/// default implementation. Hence, by default, connection
/// shutdown is not delayed and may result in ungraceful
/// interruption of ongoing I/O.
///
/// # Return value
///
/// * `Poll::Pending`: the handler is not yet ready for the
///   connection to be closed.
/// * `Poll::Ready(Ok(Some(event)))`: the handler produced a final
///   event, which the connection emits like a custom event
///   returned from `poll()`.
/// * `Poll::Ready(Ok(None))`: the handler is done; the connection
///   proceeds with closing the muxer.
/// * `Poll::Ready(Err(_))`: the connection terminates with a
///   handler error.
///
/// > **Note**: Once `poll_close()` is invoked, the handler is no
/// > longer `poll()`ed.
fn poll_close(&mut self, _: &mut Context)
-> Poll<Result<Option<Self::OutEvent>, Self::Error>>
{
Poll::Ready(Ok(None))
}
}

/// Prototype for a `ConnectionHandler`.
Expand Down Expand Up @@ -99,6 +130,9 @@ pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {

/// Other event.
Custom(TCustom),

/// Initiate connection shutdown.
Close,
}

/// Event produced by a handler.
Expand All @@ -112,6 +146,7 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCust
ConnectionHandlerEvent::OutboundSubstreamRequest(map(val))
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val),
ConnectionHandlerEvent::Close => ConnectionHandlerEvent::Close,
}
}

Expand All @@ -124,6 +159,7 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCust
ConnectionHandlerEvent::OutboundSubstreamRequest(val)
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(map(val)),
ConnectionHandlerEvent::Close => ConnectionHandlerEvent::Close,
}
}
}
Expand Down
64 changes: 26 additions & 38 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
muxing::StreamMuxer,
connection::{
self,
Close,
Connected,
Connection,
ConnectionError,
Expand Down Expand Up @@ -168,9 +167,6 @@ where
event: Option<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>
},

/// The connection is closing (active close).
Closing(Close<M>),

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>),

Expand Down Expand Up @@ -250,11 +246,8 @@ where
Poll::Ready(Some(Command::NotifyHandler(event))) =>
connection.inject_event(event),
Poll::Ready(Some(Command::Close)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
this.state = State::Closing(connection.close());
continue 'poll
// Start closing the connection, if not already.
connection.start_close();
}
Poll::Ready(None) => {
// The manager has dropped the task or disappeared; abort.
Expand All @@ -267,13 +260,19 @@ where
// Send the event to the manager.
match this.events.poll_ready(cx) {
Poll::Pending => {
this.state = State::Established { connection, event: Some(event) };
this.state = State::Established {
connection,
event: Some(event),
};
return Poll::Pending
}
Poll::Ready(result) => {
if result.is_ok() {
if let Ok(()) = this.events.start_send(event) {
this.state = State::Established { connection, event: None };
this.state = State::Established {
connection,
event: None,
};
continue 'poll
}
}
Expand All @@ -282,24 +281,34 @@ where
}
}
} else {
// Poll the connection for new events.
match Connection::poll(Pin::new(&mut connection), cx) {
Poll::Pending => {
this.state = State::Established { connection, event: None };
this.state = State::Established {
connection,
event: None,
};
return Poll::Pending
}
Poll::Ready(Ok(connection::Event::Handler(event))) => {
Poll::Ready(Ok(Some(connection::Event::Handler(event)))) => {
this.state = State::Established {
connection,
event: Some(Event::Notify { id, event })
event: Some(Event::Notify { id, event }),
};
}
Poll::Ready(Ok(connection::Event::AddressChange(new_address))) => {
Poll::Ready(Ok(Some(connection::Event::AddressChange(new_address)))) => {
this.state = State::Established {
connection,
event: Some(Event::AddressChange { id, new_address })
event: Some(Event::AddressChange { id, new_address }),
};
}
Poll::Ready(Ok(None)) => {
// The connection is closed, don't accept any further commands
// and terminate the task with a final event.
this.commands.get_mut().close();
let event = Event::Closed { id: this.id, error: None };
this.state = State::Terminating(event);
continue 'poll
}
Poll::Ready(Err(error)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
Expand All @@ -311,27 +320,6 @@ where
}
}

State::Closing(mut closing) => {
// Try to gracefully close the connection.
match closing.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed { id: this.id, error: None };
this.state = State::Terminating(event);
}
Poll::Ready(Err(e)) => {
let event = Event::Closed {
id: this.id,
error: Some(ConnectionError::IO(e))
};
this.state = State::Terminating(event);
}
Poll::Pending => {
this.state = State::Closing(closing);
return Poll::Pending
}
}
}

State::Terminating(event) => {
// Try to deliver the final event.
match this.events.poll_ready(cx) {
Expand Down
Loading

0 comments on commit 1251247

Please sign in to comment.