From 2a2893e11d5dcc451e9952c94d096afde63a9e9d Mon Sep 17 00:00:00 2001 From: aschran Date: Mon, 12 Sep 2022 20:41:15 -0700 Subject: [PATCH] network: convert worker-to-primary interface to quic/anemo (#941) Fixes #884 --- narwhal/benchmark/benchmark/config.py | 22 +-- narwhal/config/src/lib.rs | 37 ++--- narwhal/config/tests/config_tests.rs | 20 ++- .../snapshots/config_tests__committee.snap | 20 +-- narwhal/network/src/worker.rs | 138 +++++++++------- narwhal/node/Cargo.toml | 2 + narwhal/node/src/generate_format.rs | 16 +- narwhal/node/src/restarter.rs | 24 ++- narwhal/node/tests/reconfigure.rs | 24 ++- narwhal/node/tests/staged/narwhal.yaml | 7 +- .../tests/block_synchronizer_tests.rs | 46 ++---- .../primary/src/grpc_server/configuration.rs | 52 ++---- narwhal/primary/src/grpc_server/mod.rs | 3 +- narwhal/primary/src/primary.rs | 124 ++++----------- narwhal/primary/src/proposer.rs | 3 +- narwhal/primary/src/tests/core_tests.rs | 45 ++---- .../primary/src/tests/header_waiter_tests.rs | 4 +- narwhal/primary/src/tests/helper_tests.rs | 56 ++----- narwhal/primary/tests/epoch_change.rs | 149 ++++++++++-------- .../integration_tests_configuration_api.rs | 24 +-- narwhal/types/build.rs | 24 ++- narwhal/types/proto/narwhal.proto | 15 +- narwhal/types/src/primary.rs | 4 +- narwhal/types/src/proto.rs | 6 +- narwhal/worker/src/primary_connector.rs | 40 ++--- narwhal/worker/src/tests/worker_tests.rs | 10 +- narwhal/worker/src/worker.rs | 40 +++-- 27 files changed, 397 insertions(+), 558 deletions(-) diff --git a/narwhal/benchmark/benchmark/config.py b/narwhal/benchmark/benchmark/config.py index c3e84fc6858af..2072bedafaa84 100644 --- a/narwhal/benchmark/benchmark/config.py +++ b/narwhal/benchmark/benchmark/config.py @@ -151,10 +151,7 @@ class Committee: "authorities: { "name": { "stake": 1, - "primary: { - "primary_to_primary": x.x.x.x:x, - "worker_to_primary": x.x.x.x:x, - }, + "primary_address": x.x.x.x:x, "network_key: NETWORK_KEY== }, ... @@ -183,15 +180,12 @@ def __init__(self, addresses, base_port): self.json = {'authorities': OrderedDict(), 'epoch': 0} for name, (network_name, hosts) in addresses.items(): host = hosts.pop(0) - primary_addr = { - 'primary_to_primary': f'/ip4/{host}/tcp/{port}/http', - 'worker_to_primary': f'/ip4/{host}/tcp/{port + 1}/http' - } - port += 2 + primary_addr = f'/ip4/{host}/tcp/{port}/http' + port += 1 self.json['authorities'][name] = { 'stake': 1, - 'primary': primary_addr, + 'primary_address': primary_addr, 'network_key': network_name } @@ -201,8 +195,7 @@ def primary_addresses(self, faults=0): addresses = [] good_nodes = self.size() - faults for authority in list(self.json['authorities'].values())[:good_nodes]: - addresses += [multiaddr_to_url_data( - authority['primary']['primary_to_primary'])] + addresses += [multiaddr_to_url_data(authority['primary_address'])] return addresses def ips(self, name=None): @@ -214,10 +207,7 @@ def ips(self, name=None): ips = set() for name in names: - addresses = self.json['authorities'][name]['primary'] - ips.add(self.ip(addresses['primary_to_primary'])) - ips.add(self.ip(addresses['worker_to_primary'])) - + ips.add(self.ip(self.json['authorities'][name]['primary_address'])) return ips def remove_nodes(self, nodes): diff --git a/narwhal/config/src/lib.rs b/narwhal/config/src/lib.rs index 77d70f31d5da8..5b4d7b4c3c7c0 100644 --- a/narwhal/config/src/lib.rs +++ b/narwhal/config/src/lib.rs @@ -468,27 +468,19 @@ impl WorkerCache { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PrimaryAddresses { - /// Address to receive messages from other primaries (WAN). - pub primary_to_primary: Multiaddr, - /// Address to receive messages from our workers (LAN). - pub worker_to_primary: Multiaddr, -} - -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)] pub struct Authority { /// The voting power of this authority. pub stake: Stake, - /// The network addresses of the primary. - pub primary: PrimaryAddresses, + /// The network address of the primary. + pub primary_address: Multiaddr, /// Network key of the primary. pub network_key: NetworkPublicKey, } pub type SharedCommittee = Arc>; -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)] pub struct Committee { /// The authorities of epoch. pub authorities: BTreeMap, @@ -570,11 +562,11 @@ impl Committee { .clone() } - /// Returns the primary addresses of the target primary. - pub fn primary(&self, to: &PublicKey) -> Result { + /// Returns the primary address of the target primary. + pub fn primary(&self, to: &PublicKey) -> Result { self.authorities .get(&to.clone()) - .map(|x| x.primary.clone()) + .map(|x| x.primary_address.clone()) .ok_or_else(|| ConfigError::NotInCommittee((*to).encode_base64())) } @@ -589,14 +581,14 @@ impl Committee { pub fn others_primaries( &self, myself: &PublicKey, - ) -> Vec<(PublicKey, PrimaryAddresses, NetworkPublicKey)> { + ) -> Vec<(PublicKey, Multiaddr, NetworkPublicKey)> { self.authorities .iter() .filter(|(name, _)| *name != myself) .map(|(name, authority)| { ( name.clone(), - authority.primary.clone(), + authority.primary_address.clone(), authority.network_key.clone(), ) }) @@ -606,10 +598,7 @@ impl Committee { fn get_all_network_addresses(&self) -> HashSet<&Multiaddr> { self.authorities .values() - .flat_map(|authority| { - std::iter::once(&authority.primary.primary_to_primary) - .chain(std::iter::once(&authority.primary.worker_to_primary)) - }) + .map(|authority| &authority.primary_address) .collect() } @@ -627,7 +616,7 @@ impl Committee { /// will generate no update and return a vector of errors. pub fn update_primary_network_info( &mut self, - mut new_info: BTreeMap, + mut new_info: BTreeMap, ) -> Result<(), Vec> { let mut errors = None; @@ -645,13 +634,13 @@ impl Committee { let res = table .iter() .fold(Ok(BTreeMap::new()), |acc, (pk, authority)| { - if let Some((stake, addresses)) = new_info.remove(pk) { + if let Some((stake, address)) = new_info.remove(pk) { if stake == authority.stake { match acc { // No error met yet, update the accumulator Ok(mut bmap) => { let mut res = authority.clone(); - res.primary = addresses; + res.primary_address = address; bmap.insert(pk.clone(), res); Ok(bmap) } diff --git a/narwhal/config/tests/config_tests.rs b/narwhal/config/tests/config_tests.rs index b7c7190d95909..46ae5829c6bc8 100644 --- a/narwhal/config/tests/config_tests.rs +++ b/narwhal/config/tests/config_tests.rs @@ -18,12 +18,10 @@ // 1. Run `cargo insta test --review` under `./config`. // 2. Review, accept or reject changes. -use config::{ - ConsensusAPIGrpcParameters, Import, Parameters, PrimaryAddresses, PrometheusMetricsParameters, - Stake, -}; +use config::{ConsensusAPIGrpcParameters, Import, Parameters, PrometheusMetricsParameters, Stake}; use crypto::PublicKey; use insta::assert_json_snapshot; +use multiaddr::Multiaddr; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; use std::collections::{BTreeMap, HashMap}; use std::{fs::File, io::Write}; @@ -71,8 +69,8 @@ fn update_primary_network_info_test() { let invalid_new_info = committee2 .authorities .iter() - .map(|(pk, a)| (pk.clone(), (a.stake, a.primary.clone()))) - .collect::>(); + .map(|(pk, a)| (pk.clone(), (a.stake, a.primary_address.clone()))) + .collect::>(); let res2 = committee .clone() .update_primary_network_info(invalid_new_info) @@ -90,8 +88,8 @@ fn update_primary_network_info_test() { .authorities .iter() // change the stake - .map(|(pk, a)| (pk.clone(), (a.stake + 1, a.primary.clone()))) - .collect::>(); + .map(|(pk, a)| (pk.clone(), (a.stake + 1, a.primary_address.clone()))) + .collect::>(); let res2 = committee .clone() .update_primary_network_info(invalid_new_info) @@ -109,7 +107,7 @@ fn update_primary_network_info_test() { committee4.authorities.iter().for_each(|(pk, a)| { pk_n_stake.push((pk.clone(), a.stake)); - addresses.push(a.primary.clone()) + addresses.push(a.primary_address.clone()) }); let mut rng = rand::thread_rng(); @@ -119,13 +117,13 @@ fn update_primary_network_info_test() { .into_iter() .zip(addresses) .map(|((pk, stk), addr)| (pk, (stk, addr))) - .collect::>(); + .collect::>(); let mut comm = committee; let res = comm.update_primary_network_info(new_info.clone()); assert!(res.is_ok()); for (pk, a) in comm.authorities.iter() { - assert_eq!(a.primary, new_info.get(pk).unwrap().1); + assert_eq!(a.primary_address, new_info.get(pk).unwrap().1); } } diff --git a/narwhal/config/tests/snapshots/config_tests__committee.snap b/narwhal/config/tests/snapshots/config_tests__committee.snap index 5adcdcbbcd279..66af395c3c306 100644 --- a/narwhal/config/tests/snapshots/config_tests__committee.snap +++ b/narwhal/config/tests/snapshots/config_tests__committee.snap @@ -6,34 +6,22 @@ expression: committee "authorities": { "i9DpjC/kZbDd57csSg7qWFMF3uTECO2jBD3M5BOXU8T7AebTrMO+9D5IbnUI8N7lFhmjEiKFhwUOlFGbx1jkdP212PR/XvnayHxDaJuMkytxACBnshi9TrbZiZP10tqL": { "stake": 1, - "primary": { - "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", - "worker_to_primary": "/ip4/127.0.0.1/tcp/0/http" - }, + "primary_address": "/ip4/127.0.0.1/tcp/0/http", "network_key": "+oKq7VFT0pAQvjlvEti2YUw5fHXEYT/bs21P9wTlAKs=" }, "jGbcLB6p5T8JhcF7TnrxmRK208QODFkgpaElCbTrNhn14H7Fbqd/CzBim6HMctdbE5RgeCpfDi+J+0xCtLil+uPSYBAiIOY9B1Tn4YRt7v05iOreTtN/E4VDfRneGhYY": { "stake": 1, - "primary": { - "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", - "worker_to_primary": "/ip4/127.0.0.1/tcp/0/http" - }, + "primary_address": "/ip4/127.0.0.1/tcp/0/http", "network_key": "rvP0pLjsod/DQzYb+OQ2vULeklnAS4MU644gVN1ugqs=" }, "ld89LijKV9Ra6U/hkj6PnWKgcDbW4IgHAy1sHOLjHNvxLDoMQYJhDA5yb3+rOIfRBm91BOK0aSlGEtfdAZOB5af1kfxciSrnzDAHDKw1eCMo92nMEHWj5bpJbSlnySf7": { "stake": 1, - "primary": { - "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", - "worker_to_primary": "/ip4/127.0.0.1/tcp/0/http" - }, + "primary_address": "/ip4/127.0.0.1/tcp/0/http", "network_key": "r7SQXpVQz0YqggB4JKBV2XWuk0RwxHfZ3bttKyZMNvo=" }, "teRXk6Iq1Pod0UgOcrvBrvVvs8ZpM0bUgbZNL7YpH4n06TvGuiaId9SfpuYe+JOOF4rXiNzQv7CW6npbn3SVZMv9NzwYJcz5RL6F3gAMpNx+o9YM5+gqNtn/OGArUTCC": { "stake": 1, - "primary": { - "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", - "worker_to_primary": "/ip4/127.0.0.1/tcp/0/http" - }, + "primary_address": "/ip4/127.0.0.1/tcp/0/http", "network_key": "Kt9mRluFkBunwfq2VREQbBXuSYGOsFo95bA/PIrvVhc=" } }, diff --git a/narwhal/network/src/worker.rs b/narwhal/network/src/worker.rs index 8f9618b236d86..d70bfeee6a7da 100644 --- a/narwhal/network/src/worker.rs +++ b/narwhal/network/src/worker.rs @@ -1,101 +1,119 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use crate::{ - traits::{BaseNetwork, ReliableNetwork}, - BoundedExecutor, CancelOnDropHandler, MessageResult, RetryConfig, MAX_TASK_CONCURRENCY, + traits::{ReliableNetwork2, UnreliableNetwork2}, + BoundedExecutor, CancelOnDropHandler, RetryConfig, MAX_TASK_CONCURRENCY, }; +use anemo::PeerId; use async_trait::async_trait; -use multiaddr::Multiaddr; -use tokio::runtime::Handle; -use tonic::{transport::Channel, Code}; -use tracing::error; -use types::{BincodeEncodedPayload, WorkerPrimaryMessage, WorkerToPrimaryClient}; +use crypto::{NetworkKeyPair, NetworkPublicKey}; +use fastcrypto::traits::KeyPair as _; + +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use tokio::{runtime::Handle, task::JoinHandle}; + +use types::{WorkerPrimaryMessage, WorkerToPrimaryClient}; pub struct WorkerToPrimaryNetwork { - address: Option, - client: Option>, - config: mysten_network::config::Config, + network: anemo::Network, retry_config: RetryConfig, executor: BoundedExecutor, } -impl Default for WorkerToPrimaryNetwork { - fn default() -> Self { +impl WorkerToPrimaryNetwork { + pub fn new(network: anemo::Network) -> Self { let retry_config = RetryConfig { - // Retry for forever + // Retry forever. retrying_max_elapsed_time: None, ..Default::default() }; Self { - address: None, - client: Default::default(), - config: Default::default(), + network, retry_config, // Note that this does not strictly break the primitive that BoundedExecutor is per address because // this network sender only transmits to a single address. executor: BoundedExecutor::new(MAX_TASK_CONCURRENCY, Handle::current()), } } -} -impl BaseNetwork for WorkerToPrimaryNetwork { - type Client = WorkerToPrimaryClient; - type Message = WorkerPrimaryMessage; - - fn client(&mut self, address: Multiaddr) -> Self::Client { - match (self.address.as_ref(), self.client.as_ref()) { - (Some(addr), Some(client)) if *addr == address => client.clone(), - (_, _) => { - let client = Self::create_client(&self.config, address.clone()); - self.client = Some(client.clone()); - self.address = Some(address); - client - } - } + // Creates a new single-use anemo::Network to connect outbound to a single + // address. This is for tests and should not be used from worker code. + pub async fn new_for_single_address( + name: NetworkPublicKey, + address: anemo::types::Address, + ) -> Self { + let routes = anemo::Router::new(); + let network = anemo::Network::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)) + .server_name("narwhal") + .private_key( + NetworkKeyPair::generate(&mut rand::rngs::OsRng) + .private() + .0 + .to_bytes(), + ) + .start(routes) + .unwrap(); + network + .connect_with_peer_id(address, anemo::PeerId(name.0.to_bytes())) + .await + .unwrap(); + Self::new(network) } +} - fn create_client(config: &mysten_network::config::Config, address: Multiaddr) -> Self::Client { - //TODO don't panic here if address isn't supported - let channel = config.connect_lazy(&address).unwrap(); - WorkerToPrimaryClient::new(channel) +#[async_trait] +impl UnreliableNetwork2 for WorkerToPrimaryNetwork { + async fn unreliable_send( + &mut self, + peer: NetworkPublicKey, + message: &WorkerPrimaryMessage, + ) -> JoinHandle<()> { + let network = self.network.clone(); + let peer_id = PeerId(peer.0.to_bytes()); + let message = message.to_owned(); + self.executor + .spawn(async move { + if let Some(peer) = network.peer(peer_id) { + let _ = WorkerToPrimaryClient::new(peer).send_message(message).await; + } + }) + .await } } #[async_trait] -impl ReliableNetwork for WorkerToPrimaryNetwork { - // Safety - // Since this spawns an unbounded task, this should be called in a time-restricted fashion. - // Here the callers are [`WorkerToPrimaryNetwork::send`]. - // See the TODO on spawn_with_retries for lifting this restriction. - async fn send_message( +impl ReliableNetwork2 for WorkerToPrimaryNetwork { + async fn send( &mut self, - address: Multiaddr, - message: BincodeEncodedPayload, - ) -> CancelOnDropHandler { - let client = self.client(address); + peer: NetworkPublicKey, + message: &WorkerPrimaryMessage, + ) -> CancelOnDropHandler>> { + let network = self.network.clone(); + let peer_id = PeerId(peer.0.to_bytes()); + let message = message.to_owned(); let message_send = move || { - let mut client = client.clone(); + let network = network.clone(); let message = message.clone(); async move { - client.send_message(message).await.map_err(|e| { - match e.code() { - Code::FailedPrecondition | Code::InvalidArgument => { - // these errors are not recoverable through retrying, see - // https://github.com/hyperium/tonic/blob/master/tonic/src/status.rs - error!("Irrecoverable network error: {e}"); - backoff::Error::permanent(eyre::Report::from(e)) - } - _ => { + if let Some(peer) = network.peer(peer_id) { + WorkerToPrimaryClient::new(peer) + .send_message(message) + .await + .map_err(|e| { // this returns a backoff::Error::Transient - // so that if tonic::Status is returned, we retry - Into::>::into(eyre::Report::from(e)) - } - } - }) + // so that if anemo::Status is returned, we retry + backoff::Error::transient(anyhow::anyhow!("RPC error: {e:?}")) + }) + } else { + Err(backoff::Error::transient(anyhow::anyhow!( + "not connected to peer {peer_id}" + ))) + } } }; + let handle = self .executor .spawn_with_retries(self.retry_config, message_send); diff --git a/narwhal/node/Cargo.toml b/narwhal/node/Cargo.toml index 805c8ff21b62c..3d7ed570922e7 100644 --- a/narwhal/node/Cargo.toml +++ b/narwhal/node/Cargo.toml @@ -44,6 +44,8 @@ worker = { path = "../worker" } workspace-hack = { version = "0.1", path = "../workspace-hack" } eyre = "0.6.8" +anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "6278d0fa78147a49ff2cb9dd2e45e763886be0a0" } + [dev-dependencies] hex = "0.4.3" pretty_assertions = "1.3.0" diff --git a/narwhal/node/src/generate_format.rs b/narwhal/node/src/generate_format.rs index 80d809a7c3f64..3ba3efa24d56a 100644 --- a/narwhal/node/src/generate_format.rs +++ b/narwhal/node/src/generate_format.rs @@ -1,11 +1,12 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use config::{Authority, Committee, Epoch, PrimaryAddresses, WorkerIndex, WorkerInfo}; +use config::{Authority, Committee, Epoch, WorkerIndex, WorkerInfo}; use crypto::{KeyPair, NetworkKeyPair}; use fastcrypto::{ traits::{KeyPair as _, Signer}, Digest, Hash, }; +use multiaddr::Multiaddr; use primary::PrimaryWorkerMessage; use rand::{prelude::StdRng, SeedableRng}; use serde_reflection::{Registry, Result, Samples, Tracer, TracerConfig}; @@ -52,19 +53,14 @@ fn get_registry() -> Result { .enumerate() .map(|(i, (kp, network_key))| { let id = kp.public(); - let primary = PrimaryAddresses { - primary_to_primary: format!("/ip4/127.0.0.1/tcp/{}/http", 100 + i) - .parse() - .unwrap(), - worker_to_primary: format!("/ip4/127.0.0.1/tcp/{}/http", 200 + i) - .parse() - .unwrap(), - }; + let primary_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}/http", 100 + i) + .parse() + .unwrap(); ( id.clone(), Authority { stake: 1, - primary, + primary_address, network_key: network_key.public().clone(), }, ) diff --git a/narwhal/node/src/restarter.rs b/narwhal/node/src/restarter.rs index 3c948234844f4..d1a3e07cba3d6 100644 --- a/narwhal/node/src/restarter.rs +++ b/narwhal/node/src/restarter.rs @@ -7,7 +7,7 @@ use crypto::{KeyPair, NetworkKeyPair}; use executor::{ExecutionState, ExecutorOutput}; use fastcrypto::traits::KeyPair as _; use futures::future::join_all; -use network::{PrimaryToWorkerNetwork, ReliableNetwork, WorkerToPrimaryNetwork}; +use network::{PrimaryToWorkerNetwork, ReliableNetwork, ReliableNetwork2, WorkerToPrimaryNetwork}; use prometheus::Registry; use std::{fmt::Debug, path::PathBuf, sync::Arc}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -48,7 +48,6 @@ impl NodeRestarter { let mut committee = committee.clone(); let mut handles = Vec::new(); - let mut primary_network = WorkerToPrimaryNetwork::default(); let mut worker_network = PrimaryToWorkerNetwork::default(); // Listen for new committees. @@ -103,12 +102,23 @@ impl NodeRestarter { tracing::info!("Starting reconfiguration with committee {committee}"); // Shutdown all relevant components. - let address = committee - .primary(&name) - .expect("Our key is not in the committee") - .worker_to_primary; + // TODO: shutdown message should probably be sent in a better way than by injecting + // it through the networking stack. + let address = network::multiaddr_to_address( + &committee + .primary(&name) + .expect("Our key is not in the committee"), + ) + .unwrap(); + let network_key = committee + .network_key(&name) + .expect("Our key is not in the committee"); + let mut primary_network = + WorkerToPrimaryNetwork::new_for_single_address(network_key.to_owned(), address) + .await; let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::Shutdown); - let primary_cancel_handle = primary_network.send(address, &message).await; + let primary_cancel_handle = + primary_network.send(network_key.to_owned(), &message).await; let addresses = worker_cache .load() diff --git a/narwhal/node/tests/reconfigure.rs b/narwhal/node/tests/reconfigure.rs index ff45c479246ee..043715018242e 100644 --- a/narwhal/node/tests/reconfigure.rs +++ b/narwhal/node/tests/reconfigure.rs @@ -8,7 +8,7 @@ use crypto::{KeyPair, NetworkKeyPair, PublicKey}; use executor::{ExecutionIndices, ExecutionState, ExecutionStateError}; use fastcrypto::traits::KeyPair as _; use futures::future::join_all; -use network::{PrimaryToWorkerNetwork, ReliableNetwork, WorkerToPrimaryNetwork}; +use network::{PrimaryToWorkerNetwork, ReliableNetwork, ReliableNetwork2, WorkerToPrimaryNetwork}; use node::{restarter::NodeRestarter, Node, NodeStorage}; use primary::PrimaryWorkerMessage; use prometheus::Registry; @@ -323,18 +323,28 @@ async fn epoch_change() { let name_clone = name.clone(); let worker_cache_clone = worker_cache.clone(); tokio::spawn(async move { - let mut primary_network = WorkerToPrimaryNetwork::default(); let mut worker_network = PrimaryToWorkerNetwork::default(); while let Some((_, _, committee, _, _)) = rx_node_reconfigure.recv().await { - let address = committee - .primary(&name_clone) - .expect("Our key is not in the committee") - .primary_to_primary; + // TODO: shutdown message should probably be sent in a better way than by injecting + // it through the networking stack. + let address = network::multiaddr_to_address( + &committee + .primary(&name_clone) + .expect("Our key is not in the committee"), + ) + .unwrap(); + let network_key = committee + .network_key(&name_clone) + .expect("Our key is not in the committee"); + let mut primary_network = + WorkerToPrimaryNetwork::new_for_single_address(network_key.to_owned(), address) + .await; let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch( committee.clone(), )); - let primary_cancel_handle = primary_network.send(address, &message).await; + let primary_cancel_handle = + primary_network.send(network_key.to_owned(), &message).await; let addresses = worker_cache_clone .load() diff --git a/narwhal/node/tests/staged/narwhal.yaml b/narwhal/node/tests/staged/narwhal.yaml index 80ecbd9e83ec0..21cf4f0e8b40a 100644 --- a/narwhal/node/tests/staged/narwhal.yaml +++ b/narwhal/node/tests/staged/narwhal.yaml @@ -2,8 +2,7 @@ Authority: STRUCT: - stake: U32 - - primary: - TYPENAME: PrimaryAddresses + - primary_address: BYTES - network_key: TYPENAME: Ed25519PublicKey BLS12381AggregateSignature: @@ -66,10 +65,6 @@ HeaderDigest: TUPLEARRAY: CONTENT: U8 SIZE: 32 -PrimaryAddresses: - STRUCT: - - primary_to_primary: BYTES - - worker_to_primary: BYTES PrimaryWorkerMessage: ENUM: 0: diff --git a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs index d6cc588e4248d..cbe050187248e 100644 --- a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs +++ b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs @@ -75,9 +75,7 @@ async fn test_successful_headers_synchronization() { certificates.insert(certificate.clone().digest(), certificate.clone()); } - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); println!("New primary added: {:?}", own_address); let network = anemo::Network::bind(own_address) .server_name("narwhal") @@ -85,9 +83,9 @@ async fn test_successful_headers_synchronization() { .start(anemo::Router::new()) .unwrap(); - for (_pubkey, addresses, network_pubkey) in committee.others_primaries(&name) { + for (_pubkey, address, network_pubkey) in committee.others_primaries(&name) { let peer_id = PeerId(network_pubkey.0.to_bytes()); - let address = network::multiaddr_to_address(&addresses.primary_to_primary).unwrap(); + let address = network::multiaddr_to_address(&address).unwrap(); let peer_info = PeerInfo { peer_id, affinity: anemo::types::PeerAffinity::High, @@ -120,10 +118,7 @@ async fn test_successful_headers_synchronization() { .authorities() .filter(|a| a.public_key() != name) .map(|a| { - let address = committee - .primary(&a.public_key()) - .unwrap() - .primary_to_primary; + let address = committee.primary(&a.public_key()).unwrap(); println!("New primary added: {:?}", address); primary_listener(1, a.network_keypair().copy(), address) }) @@ -276,9 +271,7 @@ async fn test_successful_payload_synchronization() { certificates.insert(certificate.clone().digest(), certificate.clone()); } - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); println!("New primary added: {:?}", own_address); let network = anemo::Network::bind(own_address) .server_name("narwhal") @@ -286,9 +279,9 @@ async fn test_successful_payload_synchronization() { .start(anemo::Router::new()) .unwrap(); - for (_pubkey, addresses, network_pubkey) in committee.others_primaries(&name) { + for (_pubkey, address, network_pubkey) in committee.others_primaries(&name) { let peer_id = PeerId(network_pubkey.0.to_bytes()); - let address = network::multiaddr_to_address(&addresses.primary_to_primary).unwrap(); + let address = network::multiaddr_to_address(&address).unwrap(); let peer_info = PeerInfo { peer_id, affinity: anemo::types::PeerAffinity::High, @@ -321,10 +314,7 @@ async fn test_successful_payload_synchronization() { .authorities() .filter(|a| a.public_key() != name) .map(|a| { - let address = committee - .primary(&a.public_key()) - .unwrap() - .primary_to_primary; + let address = committee.primary(&a.public_key()).unwrap(); println!("New primary added: {:?}", address); primary_listener(1, a.network_keypair().copy(), address) }) @@ -510,9 +500,7 @@ async fn test_multiple_overlapping_requests() { let mut block_ids: Vec = certificates.keys().copied().collect(); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); println!("New primary added: {:?}", own_address); let network = anemo::Network::bind(own_address) .server_name("narwhal") @@ -640,9 +628,7 @@ async fn test_timeout_while_waiting_for_certificates() { }) .collect(); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); println!("New primary added: {:?}", own_address); let network = anemo::Network::bind(own_address) .server_name("narwhal") @@ -737,9 +723,7 @@ async fn test_reply_with_certificates_already_in_storage() { let (_, rx_certificate_responses) = test_utils::test_channel!(10); let (_, rx_payload_availability_responses) = test_utils::test_channel!(10); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") @@ -845,9 +829,7 @@ async fn test_reply_with_payload_already_in_storage() { let (_, rx_certificate_responses) = test_utils::test_channel!(10); let (_, rx_payload_availability_responses) = test_utils::test_channel!(10); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") @@ -958,9 +940,7 @@ async fn test_reply_with_payload_already_in_storage_for_own_certificates() { let (_, rx_certificate_responses) = test_utils::test_channel!(10); let (_, rx_payload_availability_responses) = test_utils::test_channel!(10); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") diff --git a/narwhal/primary/src/grpc_server/configuration.rs b/narwhal/primary/src/grpc_server/configuration.rs index 21d9fd4848a18..2a98ebb9c09f1 100644 --- a/narwhal/primary/src/grpc_server/configuration.rs +++ b/narwhal/primary/src/grpc_server/configuration.rs @@ -1,6 +1,6 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use config::{PrimaryAddresses, SharedCommittee}; +use config::SharedCommittee; use crypto::PublicKey; use fastcrypto::traits::ToFromBytes; use multiaddr::Multiaddr; @@ -60,35 +60,18 @@ impl Configuration for NarwhalConfiguration { let public_key = self.get_public_key(validator.public_key.as_ref())?; let stake_weight = validator.stake_weight; - let primary_addresses = validator - .primary_addresses + let primary_address: Multiaddr = validator + .primary_address .as_ref() - .ok_or_else(|| Status::invalid_argument("Missing primary addresses"))?; - let primary_to_primary = primary_addresses - .primary_to_primary - .as_ref() - .ok_or_else(|| Status::invalid_argument("Missing primary to primary address"))? - .address - .parse() - .map_err(|err| { - Status::invalid_argument(format!("Could not serialize: {:?}", err)) - })?; - let worker_to_primary = primary_addresses - .primary_to_primary - .as_ref() - .ok_or_else(|| Status::invalid_argument("Missing worker to primary address"))? + .ok_or_else(|| Status::invalid_argument("Missing primary address"))? .address .parse() .map_err(|err| { Status::invalid_argument(format!("Could not serialize: {:?}", err)) })?; - let primary = PrimaryAddresses { - primary_to_primary, - worker_to_primary, - }; parsed_input.push(format!( - "public_key: {:?} stake_weight: {:?} primary addresses: {:?}", - public_key, stake_weight, primary + "public_key: {:?} stake_weight: {:?} primary address: {:?}", + public_key, stake_weight, primary_address )); } Err(Status::internal(format!( @@ -119,12 +102,8 @@ impl Configuration for NarwhalConfiguration { .stake_weight .try_into() .map_err(|_| Status::invalid_argument("Invalid stake weight"))?; - let primary_addresses = validator - .primary_addresses - .as_ref() - .ok_or_else(|| Status::invalid_argument("Missing primary addresses"))?; - let primary_to_primary = primary_addresses - .primary_to_primary + let primary_address = validator + .primary_address .as_ref() .ok_or_else(|| Status::invalid_argument("Missing primary to primary address"))? .address @@ -132,20 +111,7 @@ impl Configuration for NarwhalConfiguration { .map_err(|err| { Status::invalid_argument(format!("Could not serialize: {:?}", err)) })?; - let worker_to_primary = primary_addresses - .primary_to_primary - .as_ref() - .ok_or_else(|| Status::invalid_argument("Missing worker to primary address"))? - .address - .parse() - .map_err(|err| { - Status::invalid_argument(format!("Could not serialize: {:?}", err)) - })?; - let primary = PrimaryAddresses { - primary_to_primary, - worker_to_primary, - }; - new_network_info.insert(public_key, (stake_weight, primary)); + new_network_info.insert(public_key, (stake_weight, primary_address)); } let mut new_committee = (**self.committee.load()).clone(); let res = new_committee.update_primary_network_info(new_network_info); diff --git a/narwhal/primary/src/grpc_server/mod.rs b/narwhal/primary/src/grpc_server/mod.rs index 2cc3be6dcec05..5347bd881e626 100644 --- a/narwhal/primary/src/grpc_server/mod.rs +++ b/narwhal/primary/src/grpc_server/mod.rs @@ -84,8 +84,7 @@ impl ConsensusAPIGrpc, } -impl WorkerReceiverHandler { - async fn wait_for_shutdown(mut rx_reconfigure: watch::Receiver) { - loop { - let result = rx_reconfigure.changed().await; - result.expect("Committee channel dropped"); - let message = rx_reconfigure.borrow().clone(); - if let ReconfigureNotification::Shutdown = message { - break; - } - } - } - - #[must_use] - fn spawn( - self, - address: Multiaddr, - rx_reconfigure: watch::Receiver, - ) -> JoinHandle<()> { - tokio::spawn(async move { - info!("WorkerReceiverHandler has started successfully."); - tokio::select! { - _result = mysten_network::config::Config::default() - .server_builder() - .add_service(WorkerToPrimaryServer::new(self)) - .bind(&address) - .await - .unwrap() - .serve() => (), - - () = Self::wait_for_shutdown(rx_reconfigure) => () - } - }) - } -} - #[async_trait] impl WorkerToPrimary for WorkerReceiverHandler { async fn send_message( &self, - request: Request, - ) -> Result, Status> { - let message: WorkerPrimaryMessage = request - .into_inner() - .deserialize() - .map_err(|e| Status::invalid_argument(e.to_string()))?; + request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + let message = request.into_body(); match message { WorkerPrimaryMessage::OurBatch(digest, worker_id) => { @@ -655,22 +595,16 @@ impl WorkerToPrimary for WorkerReceiverHandler { .await .map_err(|_| DagError::ShuttingDown), } - .map_err(|e| Status::not_found(e.to_string()))?; - - Ok(Response::new(Empty {})) + .map(|_| anemo::Response::new(())) + .map_err(|e| anemo::rpc::Status::internal(e.to_string())) } async fn worker_info( &self, - _request: Request, - ) -> Result, Status> { - let workers = WorkerInfoResponse { + _request: anemo::Request<()>, + ) -> Result, anemo::rpc::Status> { + Ok(anemo::Response::new(WorkerInfoResponse { workers: self.our_workers.clone(), - }; - - let response = BincodeEncodedPayload::try_from(&workers) - .map_err(|e| Status::internal(e.to_string()))?; - - Ok(Response::new(response)) + })) } } diff --git a/narwhal/primary/src/proposer.rs b/narwhal/primary/src/proposer.rs index e8c271fecdeed..4fd92d5220304 100644 --- a/narwhal/primary/src/proposer.rs +++ b/narwhal/primary/src/proposer.rs @@ -218,7 +218,7 @@ impl Proposer { // in partially synchrony. let enough_parents = !self.last_parents.is_empty(); let enough_digests = self.payload_size >= self.header_size; - let timer_expired = timer.is_elapsed(); + let mut timer_expired = timer.is_elapsed(); if (timer_expired || (enough_digests && advance)) && enough_parents { if timer_expired && matches!(self.network_model, NetworkModel::PartiallySynchronous) @@ -249,6 +249,7 @@ impl Proposer { // Reschedule the timer. let deadline = Instant::now() + self.max_header_delay; timer.as_mut().reset(deadline); + timer_expired = false; } tokio::select! { diff --git a/narwhal/primary/src/tests/core_tests.rs b/narwhal/primary/src/tests/core_tests.rs index efc169c704a81..4de47772cb508 100644 --- a/narwhal/primary/src/tests/core_tests.rs +++ b/narwhal/primary/src/tests/core_tests.rs @@ -44,10 +44,7 @@ async fn process_header() { let expected = Vote::new(&header, &name, &mut signature_service).await; // Spawn a listener to receive the vote. - let address = committee - .primary(&header.author) - .unwrap() - .primary_to_primary; + let address = committee.primary(&header.author).unwrap(); let (mut handle, _network) = PrimaryToPrimaryMockServer::spawn(author.network_keypair().copy(), address.clone()); @@ -64,9 +61,7 @@ async fn process_header() { let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(network_key) @@ -166,9 +161,7 @@ async fn process_header_missing_parent() { let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(network_key) @@ -255,9 +248,7 @@ async fn process_header_missing_payload() { let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(network_key) @@ -344,9 +335,7 @@ async fn process_votes() { ); let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(network_key) @@ -355,7 +344,7 @@ async fn process_votes() { for (_pubkey, addresses, network_pubkey) in committee.others_primaries(&name) { let peer_id = PeerId(network_pubkey.0.to_bytes()); - let address = network::multiaddr_to_address(&addresses.primary_to_primary).unwrap(); + let address = network::multiaddr_to_address(&addresses).unwrap(); let peer_info = PeerInfo { peer_id, affinity: anemo::types::PeerAffinity::High, @@ -395,10 +384,7 @@ async fn process_votes() { .authorities() .skip(1) .map(|a| { - let address = committee - .primary(&a.public_key()) - .unwrap() - .primary_to_primary; + let address = committee.primary(&a.public_key()).unwrap(); PrimaryToPrimaryMockServer::spawn(a.network_keypair().copy(), address) }) .collect(); @@ -469,9 +455,7 @@ async fn process_certificates() { let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(network_key) @@ -586,9 +570,7 @@ async fn shutdown_core() { None, ); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(network_key) @@ -659,10 +641,7 @@ async fn reconfigure_core() { let expected = Vote::new(&header, &name, &mut signature_service).await; // Spawn a listener to receive the vote. - let address = new_committee - .primary(&header.author) - .unwrap() - .primary_to_primary; + let address = new_committee.primary(&header.author).unwrap(); let (mut handle, _network) = PrimaryToPrimaryMockServer::spawn(author.network_keypair().copy(), address.clone()); @@ -677,9 +656,7 @@ async fn reconfigure_core() { None, ); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(network_key) diff --git a/narwhal/primary/src/tests/header_waiter_tests.rs b/narwhal/primary/src/tests/header_waiter_tests.rs index 8a7623fb9b688..c2d20bbbc5241 100644 --- a/narwhal/primary/src/tests/header_waiter_tests.rs +++ b/narwhal/primary/src/tests/header_waiter_tests.rs @@ -33,9 +33,7 @@ async fn successfully_synchronize_batches() { let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); let (_tx_consensus_round_updates, rx_consensus_round_updates) = watch::channel(0u64); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") diff --git a/narwhal/primary/src/tests/helper_tests.rs b/narwhal/primary/src/tests/helper_tests.rs index 9b078d447629f..7f24b2069a61c 100644 --- a/narwhal/primary/src/tests/helper_tests.rs +++ b/narwhal/primary/src/tests/helper_tests.rs @@ -37,19 +37,14 @@ async fn test_process_certificates_stream_mode() { watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(author.network_keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let address = network::multiaddr_to_address(&address).unwrap(); let peer_info = PeerInfo { peer_id: PeerId(requestor.network_public_key().0.to_bytes()), @@ -88,10 +83,7 @@ async fn test_process_certificates_stream_mode() { } // AND spin up a mock node - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let requestor_key = requestor.network_keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); @@ -153,19 +145,14 @@ async fn test_process_certificates_batch_mode() { watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(author.network_keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let address = network::multiaddr_to_address(&address).unwrap(); let peer_info = PeerInfo { peer_id: PeerId(requestor.network_public_key().0.to_bytes()), @@ -213,10 +200,7 @@ async fn test_process_certificates_batch_mode() { } // AND spin up a mock node - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let requestor_key = requestor.network_keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); @@ -290,19 +274,14 @@ async fn test_process_payload_availability_success() { watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(author.network_keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let address = network::multiaddr_to_address(&address).unwrap(); let peer_info = PeerInfo { peer_id: PeerId(requestor.network_public_key().0.to_bytes()), @@ -354,10 +333,7 @@ async fn test_process_payload_availability_success() { } // AND spin up a mock node - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let requestor_key = requestor.network_keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); @@ -450,19 +426,14 @@ async fn test_process_payload_availability_when_failures() { watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); - let own_address = - network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) - .unwrap(); + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") .private_key(author.network_keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let address = network::multiaddr_to_address(&address).unwrap(); let peer_info = PeerInfo { peer_id: PeerId(requestor.network_public_key().0.to_bytes()), @@ -520,10 +491,7 @@ async fn test_process_payload_availability_when_failures() { } // AND spin up a mock node - let address = committee - .primary(&requestor_name) - .unwrap() - .primary_to_primary; + let address = committee.primary(&requestor_name).unwrap(); let requestor_key = requestor.network_keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); diff --git a/narwhal/primary/tests/epoch_change.rs b/narwhal/primary/tests/epoch_change.rs index e53bafe455e38..68d3e55fc6f46 100644 --- a/narwhal/primary/tests/epoch_change.rs +++ b/narwhal/primary/tests/epoch_change.rs @@ -4,7 +4,7 @@ use arc_swap::ArcSwap; use config::{Committee, Parameters}; use fastcrypto::traits::KeyPair; use futures::future::join_all; -use network::{CancelOnDropHandler, ReliableNetwork, WorkerToPrimaryNetwork}; +use network::{CancelOnDropHandler, ReliableNetwork2, WorkerToPrimaryNetwork}; use node::NodeStorage; use primary::{NetworkModel, Primary, CHANNEL_CAPACITY}; use prometheus::Registry; @@ -80,6 +80,28 @@ async fn test_simple_epoch_change() { } } + let network = anemo::Network::bind("127.0.0.1:0") + .server_name("narwhal") + .private_key( + crypto::NetworkKeyPair::generate(&mut rand::rngs::OsRng) + .private() + .0 + .to_bytes(), + ) + .start(anemo::Router::new()) + .unwrap(); + + for authority in committee_0.authorities.values() { + let address = network::multiaddr_to_address(&authority.primary_address).unwrap(); + let peer_id = anemo::PeerId(authority.network_key.0.to_bytes()); + + network + .connect_with_peer_id(address, peer_id) + .await + .unwrap(); + } + let mut network = WorkerToPrimaryNetwork::new(network); + // Move to the next epochs. let mut old_committee = committee_0; for epoch in 1..=3 { @@ -90,19 +112,14 @@ async fn test_simple_epoch_change() { }; // Notify the old committee to change epoch. - let addresses: Vec<_> = old_committee - .authorities - .values() - .map(|authority| authority.primary.worker_to_primary.clone()) - .collect(); let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch( new_committee.clone(), )); let mut _do_not_drop: Vec> = Vec::new(); - for address in addresses { + for authority in old_committee.authorities.values() { _do_not_drop.push( - WorkerToPrimaryNetwork::default() - .send(address, &message) + network + .send(authority.network_key.to_owned(), &message) .await, ); } @@ -124,6 +141,7 @@ async fn test_simple_epoch_change() { #[allow(clippy::mutable_key_type)] #[tokio::test] async fn test_partial_committee_change() { + telemetry_subscribers::init_for_testing(); let parameters = Parameters { header_size: 32, // One batch digest ..Parameters::default() @@ -193,6 +211,40 @@ async fn test_partial_committee_change() { let committee_1 = fixture.committee(); let worker_cache_1 = fixture.shared_worker_cache(); + // Tell the nodes of epoch 0 to transition to epoch 1. + let network = anemo::Network::bind("127.0.0.1:0") + .server_name("narwhal") + .private_key( + crypto::NetworkKeyPair::generate(&mut rand::rngs::OsRng) + .private() + .0 + .to_bytes(), + ) + .start(anemo::Router::new()) + .unwrap(); + + for authority in committee_0.authorities.values() { + let address = network::multiaddr_to_address(&authority.primary_address).unwrap(); + let peer_id = anemo::PeerId(authority.network_key.0.to_bytes()); + + network + .connect_with_peer_id(address, peer_id) + .await + .unwrap(); + } + let mut network = WorkerToPrimaryNetwork::new(network); + + let message = + WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch(committee_1.clone())); + let mut _do_not_drop: Vec> = Vec::new(); + for authority in committee_0.authorities.values() { + _do_not_drop.push( + network + .send(authority.network_key.to_owned(), &message) + .await, + ); + } + // Spawn the committee of epoch 1 (only the node not already booted). let mut epoch_1_rx_channels = Vec::new(); let mut epoch_1_tx_channels = Vec::new(); @@ -236,23 +288,6 @@ async fn test_partial_committee_change() { ); } - // Tell the nodes of epoch 0 to transition to epoch 1. - let addresses: Vec<_> = committee_0 - .authorities - .values() - .map(|authority| authority.primary.worker_to_primary.clone()) - .collect(); - let message = - WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch(committee_1.clone())); - let mut _do_not_drop: Vec> = Vec::new(); - for address in addresses { - _do_not_drop.push( - WorkerToPrimaryNetwork::default() - .send(address, &message) - .await, - ); - } - // Run for a while in epoch 1. for rx in epoch_1_rx_channels.iter_mut() { loop { @@ -336,17 +371,17 @@ async fn test_restart_with_new_committee_change() { } // Shutdown the committee of the previous epoch; - let addresses: Vec<_> = committee_0 - .authorities - .values() - .map(|authority| authority.primary.worker_to_primary.clone()) - .collect(); let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::Shutdown); let mut _do_not_drop: Vec> = Vec::new(); - for address in addresses { + for authority in committee_0.authorities.values() { + let mut network = WorkerToPrimaryNetwork::new_for_single_address( + authority.network_key.to_owned(), + network::multiaddr_to_address(&authority.primary_address).unwrap(), + ) + .await; _do_not_drop.push( - WorkerToPrimaryNetwork::default() - .send(address, &message) + network + .send(authority.network_key.to_owned(), &message) .await, ); } @@ -419,17 +454,17 @@ async fn test_restart_with_new_committee_change() { } // Shutdown the committee of the previous epoch; - let addresses: Vec<_> = committee_0 - .authorities - .values() - .map(|authority| authority.primary.worker_to_primary.clone()) - .collect(); let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::Shutdown); let mut _do_not_drop: Vec> = Vec::new(); - for address in addresses { + for authority in committee_0.authorities.values() { + let mut network = WorkerToPrimaryNetwork::new_for_single_address( + authority.network_key.to_owned(), + network::multiaddr_to_address(&authority.primary_address).unwrap(), + ) + .await; _do_not_drop.push( - WorkerToPrimaryNetwork::default() - .send(address, &message) + network + .send(authority.network_key.to_owned(), &message) .await, ); } @@ -513,36 +548,24 @@ async fn test_simple_committee_update() { for _ in 1..=3 { // Update the committee let mut new_committee = old_committee.clone(); - - let mut total_stake = 0; - let threshold = new_committee.validity_threshold(); for (_, authority) in new_committee.authorities.iter_mut() { - if total_stake < threshold { - authority.primary.primary_to_primary = format!( - "/ip4/127.0.0.1/tcp/{}/http", - config::utils::get_available_port() - ) - .parse() - .unwrap(); - - total_stake += authority.stake; - } + authority.stake += 1; } // Notify the old committee about the change in committee information. - let addresses: Vec<_> = old_committee - .authorities - .values() - .map(|authority| authority.primary.worker_to_primary.clone()) - .collect(); let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::UpdateCommittee( new_committee.clone(), )); let mut _do_not_drop: Vec> = Vec::new(); - for address in addresses { + for authority in old_committee.authorities.values() { + let mut network = WorkerToPrimaryNetwork::new_for_single_address( + authority.network_key.to_owned(), + network::multiaddr_to_address(&authority.primary_address).unwrap(), + ) + .await; _do_not_drop.push( - WorkerToPrimaryNetwork::default() - .send(address, &message) + network + .send(authority.network_key.to_owned(), &message) .await, ); } diff --git a/narwhal/primary/tests/integration_tests_configuration_api.rs b/narwhal/primary/tests/integration_tests_configuration_api.rs index ed1af790419a7..1fa88bc34d337 100644 --- a/narwhal/primary/tests/integration_tests_configuration_api.rs +++ b/narwhal/primary/tests/integration_tests_configuration_api.rs @@ -3,8 +3,7 @@ use std::time::Duration; use test_utils::cluster::Cluster; use types::{ - Empty, MultiAddrProto, NewEpochRequest, NewNetworkInfoRequest, PrimaryAddressesProto, - PublicKeyProto, ValidatorData, + Empty, MultiAddrProto, NewEpochRequest, NewNetworkInfoRequest, PublicKeyProto, ValidatorData, }; #[tokio::test] @@ -24,10 +23,7 @@ async fn test_new_epoch() { let public_key = PublicKeyProto::from(name); let stake_weight = 1; - let primary_to_primary = Some(MultiAddrProto { - address: "/ip4/127.0.0.1".to_string(), - }); - let worker_to_primary = Some(MultiAddrProto { + let primary_address = Some(MultiAddrProto { address: "/ip4/127.0.0.1".to_string(), }); @@ -36,10 +32,7 @@ async fn test_new_epoch() { validators: vec![ValidatorData { public_key: Some(public_key), stake_weight, - primary_addresses: Some(PrimaryAddressesProto { - primary_to_primary, - worker_to_primary, - }), + primary_address, }], }); @@ -73,20 +66,14 @@ async fn test_new_network_info() { for public_key in public_keys.iter() { let public_key_proto = PublicKeyProto::from(public_key.clone()); let stake_weight = 1; - let primary_to_primary = Some(MultiAddrProto { - address: "/ip4/127.0.0.1".to_string(), - }); - let worker_to_primary = Some(MultiAddrProto { + let primary_address = Some(MultiAddrProto { address: "/ip4/127.0.0.1".to_string(), }); validators.push(ValidatorData { public_key: Some(public_key_proto), stake_weight, - primary_addresses: Some(PrimaryAddressesProto { - primary_to_primary, - worker_to_primary, - }), + primary_address, }); } @@ -139,7 +126,6 @@ async fn test_get_primary_address() { .load() .primary(&name) .expect("Our public key or worker id is not in the committee") - .primary_to_primary .to_string() ) } diff --git a/narwhal/types/build.rs b/narwhal/types/build.rs index fb336b5c36a12..24e0a66e7f9cd 100644 --- a/narwhal/types/build.rs +++ b/narwhal/types/build.rs @@ -53,6 +53,28 @@ fn build_anemo_services(out_dir: &Path) { .build(), ) .build(); + let worker_to_primary = anemo_build::manual::Service::builder() + .name("WorkerToPrimary") + .package("narwhal") + .method( + anemo_build::manual::Method::builder() + .name("send_message") + .route_name("SendMessage") + .request_type("crate::WorkerPrimaryMessage") + .response_type("()") + .codec_path("anemo::rpc::codec::BincodeCodec") + .build(), + ) + .method( + anemo_build::manual::Method::builder() + .name("worker_info") + .route_name("WorkerInfo") + .request_type("()") + .response_type("crate::WorkerInfoResponse") + .codec_path("anemo::rpc::codec::BincodeCodec") + .build(), + ) + .build(); let worker_to_worker = anemo_build::manual::Service::builder() .name("WorkerToWorker") @@ -70,7 +92,7 @@ fn build_anemo_services(out_dir: &Path) { anemo_build::manual::Builder::new() .out_dir(out_dir) - .compile(&[primary_to_primary, worker_to_worker]); + .compile(&[primary_to_primary, worker_to_primary, worker_to_worker]); } #[rustversion::nightly] diff --git a/narwhal/types/proto/narwhal.proto b/narwhal/types/proto/narwhal.proto index 78928613a9f36..91f3ce19ebd9b 100644 --- a/narwhal/types/proto/narwhal.proto +++ b/narwhal/types/proto/narwhal.proto @@ -25,11 +25,6 @@ message CollectionError { CollectionErrorType error = 2; } -message PrimaryAddresses { - MultiAddr primary_to_primary = 1; - MultiAddr worker_to_primary = 2; -} - message MultiAddr { string address = 1; } @@ -41,7 +36,7 @@ message PublicKey { message ValidatorData { PublicKey public_key = 1; int64 stake_weight = 2; - PrimaryAddresses primary_addresses = 3; + MultiAddr primary_address = 3; } message Collection { @@ -162,14 +157,6 @@ service Configuration { rpc GetPrimaryAddress(Empty) returns (GetPrimaryAddressResponse); } -// The worker-to-primary interface -service WorkerToPrimary { - // Sends a message - rpc SendMessage(BincodeEncodedPayload) returns (Empty) {} - - rpc WorkerInfo(Empty) returns (BincodeEncodedPayload) {} -} - // The primary-to-worker interface service PrimaryToWorker { // Sends a message diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index f9c5685ce84d4..911ac8b652165 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -664,7 +664,7 @@ pub enum PrimaryMessage { } /// Message to reconfigure worker tasks. This message must be sent by a trusted source. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum ReconfigureNotification { /// Indicate the committee has changed. This happens at epoch change. NewEpoch(Committee), @@ -747,7 +747,7 @@ impl fmt::Display for BlockErrorKind { } /// The messages sent by the workers to their primary. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum WorkerPrimaryMessage { /// The worker indicates it sealed a new batch. OurBatch(BatchDigest, WorkerId), diff --git a/narwhal/types/src/proto.rs b/narwhal/types/src/proto.rs index 0ee2e675f96d7..a28448f4e4770 100644 --- a/narwhal/types/src/proto.rs +++ b/narwhal/types/src/proto.rs @@ -5,6 +5,7 @@ mod narwhal { tonic::include_proto!("narwhal"); include!(concat!(env!("OUT_DIR"), "/narwhal.PrimaryToPrimary.rs")); + include!(concat!(env!("OUT_DIR"), "/narwhal.WorkerToPrimary.rs")); include!(concat!(env!("OUT_DIR"), "/narwhal.WorkerToWorker.rs")); } @@ -37,9 +38,8 @@ pub use narwhal::{ CollectionError, CollectionRetrievalResult, Empty, GetCollectionsRequest, GetCollectionsResponse, GetPrimaryAddressResponse, MultiAddr as MultiAddrProto, NewEpochRequest, NewNetworkInfoRequest, NodeReadCausalRequest, NodeReadCausalResponse, - PrimaryAddresses as PrimaryAddressesProto, PublicKey as PublicKeyProto, ReadCausalRequest, - ReadCausalResponse, RemoveCollectionsRequest, RoundsRequest, RoundsResponse, - Transaction as TransactionProto, ValidatorData, + PublicKey as PublicKeyProto, ReadCausalRequest, ReadCausalResponse, RemoveCollectionsRequest, + RoundsRequest, RoundsResponse, Transaction as TransactionProto, ValidatorData, }; impl From for PublicKeyProto { diff --git a/narwhal/worker/src/primary_connector.rs b/narwhal/worker/src/primary_connector.rs index 38da2b7a0a7fb..b22bcf18abccb 100644 --- a/narwhal/worker/src/primary_connector.rs +++ b/narwhal/worker/src/primary_connector.rs @@ -1,10 +1,10 @@ // Copyright (c) 2021, Facebook, Inc. and its affiliates // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use config::Committee; -use crypto::PublicKey; + +use crypto::NetworkPublicKey; use futures::{stream::FuturesUnordered, StreamExt}; -use network::{ReliableNetwork, WorkerToPrimaryNetwork}; +use network::{ReliableNetwork2, WorkerToPrimaryNetwork}; use tokio::{sync::watch, task::JoinHandle}; use types::{metered_channel::Receiver, ReconfigureNotification, WorkerPrimaryMessage}; @@ -14,9 +14,7 @@ pub const MAX_PENDING_DIGESTS: usize = 10_000; // Send batches' digests to the primary. pub struct PrimaryConnector { /// The public key of this authority. - name: PublicKey, - /// The committee information. - committee: Committee, + primary_name: NetworkPublicKey, /// Receive reconfiguration updates. rx_reconfigure: watch::Receiver, /// Input channel to receive the messages to send to the primary. @@ -28,18 +26,17 @@ pub struct PrimaryConnector { impl PrimaryConnector { #[must_use] pub fn spawn( - name: PublicKey, - committee: Committee, + primary_name: NetworkPublicKey, rx_reconfigure: watch::Receiver, rx_digest: Receiver, + primary_client: WorkerToPrimaryNetwork, ) -> JoinHandle<()> { tokio::spawn(async move { Self { - name, - committee, + primary_name, rx_reconfigure, rx_digest, - primary_client: WorkerToPrimaryNetwork::default(), + primary_client, } .run() .await; @@ -57,29 +54,18 @@ impl PrimaryConnector { continue; } - let address = self.committee - .primary(&self.name) - .expect("Our public key is not in the committee") - .worker_to_primary; - let handle = self.primary_client.send(address, &digest).await; + let handle = self.primary_client.send(self.primary_name.to_owned(), &digest).await; futures.push(handle); }, // Trigger reconfigure. result = self.rx_reconfigure.changed() => { result.expect("Committee channel dropped"); - let message = self.rx_reconfigure.borrow().clone(); - match message { - ReconfigureNotification::NewEpoch(new_committee) => { - self.committee = new_committee; - }, - ReconfigureNotification::UpdateCommittee(new_committee) => { - self.committee = new_committee; - - }, - ReconfigureNotification::Shutdown => return + // TODO: Move logic to handle epoch & committee changes to wherever anemo + // network is managed after worker-to-worker interface is migrated. + if self.rx_reconfigure.borrow().clone() == ReconfigureNotification::Shutdown { + return } - tracing::debug!("Committee updated to {}", self.committee); } Some(_result) = futures.next() => () diff --git a/narwhal/worker/src/tests/worker_tests.rs b/narwhal/worker/src/tests/worker_tests.rs index 5ece0e20b7f46..186f1e58605ab 100644 --- a/narwhal/worker/src/tests/worker_tests.rs +++ b/narwhal/worker/src/tests/worker_tests.rs @@ -57,10 +57,10 @@ async fn handle_clients_transactions() { let batch = batch(); let batch_digest = batch.digest(); - let primary_address = committee.primary(&name).unwrap().worker_to_primary; - let expected = - bincode::serialize(&WorkerPrimaryMessage::OurBatch(batch_digest, worker_id)).unwrap(); - let mut handle = WorkerToPrimaryMockServer::spawn(primary_address); + let primary_address = committee.primary(&name).unwrap(); + let expected = WorkerPrimaryMessage::OurBatch(batch_digest, worker_id); + let (mut handle, _network) = + WorkerToPrimaryMockServer::spawn(my_primary.network_keypair().copy(), primary_address); // Spawn enough workers' listeners to acknowledge our batches. let mut other_workers = Vec::new(); @@ -91,5 +91,5 @@ async fn handle_clients_transactions() { } // Ensure the primary received the batch's digest (ie. it did not panic). - assert_eq!(handle.recv().await.unwrap().payload, expected); + assert_eq!(handle.recv().await.unwrap(), expected); } diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index d639f0476c8a3..617377bec8d46 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -41,8 +41,6 @@ pub struct Worker { /// The public key of this authority. primary_name: PublicKey, // The private-public key pair of this worker. - // TODO: utilize keypair in network communication - #[allow(dead_code)] keypair: NetworkKeyPair, /// The id of this worker used for index-based lookup by other NW nodes. id: WorkerId, @@ -56,8 +54,6 @@ pub struct Worker { store: Store, } -const INADDR_ANY: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 0); - impl Worker { pub fn spawn( primary_name: PublicKey, @@ -89,7 +85,7 @@ impl Worker { let initial_committee = (*(*(*committee).load()).clone()).clone(); let (tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewEpoch(initial_committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(initial_committee)); let (tx_worker_helper, rx_worker_helper) = channel(CHANNEL_CAPACITY, &channel_metrics.tx_worker_helper); @@ -123,7 +119,7 @@ impl Worker { info!("Worker {} listening to worker messages on {}", id, address); - // add other workers we want to talk with to the known peers set + // Add other workers we want to talk with to the known peers set. for (_primary_pubkey, worker_info) in worker .worker_cache .load() @@ -139,6 +135,30 @@ impl Worker { network.known_peers().insert(peer_info); } + // Connect worker to its corresponding primary. + let primary_address = network::multiaddr_to_address( + &committee + .load() + .primary(&primary_name) + .expect("Our primary is not in the committee"), + ) + .unwrap(); + let primary_network_key = committee + .load() + .network_key(&primary_name) + .expect("Our primary is not in the committee"); + network.known_peers().insert(PeerInfo { + peer_id: anemo::PeerId(primary_network_key.0.to_bytes()), + affinity: anemo::types::PeerAffinity::High, + address: vec![primary_address], + }); + let handle = PrimaryConnector::spawn( + primary_network_key, + rx_reconfigure, + rx_primary, + network::WorkerToPrimaryNetwork::new(network.clone()), + ); + let client_flow_handles = worker.handle_clients_transactions( &tx_reconfigure, tx_primary.clone(), @@ -162,10 +182,6 @@ impl Worker { network, ); - // The `PrimaryConnector` allows the worker to send messages to its primary. - let handle = - PrimaryConnector::spawn(primary_name, initial_committee, rx_reconfigure, rx_primary); - // NOTE: This log entry is used to compute performance. info!( "Worker {} successfully booted on {}", @@ -205,7 +221,7 @@ impl Worker { .expect("Our public key or worker id is not in the worker cache") .primary_to_worker; let address = address - .replace(0, |_protocol| Some(Protocol::Ip4(INADDR_ANY))) + .replace(0, |_protocol| Some(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))) .unwrap(); let primary_handle = PrimaryReceiverHandler { tx_synchronizer } .spawn(address.clone(), tx_reconfigure.subscribe()); @@ -261,7 +277,7 @@ impl Worker { .expect("Our public key or worker id is not in the worker cache") .transactions; let address = address - .replace(0, |_protocol| Some(Protocol::Ip4(INADDR_ANY))) + .replace(0, |_protocol| Some(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))) .unwrap(); let tx_receiver_handle = TxReceiverHandler { tx_batch_maker }.spawn( address.clone(),