Skip to content

Commit

Permalink
refactor(async client): refactor background task (#1145)
Browse files Browse the repository at this point in the history
* refactor(async client): refactor background task

Split send and receive to separate tasks to support multiplexing reads/writes

* fix tests

* dont rely on tokio::spawn handles anymore

* fix build

* fix build again

* switch to std::sync::Mutex

* fix tests again

* bench stuff

* don't block in read task

* fix build

* fix nits

* works now

* revert bench code

* replace unreachable with None

* Revert "replace unreachable with None"

This reverts commit 49edaee.

* fix nits

* use dashmap instead of Arc<Mutex<RequestManager>

* Revert "use dashmap instead of Arc<Mutex<RequestManager>"

This reverts commit d73aeeb.

* refactor select loops prio for closed futs

* grumbles: save waker to wake when new items are pushed

* fix build

* fix build again

* fix some nits
  • Loading branch information
niklasad1 committed Aug 3, 2023
1 parent 47d93a5 commit 54f4dcd
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 270 deletions.
8 changes: 4 additions & 4 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use jsonrpsee_core::client::{
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, ResponseSuccess, TwoPointZero};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tower::layer::util::Identity;
use tower::{Layer, Service};
Expand Down Expand Up @@ -320,7 +320,7 @@ where
if response.id == id {
Ok(result)
} else {
Err(Error::InvalidRequestId)
Err(InvalidRequestId::NotPendingRequest(response.id.to_string()).into())
}
}

Expand Down Expand Up @@ -363,7 +363,7 @@ where
}

for rp in json_rps {
let id = rp.id.try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
let id = rp.id.try_parse_inner_as_number()?;

let res = match ResponseSuccess::try_from(rp) {
Ok(r) => {
Expand All @@ -385,7 +385,7 @@ where
if let Some(elem) = maybe_elem {
*elem = res;
} else {
return Err(Error::InvalidRequestId);
return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
}
}

Expand Down
4 changes: 2 additions & 2 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn method_call_with_wrong_id_kind() {
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let res: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(Error::InvalidRequestId)));
assert!(matches!(res, Err(Error::InvalidRequestId(_))));
}

#[tokio::test]
Expand Down Expand Up @@ -96,7 +96,7 @@ async fn response_with_wrong_id() {
.await
.unwrap()
.unwrap_err();
assert!(matches!(err, Error::InvalidRequestId));
assert!(matches!(err, Error::InvalidRequestId(_)));
}

#[tokio::test]
Expand Down
4 changes: 3 additions & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn method_call_with_wrong_id_kind() {
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();

let err: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "Invalid request ID"));
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "request ID=0 is not a pending call"));
}

#[tokio::test]
Expand Down Expand Up @@ -191,6 +191,8 @@ async fn notification_handler_works() {

#[tokio::test]
async fn notification_without_polling_doesnt_make_client_unuseable() {
init_logger();

let server = WebSocketTestServer::with_hardcoded_notification(
"127.0.0.1:0".parse().unwrap(),
server_notification("test", "server originated notification".into()),
Expand Down
8 changes: 5 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ tokio = { version = "1.16", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures-timer = { version = "3", optional = true }
globset = { version = "0.4", optional = true }
tokio-stream = { version = "0.1", optional = true }

[features]
default = []
Expand All @@ -55,19 +54,22 @@ client = ["futures-util/sink", "tokio/sync"]
async-client = [
"async-lock",
"client",
"futures-util/alloc",
"rustc-hash",
"tokio/macros",
"tokio/rt",
"tokio-stream",
"tokio/time",
"futures-timer",
]
async-wasm-client = [
"async-lock",
"client",
"futures-util/alloc",
"wasm-bindgen-futures",
"rustc-hash/std",
"futures-timer/wasm-bindgen",
"tokio-stream",
"tokio/macros",
"tokio/time",
]

[dev-dependencies]
Expand Down
80 changes: 40 additions & 40 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use tokio::sync::{mpsc, oneshot};

use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorObject, Id, Notification, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
ErrorObject, Id, InvalidRequestId, Notification, RequestSer, Response, ResponseSuccess, SubscriptionId,
SubscriptionResponse,
};
use serde_json::Value as JsonValue;
use std::ops::Range;
Expand All @@ -63,7 +64,7 @@ pub(crate) fn process_batch_response(
Some(state) => state,
None => {
tracing::warn!("Received unknown batch response");
return Err(Error::InvalidRequestId);
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)).into());
}
};

