Skip to content

Commit

Permalink
Remove wait_for_event and simplify event access in `subscribe_event…
Browse files Browse the repository at this point in the history
…s` (#437)

* exchange wait_for_events with subscribe_for_event_type

* fix comment

* move event subscription to event file

* add struct EventSubcription

* add EventSubscribe and remove std only function. Overkill

* add event record type and adapt tests accordingly

* fix example

* fix clippy

* fix no_std build

* remove comment

* add some hopefully helpful comments

* add map_err

* fix naming

* remove map_err()

* add comment .

* remove obsolete debug msg

* remove obsolete comment

* fix import

* remove untrue comment
  • Loading branch information
haerdib committed Jan 23, 2023
1 parent e27d177 commit 7dd79e4
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 131 deletions.
53 changes: 30 additions & 23 deletions examples/examples/event_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,16 @@
limitations under the License.
*/

//! Very simple example that shows how to subscribe to events.
//! Example that shows how to subscribe to events and do some action
//! upon encounterign them.

use codec::Decode;
use kitchensink_runtime::Runtime;
use log::debug;
use sp_core::H256 as Hash;
use substrate_api_client::{
rpc::{HandleSubscription, JsonrpseeClient},
Api, PlainTipExtrinsicParams, SubscribeFrameSystem,
};
use substrate_api_client::{rpc::JsonrpseeClient, Api, PlainTipExtrinsicParams, SubscribeEvents};

// This module depends on node_runtime.
// To avoid dependency collisions, node_runtime has been removed from the substrate-api-client library.
// This module depends on the specific node runtime.
// Replace this crate by your own if you run a custom substrate node to get your custom events.
use kitchensink_runtime::RuntimeEvent;
use kitchensink_runtime::{Runtime, RuntimeEvent};

#[tokio::main]
async fn main() {
Expand All @@ -38,21 +33,17 @@ async fn main() {
let api = Api::<(), _, PlainTipExtrinsicParams<Runtime>, Runtime>::new(client).unwrap();

println!("Subscribe to events");
let mut subscription = api.subscribe_system_events().unwrap();
let mut subscription = api.subscribe_events().unwrap();

// Wait for event callbacks from the node, which are received via subscription.
for _ in 0..5 {
let event_bytes = subscription.next().unwrap().unwrap().changes[0].1.clone().unwrap().0;
let events = Vec::<frame_system::EventRecord<RuntimeEvent, Hash>>::decode(
&mut event_bytes.as_slice(),
)
.unwrap();
for evr in &events {
println!("decoded: {:?} {:?}", evr.phase, evr.event);
match &evr.event {
RuntimeEvent::Balances(be) => {
println!(">>>>>>>>>> balances event: {:?}", be);
match &be {
let event_records = subscription.next_event::<RuntimeEvent, Hash>().unwrap().unwrap();
for event_record in &event_records {
println!("decoded: {:?} {:?}", event_record.phase, event_record.event);
match &event_record.event {
RuntimeEvent::Balances(balances_event) => {
println!(">>>>>>>>>> balances event: {:?}", balances_event);
match &balances_event {
pallet_balances::Event::Transfer { from, to, amount } => {
println!("Transactor: {:?}", from);
println!("Destination: {:?}", to);
Expand All @@ -64,8 +55,24 @@ async fn main() {
},
}
},
_ => debug!("ignoring unsupported module event: {:?}", evr.event),
RuntimeEvent::System(system_event) => {
println!(">>>>>>>>>> system event: {:?}", system_event);
match &system_event {
frame_system::Event::ExtrinsicSuccess { dispatch_info } => {
println!("DispatchInfo: {:?}", dispatch_info);
return
},
_ => {
debug!("ignoring unsupported system event");
},
}
},
_ => debug!("ignoring unsupported module event: {:?}", event_record.event),
}
}
}

// After we finished whatever we wanted, unusubscribe from the subscription,
// to ensure, that the node does not keep sending us events.
subscription.unsubscribe().unwrap();
}
13 changes: 13 additions & 0 deletions node-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub trait StaticEvent: Decode {
}

/// A phase of a block's execution.
// https://github.com/paritytech/substrate/blob/2bfc1dd66ef32cf8beb90007dfb544a9d28f1b2f/frame/system/src/lib.rs#L698-L708
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Encode, Decode)]
pub enum Phase {
/// Applying an extrinsic.
Expand All @@ -79,3 +80,15 @@ pub enum Phase {
/// Initializing the block.
Initialization,
}

/// Record of an event happening.
// https://github.com/paritytech/substrate/blob/2bfc1dd66ef32cf8beb90007dfb544a9d28f1b2f/frame/system/src/lib.rs#L716-L726
#[derive(Encode, Decode, PartialEq, Eq, Clone)]
pub struct EventRecord<E, T> {
/// The phase of the block it happened in.
pub phase: Phase,
/// The event itself.
pub event: E,
/// The list of the topics this event has.
pub topics: Vec<T>,
}
125 changes: 68 additions & 57 deletions src/api/rpc_api/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use crate::{
rpc::{HandleSubscription, Request, Subscribe},
GetBlock, GetStorage, Phase,
};
use ac_node_api::{EventDetails, Events, StaticEvent};
use ac_compose_macros::rpc_params;
use ac_node_api::{EventDetails, EventRecord, Events};
use ac_primitives::{ExtrinsicParams, FrameSystemConfig, StorageChangeSet};
use alloc::vec::Vec;
use codec::Encode;
use alloc::{vec, vec::Vec};
use codec::{Decode, Encode};
use core::marker::PhantomData;
use log::*;
use serde::de::DeserializeOwned;
use sp_runtime::traits::{Block as BlockTrait, GetRuntimeBlockType, Hash as HashTrait};
Expand Down Expand Up @@ -70,24 +72,65 @@ where
}
}

// FIXME: This should rather be implemented directly on the
// Subscription return value, rather than the api. Or directly
// subcribe. Should be looked at in #288
// https://github.com/scs/substrate-api-client/issues/288#issuecomment-1346221653
/// Wrapper around a Event `StorageChangeSet` subscription.
/// Simplifies the event retrieval from the subscription.
pub struct EventSubscription<Subscription, Hash> {
pub subscription: Subscription,
_phantom: PhantomData<Hash>,
}

impl<Subscription, Hash> EventSubscription<Subscription, Hash>
where
Hash: DeserializeOwned,
Subscription: HandleSubscription<StorageChangeSet<Hash>>,
{
/// Create a new wrapper around the subscription.
pub fn new(subscription: Subscription) -> Self {
Self { subscription, _phantom: Default::default() }
}

/// Wait for the next value from the internal subscription.
/// Upon encounter, it retrieves and decodes the expected `EventRecord`.
pub fn next_event<RuntimeEvent: Decode, Topic: Decode>(
&mut self,
) -> Option<Result<Vec<EventRecord<RuntimeEvent, Topic>>>> {
let change_set = match self.subscription.next()? {
Ok(set) => set,
Err(e) => return Some(Err(Error::RpcClient(e))),
};
// Since we subscribed to only the events key, we can simply take the first value of the
// changes in the set. Also, we don't care about the key but only the data, so take
// the second value in the tuple of two.
let storage_data = change_set.changes[0].1.as_ref()?;
let events = Decode::decode(&mut storage_data.0.as_slice()).map_err(Error::Codec);
Some(events)
}

/// Unsubscribe from the internal subscription.
pub fn unsubscribe(self) -> Result<()> {
self.subscription.unsubscribe().map_err(|e| e.into())
}
}

impl<Subscription, Hash> From<Subscription> for EventSubscription<Subscription, Hash>
where
Hash: DeserializeOwned,
Subscription: HandleSubscription<StorageChangeSet<Hash>>,
{
fn from(subscription: Subscription) -> Self {
EventSubscription::new(subscription)
}
}

pub trait SubscribeEvents<Client, Hash>
where
Client: Subscribe,
Hash: DeserializeOwned,
{
fn wait_for_event<Ev: StaticEvent>(
&self,
subscription: &mut Client::Subscription<StorageChangeSet<Hash>>,
) -> Result<Ev>;

fn wait_for_event_details<Ev: StaticEvent>(
/// Subscribe to events.
fn subscribe_events(
&self,
subscription: &mut Client::Subscription<StorageChangeSet<Hash>>,
) -> Result<EventDetails>;
) -> Result<EventSubscription<Client::Subscription<StorageChangeSet<Hash>>, Hash>>;
}

impl<Signer, Client, Params, Runtime> SubscribeEvents<Client, Runtime::Hash>
Expand All @@ -97,49 +140,17 @@ where
Params: ExtrinsicParams<Runtime::Index, Runtime::Hash>,
Runtime: FrameSystemConfig,
{
fn wait_for_event<Ev: StaticEvent>(
fn subscribe_events(
&self,
subscription: &mut Client::Subscription<StorageChangeSet<Runtime::Hash>>,
) -> Result<Ev> {
let maybe_event_details = self.wait_for_event_details::<Ev>(subscription)?;
maybe_event_details
.as_event()?
.ok_or(Error::Other("Could not find the specific event".into()))
}

fn wait_for_event_details<Ev: StaticEvent>(
&self,
subscription: &mut Client::Subscription<StorageChangeSet<Runtime::Hash>>,
) -> Result<EventDetails> {
while let Some(change_set) = subscription.next() {
let event_bytes = change_set?.changes[0].1.as_ref().unwrap().0.clone();
let events = Events::<Runtime::Hash>::new(
self.metadata().clone(),
Default::default(),
event_bytes,
);
for maybe_event_details in events.iter() {
let event_details = maybe_event_details?;

// Check for failed xt and return as Dispatch Error in case we find one.
// Careful - this reports the first one encountered. This event may belong to another extrinsic
// than the one that is being waited for.
event_details.check_if_failed()?;

let event_metadata = event_details.event_metadata();
trace!(
"Found extrinsic: {:?}, {:?}",
event_metadata.pallet(),
event_metadata.event()
);
if event_metadata.pallet() == Ev::PALLET && event_metadata.event() == Ev::EVENT {
return Ok(event_details)
} else {
trace!("Not the event we are looking for, skipping.")
}
}
}
Err(Error::NoStream)
) -> Result<
EventSubscription<Client::Subscription<StorageChangeSet<Runtime::Hash>>, Runtime::Hash>,
> {
let key = crate::storage_key("System", "Events");
let subscription = self
.client()
.subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
.map(|sub| sub.into())?;
Ok(subscription)
}
}

Expand Down
35 changes: 3 additions & 32 deletions src/api/rpc_api/frame_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@

use crate::{
api::{Api, GetStorage, Result},
rpc::{Request, Subscribe},
rpc::Request,
};
use ac_compose_macros::rpc_params;
use ac_primitives::{
AccountInfo, ExtrinsicParams, FrameSystemConfig, SignExtrinsic, StorageChangeSet, StorageKey,
};
use alloc::{string::String, vec, vec::Vec};
use ac_primitives::{AccountInfo, ExtrinsicParams, FrameSystemConfig, SignExtrinsic, StorageKey};
use alloc::{string::String, vec::Vec};
use log::*;
use serde::de::DeserializeOwned;

pub trait GetAccountInformation<AccountId> {
type Index;
Expand Down Expand Up @@ -158,29 +155,3 @@ where
Ok(res)
}
}

pub trait SubscribeFrameSystem<Client, Hash>
where
Client: Subscribe,
Hash: DeserializeOwned,
{
fn subscribe_system_events(&self) -> Result<Client::Subscription<StorageChangeSet<Hash>>>;
}

impl<Signer, Client, Params, Runtime> SubscribeFrameSystem<Client, Runtime::Hash>
for Api<Signer, Client, Params, Runtime>
where
Client: Subscribe,
Params: ExtrinsicParams<Runtime::Index, Runtime::Hash>,
Runtime: FrameSystemConfig,
{
fn subscribe_system_events(
&self,
) -> Result<Client::Subscription<StorageChangeSet<Runtime::Hash>>> {
debug!("subscribing to events");
let key = crate::storage_key("System", "Events");
self.client()
.subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
.map_err(|e| e.into())
}
}
23 changes: 21 additions & 2 deletions testing/examples/events_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

use codec::Decode;
use frame_support::dispatch::DispatchInfo;
use kitchensink_runtime::{Runtime, Signature};
use kitchensink_runtime::{Runtime, RuntimeEvent, Signature};
use sp_keyring::AccountKeyring;
use substrate_api_client::{
extrinsic::BalancesExtrinsics, rpc::JsonrpseeClient, Api, AssetTipExtrinsicParams,
EventDetails, ExtrinsicSigner, FetchEvents, GetBlock, StaticEvent, SubmitAndWatch, XtStatus,
EventDetails, ExtrinsicSigner, FetchEvents, FrameSystemConfig, GetBlock, StaticEvent,
SubmitAndWatch, SubscribeEvents, XtStatus,
};

/// Check out frame_system::Event::ExtrinsicSuccess:
Expand Down Expand Up @@ -60,6 +61,24 @@ async fn main() {
.fetch_events_for_extrinsic(report.extrinsic_hash, report.block_hash.unwrap())
.unwrap();
assert_assosciated_events_match_expected(extrinisc_events);

// Subscribe to system events.
let mut event_subscription = api.subscribe_events().unwrap();

// Wait for event callbacks from the node, which are received via subscription.
for _ in 0..5 {
let event_records = event_subscription
.next_event::<RuntimeEvent, <Runtime as FrameSystemConfig>::Hash>()
.unwrap()
.unwrap();
for event_record in &event_records {
println!("got event: {:?} {:?}", event_record.phase, event_record.event);
match &event_record.event {
RuntimeEvent::System(_) => println!("Got System event, all good"),
_ => panic!("Unexpected event"),
}
}
}
}

fn assert_assosciated_events_match_expected(events: Vec<EventDetails>) {
Expand Down
Loading

0 comments on commit 7dd79e4

Please sign in to comment.