Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: WASM client via web-sys transport #648

Merged
merged 41 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
261adb9
feat: untested web-sys transport
niklasad1 Jan 7, 2022
6e8324d
rewrite me
niklasad1 Jan 10, 2022
6831194
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Feb 2, 2022
d671947
make it work
niklasad1 Feb 8, 2022
908a156
add hacks and works :)
niklasad1 Feb 8, 2022
b6e1e04
add subscription test too
niklasad1 Feb 8, 2022
0497648
revert StdError change; still works
niklasad1 Feb 9, 2022
267d260
cleanup
niklasad1 Feb 9, 2022
e03566b
remove hacks
niklasad1 Feb 9, 2022
8307117
more wasm tests outside workspace
niklasad1 Feb 9, 2022
d2769e0
kill mutually exclusive features
niklasad1 Feb 9, 2022
6346a8b
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Feb 9, 2022
f7351f4
merge nits
niklasad1 Feb 9, 2022
7d8a688
remove unsafe hack
niklasad1 Feb 9, 2022
1414280
fix nit
niklasad1 Feb 9, 2022
8e6028e
core: fix features and deps
niklasad1 Feb 10, 2022
e56c099
ci: add WASM test
niklasad1 Feb 10, 2022
12b383b
test again
niklasad1 Feb 10, 2022
337d76d
work work
niklasad1 Feb 10, 2022
520edfc
comeon
niklasad1 Feb 10, 2022
f006173
work work
niklasad1 Feb 10, 2022
9baac7f
revert unintentional change
niklasad1 Feb 10, 2022
d98659b
Update core/Cargo.toml
niklasad1 Feb 15, 2022
56fccd2
Update core/src/client/async_client/mod.rs
niklasad1 Feb 15, 2022
ff3e0b6
revert needless change: std hashmap + fxhashmap works
niklasad1 Feb 15, 2022
40a3bdd
cleanup
niklasad1 Feb 15, 2022
f2389ab
extract try_connect_until fn
niklasad1 Feb 15, 2022
3ac6dbb
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Feb 22, 2022
af3884e
remove todo
niklasad1 Feb 22, 2022
4ec7477
fix bad merge
niklasad1 Feb 22, 2022
34e1dbb
add wasm client wrapper crate
niklasad1 Mar 4, 2022
0b8ff78
fix nits
niklasad1 Mar 4, 2022
6b32049
use gloo-net dependency
niklasad1 Mar 21, 2022
b558166
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Mar 21, 2022
1fc3705
fix build
niklasad1 Mar 22, 2022
0972961
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Apr 20, 2022
c683541
grumbles CI: rename to `wasm_tests`
niklasad1 Apr 20, 2022
ee4bb2f
fix bad merge
niklasad1 Apr 20, 2022
cad8c75
fix grumbles
niklasad1 Apr 20, 2022
7d6e9e5
fix nit
niklasad1 Apr 20, 2022
b2de6d0
comeon CI
niklasad1 Apr 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ on:
branches:
- master

env:
CARGO_TERM_COLOR: always
# Download a RPC server to run wasm tests against.
SUBSTRATE_URL: https://releases.parity.io/substrate/x86_64-debian:stretch/latest/substrate/substrate

jobs:
check-style:
name: Check style
Expand Down Expand Up @@ -149,3 +154,27 @@ jobs:

- name: Cargo nextest
run: cargo nextest run --workspace

test_wasm:
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
name: Test wasm
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@master

- name: Install
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done cargo install wasm-pack in the past; is there an advantage to doing it this way?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah; one advantage of the script by the looks of it is that it'll download pre-compiled binaries if they exist; that's good to know!


