Skip to content

Commit

Permalink
Remove Clone impl from WebSocket.
Browse files Browse the repository at this point in the history
When the WebSocket is used with frameworks, passed down as props, it might be `drop`ed automatically, which closes the WebSocket connection. Initially `Clone` was added so sender and receiver can be in different `spawn_local`s but it turns out that `StreamExt::split` solves that problem very well.

See #13 for more information about the issue
  • Loading branch information
ranile committed Nov 12, 2021
1 parent fb35677 commit 445e9a5
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions src/websocket/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ use web_sys::{Blob, MessageEvent};
/// Wrapper around browser's WebSocket API.
#[allow(missing_debug_implementations)]
#[pin_project(PinnedDrop)]
#[derive(Clone)]
pub struct WebSocket {
ws: web_sys::WebSocket,
sink_wakers: Rc<RefCell<Vec<Waker>>>,
sink_waker: Rc<RefCell<Option<Waker>>>,
#[pin]
message_receiver: Receiver<StreamMessage>,
#[allow(clippy::type_complexity)]
Expand All @@ -77,23 +76,22 @@ impl WebSocket {
/// [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#exceptions_thrown)
/// to learn more.
pub fn open(url: &str) -> Result<Self, JsError> {
let wakers: Rc<RefCell<Vec<Waker>>> = Rc::new(RefCell::new(vec![]));
let waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
let ws = web_sys::WebSocket::new(url).map_err(js_to_js_error)?;

let (sender, receiver) = async_broadcast::broadcast(10);

let open_callback: Closure<dyn FnMut()> = {
let wakers = Rc::clone(&wakers);
let waker = Rc::clone(&waker);
Closure::wrap(Box::new(move || {
for waker in wakers.borrow_mut().drain(..) {
if let Some(waker) = waker.borrow_mut().take() {
waker.wake();
}
}) as Box<dyn FnMut()>)
};

ws.set_onopen(Some(open_callback.as_ref().unchecked_ref()));
// open_callback.forget();
//

let message_callback: Closure<dyn FnMut(MessageEvent)> = {
let sender = sender.clone();
Closure::wrap(Box::new(move |e: MessageEvent| {
Expand All @@ -106,7 +104,6 @@ impl WebSocket {
};

ws.set_onmessage(Some(message_callback.as_ref().unchecked_ref()));
// message_callback.forget();

let error_callback: Closure<dyn FnMut(web_sys::ErrorEvent)> = {
let sender = sender.clone();
Expand All @@ -123,7 +120,6 @@ impl WebSocket {
};

ws.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
// error_callback.forget();

let close_callback: Closure<dyn FnMut(web_sys::CloseEvent)> = {
Closure::wrap(Box::new(move |e: web_sys::CloseEvent| {
Expand All @@ -144,11 +140,10 @@ impl WebSocket {
};

ws.set_onerror(Some(close_callback.as_ref().unchecked_ref()));
// close_callback.forget();

Ok(Self {
ws,
sink_wakers: wakers,
sink_waker: waker,
message_receiver: receiver,
closures: Rc::new((
open_callback,
Expand All @@ -163,9 +158,6 @@ impl WebSocket {
///
/// See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#parameters)
/// to learn about parameters passed to this function and when it can return an `Err(_)`
///
/// **Note**: If *only one* of the instances of websocket is closed, the entire connection closes.
/// This is unlikely to happen in real-world as [`wasm_bindgen_futures::spawn_local`] requires `'static`.
pub fn close(self, code: Option<u16>, reason: Option<&str>) -> Result<(), JsError> {
let result = match (code, reason) {
(None, None) => self.ws.close(),
Expand Down Expand Up @@ -240,7 +232,7 @@ impl Sink<Message> for WebSocket {
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready_state = self.ws.ready_state();
if ready_state == 0 {
self.sink_wakers.borrow_mut().push(cx.waker().clone());
*self.sink_waker.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(Ok(()))
Expand Down Expand Up @@ -298,23 +290,35 @@ mod tests {
use super::*;
use futures::{SinkExt, StreamExt};
use wasm_bindgen_test::*;
use wasm_bindgen_futures::spawn_local;

wasm_bindgen_test_configure!(run_in_browser);

const ECHO_SERVER_URL: &str = env!("ECHO_SERVER_URL");

#[wasm_bindgen_test]
async fn websocket_works() {
let mut ws = WebSocket::open(ECHO_SERVER_URL).unwrap();

ws.send(Message::Text("test".to_string())).await.unwrap();

// ignore first message
// the echo-server used sends it's info in the first message
let _ = ws.next().await;
assert_eq!(
ws.next().await.unwrap().unwrap(),
Message::Text("test".to_string())
)
fn websocket_works() {
let ws = WebSocket::open(ECHO_SERVER_URL).unwrap();
let (mut sender, mut receiver) = ws.split();

spawn_local(async move {
sender.send(Message::Text(String::from("test 1"))).await.unwrap();
sender.send(Message::Text(String::from("test 2"))).await.unwrap();
});

spawn_local(async move {
// ignore first message
// the echo-server used sends it's info in the first message
// let _ = ws.next().await;

assert_eq!(
receiver.next().await.unwrap().unwrap(),
Message::Text("test 1".to_string())
);
assert_eq!(
receiver.next().await.unwrap().unwrap(),
Message::Text("test 2".to_string())
);
});
}
}

0 comments on commit 445e9a5

Please sign in to comment.