Expand All @@ -79,7 +80,7 @@ pub(crate) fn process_batch_response(
if let Some(elem) = maybe_elem {
*elem = rp.result;
} else {
return Err(Error::InvalidRequestId);
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()).into());
}
}

Expand All @@ -95,7 +96,7 @@ pub(crate) fn process_batch_response(
pub(crate) fn process_subscription_response(
manager: &mut RequestManager,
response: SubscriptionResponse<JsonValue>,
) -> Result<(), Option<RequestMessage>> {
) -> Result<(), Option<SubscriptionId<'static>>> {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
Expand All @@ -110,9 +111,7 @@ pub(crate) fn process_subscription_response(
Ok(()) => Ok(()),
Err(err) => {
tracing::error!("Dropping subscription {:?} error: {:?}", sub_id, err);
let msg = build_unsubscribe_message(manager, request_id, sub_id)
.expect("request ID and subscription ID valid checked above; qed");
Err(Some(msg))
Err(Some(sub_id))
}
},
None => {
Expand All @@ -124,42 +123,42 @@ pub(crate) fn process_subscription_response(

/// Attempts to close a subscription when a [`SubscriptionError`] is received.
///
/// Returns `Ok(())` if the subscription was removed
/// Return `Err(e)` if the subscription was not found.
/// If the notification is not found it's just logged as a warning and the connection
/// will continue.
///
/// It's possible that the user closed down the subscription before the actual close response is received
pub(crate) fn process_subscription_close_response(
manager: &mut RequestManager,
response: SubscriptionError<JsonValue>,
) -> Result<(), Error> {
) {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => {
manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
}
None => {
tracing::error!("The server tried to close an invalid subscription: {:?}", sub_id);
return Err(Error::InvalidSubscriptionId);
tracing::debug!("The server tried to close an non-pending subscription: {:?}", sub_id);
}
};

manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
Ok(())
}
}

/// Attempts to process an incoming notification
///
/// Returns Ok() if the response was successfully handled
/// Returns Err() if there was no handler for the method
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification<JsonValue>) -> Result<(), Error> {
/// If the notification is not found it's just logged as a warning and the connection
/// will continue.
///
/// It's possible that user close down the subscription before this notification is received.
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification<JsonValue>) {
match manager.as_notification_handler_mut(notif.method.to_string()) {
Some(send_back_sink) => match send_back_sink.try_send(notif.params) {
Ok(()) => Ok(()),
Ok(()) => (),
Err(err) => {
tracing::error!("Error sending notification, dropping handler for {:?} error: {:?}", notif.method, err);
tracing::warn!("Could not send notification, dropping handler for {:?} error: {:?}", notif.method, err);
let _ = manager.remove_notification_handler(notif.method.into_owned());
Err(Error::Custom(err.to_string()))
}
},
None => {
tracing::error!("Notification: {:?} not a registered method", notif.method);
Err(Error::UnregisteredNotification(notif.method.into_owned()))
tracing::debug!("Notification: {:?} not a registered method", notif.method);
}
}
}
Expand All @@ -179,18 +178,19 @@ pub(crate) fn process_single_response(

match manager.request_status(&response_id) {
RequestStatus::PendingMethodCall => {
let send_back_oneshot = match manager.complete_pending_call(response_id) {
let send_back_oneshot = match manager.complete_pending_call(response_id.clone()) {
Some(Some(send)) => send,
Some(None) => return Ok(None),
None => return Err(Error::InvalidRequestId),
None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into()),
};

let _ = send_back_oneshot.send(result);
Ok(None)
}
RequestStatus::PendingSubscription => {
let (unsub_id, send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(response_id.clone()).ok_or(Error::InvalidRequestId)?;
let (unsub_id, send_back_oneshot, unsubscribe_method) = manager
.complete_pending_subscription(response_id.clone())
.ok_or(InvalidRequestId::NotPendingRequest(response_id.to_string()))?;

let sub_id = result.map(|r| SubscriptionId::try_from(r).ok());

Expand Down Expand Up @@ -220,23 +220,23 @@ pub(crate) fn process_single_response(
Ok(None)
}
}
RequestStatus::Subscription | RequestStatus::Invalid => Err(Error::InvalidRequestId),

RequestStatus::Subscription | RequestStatus::Invalid => {
Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into())
}
}
}

