Skip to content

Commit

Permalink
[rpc module]: improve TestSubscription to return None when closed (
Browse files Browse the repository at this point in the history
…#566)

* fix(TestSubscription): use None for closed.

* add test for subscription close
  • Loading branch information
niklasad1 committed Nov 18, 2021
1 parent fff8460 commit 0e46b5c
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions utils/src/server/rpc_module.rs
Expand Up @@ -739,13 +739,17 @@ impl TestSubscription {
self.sub_id
}

/// Get the next element of type T from the underlying stream.
/// Returns `Some((val, sub_id))` for the next element of type T from the underlying stream,
/// otherwise `None` if the subscruption was closed.
///
/// Panics if the stream was closed or if the decoding the value as `T`.
pub async fn next<T: DeserializeOwned>(&mut self) -> (T, jsonrpsee_types::v2::SubscriptionId) {
let raw = self.rx.next().await.expect("subscription not closed");
let val: SubscriptionResponse<T> = serde_json::from_str(&raw).expect("valid response");
(val.params.result, val.params.subscription)
/// # Panics
///
/// If the decoding the value as `T` fails.
pub async fn next<T: DeserializeOwned>(&mut self) -> Option<(T, jsonrpsee_types::v2::SubscriptionId)> {
let raw = self.rx.next().await?;
let val: SubscriptionResponse<T> =
serde_json::from_str(&raw).expect("valid response in TestSubscription::next()");
Some((val.params.result, val.params.subscription))
}
}

Expand Down Expand Up @@ -937,14 +941,39 @@ mod tests {

let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
for i in (0..=2).rev() {
let (val, id) = my_sub.next::<char>().await;
let (val, id) = my_sub.next::<char>().await.unwrap();
assert_eq!(val, std::char::from_digit(i, 10).unwrap());
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
}

// The subscription is now closed
let (sub_closed_err, _) = my_sub.next::<SubscriptionClosedError>().await;
// The subscription is now closed by the server.
let (sub_closed_err, _) = my_sub.next::<SubscriptionClosedError>().await.unwrap();
assert_eq!(sub_closed_err.subscription_id(), my_sub.subscription_id());
assert_eq!(sub_closed_err.close_reason(), "Closed by the server");
}

#[tokio::test]
async fn close_test_subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_unsub", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") {
return;
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Ok(())
})
.unwrap();

let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
let (val, id) = my_sub.next::<String>().await.unwrap();
assert_eq!(&val, "lo");
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));

// close the subscription to ensure it doesn't return any items.
my_sub.close();
assert_eq!(None, my_sub.next::<String>().await);
}
}

0 comments on commit 0e46b5c

Please sign in to comment.