Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

authorithy-discovery: Make changing of peer-id while active a bit more robust #3786

Open
wants to merge 47 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
0103c7f
authorithy-discovery: Make changing of peer-id while active a bit mor…
alexggh Mar 21, 2024
4172677
Make clippy happy
alexggh Mar 22, 2024
64f38d2
Fix warnings
alexggh Mar 22, 2024
ce87688
Refactor gossip support
alexggh Mar 25, 2024
bd69a55
Merge remote-tracking branch 'origin/master' into fix_change_node_id_…
alexggh Mar 25, 2024
a69ba99
Make clippy happy
alexggh Mar 25, 2024
311aade
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 1, 2024
1305763
Some other hacks
alexggh Apr 3, 2024
7e16e58
Add more changes
alexggh Apr 3, 2024
a82ebed
More refactoring
alexggh Apr 3, 2024
d569ab3
A bit more refactoring
alexggh Apr 3, 2024
1c9a40d
A few more improvements
alexggh Apr 3, 2024
91e647c
Fixup even more
alexggh Apr 3, 2024
0a53ec2
Another something
alexggh Apr 3, 2024
ef6ddb6
Fixup everything
alexggh Apr 3, 2024
b3eb615
Post refactoring
alexggh Apr 3, 2024
f6d4b29
Fixup Cargo's
alexggh Apr 3, 2024
b41cdf7
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 3, 2024
1ae1759
Make clippy happy
alexggh Apr 3, 2024
7b56bc8
Some minor tweaks
alexggh Apr 3, 2024
d610df6
Minor updates
alexggh Apr 3, 2024
4263dc4
Minor review feedback
alexggh Apr 3, 2024
4d7b164
Add authorithy-discovery-unittests
alexggh Apr 5, 2024
7cc3333
Add unittest for gossip-support changes
alexggh Apr 5, 2024
fabdb4b
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 17, 2024
42afffc
Simplify indentation levels
alexggh Apr 17, 2024
28ac0a2
Minor cleanups
alexggh Apr 17, 2024
95950a7
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh Apr 24, 2024
bd21fda
Update litep2p
alexggh Apr 24, 2024
57f1b38
Use put_valut_to from litep2p
alexggh Apr 24, 2024
9f56b02
Minor feedback
alexggh Apr 24, 2024
6d196c4
Minor changes
alexggh Apr 25, 2024
9979f8f
Minor updates
alexggh Apr 25, 2024
fd5dc46
Integrate with https://github.com/paritytech/litep2p/pull/96
alexggh Apr 30, 2024
20e351e
Fix warning on quorum failed
alexggh May 1, 2024
98398e3
Reconnect only if new peer ids pop-up
alexggh May 1, 2024
8066044
Revert kademlia removal
alexggh May 2, 2024
8eef5c3
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh May 2, 2024
b4fe357
Update cargo.lock
alexggh May 2, 2024
97f09a3
Merge branch 'master' into alexaggh/fix_change_node_id_at_restart
alexggh May 2, 2024
586a0f1
Merge remote-tracking branch 'origin/master' into alexaggh/fix_change…
alexggh May 9, 2024
09b6306
Update substrate/client/network/src/litep2p/mod.rs
alexggh May 12, 2024
fb534b5
Update polkadot/node/network/gossip-support/src/tests.rs
alexggh May 12, 2024
7399dc5
Update substrate/client/authority-discovery/src/worker/tests.rs
alexggh May 12, 2024
17df838
Update assert messages
alexggh May 12, 2024
3313fd7
Address review feedback
alexggh May 12, 2024
999a710
Use a single signature
alexggh May 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,6 @@ wasmi = { opt-level = 3 }
x25519-dalek = { opt-level = 3 }
yamux = { opt-level = 3 }
zeroize = { opt-level = 3 }

