diff --git a/node/network/availability-distribution/src/lib.rs b/node/network/availability-distribution/src/lib.rs index 0eb48a6f356a..eae5b93d6e0c 100644 --- a/node/network/availability-distribution/src/lib.rs +++ b/node/network/availability-distribution/src/lib.rs @@ -22,8 +22,10 @@ //! peers. Verified in this context means, the erasure chunks contained merkle proof //! is checked. +#[deny(unused_extern_crates, unused_results, unused_qualifications)] + use codec::{Decode, Encode}; -use futures::{channel::oneshot, FutureExt}; +use futures::{channel::oneshot, FutureExt, TryFutureExt}; use sp_core::crypto::Public; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; @@ -53,27 +55,71 @@ use polkadot_node_network_protocol::{ NetworkBridgeEvent, }; use std::collections::{HashMap, HashSet}; -use std::io; use std::iter; use thiserror::Error; const TARGET: &'static str = "avad"; #[derive(Debug, Error)] enum Error { - #[error(transparent)] - Erasure(polkadot_erasure_coding::Error), - - #[error(transparent)] - Io(io::Error), + #[error("Sending PendingAvailability query failed")] + QueryPendingAvailabilitySendQuery(#[source] SubsystemError), + #[error("Response channel to obtain PendingAvailability failed")] + QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled), + #[error("RuntimeAPI to obtain PendingAvailability failed")] + QueryPendingAvailability(#[source] RuntimeApiError), + + #[error("Sending StoreChunk query failed")] + StoreChunkSendQuery(#[source] SubsystemError), + #[error("Response channel to obtain StoreChunk failed")] + StoreChunkResponseChannel(#[source] oneshot::Canceled), + + #[error("Sending QueryChunk query failed")] + QueryChunkSendQuery(#[source] SubsystemError), + #[error("Response channel to obtain QueryChunk failed")] + QueryChunkResponseChannel(#[source] oneshot::Canceled), + + #[error("Sending QueryAncestors query failed")] + QueryAncestorsSendQuery(#[source] SubsystemError), + #[error("Response channel to obtain QueryAncestors failed")] + QueryAncestorsResponseChannel(#[source] oneshot::Canceled), + #[error("RuntimeAPI to obtain QueryAncestors failed")] + QueryAncestors(#[source] ChainApiError), + + + #[error("Sending QuerySession query failed")] + QuerySessionSendQuery(#[source] SubsystemError), + #[error("Response channel to obtain QuerySession failed")] + QuerySessionResponseChannel(#[source] oneshot::Canceled), + #[error("RuntimeAPI to obtain QuerySession failed")] + QuerySession(#[source] RuntimeApiError), + + #[error("Sending QueryValidators query failed")] + QueryValidatorsSendQuery(#[source] SubsystemError), + #[error("Response channel to obtain QueryValidators failed")] + QueryValidatorsResponseChannel(#[source] oneshot::Canceled), + #[error("RuntimeAPI to obtain QueryValidators failed")] + QueryValidators(#[source] RuntimeApiError), + + #[error("Sending AvailabilityCores query failed")] + AvailabilityCoresSendQuery(#[source] SubsystemError), + #[error("Response channel to obtain AvailabilityCores failed")] + AvailabilityCoresResponseChannel(#[source] oneshot::Canceled), + #[error("RuntimeAPI to obtain AvailabilityCores failed")] + AvailabilityCores(#[source] RuntimeApiError), + + #[error("Sending AvailabilityCores query failed")] + QueryAvailabilitySendQuery(#[source] SubsystemError), + #[error("Response channel to obtain AvailabilityCores failed")] + QueryAvailabilityResponseChannel(#[source] oneshot::Canceled), + + #[error("Sending out a peer report message")] + ReportPeerMessageSend(#[source] SubsystemError), - #[error(transparent)] - Oneshot(oneshot::Canceled), - - #[error(transparent)] - RuntimeApi(RuntimeApiError), + #[error("Sending a gossip message")] + TrackedGossipMessage(#[source] SubsystemError), - #[error(transparent)] - ChainApi(ChainApiError), + #[error("Receive channel closed")] + IncomingMessageChannel(#[source] SubsystemError), } type Result = std::result::Result; @@ -508,7 +554,7 @@ where ), )) .await - .map_err::(Into::into)?; + .map_err(|e| Error::TrackedGossipMessage(e))?; metrics.on_chunk_distributed(); } @@ -730,7 +776,10 @@ impl AvailabilityDistributionSubsystem { // work: process incoming messages from the overseer. let mut state = ProtocolState::default(); loop { - let message = ctx.recv().await.map_err::(Into::into)?; + let message = ctx + .recv() + .await + .map_err(|e| Error::IncomingMessageChannel(e))?; match message { FromOverseer::Communication { msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event), @@ -770,9 +819,9 @@ where fn start(self, ctx: Context) -> SpawnedSubsystem { let future = self.run(ctx) - .map_err(|e| { + .map_err(|e| SubsystemError::with_origin("availability-distribution", e) - }) + ) .map(|_| ()).boxed(); SpawnedSubsystem { @@ -881,14 +930,16 @@ where { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx), - ))) - .await - .map_err::(Into::into)?; + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx), + ))) + .await + .map_err(|e| Error::AvailabilityCoresSendQuery(e))?; let all_para_ids: Vec<_> = rx - .await??; + .await + .map_err(|e| Error::AvailabilityCoresResponseChannel(e))? + .map_err(|e| Error::AvailabilityCores(e))?; let occupied_para_ids = all_para_ids .into_iter() @@ -915,10 +966,10 @@ where peer ); ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep), - )) - .await - .map_err::(Into::into) + NetworkBridgeMessage::ReportPeer(peer, rep), + )) + .await + .map_err(|e| Error::ReportPeerMessageSend(e)) } /// Query the proof of validity for a particular candidate hash. @@ -931,10 +982,14 @@ where { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( - AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), - )) - .await?; - rx.await.map_err::(Into::into) + AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), + )) + .await + .map_err(|e| Error::QueryAvailabilitySendQuery(e)) + ?; + rx + .await + .map_err(|e| Error::QueryAvailabilityResponseChannel(e)) } @@ -950,8 +1005,10 @@ where ctx.send_message(AllMessages::AvailabilityStore( AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx), )) - .await?; - rx.await.map_err::(Into::into) + .await + .map_err(|e| Error::QueryChunkSendQuery(e))?; + rx.await + .map_err(|e| Error::QueryChunkResponseChannel(e)) } @@ -966,9 +1023,13 @@ where { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx), - )).await?; - rx.await.map_err::(Into::into) + AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx), + )) + .await + .map_err(|e| Error::StoreChunkSendQuery(e))?; + rx + .await + .map_err(|e| Error::StoreChunkResponseChannel(e)) } /// Request the head data for a particular para. @@ -985,9 +1046,12 @@ where relay_parent, RuntimeApiRequest::CandidatePendingAvailability(para, tx), ))) - .await?; - rx.await? - .map_err::(Into::into) + .await + .map_err(|e| Error::QueryPendingAvailabilitySendQuery(e))?; + + rx.await + .map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))? + .map_err(|e| Error::QueryPendingAvailability(e)) } /// Query the validator set. @@ -1005,9 +1069,11 @@ where )); ctx.send_message(query_validators) - .await?; - rx.await? - .map_err::(Into::into) + .await + .map_err(|e| Error::QueryValidatorsSendQuery(e))?; + rx.await + .map_err(|e| Error::QueryValidatorsResponseChannel(e))? + .map_err(|e| Error::QueryValidators(e)) } /// Query the hash of the `K` ancestors @@ -1027,9 +1093,11 @@ where }); ctx.send_message(query_ancestors) - .await?; - rx.await? - .map_err::(Into::into) + .await + .map_err(|e| Error::QueryAncestorsSendQuery(e))?; + rx.await + .map_err(|e| Error::QueryAncestorsResponseChannel(e))? + .map_err(|e| Error::QueryAncestors(e)) } /// Query the session index of a relay parent @@ -1047,9 +1115,11 @@ where )); ctx.send_message(query_session_idx_for_child) - .await?; - rx.await? - .map_err::(Into::into) + .await + .map_err(|e| Error::QuerySessionSendQuery(e))?; + rx.await + .map_err(|e| Error::QuerySessionResponseChannel(e))? + .map_err(|e| Error::QuerySession(e)) } /// Queries up to k ancestors with the constraints of equiv session diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index cb220ce636c4..126ff3a1f6e5 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -142,14 +142,14 @@ pub enum SubsystemError { /// An additional anotation tag for the origin of `source`. origin: &'static str, /// The wrapped error. Marked as source for tracking the error chain. - #[source] source: Box + #[source] source: Box }, } impl SubsystemError { /// Adds a `str` as `origin` to the given error `err`. - pub fn with_origin(origin: &'static str, err: impl Into) -> Self { - Self::FromOrigin{ origin, source: Box::new(err.into()) } + pub fn with_origin(origin: &'static str, err: E) -> Self { + Self::FromOrigin{ origin, source: Box::new(err) } } }