Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions src/transport/common/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use socket2::{Domain, Socket, Type};
use tokio::net::{TcpListener as TokioTcpListener, TcpStream};

use std::{
io,
io::{self, ErrorKind},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl DialAddresses {
/// Socket listening to zero or more addresses.
pub struct SocketListener {
/// Listeners.
listeners: Vec<TokioTcpListener>,
listeners: Vec<Option<TokioTcpListener>>,
/// The index in the listeners from which the polling is resumed.
poll_index: usize,
}
Expand Down Expand Up @@ -308,7 +308,7 @@ impl SocketListener {
vec![local_address]
};

Some((listener, listen_addresses))
Some((Some(listener), listen_addresses))
})
.unzip();

Expand Down Expand Up @@ -455,17 +455,43 @@ impl Stream for SocketListener {
let len = self.listeners.len();
for index in 0..len {
let current = (self.poll_index + index) % len;
let listener = &mut self.listeners[current];

match listener.poll_accept(cx) {
Poll::Pending => {}
Poll::Ready(Err(error)) => {
self.poll_index = (self.poll_index + 1) % len;
return Poll::Ready(Some(Err(error)));
}
Poll::Ready(Ok((stream, address))) => {
self.poll_index = (self.poll_index + 1) % len;
return Poll::Ready(Some(Ok((stream, address))));
let Some(listener) = &mut self.listeners[current] else {
continue;
};

loop {
match listener.poll_accept(cx) {
Poll::Pending => {
break;
}
Poll::Ready(Ok((stream, address))) => {
self.poll_index = (self.poll_index + 1) % len;
return Poll::Ready(Some(Ok((stream, address))));
}
Poll::Ready(Err(error)) => match error.kind() {
ErrorKind::ConnectionAborted | ErrorKind::ConnectionReset => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would double check that socket.accept() can't generate these errors continuously after a socket error — otherwise we would end up busy-looping here.

tracing::debug!(
target: LOG_TARGET,
?error,
"Recoverable error for listener, continuing",
);
// Poll again the listener on recoverable errors.
continue;
}
_ => {
self.poll_index = (self.poll_index + 1) % len;

tracing::error!(
target: LOG_TARGET,
?error,
"Fatal error for listener",
);
self.listeners[current] = None;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO a better way would be to terminate the process and let the restarted process listen again, than stopping accepting incoming connections completely.


return Poll::Ready(Some(Err(error)));
}
},
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,23 +560,24 @@ impl Stream for TcpTransport {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) {
return match event {
match event {
None => {
tracing::error!(
target: LOG_TARGET,
"TCP listener terminated, ignore if the node is stopping",
);

Poll::Ready(None)
return Poll::Ready(None);
}
Some(Err(error)) => {
tracing::error!(
target: LOG_TARGET,
?error,
"TCP listener terminated with error",
"One of the TCP listeners terminated with error. Please ensure your network interface is working correctly. The listener will be removed from service.",
);

Poll::Ready(None)
// Ensure the task is polled again to continue processing other listeners.
cx.waker().wake_by_ref();
}
Some(Ok((connection, address))) => {
let connection_id = self.context.next_connection_id();
Expand All @@ -595,11 +596,11 @@ impl Stream for TcpTransport {
},
);

Poll::Ready(Some(TransportEvent::PendingInboundConnection {
return Poll::Ready(Some(TransportEvent::PendingInboundConnection {
connection_id,
}))
}));
}
};
}
}

while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {
Expand Down
15 changes: 8 additions & 7 deletions src/transport/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,23 +607,24 @@ impl Stream for WebSocketTransport {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) {
return match event {
match event {
None => {
tracing::error!(
target: LOG_TARGET,
"Websocket listener terminated, ignore if the node is stopping",
);

Poll::Ready(None)
return Poll::Ready(None);
}
Some(Err(error)) => {
tracing::error!(
target: LOG_TARGET,
?error,
"Websocket listener terminated with error",
"One of the Websocket listeners terminated with error. Please ensure your network interface is working correctly. The listener will be removed from service.",
);

Poll::Ready(None)
// Ensure the task is polled again to continue processing other listeners.
cx.waker().wake_by_ref();
}
Some(Ok((connection, address))) => {
let connection_id = self.context.next_connection_id();
Expand All @@ -642,11 +643,11 @@ impl Stream for WebSocketTransport {
},
);

Poll::Ready(Some(TransportEvent::PendingInboundConnection {
return Poll::Ready(Some(TransportEvent::PendingInboundConnection {
connection_id,
}))
}));
}
};
}
}

while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {
Expand Down
Loading