-
Notifications
You must be signed in to change notification settings - Fork 292
implement variant of subscription that returns finalized storage changes #237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,7 +58,6 @@ use sp_core::{ | |
| StorageData, | ||
| StorageKey, | ||
| }, | ||
| twox_128, | ||
| Bytes, | ||
| }; | ||
| use sp_rpc::{ | ||
|
|
@@ -86,7 +85,12 @@ use crate::{ | |
| }, | ||
| metadata::Metadata, | ||
| runtimes::Runtime, | ||
| subscription::EventSubscription, | ||
| subscription::{ | ||
| EventStorageSubscription, | ||
| EventSubscription, | ||
| FinalizedEventStorageSubscription, | ||
| SystemEvents, | ||
| }, | ||
| }; | ||
|
|
||
| pub type ChainBlock<T> = | ||
|
|
@@ -256,13 +260,15 @@ pub struct ReadProof<Hash> { | |
| pub struct Rpc<T: Runtime> { | ||
| client: RpcClient, | ||
| marker: PhantomData<T>, | ||
| accept_weak_inclusion: bool, | ||
| } | ||
|
|
||
| impl<T: Runtime> Clone for Rpc<T> { | ||
| fn clone(&self) -> Self { | ||
| Self { | ||
| client: self.client.clone(), | ||
| marker: PhantomData, | ||
| accept_weak_inclusion: self.accept_weak_inclusion, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -272,9 +278,16 @@ impl<T: Runtime> Rpc<T> { | |
| Self { | ||
| client, | ||
| marker: PhantomData, | ||
| accept_weak_inclusion: false, | ||
| } | ||
| } | ||
|
|
||
| /// Configure the Rpc to accept non-finalized blocks | ||
| /// in `submit_and_watch_extrinsic` | ||
| pub fn accept_weak_inclusion(&mut self) { | ||
|
gregdhill marked this conversation as resolved.
|
||
| self.accept_weak_inclusion = true; | ||
| } | ||
|
|
||
| /// Fetch a storage key | ||
| pub async fn storage( | ||
| &self, | ||
|
|
@@ -439,22 +452,31 @@ impl<T: Runtime> Rpc<T> { | |
| Ok(version) | ||
| } | ||
|
|
||
| /// Subscribe to substrate System Events | ||
| pub async fn subscribe_events( | ||
| &self, | ||
| ) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> { | ||
| let mut storage_key = twox_128(b"System").to_vec(); | ||
| storage_key.extend(twox_128(b"Events").to_vec()); | ||
| log::debug!("Events storage key {:?}", hex::encode(&storage_key)); | ||
|
|
||
| let keys = Some(vec![StorageKey(storage_key)]); | ||
| /// Subscribe to System Events that are imported into blocks. | ||
| /// | ||
| /// *WARNING* these may not be included in the finalized chain, use | ||
| /// `subscribe_finalized_events` to ensure events are finalized. | ||
| pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> { | ||
|
gregdhill marked this conversation as resolved.
|
||
| let keys = Some(vec![StorageKey::from(SystemEvents::new())]); | ||
| let params = Params::Array(vec![to_json_value(keys)?]); | ||
|
|
||
| let subscription = self | ||
| .client | ||
| .subscribe("state_subscribeStorage", params, "state_unsubscribeStorage") | ||
| .await?; | ||
| Ok(subscription) | ||
| Ok(EventStorageSubscription::Imported(subscription)) | ||
| } | ||
|
|
||
| /// Subscribe to finalized events. | ||
| pub async fn subscribe_finalized_events( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps instead of the two different methods we could just check the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now it's probably cleaner with two distinct methods as we also have
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Anyway I think we should remove |
||
| &self, | ||
| ) -> Result<EventStorageSubscription<T>, Error> { | ||
| Ok(EventStorageSubscription::Finalized( | ||
| FinalizedEventStorageSubscription::new( | ||
| self.clone(), | ||
| self.subscribe_finalized_blocks().await?, | ||
| ), | ||
| )) | ||
| } | ||
|
|
||
| /// Subscribe to blocks. | ||
|
|
@@ -464,7 +486,7 @@ impl<T: Runtime> Rpc<T> { | |
| .subscribe( | ||
| "chain_subscribeNewHeads", | ||
| Params::None, | ||
| "chain_subscribeNewHeads", | ||
| "chain_unsubscribeNewHeads", | ||
| ) | ||
| .await?; | ||
|
|
||
|
|
@@ -480,7 +502,7 @@ impl<T: Runtime> Rpc<T> { | |
| .subscribe( | ||
| "chain_subscribeFinalizedHeads", | ||
| Params::None, | ||
| "chain_subscribeFinalizedHeads", | ||
| "chain_unsubscribeFinalizedHeads", | ||
| ) | ||
| .await?; | ||
| Ok(subscription) | ||
|
|
@@ -526,66 +548,39 @@ impl<T: Runtime> Rpc<T> { | |
| let ext_hash = T::Hashing::hash_of(&extrinsic); | ||
| log::info!("Submitting Extrinsic `{:?}`", ext_hash); | ||
|
|
||
| let events_sub = self.subscribe_events().await?; | ||
| let events_sub = if self.accept_weak_inclusion { | ||
| self.subscribe_events().await | ||
| } else { | ||
| self.subscribe_finalized_events().await | ||
| }?; | ||
| let mut xt_sub = self.watch_extrinsic(extrinsic).await?; | ||
|
|
||
| while let Some(status) = xt_sub.next().await { | ||
| // log::info!("received status {:?}", status); | ||
| log::info!("received status {:?}", status); | ||
| match status { | ||
| // ignore in progress extrinsic for now | ||
| TransactionStatus::Future | ||
| | TransactionStatus::Ready | ||
| | TransactionStatus::Broadcast(_) => continue, | ||
| TransactionStatus::InBlock(block_hash) => { | ||
| log::info!("Fetching block {:?}", block_hash); | ||
| let block = self.block(Some(block_hash)).await?; | ||
| return match block { | ||
| Some(signed_block) => { | ||
| log::info!( | ||
| "Found block {:?}, with {} extrinsics", | ||
| block_hash, | ||
| signed_block.block.extrinsics.len() | ||
| ); | ||
| let ext_index = signed_block | ||
| .block | ||
| .extrinsics | ||
| .iter() | ||
| .position(|ext| { | ||
| let hash = T::Hashing::hash_of(ext); | ||
| hash == ext_hash | ||
| }) | ||
| .ok_or_else(|| { | ||
| Error::Other(format!( | ||
| "Failed to find Extrinsic with hash {:?}", | ||
| ext_hash, | ||
| )) | ||
| })?; | ||
| let mut sub = EventSubscription::new(events_sub, &decoder); | ||
| sub.filter_extrinsic(block_hash, ext_index); | ||
| let mut events = vec![]; | ||
| while let Some(event) = sub.next().await { | ||
| events.push(event?); | ||
| } | ||
| Ok(ExtrinsicSuccess { | ||
| block: block_hash, | ||
| extrinsic: ext_hash, | ||
| events, | ||
| }) | ||
| } | ||
| None => { | ||
| Err(format!("Failed to find block {:?}", block_hash).into()) | ||
| } | ||
| if self.accept_weak_inclusion { | ||
| return self | ||
| .process_block(events_sub, decoder, block_hash, ext_hash) | ||
| .await | ||
| } | ||
| continue | ||
| } | ||
| TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()), | ||
| TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()), | ||
| TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()), | ||
| TransactionStatus::Retracted(_) => { | ||
| return Err("Extrinsic Retracted".into()) | ||
| } | ||
| // should have made it `InBlock` before either of these | ||
| TransactionStatus::Finalized(_) => { | ||
| return Err("Extrinsic Finalized".into()) | ||
| TransactionStatus::Finalized(block_hash) => { | ||
| // read finalized blocks by default | ||
| return self | ||
| .process_block(events_sub, decoder, block_hash, ext_hash) | ||
| .await | ||
| } | ||
| TransactionStatus::FinalityTimeout(_) => { | ||
| return Err("Extrinsic FinalityTimeout".into()) | ||
|
|
@@ -595,6 +590,50 @@ impl<T: Runtime> Rpc<T> { | |
| Err(RpcError::Custom("RPC subscription dropped".into()).into()) | ||
| } | ||
|
|
||
| async fn process_block<'a>( | ||
| &self, | ||
| events_sub: EventStorageSubscription<T>, | ||
| decoder: &'a EventsDecoder<T>, | ||
| block_hash: T::Hash, | ||
| ext_hash: T::Hash, | ||
| ) -> Result<ExtrinsicSuccess<T>, Error> { | ||
| log::info!("Fetching block {:?}", block_hash); | ||
| if let Some(signed_block) = self.block(Some(block_hash)).await? { | ||
| log::info!( | ||
| "Found block {:?}, with {} extrinsics", | ||
| block_hash, | ||
| signed_block.block.extrinsics.len() | ||
| ); | ||
| let ext_index = signed_block | ||
| .block | ||
| .extrinsics | ||
| .iter() | ||
| .position(|ext| { | ||
| let hash = T::Hashing::hash_of(ext); | ||
| hash == ext_hash | ||
| }) | ||
| .ok_or_else(|| { | ||
| Error::Other(format!( | ||
| "Failed to find Extrinsic with hash {:?}", | ||
| ext_hash, | ||
| )) | ||
| })?; | ||
| let mut sub = EventSubscription::new(events_sub, &decoder); | ||
| sub.filter_extrinsic(block_hash, ext_index); | ||
| let mut events = vec![]; | ||
| while let Some(event) = sub.next().await { | ||
| events.push(event?); | ||
| } | ||
| Ok(ExtrinsicSuccess { | ||
| block: block_hash, | ||
| extrinsic: ext_hash, | ||
| events, | ||
| }) | ||
| } else { | ||
| Err(format!("Failed to find block {:?}", block_hash).into()) | ||
| } | ||
| } | ||
|
|
||
| /// Insert a key into the keystore. | ||
| pub async fn insert_key( | ||
| &self, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.