/// Sends an unsubscribe to request to server to indicate
/// that the client is not interested in the subscription anymore.
//
// NOTE: we don't count this a concurrent request as it's part of a subscription.
pub(crate) async fn stop_subscription(
sender: &mut impl TransportSenderT,
manager: &mut RequestManager,
pub(crate) async fn stop_subscription<S: TransportSenderT>(
sender: &mut S,
unsub: RequestMessage,
) {
if let Err(e) = sender.send(unsub.raw).await {
tracing::error!("Send unsubscribe request failed: {:?}", e);
let _ = manager.complete_pending_call(unsub.id);
}
) -> Result<(), S::Error> {
sender.send(unsub.raw).await?;
Ok(())
}

/// Builds an unsubscription message.
Expand All @@ -245,7 +245,7 @@ pub(crate) fn build_unsubscribe_message(
sub_req_id: Id<'static>,
sub_id: SubscriptionId<'static>,
) -> Option<RequestMessage> {
let (unsub_req_id, _, unsub, sub_id) = manager.remove_subscription(sub_req_id, sub_id)?;
let (unsub_req_id, _, unsub, sub_id) = manager.unsubscribe(sub_req_id, sub_id)?;

let mut params = ArrayParams::new();
params.insert(sub_id).ok()?;
Expand Down
36 changes: 34 additions & 2 deletions core/src/client/async_client/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub(crate) struct RequestManager {

impl RequestManager {
/// Create a new `RequestManager`.
#[allow(unused)]
pub(crate) fn new() -> Self {
Self::default()
}
Expand Down Expand Up @@ -250,9 +251,9 @@ impl RequestManager {
}
}

/// Tries to remove a subscription.
/// Removes the subscription without waiting for the unsubscribe call.
///
/// Returns `Some` if the subscription was removed otherwise `None`.
/// Returns `Some` if the subscription was removed.
pub(crate) fn remove_subscription(
&mut self,
request_id: RequestId,
Expand All @@ -262,6 +263,7 @@ impl RequestManager {
(Entry::Occupied(request), Entry::Occupied(subscription))
if matches!(request.get(), Kind::Subscription(_)) =>
{
// Mark the request ID as pending unsubscription.
let (_req_id, kind) = request.remove_entry();
let (sub_id, _req_id) = subscription.remove_entry();
if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
Expand All @@ -274,6 +276,33 @@ impl RequestManager {
}
}

/// Initiates an unsubscribe which is not completed until the unsubscribe call
/// has been acknowledged.
///
/// Returns `Some` if the subscription was unsubscribed.
pub(crate) fn unsubscribe(
&mut self,
request_id: RequestId,
subscription_id: SubscriptionId<'static>,
) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId)> {
match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
(Entry::Occupied(mut request), Entry::Occupied(subscription))
if matches!(request.get(), Kind::Subscription(_)) =>
{
// Mark the request ID as "pending unsubscription" which will be resolved once the
// unsubscribe call has been acknowledged.
let kind = std::mem::replace(request.get_mut(), Kind::PendingMethodCall(None));
let (sub_id, _req_id) = subscription.remove_entry();
if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
Some((unsub_req_id, send_back, unsub, sub_id))
} else {
unreachable!("Subscription is Subscription checked above; qed");
}
}
_ => None,
}
}

/// Returns the status of a request ID
pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus {
self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind {
Expand Down Expand Up @@ -473,5 +502,8 @@ mod tests {
assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());

assert!(manager.requests.is_empty());
assert!(manager.subscriptions.is_empty());
}
}

0 comments on commit 54f4dcd

Please sign in to comment.