- name: Download Substrate
run: |
curl $SUBSTRATE_URL --output substrate --location
chmod +x substrate
mkdir -p ~/.local/bin
mv substrate ~/.local/bin
- name: Run WASM tests
run: |
substrate --dev --tmp > /dev/null 2>&1 &
wasm-pack test --headless --firefox
wasm-pack test --headless --chrome
pkill substrate
working-directory: wasm-tests
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ members = [
"client/ws-client",
"client/http-client",
"client/transport",
"client/wasm-client",
"proc-macros",
"wasm-tests",
]
resolver = "2"
27 changes: 23 additions & 4 deletions client/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,49 @@ documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.9.0", optional = true }
jsonrpsee-core = { path = "../../core", version = "0.9.0", features = ["client"] }
tracing = { version = "0.1", optional = true }
tracing = "0.1"
thiserror = { version = "1", optional = true }
futures = { version = "0.3.14", default-features = false, features = ["std"], optional = true }
anyhow = { version = "1", optional = true }
futures-channel = { version = "0.3.14", default-features = false, optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
http = { version = "0.2", optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
tokio = { version = "1", features = ["net", "time", "macros"], optional = true }
pin-project = { version = "1", optional = true }
rustls-native-certs = { version = "0.6", optional = true }
webpki-roots = { version = "0.22", optional = true }
tokio-rustls = { version = "0.23", optional = true }
futures-timer = { version = "3", optional = true }

# ws
soketto = { version = "0.7.1", optional = true }

# wasm
wasm-bindgen = { version = "0.2.69", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
js-sys = { version = "0.3.46", optional = true }
web-sys = { version = "0.3.46", features = ["BinaryType", "Blob", "ErrorEvent", "FileReader", "MessageEvent", "CloseEvent", "ProgressEvent", "WebSocket", "console"], optional = true }

[features]
tls = ["tokio-rustls", "webpki-roots", "rustls-native-certs"]
ws = [
"futures",
"futures-util",
"http",
"tokio",
"tokio-util",
"soketto",
"pin-project",
"jsonrpsee-types",
"thiserror",
"tracing"
]
web = [
"wasm-bindgen",
"wasm-bindgen-futures",
"js-sys",
"web-sys",
"futures-channel",
"futures-timer",
"futures-util",
"anyhow",
"thiserror",
]
4 changes: 4 additions & 0 deletions client/transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@
/// Websocket transport
#[cfg(feature = "ws")]
pub mod ws;

/// Websocket transport via web-sys.
#[cfg(feature = "web")]
pub mod web_sys;
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
155 changes: 155 additions & 0 deletions client/transport/src/web_sys.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use std::time::Duration;

use futures_channel::{mpsc, oneshot};
use futures_timer::Delay;
use futures_util::future::{self, Either};
use futures_util::StreamExt;
use jsonrpsee_core::async_trait;
use jsonrpsee_core::client::{TransportReceiverT, TransportSenderT};
use wasm_bindgen::closure::Closure;
use wasm_bindgen::{JsCast, JsValue};
use web_sys::{CloseEvent, MessageEvent, WebSocket};

#[derive(Debug)]
enum WebSocketMessage {
Data(String),
Close,
}

/// Web-sys transport error that can occur.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Internal send error
#[error("Could not send message: {0}")]
SendError(#[from] mpsc::SendError),
/// Connection got closed
#[error("Connection is closed")]
ConnectionClosed,
/// Timeout while trying to connect.
#[error("Connection timeout exceeded: {0:?}")]
ConnectionTimeout(Duration),
/// Error that occurred in `JS context`.
#[error("JS Error: {0:?}")]
JsError(String),
}

/// Sender.
#[derive(Debug)]
pub struct Sender(mpsc::UnboundedSender<WebSocketMessage>);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rustwasm/wasm-bindgen#955

The JsValue is !Send and I had to use another channel here instead of having WebSocket here directly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah maybe this answers my question about not impling Send on the wasm Transport traits!


/// Receiver.
#[derive(Debug)]
pub struct Receiver(mpsc::UnboundedReceiver<String>);

#[async_trait]
impl TransportSenderT for Sender {
type Error = Error;

async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
tracing::trace!("tx: {:?}", msg);
self.0.unbounded_send(WebSocketMessage::Data(msg)).map_err(|e| e.into_send_error())?;
Ok(())
}

async fn close(&mut self) -> Result<(), Error> {
self.0.unbounded_send(WebSocketMessage::Close).map_err(|e| e.into_send_error())?;
Ok(())
}
}

#[async_trait]
impl TransportReceiverT for Receiver {
type Error = Error;

async fn receive(&mut self) -> Result<String, Self::Error> {
match self.0.next().await {
Some(msg) => {
tracing::trace!("rx: {:?}", msg);
Ok(msg)
}
None => Err(Error::ConnectionClosed),
}
}
}

/// Create a transport sender & receiver pair.
pub async fn connect(url: impl AsRef<str>, conn_timeout: Duration) -> Result<(Sender, Receiver), Error> {
let (from_back, rx) = mpsc::unbounded();
let (tx, mut to_back) = mpsc::unbounded();

let websocket = WebSocket::new(url.as_ref()).map_err(|e| Error::JsError(format!("{:?}", e)))?;
websocket.set_binary_type(web_sys::BinaryType::Arraybuffer);

let tx1 = tx.clone();

let from_back1 = from_back.clone();
let on_msg_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
// Supported formats: https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send
let js_val = e.data();
tracing::trace!("rx: {:?}", js_val);

// Text message.
if let Some(txt) = js_val.dyn_ref::<js_sys::JsString>() {
let _ = from_back1.unbounded_send(String::from(txt));
}
// Binary message.
else if let Some(abuf) = js_val.dyn_ref::<js_sys::ArrayBuffer>() {
let array = js_sys::Uint8Array::new(abuf);
let msg = String::from_utf8(array.to_vec()).expect("valid UTF-8 from WebSocket; qed");
let _ = from_back1.unbounded_send(msg);
} else {
tracing::warn!("Received unsupported message: {:?}", js_val);
}
}) as Box<dyn FnMut(MessageEvent)>);

