From 3ebcaa1f4a5c67cb73a2d00e3144d4da9ca74713 Mon Sep 17 00:00:00 2001 From: pompon0 Date: Mon, 24 Oct 2022 17:30:48 +0200 Subject: [PATCH] replaced Client struct with async_trait (#7913) The concrete implementation wrapping ClientActor and ViewClientActor has been moved to near_client crate. Network(View)ClientMessage will be moved to near_client crate in a separate PR. --- Cargo.lock | 25 +- Cargo.toml | 1 + chain/client/Cargo.toml | 1 + chain/client/src/adapter.rs | 347 +++++++++++++ chain/client/src/lib.rs | 1 + chain/network/Cargo.toml | 1 + chain/network/src/client.rs | 456 ++++-------------- chain/network/src/peer/peer_actor.rs | 30 +- chain/network/src/peer/testonly.rs | 5 +- .../network/src/peer_manager/network_state.rs | 4 +- .../src/peer_manager/peer_manager_actor.rs | 2 +- chain/network/src/peer_manager/testonly.rs | 12 +- chain/network/src/test_utils.rs | 61 --- chain/network/src/testonly/fake_client.rs | 185 ++++--- .../src/tests/network/peer_handshake.rs | 17 +- integration-tests/src/tests/network/runner.rs | 2 +- .../src/tests/network/stress_network.rs | 17 +- nearcore/src/lib.rs | 5 +- tools/chainsync-loadtest/Cargo.toml | 1 + tools/chainsync-loadtest/src/main.rs | 12 +- tools/chainsync-loadtest/src/network.rs | 196 ++++---- 21 files changed, 716 insertions(+), 665 deletions(-) create mode 100644 chain/client/src/adapter.rs diff --git a/Cargo.lock b/Cargo.lock index 88db047e438..934aa709e85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -383,9 +383,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.53" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" dependencies = [ "proc-macro2", "quote", @@ -878,6 +878,7 @@ version = "0.0.0" dependencies = [ "actix", "anyhow", + "async-trait", "clap 3.1.18", "dirs", "futures", @@ -2928,6 +2929,7 @@ dependencies = [ "actix-rt", "ansi_term", "assert_matches", + "async-trait", "borsh", "chrono", "delay-detector", @@ -3222,6 +3224,7 @@ dependencies = [ "anyhow", "arc-swap", "assert_matches", + "async-trait", "borsh", "bytes", "bytesize", @@ -4288,11 +4291,11 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.38" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa" +checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -5483,13 +5486,13 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.94" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a" +checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -6042,6 +6045,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +[[package]] +name = "unicode-ident" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" + [[package]] name = "unicode-normalization" version = "0.1.19" diff --git a/Cargo.toml b/Cargo.toml index 60a4ac317f8..12983b5c60a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ arc-swap = "1.5" arrayref = "0.3" assert_matches = "1.5.0" async-recursion = "0.3.2" +async-trait = "0.1.58" atty = "0.2" awc = { version = "3", features = ["openssl"] } backtrace = "0.3.64" diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index 82abfb0d52d..2d991d553b2 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -11,6 +11,7 @@ edition.workspace = true actix-rt.workspace = true actix.workspace = true ansi_term.workspace = true +async-trait.workspace = true borsh.workspace = true chrono.workspace = true futures.workspace = true diff --git a/chain/client/src/adapter.rs b/chain/client/src/adapter.rs new file mode 100644 index 00000000000..de2c1f818a4 --- /dev/null +++ b/chain/client/src/adapter.rs @@ -0,0 +1,347 @@ +use crate::client_actor::ClientActor; +use crate::view_client::ViewClientActor; +use near_network::time; +use near_network::types::{ + NetworkClientMessages, NetworkClientResponses, NetworkInfo, NetworkViewClientMessages, + NetworkViewClientResponses, PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg, + PartialEncodedChunkResponseMsg, ReasonForBan, StateResponseInfo, +}; +use near_o11y::WithSpanContextExt; +use near_primitives::block::{Approval, Block, BlockHeader}; +use near_primitives::challenge::Challenge; +use near_primitives::hash::CryptoHash; +use near_primitives::network::{AnnounceAccount, PeerId}; +use near_primitives::sharding::PartialEncodedChunk; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{AccountId, EpochId, ShardId}; +use near_primitives::views::FinalExecutionOutcomeView; + +pub struct Adapter { + /// Address of the client actor. + client_addr: actix::Addr, + /// Address of the view client actor. + view_client_addr: actix::Addr, +} + +impl Adapter { + pub fn new( + client_addr: actix::Addr, + view_client_addr: actix::Addr, + ) -> Self { + Self { client_addr, view_client_addr } + } +} + +#[async_trait::async_trait] +impl near_network::client::Client for Adapter { + async fn tx_status_request( + &self, + account_id: AccountId, + tx_hash: CryptoHash, + ) -> Option> { + match self + .view_client_addr + .send( + NetworkViewClientMessages::TxStatus { + tx_hash: tx_hash, + signer_account_id: account_id, + } + .with_span_context(), + ) + .await + { + Ok(NetworkViewClientResponses::TxStatus(tx_result)) => Some(tx_result), + Ok(NetworkViewClientResponses::NoResponse) => None, + Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), + Err(err) => { + tracing::error!("mailbox error: {err}"); + None + } + } + } + + async fn tx_status_response(&self, tx_result: FinalExecutionOutcomeView) { + match self + .view_client_addr + .send( + NetworkViewClientMessages::TxStatusResponse(Box::new(tx_result.clone())) + .with_span_context(), + ) + .await + { + Ok(NetworkViewClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn state_request_header( + &self, + shard_id: ShardId, + sync_hash: CryptoHash, + ) -> Result, ReasonForBan> { + match self + .view_client_addr + .send( + NetworkViewClientMessages::StateRequestHeader { + shard_id: shard_id, + sync_hash: sync_hash, + } + .with_span_context(), + ) + .await + { + Ok(NetworkViewClientResponses::StateResponse(resp)) => Ok(Some(*resp)), + Ok(NetworkViewClientResponses::NoResponse) => Ok(None), + Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), + Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), + Err(err) => { + tracing::error!("mailbox error: {err}"); + Ok(None) + } + } + } + + async fn state_request_part( + &self, + shard_id: ShardId, + sync_hash: CryptoHash, + part_id: u64, + ) -> Result, ReasonForBan> { + match self + .view_client_addr + .send( + NetworkViewClientMessages::StateRequestPart { + shard_id: shard_id, + sync_hash: sync_hash, + part_id: part_id, + } + .with_span_context(), + ) + .await + { + Ok(NetworkViewClientResponses::StateResponse(resp)) => Ok(Some(*resp)), + Ok(NetworkViewClientResponses::NoResponse) => Ok(None), + Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), + Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), + Err(err) => { + tracing::error!("mailbox error: {err}"); + Ok(None) + } + } + } + + async fn state_response(&self, info: StateResponseInfo) { + match self + .client_addr + .send(NetworkClientMessages::StateResponse(info).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn block_approval(&self, approval: Approval, peer_id: PeerId) { + match self + .client_addr + .send(NetworkClientMessages::BlockApproval(approval, peer_id).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn transaction(&self, transaction: SignedTransaction, is_forwarded: bool) { + match self + .client_addr + .send( + NetworkClientMessages::Transaction { transaction, is_forwarded, check_only: false } + .with_span_context(), + ) + .await + { + // Almost all variants of NetworkClientResponse are used only in response + // to NetworkClientMessages::Transaction (except for Ban). It will be clearer + // once NetworkClientMessage is split into separate requests. + Ok(resp @ NetworkClientResponses::Ban { .. }) => { + panic!("unexpected ClientResponse: {resp:?}") + } + Ok(_) => {} + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn partial_encoded_chunk_request( + &self, + req: PartialEncodedChunkRequestMsg, + msg_hash: CryptoHash, + ) { + match self + .client_addr + .send( + NetworkClientMessages::PartialEncodedChunkRequest(req, msg_hash) + .with_span_context(), + ) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn partial_encoded_chunk_response( + &self, + resp: PartialEncodedChunkResponseMsg, + timestamp: time::Instant, + ) { + match self + .client_addr + .send( + NetworkClientMessages::PartialEncodedChunkResponse(resp, timestamp.into()) + .with_span_context(), + ) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn partial_encoded_chunk(&self, chunk: PartialEncodedChunk) { + match self + .client_addr + .send(NetworkClientMessages::PartialEncodedChunk(chunk).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn partial_encoded_chunk_forward(&self, msg: PartialEncodedChunkForwardMsg) { + match self + .client_addr + .send(NetworkClientMessages::PartialEncodedChunkForward(msg).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn block_request(&self, hash: CryptoHash) -> Option> { + match self + .view_client_addr + .send(NetworkViewClientMessages::BlockRequest(hash).with_span_context()) + .await + { + Ok(NetworkViewClientResponses::Block(block)) => Some(block), + Ok(NetworkViewClientResponses::NoResponse) => None, + Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), + Err(err) => { + tracing::error!("mailbox error: {err}"); + None + } + } + } + + async fn block_headers_request(&self, hashes: Vec) -> Option> { + match self + .view_client_addr + .send(NetworkViewClientMessages::BlockHeadersRequest(hashes).with_span_context()) + .await + { + Ok(NetworkViewClientResponses::BlockHeaders(block_headers)) => Some(block_headers), + Ok(NetworkViewClientResponses::NoResponse) => None, + Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), + Err(err) => { + tracing::error!("mailbox error: {err}"); + None + } + } + } + + async fn block(&self, block: Block, peer_id: PeerId, was_requested: bool) { + match self + .client_addr + .send(NetworkClientMessages::Block(block, peer_id, was_requested).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn block_headers( + &self, + headers: Vec, + peer_id: PeerId, + ) -> Result<(), ReasonForBan> { + match self + .client_addr + .send(NetworkClientMessages::BlockHeaders(headers, peer_id).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => Ok(()), + Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => { + tracing::error!("mailbox error: {err}"); + Ok(()) + } + } + } + + async fn challenge(&self, challenge: Challenge) { + match self + .client_addr + .send(NetworkClientMessages::Challenge(challenge).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn network_info(&self, info: NetworkInfo) { + match self + .client_addr + .send(NetworkClientMessages::NetworkInfo(info).with_span_context()) + .await + { + Ok(NetworkClientResponses::NoResponse) => {} + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => tracing::error!("mailbox error: {err}"), + } + } + + async fn announce_account( + &self, + accounts: Vec<(AnnounceAccount, Option)>, + ) -> Result, ReasonForBan> { + match self + .view_client_addr + .send(NetworkViewClientMessages::AnnounceAccount(accounts).with_span_context()) + .await + { + Ok(NetworkViewClientResponses::AnnounceAccount(accounts)) => Ok(accounts), + Ok(NetworkViewClientResponses::NoResponse) => Ok(vec![]), + Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), + Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), + Err(err) => { + tracing::error!("mailbox error: {err}"); + Ok(vec![]) + } + } + } +} diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs index 60b783fe074..02bff5d5fa6 100644 --- a/chain/client/src/lib.rs +++ b/chain/client/src/lib.rs @@ -13,6 +13,7 @@ pub use crate::client::Client; pub use crate::client_actor::{start_client, ClientActor}; pub use crate::view_client::{start_view_client, ViewClientActor}; +pub mod adapter; pub mod adversarial; mod client; mod client_actor; diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index ac55091664b..ff62defa2b5 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -12,6 +12,7 @@ anyhow.workspace = true protobuf-codegen.workspace = true [dependencies] +async-trait.workspace = true actix.workspace = true anyhow.workspace = true arc-swap.workspace = true diff --git a/chain/network/src/client.rs b/chain/network/src/client.rs index 4361b6ac7e3..78783bf617a 100644 --- a/chain/network/src/client.rs +++ b/chain/network/src/client.rs @@ -2,11 +2,7 @@ use crate::network_protocol::{ PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg, StateResponseInfo, }; -use crate::types::{ - NetworkClientMessages, NetworkClientResponses, NetworkInfo, NetworkViewClientMessages, - NetworkViewClientResponses, ReasonForBan, -}; -use near_o11y::{WithSpanContext, WithSpanContextExt}; +use crate::types::{NetworkInfo, ReasonForBan}; use near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; use near_primitives::hash::CryptoHash; @@ -19,400 +15,154 @@ use near_primitives::views::FinalExecutionOutcomeView; /// A strongly typed asynchronous API for the Client logic. /// It abstracts away the fact that client is implemented using actix /// actors. -/// TODO(gprusak): eventually we might want to replace this concrete -/// implementation with an (async) trait, and move the -/// concrete implementation to the near_client crate. This way we will -/// be able to remove actix from the near_network crate entirely. -pub struct Client { - /// Address of the client actor. - client_addr: actix::Recipient>, - /// Address of the view client actor. - view_client_addr: actix::Recipient>, -} - -impl Client { - pub fn new( - client_addr: actix::Recipient>, - view_client_addr: actix::Recipient>, - ) -> Self { - Self { client_addr, view_client_addr } - } - - pub async fn tx_status_request( +#[async_trait::async_trait] +pub trait Client: Send + Sync + 'static { + async fn tx_status_request( &self, account_id: AccountId, tx_hash: CryptoHash, - ) -> Result, ReasonForBan> { - match self - .view_client_addr - .send( - NetworkViewClientMessages::TxStatus { - tx_hash: tx_hash, - signer_account_id: account_id, - } - .with_span_context(), - ) - .await - { - Ok(NetworkViewClientResponses::TxStatus(tx_result)) => Ok(Some(*tx_result)), - Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(None) - } - } - } + ) -> Option>; - pub async fn tx_status_response( - &self, - tx_result: FinalExecutionOutcomeView, - ) -> Result<(), ReasonForBan> { - match self - .view_client_addr - .send( - NetworkViewClientMessages::TxStatusResponse(Box::new(tx_result.clone())) - .with_span_context(), - ) - .await - { - Ok(NetworkViewClientResponses::NoResponse) => Ok(()), - Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } - } + async fn tx_status_response(&self, tx_result: FinalExecutionOutcomeView); - pub async fn state_request_header( + async fn state_request_header( &self, shard_id: ShardId, sync_hash: CryptoHash, - ) -> Result, ReasonForBan> { - match self - .view_client_addr - .send( - NetworkViewClientMessages::StateRequestHeader { - shard_id: shard_id, - sync_hash: sync_hash, - } - .with_span_context(), - ) - .await - { - Ok(NetworkViewClientResponses::StateResponse(resp)) => Ok(Some(*resp)), - Ok(NetworkViewClientResponses::NoResponse) => Ok(None), - Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(None) - } - } - } + ) -> Result, ReasonForBan>; - pub async fn state_request_part( + async fn state_request_part( &self, shard_id: ShardId, sync_hash: CryptoHash, part_id: u64, - ) -> Result, ReasonForBan> { - match self - .view_client_addr - .send( - NetworkViewClientMessages::StateRequestPart { - shard_id: shard_id, - sync_hash: sync_hash, - part_id: part_id, - } - .with_span_context(), - ) - .await - { - Ok(NetworkViewClientResponses::StateResponse(resp)) => Ok(Some(*resp)), - Ok(NetworkViewClientResponses::NoResponse) => Ok(None), - Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(None) - } - } - } + ) -> Result, ReasonForBan>; - pub async fn state_response(&self, info: StateResponseInfo) -> Result<(), ReasonForBan> { - match self - .client_addr - .send(NetworkClientMessages::StateResponse(info).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } - } + async fn state_response(&self, info: StateResponseInfo); - pub async fn block_approval( - &self, - approval: Approval, - peer_id: PeerId, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send(NetworkClientMessages::BlockApproval(approval, peer_id).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } - } + async fn block_approval(&self, approval: Approval, peer_id: PeerId); - pub async fn transaction( - &self, - transaction: SignedTransaction, - is_forwarded: bool, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send( - NetworkClientMessages::Transaction { transaction, is_forwarded, check_only: false } - .with_span_context(), - ) - .await - { - Ok(NetworkClientResponses::ValidTx) => Ok(()), - Ok(NetworkClientResponses::InvalidTx(err)) => { - tracing::warn!(target: "network", ?err, "Received invalid tx"); - // TODO: count as malicious behavior? - Ok(()) - } - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } - } + async fn transaction(&self, transaction: SignedTransaction, is_forwarded: bool); - pub async fn partial_encoded_chunk_request( + async fn partial_encoded_chunk_request( &self, req: PartialEncodedChunkRequestMsg, msg_hash: CryptoHash, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send( - NetworkClientMessages::PartialEncodedChunkRequest(req, msg_hash) - .with_span_context(), - ) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } - } + ); - pub async fn partial_encoded_chunk_response( + async fn partial_encoded_chunk_response( &self, resp: PartialEncodedChunkResponseMsg, timestamp: time::Instant, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send( - NetworkClientMessages::PartialEncodedChunkResponse(resp, timestamp.into()) - .with_span_context(), - ) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } - } + ); + + async fn partial_encoded_chunk(&self, chunk: PartialEncodedChunk); + + async fn partial_encoded_chunk_forward(&self, msg: PartialEncodedChunkForwardMsg); + + async fn block_request(&self, hash: CryptoHash) -> Option>; - pub async fn partial_encoded_chunk( + async fn block_headers_request(&self, hashes: Vec) -> Option>; + + async fn block(&self, block: Block, peer_id: PeerId, was_requested: bool); + + async fn block_headers( &self, - chunk: PartialEncodedChunk, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send(NetworkClientMessages::PartialEncodedChunk(chunk).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } - } + headers: Vec, + peer_id: PeerId, + ) -> Result<(), ReasonForBan>; + + async fn challenge(&self, challenge: Challenge); + + async fn network_info(&self, info: NetworkInfo); - pub async fn partial_encoded_chunk_forward( + async fn announce_account( &self, - msg: PartialEncodedChunkForwardMsg, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send(NetworkClientMessages::PartialEncodedChunkForward(msg).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } + accounts: Vec<(AnnounceAccount, Option)>, + ) -> Result, ReasonForBan>; +} + +/// Implementation of Client which doesn't do anything and never returns errors. +pub struct Noop; + +#[async_trait::async_trait] +impl Client for Noop { + async fn tx_status_request( + &self, + _account_id: AccountId, + _tx_hash: CryptoHash, + ) -> Option> { + None } - pub async fn block_request(&self, hash: CryptoHash) -> Result, ReasonForBan> { - match self - .view_client_addr - .send(NetworkViewClientMessages::BlockRequest(hash).with_span_context()) - .await - { - Ok(NetworkViewClientResponses::Block(block)) => Ok(Some(*block)), - Ok(NetworkViewClientResponses::NoResponse) => Ok(None), - Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(None) - } - } + async fn tx_status_response(&self, _tx_result: FinalExecutionOutcomeView) {} + + async fn state_request_header( + &self, + _shard_id: ShardId, + _sync_hash: CryptoHash, + ) -> Result, ReasonForBan> { + Ok(None) } - pub async fn block_headers_request( + async fn state_request_part( &self, - hashes: Vec, - ) -> Result>, ReasonForBan> { - match self - .view_client_addr - .send(NetworkViewClientMessages::BlockHeadersRequest(hashes).with_span_context()) - .await - { - Ok(NetworkViewClientResponses::BlockHeaders(block_headers)) => Ok(Some(block_headers)), - Ok(NetworkViewClientResponses::NoResponse) => Ok(None), - Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ViewClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(None) - } - } + _shard_id: ShardId, + _sync_hash: CryptoHash, + _part_id: u64, + ) -> Result, ReasonForBan> { + Ok(None) } - pub async fn block( + async fn state_response(&self, _info: StateResponseInfo) {} + async fn block_approval(&self, _approval: Approval, _peer_id: PeerId) {} + + async fn transaction(&self, _transaction: SignedTransaction, _is_forwarded: bool) {} + + async fn partial_encoded_chunk_request( &self, - block: Block, - peer_id: PeerId, - was_requested: bool, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send(NetworkClientMessages::Block(block, peer_id, was_requested).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } + _req: PartialEncodedChunkRequestMsg, + _msg_hash: CryptoHash, + ) { } - pub async fn block_headers( + async fn partial_encoded_chunk_response( &self, - headers: Vec, - peer_id: PeerId, - ) -> Result<(), ReasonForBan> { - match self - .client_addr - .send(NetworkClientMessages::BlockHeaders(headers, peer_id).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } + _resp: PartialEncodedChunkResponseMsg, + _timestamp: time::Instant, + ) { } - pub async fn challenge(&self, challenge: Challenge) -> Result<(), ReasonForBan> { - match self - .client_addr - .send(NetworkClientMessages::Challenge(challenge).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => Ok(()), - Ok(NetworkClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(()) - } - } + async fn partial_encoded_chunk(&self, _chunk: PartialEncodedChunk) {} + + async fn partial_encoded_chunk_forward(&self, _msg: PartialEncodedChunkForwardMsg) {} + + async fn block_request(&self, _hash: CryptoHash) -> Option> { + None } - pub async fn network_info(&self, info: NetworkInfo) { - match self - .client_addr - .send(NetworkClientMessages::NetworkInfo(info).with_span_context()) - .await - { - Ok(NetworkClientResponses::NoResponse) => {} - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => tracing::error!("mailbox error: {err}"), - } + async fn block_headers_request(&self, _hashes: Vec) -> Option> { + None } - pub async fn announce_account( + async fn block(&self, _block: Block, _peer_id: PeerId, _was_requested: bool) {} + + async fn block_headers( &self, - accounts: Vec<(AnnounceAccount, Option)>, + _headers: Vec, + _peer_id: PeerId, + ) -> Result<(), ReasonForBan> { + Ok(()) + } + + async fn challenge(&self, _challenge: Challenge) {} + + async fn network_info(&self, _info: NetworkInfo) {} + + async fn announce_account( + &self, + _accounts: Vec<(AnnounceAccount, Option)>, ) -> Result, ReasonForBan> { - match self - .view_client_addr - .send(NetworkViewClientMessages::AnnounceAccount(accounts).with_span_context()) - .await - { - Ok(NetworkViewClientResponses::AnnounceAccount(accounts)) => Ok(accounts), - Ok(NetworkViewClientResponses::NoResponse) => Ok(vec![]), - Ok(NetworkViewClientResponses::Ban { ban_reason }) => Err(ban_reason), - Ok(resp) => panic!("unexpected ClientResponse: {resp:?}"), - Err(err) => { - tracing::error!("mailbox error: {err}"); - Ok(vec![]) - } - } + Ok(vec![]) } } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index b97e1c753af..bfd066d6c2e 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -704,10 +704,10 @@ impl PeerActor { RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state .client .tx_status_request(account_id, tx_hash) - .await? - .map(RoutedMessageBody::TxStatusResponse), + .await + .map(|v| RoutedMessageBody::TxStatusResponse(*v)), RoutedMessageBody::TxStatusResponse(tx_result) => { - network_state.client.tx_status_response(tx_result).await?; + network_state.client.tx_status_response(tx_result).await; None } RoutedMessageBody::StateRequestHeader(shard_id, sync_hash) => network_state @@ -721,31 +721,31 @@ impl PeerActor { .await? .map(RoutedMessageBody::VersionedStateResponse), RoutedMessageBody::VersionedStateResponse(info) => { - network_state.client.state_response(info).await?; + network_state.client.state_response(info).await; None } RoutedMessageBody::BlockApproval(approval) => { - network_state.client.block_approval(approval, peer_id).await?; + network_state.client.block_approval(approval, peer_id).await; None } RoutedMessageBody::ForwardTx(transaction) => { - network_state.client.transaction(transaction, /*is_forwarded=*/ true).await?; + network_state.client.transaction(transaction, /*is_forwarded=*/ true).await; None } RoutedMessageBody::PartialEncodedChunkRequest(request) => { - network_state.client.partial_encoded_chunk_request(request, msg_hash).await?; + network_state.client.partial_encoded_chunk_request(request, msg_hash).await; None } RoutedMessageBody::PartialEncodedChunkResponse(response) => { - network_state.client.partial_encoded_chunk_response(response, clock.now()).await?; + network_state.client.partial_encoded_chunk_response(response, clock.now()).await; None } RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => { - network_state.client.partial_encoded_chunk(chunk).await?; + network_state.client.partial_encoded_chunk(chunk).await; None } RoutedMessageBody::PartialEncodedChunkForward(msg) => { - network_state.client.partial_encoded_chunk_forward(msg).await?; + network_state.client.partial_encoded_chunk_forward(msg).await; None } RoutedMessageBody::ReceiptOutcomeRequest(_) => { @@ -802,17 +802,17 @@ impl PeerActor { ) } PeerMessage::BlockRequest(hash) => { - network_state.client.block_request(hash).await?.map(PeerMessage::Block) + network_state.client.block_request(hash).await.map(|b|PeerMessage::Block(*b)) } PeerMessage::BlockHeadersRequest(hashes) => { - network_state.client.block_headers_request(hashes).await?.map(PeerMessage::BlockHeaders) + network_state.client.block_headers_request(hashes).await.map(PeerMessage::BlockHeaders) } PeerMessage::Block(block) => { - network_state.client.block(block, peer_id, was_requested).await?; + network_state.client.block(block, peer_id, was_requested).await; None } PeerMessage::Transaction(transaction) => { - network_state.client.transaction(transaction, /*is_forwarded=*/ false).await?; + network_state.client.transaction(transaction, /*is_forwarded=*/ false).await; None } PeerMessage::BlockHeaders(headers) => { @@ -820,7 +820,7 @@ impl PeerActor { None } PeerMessage::Challenge(challenge) => { - network_state.client.challenge(challenge).await?; + network_state.client.challenge(challenge).await; None } msg => { diff --git a/chain/network/src/peer/testonly.rs b/chain/network/src/peer/testonly.rs index 17e76409d41..9d0cd8a0dfc 100644 --- a/chain/network/src/peer/testonly.rs +++ b/chain/network/src/peer/testonly.rs @@ -1,5 +1,4 @@ use crate::broadcast; -use crate::client; use crate::config::NetworkConfig; use crate::network_protocol::testonly as data; use crate::network_protocol::{ @@ -161,7 +160,7 @@ impl PeerHandle { let (send, recv) = broadcast::unbounded_channel(); let actix = ActixSystem::spawn(move || { let fpm = FakePeerManagerActor { cfg: cfg.clone() }.start(); - let fc = fake_client::start(send.sink().compose(Event::Client)); + let fc = Arc::new(fake_client::Fake { event_sink: send.sink().compose(Event::Client) }); let store = store::Store::from(near_store::db::TestDB::new()); let routing_table_view = RoutingTableView::new(store.clone(), cfg.id()); // WARNING: this is a hack to make PeerActor use a specific nonce @@ -185,7 +184,7 @@ impl PeerHandle { let network_state = Arc::new(NetworkState::new( Arc::new(network_cfg.verify().unwrap()), cfg.chain.genesis_id.clone(), - client::Client::new(fc.clone().recipient(), fc.clone().recipient()), + fc, fpm.recipient(), routing_table_addr, routing_table_view, diff --git a/chain/network/src/peer_manager/network_state.rs b/chain/network/src/peer_manager/network_state.rs index 37c9c463895..430bcdf4ebe 100644 --- a/chain/network/src/peer_manager/network_state.rs +++ b/chain/network/src/peer_manager/network_state.rs @@ -36,7 +36,7 @@ pub(crate) struct NetworkState { pub config: Arc, /// GenesisId of the chain. pub genesis_id: GenesisId, - pub client: client::Client, + pub client: Arc, /// Address of the peer manager actor. pub peer_manager_addr: Recipient>, /// RoutingTableActor, responsible for computing routing table, routing table exchange, etc. @@ -69,7 +69,7 @@ impl NetworkState { pub fn new( config: Arc, genesis_id: GenesisId, - client: client::Client, + client: Arc, peer_manager_addr: Recipient>, routing_table_addr: actix::Addr, routing_table_view: RoutingTableView, diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index b59e6135905..1d6e4e43c6c 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -263,7 +263,7 @@ impl PeerManagerActor { clock: time::Clock, store: Arc, config: config::NetworkConfig, - client: client::Client, + client: Arc, genesis_id: GenesisId, ) -> anyhow::Result> { let config = config.verify().context("config")?; diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index fa955b4cb66..4652a48a26a 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -1,5 +1,4 @@ use crate::broadcast; -use crate::client; use crate::config; use crate::network_protocol::testonly as data; use crate::network_protocol::{ @@ -287,16 +286,9 @@ pub(crate) async fn start( let chain = chain.clone(); move || { let genesis_id = chain.genesis_id.clone(); - let fc = fake_client::start(send.sink().compose(Event::Client)); + let fc = Arc::new(fake_client::Fake { event_sink: send.sink().compose(Event::Client) }); cfg.event_sink = send.sink().compose(Event::PeerManager); - PeerManagerActor::spawn( - clock, - store, - cfg, - client::Client::new(fc.clone().recipient(), fc.clone().recipient()), - genesis_id, - ) - .unwrap() + PeerManagerActor::spawn(clock, store, cfg, fc, genesis_id).unwrap() } }) .await; diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index 22b5089d60c..c713c6809a2 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -322,67 +322,6 @@ impl MockPeerManagerAdapter { } } -pub mod test_features { - use crate::client; - use crate::config; - use crate::test_utils::convert_boot_nodes; - use crate::time; - use crate::types::{NetworkClientMessages, NetworkClientResponses}; - use crate::types::{NetworkViewClientMessages, NetworkViewClientResponses}; - use crate::PeerManagerActor; - use actix::actors::mocker::Mocker; - use actix::Actor; - use near_primitives::block::GenesisId; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - - /// Mock for `ClientActor` - type ClientMock = Mocker; - /// Mock for `ViewClientActor` - type ViewClientMock = Mocker; - - // Make peer manager for unit tests - pub fn spawn_peer_manager( - store: Arc, - mut config: config::NetworkConfig, - boot_nodes: Vec<(&str, u16)>, - peer_max_count: u32, - ) -> actix::Addr { - config.boot_nodes = convert_boot_nodes(boot_nodes); - config.max_num_peers = peer_max_count; - let counter = Arc::new(AtomicUsize::new(0)); - let counter1 = counter.clone(); - let client_addr = ClientMock::mock(Box::new(move |_msg, _ctx| { - Box::new(Some(NetworkClientResponses::NoResponse)) - })) - .start(); - - let view_client_addr = ViewClientMock::mock(Box::new(move |msg, _ctx| { - let msg = msg.downcast_ref::().unwrap(); - match msg { - NetworkViewClientMessages::AnnounceAccount(accounts) => { - if !accounts.is_empty() { - counter1.fetch_add(1, Ordering::SeqCst); - } - Box::new(Some(NetworkViewClientResponses::AnnounceAccount( - accounts.clone().into_iter().map(|obj| obj.0).collect(), - ))) - } - _ => Box::new(Some(NetworkViewClientResponses::NoResponse)), - } - })) - .start(); - PeerManagerActor::spawn( - time::Clock::real(), - store, - config, - client::Client::new(client_addr.recipient(), view_client_addr.recipient()), - GenesisId::default(), - ) - .unwrap() - } -} - #[derive(Message, Clone, Debug)] #[rtype(result = "()")] pub struct SetAdvOptions { diff --git a/chain/network/src/testonly/fake_client.rs b/chain/network/src/testonly/fake_client.rs index 7aca62f78a7..e515dfa7937 100644 --- a/chain/network/src/testonly/fake_client.rs +++ b/chain/network/src/testonly/fake_client.rs @@ -1,15 +1,18 @@ +use crate::client; +use crate::network_protocol::{ + PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg, + StateResponseInfo, +}; use crate::sink::Sink; -use crate::types::{NetworkClientMessages, NetworkClientResponses}; -use crate::types::{NetworkViewClientMessages, NetworkViewClientResponses}; -use actix::Actor as _; -use near_o11y::WithSpanContext; -use near_primitives::block::{Block, BlockHeader}; +use crate::types::{NetworkInfo, ReasonForBan}; +use near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; use near_primitives::hash::CryptoHash; -use near_primitives::network::AnnounceAccount; -use near_primitives::sharding::{ChunkHash, PartialEncodedChunkPart}; +use near_primitives::network::{AnnounceAccount, PeerId}; +use near_primitives::sharding::{ChunkHash, PartialEncodedChunk, PartialEncodedChunkPart}; use near_primitives::transaction::SignedTransaction; -use near_primitives::types::EpochId; +use near_primitives::types::{AccountId, EpochId, ShardId}; +use near_primitives::views::FinalExecutionOutcomeView; #[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { @@ -24,79 +27,109 @@ pub enum Event { AnnounceAccount(Vec<(AnnounceAccount, Option)>), } -pub struct Actor { - event_sink: Sink, +pub(crate) struct Fake { + pub event_sink: Sink, } -impl actix::Actor for Actor { - type Context = actix::Context; -} +#[async_trait::async_trait] +impl client::Client for Fake { + async fn tx_status_request( + &self, + _account_id: AccountId, + _tx_hash: CryptoHash, + ) -> Option> { + unimplemented!(); + } -pub fn start(event_sink: Sink) -> actix::Addr { - Actor { event_sink }.start() -} + async fn tx_status_response(&self, _tx_result: FinalExecutionOutcomeView) {} -impl actix::Handler> for Actor { - type Result = NetworkViewClientResponses; - fn handle( - &mut self, - msg: WithSpanContext, - _ctx: &mut Self::Context, - ) -> Self::Result { - let msg = msg.msg; - match msg { - NetworkViewClientMessages::BlockRequest(block_hash) => { - self.event_sink.push(Event::BlockRequest(block_hash)); - NetworkViewClientResponses::NoResponse - } - NetworkViewClientMessages::BlockHeadersRequest(req) => { - self.event_sink.push(Event::BlockHeadersRequest(req)); - NetworkViewClientResponses::NoResponse - } - NetworkViewClientMessages::AnnounceAccount(aas) => { - self.event_sink.push(Event::AnnounceAccount(aas.clone())); - NetworkViewClientResponses::AnnounceAccount(aas.into_iter().map(|a| a.0).collect()) - } - msg => { - let msg_type: &'static str = msg.into(); - panic!("unsupported message {msg_type}") - } - } + async fn state_request_header( + &self, + _shard_id: ShardId, + _sync_hash: CryptoHash, + ) -> Result, ReasonForBan> { + unimplemented!(); } -} -impl actix::Handler> for Actor { - type Result = NetworkClientResponses; - fn handle( - &mut self, - msg: WithSpanContext, - _ctx: &mut Self::Context, - ) -> Self::Result { - let msg = msg.msg; - - let mut resp = NetworkClientResponses::NoResponse; - match msg { - NetworkClientMessages::Block(b, _, _) => self.event_sink.push(Event::Block(b)), - NetworkClientMessages::BlockHeaders(bhs, _) => { - self.event_sink.push(Event::BlockHeaders(bhs)) - } - NetworkClientMessages::PartialEncodedChunkResponse(resp, _) => { - self.event_sink.push(Event::Chunk(resp.parts)) - } - NetworkClientMessages::PartialEncodedChunkRequest(req, _) => { - self.event_sink.push(Event::ChunkRequest(req.chunk_hash)) - } - NetworkClientMessages::Transaction { transaction, .. } => { - self.event_sink.push(Event::Transaction(transaction)); - resp = NetworkClientResponses::ValidTx; - } - NetworkClientMessages::Challenge(c) => self.event_sink.push(Event::Challenge(c)), - NetworkClientMessages::NetworkInfo(_) => {} - msg => { - let msg_type: &'static str = msg.into(); - panic!("unsupported message {msg_type}") - } - }; - resp + async fn state_request_part( + &self, + _shard_id: ShardId, + _sync_hash: CryptoHash, + _part_id: u64, + ) -> Result, ReasonForBan> { + unimplemented!(); + } + + async fn state_response(&self, _info: StateResponseInfo) { + unimplemented!(); + } + + async fn block_approval(&self, _approval: Approval, _peer_id: PeerId) { + unimplemented!(); + } + + async fn transaction(&self, transaction: SignedTransaction, _is_forwarded: bool) { + self.event_sink.push(Event::Transaction(transaction)); + } + + async fn partial_encoded_chunk_request( + &self, + req: PartialEncodedChunkRequestMsg, + _msg_hash: CryptoHash, + ) { + self.event_sink.push(Event::ChunkRequest(req.chunk_hash)); + } + + async fn partial_encoded_chunk_response( + &self, + resp: PartialEncodedChunkResponseMsg, + _timestamp: time::Instant, + ) { + self.event_sink.push(Event::Chunk(resp.parts)); + } + + async fn partial_encoded_chunk(&self, _chunk: PartialEncodedChunk) { + unimplemented!(); + } + + async fn partial_encoded_chunk_forward(&self, _msg: PartialEncodedChunkForwardMsg) { + unimplemented!(); + } + + async fn block_request(&self, hash: CryptoHash) -> Option> { + self.event_sink.push(Event::BlockRequest(hash)); + None + } + + async fn block_headers_request(&self, hashes: Vec) -> Option> { + self.event_sink.push(Event::BlockHeadersRequest(hashes)); + None + } + + async fn block(&self, block: Block, _peer_id: PeerId, _was_requested: bool) { + self.event_sink.push(Event::Block(block)); + } + + async fn block_headers( + &self, + headers: Vec, + _peer_id: PeerId, + ) -> Result<(), ReasonForBan> { + self.event_sink.push(Event::BlockHeaders(headers)); + Ok(()) + } + + async fn challenge(&self, challenge: Challenge) { + self.event_sink.push(Event::Challenge(challenge)); + } + + async fn network_info(&self, _info: NetworkInfo) {} + + async fn announce_account( + &self, + accounts: Vec<(AnnounceAccount, Option)>, + ) -> Result, ReasonForBan> { + self.event_sink.push(Event::AnnounceAccount(accounts.clone())); + Ok(accounts.into_iter().map(|a| a.0).collect()) } } diff --git a/integration-tests/src/tests/network/peer_handshake.rs b/integration-tests/src/tests/network/peer_handshake.rs index bb9b7dddaa7..22ef6c0fbdb 100644 --- a/integration-tests/src/tests/network/peer_handshake.rs +++ b/integration-tests/src/tests/network/peer_handshake.rs @@ -3,28 +3,21 @@ use near_network::time; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use actix::actors::mocker::Mocker; use actix::Actor; use actix::System; use futures::{future, FutureExt}; use near_primitives::block::GenesisId; use near_actix_test_utils::run_actix; -use near_client::{ClientActor, ViewClientActor}; use near_o11y::testonly::init_test_logger; use near_network::config; use near_network::test_utils::{ convert_boot_nodes, open_port, wait_or_timeout, GetInfo, StopSignal, WaitOrTimeoutActor, }; -use near_network::types::NetworkClientResponses; -use near_network::types::NetworkViewClientResponses; use near_network::PeerManagerActor; use near_o11y::WithSpanContextExt; -type ClientMock = Mocker; -type ViewClientMock = Mocker; - #[cfg(test)] fn make_peer_manager( seed: &str, @@ -37,20 +30,12 @@ fn make_peer_manager( config.max_num_peers = peer_max_count; config.ideal_connections_hi = peer_max_count; config.ideal_connections_lo = peer_max_count; - let client_addr = ClientMock::mock(Box::new(move |_msg, _ctx| { - Box::new(Some(NetworkClientResponses::NoResponse)) - })) - .start(); - let view_client_addr = ViewClientMock::mock(Box::new(|_msg, _ctx| { - Box::new(Some(NetworkViewClientResponses::NoResponse)) - })) - .start(); PeerManagerActor::spawn( time::Clock::real(), near_store::db::TestDB::new(), config, - near_network::client::Client::new(client_addr.recipient(), view_client_addr.recipient()), + Arc::new(near_network::client::Noop), GenesisId::default(), ) .unwrap() diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index 03765ac3149..2e1edaf6ea5 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -100,7 +100,7 @@ fn setup_network_node( time::Clock::real(), db.clone(), config, - near_network::client::Client::new(client_actor.recipient(), view_client_actor.recipient()), + Arc::new(near_client::adapter::Adapter::new(client_actor, view_client_actor)), genesis_id, ) .unwrap(); diff --git a/integration-tests/src/tests/network/stress_network.rs b/integration-tests/src/tests/network/stress_network.rs index 9d565094264..08ebb87daa5 100644 --- a/integration-tests/src/tests/network/stress_network.rs +++ b/integration-tests/src/tests/network/stress_network.rs @@ -2,13 +2,11 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; -use actix::actors::mocker::Mocker; use actix::{Actor, AsyncContext, System}; use futures::FutureExt; use tracing::info; use near_actix_test_utils::run_actix; -use near_client::{ClientActor, ViewClientActor}; use near_network::time; use near_o11y::testonly::init_test_logger_allow_panic; use near_primitives::block::GenesisId; @@ -17,14 +15,9 @@ use near_network::config; use near_network::test_utils::{ convert_boot_nodes, open_port, GetInfo, StopSignal, WaitOrTimeoutActor, }; -use near_network::types::NetworkClientResponses; -use near_network::types::NetworkViewClientResponses; use near_network::PeerManagerActor; use near_o11y::WithSpanContextExt; -type ClientMock = Mocker; -type ViewClientMock = Mocker; - fn make_peer_manager( seed: &str, port: u16, @@ -32,19 +25,11 @@ fn make_peer_manager( ) -> actix::Addr { let mut config = config::NetworkConfig::from_seed(seed, port); config.boot_nodes = convert_boot_nodes(boot_nodes); - let client_addr = ClientMock::mock(Box::new(move |_msg, _ctx| { - Box::new(Some(NetworkClientResponses::NoResponse)) - })) - .start(); - let view_client_addr = ViewClientMock::mock(Box::new(|_msg, _ctx| { - Box::new(Some(NetworkViewClientResponses::NoResponse)) - })) - .start(); PeerManagerActor::spawn( time::Clock::real(), near_store::db::TestDB::new(), config, - near_network::client::Client::new(client_addr.recipient(), view_client_addr.recipient()), + Arc::new(near_network::client::Noop), GenesisId::default(), ) .unwrap() diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 47f3f748673..fcde2590e5a 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -187,10 +187,7 @@ pub fn start_with_config_and_synchronization( time::Clock::real(), store.into_inner(near_store::Temperature::Hot), config.network_config, - near_network::client::Client::new( - client_actor.clone().recipient(), - view_client.clone().recipient(), - ), + Arc::new(near_client::adapter::Adapter::new(client_actor.clone(), view_client.clone())), genesis_id, ) .context("PeerManager::spawn()")?; diff --git a/tools/chainsync-loadtest/Cargo.toml b/tools/chainsync-loadtest/Cargo.toml index 41a4216fb6e..f4336985190 100644 --- a/tools/chainsync-loadtest/Cargo.toml +++ b/tools/chainsync-loadtest/Cargo.toml @@ -15,6 +15,7 @@ name = "chainsync-loadtest" [dependencies] actix.workspace = true anyhow.workspace = true +async-trait.workspace = true clap.workspace = true dirs.workspace = true futures.workspace = true diff --git a/tools/chainsync-loadtest/src/main.rs b/tools/chainsync-loadtest/src/main.rs index 00dbced8611..caee136068d 100644 --- a/tools/chainsync-loadtest/src/main.rs +++ b/tools/chainsync-loadtest/src/main.rs @@ -4,13 +4,12 @@ mod network; use std::sync::Arc; -use actix::{Actor, Arbiter}; use anyhow::{anyhow, Context}; use clap::Parser; use openssl_probe; use concurrency::{Ctx, Scope}; -use network::{FakeClientActor, Network}; +use network::Network; use near_chain_configs::Genesis; use near_network::time; @@ -38,19 +37,12 @@ fn genesis_hash(chain_id: &str) -> CryptoHash { pub fn start_with_config(config: NearConfig, qps_limit: u32) -> anyhow::Result> { let network_adapter = Arc::new(NetworkRecipient::default()); let network = Network::new(&config, network_adapter.clone(), qps_limit); - let client_actor = FakeClientActor::start_in_arbiter(&Arbiter::new().handle(), { - let network = network.clone(); - move |_| FakeClientActor::new(network) - }); let network_actor = PeerManagerActor::spawn( time::Clock::real(), near_store::db::TestDB::new(), config.network_config, - near_network::client::Client::new( - client_actor.clone().recipient(), - client_actor.clone().recipient(), - ), + network.clone(), GenesisId { chain_id: config.client_config.chain_id.clone(), hash: genesis_hash(&config.client_config.chain_id), diff --git a/tools/chainsync-loadtest/src/network.rs b/tools/chainsync-loadtest/src/network.rs index e77b19a13fc..923976a4c63 100644 --- a/tools/chainsync-loadtest/src/network.rs +++ b/tools/chainsync-loadtest/src/network.rs @@ -1,30 +1,31 @@ -use std::sync::atomic::{AtomicU64, Ordering}; - use crate::concurrency::{Ctx, Once, RateLimiter, Scope, WeakMap}; - +use log::info; +use near_network::time; use near_network::types::{ - AccountIdOrPeerTrackingShard, NetworkViewClientMessages, NetworkViewClientResponses, - PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg, + AccountIdOrPeerTrackingShard, PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg, + PartialEncodedChunkResponseMsg, ReasonForBan, StateResponseInfo, }; - -use actix::{Actor, Context, Handler}; -use log::info; use near_network::types::{ - FullPeerInfo, NetworkClientMessages, NetworkClientResponses, NetworkInfo, NetworkRequests, - PeerManagerAdapter, PeerManagerMessageRequest, + FullPeerInfo, NetworkInfo, NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest, }; -use near_o11y::{WithSpanContext, WithSpanContextExt}; -use near_primitives::block::{Block, BlockHeader}; +use near_o11y::WithSpanContextExt; +use near_primitives::block::{Approval, Block, BlockHeader}; +use near_primitives::challenge::Challenge; use near_primitives::hash::CryptoHash; -use near_primitives::sharding::{ChunkHash, ShardChunkHeader}; +use near_primitives::network::{AnnounceAccount, PeerId}; +use near_primitives::sharding::ShardChunkHeader; +use near_primitives::sharding::{ChunkHash, PartialEncodedChunk}; use near_primitives::time::Clock; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{AccountId, EpochId, ShardId}; +use near_primitives::views::FinalExecutionOutcomeView; use nearcore::config::NearConfig; use rand::seq::SliceRandom; use rand::thread_rng; use std::future::Future; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tokio::sync::oneshot; -use tokio::time; #[derive(Default, Debug)] pub struct Stats { @@ -96,10 +97,10 @@ impl Network { min_peers: config.client_config.min_num_peers, parts_per_chunk: config.genesis.config.num_block_producer_seats, rate_limiter: RateLimiter::new( - time::Duration::from_secs(1) / qps_limit, + tokio::time::Duration::from_secs(1) / qps_limit, qps_limit as u64, ), - request_timeout: time::Duration::from_secs(2), + request_timeout: tokio::time::Duration::from_secs(2), }) } @@ -248,87 +249,104 @@ impl Network { }) .await } +} - fn notify(&self, msg: WithSpanContext) { - let msg = msg.msg; - self.stats.msgs_recv.fetch_add(1, Ordering::Relaxed); - match msg { - NetworkClientMessages::NetworkInfo(info) => { - let mut n = self.data.lock().unwrap(); - n.info_ = Arc::new(info); - if n.info_.num_connected_peers < self.min_peers { - info!("connected = {}/{}", n.info_.num_connected_peers, self.min_peers); - return; - } - for s in n.info_futures.split_off(0) { - s.send(n.info_.clone()).unwrap(); - } - } - NetworkClientMessages::Block(block, _, _) => { - self.blocks.get(&block.hash().clone()).map(|p| p.set(block)); - } - NetworkClientMessages::BlockHeaders(headers, _) => { - if let Some(h) = headers.iter().min_by_key(|h| h.height()) { - let hash = h.prev_hash().clone(); - self.block_headers.get(&hash).map(|p| p.set(headers)); - } - } - NetworkClientMessages::PartialEncodedChunkResponse(resp, _) => { - self.chunks.get(&resp.chunk_hash.clone()).map(|p| p.set(resp)); - } - _ => {} - } +#[async_trait::async_trait] +impl near_network::client::Client for Network { + async fn tx_status_request( + &self, + _account_id: AccountId, + _tx_hash: CryptoHash, + ) -> Option> { + None } -} -pub struct FakeClientActor { - network: Arc, -} + async fn tx_status_response(&self, _tx_result: FinalExecutionOutcomeView) {} -impl FakeClientActor { - pub fn new(network: Arc) -> Self { - FakeClientActor { network } + async fn state_request_header( + &self, + _shard_id: ShardId, + _sync_hash: CryptoHash, + ) -> Result, ReasonForBan> { + Ok(None) } -} -impl Actor for FakeClientActor { - type Context = Context; -} + async fn state_request_part( + &self, + _shard_id: ShardId, + _sync_hash: CryptoHash, + _part_id: u64, + ) -> Result, ReasonForBan> { + Ok(None) + } -impl Handler> for FakeClientActor { - type Result = NetworkViewClientResponses; - fn handle( - &mut self, - msg: WithSpanContext, - _ctx: &mut Self::Context, - ) -> Self::Result { - let msg = msg.msg; - let name = match msg { - NetworkViewClientMessages::TxStatus { .. } => "TxStatus", - NetworkViewClientMessages::TxStatusResponse(_) => "TxStatusResponse", - NetworkViewClientMessages::BlockRequest(_) => "BlockRequest", - NetworkViewClientMessages::BlockHeadersRequest(_) => "BlockHeadersRequest", - NetworkViewClientMessages::StateRequestHeader { .. } => "StateRequestHeader", - NetworkViewClientMessages::StateRequestPart { .. } => "StateRequestPart", - NetworkViewClientMessages::AnnounceAccount(_) => { - return NetworkViewClientResponses::NoResponse; - } - #[allow(unreachable_patterns)] - _ => "unknown", - }; - info!("view_request: {}", name); - return NetworkViewClientResponses::NoResponse; + async fn state_response(&self, _info: StateResponseInfo) {} + + async fn block_approval(&self, _approval: Approval, _peer_id: PeerId) {} + + async fn transaction(&self, _transaction: SignedTransaction, _is_forwarded: bool) {} + + async fn partial_encoded_chunk_request( + &self, + _req: PartialEncodedChunkRequestMsg, + _msg_hash: CryptoHash, + ) { + } + + async fn partial_encoded_chunk_response( + &self, + resp: PartialEncodedChunkResponseMsg, + _timestamp: time::Instant, + ) { + self.chunks.get(&resp.chunk_hash.clone()).map(|p| p.set(resp)); + } + + async fn partial_encoded_chunk(&self, _chunk: PartialEncodedChunk) {} + + async fn partial_encoded_chunk_forward(&self, _msg: PartialEncodedChunkForwardMsg) {} + + async fn block_request(&self, _hash: CryptoHash) -> Option> { + None + } + + async fn block_headers_request(&self, _hashes: Vec) -> Option> { + None + } + + async fn block(&self, block: Block, _peer_id: PeerId, _was_requested: bool) { + self.blocks.get(&block.hash().clone()).map(|p| p.set(block)); + } + + async fn block_headers( + &self, + headers: Vec, + _peer_id: PeerId, + ) -> Result<(), ReasonForBan> { + if let Some(h) = headers.iter().min_by_key(|h| h.height()) { + let hash = h.prev_hash().clone(); + self.block_headers.get(&hash).map(|p| p.set(headers)); + } + Ok(()) + } + + async fn challenge(&self, _challenge: Challenge) {} + + async fn network_info(&self, info: NetworkInfo) { + let mut n = self.data.lock().unwrap(); + n.info_ = Arc::new(info); + if n.info_.num_connected_peers < self.min_peers { + info!("connected = {}/{}", n.info_.num_connected_peers, self.min_peers); + return; + } + for s in n.info_futures.split_off(0) { + s.send(n.info_.clone()).unwrap(); + } } -} -impl Handler> for FakeClientActor { - type Result = NetworkClientResponses; - fn handle( - &mut self, - msg: WithSpanContext, - _ctx: &mut Context, - ) -> Self::Result { - self.network.notify(msg); - return NetworkClientResponses::NoResponse; + async fn announce_account( + &self, + accounts: Vec<(AnnounceAccount, Option)>, + ) -> Result, ReasonForBan> { + Ok(accounts.into_iter().map(|a| a.0).collect()) } }