Skip to content

Commit

Permalink
Make EventSubscription and FilterEvents Send-able (#471)
Browse files Browse the repository at this point in the history
* Make EventSubscription and FilterEvents Send-able

* Cargo fmt

* clippy

* Remove unused import
  • Loading branch information
jsdw committed Mar 8, 2022
1 parent 14ef6c8 commit a091d2b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 12 deletions.
14 changes: 13 additions & 1 deletion subxt/src/events/event_subscription.rs
Expand Up @@ -84,7 +84,7 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> {
#[derivative(Debug = "ignore")]
at: Option<
std::pin::Pin<
Box<dyn Future<Output = Result<Events<'a, T, Evs>, BasicError>> + 'a>,
Box<dyn Future<Output = Result<Events<'a, T, Evs>, BasicError>> + Send + 'a>,
>,
>,
_event_type: std::marker::PhantomData<Evs>,
Expand Down Expand Up @@ -175,3 +175,15 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
Poll::Ready(Some(events))
}
}

#[cfg(test)]
mod test {
use super::*;

// Ensure `EventSubscription` can be sent; only actually a compile-time check.
#[test]
fn check_sendability() {
fn assert_send<T: Send>() {}
assert_send::<EventSubscription<crate::DefaultConfig, ()>>();
}
}
12 changes: 8 additions & 4 deletions subxt/src/events/filter_events.rs
Expand Up @@ -51,7 +51,8 @@ pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> {
FilteredEventDetails<T::Hash, Filter::ReturnType>,
BasicError,
>,
> + 'a,
> + Send
+ 'a,
>,
>,
}
Expand Down Expand Up @@ -131,7 +132,8 @@ pub trait EventFilter: private::Sealed {
FilteredEventDetails<T::Hash, Self::ReturnType>,
BasicError,
>,
> + 'a,
> + Send
+ 'a,
>;
}

Expand All @@ -150,7 +152,9 @@ impl<Ev: Event> EventFilter for (Ev,) {
fn filter<'a, T: Config, Evs: Decode + 'static>(
events: Events<'a, T, Evs>,
) -> Box<
dyn Iterator<Item = Result<FilteredEventDetails<T::Hash, Ev>, BasicError>> + 'a,
dyn Iterator<Item = Result<FilteredEventDetails<T::Hash, Ev>, BasicError>>
+ Send
+ 'a,
> {
let block_hash = events.block_hash();
let mut iter = events.into_iter_raw();
Expand Down Expand Up @@ -189,7 +193,7 @@ macro_rules! impl_event_filter {
type ReturnType = ( $(Option<$ty>,)+ );
fn filter<'a, T: Config, Evs: Decode + 'static>(
events: Events<'a, T, Evs>
) -> Box<dyn Iterator<Item=Result<FilteredEventDetails<T::Hash,Self::ReturnType>, BasicError>> + 'a> {
) -> Box<dyn Iterator<Item=Result<FilteredEventDetails<T::Hash,Self::ReturnType>, BasicError>> + Send + 'a> {
let block_hash = events.block_hash();
let mut iter = events.into_iter_raw();
Box::new(std::iter::from_fn(move || {
Expand Down
11 changes: 4 additions & 7 deletions subxt/src/rpc.rs
Expand Up @@ -31,15 +31,12 @@ use crate::{
storage::StorageKeyPrefix,
Config,
Metadata,
PhantomDataSendSync,
};
use codec::{
Decode,
Encode,
};
use core::{
convert::TryInto,
marker::PhantomData,
};
use frame_metadata::RuntimeMetadataPrefixed;
pub use jsonrpsee::{
client_transport::ws::{
Expand Down Expand Up @@ -211,14 +208,14 @@ pub struct ReadProof<Hash> {
pub struct Rpc<T: Config> {
/// Rpc client for sending requests.
pub client: Arc<RpcClient>,
marker: PhantomData<T>,
_marker: PhantomDataSendSync<T>,
}

impl<T: Config> Clone for Rpc<T> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
marker: PhantomData,
_marker: PhantomDataSendSync::new(),
}
}
}
Expand All @@ -228,7 +225,7 @@ impl<T: Config> Rpc<T> {
pub fn new(client: RpcClient) -> Self {
Self {
client: Arc::new(client),
marker: PhantomData,
_marker: PhantomDataSendSync::new(),
}
}

Expand Down
42 changes: 42 additions & 0 deletions subxt/tests/integration/events.rs
Expand Up @@ -135,3 +135,45 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> {

Ok(())
}

// This is just a compile-time check that we can subscribe to events in
// a context that requires the event subscription/filtering to be Send-able.
// We test a typical use of EventSubscription and FilterEvents. We don't need
// to run this code; just check that it compiles.
#[allow(unused)]
async fn check_events_are_sendable() {
// check that EventSubscription can be used across await points.
async_std::task::spawn(async {
let ctx = test_context().await;

let mut event_sub = ctx.api.events().subscribe().await?;

while let Some(ev) = event_sub.next().await {
// if `event_sub` doesn't implement Send, we can't hold
// it across an await point inside of a tokio::spawn, which
// requires Send. This will lead to a compile error.
}

Ok::<_, subxt::BasicError>(())
});

// Check that FilterEvents can be used across await points.
async_std::task::spawn(async {
let ctx = test_context().await;

let mut event_sub = ctx
.api
.events()
.subscribe()
.await?
.filter_events::<(balances::events::Transfer,)>();

while let Some(ev) = event_sub.next().await {
// if `event_sub` doesn't implement Send, we can't hold
// it across an await point inside of a tokio::spawn, which
// requires Send; This will lead to a compile error.
}

Ok::<_, subxt::BasicError>(())
});
}

0 comments on commit a091d2b

Please sign in to comment.