// Close event.
let on_close_callback = Closure::once(move |_e: CloseEvent| {
tracing::info!("Connection closed");
tx1.close_channel();
from_back.close_channel();
});

websocket.set_onmessage(Some(on_msg_callback.as_ref().unchecked_ref()));
websocket.set_onclose(Some(on_close_callback.as_ref().unchecked_ref()));

// Prevent for being dropped (this will be leaked intentionally).
on_msg_callback.forget();
on_close_callback.forget();

try_connect_until(&websocket, conn_timeout).await?;

let tx3 = tx.clone();
wasm_bindgen_futures::spawn_local(async move {
while let Some(WebSocketMessage::Data(msg)) = to_back.next().await {
if let Err(e) = websocket.send_with_str(&msg) {
tracing::warn!("Failed to send: {:?}", e);
break;
}
}

let _ = websocket.close();
tx3.close_channel();
});

Ok((Sender(tx), Receiver(rx)))
}

async fn try_connect_until(websocket: &WebSocket, conn_timeout: Duration) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();

let on_open_callback = Closure::once(move |_: JsValue| {
tracing::info!("Connection established");
let _ = tx.send(());
});

websocket.set_onopen(Some(on_open_callback.as_ref().unchecked_ref()));

let res = match future::select(rx, Delay::new(conn_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
Either::Left((Err(_), _)) => unreachable!("A message is sent on this channel before close; qed"),
Either::Right((_, _)) => Err(Error::ConnectionTimeout(conn_timeout)),
};
drop(on_open_callback);

res
}
6 changes: 3 additions & 3 deletions client/transport/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;

use futures::io::{BufReader, BufWriter};
use futures_util::io::{BufReader, BufWriter};
use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_core::{async_trait, Cow};
Expand Down Expand Up @@ -188,7 +188,7 @@ impl TransportSenderT for Sender {

/// Sends out a request. Returns a `Future` that finishes when the request has been
/// successfully sent.
async fn send(&mut self, body: String) -> Result<(), WsError> {
async fn send(&mut self, body: String) -> Result<(), Self::Error> {
tracing::debug!("send: {}", body);
self.inner.send_text(body).await?;
self.inner.flush().await?;
Expand All @@ -206,7 +206,7 @@ impl TransportReceiverT for Receiver {
type Error = WsError;

/// Returns a `Future` resolving when the server sent us something back.
async fn receive(&mut self) -> Result<String, WsError> {
async fn receive(&mut self) -> Result<String, Self::Error> {
let mut message = Vec::new();
self.inner.receive_data(&mut message).await?;
let s = String::from_utf8(message).expect("Found invalid UTF-8");
Expand Down
Loading