Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

rewrite network code to use notifications_protocol APIs from Substrate #788

Merged
merged 25 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3139b3a
extract all network code to legacy submodule
rphmeier Jan 20, 2020
181e0db
update references to legacy proto
rphmeier Jan 20, 2020
e89ac2d
skeleton of futures-based protocol
rphmeier Jan 22, 2020
e8271bd
refactor skeleton to use background task
rphmeier Jan 22, 2020
58b07c5
rename communication_for to build_table_router
rphmeier Jan 22, 2020
f6b3015
implement internal message types for validation network
rphmeier Jan 22, 2020
9113135
basic ParachainNetwork and TableRouter implementations
rphmeier Jan 22, 2020
b106470
Merge branch 'master' into rh-notifications-protocol
rphmeier Jan 22, 2020
d17e8d7
add some module docs
rphmeier Jan 22, 2020
02e32d3
remove exit-future from validation
rphmeier Jan 23, 2020
d600197
hack: adapt legacy protocol to lack of exit-future
rphmeier Jan 23, 2020
152ee8c
generalize RegisteredMessageValidator somewhat
rphmeier Jan 23, 2020
fecfc6a
instantiate and teardown table routers
rphmeier Jan 23, 2020
92fd527
clean up RouterInner drop logic
rphmeier Jan 23, 2020
623b136
implement most of the statement import loop
rphmeier Feb 4, 2020
c9f2ca5
implement statement loop in async/await
rphmeier Feb 4, 2020
656653a
remove unneeded TODO
rphmeier Feb 4, 2020
1e93ee2
most of the collation skeleton
rphmeier Feb 4, 2020
790ff87
send session keys and validator roles
rphmeier Feb 5, 2020
3bab4d3
also send role after status
rphmeier Feb 5, 2020
d69ccf2
use config in startup
rphmeier Feb 5, 2020
8c0e245
point TODO to issue
rphmeier Feb 5, 2020
3b489f0
fix test compilation
rphmeier Feb 5, 2020
a2bff7e
Merge branch 'master' into rh-notifications-protocol
rphmeier Feb 5, 2020
afeee8f
Merge remote-tracking branch 'upstream/master' into rh-notifications-…
rphmeier Feb 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ use polkadot_cli::{
ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama,
service::{self, Roles, SelectChain}
};
use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
use polkadot_network::legacy::validation::{LeafWorkParams, ValidationNetwork};

pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration};
pub use polkadot_network::validation::Incoming;
pub use polkadot_network::legacy::validation::Incoming;
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
pub use sc_network::PeerId;
Expand Down Expand Up @@ -316,7 +316,7 @@ fn run_collator_node<S, P, Extrinsic>(

let is_known = move |block_hash: &Hash| {
use consensus_common::BlockStatus;
use polkadot_network::gossip::Known;
use polkadot_network::legacy::gossip::Known;

match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Expand All @@ -333,7 +333,7 @@ fn run_collator_node<S, P, Extrinsic>(
}
};

let message_validator = polkadot_network::gossip::register_validator(
let message_validator = polkadot_network::legacy::gossip::register_validator(
network.clone(),
(is_known, client.clone()),
&spawner,
Expand Down
3 changes: 3 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ edition = "2018"

[dependencies]
arrayvec = "0.4.12"
bytes = "0.5"
parking_lot = "0.9.0"
derive_more = "0.14.1"
av_store = { package = "polkadot-availability-store", path = "../availability-store" }
polkadot-validation = { path = "../validation" }
polkadot-primitives = { path = "../primitives" }
Expand All @@ -20,6 +22,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkad
futures = "0.3.4"
log = "0.4.8"
exit-future = "0.2.0"
futures-timer = "2.0"
sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ struct ParachainCollators {
}

/// Manages connected collators and role assignments from the perspective of a validator.
#[derive(Default)]
pub struct CollatorPool {
collators: HashMap<CollatorId, (ParaId, PeerId)>,
parachain_collators: HashMap<ParaId, ParachainCollators>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use polkadot_primitives::Hash;
use std::collections::{HashMap, HashSet};

use log::warn;
use crate::router::attestation_topic;
use crate::legacy::router::attestation_topic;

use super::{cost, benefit, MAX_CHAIN_HEADS, LeavesVec,
ChainContext, Known, MessageValidationData, GossipStatement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ impl View {
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::TestChainContext;
use crate::gossip::{Known, GossipParachainMessages};
use crate::legacy::tests::TestChainContext;
use crate::legacy::gossip::{Known, GossipParachainMessages};
use polkadot_primitives::parachain::Message as ParachainMessage;

fn hash(x: u8) -> Hash {
Expand Down
57 changes: 41 additions & 16 deletions network/src/gossip.rs → network/src/legacy/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
use sp_runtime::{generic::BlockId, traits::{BlakeTwo256, Hash as HashT}};
use sp_blockchain::Error as ClientError;
use sc_network::{config::Roles, Context, PeerId, ReputationChange};
use sc_network::{NetworkService as SubstrateNetworkService, specialization::NetworkSpecialization};
use sc_network_gossip::{
ValidationResult as GossipValidationResult,
ValidatorContext, MessageIntent,
Expand All @@ -73,8 +74,7 @@ use futures::prelude::*;
use parking_lot::RwLock;
use log::warn;

use super::PolkadotNetworkService;
use crate::{GossipMessageStream, NetworkService, PolkadotProtocol, router::attestation_topic};
use crate::legacy::{GossipMessageStream, NetworkService, GossipService, PolkadotProtocol, router::attestation_topic};

use attestation::{View as AttestationView, PeerData as AttestationPeerData};
use message_routing::{View as MessageRoutingView};
Expand Down Expand Up @@ -308,11 +308,11 @@ impl<F, P> ChainContext for (F, P) where
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<C: ChainContext + 'static>(
service: Arc<PolkadotNetworkService>,
pub fn register_validator<C: ChainContext + 'static, S: NetworkSpecialization<Block>>(
service: Arc<SubstrateNetworkService<Block, S, Hash>>,
chain: C,
executor: &impl futures::task::Spawn,
) -> RegisteredMessageValidator
) -> RegisteredMessageValidator<S>
{
let s = service.clone();
let report_handle = Box::new(move |peer: &PeerId, cost_benefit: ReputationChange| {
Expand Down Expand Up @@ -366,7 +366,7 @@ impl NewLeafActions {
/// Perform the queued actions, feeding into gossip.
pub fn perform(
self,
gossip: &dyn crate::NetworkService,
gossip: &dyn crate::legacy::GossipService,
) {
for action in self.actions {
match action {
Expand All @@ -382,16 +382,25 @@ impl NewLeafActions {
/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
pub struct RegisteredMessageValidator<S: NetworkSpecialization<Block>> {
inner: Arc<MessageValidator<dyn ChainContext>>,
// Note: this is always `Some` in real code and `None` in tests.
service: Option<Arc<PolkadotNetworkService>>,
service: Option<Arc<SubstrateNetworkService<Block, S, Hash>>>,
// Note: this is always `Some` in real code and `None` in tests.
gossip_engine: Option<sc_network_gossip::GossipEngine<Block>>,
}

impl RegisteredMessageValidator {
impl<S: NetworkSpecialization<Block>> Clone for RegisteredMessageValidator<S> {
fn clone(&self) -> Self {
RegisteredMessageValidator {
inner: self.inner.clone(),
service: self.service.clone(),
gossip_engine: self.gossip_engine.clone(),
}
}
}

impl RegisteredMessageValidator<crate::legacy::PolkadotProtocol> {
#[cfg(test)]
pub(crate) fn new_test<C: ChainContext + 'static>(
chain: C,
Expand All @@ -405,7 +414,9 @@ impl RegisteredMessageValidator {
gossip_engine: None,
}
}
}

impl<S: NetworkSpecialization<Block>> RegisteredMessageValidator<S> {
pub fn register_availability_store(&mut self, availability_store: av_store::Store) {
self.inner.inner.write().availability_store = Some(availability_store);
}
Expand Down Expand Up @@ -469,10 +480,8 @@ impl RegisteredMessageValidator {

NewLeafActions { actions }
}
}

impl NetworkService for RegisteredMessageValidator {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
pub(crate) fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.messages_for(topic)
} else {
Expand All @@ -483,7 +492,7 @@ impl NetworkService for RegisteredMessageValidator {
GossipMessageStream::new(topic_stream.boxed())
}

fn gossip_message(&self, topic: Hash, message: GossipMessage) {
pub(crate) fn gossip_message(&self, topic: Hash, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.gossip_message(
topic,
Expand All @@ -495,14 +504,30 @@ impl NetworkService for RegisteredMessageValidator {
}
}

fn send_message(&self, who: PeerId, message: GossipMessage) {
pub(crate) fn send_message(&self, who: PeerId, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.send_message(vec![who], message.encode());
} else {
log::error!("Called send_message on a test engine");
}
}
}

impl<S: NetworkSpecialization<Block>> GossipService for RegisteredMessageValidator<S> {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
RegisteredMessageValidator::gossip_messages_for(self, topic)
}

fn gossip_message(&self, topic: Hash, message: GossipMessage) {
RegisteredMessageValidator::gossip_message(self, topic, message)
}

fn send_message(&self, who: PeerId, message: GossipMessage) {
RegisteredMessageValidator::send_message(self, who, message)
}
}

impl NetworkService for RegisteredMessageValidator<crate::legacy::PolkadotProtocol> {
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
{
Expand Down Expand Up @@ -806,7 +831,7 @@ mod tests {
use polkadot_validation::GenericStatement;
use super::message_routing::queue_topic;

use crate::tests::TestChainContext;
use crate::legacy::tests::TestChainContext;

#[derive(PartialEq, Clone, Debug)]
enum ContextEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! a validator changes his session key, or when they are generated.

use polkadot_primitives::{Hash, parachain::{ValidatorId}};
use crate::collator_pool::Role;
use crate::legacy::collator_pool::Role;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use wasm_timer::Instant;
Expand All @@ -39,6 +39,12 @@ pub struct LocalCollations<C> {
local_collations: HashMap<Hash, LocalCollation<C>>,
}

impl<C: Clone> Default for LocalCollations<C> {
fn default() -> Self {
Self::new()
}
}

impl<C: Clone> LocalCollations<C> {
/// Create a new `LocalCollations` tracker.
pub fn new() -> Self {
Expand Down
Loading