Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuous Websocket Message Reading on Readiness Event #2852

Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions bitacross-worker/core/tls-websocket-server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ use mio::{event::Event, net::TcpStream, Poll, Ready, Token};
use rustls::{ServerSession, Session};
use std::{
format,
io::ErrorKind,
string::{String, ToString},
sync::Arc,
time::Instant,
vec,
};
use tungstenite::Message;

Expand Down Expand Up @@ -131,31 +133,48 @@ where
/// Read from a web-socket, or initiate handshake if websocket is not initialized yet.
///
/// Returns a boolean 'connection should be closed'.
fn read_or_initialize_websocket(&mut self) -> WebSocketResult<bool> {
fn drain_message_or_initialize_websocket(&mut self) -> WebSocketResult<bool> {
if let StreamState::Established(web_socket) = &mut self.stream_state {
trace!(
"Read is possible for connection {}: {}",
self.connection_token.0,
web_socket.can_read()
);
match web_socket.read_message() {
Ok(m) =>
if let Err(e) = self.handle_message(m) {
error!(
"Failed to handle web-socket message (connection {}): {:?}",
self.connection_token.0, e
);

Jayanring marked this conversation as resolved.
Show resolved Hide resolved
let mut messages = vec![];
let mut is_closing = false;
loop {
match web_socket.read_message() {
Ok(m) => messages.push(m),
Err(e) => {
match e {
tungstenite::Error::ConnectionClosed
| tungstenite::Error::AlreadyClosed => is_closing = true,
Jayanring marked this conversation as resolved.
Show resolved Hide resolved
tungstenite::Error::Io(e)
if matches!(e.kind(), ErrorKind::WouldBlock) => {}, // no message to read
_ => {
error!(
"Failed to read message from web-socket (connection {}): {:?}",
self.connection_token.0, e
);
},
Jayanring marked this conversation as resolved.
Show resolved Hide resolved
}
break
},
Err(e) => match e {
tungstenite::Error::ConnectionClosed => return Ok(true),
tungstenite::Error::AlreadyClosed => return Ok(true),
_ => error!(
"Failed to read message from web-socket (connection {}): {:?}",
self.connection_token.0, e
),
},
}
}

messages.into_iter().for_each(|m| {
if let Err(e) = self.handle_message(m) {
error!(
"Failed to handle web-socket message (connection {}): {:?}",
self.connection_token.0, e
);
}
});

trace!("Read successful for connection {}", self.connection_token.0);
Ok(is_closing)
} else {
trace!("Initialize connection {}", self.connection_token.0);
self.stream_state = std::mem::take(&mut self.stream_state).attempt_handshake();
Expand All @@ -164,9 +183,8 @@ where
return Ok(true)
}
debug!("Initialized connection {} successfully", self.connection_token.0);
Ok(false)
}

Ok(false)
}

fn handle_message(&mut self, message: Message) -> WebSocketResult<()> {
Expand Down Expand Up @@ -283,7 +301,7 @@ where
let connection_state = self.maybe_do_tls_read();

if connection_state.is_alive() {
is_closing = self.read_or_initialize_websocket()?;
is_closing = self.drain_message_or_initialize_websocket()?;
} else {
is_closing = connection_state.is_closing();
}
Expand Down