Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
refactor grid topology to expose more info to subsystems (#6140)
Browse files Browse the repository at this point in the history
* refactor grid topology to expose more info to subsystems

* fix grid_topology test

* fix overseer test

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Vsevolod Stakhov <vsevolod.stakhov@parity.io>

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/network/protocol/src/grid_topology.rs

Co-authored-by: Andronik <write@reusable.software>

* fix bug in populating topology

* fmt

Co-authored-by: Vsevolod Stakhov <vsevolod.stakhov@parity.io>
Co-authored-by: Andronik <write@reusable.software>
  • Loading branch information
3 people committed Oct 12, 2022
1 parent 15709ee commit 0398050
Show file tree
Hide file tree
Showing 17 changed files with 617 additions and 331 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 31 additions & 12 deletions node/network/approval-distribution/src/lib.rs
Expand Up @@ -343,9 +343,13 @@ impl State {
})
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
let session = topology.session;
self.handle_new_session_topology(ctx, session, SessionGridTopology::from(topology))
.await;
self.handle_new_session_topology(
ctx,
topology.session,
topology.topology,
topology.local_index,
)
.await;
},
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await;
Expand Down Expand Up @@ -500,8 +504,14 @@ impl State {
ctx: &mut Context,
session: SessionIndex,
topology: SessionGridTopology,
local_index: Option<ValidatorIndex>,
) {
self.topologies.insert_topology(session, topology);
if local_index.is_none() {
// this subsystem only matters to validators.
return
}

self.topologies.insert_topology(session, topology, local_index);
let topology = self.topologies.get_topology(session).expect("just inserted above; qed");

adjust_required_routing_and_propagate(
Expand All @@ -511,7 +521,9 @@ impl State {
|block_entry| block_entry.session == session,
|required_routing, local, validator_index| {
if *required_routing == RequiredRouting::PendingTopology {
*required_routing = topology.required_routing_by_index(*validator_index, local);
*required_routing = topology
.local_grid_neighbors()
.required_routing_by_index(*validator_index, local);
}
},
)
Expand Down Expand Up @@ -861,7 +873,7 @@ impl State {
let local = source == MessageSource::Local;

let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
t.required_routing_by_index(validator_index, local)
t.local_grid_neighbors().required_routing_by_index(validator_index, local)
});

let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) {
Expand Down Expand Up @@ -902,7 +914,10 @@ impl State {
return false
}

if let Some(true) = topology.as_ref().map(|t| t.route_to_peer(required_routing, peer)) {
if let Some(true) = topology
.as_ref()
.map(|t| t.local_grid_neighbors().route_to_peer(required_routing, peer))
{
return true
}

Expand Down Expand Up @@ -1169,7 +1184,8 @@ impl State {
// the assignment to all aware peers in the required routing _except_ the original
// source of the assignment. Hence the `in_topology_check`.
// 3. Any randomly selected peers have been sent the assignment already.
let in_topology = topology.map_or(false, |t| t.route_to_peer(required_routing, peer));
let in_topology = topology
.map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
in_topology || knowledge.sent.contains(message_subject, MessageKind::Assignment)
};

Expand Down Expand Up @@ -1301,9 +1317,9 @@ impl State {
let required_routing = message_state.required_routing;
let rng = &mut *rng;
let mut peer_filter = move |peer_id| {
let in_topology = topology
.as_ref()
.map_or(false, |t| t.route_to_peer(required_routing, peer_id));
let in_topology = topology.as_ref().map_or(false, |t| {
t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
});
in_topology || {
let route_random = random_routing.sample(total_peers, rng);
if route_random {
Expand Down Expand Up @@ -1564,7 +1580,10 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi
});

for (peer, peer_knowledge) in &mut block_entry.known_by {
if !topology.route_to_peer(message_state.required_routing, peer) {
if !topology
.local_grid_neighbors()
.route_to_peer(message_state.required_routing, peer)
{
continue
}

Expand Down
95 changes: 73 additions & 22 deletions node/network/approval-distribution/src/tests.rs
Expand Up @@ -17,7 +17,12 @@
use super::*;
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use polkadot_node_network_protocol::{our_view, peer_set::ValidationVersion, view, ObservedRole};
use polkadot_node_network_protocol::{
grid_topology::{SessionGridTopology, TopologyPeerInfo},
our_view,
peer_set::ValidationVersion,
view, ObservedRole,
};
use polkadot_node_primitives::approval::{
AssignmentCertKind, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT,
};
Expand Down Expand Up @@ -119,33 +124,79 @@ fn make_gossip_topology(
neighbors_x: &[usize],
neighbors_y: &[usize],
) -> network_bridge_event::NewGossipTopology {
let mut t = network_bridge_event::NewGossipTopology {
session,
our_neighbors_x: HashMap::new(),
our_neighbors_y: HashMap::new(),
// This builds a grid topology which is a square matrix.
// The local validator occupies the top left-hand corner.
// The X peers occupy the same row and the Y peers occupy
// the same column.

let local_index = 1;

assert_eq!(
neighbors_x.len(),
neighbors_y.len(),
"mocking grid topology only implemented for squares",
);

let d = neighbors_x.len() + 1;

let grid_size = d * d;
assert!(grid_size > 0);
assert!(all_peers.len() >= grid_size);

let peer_info = |i: usize| TopologyPeerInfo {
peer_ids: vec![all_peers[i].0.clone()],
validator_index: ValidatorIndex::from(i as u32),
discovery_id: all_peers[i].1.clone(),
};

for &i in neighbors_x {
t.our_neighbors_x.insert(
all_peers[i].1.clone(),
network_bridge_event::TopologyPeerInfo {
peer_ids: vec![all_peers[i].0.clone()],
validator_index: ValidatorIndex::from(i as u32),
},
);
let mut canonical_shuffling: Vec<_> = (0..)
.filter(|i| local_index != *i)
.filter(|i| !neighbors_x.contains(i))
.filter(|i| !neighbors_y.contains(i))
.take(grid_size)
.map(peer_info)
.collect();

// filled with junk except for own.
let mut shuffled_indices = vec![d + 1; grid_size];
shuffled_indices[local_index] = 0;
canonical_shuffling[0] = peer_info(local_index);

for (x_pos, v) in neighbors_x.iter().enumerate() {
let pos = 1 + x_pos;
canonical_shuffling[pos] = peer_info(*v);
}

for &i in neighbors_y {
t.our_neighbors_y.insert(
all_peers[i].1.clone(),
network_bridge_event::TopologyPeerInfo {
peer_ids: vec![all_peers[i].0.clone()],
validator_index: ValidatorIndex::from(i as u32),
},
);
for (y_pos, v) in neighbors_y.iter().enumerate() {
let pos = d * (1 + y_pos);
canonical_shuffling[pos] = peer_info(*v);
}

let topology = SessionGridTopology::new(shuffled_indices, canonical_shuffling);

// sanity check.
{
let g_n = topology
.compute_grid_neighbors_for(ValidatorIndex(local_index as _))
.expect("topology just constructed with this validator index");

assert_eq!(g_n.validator_indices_x.len(), neighbors_x.len());
assert_eq!(g_n.validator_indices_y.len(), neighbors_y.len());

for i in neighbors_x {
assert!(g_n.validator_indices_x.contains(&ValidatorIndex(*i as _)));
}

for i in neighbors_y {
assert!(g_n.validator_indices_y.contains(&ValidatorIndex(*i as _)));
}
}

t
network_bridge_event::NewGossipTopology {
session,
topology,
local_index: Some(ValidatorIndex(local_index as _)),
}
}

async fn setup_gossip_topology(
Expand Down
1 change: 1 addition & 0 deletions node/network/bitfield-distribution/Cargo.toml
Expand Up @@ -18,6 +18,7 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
maplit = "1.0.2"
Expand Down
33 changes: 22 additions & 11 deletions node/network/bitfield-distribution/src/lib.rs
Expand Up @@ -27,7 +27,7 @@ use futures::{channel::oneshot, FutureExt};
use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{
RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology,
GridNeighbors, RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage,
},
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
Expand Down Expand Up @@ -327,7 +327,7 @@ async fn handle_bitfield_distribution<Context>(
};

let msg = BitfieldGossipMessage { relay_parent, signed_availability };
let topology = state.topologies.get_topology_or_fallback(session_idx);
let topology = state.topologies.get_topology_or_fallback(session_idx).local_grid_neighbors();
let required_routing = topology.required_routing_by_index(validator_index, true);

relay_message(
Expand All @@ -352,7 +352,7 @@ async fn handle_bitfield_distribution<Context>(
async fn relay_message<Context>(
ctx: &mut Context,
job_data: &mut PerRelayParentData,
topology: &SessionGridTopology,
topology_neighbors: &GridNeighbors,
peer_views: &mut HashMap<PeerId, View>,
validator: ValidatorId,
message: BitfieldGossipMessage,
Expand Down Expand Up @@ -384,7 +384,7 @@ async fn relay_message<Context>(
let message_needed =
job_data.message_from_validator_needed_by_peer(&peer, &validator);
if message_needed {
let in_topology = topology.route_to_peer(required_routing, &peer);
let in_topology = topology_neighbors.route_to_peer(required_routing, &peer);
let need_routing = in_topology || {
let route_random = random_routing.sample(total_peers, rng);
if route_random {
Expand Down Expand Up @@ -533,7 +533,8 @@ async fn process_incoming_peer_message<Context>(

let topology = state
.topologies
.get_topology_or_fallback(job_data.signing_context.session_index);
.get_topology_or_fallback(job_data.signing_context.session_index)
.local_grid_neighbors();
let required_routing = topology.required_routing_by_index(validator_index, false);

metrics.on_bitfield_received();
Expand Down Expand Up @@ -579,14 +580,24 @@ async fn handle_network_msg<Context>(
},
NetworkBridgeEvent::NewGossipTopology(gossip_topology) => {
let session_index = gossip_topology.session;
let new_topology = SessionGridTopology::from(gossip_topology);
let newly_added = new_topology.peers_diff(&new_topology);
state.topologies.update_topology(session_index, new_topology);
let new_topology = gossip_topology.topology;
let prev_neighbors =
state.topologies.get_current_topology().local_grid_neighbors().clone();

state.topologies.update_topology(
session_index,
new_topology,
gossip_topology.local_index,
);
let current_topology = state.topologies.get_current_topology();

let newly_added = current_topology.local_grid_neighbors().peers_diff(&prev_neighbors);

gum::debug!(
target: LOG_TARGET,
?session_index,
"New gossip topology received {} unseen peers",
newly_added.len()
newly_added_peers = ?newly_added.len(),
"New gossip topology received",
);

for new_peer in newly_added {
Expand Down Expand Up @@ -651,7 +662,7 @@ async fn handle_peer_view_change<Context>(
.cloned()
.collect::<Vec<_>>();

let topology = state.topologies.get_current_topology();
let topology = state.topologies.get_current_topology().local_grid_neighbors();
let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin);
let lucky = is_gossip_peer ||
util::gen_ratio_rng(
Expand Down

0 comments on commit 0398050

Please sign in to comment.