Skip to content

Commit

Permalink
Check authority status on active leaves update
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
  • Loading branch information
sandreim committed Jan 12, 2022
1 parent e9fc173 commit 55a30ac
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 12 deletions.
67 changes: 56 additions & 11 deletions node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use polkadot_node_subsystem::{
messages::{ChainApiMessage, ChainSelectionMessage},
overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::Validator;
use polkadot_primitives::v1::{BlockNumber, ConsensusLog, Hash, Header};
use sp_keystore::SyncCryptoStorePtr;

use futures::{channel::oneshot, future::Either, prelude::*};
use kvdb::KeyValueDB;
Expand Down Expand Up @@ -307,13 +309,23 @@ pub struct Config {
pub struct ChainSelectionSubsystem {
config: Config,
db: Arc<dyn KeyValueDB>,
keystore: SyncCryptoStorePtr,
}

/// A structure to pass arguments to the subsystem main loop.
struct ChainSelectionSubsystemArgs<'a, Context, B> {
ctx: &'a mut Context,
backend: &'a mut B,
stagnant_check_interval: &'a StagnantCheckInterval,
clock: &'a (dyn Clock + Sync),
keystore: SyncCryptoStorePtr,
}

impl ChainSelectionSubsystem {
/// Create a new instance of the subsystem with the given config
/// and key-value store.
pub fn new(config: Config, db: Arc<dyn KeyValueDB>) -> Self {
ChainSelectionSubsystem { config, db }
pub fn new(config: Config, db: Arc<dyn KeyValueDB>, keystore: SyncCryptoStorePtr) -> Self {
ChainSelectionSubsystem { config, db, keystore }
}
}

Expand All @@ -329,9 +341,15 @@ where
);

SpawnedSubsystem {
future: run(ctx, backend, self.config.stagnant_check_interval, Box::new(SystemClock))
.map(Ok)
.boxed(),
future: run(
ctx,
backend,
self.config.stagnant_check_interval,
Box::new(SystemClock),
self.keystore,
)
.map(Ok)
.boxed(),
name: "chain-selection-subsystem",
}
}
Expand All @@ -342,13 +360,21 @@ async fn run<Context, B>(
mut backend: B,
stagnant_check_interval: StagnantCheckInterval,
clock: Box<dyn Clock + Send + Sync>,
keystore: SyncCryptoStorePtr,
) where
Context: SubsystemContext<Message = ChainSelectionMessage>,
Context: overseer::SubsystemContext<Message = ChainSelectionMessage>,
B: Backend,
{
loop {
let res = run_until_error(&mut ctx, &mut backend, &stagnant_check_interval, &*clock).await;
let res = run_until_error(ChainSelectionSubsystemArgs {
ctx: &mut ctx,
backend: &mut backend,
stagnant_check_interval: &stagnant_check_interval,
clock: &*clock,
keystore,
})
.await;
match res {
Err(e) => {
e.trace();
Expand All @@ -368,18 +394,20 @@ async fn run<Context, B>(
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_until_error<Context, B>(
ctx: &mut Context,
backend: &mut B,
stagnant_check_interval: &StagnantCheckInterval,
clock: &(dyn Clock + Sync),
async fn run_until_error<'a, Context, B>(
args: ChainSelectionSubsystemArgs<'a, Context, B>,
) -> Result<(), Error>
where
Context: SubsystemContext<Message = ChainSelectionMessage>,
Context: overseer::SubsystemContext<Message = ChainSelectionMessage>,
B: Backend,
{
let ChainSelectionSubsystemArgs { ctx, backend, stagnant_check_interval, clock, keystore } =
args;

let mut stagnant_check_stream = stagnant_check_interval.timeout_stream();
let mut is_validator = false;
let mut sender = ctx.sender().clone();
loop {
futures::select! {
msg = ctx.recv().fuse() => {
Expand All @@ -398,6 +426,23 @@ where
).await?;

backend.write(write_ops)?;

if is_validator != Validator::new(leaf.hash, keystore.clone(), &mut sender).await.is_ok() {
is_validator = !is_validator;
tracing::info!(
"👮 Authority status changed to `{}` at block #{}({})",
is_validator,
leaf.number,
leaf.hash
);
} else {
tracing::info!(
"🚔 Authority status is `{}` at block #{}({})",
is_validator,
leaf.number,
leaf.hash
);
}
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(h, n)) => {
Expand Down
6 changes: 5 additions & 1 deletion node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ where
authority_discovery_service.clone(),
Metrics::register(registry)?,
))
.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
.chain_selection(ChainSelectionSubsystem::new(
chain_selection_config,
parachains_db,
keystore.clone(),
))
.leaves(Vec::from_iter(
leaves
.into_iter()
Expand Down

0 comments on commit 55a30ac

Please sign in to comment.