From a115e957dba6c8b7bf2d5af9d1e6916555642cd3 Mon Sep 17 00:00:00 2001 From: oliver-giersch Date: Sun, 24 Apr 2022 16:09:56 +0200 Subject: [PATCH] refactors ipc transport internals (#1174) * refactors ipc transport internals ran cargo +nightly fmt fixes typo remove some commented out code * remove one unnecessary return stmt Co-authored-by: Oliver Giersch Co-authored-by: Oliver Giersch --- Cargo.lock | 19 ++ ethers-providers/Cargo.toml | 1 + ethers-providers/src/transports/common.rs | 155 ++++----- ethers-providers/src/transports/http.rs | 14 +- ethers-providers/src/transports/ipc.rs | 363 +++++++++++----------- ethers-providers/src/transports/ws.rs | 50 ++- 6 files changed, 323 insertions(+), 279 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34500ddb1..077ea5607 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1321,6 +1321,7 @@ dependencies = [ "futures-core", "futures-timer", "futures-util", + "hashers", "hex", "http", "once_cell", @@ -1634,6 +1635,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.12.4" @@ -1725,6 +1735,15 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +[[package]] +name = "hashers" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2bca93b15ea5a746f220e56587f71e73c6165eab783df9e26590069953e3c30" +dependencies = [ + "fxhash", +] + [[package]] name = "heck" version = "0.4.0" diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 0e9eb75c9..c66c1af9d 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -40,6 +40,7 @@ tracing-futures = { version = "0.2.5", default-features = false, features = ["st bytes = { version = "1.1.0", default-features = false, optional = true } once_cell = "1.10.0" +hashers = "1.0.1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] # tokio diff --git a/ethers-providers/src/transports/common.rs b/ethers-providers/src/transports/common.rs index 9f0ac1330..483114cdf 100644 --- a/ethers-providers/src/transports/common.rs +++ b/ethers-providers/src/transports/common.rs @@ -48,54 +48,21 @@ impl<'a, T> Request<'a, T> { } } -/// A JSON-RPC Notifcation -#[allow(unused)] -#[derive(Deserialize, Debug)] -pub struct Notification<'a> { - #[serde(alias = "JSONRPC")] - jsonrpc: &'a str, - method: &'a str, - #[serde(borrow)] - pub params: Subscription<'a>, +/// A JSON-RPC response +#[derive(Debug)] +pub enum Response<'a> { + Success { id: u64, result: &'a RawValue }, + Error { id: u64, error: JsonRpcError }, + Notification { method: &'a str, params: Params<'a> }, } #[derive(Deserialize, Debug)] -pub struct Subscription<'a> { +pub struct Params<'a> { pub subscription: U256, #[serde(borrow)] pub result: &'a RawValue, } -#[derive(Debug)] -pub enum Response<'a> { - Success { id: u64, jsonrpc: &'a str, result: &'a RawValue }, - Error { id: u64, jsonrpc: &'a str, error: JsonRpcError }, -} - -#[allow(unused)] -impl Response<'_> { - pub fn id(&self) -> u64 { - match self { - Self::Success { id, .. } => *id, - Self::Error { id, .. } => *id, - } - } - - pub fn as_result(&self) -> Result<&RawValue, &JsonRpcError> { - match self { - Self::Success { result, .. } => Ok(*result), - Self::Error { error, .. } => Err(error), - } - } - - pub fn into_result(self) -> Result, JsonRpcError> { - match self { - Self::Success { result, .. } => Ok(result.to_owned()), - Self::Error { error, .. } => Err(error), - } - } -} - // FIXME: ideally, this could be auto-derived as an untagged enum, but due to // https://github.com/serde-rs/serde/issues/1183 this currently fails impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> { @@ -115,62 +82,96 @@ impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> { where A: MapAccess<'de>, { + let mut jsonrpc = false; + + // response & error let mut id = None; - let mut jsonrpc = None; + // only response let mut result = None; + // only error let mut error = None; + // only notification + let mut method = None; + let mut params = None; while let Some(key) = map.next_key()? { match key { - "id" => { - let value: u64 = map.next_value()?; - let prev = id.replace(value); - if prev.is_some() { - return Err(de::Error::duplicate_field("id")) - } - } "jsonrpc" => { - let value: &'de str = map.next_value()?; + if jsonrpc { + return Err(de::Error::duplicate_field("jsonrpc")) + } + + let value = map.next_value()?; if value != "2.0" { return Err(de::Error::invalid_value(Unexpected::Str(value), &"2.0")) } - let prev = jsonrpc.replace(value); - if prev.is_some() { - return Err(de::Error::duplicate_field("jsonrpc")) + jsonrpc = true; + } + "id" => { + if id.is_some() { + return Err(de::Error::duplicate_field("id")) } + + let value: u64 = map.next_value()?; + id = Some(value); } "result" => { - let value: &RawValue = map.next_value()?; - let prev = result.replace(value); - if prev.is_some() { + if result.is_some() { return Err(de::Error::duplicate_field("result")) } + + let value: &RawValue = map.next_value()?; + result = Some(value); } "error" => { - let value: JsonRpcError = map.next_value()?; - let prev = error.replace(value); - if prev.is_some() { + if error.is_some() { return Err(de::Error::duplicate_field("error")) } + + let value: JsonRpcError = map.next_value()?; + error = Some(value); + } + "method" => { + if method.is_some() { + return Err(de::Error::duplicate_field("method")) + } + + let value: &str = map.next_value()?; + method = Some(value); + } + "params" => { + if params.is_some() { + return Err(de::Error::duplicate_field("params")) + } + + let value: Params = map.next_value()?; + params = Some(value); } key => { return Err(de::Error::unknown_field( key, - &["id", "jsonrpc", "result", "error"], + &["id", "jsonrpc", "result", "error", "params", "method"], )) } } } - let id = id.ok_or_else(|| de::Error::missing_field("id"))?; - let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?; + // jsonrpc version must be present in all responses + if !jsonrpc { + return Err(de::Error::missing_field("jsonrpc")) + } - match (result, error) { - (Some(result), None) => Ok(Response::Success { id, jsonrpc, result }), - (None, Some(error)) => Ok(Response::Error { id, jsonrpc, error }), + match (id, result, error, method, params) { + (Some(id), Some(result), None, None, None) => { + Ok(Response::Success { id, result }) + } + (Some(id), None, Some(error), None, None) => Ok(Response::Error { id, error }), + (None, None, None, Some(method), Some(params)) => { + Ok(Response::Notification { method, params }) + } _ => Err(de::Error::custom( - "response must have either a `result` or `error` field", + "response must be either a success/error or notification object", )), } } @@ -223,19 +224,29 @@ mod tests { let response: Response<'_> = serde_json::from_str(r#"{"jsonrpc":"2.0","result":19,"id":1}"#).unwrap(); - assert_eq!(response.id(), 1); - let result: u64 = serde_json::from_str(response.into_result().unwrap().get()).unwrap(); - assert_eq!(result, 19); + match response { + Response::Success { id, result } => { + assert_eq!(id, 1); + let result: u64 = serde_json::from_str(result.get()).unwrap(); + assert_eq!(result, 19); + } + _ => panic!("expected `Success` response"), + } let response: Response<'_> = serde_json::from_str( r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"error occurred"},"id":2}"#, ) .unwrap(); - assert_eq!(response.id(), 2); - let err = response.into_result().unwrap_err(); - assert_eq!(err.code, -32000); - assert_eq!(err.message, "error occurred"); + match response { + Response::Error { id, error } => { + assert_eq!(id, 2); + assert_eq!(error.code, -32000); + assert_eq!(error.message, "error occurred"); + assert!(error.data.is_none()); + } + _ => panic!("expected `Error` response"), + } } #[test] diff --git a/ethers-providers/src/transports/http.rs b/ethers-providers/src/transports/http.rs index ca77ddf5b..56bdf8512 100644 --- a/ethers-providers/src/transports/http.rs +++ b/ethers-providers/src/transports/http.rs @@ -73,12 +73,20 @@ impl JsonRpcClient for Provider { let res = self.client.post(self.url.as_ref()).json(&payload).send().await?; let text = res.text().await?; - let response: Response<'_> = match serde_json::from_str(&text) { - Ok(response) => response, + + let raw = match serde_json::from_str(&text) { + Ok(Response::Success { result, .. }) => result.to_owned(), + Ok(Response::Error { error, .. }) => return Err(error.into()), + Ok(_) => { + let err = ClientError::SerdeJson { + err: serde::de::Error::custom("unexpected notification over HTTP transport"), + text, + }; + return Err(err) + } Err(err) => return Err(ClientError::SerdeJson { err, text }), }; - let raw = response.as_result().map_err(Clone::clone)?; let res = serde_json::from_str(raw.get()) .map_err(|err| ClientError::SerdeJson { err, text: raw.to_string() })?; diff --git a/ethers-providers/src/transports/ipc.rs b/ethers-providers/src/transports/ipc.rs index 6b0650838..61ba86c96 100644 --- a/ethers-providers/src/transports/ipc.rs +++ b/ethers-providers/src/transports/ipc.rs @@ -1,71 +1,75 @@ -use crate::{ - provider::ProviderError, - transports::common::{JsonRpcError, Request, Response}, - JsonRpcClient, PubsubClient, -}; -use ethers_core::types::U256; - -use async_trait::async_trait; -use futures_channel::mpsc; -use futures_util::stream::{Fuse, StreamExt}; -use oneshot::error::RecvError; -use serde::{de::DeserializeOwned, Serialize}; -use serde_json::{value::RawValue, Deserializer, StreamDeserializer}; use std::{ - collections::HashMap, + cell::RefCell, + convert::Infallible, + hash::BuildHasherDefault, path::Path, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, + thread, }; + +use async_trait::async_trait; +use bytes::{Buf as _, BytesMut}; +use ethers_core::types::U256; +use futures_channel::mpsc; +use futures_util::stream::StreamExt as _; +use hashers::fx_hash::FxHasher64; +use serde::{de::DeserializeOwned, Serialize}; +use serde_json::{value::RawValue, Deserializer}; use thiserror::Error; use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}, - net::UnixStream, - sync::oneshot, + io::{AsyncReadExt as _, AsyncWriteExt as _, BufReader}, + net::{ + unix::{ReadHalf, WriteHalf}, + UnixStream, + }, + runtime, + sync::oneshot::{self, error::RecvError}, +}; + +use crate::{ + provider::ProviderError, + transports::common::{JsonRpcError, Request, Response}, + JsonRpcClient, PubsubClient, }; -use tokio_util::io::ReaderStream; -use tracing::{error, warn}; -use super::common::Notification; +use super::common::Params; + +type FxHashMap = std::collections::HashMap>; + +type Pending = oneshot::Sender, JsonRpcError>>; +type Subscription = mpsc::UnboundedSender>; /// Unix Domain Sockets (IPC) transport. #[derive(Debug, Clone)] pub struct Ipc { id: Arc, - messages_tx: mpsc::UnboundedSender, + request_tx: mpsc::UnboundedSender, } -type Pending = oneshot::Sender, JsonRpcError>>; -type Subscription = mpsc::UnboundedSender>; - #[derive(Debug)] enum TransportMessage { - Request { id: u64, request: String, sender: Pending }, + Request { id: u64, request: Box<[u8]>, sender: Pending }, Subscribe { id: U256, sink: Subscription }, Unsubscribe { id: U256 }, } impl Ipc { - /// Creates a new IPC transport from a Async Reader / Writer - fn new(stream: S) -> Self { + /// Creates a new IPC transport from a given path using Unix sockets. + pub async fn connect(path: impl AsRef) -> Result { let id = Arc::new(AtomicU64::new(1)); - let (messages_tx, messages_rx) = mpsc::unbounded(); + let (request_tx, request_rx) = mpsc::unbounded(); - IpcServer::new(stream, messages_rx).spawn(); - Self { id, messages_tx } - } + let stream = UnixStream::connect(path).await?; + spawn_ipc_server(stream, request_rx); - /// Creates a new IPC transport from a given path using Unix sockets - #[cfg(unix)] - pub async fn connect>(path: P) -> Result { - let ipc = UnixStream::connect(path).await?; - Ok(Self::new(ipc)) + Ok(Self { id, request_tx }) } fn send(&self, msg: TransportMessage) -> Result<(), IpcError> { - self.messages_tx + self.request_tx .unbounded_send(msg) .map_err(|_| IpcError::ChannelError("IPC server receiver dropped".to_string()))?; @@ -88,7 +92,7 @@ impl JsonRpcClient for Ipc { let (sender, receiver) = oneshot::channel(); let payload = TransportMessage::Request { id: next_id, - request: serde_json::to_string(&Request::new(next_id, method, params))?, + request: serde_json::to_vec(&Request::new(next_id, method, params))?.into_boxed_slice(), sender, }; @@ -117,169 +121,171 @@ impl PubsubClient for Ipc { } } -struct IpcServer { - socket_reader: Fuse>>, - socket_writer: WriteHalf, - requests: Fuse>, - pending: HashMap, - subscriptions: HashMap, +fn spawn_ipc_server(stream: UnixStream, request_rx: mpsc::UnboundedReceiver) { + // 65 KiB should be more than enough for this thread, as all unbounded data + // growth occurs on heap-allocated data structures and buffers and the call + // stack is not going to do anything crazy either + const STACK_SIZE: usize = 1 << 16; + // spawn a light-weight thread with a thread-local async runtime just for + // sending and receiving data over the IPC socket + let _ = thread::Builder::new() + .name("ipc-server-thread".to_string()) + .stack_size(STACK_SIZE) + .spawn(move || { + let rt = runtime::Builder::new_current_thread() + .enable_io() + .build() + .expect("failed to create ipc-server-thread async runtime"); + + rt.block_on(run_ipc_server(stream, request_rx)); + }) + .expect("failed to spawn ipc server thread"); } -impl IpcServer -where - T: AsyncRead + AsyncWrite, -{ - /// Instantiates the Websocket Server - pub fn new(ipc: T, requests: mpsc::UnboundedReceiver) -> Self { - let (socket_reader, socket_writer) = tokio::io::split(ipc); - let socket_reader = ReaderStream::new(socket_reader).fuse(); - Self { - socket_reader, - socket_writer, - requests: requests.fuse(), - pending: HashMap::default(), - subscriptions: HashMap::default(), +async fn run_ipc_server( + mut stream: UnixStream, + request_rx: mpsc::UnboundedReceiver, +) { + // the shared state for both reads & writes + let shared = Shared { + pending: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(), + subs: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(), + }; + + // split the stream and run two independent concurrently (local), thereby + // allowing reads and writes to occurr concurrently + let (reader, writer) = stream.split(); + let read = shared.handle_ipc_reads(reader); + let write = shared.handle_ipc_writes(writer, request_rx); + + // run both loops concurrently, until either encounts an error + if let Err(e) = futures_util::try_join!(read, write) { + match e { + IpcError::ServerExit => {} + err => tracing::error!(?err, "exiting IPC server due to error"), } } +} - /// Spawns the event loop - fn spawn(mut self) - where - T: 'static + Send, - { - let f = async move { - let mut read_buffer = Vec::new(); - loop { - let closed = self.process(&mut read_buffer).await.expect("IPC Server panic"); - if closed && self.pending.is_empty() { - break - } - } - }; - - tokio::spawn(f); - } +struct Shared { + pending: RefCell>, + subs: RefCell>, +} - /// Processes 1 item selected from the incoming `requests` or `socket` - #[allow(clippy::single_match)] - async fn process(&mut self, read_buffer: &mut Vec) -> Result { - futures_util::select! { - // Handle requests - msg = self.requests.next() => match msg { - Some(msg) => self.handle_request(msg).await?, - None => return Ok(true), - }, - // Handle socket messages - msg = self.socket_reader.next() => match msg { - Some(Ok(msg)) => self.handle_socket(read_buffer, msg)?, - Some(Err(err)) => { - error!("IPC read error: {:?}", err); - return Err(err.into()); - }, - None => {}, - }, - // finished - complete => {}, - }; +impl Shared { + async fn handle_ipc_reads(&self, reader: ReadHalf<'_>) -> Result { + let mut reader = BufReader::new(reader); + let mut buf = BytesMut::with_capacity(4096); + + loop { + // try to read the next batch of bytes into the buffer + let read = reader.read_buf(&mut buf).await?; + if read == 0 { + // eof, socket was closed + return Err(IpcError::ServerExit) + } - Ok(false) + // parse the received bytes into 0-n jsonrpc messages + let read = self.handle_bytes(&buf)?; + // split off all bytes that were parsed into complete messages + // any remaining bytes that correspond to incomplete messages remain + // in the buffer + buf.advance(read); + } } - async fn handle_request(&mut self, msg: TransportMessage) -> Result<(), IpcError> { - match msg { - TransportMessage::Request { id, request, sender } => { - if self.pending.insert(id, sender).is_some() { - warn!("Replacing a pending request with id {:?}", id); - } - - if let Err(err) = self.socket_writer.write(request.as_bytes()).await { - error!("IPC connection error: {:?}", err); - self.pending.remove(&id); + async fn handle_ipc_writes( + &self, + mut writer: WriteHalf<'_>, + mut request_rx: mpsc::UnboundedReceiver, + ) -> Result { + use TransportMessage::*; + + while let Some(msg) = request_rx.next().await { + match msg { + Request { id, request, sender } => { + let prev = self.pending.borrow_mut().insert(id, sender); + assert!(prev.is_none(), "replaced pending IPC request (id={})", id); + + if let Err(err) = writer.write_all(&request).await { + tracing::error!("IPC connection error: {:?}", err); + self.pending.borrow_mut().remove(&id); + } } - } - TransportMessage::Subscribe { id, sink } => { - if self.subscriptions.insert(id, sink).is_some() { - warn!("Replacing already-registered subscription with id {:?}", id); + Subscribe { id, sink } => { + if self.subs.borrow_mut().insert(id, sink).is_some() { + tracing::warn!( + %id, + "replaced already-registered subscription" + ); + } } - } - TransportMessage::Unsubscribe { id } => { - if self.subscriptions.remove(&id).is_none() { - warn!("Unsubscribing from non-existent subscription with id {:?}", id); + Unsubscribe { id } => { + if self.subs.borrow_mut().remove(&id).is_none() { + tracing::warn!( + %id, + "attempted to unsubscribe from non-existent subscription" + ); + } } } - }; + } - Ok(()) + // the request receiver will only be closed if the sender instance + // located within the transport handle is dropped, this is not truly an + // error but leads to the `try_join` in `run_ipc_server` to cancel the + // read half future + Err(IpcError::ServerExit) } - fn handle_socket( - &mut self, - read_buffer: &mut Vec, - bytes: bytes::Bytes, - ) -> Result<(), IpcError> { - // Extend buffer of previously unread with the new read bytes - read_buffer.extend_from_slice(&bytes); - // Deserialize as many full elements from the stream as exists - let mut de: StreamDeserializer<_, &RawValue> = - Deserializer::from_slice(read_buffer).into_iter(); - // Iterate through these elements, and handle responses/notifications - while let Some(Ok(raw)) = de.next() { - if let Ok(response) = serde_json::from_str(raw.get()) { - // Send notify response if okay. - if let Err(e) = self.respond(response) { - error!(err = %e, "Failed to send IPC response"); - } - } - - if let Ok(notification) = serde_json::from_str(raw.get()) { - // Send notify response if okay. - if let Err(e) = self.notify(notification) { - error!(err = %e, "Failed to send IPC notification"); - } - } - - warn!("JSON from IPC stream is not a response or notification"); + fn handle_bytes(&self, bytes: &BytesMut) -> Result { + // deserialize all complete jsonrpc responses in the buffer + let mut de = Deserializer::from_slice(bytes.as_ref()).into_iter(); + while let Some(Ok(response)) = de.next() { + match response { + Response::Success { id, result } => self.send_response(id, Ok(result.to_owned())), + Response::Error { id, error } => self.send_response(id, Err(error)), + Response::Notification { params, .. } => self.send_notification(params), + }; } - // Get the offset of bytes to handle partial buffer reads - let read_len = de.byte_offset(); + Ok(de.byte_offset()) + } - // Reset buffer to just include the partial value bytes. - read_buffer.copy_within(read_len.., 0); - read_buffer.truncate(read_buffer.len() - read_len); + fn send_response(&self, id: u64, result: Result, JsonRpcError>) { + // retrieve the channel sender for responding to the pending request + let response_tx = match self.pending.borrow_mut().remove(&id) { + Some(tx) => tx, + None => { + tracing::warn!(%id, "no pending request exists for the response ID"); + return + } + }; - Ok(()) + // a failure to send the response indicates that the pending request has + // been dropped in the mean time + let _ = response_tx.send(result.map_err(Into::into)); } /// Sends notification through the channel based on the ID of the subscription. /// This handles streaming responses. - fn notify(&mut self, notification: Notification<'_>) -> Result<(), IpcError> { - let id = notification.params.subscription; - if let Some(tx) = self.subscriptions.get(&id) { - tx.unbounded_send(notification.params.result.to_owned()).map_err(|_| { - IpcError::ChannelError(format!("Subscription receiver {} dropped", id)) - })?; - } - - Ok(()) - } - - /// Sends JSON response through the channel based on the ID in that response. - /// This handles RPC calls with only one response, and the channel entry is dropped after - /// sending. - fn respond(&mut self, response: Response<'_>) -> Result<(), IpcError> { - let id = response.id(); - let res = response.into_result(); - - let response_tx = self.pending.remove(&id).ok_or_else(|| { - IpcError::ChannelError("No response channel exists for the response ID".to_string()) - })?; - - response_tx.send(res).map_err(|_| { - IpcError::ChannelError("Receiver channel for response has been dropped".to_string()) - })?; + fn send_notification(&self, params: Params<'_>) { + // retrieve the channel sender for notifying the subscription stream + let subs = self.subs.borrow(); + let tx = match subs.get(¶ms.subscription) { + Some(tx) => tx, + None => { + tracing::warn!( + id = ?params.subscription, + "no subscription exists for the notification ID" + ); + return + } + }; - Ok(()) + // a failure to send the response indicates that the pending request has + // been dropped in the mean time (and should have been unsubscribed!) + let _ = tx.unbounded_send(params.result.to_owned()); } } @@ -302,7 +308,10 @@ pub enum IpcError { ChannelError(String), #[error(transparent)] - Canceled(#[from] RecvError), + RequestCancelled(#[from] RecvError), + + #[error("The IPC server has exited")] + ServerExit, } impl From for ProviderError { diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs index 10c1f1430..53039b6a4 100644 --- a/ethers-providers/src/transports/ws.rs +++ b/ethers-providers/src/transports/ws.rs @@ -11,10 +11,7 @@ use futures_util::{ sink::{Sink, SinkExt}, stream::{Fuse, Stream, StreamExt}, }; -use serde::{ - de::{DeserializeOwned, Error}, - Serialize, -}; +use serde::{de::DeserializeOwned, Serialize}; use serde_json::value::RawValue; use std::{ collections::{btree_map::Entry, BTreeMap}, @@ -26,7 +23,7 @@ use std::{ }; use thiserror::Error; -use super::common::{Notification, Response}; +use super::common::{Params, Response}; if_wasm! { use wasm_bindgen::prelude::*; @@ -320,35 +317,34 @@ where } async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> { - if let Ok(response) = serde_json::from_str::>(&inner) { - if let Some(request) = self.pending.remove(&response.id()) { - if !request.is_canceled() { - request.send(response.into_result()).map_err(to_client_error)?; - } - } + let (id, result) = match serde_json::from_str(&inner)? { + Response::Success { id, result } => (id, Ok(result.to_owned())), + Response::Error { id, error } => (id, Err(error)), + Response::Notification { params, .. } => return self.handle_notification(params), + }; - return Ok(()) + if let Some(request) = self.pending.remove(&id) { + if !request.is_canceled() { + request.send(result).map_err(to_client_error)?; + } } - if let Ok(notification) = serde_json::from_str::>(&inner) { - let id = notification.params.subscription; - if let Entry::Occupied(stream) = self.subscriptions.entry(id) { - if let Err(err) = stream.get().unbounded_send(notification.params.result.to_owned()) - { - if err.is_disconnected() { - // subscription channel was closed on the receiver end - stream.remove(); - } - return Err(to_client_error(err)) + Ok(()) + } + + fn handle_notification(&mut self, params: Params<'_>) -> Result<(), ClientError> { + let id = params.subscription; + if let Entry::Occupied(stream) = self.subscriptions.entry(id) { + if let Err(err) = stream.get().unbounded_send(params.result.to_owned()) { + if err.is_disconnected() { + // subscription channel was closed on the receiver end + stream.remove(); } + return Err(to_client_error(err)) } - - return Ok(()) } - Err(ClientError::JsonError(serde_json::Error::custom( - "response is neither a valid jsonrpc response nor notification", - ))) + Ok(()) } #[cfg(target_arch = "wasm32")]