diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index b31359f48a56..1f438df2d148 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -204,6 +204,13 @@ pub trait Network: Clone + Send + 'static { multiaddresses: HashSet, ) -> Result<(), String>; + /// Ask the network to extend the reserved set with these nodes. + async fn add_peers_to_reserved_set( + &mut self, + protocol: ProtocolName, + multiaddresses: HashSet, + ) -> Result<(), String>; + /// Removes the peers for the protocol's peer set (both reserved and non-reserved). async fn remove_from_peers_set( &mut self, @@ -240,6 +247,14 @@ impl Network for Arc { ::set_reserved_peers(&**self, protocol, multiaddresses) } + async fn add_peers_to_reserved_set( + &mut self, + protocol: ProtocolName, + multiaddresses: HashSet, + ) -> Result<(), String> { + ::add_peers_to_reserved_set(&**self, protocol, multiaddresses) + } + async fn remove_from_peers_set( &mut self, protocol: ProtocolName, diff --git a/polkadot/node/network/bridge/src/rx/tests.rs b/polkadot/node/network/bridge/src/rx/tests.rs index 392ff7391a1c..601dca5cb8a3 100644 --- a/polkadot/node/network/bridge/src/rx/tests.rs +++ b/polkadot/node/network/bridge/src/rx/tests.rs @@ -124,6 +124,14 @@ impl Network for TestNetwork { Ok(()) } + async fn add_peers_to_reserved_set( + &mut self, + _protocol: ProtocolName, + _: HashSet, + ) -> Result<(), String> { + Ok(()) + } + async fn remove_from_peers_set( &mut self, _protocol: ProtocolName, diff --git a/polkadot/node/network/bridge/src/tx/mod.rs b/polkadot/node/network/bridge/src/tx/mod.rs index 7b6dea748572..6c353195d41a 100644 --- a/polkadot/node/network/bridge/src/tx/mod.rs +++ b/polkadot/node/network/bridge/src/tx/mod.rs @@ -370,6 +370,22 @@ where .await; return (network_service, authority_discovery_service) }, + + NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => { + gum::trace!( + target: LOG_TARGET, + action = "AddToResolvedValidators", + peer_set = ?peer_set, + ?validator_addrs, + "Received a resolved validator connection request", + ); + + let all_addrs = validator_addrs.into_iter().flatten().collect(); + let network_service = validator_discovery + .on_add_to_resolved_request(all_addrs, peer_set, network_service) + .await; + return (network_service, authority_discovery_service) + }, } (network_service, authority_discovery_service) } diff --git a/polkadot/node/network/bridge/src/tx/tests.rs b/polkadot/node/network/bridge/src/tx/tests.rs index 9265358196db..30b2c3421372 100644 --- a/polkadot/node/network/bridge/src/tx/tests.rs +++ b/polkadot/node/network/bridge/src/tx/tests.rs @@ -148,6 +148,14 @@ impl Network for TestNetwork { Ok(()) } + async fn add_peers_to_reserved_set( + &mut self, + _protocol: ProtocolName, + _: HashSet, + ) -> Result<(), String> { + Ok(()) + } + async fn remove_from_peers_set( &mut self, _protocol: ProtocolName, diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index f0ef038d5eb4..9accd56d86ae 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -92,6 +92,44 @@ impl Service { network_service } + /// Connect to already resolved addresses. 
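+ /// Unlike a full connection request, the previously requested peers are not replaced: the + /// new addresses are only appended to the reserved set of the given peer set.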
+ pub async fn on_add_to_resolved_request( + &mut self, + newly_requested: HashSet, + peer_set: PeerSet, + mut network_service: N, + ) -> N { + let state = &mut self.state[peer_set]; + let new_peer_ids: HashSet = extract_peer_ids(newly_requested.iter().cloned()); + let num_peers = new_peer_ids.len(); + + state.previously_requested.extend(new_peer_ids); + + gum::debug!( + target: LOG_TARGET, + ?peer_set, + ?num_peers, + "New add to resolved validators request", + ); + + // ask the network to connect to these nodes and not disconnect + // from them until they are removed from the set. + // + // for peer-set management, the main protocol name should be used regardless of + // the negotiated version. + if let Err(e) = network_service + .add_peers_to_reserved_set( + self.peerset_protocol_names.get_main_name(peer_set), + newly_requested, + ) + .await + { + gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); + } + + network_service + } + /// On a new connection request, a peer set update will be issued. /// It will ask the network to connect to the validators and not disconnect /// from them at least until the next request is issued for the same peer set. @@ -222,6 +260,15 @@ mod tests { Ok(()) } + async fn add_peers_to_reserved_set( + &mut self, + _protocol: ProtocolName, + multiaddresses: HashSet, + ) -> Result<(), String> { + self.peers_set.extend(extract_peer_ids(multiaddresses.into_iter())); + Ok(()) + } + async fn remove_from_peers_set( &mut self, _protocol: ProtocolName, diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 4dfdd1f7208f..cd327c11e408 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -69,6 +69,16 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5); #[cfg(test)] const BACKOFF_DURATION: Duration = Duration::from_millis(500); +// The authority_discovery queries run every ten minutes, +// so it makes sense to run a bit more often than that to +// detect changes as often as we can, but not too often since +// it won't help. +#[cfg(not(test))] +const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(5 * 60); + +#[cfg(test)] +const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(2); + /// Duration after which we consider low connectivity a problem. /// /// Especially at startup low connectivity is expected (authority discovery cache needs to be @@ -91,6 +101,14 @@ pub struct GossipSupport { // `None` otherwise. last_failure: Option, + // Validators can restart during a session, so if they change + // their PeerID, we will connect to them in the best case after + // a session, so we need to try more often to re-resolve peers and + // reconnect to them. The authority_discovery queries run every ten + // minutes, so we can't detect changes in the address more often + // than that. + last_connection_request: Option, + /// First time we did not reach our connectivity threshold. 
/// /// This is the time of the first failed attempt to connect to >2/3 of all validators in a @@ -131,6 +149,7 @@ where keystore, last_session_index: None, last_failure: None, + last_connection_request: None, failure_start: None, resolved_authorities: HashMap::new(), connected_authorities: HashMap::new(), @@ -196,15 +215,22 @@ where for leaf in leaves { let current_index = util::request_session_index_for_child(leaf, sender).await.await??; let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default(); + let since_last_reconnect = + self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default(); + let force_request = since_failure >= BACKOFF_DURATION; + let re_resolve_authorities = since_last_reconnect >= TRY_RERESOLVE_AUTHORITIES; let leaf_session = Some((current_index, leaf)); let maybe_new_session = match self.last_session_index { Some(i) if current_index <= i => None, _ => leaf_session, }; - let maybe_issue_connection = - if force_request { leaf_session } else { maybe_new_session }; + let maybe_issue_connection = if force_request || re_resolve_authorities { + leaf_session + } else { + maybe_new_session + }; if let Some((session_index, relay_parent)) = maybe_issue_connection { let session_info = @@ -248,7 +274,7 @@ where // connections to a much broader set of validators. { let mut connections = authorities_past_present_future(sender, leaf).await?; - + self.last_connection_request = Some(Instant::now()); // Remove all of our locally controlled validator indices so we don't connect to // ourself. let connections = @@ -259,7 +285,12 @@ where // to clean up all connections. Vec::new() }; - self.issue_connection_request(sender, connections).await; + + if force_request || is_new_session { + self.issue_connection_request(sender, connections).await; + } else if re_resolve_authorities { + self.issue_connection_request_to_changed(sender, connections).await; + } } if is_new_session { @@ -324,17 +355,14 @@ where authority_check_result } - async fn issue_connection_request( + async fn resolve_authorities( &mut self, - sender: &mut Sender, authorities: Vec, - ) where - Sender: overseer::GossipSupportSenderTrait, - { - let num = authorities.len(); + ) -> (Vec>, HashMap>, usize) { let mut validator_addrs = Vec::with_capacity(authorities.len()); - let mut failures = 0; let mut resolved = HashMap::with_capacity(authorities.len()); + let mut failures = 0; + for authority in authorities { if let Some(addrs) = self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await @@ -350,6 +378,67 @@ where ); } } + (validator_addrs, resolved, failures) + } + + async fn issue_connection_request_to_changed( + &mut self, + sender: &mut Sender, + authorities: Vec, + ) where + Sender: overseer::GossipSupportSenderTrait, + { + let (_, resolved, _) = self.resolve_authorities(authorities).await; + + let mut changed = Vec::new(); + + for (authority, new_addresses) in &resolved { + let new_peer_ids = new_addresses + .iter() + .flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p)) + .collect::>(); + match self.resolved_authorities.get(authority) { + Some(old_addresses) => { + let old_peer_ids = old_addresses + .iter() + .flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p)) + .collect::>(); + if !old_peer_ids.is_superset(&new_peer_ids) { + changed.push(new_addresses.clone()); + } + }, + None => changed.push(new_addresses.clone()), + } + } + gum::debug!( + target: LOG_TARGET, + num_changed = ?changed.len(), + ?changed, + "Issuing a connection request to changed 
validators" + ); + if !changed.is_empty() { + self.resolved_authorities = resolved; + + sender + .send_message(NetworkBridgeTxMessage::AddToResolvedValidators { + validator_addrs: changed, + peer_set: PeerSet::Validation, + }) + .await; + } + } + + async fn issue_connection_request( + &mut self, + sender: &mut Sender, + authorities: Vec, + ) where + Sender: overseer::GossipSupportSenderTrait, + { + let num = authorities.len(); + + let (validator_addrs, resolved, failures) = self.resolve_authorities(authorities).await; + self.resolved_authorities = resolved; gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request"); @@ -399,16 +488,24 @@ where { let mut authority_ids: HashMap> = HashMap::new(); for authority in authorities { - let peer_id = self + let peer_ids = self .authority_discovery .get_addresses_by_authority_id(authority.clone()) .await .into_iter() .flat_map(|list| list.into_iter()) - .find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p)); + .flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p)) + .collect::>(); + + gum::trace!( + target: LOG_TARGET, + ?peer_ids, + ?authority, + "Resolved to peer ids" + ); - if let Some(p) = peer_id { - authority_ids.entry(p).or_default().insert(authority); + for p in peer_ids { + authority_ids.entry(p).or_default().insert(authority.clone()); } } diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index 42197d00e6f3..09622254f523 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -119,6 +119,14 @@ impl MockAuthorityDiscovery { } } + fn change_address_for_authority(&self, authority_id: AuthorityDiscoveryId) -> PeerId { + let new_peer_id = PeerId::random(); + let addr = Multiaddr::empty().with(Protocol::P2p(new_peer_id.into())); + self.addrs.lock().insert(authority_id.clone(), HashSet::from([addr])); + self.authorities.lock().insert(new_peer_id, HashSet::from([authority_id])); + new_peer_id + } + fn authorities(&self) -> HashMap> { self.authorities.lock().clone() } @@ -809,6 +817,313 @@ fn issues_update_authorities_after_session() { ); } +// Test we connect to authorities that changed their address `TRY_RERESOLVE_AUTHORITIES` rate +// and that is is no-op if no authority changed. +#[test] +fn test_quickly_connect_to_authorities_that_changed_address() { + let hash = Hash::repeat_byte(0xAA); + + let authorities = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + let authority_that_changes_address = authorities.get(5).unwrap().clone(); + + let mut authority_discovery_mock = MockAuthorityDiscovery::new(authorities); + + test_harness( + make_subsystem_with_authority_discovery(authority_discovery_mock.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // 1. Initialize with the first leaf in the session. 
+ overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); + + let addrs = get_multiaddrs(all_without_ferdie, authority_discovery_mock.clone()).await; + + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + // Ensure neighbors are unaffected + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::CurrentBabeEpoch(tx), + )) => { + let _ = tx.send(Ok(BabeEpoch { + epoch_index: 2 as _, + start_slot: 0.into(), + duration: 200, + authorities: vec![(Sr25519Keyring::Alice.public().into(), 1)], + randomness: [0u8; 32], + config: BabeEpochConfiguration { + c: (1, 4), + allowed_slots: AllowedSlots::PrimarySlots, + }, + })).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::NewGossipTopology { + session: _, + local_index: _, + canonical_shuffling: _, + shuffled_indices: _, + }) => { + + } + ); + + // 2. Connect all authorities that are known so far. + let known_authorities = authority_discovery_mock.authorities(); + for (peer_id, _id) in known_authorities.iter() { + let msg = + GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + *peer_id, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer.send(FromOrchestra::Communication { msg }).await + } + + // 3. Send a new leaf after TRY_RERESOLVE_AUTHORITIES, we should notice + // UpdateAuthorithies is emitted for all ConnectedPeers. 
+ Delay::new(TRY_RERESOLVE_AUTHORITIES).await; + let hash = Hash::repeat_byte(0xBB); + overseer_signal_active_leaves(overseer, hash).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + for _ in 0..known_authorities.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap_or_default(), authority_ids); + } + ); + } + + // 4. At next re-resolve no-authorithy changes their address, so it should be no-op. + Delay::new(TRY_RERESOLVE_AUTHORITIES).await; + let hash = Hash::repeat_byte(0xCC); + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + + // Change address for one authorithy and check we try to connect to it and + // that we emit UpdateAuthorityID for the old PeerId and the new one. 
+ Delay::new(TRY_RERESOLVE_AUTHORITIES).await; + let changed_peerid = authority_discovery_mock + .change_address_for_authority(authority_that_changes_address.clone()); + let hash = Hash::repeat_byte(0xDD); + let msg = GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + changed_peerid, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer.send(FromOrchestra::Communication { msg }).await; + + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::AddToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let expected = get_address_map(vec![authority_that_changes_address.clone()], authority_discovery_mock.clone()).await; + let expected: HashSet = expected.into_values().flat_map(|v| v.into_iter()).collect(); + assert_eq!(validator_addrs.into_iter().flat_map(|v| v.into_iter()).collect::>(), expected); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap(), HashSet::from([authority_that_changes_address.clone()])); + assert!(authority_ids.is_empty()); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_ids, HashSet::from([authority_that_changes_address])); + assert_eq!(changed_peerid, peer_id); + } + ); + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + + virtual_overseer + }, + ); +} + #[test] fn disconnect_when_not_in_past_present_future() { sp_tracing::try_init_simple(); diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index ee937bca05bf..4d27ac9b70e3 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -444,6 +444,16 @@ pub enum NetworkBridgeTxMessage { /// The peer set we want the connection on. peer_set: PeerSet, }, + + /// Extends the known validators set with new peers we already know the `Multiaddrs`, this is + /// usually needed for validators that change their address mid-session. It is usually called + /// after a ConnectToResolvedValidators at the beginning of the session. 
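+ /// Unlike `ConnectToResolvedValidators`, the given addresses are added to the existing + /// reserved set instead of replacing it.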
+ AddToResolvedValidators { + /// Each entry corresponds to the addresses of an already resolved validator. + validator_addrs: Vec>, + /// The peer set we want the connection on. + peer_set: PeerSet, + }, } /// Availability Distribution Message. diff --git a/prdoc/pr_3786.prdoc b/prdoc/pr_3786.prdoc new file mode 100644 index 000000000000..0bb9e6c23f75 --- /dev/null +++ b/prdoc/pr_3786.prdoc @@ -0,0 +1,22 @@ +title: Make changing of peer-id while active a bit more robust + +doc: + - audience: Node Dev + description: | + Implementation of https://github.com/polkadot-fellows/RFCs/pull/91, to use `creation_time` field to determine + the newest DHT record and to update nodes known to have the old record. + + Gossip-support is modified to try to re-resolve authorities' new addresses every 5 minutes instead of once per session, + so that we pick up authorities that changed their address faster and try to connect to them. + +crates: +- name: sc-authority-discovery + bump: major +- name: polkadot-gossip-support + bump: major +- name: polkadot-network-bridge + bump: major +- name: polkadot-node-subsystem-types + bump: major +- name: sc-network + bump: minor \ No newline at end of file diff --git a/substrate/client/authority-discovery/build.rs b/substrate/client/authority-discovery/build.rs index 83076ac8c893..cdabc1a74427 100644 --- a/substrate/client/authority-discovery/build.rs +++ b/substrate/client/authority-discovery/build.rs @@ -18,7 +18,11 @@ fn main() { prost_build::compile_protos( - &["src/worker/schema/dht-v1.proto", "src/worker/schema/dht-v2.proto"], + &[ + "src/worker/schema/dht-v1.proto", + "src/worker/schema/dht-v2.proto", + "src/worker/schema/dht-v3.proto", + ], &["src/worker/schema"], ) .unwrap(); diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index 1f1cce160786..42994bbc7ea8 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -26,7 +26,7 @@ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, sync::Arc, - time::Duration, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt}; @@ -34,16 +34,17 @@ use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt} use addr_cache::AddrCache; use codec::{Decode, Encode}; use ip_network::IpNetwork; +use libp2p::kad::{PeerRecord, Record}; use linked_hash_set::LinkedHashSet; -use log::{debug, error, log_enabled}; +use log::{debug, error}; use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64}; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; use sc_network::{ - event::DhtEvent, multiaddr, KademliaKey, Multiaddr, NetworkDHTProvider, NetworkSigner, - NetworkStateInfo, + config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey, + Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo, }; use sc_network_types::{multihash::Code, PeerId}; use schema::PeerSignature; @@ -62,7 +63,7 @@ mod schema { #[cfg(test)] mod tests; - include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs")); + include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs")); } #[cfg(test)] pub mod tests; @@ -159,6 +160,16 @@ pub struct Worker { /// Set of in-flight lookups. in_flight_lookups: HashMap, + /// Set of lookups we can still receive records. + /// These are the entries in the `in_flight_lookups` for which + /// we got at least one successful result. 
+ known_lookups: HashMap, + + /// Last known record by key, here we always keep the record with + /// the highest creation time and we don't accept records older than + /// that. + last_known_records: HashMap, + addr_cache: addr_cache::AddrCache, metrics: Option, @@ -168,6 +179,17 @@ pub struct Worker { phantom: PhantomData, } +#[derive(Debug, Clone)] +struct RecordInfo { + /// Time since UNIX_EPOCH in nanoseconds. + creation_time: u128, + /// Peers that we know have this record, bounded to no more than + /// DEFAULT_KADEMLIA_REPLICATION_FACTOR(20). + peers_with_record: HashSet, + /// The record itself. + record: Record, +} + /// Wrapper for [`AuthorityDiscoveryApi`](sp_authority_discovery::AuthorityDiscoveryApi). Can be /// be implemented by any struct without dependency on the runtime. #[async_trait::async_trait] @@ -283,10 +305,12 @@ where query_interval, pending_lookups: Vec::new(), in_flight_lookups: HashMap::new(), + known_lookups: HashMap::new(), addr_cache, role, metrics, phantom: PhantomData, + last_known_records: HashMap::new(), } } @@ -444,7 +468,7 @@ where .set(addresses.len().try_into().unwrap_or(std::u64::MAX)); } - let serialized_record = serialize_authority_record(addresses)?; + let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?; let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?; let keys_vec = keys.iter().cloned().collect::>(); @@ -495,12 +519,17 @@ where self.authorities_queried_at = Some(best_hash); self.addr_cache.retain_ids(&authorities); + let now = Instant::now(); + self.last_known_records.retain(|k, value| { + self.known_authorities.contains_key(k) && !value.record.is_expired(now) + }); authorities.shuffle(&mut thread_rng()); self.pending_lookups = authorities; // Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as // query interval ticks are far enough apart for all lookups to succeed. self.in_flight_lookups.clear(); + self.known_lookups.clear(); if let Some(metrics) = &self.metrics { metrics @@ -538,16 +567,12 @@ where metrics.dht_event_received.with_label_values(&["value_found"]).inc(); } - if log_enabled!(log::Level::Debug) { - let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect(); - debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", hashes); - } + debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key); if let Err(e) = self.handle_dht_value_found_event(v) { if let Some(metrics) = &self.metrics { metrics.handle_value_found_event_failure.inc(); } - debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e); } }, @@ -651,6 +676,31 @@ where publisher, authority_id, )?; + + let records_creation_time: u128 = + schema::AuthorityRecord::decode(signed_record.record.as_slice()) + .map_err(Error::DecodingProto)? + .creation_time + .map(|creation_time| { + u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default() + }) + .unwrap_or_default(); // 0 is a sane default for records that do not have creation time present. + + let current_record_info = self.last_known_records.get(&record_key); + // If record creation time is older than the current record creation time, + // we don't store it since we want to give higher priority to newer records. 
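+ // Records without a creation time decode to a creation time of 0, so they can never + // replace a record that carries a timestamp.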
+ if let Some(current_record_info) = current_record_info { + if records_creation_time < current_record_info.creation_time { + debug!( + target: LOG_TARGET, + "Skip storing because record creation time {:?} is older than the current known record {:?}", + records_creation_time, + current_record_info.creation_time + ); + return Ok(()); + } + } + self.network.store_record(record_key, record_value, Some(publisher), expires); Ok(()) } @@ -701,67 +751,88 @@ where Ok(()) } - fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec)>) -> Result<()> { + fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> { // Ensure `values` is not empty and all its keys equal. - let remote_key = single(values.iter().map(|(key, _)| key.clone())) - .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)? - .ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?; - - let authority_id: AuthorityId = self - .in_flight_lookups - .remove(&remote_key) - .ok_or(Error::ReceivingUnexpectedRecord)?; + let remote_key = peer_record.record.key.clone(); + + let authority_id: AuthorityId = + if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) { + self.known_lookups.insert(remote_key.clone(), authority_id.clone()); + authority_id + } else if let Some(authority_id) = self.known_lookups.get(&remote_key) { + authority_id.clone() + } else { + return Err(Error::ReceivingUnexpectedRecord); + }; let local_peer_id = self.network.local_peer_id(); - let remote_addresses: Vec = values - .into_iter() - .map(|(_k, v)| { - let schema::SignedAuthorityRecord { record, peer_signature, .. } = - Self::check_record_signed_with_authority_id(&v, &authority_id)?; - - let addresses: Vec = schema::AuthorityRecord::decode(record.as_slice()) - .map(|a| a.addresses) - .map_err(Error::DecodingProto)? - .into_iter() - .map(|a| a.try_into()) - .collect::>() - .map_err(Error::ParsingMultiaddress)?; - - let get_peer_id = |a: &Multiaddr| match a.iter().last() { - Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(), - _ => None, - }; - - // Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses. - let addresses: Vec = addresses - .into_iter() - .filter(|a| get_peer_id(a).filter(|p| *p != local_peer_id).is_some()) - .collect(); - - let remote_peer_id = single(addresses.iter().map(get_peer_id)) - .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records - .flatten() - .ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them - - // At this point we know all the valid multiaddresses from the record, know that - // each of them belong to the same PeerId, we just need to check if the record is - // properly signed by the owner of the PeerId - self.check_record_signed_with_network_key( - &record, - peer_signature, - remote_peer_id, - &authority_id, - )?; - Ok(addresses) + let schema::SignedAuthorityRecord { record, peer_signature, .. } = + Self::check_record_signed_with_authority_id( + peer_record.record.value.as_slice(), + &authority_id, + )?; + + let authority_record = + schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?; + + let records_creation_time: u128 = authority_record + .creation_time + .as_ref() + .map(|creation_time| { + u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default() }) - .collect::>>>()? + .unwrap_or_default(); // 0 is a sane default for records that do not have creation time present. 
+ + let addresses: Vec = authority_record + .addresses .into_iter() - .flatten() - .take(MAX_ADDRESSES_PER_AUTHORITY) + .map(|a| a.try_into()) + .collect::>() + .map_err(Error::ParsingMultiaddress)?; + + let get_peer_id = |a: &Multiaddr| match a.iter().last() { + Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(), + _ => None, + }; + + // Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses. + let addresses: Vec = addresses + .into_iter() + .filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some()) .collect(); - if !remote_addresses.is_empty() { + let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a))) + .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records + .flatten() + .ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them + + // At this point we know all the valid multiaddresses from the record, know that + // each of them belong to the same PeerId, we just need to check if the record is + // properly signed by the owner of the PeerId + self.check_record_signed_with_network_key( + &record, + peer_signature, + remote_peer_id, + &authority_id, + )?; + + let remote_addresses: Vec = + addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect(); + + let answering_peer_id = peer_record.peer.map(|peer| peer.into()); + + let addr_cache_needs_update = self.handle_new_record( + &authority_id, + remote_key.clone(), + RecordInfo { + creation_time: records_creation_time, + peers_with_record: answering_peer_id.into_iter().collect(), + record: peer_record.record, + }, + ); + + if !remote_addresses.is_empty() && addr_cache_needs_update { self.addr_cache.insert(authority_id, remote_addresses); if let Some(metrics) = &self.metrics { metrics @@ -772,6 +843,68 @@ where Ok(()) } + // Handles receiving a new DHT record for the authorithy. + // Returns true if the record was new, false if the record was older than the current one. + fn handle_new_record( + &mut self, + authority_id: &AuthorityId, + kademlia_key: KademliaKey, + new_record: RecordInfo, + ) -> bool { + let current_record_info = self + .last_known_records + .entry(kademlia_key.clone()) + .or_insert_with(|| new_record.clone()); + + if new_record.creation_time > current_record_info.creation_time { + let peers_that_need_updating = current_record_info.peers_with_record.clone(); + self.network.put_record_to( + new_record.record.clone(), + peers_that_need_updating.clone(), + // If this is empty it means we received the answer from our node local + // storage, so we need to update that as well. + current_record_info.peers_with_record.is_empty(), + ); + debug!( + target: LOG_TARGET, + "Found a newer record for {:?} new record creation time {:?} old record creation time {:?}", + authority_id, new_record.creation_time, current_record_info.creation_time + ); + self.last_known_records.insert(kademlia_key, new_record); + return true + } + + if new_record.creation_time == current_record_info.creation_time { + // Same record just update in case this is a record from old nodes that don't have + // timestamp. 
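+ // We also remember which peers served this copy (bounded by the Kademlia replication + // factor), so that a newer record can be pushed to them later if one shows up.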
+ debug!( + target: LOG_TARGET, + "Found same record for {:?} record creation time {:?}", + authority_id, new_record.creation_time + ); + if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <= + DEFAULT_KADEMLIA_REPLICATION_FACTOR + { + current_record_info.peers_with_record.extend(new_record.peers_with_record); + } + return true + } + + debug!( + target: LOG_TARGET, + "Found old record for {:?} received record creation time {:?} current record creation time {:?}", + authority_id, new_record.creation_time, current_record_info.creation_time, + ); + self.network.put_record_to( + current_record_info.record.clone(), + new_record.peers_with_record.clone(), + // If this is empty it means we received the answer from our node local + // storage, so we need to update that as well. + new_record.peers_with_record.is_empty(), + ); + return false + } + /// Retrieve our public keys within the current and next authority set. // A node might have multiple authority discovery keys within its keystore, e.g. an old one and // one for the upcoming session. In addition it could be participating in the current and (/ or) @@ -838,9 +971,21 @@ fn serialize_addresses(addresses: impl Iterator) -> Vec>) -> Result> { +fn build_creation_time() -> schema::TimestampInfo { + let creation_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|time| time.as_nanos()) + .unwrap_or_default(); + schema::TimestampInfo { timestamp: creation_time.encode() } +} + +fn serialize_authority_record( + addresses: Vec>, + creation_time: Option, +) -> Result> { let mut serialized_record = vec![]; - schema::AuthorityRecord { addresses } + + schema::AuthorityRecord { addresses, creation_time } .encode(&mut serialized_record) .map_err(Error::EncodingProto)?; Ok(serialized_record) @@ -876,7 +1021,6 @@ fn sign_record_with_authority_ids( // Scale encode let auth_signature = auth_signature.encode(); - let signed_record = schema::SignedAuthorityRecord { record: serialized_record.clone(), auth_signature, diff --git a/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto b/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto new file mode 100644 index 000000000000..547237573af2 --- /dev/null +++ b/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package authority_discovery_v3; + +// First we need to serialize the addresses in order to be able to sign them. +message AuthorityRecord { + // Possibly multiple `MultiAddress`es through which the node can be reached. + repeated bytes addresses = 1; + // Information about the creation time of the record + TimestampInfo creation_time = 2; +} + +message PeerSignature { + bytes signature = 1; + bytes public_key = 2; +} + +// Information regarding the creation data of the record +message TimestampInfo { + // Time since UNIX_EPOCH in nanoseconds, scale encoded + bytes timestamp = 1; +} + +// Then we need to serialize the authority record and signature to send them over the wire. +message SignedAuthorityRecord { + bytes record = 1; + bytes auth_signature = 2; + // Even if there are multiple `record.addresses`, all of them have the same peer id. + // Old versions are missing this field. It is optional in order to provide compatibility both ways. 
+ PeerSignature peer_signature = 3; +} diff --git a/substrate/client/authority-discovery/src/worker/schema/tests.rs b/substrate/client/authority-discovery/src/worker/schema/tests.rs index ef06ed7d336b..557fa9641f97 100644 --- a/substrate/client/authority-discovery/src/worker/schema/tests.rs +++ b/substrate/client/authority-discovery/src/worker/schema/tests.rs @@ -20,7 +20,12 @@ mod schema_v1 { include!(concat!(env!("OUT_DIR"), "/authority_discovery_v1.rs")); } +mod schema_v2 { + include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs")); +} + use super::*; +use codec::Encode; use libp2p::identity::Keypair; use prost::Message; use sc_network::{Multiaddr, PeerId}; @@ -65,7 +70,7 @@ fn v1_decodes_v2() { let vec_auth_signature = b"Totally valid signature, I promise!".to_vec(); let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec(); - let record_v2 = AuthorityRecord { addresses: vec_addresses.clone() }; + let record_v2 = schema_v2::AuthorityRecord { addresses: vec_addresses.clone() }; let mut vec_record_v2 = vec![]; record_v2.encode(&mut vec_record_v2).unwrap(); let vec_peer_public = peer_public.encode_protobuf(); @@ -85,6 +90,82 @@ fn v1_decodes_v2() { assert_eq!(&signed_addresses_v1_decoded.addresses, &vec_record_v2); assert_eq!(&signed_addresses_v1_decoded.signature, &vec_auth_signature); - let addresses_v2_decoded = AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap(); + let addresses_v2_decoded = + schema_v2::AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap(); + assert_eq!(&addresses_v2_decoded.addresses, &vec_addresses); +} + +#[test] +fn v1_decodes_v3() { + let peer_secret = Keypair::generate_ed25519(); + let peer_public = peer_secret.public(); + let peer_id = peer_public.to_peer_id(); + let multiaddress: Multiaddr = + format!("/ip4/127.0.0.1/tcp/3003/p2p/{}", peer_id).parse().unwrap(); + let vec_addresses = vec![multiaddress.to_vec()]; + let vec_auth_signature = b"Totally valid signature, I promise!".to_vec(); + let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec(); + + let record_v3 = AuthorityRecord { + addresses: vec_addresses.clone(), + creation_time: Some(TimestampInfo { timestamp: Encode::encode(&55) }), + }; + let mut vec_record_v3 = vec![]; + record_v3.encode(&mut vec_record_v3).unwrap(); + let vec_peer_public = peer_public.encode_protobuf(); + let peer_signature_v3 = + PeerSignature { public_key: vec_peer_public, signature: vec_peer_signature }; + let signed_record_v3 = SignedAuthorityRecord { + record: vec_record_v3.clone(), + auth_signature: vec_auth_signature.clone(), + peer_signature: Some(peer_signature_v3.clone()), + }; + let mut vec_signed_record_v3 = vec![]; + signed_record_v3.encode(&mut vec_signed_record_v3).unwrap(); + + let signed_addresses_v1_decoded = + schema_v1::SignedAuthorityAddresses::decode(vec_signed_record_v3.as_slice()).unwrap(); + + assert_eq!(&signed_addresses_v1_decoded.addresses, &vec_record_v3); + assert_eq!(&signed_addresses_v1_decoded.signature, &vec_auth_signature); + + let addresses_v2_decoded = + schema_v2::AuthorityRecord::decode(vec_record_v3.as_slice()).unwrap(); assert_eq!(&addresses_v2_decoded.addresses, &vec_addresses); } + +#[test] +fn v3_decodes_v2() { + let peer_secret = Keypair::generate_ed25519(); + let peer_public = peer_secret.public(); + let peer_id = peer_public.to_peer_id(); + let multiaddress: Multiaddr = + format!("/ip4/127.0.0.1/tcp/3003/p2p/{}", peer_id).parse().unwrap(); + let vec_addresses = vec![multiaddress.to_vec()]; + let vec_auth_signature = b"Totally valid 
signature, I promise!".to_vec(); + let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec(); + + let record_v2 = schema_v2::AuthorityRecord { addresses: vec_addresses.clone() }; + let mut vec_record_v2 = vec![]; + record_v2.encode(&mut vec_record_v2).unwrap(); + let vec_peer_public = peer_public.encode_protobuf(); + let peer_signature_v2 = + schema_v2::PeerSignature { public_key: vec_peer_public, signature: vec_peer_signature }; + let signed_record_v2 = schema_v2::SignedAuthorityRecord { + record: vec_record_v2.clone(), + auth_signature: vec_auth_signature.clone(), + peer_signature: Some(peer_signature_v2.clone()), + }; + let mut vec_signed_record_v2 = vec![]; + signed_record_v2.encode(&mut vec_signed_record_v2).unwrap(); + + let signed_addresses_v3_decoded = + SignedAuthorityRecord::decode(vec_signed_record_v2.as_slice()).unwrap(); + + assert_eq!(&signed_addresses_v3_decoded.record, &vec_record_v2); + assert_eq!(&signed_addresses_v3_decoded.auth_signature, &vec_auth_signature); + + let addresses_v3_decoded = AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap(); + assert_eq!(&addresses_v3_decoded.addresses, &vec_addresses); + assert_eq!(&addresses_v3_decoded.creation_time, &None); +} diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index de7443d634fa..b49615382b8a 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -119,6 +119,7 @@ sp_api::mock_impl_runtime_apis! { pub enum TestNetworkEvent { GetCalled(KademliaKey), PutCalled(KademliaKey, Vec), + PutToCalled(Record, HashSet, bool), StoreRecordCalled(KademliaKey, Vec, Option, Option), } @@ -129,6 +130,7 @@ pub struct TestNetwork { // Whenever functions on `TestNetwork` are called, the function arguments are added to the // vectors below. 
pub put_value_call: Arc)>>>, + pub put_value_to_call: Arc, bool)>>>, pub get_value_call: Arc>>, pub store_value_call: Arc, Option, Option)>>>, @@ -153,6 +155,7 @@ impl Default for TestNetwork { external_addresses: vec!["/ip6/2001:db8::/tcp/30333".parse().unwrap()], put_value_call: Default::default(), get_value_call: Default::default(), + put_value_to_call: Default::default(), store_value_call: Default::default(), event_sender: tx, event_receiver: Some(rx), @@ -200,6 +203,23 @@ impl NetworkDHTProvider for TestNetwork { .unwrap(); } + fn put_record_to( + &self, + record: Record, + peers: HashSet, + update_local_storage: bool, + ) { + self.put_value_to_call.lock().unwrap().push(( + record.clone(), + peers.clone(), + update_local_storage, + )); + self.event_sender + .clone() + .unbounded_send(TestNetworkEvent::PutToCalled(record, peers, update_local_storage)) + .unwrap(); + } + fn store_record( &self, key: KademliaKey, @@ -262,9 +282,11 @@ fn build_dht_event( public_key: AuthorityId, key_store: &MemoryKeystore, network: Option<&Signer>, + creation_time: Option, ) -> Vec<(KademliaKey, Vec)> { let serialized_record = - serialize_authority_record(serialize_addresses(addresses.into_iter())).unwrap(); + serialize_authority_record(serialize_addresses(addresses.into_iter()), creation_time) + .unwrap(); let peer_signature = network.map(|n| sign_record_with_peer_id(&serialized_record, n).unwrap()); let kv_pairs = sign_record_with_authority_ids( @@ -372,7 +394,10 @@ fn publish_discover_cycle() { let dht_event = { let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap(); - DhtEvent::ValueFound(vec![(key, value)]) + DhtEvent::ValueFound(PeerRecord { + peer: None, + record: Record { key, value, publisher: None, expires: None }, + }) }; // Node B discovering node A's address. @@ -515,21 +540,39 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { // Send an event that should generate an error dht_event_tx - .send(DhtEvent::ValueFound(Default::default())) + .send(DhtEvent::ValueFound(PeerRecord { + peer: None, + record: Record { + key: vec![0x9u8].into(), + value: Default::default(), + publisher: None, + expires: None, + }, + })) .await .expect("Channel has capacity of 1."); // Make previously triggered lookup succeed. - let dht_event = { - let kv_pairs = build_dht_event::( - vec![remote_multiaddr.clone()], - remote_public_key.clone(), - &remote_key_store, - None, - ); - DhtEvent::ValueFound(kv_pairs) - }; - dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1."); + let kv_pairs: Vec = build_dht_event::( + vec![remote_multiaddr.clone()], + remote_public_key.clone(), + &remote_key_store, + None, + Some(build_creation_time()), + ) + .into_iter() + .map(|(key, value)| PeerRecord { + peer: None, + record: Record { key, value, publisher: None, expires: None }, + }) + .collect(); + + for kv_pair in kv_pairs { + dht_event_tx + .send(DhtEvent::ValueFound(kv_pair)) + .await + .expect("Channel has capacity of 1."); + } // Expect authority discovery to function normally, now knowing the // address for the remote node. 
@@ -581,37 +624,51 @@ impl DhtValueFoundTester { &mut self, strict_record_validation: bool, values: Vec<(KademliaKey, Vec)>, - ) -> Option<&HashSet> { + ) -> (Option>, Option>) { let (_dht_event_tx, dht_event_rx) = channel(1); let local_test_api = Arc::new(TestApi { authorities: vec![self.remote_authority_public.into()] }); - let local_network: Arc = Arc::new(Default::default()); let local_key_store = MemoryKeystore::new(); let (_to_worker, from_service) = mpsc::channel(0); - let mut local_worker = Worker::new( - from_service, - local_test_api, - local_network.clone(), - Box::pin(dht_event_rx), - Role::PublishAndDiscover(Arc::new(local_key_store)), - None, - WorkerConfig { strict_record_validation, ..Default::default() }, - ); + let (local_worker, local_network) = if let Some(local_work) = self.local_worker.as_mut() { + (local_work, None) + } else { + let local_network: Arc = Arc::new(Default::default()); + + self.local_worker = Some(Worker::new( + from_service, + local_test_api, + local_network.clone(), + Box::pin(dht_event_rx), + Role::PublishAndDiscover(Arc::new(local_key_store)), + None, + WorkerConfig { strict_record_validation, ..Default::default() }, + )); + (self.local_worker.as_mut().unwrap(), Some(local_network)) + }; block_on(local_worker.refill_pending_lookups_queue()).unwrap(); local_worker.start_new_lookups(); - drop(local_worker.handle_dht_value_found_event(values)); - - self.local_worker = Some(local_worker); + for record in values.into_iter().map(|(key, value)| PeerRecord { + peer: Some(PeerId::random().into()), + record: Record { key, value, publisher: None, expires: None }, + }) { + drop(local_worker.handle_dht_value_found_event(record)) + } - self.local_worker - .as_ref() - .map(|w| { - w.addr_cache.get_addresses_by_authority_id(&self.remote_authority_public.into()) - }) - .unwrap() + ( + self.local_worker + .as_ref() + .map(|w| { + w.addr_cache + .get_addresses_by_authority_id(&self.remote_authority_public.into()) + .cloned() + }) + .unwrap(), + local_network, + ) } } @@ -625,9 +682,10 @@ fn limit_number_of_addresses_added_to_cache_per_authority() { tester.remote_authority_public.into(), &tester.remote_key_store, None, + Some(build_creation_time()), ); - let cached_remote_addresses = tester.process_value_found(false, kv_pairs); + let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0; assert_eq!(MAX_ADDRESSES_PER_AUTHORITY, cached_remote_addresses.unwrap().len()); } @@ -640,17 +698,242 @@ fn strict_accept_address_with_peer_signature() { tester.remote_authority_public.into(), &tester.remote_key_store, Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), ); - let cached_remote_addresses = tester.process_value_found(true, kv_pairs); + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; assert_eq!( - Some(&HashSet::from([addr])), + Some(HashSet::from([addr])), cached_remote_addresses, "Expect worker to only cache `Multiaddr`s with `PeerId`s.", ); } +#[test] +fn strict_accept_address_without_creation_time() { + let mut tester = DhtValueFoundTester::new(); + let addr = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![addr.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + None, + ); + + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; + + assert_eq!( + Some(HashSet::from([addr])), + cached_remote_addresses, + "Expect worker to cache address without creation 
time", + ); +} + +#[test] +fn keep_last_received_if_no_creation_time() { + let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); + let addr = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![addr.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + None, + ); + + let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs); + + assert_eq!( + Some(HashSet::from([addr])), + cached_remote_addresses, + "Expect worker to cache address without creation time", + ); + + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); + + let addr2 = tester.multiaddr_with_peer_id(2); + let kv_pairs = build_dht_event( + vec![addr2.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + None, + ); + + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; + + assert_eq!( + Some(HashSet::from([addr2])), + cached_remote_addresses, + "Expect worker to cache last received when no creation time", + ); + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); +} + +#[test] +fn records_with_incorrectly_signed_creation_time_are_ignored() { + let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); + let addr = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![addr.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + + let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs); + + assert_eq!( + Some(HashSet::from([addr.clone()])), + cached_remote_addresses, + "Expect worker to cache record with creation time", + ); + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); + + let alternative_key = tester + .remote_key_store + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap(); + + let addr2 = tester.multiaddr_with_peer_id(2); + let mut kv_pairs = build_dht_event( + vec![addr2.clone()], + alternative_key.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + let kademlia_key = hash_authority_id(tester.remote_authority_public.as_slice()); + for key in kv_pairs.iter_mut() { + key.0 = kademlia_key.clone(); + } + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; + + assert_eq!( + Some(HashSet::from([addr])), + cached_remote_addresses, + "Expect `Multiaddr` to remain the same", + ); + assert!(network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap_or_default()); +} + +#[test] +fn newer_records_overwrite_older_ones() { + let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); + let old_record = tester.multiaddr_with_peer_id(1); + let kv_pairs = build_dht_event( + vec![old_record.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + + let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs); + + assert_eq!( + Some(HashSet::from([old_record])), + cached_remote_addresses, + "Expect 
worker to cache record with creation time", + ); + + let nothing_updated = network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap(); + assert!(nothing_updated); + + let new_record = tester.multiaddr_with_peer_id(2); + let kv_pairs = build_dht_event( + vec![new_record.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; + + assert_eq!( + Some(HashSet::from([new_record])), + cached_remote_addresses, + "Expect worker to store the newest recrod", + ); + + let result = network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().first().unwrap().clone()) + .unwrap(); + assert!(matches!(result, (_, _, false))); + assert_eq!(result.1.len(), 1); +} + +#[test] +fn older_records_dont_affect_newer_ones() { + let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); + let old_record = tester.multiaddr_with_peer_id(1); + let old_kv_pairs = build_dht_event( + vec![old_record.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + + let new_record = tester.multiaddr_with_peer_id(2); + let kv_pairs = build_dht_event( + vec![new_record.clone()], + tester.remote_authority_public.into(), + &tester.remote_key_store, + Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), + ); + + let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs); + + assert_eq!( + Some(HashSet::from([new_record.clone()])), + cached_remote_addresses, + "Expect worker to store new record", + ); + + let nothing_updated = network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().is_empty()) + .unwrap(); + assert!(nothing_updated); + + let cached_remote_addresses = tester.process_value_found(true, old_kv_pairs).0; + + assert_eq!( + Some(HashSet::from([new_record])), + cached_remote_addresses, + "Expect worker to not update stored record", + ); + + let update_peers_info = network + .as_ref() + .map(|network| network.put_value_to_call.lock().unwrap().remove(0)) + .unwrap(); + assert!(matches!(update_peers_info, (_, _, false))); + assert_eq!(update_peers_info.1.len(), 1); +} + #[test] fn reject_address_with_rogue_peer_signature() { let mut tester = DhtValueFoundTester::new(); @@ -660,9 +943,10 @@ fn reject_address_with_rogue_peer_signature() { tester.remote_authority_public.into(), &tester.remote_key_store, Some(&TestSigner { keypair: &rogue_remote_node_key }), + Some(build_creation_time()), ); - let cached_remote_addresses = tester.process_value_found(false, kv_pairs); + let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0; assert!( cached_remote_addresses.is_none(), @@ -678,13 +962,14 @@ fn reject_address_with_invalid_peer_signature() { tester.remote_authority_public.into(), &tester.remote_key_store, Some(&TestSigner { keypair: &tester.remote_node_key }), + Some(build_creation_time()), ); // tamper with the signature let mut record = schema::SignedAuthorityRecord::decode(kv_pairs[0].1.as_slice()).unwrap(); record.peer_signature.as_mut().map(|p| p.signature[1] = !p.signature[1]); record.encode(&mut kv_pairs[0].1).unwrap(); - let cached_remote_addresses = tester.process_value_found(false, kv_pairs); + let cached_remote_addresses = 
tester.process_value_found(false, kv_pairs).0; assert!( cached_remote_addresses.is_none(), @@ -700,9 +985,10 @@ fn reject_address_without_peer_signature() { tester.remote_authority_public.into(), &tester.remote_key_store, None, + Some(build_creation_time()), ); - let cached_remote_addresses = tester.process_value_found(true, kv_pairs); + let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0; assert!(cached_remote_addresses.is_none(), "Expected worker to ignore unsigned record.",); } @@ -718,12 +1004,13 @@ fn do_not_cache_addresses_without_peer_id() { tester.remote_authority_public.into(), &tester.remote_key_store, None, + Some(build_creation_time()), ); - let cached_remote_addresses = tester.process_value_found(false, kv_pairs); + let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0; assert_eq!( - Some(&HashSet::from([multiaddr_with_peer_id])), + Some(HashSet::from([multiaddr_with_peer_id])), cached_remote_addresses, "Expect worker to only cache `Multiaddr`s with `PeerId`s.", ); @@ -861,16 +1148,24 @@ fn lookup_throttling() { // Make first lookup succeed. let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap(); let remote_key: AuthorityId = remote_hash_to_key.get(&remote_hash).unwrap().clone(); - let dht_event = { - let kv_pairs = build_dht_event::( - vec![remote_multiaddr.clone()], - remote_key, - &remote_key_store, - None, - ); - DhtEvent::ValueFound(kv_pairs) - }; - dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1."); + let kv_pairs = build_dht_event::( + vec![remote_multiaddr.clone()], + remote_key, + &remote_key_store, + None, + Some(build_creation_time()), + ) + .into_iter() + .map(|(key, value)| PeerRecord { + peer: None, + record: Record { key, value, publisher: None, expires: None }, + }); + for kv_pair in kv_pairs { + dht_event_tx + .send(DhtEvent::ValueFound(kv_pair)) + .await + .expect("Channel has capacity of 1."); + } // Assert worker to trigger another lookup. 
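// --- Editorial sketch, not part of the patch -------------------------------
// The `lookup_throttling` hunk above reflects the switch of `DhtEvent::ValueFound`
// from a batch of (key, value) pairs to a single record per event: the test now
// wraps every pair into a record and sends the events one by one. The types below
// are hypothetical stand-ins (the real ones are libp2p's `PeerRecord`/`Record`);
// only the per-record fan-out is illustrated.
struct SketchRecord {
    key: Vec<u8>,
    value: Vec<u8>,
}

enum SketchDhtEvent {
    ValueFound(SketchRecord),
}

// Turn one lookup result with several (key, value) pairs into one event per record.
fn fan_out(kv_pairs: Vec<(Vec<u8>, Vec<u8>)>) -> Vec<SketchDhtEvent> {
    kv_pairs
        .into_iter()
        .map(|(key, value)| SketchDhtEvent::ValueFound(SketchRecord { key, value }))
        .collect()
}
// ----------------------------------------------------------------------------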
assert!(matches!(receiver.next().await, Some(TestNetworkEvent::GetCalled(_)))); @@ -899,14 +1194,17 @@ fn lookup_throttling() { #[test] fn test_handle_put_record_request() { - let network = TestNetwork::default(); - let peer_id = network.peer_id; + let local_node_network = TestNetwork::default(); + let remote_node_network = TestNetwork::default(); + let peer_id = remote_node_network.peer_id; let remote_multiaddr = { let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); - address.with(multiaddr::Protocol::P2p(peer_id.into())) + address.with(multiaddr::Protocol::P2p(remote_node_network.peer_id.into())) }; + + println!("{:?}", remote_multiaddr); let remote_key_store = MemoryKeystore::new(); let remote_public_keys: Vec = (0..20) .map(|_| { @@ -928,7 +1226,7 @@ fn test_handle_put_record_request() { let (_dht_event_tx, dht_event_rx) = channel(1); let (_to_worker, from_service) = mpsc::channel(0); - let network = Arc::new(network); + let network = Arc::new(local_node_network); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: remote_public_keys.clone() }), @@ -944,10 +1242,11 @@ fn test_handle_put_record_request() { let valid_authorithy_key = remote_public_keys.first().unwrap().clone(); let kv_pairs = build_dht_event( - vec![remote_multiaddr], - valid_authorithy_key.into(), + vec![remote_multiaddr.clone()], + valid_authorithy_key.clone().into(), &remote_key_store, - Some(&TestSigner { keypair: &network.identity }), + Some(&TestSigner { keypair: &remote_node_network.identity }), + Some(build_creation_time()), ); pool.run_until( @@ -986,7 +1285,7 @@ fn test_handle_put_record_request() { let key = hash_authority_id(another_authorithy_id.as_ref()); // Valid record signed with a different key should return error. - for (_, value) in kv_pairs { + for (_, value) in kv_pairs.clone() { assert!(matches!( worker .handle_put_record_requested(key.clone(), value, Some(peer_id), None) @@ -995,6 +1294,57 @@ fn test_handle_put_record_request() { )); } assert_eq!(network.store_value_call.lock().unwrap().len(), 1); + let newer_kv_pairs = build_dht_event( + vec![remote_multiaddr], + valid_authorithy_key.clone().into(), + &remote_key_store, + Some(&TestSigner { keypair: &remote_node_network.identity }), + Some(build_creation_time()), + ); + + // A valid but older record should not return an error, but it should not be stored since a + // newer one already exists. + for (new_key, new_value) in newer_kv_pairs.clone() { + worker.in_flight_lookups.insert(new_key.clone(), valid_authorithy_key.clone()); + + let found = PeerRecord { + peer: Some(peer_id.into()), + record: Record { + key: new_key, + value: new_value, + publisher: Some(peer_id.into()), + expires: None, + }, + }; + assert!(worker.handle_dht_value_found_event(found).is_ok()); + } + + for (key, value) in kv_pairs.clone() { + assert!(worker + .handle_put_record_requested(key, value, Some(peer_id), None) + .await + .is_ok()); + } + assert_eq!(network.store_value_call.lock().unwrap().len(), 1); + + // Newer kv pairs should always be stored. + for (key, value) in newer_kv_pairs.clone() { + assert!(worker + .handle_put_record_requested(key, value, Some(peer_id), None) + .await + .is_ok()); + } + + assert_eq!(network.store_value_call.lock().unwrap().len(), 2); + + worker.refill_pending_lookups_queue().await.unwrap(); + assert_eq!(worker.last_known_records.len(), 1); + + // Check known records get cleaned up when an authority gets out of the + // active set. 
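// --- Editorial sketch, not part of the patch -------------------------------
// The assertions below check that `last_known_records` is pruned once an
// authority leaves the active set. The helper shows the pruning idea in plain
// Rust; the map/key types are hypothetical stand-ins, not the worker's fields.
use std::collections::{HashMap, HashSet};

fn prune_known_records(
    last_known_records: &mut HashMap<Vec<u8>, Vec<u8>>,
    // Kademlia keys derived from the currently active authorities.
    active_keys: &HashSet<Vec<u8>>,
) {
    last_known_records.retain(|key, _| active_keys.contains(key));
}
// ----------------------------------------------------------------------------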
+ worker.client = Arc::new(TestApi { authorities: Default::default() }); + worker.refill_pending_lookups_queue().await.unwrap(); + assert_eq!(worker.last_known_records.len(), 0); } .boxed_local(), ); diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 68816a10980d..9a6324dafd37 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -31,8 +31,13 @@ use crate::{ use futures::channel::oneshot; use libp2p::{ - connection_limits::ConnectionLimits, core::Multiaddr, identify::Info as IdentifyInfo, - identity::PublicKey, kad::RecordKey, swarm::NetworkBehaviour, PeerId, StreamProtocol, + connection_limits::ConnectionLimits, + core::Multiaddr, + identify::Info as IdentifyInfo, + identity::PublicKey, + kad::{Record, RecordKey}, + swarm::NetworkBehaviour, + PeerId, StreamProtocol, }; use parking_lot::Mutex; @@ -289,6 +294,16 @@ impl Behaviour { self.discovery.put_value(key, value); } + /// Puts a record into the DHT on the provided peers. + pub fn put_record_to( + &mut self, + record: Record, + peers: HashSet, + update_local_storage: bool, + ) { + self.discovery.put_record_to(record, peers, update_local_storage); + } + /// Stores value in DHT pub fn store_record( &mut self, diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs index 3145b891a8d3..86c66c22701c 100644 --- a/substrate/client/network/src/discovery.rs +++ b/substrate/client/network/src/discovery.rs @@ -58,7 +58,8 @@ use libp2p::{ self, record::store::{MemoryStore, RecordStore}, Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent, - GetClosestPeersError, GetRecordOk, QueryId, QueryResult, Quorum, Record, RecordKey, + GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, + RecordKey, }, mdns::{self, tokio::Behaviour as TokioMdns}, multiaddr::Protocol, @@ -92,8 +93,12 @@ const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32; /// record is replicated to. pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20; +// The minimum number of peers from which we expect an answer before we terminate the request. +const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4; + /// `DiscoveryBehaviour` configuration. /// +/// /// Note: In order to discover nodes or load and store values via Kademlia one has to add /// Kademlia protocol via [`DiscoveryConfig::with_kademlia`]. pub struct DiscoveryConfig { @@ -234,7 +239,6 @@ impl DiscoveryConfig { // auto-insertion and instead add peers manually. config.set_kbucket_inserts(BucketInserts::Manual); config.disjoint_query_paths(kademlia_disjoint_query_paths); - let store = MemoryStore::new(local_peer_id); let mut kad = Kademlia::with_config(local_peer_id, store, config); kad.set_mode(Some(kad::Mode::Server)); @@ -437,6 +441,31 @@ impl DiscoveryBehaviour { } } + /// Puts a record into the DHT on the provided `peers` + /// + /// If `update_local_storage` is true, the local storage is updated as well. + pub fn put_record_to( + &mut self, + record: Record, + peers: HashSet, + update_local_storage: bool, + ) { + if let Some(kad) = self.kademlia.as_mut() { + if update_local_storage { + if let Err(_e) = kad.store_mut().put(record.clone()) { + warn!(target: "sub-libp2p", "Failed to update local storage"); + } + } + + if !peers.is_empty() { + kad.put_record_to( + record, + peers.into_iter().map(|peer_id| peer_id.into()), + Quorum::All, + ); + } + } + } /// Store a record in the Kademlia record store. 
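// --- Editorial sketch, not part of the patch -------------------------------
// Control flow of the new `put_record_to` above, reduced to plain Rust with
// hypothetical stand-in traits so it is self-contained: optionally refresh the
// local store first, then push the record only to the explicitly listed peers.
use std::collections::HashSet;

#[derive(Clone)]
struct SketchRecord {
    key: Vec<u8>,
    value: Vec<u8>,
}

trait SketchStore {
    fn put(&mut self, record: SketchRecord) -> Result<(), String>;
}

trait SketchKademlia {
    fn put_record_to(&mut self, record: SketchRecord, peers: Vec<String>);
}

fn put_record_to<S: SketchStore, K: SketchKademlia>(
    store: &mut S,
    kad: &mut K,
    record: SketchRecord,
    peers: HashSet<String>,
    update_local_storage: bool,
) {
    // Keep our own copy current first, so local lookups already see the record.
    if update_local_storage {
        if let Err(e) = store.put(record.clone()) {
            eprintln!("failed to update local storage: {e}");
        }
    }
    // Only start a network operation when there is at least one target peer.
    if !peers.is_empty() {
        kad.put_record_to(record, peers.into_iter().collect());
    }
}
// ----------------------------------------------------------------------------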
pub fn store_record( &mut self, @@ -527,7 +556,7 @@ pub enum DiscoveryOut { /// The DHT yielded results for the record request. /// /// Returning the result grouped in (key, value) pairs as well as the request duration. - ValueFound(Vec<(RecordKey, Vec)>, Duration), + ValueFound(PeerRecord, Duration), /// The DHT received a put record request. PutRecordRequest( @@ -860,16 +889,24 @@ impl NetworkBehaviour for DiscoveryBehaviour { Ok(GetRecordOk::FoundRecord(r)) => { debug!( target: "sub-libp2p", - "Libp2p => Found record ({:?}) with value: {:?}", + "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}", r.record.key, r.record.value, + id, + stats, ); - // Let's directly finish the query, as we are only interested in a - // quorum of 1. - if let Some(kad) = self.kademlia.as_mut() { - if let Some(mut query) = kad.query_mut(&id) { - query.finish(); + // Let's directly finish the query if we are above GET_RECORD_REDUNDANCY_FACTOR. + // This number is small enough to make sure we don't + // unnecessarily flood the network with queries, but high + // enough to make sure we also touch peers which might have + // an old record, so that we can update them once we notice + // they have old records. + if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR { + if let Some(kad) = self.kademlia.as_mut() { + if let Some(mut query) = kad.query_mut(&id) { + query.finish(); + } } } @@ -877,14 +914,18 @@ impl NetworkBehaviour for DiscoveryBehaviour { // `FinishedWithNoAdditionalRecord`. self.records_to_publish.insert(id, r.record.clone()); - DiscoveryOut::ValueFound( - vec![(r.record.key, r.record.value)], - stats.duration().unwrap_or_default(), - ) + DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default()) }, Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates, }) => { + debug!( + target: "sub-libp2p", + "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms", + id, + stats, + stats.duration().map(|val| val.as_millis()) + ); // We always need to remove the record to not leak any data! if let Some(record) = self.records_to_publish.remove(&id) { if cache_candidates.is_empty() { diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index b518a2094d76..5400d11cb6ac 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -22,7 +22,10 @@ use crate::types::ProtocolName; use bytes::Bytes; -use libp2p::{kad::record::Key, PeerId}; +use libp2p::{ + kad::{record::Key, PeerRecord}, + PeerId, +}; use sc_network_common::role::ObservedRole; @@ -31,7 +34,7 @@ use sc_network_common::role::ObservedRole; #[must_use] pub enum DhtEvent { /// The value was found. - ValueFound(Vec<(Key, Vec)>), + ValueFound(PeerRecord), /// The requested record has not been found in the DHT. ValueNotFound(Key), diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 6ff05e6af327..22285e7906c6 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -50,6 +50,7 @@ use schnellru::{ByLength, LruMap}; use std::{ cmp, collections::{HashMap, HashSet, VecDeque}, + num::NonZeroUsize, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -68,6 +69,9 @@ const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30); /// Minimum number of confirmations received before an address is verified. const MIN_ADDRESS_CONFIRMATIONS: usize = 5; +// The minimum number of peers from which we expect an answer before we terminate the request. 
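// --- Editorial sketch, not part of the patch -------------------------------
// The early-termination rule used in the libp2p hunk above: a `GET_VALUE`
// query is finished once strictly more than the redundancy factor of peers
// answered successfully, instead of stopping at the first answer. The helper
// below is a hypothetical stand-alone illustration of that check.
fn should_finish_get_record(num_successes: u32, redundancy_factor: u32) -> bool {
    // More answers than the redundancy factor: enough copies seen to stop the
    // query without flooding the network, while still having reached a few
    // peers that may hold an outdated record and can be refreshed later.
    num_successes > redundancy_factor
}
// e.g. should_finish_get_record(5, 4) == true, should_finish_get_record(4, 4) == false.
// ----------------------------------------------------------------------------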
+const GET_RECORD_REDUNDANCY_FACTOR: usize = 4; + /// Discovery events. #[derive(Debug)] pub enum DiscoveryEvent { @@ -340,7 +344,10 @@ impl Discovery { /// Start Kademlia `GET_VALUE` query for `key`. pub async fn get_value(&mut self, key: KademliaKey) -> QueryId { self.kademlia_handle - .get_record(RecordKey::new(&key.to_vec()), Quorum::One) + .get_record( + RecordKey::new(&key.to_vec()), + Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap()), + ) .await } @@ -351,6 +358,22 @@ impl Discovery { .await } + /// Put a record to the given peers. + pub async fn put_value_to_peers( + &mut self, + record: Record, + peers: Vec, + update_local_storage: bool, + ) -> QueryId { + self.kademlia_handle + .put_record_to_peers( + record, + peers.into_iter().map(|peer| peer.into()).collect(), + update_local_storage, + ) + .await + } + /// Store record in the local DHT store. pub async fn store_record( &mut self, diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 34ca5b716101..b09bb671faa3 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -50,7 +50,7 @@ use crate::{ use codec::Encode; use futures::StreamExt; -use libp2p::kad::RecordKey; +use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey}; use litep2p::{ config::ConfigBuilder, crypto::ed25519::Keypair, @@ -700,6 +700,12 @@ impl NetworkBackend for Litep2pNetworkBac let query_id = self.discovery.put_value(key.clone(), value).await; self.pending_put_values.insert(query_id, (key, Instant::now())); } + NetworkServiceCommand::PutValueTo { record, peers, update_local_storage } => { + let kademlia_key = record.key.to_vec().into(); + let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await; + self.pending_put_values.insert(query_id, (kademlia_key, Instant::now())); + } + NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => { self.discovery.store_record(key, value, publisher.map(Into::into), expires).await; } @@ -816,19 +822,15 @@ impl NetworkBackend for Litep2pNetworkBac "`GET_VALUE` for {:?} ({query_id:?}) succeeded", key, ); - - let value_found = match records { - RecordsType::LocalStore(record) => vec![ - (libp2p::kad::RecordKey::new(&record.key), record.value) - ], - RecordsType::Network(records) => records.into_iter().map(|peer_record| { - (libp2p::kad::RecordKey::new(&peer_record.record.key), peer_record.record.value) - }).collect(), - }; - - self.event_streams.send(Event::Dht( - DhtEvent::ValueFound(value_found) - )); + for record in litep2p_to_libp2p_peer_record(records) { + self.event_streams.send( + Event::Dht( + DhtEvent::ValueFound( + record + ) + ) + ); + } if let Some(ref metrics) = self.metrics { metrics @@ -1023,3 +1025,42 @@ impl NetworkBackend for Litep2pNetworkBac } } } + +// Glue code to convert from a litep2p records type to a libp2p PeerRecord. 
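// --- Editorial sketch, not part of the patch -------------------------------
// The litep2p `GET_VALUE` path above now asks for `Quorum::N(n)` instead of
// `Quorum::One`. `NonZeroUsize::new` only returns `None` for zero, so building
// the quorum size from a positive redundancy factor cannot panic.
use std::num::NonZeroUsize;

fn quorum_size(redundancy_factor: usize) -> NonZeroUsize {
    NonZeroUsize::new(redundancy_factor).expect("redundancy factor is a positive constant; qed")
}
// e.g. quorum_size(4) matches the GET_RECORD_REDUNDANCY_FACTOR used above.
// ----------------------------------------------------------------------------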
+fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec { + match records { + litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => { + vec![PeerRecord { + record: P2PRecord { + key: record.key.to_vec().into(), + value: record.value, + publisher: record.publisher.map(|peer_id| { + let peer_id: sc_network_types::PeerId = peer_id.into(); + peer_id.into() + }), + expires: record.expires, + }, + peer: None, + }] + }, + litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records + .into_iter() + .map(|record| { + let peer_id: sc_network_types::PeerId = record.peer.into(); + + PeerRecord { + record: P2PRecord { + key: record.record.key.to_vec().into(), + value: record.record.value, + publisher: record.record.publisher.map(|peer_id| { + let peer_id: sc_network_types::PeerId = peer_id.into(); + peer_id.into() + }), + expires: record.record.expires, + }, + peer: Some(peer_id.into()), + } + }) + .collect::>(), + } +} diff --git a/substrate/client/network/src/litep2p/service.rs b/substrate/client/network/src/litep2p/service.rs index 7d972bbeee5c..67fc44e6bfe0 100644 --- a/substrate/client/network/src/litep2p/service.rs +++ b/substrate/client/network/src/litep2p/service.rs @@ -32,6 +32,7 @@ use crate::{ RequestFailure, Signature, }; +use crate::litep2p::Record; use codec::DecodeAll; use futures::{channel::oneshot, stream::BoxStream}; use libp2p::{identity::SigningError, kad::record::Key as KademliaKey}; @@ -76,6 +77,15 @@ pub enum NetworkServiceCommand { value: Vec, }, + /// Put a record to the given peers. + PutValueTo { + /// Record. + record: Record, + /// Peers we want to put the record to. + peers: Vec, + /// Whether we should update the local storage or not. + update_local_storage: bool, + }, /// Store record in the local DHT store. StoreRecord { /// Record key. 
@@ -253,6 +263,27 @@ impl NetworkDHTProvider for Litep2pNetworkService { let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value }); } + fn put_record_to( + &self, + record: libp2p::kad::Record, + peers: HashSet, + update_local_storage: bool, + ) { + let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo { + record: Record { + key: record.key.to_vec().into(), + value: record.value, + publisher: record.publisher.map(|peer_id| { + let peer_id: sc_network_types::PeerId = peer_id.into(); + peer_id.into() + }), + expires: record.expires, + }, + peers: peers.into_iter().collect(), + update_local_storage, + }); + } + fn store_record( &self, key: KademliaKey, diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 3a685787c48e..04bcfd4b880e 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -68,7 +68,7 @@ use libp2p::{ core::{upgrade, ConnectedPoint, Endpoint}, identify::Info as IdentifyInfo, identity::ed25519, - kad::record::Key as KademliaKey, + kad::{record::Key as KademliaKey, Record}, multiaddr::{self, Multiaddr}, swarm::{ Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError, @@ -943,6 +943,19 @@ where let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value)); } + fn put_record_to( + &self, + record: Record, + peers: HashSet, + update_local_storage: bool, + ) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo { + record, + peers, + update_local_storage, + }); + } + fn store_record( &self, key: KademliaKey, @@ -1311,6 +1324,11 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> { enum ServiceToWorkerMsg { GetValue(KademliaKey), PutValue(KademliaKey, Vec), + PutRecordTo { + record: Record, + peers: HashSet, + update_local_storage: bool, + }, StoreRecord(KademliaKey, Vec, Option, Option), AddKnownAddress(PeerId, Multiaddr), EventStream(out_events::Sender), @@ -1439,6 +1457,10 @@ where self.network_service.behaviour_mut().get_value(key), ServiceToWorkerMsg::PutValue(key, value) => self.network_service.behaviour_mut().put_value(key, value), + ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self + .network_service + .behaviour_mut() + .put_record_to(record, peers, update_local_storage), ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self .network_service .behaviour_mut() diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs index fe06141f7e3b..3a22d5a64d01 100644 --- a/substrate/client/network/src/service/traits.rs +++ b/substrate/client/network/src/service/traits.rs @@ -32,6 +32,7 @@ use crate::{ }; use futures::{channel::oneshot, Stream}; +use libp2p::kad::Record; use prometheus_endpoint::Registry; use sc_client_api::BlockBackend; @@ -217,6 +218,11 @@ pub trait NetworkDHTProvider { /// Start putting a value in the DHT. fn put_value(&self, key: KademliaKey, value: Vec); + /// Start putting the record to `peers`. + /// + /// If `update_local_storage` is true, the local storage is updated as well. + fn put_record_to(&self, record: Record, peers: HashSet, update_local_storage: bool); + /// Store a record in the DHT memory store. 
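// --- Editorial sketch, not part of the patch -------------------------------
// One way a caller could drive the new `put_record_to` trait method: after a
// lookup finishes, push the newest copy of a record only to the peers that
// answered with an older one. The trait and types here are simplified
// stand-ins for the real `NetworkDHTProvider`, record and peer id types.
use std::collections::HashSet;

struct SketchRecord {
    key: Vec<u8>,
    value: Vec<u8>,
}

trait SketchDhtProvider {
    fn put_record_to(
        &self,
        record: SketchRecord,
        peers: HashSet<String>,
        update_local_storage: bool,
    );
}

fn refresh_stale_peers<N: SketchDhtProvider>(
    network: &N,
    newest: SketchRecord,
    stale_peers: HashSet<String>,
) {
    // Nothing to do when every peer already returned the newest record.
    if stale_peers.is_empty() {
        return
    }
    // We already hold the newest copy locally, so the local storage does not
    // need to be updated again.
    network.put_record_to(newest, stale_peers, false);
}
// ----------------------------------------------------------------------------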
fn store_record( &self, @@ -240,6 +246,10 @@ where T::put_value(self, key, value) } + fn put_record_to(&self, record: Record, peers: HashSet, update_local_storage: bool) { + T::put_record_to(self, record, peers, update_local_storage) + } + fn store_record( &self, key: KademliaKey,