diff --git a/check.sh b/check.sh deleted file mode 100755 index 4b10628..0000000 --- a/check.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env bash -# This scripts runs various CI-like checks in a convenient way. -set -eux - -cargo install --quiet typos-cli - -typos -cargo check --quiet --workspace --all-targets -cargo check --quiet --workspace --all-targets --all-features -cargo check --quiet -p example_app --all-features --lib --target wasm32-unknown-unknown -cargo fmt --all -- --check -cargo clippy --quiet --workspace --all-targets --all-features -- -D warnings -W clippy::all -cargo test --quiet --workspace --all-targets --all-features -cargo test --quiet --workspace --doc - -./cargo_deny.sh - -echo "All checks passed!" diff --git a/ewebsock/Cargo.toml b/ewebsock/Cargo.toml index 9edb7e4..5f81ff0 100644 --- a/ewebsock/Cargo.toml +++ b/ewebsock/Cargo.toml @@ -28,6 +28,8 @@ tls = ["tungstenite/rustls-tls-webpki-roots"] ## This adds a lot of dependencies, ## but may yield lower latency and CPU usage ## when using `ws_connect`. +## +## Will ignore any `ControlFlow::Break` returned from the `on_event` callback. tokio = [ "dep:async-stream", "dep:futures", diff --git a/ewebsock/src/lib.rs b/ewebsock/src/lib.rs index ae588eb..1e3c0a1 100644 --- a/ewebsock/src/lib.rs +++ b/ewebsock/src/lib.rs @@ -20,6 +20,8 @@ #[cfg(not(feature = "tokio"))] mod native_tungstenite; +use std::ops::ControlFlow; + #[cfg(not(target_arch = "wasm32"))] #[cfg(not(feature = "tokio"))] pub use native_tungstenite::*; @@ -98,9 +100,9 @@ impl WsReceiver { let on_event = Box::new(move |event| { wake_up(); // wake up UI thread if tx.send(event).is_ok() { - std::ops::ControlFlow::Continue(()) + ControlFlow::Continue(()) } else { - std::ops::ControlFlow::Break(()) + ControlFlow::Break(()) } }); let ws_receiver = WsReceiver { rx }; @@ -119,7 +121,7 @@ pub type Error = String; /// Short for `Result`. pub type Result = std::result::Result; -pub(crate) type EventHandler = Box std::ops::ControlFlow<()>>; +pub(crate) type EventHandler = Box ControlFlow<()>>; /// Options for a connection. #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -143,6 +145,9 @@ impl Default for Options { /// Connect to the given URL, and return a sender and receiver. /// +/// If `on_event` returns [`ControlFlow::Break`], the connection will be closed +/// without calling `on_event` again. +/// /// This is a wrapper around [`ws_connect`]. /// /// # Errors @@ -161,6 +166,9 @@ pub fn connect(url: impl Into, options: Options) -> Result<(WsSender, Ws /// /// This allows you to wake up the UI thread, for instance. /// +/// If `on_event` returns [`ControlFlow::Break`], the connection will be closed +/// without calling `on_event` again. +/// /// This is a wrapper around [`ws_connect`]. /// /// # Errors @@ -180,6 +188,9 @@ pub fn connect_with_wakeup( /// Connect and call the given event handler on each received event. /// +/// If `on_event` returns [`ControlFlow::Break`], the connection will be closed +/// without calling `on_event` again. +/// /// See [`crate::connect`] for a more high-level version. /// /// # Errors @@ -196,6 +207,9 @@ pub fn ws_connect(url: String, options: Options, on_event: EventHandler) -> Resu /// /// This can be slightly more efficient when you don't need to send messages. /// +/// If `on_event` returns [`ControlFlow::Break`], the connection will be closed +/// without calling `on_event` again. +/// /// # Errors /// * On native: failure to spawn receiver thread. /// * On web: failure to use `WebSocket` API. diff --git a/ewebsock/src/native_tungstenite.rs b/ewebsock/src/native_tungstenite.rs index deec026..c5d1e84 100644 --- a/ewebsock/src/native_tungstenite.rs +++ b/ewebsock/src/native_tungstenite.rs @@ -1,6 +1,9 @@ #![allow(deprecated)] // TODO(emilk): Remove when we update tungstenite -use std::sync::mpsc::{Receiver, TryRecvError}; +use std::{ + ops::ControlFlow, + sync::mpsc::{Receiver, TryRecvError}, +}; use crate::{EventHandler, Options, Result, WsEvent, WsMessage}; @@ -13,9 +16,7 @@ pub struct WsSender { impl Drop for WsSender { fn drop(&mut self) { - if let Err(err) = self.close() { - log::warn!("Failed to close web-socket: {err:?}"); - } + self.close(); } } @@ -32,16 +33,11 @@ impl WsSender { /// Close the connection. /// /// This is called automatically when the sender is dropped. - /// - /// # Errors - /// This should never fail, except _maybe_ on Web. - #[allow(clippy::unnecessary_wraps)] // To keep the same signature as the Web version - pub fn close(&mut self) -> Result<()> { + pub fn close(&mut self) { if self.tx.is_some() { log::debug!("Closing WebSocket"); } self.tx = None; - Ok(()) } /// Forget about this sender without closing the connection. @@ -90,33 +86,46 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler response.headers() ); - on_event(WsEvent::Opened); + let control = on_event(WsEvent::Opened); + if control.is_break() { + log::trace!("Closing connection due to Break"); + return socket + .close(None) + .map_err(|err| format!("Failed to close connection: {err}")); + } loop { - match socket.read_message() { + let control = match socket.read_message() { Ok(incoming_msg) => match incoming_msg { tungstenite::protocol::Message::Text(text) => { - on_event(WsEvent::Message(WsMessage::Text(text))); + on_event(WsEvent::Message(WsMessage::Text(text))) } tungstenite::protocol::Message::Binary(data) => { - on_event(WsEvent::Message(WsMessage::Binary(data))); + on_event(WsEvent::Message(WsMessage::Binary(data))) } tungstenite::protocol::Message::Ping(data) => { - on_event(WsEvent::Message(WsMessage::Ping(data))); + on_event(WsEvent::Message(WsMessage::Ping(data))) } tungstenite::protocol::Message::Pong(data) => { - on_event(WsEvent::Message(WsMessage::Pong(data))); + on_event(WsEvent::Message(WsMessage::Pong(data))) } tungstenite::protocol::Message::Close(close) => { on_event(WsEvent::Closed); log::debug!("WebSocket close received: {close:?}"); return Ok(()); } - tungstenite::protocol::Message::Frame(_) => {} + tungstenite::protocol::Message::Frame(_) => ControlFlow::Continue(()), }, Err(err) => { return Err(format!("read: {err}")); } + }; + + if control.is_break() { + log::trace!("Closing connection due to Break"); + return socket + .close(None) + .map_err(|err| format!("Failed to close connection: {err}")); } std::thread::sleep(std::time::Duration::from_millis(10)); @@ -172,7 +181,13 @@ pub fn ws_connect_blocking( response.headers() ); - on_event(WsEvent::Opened); + let control = on_event(WsEvent::Opened); + if control.is_break() { + log::trace!("Closing connection due to Break"); + return socket + .close(None) + .map_err(|err| format!("Failed to close connection: {err}")); + } match socket.get_mut() { tungstenite::stream::MaybeTlsStream::Plain(stream) => stream.set_nonblocking(true), @@ -216,38 +231,45 @@ pub fn ws_connect_blocking( Err(TryRecvError::Empty) => {} }; - match socket.read_message() { + let control = match socket.read_message() { Ok(incoming_msg) => { did_work = true; match incoming_msg { tungstenite::protocol::Message::Text(text) => { - on_event(WsEvent::Message(WsMessage::Text(text))); + on_event(WsEvent::Message(WsMessage::Text(text))) } tungstenite::protocol::Message::Binary(data) => { - on_event(WsEvent::Message(WsMessage::Binary(data))); + on_event(WsEvent::Message(WsMessage::Binary(data))) } tungstenite::protocol::Message::Ping(data) => { - on_event(WsEvent::Message(WsMessage::Ping(data))); + on_event(WsEvent::Message(WsMessage::Ping(data))) } tungstenite::protocol::Message::Pong(data) => { - on_event(WsEvent::Message(WsMessage::Pong(data))); + on_event(WsEvent::Message(WsMessage::Pong(data))) } tungstenite::protocol::Message::Close(close) => { on_event(WsEvent::Closed); log::debug!("Close received: {close:?}"); return Ok(()); } - tungstenite::protocol::Message::Frame(_) => {} + tungstenite::protocol::Message::Frame(_) => ControlFlow::Continue(()), } } Err(tungstenite::Error::Io(io_err)) if io_err.kind() == std::io::ErrorKind::WouldBlock => { - // Ignore + ControlFlow::Continue(()) // Ignore } Err(err) => { return Err(format!("read: {err}")); } + }; + + if control.is_break() { + log::trace!("Closing connection due to Break"); + return socket + .close(None) + .map_err(|err| format!("Failed to close connection: {err}")); } if !did_work { diff --git a/ewebsock/src/native_tungstenite_tokio.rs b/ewebsock/src/native_tungstenite_tokio.rs index 29b3e50..d2a941f 100644 --- a/ewebsock/src/native_tungstenite_tokio.rs +++ b/ewebsock/src/native_tungstenite_tokio.rs @@ -1,3 +1,5 @@ +use std::ops::ControlFlow; + use crate::{EventHandler, Options, Result, WsEvent, WsMessage}; /// This is how you send [`WsMessage`]s to the server. @@ -9,9 +11,7 @@ pub struct WsSender { impl Drop for WsSender { fn drop(&mut self) { - if let Err(err) = self.close() { - log::warn!("Failed to close web-socket: {err:?}"); - } + self.close(); } } @@ -28,16 +28,11 @@ impl WsSender { /// Close the connection. /// /// This is called automatically when the sender is dropped. - /// - /// # Errors - /// This should never fail, except _maybe_ on Web. - #[allow(clippy::unnecessary_wraps)] // To keep the same signature as the Web version - pub fn close(&mut self) -> Result<()> { + pub fn close(&mut self) { if self.tx.is_some() { log::debug!("Closing WebSocket"); } self.tx = None; - Ok(()) } /// Forget about this sender without closing the connection. @@ -57,7 +52,7 @@ async fn ws_connect_async( let config = tungstenite::protocol::WebSocketConfig::from(options); let disable_nagle = false; // God damn everyone who adds negations to the names of their variables - let (ws_stream, _) = match tokio_tungstenite::connect_async_with_config( + let (ws_stream, _response) = match tokio_tungstenite::connect_async_with_config( url, Some(config), disable_nagle, @@ -72,7 +67,11 @@ async fn ws_connect_async( }; log::info!("WebSocket handshake has been successfully completed"); - on_event(WsEvent::Opened); + + let control = on_event(WsEvent::Opened); + if control.is_break() { + log::warn!("ControlFlow::Break not implemented for the tungstenite tokio backend"); + } let (write, read) = ws_stream.split(); @@ -88,29 +87,28 @@ async fn ws_connect_async( .forward(write); let reader = read.for_each(move |event| { - match event { + let control = match event { Ok(message) => match message { tungstenite::protocol::Message::Text(text) => { - on_event(WsEvent::Message(WsMessage::Text(text))); + on_event(WsEvent::Message(WsMessage::Text(text))) } tungstenite::protocol::Message::Binary(data) => { - on_event(WsEvent::Message(WsMessage::Binary(data))); + on_event(WsEvent::Message(WsMessage::Binary(data))) } tungstenite::protocol::Message::Ping(data) => { - on_event(WsEvent::Message(WsMessage::Ping(data))); + on_event(WsEvent::Message(WsMessage::Ping(data))) } tungstenite::protocol::Message::Pong(data) => { - on_event(WsEvent::Message(WsMessage::Pong(data))); - } - tungstenite::protocol::Message::Close(_) => { - on_event(WsEvent::Closed); + on_event(WsEvent::Message(WsMessage::Pong(data))) } - tungstenite::protocol::Message::Frame(_) => {} + tungstenite::protocol::Message::Close(_) => on_event(WsEvent::Closed), + tungstenite::protocol::Message::Frame(_) => ControlFlow::Continue(()), }, - Err(err) => { - on_event(WsEvent::Error(err.to_string())); - } + Err(err) => on_event(WsEvent::Error(err.to_string())), }; + if control.is_break() { + log::warn!("ControlFlow::Break not implemented for the tungstenite tokio backend"); + } async {} }); diff --git a/ewebsock/src/web.rs b/ewebsock/src/web.rs index 8d55f48..f257cd5 100644 --- a/ewebsock/src/web.rs +++ b/ewebsock/src/web.rs @@ -1,5 +1,7 @@ #![allow(trivial_casts)] +use std::{ops::ControlFlow, rc::Rc}; + use crate::{EventHandler, Options, Result, WsEvent, WsMessage}; #[allow(clippy::needless_pass_by_value)] @@ -16,27 +18,25 @@ fn string_from_js_string(s: js_sys::JsString) -> String { /// /// When this is dropped, the connection is closed. pub struct WsSender { - ws: Option, + socket: Option>, } impl Drop for WsSender { fn drop(&mut self) { - if let Err(err) = self.close() { - log::warn!("Failed to close WebSocket: {err:?}"); - } + self.close(); } } impl WsSender { /// Send the message to the server. pub fn send(&mut self, msg: WsMessage) { - if let Some(ws) = &mut self.ws { + if let Some(socket) = &mut self.socket { let result = match msg { WsMessage::Binary(data) => { - ws.set_binary_type(web_sys::BinaryType::Blob); - ws.send_with_u8_array(&data) + socket.set_binary_type(web_sys::BinaryType::Blob); + socket.send_with_u8_array(&data) } - WsMessage::Text(text) => ws.send_with_str(&text), + WsMessage::Text(text) => socket.send_with_str(&text), unknown => { panic!("Don't know how to send message: {unknown:?}"); } @@ -50,21 +50,15 @@ impl WsSender { /// Close the connection. /// /// This is called automatically when the sender is dropped. - /// - /// # Errors - /// This should never fail, except _maybe_ on Web. - pub fn close(&mut self) -> Result<()> { - if let Some(ws) = self.ws.take() { - log::debug!("Closing WebSocket"); - ws.close().map_err(string_from_js_value) - } else { - Ok(()) + pub fn close(&mut self) { + if let Some(socket) = self.socket.take() { + close_socket(&socket); } } /// Forget about this sender without closing the connection. pub fn forget(mut self) { - self.ws = None; + self.socket = None; } } @@ -84,64 +78,70 @@ pub(crate) fn ws_connect_impl( use wasm_bindgen::JsCast as _; // Connect to an server - let ws = web_sys::WebSocket::new(&url).map_err(string_from_js_value)?; + let socket = web_sys::WebSocket::new(&url).map_err(string_from_js_value)?; + let socket = Rc::new(socket); // For small binary messages, like CBOR, Arraybuffer is more efficient than Blob handling - ws.set_binary_type(web_sys::BinaryType::Arraybuffer); + socket.set_binary_type(web_sys::BinaryType::Arraybuffer); // Allow it to be shared by the different callbacks: - let on_event: std::rc::Rc std::ops::ControlFlow<()>> = - on_event.into(); + let on_event: Rc ControlFlow<()>> = on_event.into(); // onmessage callback { let on_event = on_event.clone(); + let socket2 = socket.clone(); let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| { // Handle difference Text/Binary,... - if let Ok(abuf) = e.data().dyn_into::() { + let control = if let Ok(abuf) = e.data().dyn_into::() { let array = js_sys::Uint8Array::new(&abuf); - on_event(WsEvent::Message(WsMessage::Binary(array.to_vec()))); + on_event(WsEvent::Message(WsMessage::Binary(array.to_vec()))) } else if let Ok(blob) = e.data().dyn_into::() { // better alternative to juggling with FileReader is to use https://crates.io/crates/gloo-file let file_reader = web_sys::FileReader::new().expect("Failed to create FileReader"); let file_reader_clone = file_reader.clone(); // create onLoadEnd callback let on_event = on_event.clone(); - let onloadend_cb = - Closure::wrap(Box::new( - move |_e: web_sys::ProgressEvent| match file_reader_clone.result() { - Ok(file_reader) => { - let array = js_sys::Uint8Array::new(&file_reader); - on_event(WsEvent::Message(WsMessage::Binary(array.to_vec()))); - } - Err(err) => { - on_event(WsEvent::Error(format!( - "Failed to read binary blob: {}", - string_from_js_value(err) - ))); - } - }, - ) - as Box); + let socket3 = socket2.clone(); + let onloadend_cb = Closure::wrap(Box::new(move |_e: web_sys::ProgressEvent| { + let control = match file_reader_clone.result() { + Ok(file_reader) => { + let array = js_sys::Uint8Array::new(&file_reader); + on_event(WsEvent::Message(WsMessage::Binary(array.to_vec()))) + } + Err(err) => on_event(WsEvent::Error(format!( + "Failed to read binary blob: {}", + string_from_js_value(err) + ))), + }; + if control.is_break() { + close_socket(&socket3); + } + }) + as Box); file_reader.set_onloadend(Some(onloadend_cb.as_ref().unchecked_ref())); file_reader .read_as_array_buffer(&blob) .expect("blob not readable"); onloadend_cb.forget(); + ControlFlow::Continue(()) } else if let Ok(txt) = e.data().dyn_into::() { on_event(WsEvent::Message(WsMessage::Text(string_from_js_string( txt, - )))); + )))) } else { log::debug!("Unknown websocket message received: {:?}", e.data()); on_event(WsEvent::Message(WsMessage::Unknown(string_from_js_value( e.data(), - )))); + )))) + }; + if control.is_break() { + close_socket(&socket2); } }) as Box); // set message event handler on WebSocket - ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + socket.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); // forget the callback to keep it alive onmessage_callback.forget(); @@ -157,16 +157,20 @@ pub(crate) fn ws_connect_impl( ); on_event(WsEvent::Error(error_event.message())); }) as Box); - ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref())); + socket.set_onerror(Some(onerror_callback.as_ref().unchecked_ref())); onerror_callback.forget(); } { + let socket2 = socket.clone(); let on_event = on_event.clone(); let onopen_callback = Closure::wrap(Box::new(move |_| { - on_event(WsEvent::Opened); + let control = on_event(WsEvent::Opened); + if control.is_break() { + close_socket(&socket2); + } }) as Box); - ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref())); + socket.set_onopen(Some(onopen_callback.as_ref().unchecked_ref())); onopen_callback.forget(); } @@ -174,9 +178,19 @@ pub(crate) fn ws_connect_impl( let onclose_callback = Closure::wrap(Box::new(move |_| { on_event(WsEvent::Closed); }) as Box); - ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref())); + socket.set_onclose(Some(onclose_callback.as_ref().unchecked_ref())); onclose_callback.forget(); } - Ok(WsSender { ws: Some(ws) }) + Ok(WsSender { + socket: Some(socket), + }) +} + +fn close_socket(socket: &web_sys::WebSocket) { + if let Err(err) = socket.close() { + log::warn!("Failed to close WebSocket: {}", string_from_js_value(err)); + } else { + log::debug!("Closed WebSocket"); + } }