From c0431ed499090cfcae72f93dfb3194c056ee93cc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 23 Sep 2025 12:36:10 +0000 Subject: [PATCH 1/4] transport/listener: Continue polling on recoverable errors Signed-off-by: Alexandru Vasile --- src/transport/common/listener.rs | 35 +++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/transport/common/listener.rs b/src/transport/common/listener.rs index 856b4c19..3da255af 100644 --- a/src/transport/common/listener.rs +++ b/src/transport/common/listener.rs @@ -33,7 +33,7 @@ use socket2::{Domain, Socket, Type}; use tokio::net::{TcpListener as TokioTcpListener, TcpStream}; use std::{ - io, + io::{self, ErrorKind}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, pin::Pin, sync::Arc, @@ -457,15 +457,30 @@ impl Stream for SocketListener { let current = (self.poll_index + index) % len; let listener = &mut self.listeners[current]; - match listener.poll_accept(cx) { - Poll::Pending => {} - Poll::Ready(Err(error)) => { - self.poll_index = (self.poll_index + 1) % len; - return Poll::Ready(Some(Err(error))); - } - Poll::Ready(Ok((stream, address))) => { - self.poll_index = (self.poll_index + 1) % len; - return Poll::Ready(Some(Ok((stream, address)))); + loop { + match listener.poll_accept(cx) { + Poll::Pending => { + break; + } + Poll::Ready(Ok((stream, address))) => { + self.poll_index = (self.poll_index + 1) % len; + return Poll::Ready(Some(Ok((stream, address)))); + } + Poll::Ready(Err(error)) => match error.kind() { + ErrorKind::ConnectionAborted | ErrorKind::ConnectionReset => { + tracing::debug!( + target: LOG_TARGET, + ?error, + "Recoverable error for listener, continuing", + ); + // Poll again the listener on recoverable errors. + continue; + } + _ => { + self.poll_index = (self.poll_index + 1) % len; + return Poll::Ready(Some(Err(error))); + } + }, } } } From f296c01c0234319f0bbaaef95863198bf9c29cf9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 23 Sep 2025 12:39:59 +0000 Subject: [PATCH 2/4] transport/listener: Drop fatal error listeners Signed-off-by: Alexandru Vasile --- src/transport/common/listener.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/transport/common/listener.rs b/src/transport/common/listener.rs index 3da255af..cdc1acca 100644 --- a/src/transport/common/listener.rs +++ b/src/transport/common/listener.rs @@ -157,7 +157,7 @@ impl DialAddresses { /// Socket listening to zero or more addresses. pub struct SocketListener { /// Listeners. - listeners: Vec, + listeners: Vec>, /// The index in the listeners from which the polling is resumed. poll_index: usize, } @@ -308,7 +308,7 @@ impl SocketListener { vec![local_address] }; - Some((listener, listen_addresses)) + Some((Some(listener), listen_addresses)) }) .unzip(); @@ -455,7 +455,10 @@ impl Stream for SocketListener { let len = self.listeners.len(); for index in 0..len { let current = (self.poll_index + index) % len; - let listener = &mut self.listeners[current]; + + let Some(listener) = &mut self.listeners[current] else { + continue; + }; loop { match listener.poll_accept(cx) { @@ -478,6 +481,14 @@ impl Stream for SocketListener { } _ => { self.poll_index = (self.poll_index + 1) % len; + + tracing::error!( + target: LOG_TARGET, + ?error, + "Fatal error for listener", + ); + self.listeners[current] = None; + return Poll::Ready(Some(Err(error))); } }, From 6a7499d7d97bfb1ea375277378bdc608aab2194b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 23 Sep 2025 12:43:33 +0000 Subject: [PATCH 3/4] tcp: Ensure the transport does not terminate on listener errors Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index a97fa417..1e1e834f 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -560,23 +560,24 @@ impl Stream for TcpTransport { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { - return match event { + match event { None => { tracing::error!( target: LOG_TARGET, "TCP listener terminated, ignore if the node is stopping", ); - Poll::Ready(None) + return Poll::Ready(None); } Some(Err(error)) => { tracing::error!( target: LOG_TARGET, ?error, - "TCP listener terminated with error", + "One of the TCP listeners terminated with error. Please ensure your network interface is working correctly. The listener will be removed from service.", ); - Poll::Ready(None) + // Ensure the task is polled again to continue processing other listeners. + cx.waker().wake_by_ref(); } Some(Ok((connection, address))) => { let connection_id = self.context.next_connection_id(); @@ -595,11 +596,11 @@ impl Stream for TcpTransport { }, ); - Poll::Ready(Some(TransportEvent::PendingInboundConnection { + return Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, - })) + })); } - }; + } } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { From d8066756596d58acc0a8f52e4889e62c11375fb2 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 23 Sep 2025 13:49:49 +0000 Subject: [PATCH 4/4] websocket: Ensure the transport does not terminate on listener errors Signed-off-by: Alexandru Vasile --- src/transport/websocket/mod.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 11013e6c..b0de04cb 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -607,23 +607,24 @@ impl Stream for WebSocketTransport { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { - return match event { + match event { None => { tracing::error!( target: LOG_TARGET, "Websocket listener terminated, ignore if the node is stopping", ); - Poll::Ready(None) + return Poll::Ready(None); } Some(Err(error)) => { tracing::error!( target: LOG_TARGET, ?error, - "Websocket listener terminated with error", + "One of the Websocket listeners terminated with error. Please ensure your network interface is working correctly. The listener will be removed from service.", ); - Poll::Ready(None) + // Ensure the task is polled again to continue processing other listeners. + cx.waker().wake_by_ref(); } Some(Ok((connection, address))) => { let connection_id = self.context.next_connection_id(); @@ -642,11 +643,11 @@ impl Stream for WebSocketTransport { }, ); - Poll::Ready(Some(TransportEvent::PendingInboundConnection { + return Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, - })) + })); } - }; + } } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {