Skip to content
Permalink
Browse files

Network refactor (#1589)

* Refactor messages from PeerMessage to RoutedMessage
* Route back some types of messages.
* Remove done TODO
* Nit
* Merge refs/heads/staging into network-refactor
* Script for starting the staging testnet
* Merge branch 'staging' into start-staging-net
* Make it executable
* Merge pull request #1613 from nearprotocol/start-staging-net

Script for starting the staging testnet
* Merge refs/heads/staging into network-refactor
* Merge refs/heads/staging into network-refactor
* Fixing account announce being filtered because of the next epoch id
  • Loading branch information
mfornet authored and nearprotocol-bulldozer committed Nov 3, 2019
1 parent 94bfb7b commit 935c7e05496b4c214b7b5d6b7aa6822a69270e43
@@ -15,7 +15,7 @@ use near_chain::{
ValidTransaction,
};
use near_crypto::Signer;
use near_network::types::{ChunkOnePartRequestMsg, ChunkPartMsg, ChunkPartRequestMsg, PeerId};
use near_network::types::{ChunkOnePartRequestMsg, ChunkPartMsg, ChunkPartRequestMsg};
use near_network::NetworkRequests;
use near_pool::TransactionPool;
use near_primitives::hash::CryptoHash;
@@ -142,8 +142,8 @@ pub struct ShardsManager {
requested_one_parts: RequestPool,
requested_chunks: RequestPool,

requests_fifo: VecDeque<(ShardId, ChunkHash, PeerId, u64)>,
requests: HashMap<(ShardId, ChunkHash, u64), HashSet<(PeerId)>>,
requests_fifo: VecDeque<(ShardId, ChunkHash, CryptoHash, u64)>,
requests: HashMap<(ShardId, ChunkHash, u64), HashSet<CryptoHash>>,
}

impl ShardsManager {
@@ -481,7 +481,7 @@ impl ShardsManager {
pub fn process_chunk_part_request(
&mut self,
request: ChunkPartRequestMsg,
peer_id: PeerId,
route_back: CryptoHash,
) -> Result<(), Error> {
let mut served = false;
if let Some(chunk) = self.encoded_chunks.get(&request.chunk_hash) {
@@ -492,7 +492,7 @@ impl ShardsManager {
if chunk.content.parts[request.part_id as usize].is_some() {
served = true;
self.network_adapter.send(NetworkRequests::ChunkPart {
peer_id: peer_id.clone(),
route_back: route_back.clone(),
part: chunk.create_chunk_part_msg(
request.part_id,
// Part should never exist in the chunk content if the merkle path for it is
@@ -534,12 +534,12 @@ impl ShardsManager {
.requests
.entry((request.shard_id, request.chunk_hash.clone(), request.part_id))
.or_insert_with(HashSet::default)
.insert(peer_id.clone())
.insert(route_back.clone())
{
self.requests_fifo.push_back((
request.shard_id,
request.chunk_hash,
peer_id,
route_back,
request.part_id,
));
}
@@ -581,7 +581,7 @@ impl ShardsManager {
pub fn process_chunk_one_part_request(
&mut self,
request: ChunkOnePartRequestMsg,
peer_id: PeerId,
route_back: CryptoHash,
chain_store: &mut ChainStore,
) -> Result<(), Error> {
debug!(target:"chunks", "Received one part request for {:?}, I'm {:?}", request.chunk_hash.0, self.me);
@@ -608,7 +608,7 @@ impl ShardsManager {
request.chunk_hash.0, self.me
);
self.network_adapter.send(NetworkRequests::ChunkOnePartResponse {
peer_id,
route_back,
header_and_part: encoded_chunk.create_chunk_one_part(
request.part_id,
one_part_receipt_proofs,
@@ -829,7 +829,7 @@ impl ShardsManager {
{
for whom in send_to {
self.network_adapter.send(NetworkRequests::ChunkPart {
peer_id: whom,
route_back: whom,
part: ChunkPartMsg {
shard_id: one_part.shard_id,
chunk_hash: chunk_hash.clone(),
@@ -9,6 +9,7 @@ use actix::{
Actor, ActorFuture, Addr, AsyncContext, Context, ContextFutureSpawner, Handler, Recipient,
WrapFuture,
};
use cached::Cached;
use chrono::{DateTime, Utc};
use log::{debug, error, info, warn};

@@ -41,7 +42,6 @@ use crate::types::{
Status, StatusSyncInfo, SyncStatus,
};
use crate::{sync, StatusResponse};
use cached::Cached;

enum AccountAnnounceVerificationResult {
Valid,
@@ -132,11 +132,6 @@ impl ClientActor {
AccountAnnounceVerificationResult::UnknownEpoch
);

// If we are currently not at the epoch that this announcement is in, skip it.
if announce_account.epoch_id != head.epoch_id {
return AccountAnnounceVerificationResult::UnknownEpoch;
}

match self.client.runtime_adapter.verify_validator_signature(
&announce_account.epoch_id,
&head.last_block_hash,
@@ -257,7 +252,13 @@ impl Handler<NetworkClientMessages> for ClientActor {
// NetworkClientResponses::Ban { ban_reason: ReasonForBan::BadBlockApproval }
}
}
NetworkClientMessages::StateRequest(shard_id, hash, need_header, parts_ranges) => {
NetworkClientMessages::StateRequest(
shard_id,
hash,
need_header,
parts_ranges,
route_back,
) => {
let mut parts = vec![];
for Range(from, to) in parts_ranges {
for part_id in from..to {
@@ -273,22 +274,31 @@ impl Handler<NetworkClientMessages> for ClientActor {
if need_header {
match self.client.chain.get_state_response_header(shard_id, hash) {
Ok(header) => {
return NetworkClientResponses::StateResponse(StateResponseInfo {
shard_id,
hash,
shard_state: ShardStateSyncResponse { header: Some(header), parts },
});
return NetworkClientResponses::StateResponse(
StateResponseInfo {
shard_id,
hash,
shard_state: ShardStateSyncResponse {
header: Some(header),
parts,
},
},
route_back,
);
}
Err(_) => {
return NetworkClientResponses::NoResponse;
}
}
} else {
return NetworkClientResponses::StateResponse(StateResponseInfo {
shard_id,
hash,
shard_state: ShardStateSyncResponse { header: None, parts },
});
return NetworkClientResponses::StateResponse(
StateResponseInfo {
shard_id,
hash,
shard_state: ShardStateSyncResponse { header: None, parts },
},
route_back,
);
}
}
NetworkClientMessages::StateResponse(StateResponseInfo {
@@ -385,15 +395,15 @@ impl Handler<NetworkClientMessages> for ClientActor {

NetworkClientResponses::NoResponse
}
NetworkClientMessages::ChunkPartRequest(part_request_msg, peer_id) => {
NetworkClientMessages::ChunkPartRequest(part_request_msg, route_back) => {
let _ =
self.client.shards_mgr.process_chunk_part_request(part_request_msg, peer_id);
self.client.shards_mgr.process_chunk_part_request(part_request_msg, route_back);
NetworkClientResponses::NoResponse
}
NetworkClientMessages::ChunkOnePartRequest(part_request_msg, peer_id) => {
NetworkClientMessages::ChunkOnePartRequest(part_request_msg, route_back) => {
let _ = self.client.shards_mgr.process_chunk_one_part_request(
part_request_msg,
peer_id,
route_back,
self.client.chain.mut_store(),
);
NetworkClientResponses::NoResponse
@@ -422,7 +432,9 @@ impl Handler<NetworkClientMessages> for ClientActor {
filtered_announce_accounts.push(announce_account);
}
// Filter this account
AccountAnnounceVerificationResult::UnknownEpoch => {}
AccountAnnounceVerificationResult::UnknownEpoch => {
info!(target: "client", "{:?} failed to validate account announce signature: unknown epoch in {:?}", self.client.block_producer.as_ref().map(|bp| bp.account_id.clone()), announce_account);
}
}
}

@@ -29,6 +29,7 @@ use near_store::Store;
use near_telemetry::TelemetryActor;

use crate::{BlockProducer, Client, ClientActor, ClientConfig, ViewClientActor};
use near_primitives::hash::hash;

pub type NetworkMock = Mocker<PeerManagerActor>;

@@ -169,6 +170,8 @@ pub fn setup_mock_all_validators(
) -> (Block, Vec<(Addr<ClientActor>, Addr<ViewClientActor>)>) {
let validators_clone = validators.clone();
let key_pairs = key_pairs.clone();

let addresses: Vec<_> = (0..key_pairs.len()).map(|i| hash(vec![i as u8].as_ref())).collect();
let genesis_time = Utc::now();
let mut ret = vec![];

@@ -192,6 +195,7 @@ pub fn setup_mock_all_validators(
let validators_clone2 = validators_clone.clone();
let genesis_block1 = genesis_block.clone();
let key_pairs = key_pairs.clone();
let addresses = addresses.clone();
let connectors1 = connectors.clone();
let network_mock1 = network_mock.clone();
let announced_accounts1 = announced_accounts.clone();
@@ -207,14 +211,17 @@ pub fn setup_mock_all_validators(

if perform_default {
let mut my_key_pair = None;
let mut my_address = None;
let mut my_ord = None;
for (i, name) in validators_clone2.iter().flatten().enumerate() {
if *name == account_id {
my_key_pair = Some(key_pairs[i].clone());
my_address = Some(addresses[i].clone());
my_ord = Some(i);
}
}
let my_key_pair = my_key_pair.unwrap();
let my_address = my_address.unwrap();
let my_ord = my_ord.unwrap();
let my_account_id = account_id;

@@ -270,7 +277,7 @@ pub fn setup_mock_all_validators(
connectors1.read().unwrap()[i].0.do_send(
NetworkClientMessages::ChunkPartRequest(
part_request.clone(),
my_key_pair.id.clone(),
my_address,
),
);
}
@@ -287,7 +294,7 @@ pub fn setup_mock_all_validators(
connectors1.read().unwrap()[i].0.do_send(
NetworkClientMessages::ChunkOnePartRequest(
one_part_request.clone(),
my_key_pair.id.clone(),
my_address,
),
);
}
@@ -307,9 +314,9 @@ pub fn setup_mock_all_validators(
}
}
}
NetworkRequests::ChunkOnePartResponse { peer_id, header_and_part } => {
for (i, peer_info) in key_pairs.iter().enumerate() {
if peer_info.id == *peer_id {
NetworkRequests::ChunkOnePartResponse { route_back, header_and_part } => {
for (i, address) in addresses.iter().enumerate() {
if route_back == address {
if !drop_chunks || !sample_binary(1, 10) {
connectors1.read().unwrap()[i].0.do_send(
NetworkClientMessages::ChunkOnePart(
@@ -320,9 +327,9 @@ pub fn setup_mock_all_validators(
}
}
}
NetworkRequests::ChunkPart { peer_id, part } => {
for (i, peer_info) in key_pairs.iter().enumerate() {
if peer_info.id == *peer_id {
NetworkRequests::ChunkPart { route_back, part } => {
for (i, address) in addresses.iter().enumerate() {
if route_back == address {
if !drop_chunks || !sample_binary(1, 10) {
connectors1.read().unwrap()[i].0.do_send(
NetworkClientMessages::ChunkPart(part.clone()),
@@ -411,11 +418,15 @@ pub fn setup_mock_all_validators(
*hash,
*need_header,
parts_ranges.to_vec(),
my_address,
))
.then(move |response| {
let response = response.unwrap();
match response {
NetworkClientResponses::StateResponse(info) => {
NetworkClientResponses::StateResponse(
info,
_,
) => {
connectors2.read().unwrap()[my_ord]
.0
.do_send(
@@ -8,7 +8,7 @@ use futures::{future, Future};
use near_chain::ChainGenesis;
use near_client::test_utils::{setup_mock_all_validators, TestEnv};
use near_client::{ClientActor, GetBlock, ViewClientActor};
use near_network::types::{ChunkOnePartRequestMsg, PeerId};
use near_network::types::ChunkOnePartRequestMsg;
use near_network::{NetworkClientMessages, NetworkRequests, NetworkResponses, PeerInfo};
use near_primitives::block::BlockHeader;
use near_primitives::hash::CryptoHash;
@@ -116,13 +116,14 @@ fn chunks_produced_and_distributed_common(validator_groups: u64) {
}
}
NetworkRequests::ChunkOnePartMessage { account_id: _, header_and_part: _ }
| NetworkRequests::ChunkOnePartResponse { peer_id: _, header_and_part: _ } => {
| NetworkRequests::ChunkOnePartResponse { route_back: _, header_and_part: _ } =>
{
one_part_msgs += 1;
}
NetworkRequests::ChunkPartRequest { account_id: _, part_request: _ } => {
part_request_msgs += 1;
}
NetworkRequests::ChunkPart { peer_id: _, part: _ } => {
NetworkRequests::ChunkPart { route_back: _, part: _ } => {
part_msgs += 1;
}
_ => {}
@@ -171,15 +172,19 @@ fn test_request_chunk_restart() {
let client = &mut env.clients[0];
client
.shards_mgr
.process_chunk_one_part_request(request.clone(), PeerId::random(), client.chain.mut_store())
.process_chunk_one_part_request(
request.clone(),
CryptoHash::default(),
client.chain.mut_store(),
)
.unwrap();
assert!(env.network_adapters[0].pop().is_some());

env.restart(0);
let client = &mut env.clients[0];
client
.shards_mgr
.process_chunk_one_part_request(request, PeerId::random(), client.chain.mut_store())
.process_chunk_one_part_request(request, CryptoHash::default(), client.chain.mut_store())
.unwrap();
// TODO(1434): should be some() with the same chunk.
assert!(env.network_adapters[0].pop().is_none());

0 comments on commit 935c7e0

Please sign in to comment.
You can’t perform that action at this time.