diff --git a/src/transport/common/listener.rs b/src/transport/common/listener.rs index 856b4c19..cdc1acca 100644 --- a/src/transport/common/listener.rs +++ b/src/transport/common/listener.rs @@ -33,7 +33,7 @@ use socket2::{Domain, Socket, Type}; use tokio::net::{TcpListener as TokioTcpListener, TcpStream}; use std::{ - io, + io::{self, ErrorKind}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, pin::Pin, sync::Arc, @@ -157,7 +157,7 @@ impl DialAddresses { /// Socket listening to zero or more addresses. pub struct SocketListener { /// Listeners. - listeners: Vec, + listeners: Vec>, /// The index in the listeners from which the polling is resumed. poll_index: usize, } @@ -308,7 +308,7 @@ impl SocketListener { vec![local_address] }; - Some((listener, listen_addresses)) + Some((Some(listener), listen_addresses)) }) .unzip(); @@ -455,17 +455,43 @@ impl Stream for SocketListener { let len = self.listeners.len(); for index in 0..len { let current = (self.poll_index + index) % len; - let listener = &mut self.listeners[current]; - match listener.poll_accept(cx) { - Poll::Pending => {} - Poll::Ready(Err(error)) => { - self.poll_index = (self.poll_index + 1) % len; - return Poll::Ready(Some(Err(error))); - } - Poll::Ready(Ok((stream, address))) => { - self.poll_index = (self.poll_index + 1) % len; - return Poll::Ready(Some(Ok((stream, address)))); + let Some(listener) = &mut self.listeners[current] else { + continue; + }; + + loop { + match listener.poll_accept(cx) { + Poll::Pending => { + break; + } + Poll::Ready(Ok((stream, address))) => { + self.poll_index = (self.poll_index + 1) % len; + return Poll::Ready(Some(Ok((stream, address)))); + } + Poll::Ready(Err(error)) => match error.kind() { + ErrorKind::ConnectionAborted | ErrorKind::ConnectionReset => { + tracing::debug!( + target: LOG_TARGET, + ?error, + "Recoverable error for listener, continuing", + ); + // Poll again the listener on recoverable errors. + continue; + } + _ => { + self.poll_index = (self.poll_index + 1) % len; + + tracing::error!( + target: LOG_TARGET, + ?error, + "Fatal error for listener", + ); + self.listeners[current] = None; + + return Poll::Ready(Some(Err(error))); + } + }, } } } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index a97fa417..1e1e834f 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -560,23 +560,24 @@ impl Stream for TcpTransport { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { - return match event { + match event { None => { tracing::error!( target: LOG_TARGET, "TCP listener terminated, ignore if the node is stopping", ); - Poll::Ready(None) + return Poll::Ready(None); } Some(Err(error)) => { tracing::error!( target: LOG_TARGET, ?error, - "TCP listener terminated with error", + "One of the TCP listeners terminated with error. Please ensure your network interface is working correctly. The listener will be removed from service.", ); - Poll::Ready(None) + // Ensure the task is polled again to continue processing other listeners. + cx.waker().wake_by_ref(); } Some(Ok((connection, address))) => { let connection_id = self.context.next_connection_id(); @@ -595,11 +596,11 @@ impl Stream for TcpTransport { }, ); - Poll::Ready(Some(TransportEvent::PendingInboundConnection { + return Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, - })) + })); } - }; + } } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 11013e6c..b0de04cb 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -607,23 +607,24 @@ impl Stream for WebSocketTransport { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { - return match event { + match event { None => { tracing::error!( target: LOG_TARGET, "Websocket listener terminated, ignore if the node is stopping", ); - Poll::Ready(None) + return Poll::Ready(None); } Some(Err(error)) => { tracing::error!( target: LOG_TARGET, ?error, - "Websocket listener terminated with error", + "One of the Websocket listeners terminated with error. Please ensure your network interface is working correctly. The listener will be removed from service.", ); - Poll::Ready(None) + // Ensure the task is polled again to continue processing other listeners. + cx.waker().wake_by_ref(); } Some(Ok((connection, address))) => { let connection_id = self.context.next_connection_id(); @@ -642,11 +643,11 @@ impl Stream for WebSocketTransport { }, ); - Poll::Ready(Some(TransportEvent::PendingInboundConnection { + return Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, - })) + })); } - }; + } } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {