From 33830a034acb4a986ef435e383d280a4442d600e Mon Sep 17 00:00:00 2001 From: Kasper Ziemianek Date: Thu, 27 Jun 2024 10:53:19 +0200 Subject: [PATCH 1/5] ws connection read messages fix --- .../tls-websocket-server/src/connection.rs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/bitacross-worker/core/tls-websocket-server/src/connection.rs b/bitacross-worker/core/tls-websocket-server/src/connection.rs index 10266f9b57..7da158faa9 100644 --- a/bitacross-worker/core/tls-websocket-server/src/connection.rs +++ b/bitacross-worker/core/tls-websocket-server/src/connection.rs @@ -131,7 +131,7 @@ where /// Read from a web-socket, or initiate handshake if websocket is not initialized yet. /// /// Returns a boolean 'connection should be closed'. - fn read_or_initialize_websocket(&mut self) -> WebSocketResult { + fn read_or_initialize_websocket(&mut self) -> WebSocketResult<(bool, bool)> { if let StreamState::Established(web_socket) = &mut self.stream_state { trace!( "Read is possible for connection {}: {}", @@ -147,12 +147,15 @@ where ); }, Err(e) => match e { - tungstenite::Error::ConnectionClosed => return Ok(true), - tungstenite::Error::AlreadyClosed => return Ok(true), - _ => error!( + tungstenite::Error::ConnectionClosed => return Ok((true, false)), + tungstenite::Error::AlreadyClosed => return Ok((true, false)), + _ => { + error!( "Failed to read message from web-socket (connection {}): {:?}", self.connection_token.0, e - ), + ); + return Ok((false , false)) + } }, } trace!("Read successful for connection {}", self.connection_token.0); @@ -161,12 +164,11 @@ where self.stream_state = std::mem::take(&mut self.stream_state).attempt_handshake(); if self.stream_state.is_invalid() { warn!("Web-socket connection ({:?}) failed, closing", self.connection_token); - return Ok(true) + return Ok((true, false)) } debug!("Initialized connection {} successfully", self.connection_token.0); } - - Ok(false) + Ok((false, true)) } fn handle_message(&mut self, message: Message) -> WebSocketResult<()> { @@ -276,6 +278,7 @@ where fn on_ready(&mut self, poll: &mut Poll, event: &Event) -> WebSocketResult<()> { let mut is_closing = false; + let mut read = true; if event.readiness().is_readable() { trace!("Connection ({:?}) is readable", self.token()); @@ -283,7 +286,12 @@ where let connection_state = self.maybe_do_tls_read(); if connection_state.is_alive() { - is_closing = self.read_or_initialize_websocket()?; + loop { + if !read { + break; + } + (is_closing, read) = self.read_or_initialize_websocket()?; + } } else { is_closing = connection_state.is_closing(); } From 8393800b949fb41eca1e69e03e4ac02efc0049e8 Mon Sep 17 00:00:00 2001 From: Jayanring Date: Mon, 1 Jul 2024 08:08:39 +0000 Subject: [PATCH 2/5] fix fmt --- .../core/tls-websocket-server/src/connection.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bitacross-worker/core/tls-websocket-server/src/connection.rs b/bitacross-worker/core/tls-websocket-server/src/connection.rs index 7da158faa9..35dbe0d2e9 100644 --- a/bitacross-worker/core/tls-websocket-server/src/connection.rs +++ b/bitacross-worker/core/tls-websocket-server/src/connection.rs @@ -151,11 +151,11 @@ where tungstenite::Error::AlreadyClosed => return Ok((true, false)), _ => { error!( - "Failed to read message from web-socket (connection {}): {:?}", - self.connection_token.0, e + "Failed to read message from web-socket (connection {}): {:?}", + self.connection_token.0, e ); - return Ok((false , false)) - } + return Ok((false, false)) + }, }, } trace!("Read successful for connection {}", self.connection_token.0); @@ -288,7 +288,7 @@ where if connection_state.is_alive() { loop { if !read { - break; + break } (is_closing, read) = self.read_or_initialize_websocket()?; } From ebbb6b2a0d1322461b08c777e58ad49da4684d04 Mon Sep 17 00:00:00 2001 From: Jayanring Date: Tue, 2 Jul 2024 06:37:03 +0000 Subject: [PATCH 3/5] optim handling --- .../tls-websocket-server/src/connection.rs | 66 +++++++++++-------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/bitacross-worker/core/tls-websocket-server/src/connection.rs b/bitacross-worker/core/tls-websocket-server/src/connection.rs index 35dbe0d2e9..baa1f0a7a6 100644 --- a/bitacross-worker/core/tls-websocket-server/src/connection.rs +++ b/bitacross-worker/core/tls-websocket-server/src/connection.rs @@ -28,9 +28,11 @@ use mio::{event::Event, net::TcpStream, Poll, Ready, Token}; use rustls::{ServerSession, Session}; use std::{ format, + io::ErrorKind, string::{String, ToString}, sync::Arc, time::Instant, + vec, }; use tungstenite::Message; @@ -131,44 +133,58 @@ where /// Read from a web-socket, or initiate handshake if websocket is not initialized yet. /// /// Returns a boolean 'connection should be closed'. - fn read_or_initialize_websocket(&mut self) -> WebSocketResult<(bool, bool)> { + fn drain_message_or_initialize_websocket(&mut self) -> WebSocketResult { if let StreamState::Established(web_socket) = &mut self.stream_state { trace!( "Read is possible for connection {}: {}", self.connection_token.0, web_socket.can_read() ); - match web_socket.read_message() { - Ok(m) => - if let Err(e) = self.handle_message(m) { - error!( - "Failed to handle web-socket message (connection {}): {:?}", - self.connection_token.0, e - ); - }, - Err(e) => match e { - tungstenite::Error::ConnectionClosed => return Ok((true, false)), - tungstenite::Error::AlreadyClosed => return Ok((true, false)), - _ => { - error!( - "Failed to read message from web-socket (connection {}): {:?}", - self.connection_token.0, e - ); - return Ok((false, false)) + + let mut messages = vec![]; + let mut is_closing = false; + loop { + match web_socket.read_message() { + Ok(m) => messages.push(m), + Err(e) => { + match e { + tungstenite::Error::ConnectionClosed + | tungstenite::Error::AlreadyClosed => is_closing = true, + tungstenite::Error::Io(e) + if matches!(e.kind(), ErrorKind::WouldBlock) => {}, // no message to read + _ => { + error!( + "Failed to read message from web-socket (connection {}): {:?}", + self.connection_token.0, e + ); + }, + } + break }, - }, + } } + + messages.into_iter().for_each(|m| { + if let Err(e) = self.handle_message(m) { + error!( + "Failed to handle web-socket message (connection {}): {:?}", + self.connection_token.0, e + ); + } + }); + trace!("Read successful for connection {}", self.connection_token.0); + Ok(is_closing) } else { trace!("Initialize connection {}", self.connection_token.0); self.stream_state = std::mem::take(&mut self.stream_state).attempt_handshake(); if self.stream_state.is_invalid() { warn!("Web-socket connection ({:?}) failed, closing", self.connection_token); - return Ok((true, false)) + return Ok(true) } debug!("Initialized connection {} successfully", self.connection_token.0); + Ok(false) } - Ok((false, true)) } fn handle_message(&mut self, message: Message) -> WebSocketResult<()> { @@ -278,7 +294,6 @@ where fn on_ready(&mut self, poll: &mut Poll, event: &Event) -> WebSocketResult<()> { let mut is_closing = false; - let mut read = true; if event.readiness().is_readable() { trace!("Connection ({:?}) is readable", self.token()); @@ -286,12 +301,7 @@ where let connection_state = self.maybe_do_tls_read(); if connection_state.is_alive() { - loop { - if !read { - break - } - (is_closing, read) = self.read_or_initialize_websocket()?; - } + is_closing = self.drain_message_or_initialize_websocket()?; } else { is_closing = connection_state.is_closing(); } From 4a097cfc719b9e282898a70ca5aa988e9390dd04 Mon Sep 17 00:00:00 2001 From: Jayanring Date: Wed, 3 Jul 2024 03:09:17 +0000 Subject: [PATCH 4/5] close ws when read error --- bitacross-worker/core/tls-websocket-server/src/connection.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bitacross-worker/core/tls-websocket-server/src/connection.rs b/bitacross-worker/core/tls-websocket-server/src/connection.rs index baa1f0a7a6..906bf48749 100644 --- a/bitacross-worker/core/tls-websocket-server/src/connection.rs +++ b/bitacross-worker/core/tls-websocket-server/src/connection.rs @@ -143,6 +143,9 @@ where let mut messages = vec![]; let mut is_closing = false; + + // Looping over 'read_message' is merely a workaround for the unexpected behavior of mio event triggering. + // Final solution will be applied in P-907. loop { match web_socket.read_message() { Ok(m) => messages.push(m), @@ -157,6 +160,7 @@ where "Failed to read message from web-socket (connection {}): {:?}", self.connection_token.0, e ); + is_closing = true; }, } break From d1914956a45206854fa47ac672916cb9d86fe35c Mon Sep 17 00:00:00 2001 From: Jayanring Date: Wed, 3 Jul 2024 08:47:20 +0000 Subject: [PATCH 5/5] optimize error handling --- .../core/tls-websocket-server/src/connection.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bitacross-worker/core/tls-websocket-server/src/connection.rs b/bitacross-worker/core/tls-websocket-server/src/connection.rs index 906bf48749..667d440b4b 100644 --- a/bitacross-worker/core/tls-websocket-server/src/connection.rs +++ b/bitacross-worker/core/tls-websocket-server/src/connection.rs @@ -151,14 +151,13 @@ where Ok(m) => messages.push(m), Err(e) => { match e { - tungstenite::Error::ConnectionClosed - | tungstenite::Error::AlreadyClosed => is_closing = true, tungstenite::Error::Io(e) if matches!(e.kind(), ErrorKind::WouldBlock) => {}, // no message to read _ => { - error!( + trace!( "Failed to read message from web-socket (connection {}): {:?}", - self.connection_token.0, e + self.connection_token.0, + e ); is_closing = true; },