Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
refactor+feat: allow subsystems to send only declared messages, gener…
Browse files Browse the repository at this point in the history
…ate graphviz (#5314)

Closes #3774
Closes #3826
  • Loading branch information
drahnr committed May 12, 2022
1 parent 6969a59 commit 2c934ed
Show file tree
Hide file tree
Showing 102 changed files with 3,847 additions and 2,508 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bridges/.config/lingua.dic
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Best/MS
BlockId
BlockNumber
BridgeStorage
clonable
CLI/MS
Chain1
Chain2
Expand Down Expand Up @@ -177,6 +178,7 @@ plancks
polkadot/MS
pov-block/MS
precommit
proc-macro/MS
prometheus
proxying
provisioner/MS
Expand Down
2 changes: 1 addition & 1 deletion core-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub type ChainId = u32;
/// A hash of some data used by the relay chain.
pub type Hash = sp_core::H256;

/// Unit type wrapper around [`Hash`] that represents a candidate hash.
/// Unit type wrapper around [`type@Hash`] that represents a candidate hash.
///
/// This type is produced by [`CandidateReceipt::hash`].
///
Expand Down
40 changes: 16 additions & 24 deletions node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use futures::{channel::mpsc, future::FutureExt, join, select, sink::SinkExt, str
use parity_scale_codec::Encode;
use polkadot_node_primitives::{AvailableData, CollationGenerationConfig, PoV};
use polkadot_node_subsystem::{
messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
messages::{CollationGenerationMessage, CollatorProtocolMessage},
overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext,
SubsystemError, SubsystemResult, SubsystemSender,
SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
request_availability_cores, request_persisted_validation_data, request_validation_code,
Expand Down Expand Up @@ -54,6 +54,7 @@ pub struct CollationGenerationSubsystem {
metrics: Metrics,
}

#[overseer::contextbounds(CollationGeneration, prefix = self::overseer)]
impl CollationGenerationSubsystem {
/// Create a new instance of the `CollationGenerationSubsystem`.
pub fn new(metrics: Metrics) -> Self {
Expand All @@ -71,11 +72,7 @@ impl CollationGenerationSubsystem {
///
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded.
async fn run<Context>(mut self, mut ctx: Context)
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
Context: overseer::SubsystemContext<Message = CollationGenerationMessage>,
{
async fn run<Context>(mut self, mut ctx: Context) {
// when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
// expected to generate precisely one message. We don't want to block the main loop
// at any point waiting for them all, so instead, we create a channel on which they can
Expand Down Expand Up @@ -108,12 +105,8 @@ impl CollationGenerationSubsystem {
&mut self,
incoming: SubsystemResult<FromOverseer<<Context as SubsystemContext>::Message>>,
ctx: &mut Context,
sender: &mpsc::Sender<AllMessages>,
) -> bool
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
Context: overseer::SubsystemContext<Message = CollationGenerationMessage>,
{
sender: &mpsc::Sender<overseer::CollationGenerationOutgoingMessages>,
) -> bool {
match incoming {
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated,
Expand Down Expand Up @@ -162,11 +155,8 @@ impl CollationGenerationSubsystem {
}
}

impl<Context> overseer::Subsystem<Context, SubsystemError> for CollationGenerationSubsystem
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
Context: overseer::SubsystemContext<Message = CollationGenerationMessage>,
{
#[overseer::subsystem(CollationGeneration, error=SubsystemError, prefix=self::overseer)]
impl<Context> CollationGenerationSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async move {
self.run(ctx).await;
Expand All @@ -178,12 +168,13 @@ where
}
}

async fn handle_new_activations<Context: SubsystemContext>(
#[overseer::contextbounds(CollationGeneration, prefix = self::overseer)]
async fn handle_new_activations<Context>(
config: Arc<CollationGenerationConfig>,
activated: impl IntoIterator<Item = Hash>,
ctx: &mut Context,
metrics: Metrics,
sender: &mpsc::Sender<AllMessages>,
sender: &mpsc::Sender<overseer::CollationGenerationOutgoingMessages>,
) -> crate::error::Result<()> {
// follow the procedure from the guide:
// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html
Expand Down Expand Up @@ -393,9 +384,10 @@ async fn handle_new_activations<Context: SubsystemContext>(
metrics.on_collation_generated();

if let Err(err) = task_sender
.send(AllMessages::CollatorProtocol(
CollatorProtocolMessage::DistributeCollation(ccr, pov, result_sender),
))
.send(
CollatorProtocolMessage::DistributeCollation(ccr, pov, result_sender)
.into(),
)
.await
{
gum::warn!(
Expand All @@ -417,7 +409,7 @@ async fn obtain_current_validation_code_hash(
relay_parent: Hash,
para_id: ParaId,
assumption: OccupiedCoreAssumption,
sender: &mut impl SubsystemSender,
sender: &mut impl overseer::CollationGenerationSenderTrait,
) -> Result<Option<ValidationCodeHash>, crate::error::Error> {
use polkadot_node_subsystem::RuntimeApiError;

Expand Down
18 changes: 10 additions & 8 deletions node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ mod handle_new_activations {
*subsystem_sent_messages.lock().await = rx.collect().await;
});

let sent_messages = Arc::try_unwrap(sent_messages)
let mut sent_messages = Arc::try_unwrap(sent_messages)
.expect("subsystem should have shut down by now")
.into_inner();

Expand Down Expand Up @@ -328,7 +328,7 @@ mod handle_new_activations {
};

assert_eq!(sent_messages.len(), 1);
match &sent_messages[0] {
match AllMessages::from(sent_messages.pop().unwrap()) {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
Expand Down Expand Up @@ -356,7 +356,7 @@ mod handle_new_activations {
expect_descriptor.erasure_root = descriptor.erasure_root.clone();
expect_descriptor
};
assert_eq!(descriptor, &expect_descriptor);
assert_eq!(descriptor, expect_descriptor);
},
_ => panic!("received wrong message type"),
}
Expand Down Expand Up @@ -470,11 +470,13 @@ mod handle_new_activations {

assert_eq!(sent_messages.len(), 1);
match &sent_messages[0] {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
)) => {
overseer::CollationGenerationOutgoingMessages::CollatorProtocolMessage(
CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
),
) => {
assert_eq!(expect_validation_code_hash, descriptor.validation_code_hash);
},
_ => panic!("received wrong message type"),
Expand Down
30 changes: 18 additions & 12 deletions node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use polkadot_node_subsystem::{
ApprovalDistributionMessage, ChainApiMessage, ChainSelectionMessage, RuntimeApiMessage,
RuntimeApiRequest,
},
overseer, RuntimeApiError, SubsystemContext, SubsystemError, SubsystemResult,
overseer, RuntimeApiError, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
determine_new_blocks,
Expand Down Expand Up @@ -107,8 +107,9 @@ enum ImportedBlockInfoError {
}

/// Computes information about the imported block. Returns an error if the info couldn't be extracted.
async fn imported_block_info(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn imported_block_info<Context>(
ctx: &mut Context,
env: ImportedBlockInfoEnv<'_>,
block_hash: Hash,
block_header: &Header,
Expand Down Expand Up @@ -319,10 +320,11 @@ pub struct BlockImportedCandidates {
/// * and return information about all candidates imported under each block.
///
/// It is the responsibility of the caller to schedule wakeups for each block.
pub(crate) async fn handle_new_head(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
pub(crate) async fn handle_new_head<Context, B: Backend>(
ctx: &mut Context,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
db: &mut OverlayedBackend<'_, B>,
head: Hash,
finalized_number: &Option<BlockNumber>,
) -> SubsystemResult<Vec<BlockImportedCandidates>> {
Expand Down Expand Up @@ -609,7 +611,7 @@ pub(crate) mod tests {
use assert_matches::assert_matches;
use merlin::Transcript;
use polkadot_node_primitives::approval::{VRFOutput, VRFProof};
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::messages::{AllMessages, ApprovalVotingMessage};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_subsystem_util::database::Database;
use polkadot_primitives::v2::{Id as ParaId, SessionInfo, ValidatorIndex};
Expand Down Expand Up @@ -724,7 +726,8 @@ pub(crate) mod tests {
#[test]
fn imported_block_info_is_good() {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let (mut ctx, mut handle) =
make_subsystem_context::<ApprovalVotingMessage, _>(pool.clone());

let session = 5;
let session_info = dummy_session_info(session);
Expand Down Expand Up @@ -847,7 +850,8 @@ pub(crate) mod tests {
#[test]
fn imported_block_info_fails_if_no_babe_vrf() {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let (mut ctx, mut handle) =
make_subsystem_context::<ApprovalVotingMessage, _>(pool.clone());

let session = 5;
let session_info = dummy_session_info(session);
Expand Down Expand Up @@ -950,7 +954,8 @@ pub(crate) mod tests {
#[test]
fn imported_block_info_fails_if_ancient_session() {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let (mut ctx, mut handle) =
make_subsystem_context::<ApprovalVotingMessage, _>(pool.clone());

let session = 5;

Expand Down Expand Up @@ -1027,7 +1032,7 @@ pub(crate) mod tests {
#[test]
fn imported_block_info_extracts_force_approve() {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let (mut ctx, mut handle) = make_subsystem_context(pool.clone());

let session = 5;
let session_info = dummy_session_info(session);
Expand Down Expand Up @@ -1158,7 +1163,8 @@ pub(crate) mod tests {
let mut overlay_db = OverlayedBackend::new(&db);

let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let (mut ctx, mut handle) =
make_subsystem_context::<ApprovalVotingMessage, _>(pool.clone());

let session = 5;
let irrelevant = 666;
Expand Down
Loading

0 comments on commit 2c934ed

Please sign in to comment.