[patch."https://github.com/paritytech/litep2p"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be remove before merging once: paritytech/litep2p#96, gets merged.

litep2p = { git = "https://github.com/paritytech//litep2p", branch = "lenxv/expose-peer-records" }
15 changes: 15 additions & 0 deletions polkadot/node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ pub trait Network: Clone + Send + 'static {
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;

/// Ask the network to extend the reserved set with these nodes.
async fn add_peers_to_reserved_set(
&mut self,
protocol: ProtocolName,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;

/// Removes the peers for the protocol's peer set (both reserved and non-reserved).
async fn remove_from_peers_set(
&mut self,
Expand Down Expand Up @@ -240,6 +247,14 @@ impl Network for Arc<dyn NetworkService> {
<dyn NetworkService>::set_reserved_peers(&**self, protocol, multiaddresses)
}

async fn add_peers_to_reserved_set(
&mut self,
protocol: ProtocolName,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
<dyn NetworkService>::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
}

async fn remove_from_peers_set(
&mut self,
protocol: ProtocolName,
Expand Down
8 changes: 8 additions & 0 deletions polkadot/node/network/bridge/src/rx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ impl Network for TestNetwork {
Ok(())
}

async fn add_peers_to_reserved_set(
&mut self,
_protocol: ProtocolName,
_: HashSet<Multiaddr>,
) -> Result<(), String> {
Ok(())
}

async fn remove_from_peers_set(
&mut self,
_protocol: ProtocolName,
Expand Down
16 changes: 16 additions & 0 deletions polkadot/node/network/bridge/src/tx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,22 @@ where
.await;
return (network_service, authority_discovery_service)
},

NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => {
gum::trace!(
target: LOG_TARGET,
action = "AddToResolvedValidators",
peer_set = ?peer_set,
?validator_addrs,
"Received a resolved validator connection request",
);

let all_addrs = validator_addrs.into_iter().flatten().collect();
let network_service = validator_discovery
.on_add_to_resolved_request(all_addrs, peer_set, network_service)
.await;
return (network_service, authority_discovery_service)
},
}
(network_service, authority_discovery_service)
}
Expand Down
8 changes: 8 additions & 0 deletions polkadot/node/network/bridge/src/tx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ impl Network for TestNetwork {
Ok(())
}

async fn add_peers_to_reserved_set(
&mut self,
_protocol: ProtocolName,
_: HashSet<Multiaddr>,
) -> Result<(), String> {
Ok(())
}

async fn remove_from_peers_set(
&mut self,
_protocol: ProtocolName,
Expand Down
47 changes: 47 additions & 0 deletions polkadot/node/network/bridge/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,44 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
network_service
}

/// Connect to already resolved addresses.
pub async fn on_add_to_resolved_request(
&mut self,
newly_requested: HashSet<Multiaddr>,
peer_set: PeerSet,
mut network_service: N,
) -> N {
let state = &mut self.state[peer_set];
let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
let num_peers = new_peer_ids.len();

state.previously_requested.extend(new_peer_ids);

gum::debug!(
target: LOG_TARGET,
?peer_set,
?num_peers,
"New add to resolved validators request",
);

// ask the network to connect to these nodes and not disconnect
// from them until they are removed from the set.
//
// for peer-set management, the main protocol name should be used regardless of
// the negotiated version.
if let Err(e) = network_service
.add_peers_to_reserved_set(
self.peerset_protocol_names.get_main_name(peer_set),
newly_requested,
)
.await
{
gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}

network_service
}

/// On a new connection request, a peer set update will be issued.
/// It will ask the network to connect to the validators and not disconnect
/// from them at least until the next request is issued for the same peer set.
Expand Down Expand Up @@ -232,6 +270,15 @@ mod tests {
Ok(())
}

async fn add_peers_to_reserved_set(
&mut self,
_protocol: ProtocolName,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
self.peers_set.extend(extract_peer_ids(multiaddresses.into_iter()));
Ok(())
}

async fn remove_from_peers_set(
&mut self,
_protocol: ProtocolName,
Expand Down
127 changes: 112 additions & 15 deletions polkadot/node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
#[cfg(test)]
const BACKOFF_DURATION: Duration = Duration::from_millis(500);

// The authorithy_discovery queries runs every ten minutes,
// so it make sense to run a bit more often than that to
// detect changes as often as we can, but not too often since
// it won't help.
#[cfg(not(test))]
const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(5 * 60);

#[cfg(test)]
const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(2);

/// Duration after which we consider low connectivity a problem.
///
/// Especially at startup low connectivity is expected (authority discovery cache needs to be
Expand All @@ -91,6 +101,14 @@ pub struct GossipSupport<AD> {
// `None` otherwise.
last_failure: Option<Instant>,

// Validators can restart during a session, so if they change
// their PeerID, we will connect to them in the best case after
// a session, so we need to try more often to resolved peers and
// reconnect to them. The authorithy_discovery queries runs every ten
// minutes, so we can't detect changes in the address more often
// that that.
last_connection_request: Option<Instant>,

/// First time we did not reach our connectivity threshold.
///
/// This is the time of the first failed attempt to connect to >2/3 of all validators in a
Expand Down Expand Up @@ -131,6 +149,7 @@ where
keystore,
last_session_index: None,
last_failure: None,
last_connection_request: None,
failure_start: None,
resolved_authorities: HashMap::new(),
connected_authorities: HashMap::new(),
Expand Down Expand Up @@ -196,15 +215,22 @@ where
for leaf in leaves {
let current_index = util::request_session_index_for_child(leaf, sender).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let since_last_reconnect =
self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();

let force_request = since_failure >= BACKOFF_DURATION;
let re_resolve_authorities = since_last_reconnect >= TRY_RERESOLVE_AUTHORITIES;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i => None,
_ => leaf_session,
};

let maybe_issue_connection =
if force_request { leaf_session } else { maybe_new_session };
let maybe_issue_connection = if force_request || re_resolve_authorities {
leaf_session
} else {
maybe_new_session
};

if let Some((session_index, relay_parent)) = maybe_issue_connection {
let session_info =
Expand Down Expand Up @@ -248,7 +274,7 @@ where
// connections to a much broader set of validators.
{
let mut connections = authorities_past_present_future(sender, leaf).await?;

self.last_connection_request = Some(Instant::now());
// Remove all of our locally controlled validator indices so we don't connect to
// ourself.
let connections =
Expand All @@ -259,7 +285,12 @@ where
// to clean up all connections.
Vec::new()
};
self.issue_connection_request(sender, connections).await;

if force_request || is_new_session {
self.issue_connection_request(sender, connections).await;
} else if re_resolve_authorities {
self.issue_connection_request_to_changed(sender, connections).await;
}
alexggh marked this conversation as resolved.
Show resolved Hide resolved
}

if is_new_session {
Expand Down Expand Up @@ -324,17 +355,14 @@ where
authority_check_result
}

async fn issue_connection_request<Sender>(
async fn resolve_authorities(
&mut self,
sender: &mut Sender,
authorities: Vec<AuthorityDiscoveryId>,
) where
Sender: overseer::GossipSupportSenderTrait,
{
let num = authorities.len();
) -> (Vec<HashSet<Multiaddr>>, HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>, usize) {
let mut validator_addrs = Vec::with_capacity(authorities.len());
let mut failures = 0;
let mut resolved = HashMap::with_capacity(authorities.len());
let mut failures = 0;

for authority in authorities {
if let Some(addrs) =
self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await
Expand All @@ -350,6 +378,67 @@ where
);
}
}
(validator_addrs, resolved, failures)
}

async fn issue_connection_request_to_changed<Sender>(
&mut self,
sender: &mut Sender,
authorities: Vec<AuthorityDiscoveryId>,
) where
Sender: overseer::GossipSupportSenderTrait,
{
let (_, resolved, _) = self.resolve_authorities(authorities).await;

let mut changed = Vec::new();

for (authority, new_addresses) in &resolved {
let new_peer_ids = new_addresses
.iter()
.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
.collect::<HashSet<_>>();
match self.resolved_authorities.get(authority) {
Some(old_addresses) => {
let old_peer_ids = old_addresses
.iter()
.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
.collect::<HashSet<_>>();
if !old_peer_ids.is_superset(&new_peer_ids) {
changed.push(new_addresses.clone());
}
},
None => changed.push(new_addresses.clone()),
}
}
gum::debug!(
target: LOG_TARGET,
num_changed = ?changed.len(),
?changed,
"Issuing a connection request to changed validators"
);
if !changed.is_empty() {
self.resolved_authorities = resolved;

sender
.send_message(NetworkBridgeTxMessage::AddToResolvedValidators {
validator_addrs: changed,
peer_set: PeerSet::Validation,
})
.await;
}
}

async fn issue_connection_request<Sender>(
&mut self,
sender: &mut Sender,
authorities: Vec<AuthorityDiscoveryId>,
) where
Sender: overseer::GossipSupportSenderTrait,
{
let num = authorities.len();

let (validator_addrs, resolved, failures) = self.resolve_authorities(authorities).await;

self.resolved_authorities = resolved;
gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request");

Expand Down Expand Up @@ -399,16 +488,24 @@ where
{
let mut authority_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>> = HashMap::new();
for authority in authorities {
let peer_id = self
let peer_ids = self
.authority_discovery
.get_addresses_by_authority_id(authority.clone())
.await
.into_iter()
.flat_map(|list| list.into_iter())
.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p));
.flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
.collect::<HashSet<_>>();

gum::trace!(
target: LOG_TARGET,
?peer_ids,
?authority,
"Resolved to peer ids"
);

if let Some(p) = peer_id {
authority_ids.entry(p).or_default().insert(authority);
for p in peer_ids {
authority_ids.entry(p).or_default().insert(authority.clone());
}
}

Expand Down