Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
avail-dist: expressive errors
Browse files Browse the repository at this point in the history
  • Loading branch information
drahnr committed Oct 22, 2020
1 parent 39c756e commit d4e231d
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 52 deletions.
168 changes: 119 additions & 49 deletions node/network/availability-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
//! peers. Verified in this context means, the erasure chunks contained merkle proof
//! is checked.

#[deny(unused_extern_crates, unused_results, unused_qualifications)]

use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
use futures::{channel::oneshot, FutureExt, TryFutureExt};

use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
Expand Down Expand Up @@ -53,27 +55,71 @@ use polkadot_node_network_protocol::{
NetworkBridgeEvent,
};
use std::collections::{HashMap, HashSet};
use std::io;
use std::iter;
use thiserror::Error;
const TARGET: &'static str = "avad";

#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
Erasure(polkadot_erasure_coding::Error),

#[error(transparent)]
Io(io::Error),
#[error("Sending PendingAvailability query failed")]
QueryPendingAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),

#[error("Sending StoreChunk query failed")]
StoreChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled),

#[error("Sending QueryChunk query failed")]
QueryChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryChunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),

#[error("Sending QueryAncestors query failed")]
QueryAncestorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryAncestors failed")]
QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryAncestors failed")]
QueryAncestors(#[source] ChainApiError),


#[error("Sending QuerySession query failed")]
QuerySessionSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QuerySession failed")]
QuerySessionResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QuerySession failed")]
QuerySession(#[source] RuntimeApiError),

#[error("Sending QueryValidators query failed")]
QueryValidatorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryValidators failed")]
QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryValidators failed")]
QueryValidators(#[source] RuntimeApiError),

#[error("Sending AvailabilityCores query failed")]
AvailabilityCoresSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain AvailabilityCores failed")]
AvailabilityCores(#[source] RuntimeApiError),

#[error("Sending AvailabilityCores query failed")]
QueryAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),

#[error("Sending out a peer report message")]
ReportPeerMessageSend(#[source] SubsystemError),

#[error(transparent)]
Oneshot(oneshot::Canceled),

#[error(transparent)]
RuntimeApi(RuntimeApiError),
#[error("Sending a gossip message")]
TrackedGossipMessage(#[source] SubsystemError),

#[error(transparent)]
ChainApi(ChainApiError),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
}

type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -508,7 +554,7 @@ where
),
))
.await
.map_err::<Error, _>(Into::into)?;
.map_err(|e| Error::TrackedGossipMessage(e))?;

metrics.on_chunk_distributed();
}
Expand Down Expand Up @@ -730,7 +776,10 @@ impl AvailabilityDistributionSubsystem {
// work: process incoming messages from the overseer.
let mut state = ProtocolState::default();
loop {
let message = ctx.recv().await.map_err::<Error, _>(Into::into)?;
let message = ctx
.recv()
.await
.map_err(|e| Error::IncomingMessageChannel(e))?;
match message {
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event),
Expand Down Expand Up @@ -770,9 +819,9 @@ where
fn start(self, ctx: Context) -> SpawnedSubsystem {

let future = self.run(ctx)
.map_err(|e| {
.map_err(|e|
SubsystemError::with_origin("availability-distribution", e)
})
)
.map(|_| ()).boxed();

SpawnedSubsystem {
Expand Down Expand Up @@ -881,14 +930,16 @@ where
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
)))
.await
.map_err::<Error, _>(Into::into)?;
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
)))
.await
.map_err(|e| Error::AvailabilityCoresSendQuery(e))?;

let all_para_ids: Vec<_> = rx
.await??;
.await
.map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
.map_err(|e| Error::AvailabilityCores(e))?;

let occupied_para_ids = all_para_ids
.into_iter()
Expand All @@ -915,10 +966,10 @@ where
peer
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
))
.await
.map_err::<Error, _>(Into::into)
NetworkBridgeMessage::ReportPeer(peer, rep),
))
.await
.map_err(|e| Error::ReportPeerMessageSend(e))
}

/// Query the proof of validity for a particular candidate hash.
Expand All @@ -931,10 +982,14 @@ where
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
))
.await?;
rx.await.map_err::<Error, _>(Into::into)
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
))
.await
.map_err(|e| Error::QueryAvailabilitySendQuery(e))
?;
rx
.await
.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}


Expand All @@ -950,8 +1005,10 @@ where
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
))
.await?;
rx.await.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QueryChunkSendQuery(e))?;
rx.await
.map_err(|e| Error::QueryChunkResponseChannel(e))
}


Expand All @@ -966,9 +1023,13 @@ where
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx),
)).await?;
rx.await.map_err::<Error, _>(Into::into)
AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx),
))
.await
.map_err(|e| Error::StoreChunkSendQuery(e))?;
rx
.await
.map_err(|e| Error::StoreChunkResponseChannel(e))
}

/// Request the head data for a particular para.
Expand All @@ -985,9 +1046,12 @@ where
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(para, tx),
)))
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QueryPendingAvailabilitySendQuery(e))?;

rx.await
.map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))?
.map_err(|e| Error::QueryPendingAvailability(e))
}

/// Query the validator set.
Expand All @@ -1005,9 +1069,11 @@ where
));

ctx.send_message(query_validators)
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QueryValidatorsSendQuery(e))?;
rx.await
.map_err(|e| Error::QueryValidatorsResponseChannel(e))?
.map_err(|e| Error::QueryValidators(e))
}

/// Query the hash of the `K` ancestors
Expand All @@ -1027,9 +1093,11 @@ where
});

ctx.send_message(query_ancestors)
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QueryAncestorsSendQuery(e))?;
rx.await
.map_err(|e| Error::QueryAncestorsResponseChannel(e))?
.map_err(|e| Error::QueryAncestors(e))
}

/// Query the session index of a relay parent
Expand All @@ -1047,9 +1115,11 @@ where
));

ctx.send_message(query_session_idx_for_child)
.await?;
rx.await?
.map_err::<Error, _>(Into::into)
.await
.map_err(|e| Error::QuerySessionSendQuery(e))?;
rx.await
.map_err(|e| Error::QuerySessionResponseChannel(e))?
.map_err(|e| Error::QuerySession(e))
}

/// Queries up to k ancestors with the constraints of equiv session
Expand Down
6 changes: 3 additions & 3 deletions node/subsystem/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ pub enum SubsystemError {
/// An additional anotation tag for the origin of `source`.
origin: &'static str,
/// The wrapped error. Marked as source for tracking the error chain.
#[source] source: Box<Self>
#[source] source: Box<dyn std::error::Error + Send>
},
}

impl SubsystemError {
/// Adds a `str` as `origin` to the given error `err`.
pub fn with_origin(origin: &'static str, err: impl Into<Self>) -> Self {
Self::FromOrigin{ origin, source: Box::new(err.into()) }
pub fn with_origin<E: 'static + Send+ std::error::Error>(origin: &'static str, err: E) -> Self {
Self::FromOrigin{ origin, source: Box::new(err) }
}
}

Expand Down

0 comments on commit d4e231d

Please sign in to comment.