Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Increase number of requested block bodies in chain sync (#10247)
Browse files Browse the repository at this point in the history
* Increase the number of block bodies requested during Sync.

* Increase the number of block bodies requested during Sync.

* Check if our peer is an older parity client with the bug
  of not handling large requests properly

* Add a ClientVersion struct and a ClientCapabilites trait

* Make ClientVersion its own module

* Refactor and extend use of ClientVersion

* Replace strings with ClientVersion in PeerInfo

* Group further functionality in ClientCapabilities

* Move parity client version data from tuple to its own struct.

* Implement accessor methods for ParityClientData and remove them
from ClientVersion.

* Minor fixes

* Make functions specific to parity return types specific to parity.

* Test for shorter ID strings

* Fix formatting and remove unneeded dependencies.

* Roll back Cargo.lock

* Commit last Cargo.lock

* Convert from string to ClientVersion

* * When checking if peer accepts service transactions just check
  if it's parity, remove version check.

* Remove dependency on semver in ethcore-sync

* Remove unnecessary String instantiation

* Rename peer_info to peer_version

* Update RPC test helpers

* Simplify From<String>

* Parse static version string only once

* Update RPC tests to new ClientVersion struct

* Document public members

* More robust parsing of ID string

* Minor changes.

* Update version in which large block bodies requests appear.

* Update ethcore/sync/src/block_sync.rs

Co-Authored-By: elferdo <elferdo@gmail.com>

* Update util/network/src/client_version.rs

Co-Authored-By: elferdo <elferdo@gmail.com>

* Update util/network/src/client_version.rs

Co-Authored-By: elferdo <elferdo@gmail.com>

* Update tests.

* Minor fixes.
  • Loading branch information
elferdo authored and 5chdn committed Feb 7, 2019
1 parent d5c19f8 commit b7e8621
Show file tree
Hide file tree
Showing 21 changed files with 607 additions and 65 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use devp2p::NetworkService;
use network::{NetworkProtocolHandler, NetworkContext, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, Error, ErrorKind,
ConnectionFilter};
use network::client_version::ClientVersion;

use types::pruning_info::PruningInfo;
use ethereum_types::{H256, H512, U256};
Expand Down Expand Up @@ -158,7 +159,7 @@ pub struct PeerInfo {
/// Public node id
pub id: Option<String>,
/// Node client ID
pub client_version: String,
pub client_version: ClientVersion,
/// Capabilities
pub capabilities: Vec<String>,
/// Remote endpoint address
Expand Down
21 changes: 16 additions & 5 deletions ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError, Error as Ethco
use sync_io::SyncIo;
use blocks::{BlockCollection, SyncBody, SyncHeader};
use chain::BlockSet;
use network::PeerId;
use network::client_version::ClientCapabilities;

const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 32;
const MAX_RECEPITS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST_LARGE: usize = 128;
const MAX_BODIES_TO_REQUEST_SMALL: usize = 32; // Size request for parity clients prior to 2.4.0
const MAX_RECEPITS_TO_REQUEST: usize = 256;
const SUBCHAIN_SIZE: u64 = 256;
const MAX_ROUND_PARENTS: usize = 16;
const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
Expand Down Expand Up @@ -464,12 +467,12 @@ impl BlockDownloader {
}

/// Find some headers or blocks to download for a peer.
pub fn request_blocks(&mut self, io: &mut SyncIo, num_active_peers: usize) -> Option<BlockRequest> {
pub fn request_blocks(&mut self, peer_id: PeerId, io: &mut SyncIo, num_active_peers: usize) -> Option<BlockRequest> {
match self.state {
State::Idle => {
self.start_sync_round(io);
if self.state == State::ChainHead {
return self.request_blocks(io, num_active_peers);
return self.request_blocks(peer_id, io, num_active_peers);
}
},
State::ChainHead => {
Expand All @@ -487,7 +490,15 @@ impl BlockDownloader {
},
State::Blocks => {
// check to see if we need to download any block bodies first
let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, false);
let client_version = io.peer_version(peer_id);

let number_of_bodies_to_request = if client_version.can_handle_large_requests() {
MAX_BODIES_TO_REQUEST_LARGE
} else {
MAX_BODIES_TO_REQUEST_SMALL
};

let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request, false);
if !needed_bodies.is_empty() {
return Some(BlockRequest::Bodies {
hashes: needed_bodies,
Expand Down
12 changes: 7 additions & 5 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use ethcore::verification::queue::kind::blocks::Unverified;
use ethereum_types::{H256, U256};
use hash::keccak;
use network::PeerId;
use network::client_version::ClientVersion;
use rlp::Rlp;
use snapshot::ChunkType;
use std::time::Instant;
Expand Down Expand Up @@ -107,7 +108,7 @@ impl SyncHandler {

/// Called by peer when it is disconnecting
pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_info(peer_id));
trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_version(peer_id));
sync.handshaking_peers.remove(&peer_id);
if sync.peers.contains_key(&peer_id) {
debug!(target: "sync", "Disconnected {}", peer_id);
Expand All @@ -133,7 +134,7 @@ impl SyncHandler {

/// Called when a new peer is connected
pub fn on_peer_connected(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_info(peer));
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_version(peer));
if let Err(e) = sync.send_status(io, peer) {
debug!(target:"sync", "Error sending status request: {:?}", e);
io.disconnect_peer(peer);
Expand Down Expand Up @@ -579,6 +580,7 @@ impl SyncHandler {
snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None },
block_set: None,
private_tx_enabled: if private_tx_protocol { r.val_at(7).unwrap_or(false) } else { false },
client_version: ClientVersion::from(io.peer_version(peer_id)),
};

trace!(target: "sync", "New peer {} (\
Expand All @@ -599,12 +601,12 @@ impl SyncHandler {
peer.private_tx_enabled
);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_version(peer_id));
return Ok(());
}

if sync.peers.contains_key(&peer_id) {
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_version(peer_id));
return Ok(());
}
let chain_info = io.chain().chain_info();
Expand Down Expand Up @@ -633,7 +635,7 @@ impl SyncHandler {
// Don't activate peer immediatelly when searching for common block.
// Let the current sync round complete first.
sync.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_version(peer_id));

if let Some((fork_block, _)) = sync.fork_block {
SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
Expand Down
8 changes: 6 additions & 2 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use bytes::Bytes;
use rlp::{RlpStream, DecoderError};
use network::{self, PeerId, PacketId};
use network::client_version::ClientVersion;
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
use ethcore::snapshot::{RestorationStatus};
use sync_io::SyncIo;
Expand Down Expand Up @@ -342,6 +343,8 @@ pub struct PeerInfo {
snapshot_number: Option<BlockNumber>,
/// Block set requested
block_set: Option<BlockSet>,
/// Version of the software the peer is running
client_version: ClientVersion,
}

impl PeerInfo {
Expand Down Expand Up @@ -964,7 +967,7 @@ impl ChainSync {
if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) {
// check if got new blocks to download
trace!(target: "sync", "Syncing with peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) {
if let Some(request) = self.new_blocks.request_blocks(peer_id, io, num_active_peers) {
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::NewBlocks);
if self.state == SyncState::Idle {
self.state = SyncState::Blocks;
Expand All @@ -977,7 +980,7 @@ impl ChainSync {
let equal_or_higher_difficulty = peer_difficulty.map_or(false, |pd| pd >= syncing_difficulty);

if force || equal_or_higher_difficulty {
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(peer_id, io, num_active_peers)) {
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
return;
}
Expand Down Expand Up @@ -1459,6 +1462,7 @@ pub mod tests {
snapshot_hash: None,
asking_snapshot_data: None,
block_set: None,
client_version: ClientVersion::from(""),
});

}
Expand Down
41 changes: 9 additions & 32 deletions ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bytes::Bytes;
use ethereum_types::H256;
use fastmap::H256FastSet;
use network::{PeerId, PacketId};
use network::client_version::ClientCapabilities;
use rand::Rng;
use rlp::{Encodable, RlpStream};
use sync_io::SyncIo;
Expand All @@ -41,28 +42,6 @@ use super::{
TRANSACTIONS_PACKET,
};

/// Checks if peer is able to process service transactions
fn accepts_service_transaction(client_id: &str) -> bool {
// Parity versions starting from this will accept service-transactions
const SERVICE_TRANSACTIONS_VERSION: (u32, u32) = (1u32, 6u32);
// Parity client string prefix
const LEGACY_CLIENT_ID_PREFIX: &'static str = "Parity/";
const PARITY_CLIENT_ID_PREFIX: &'static str = "Parity-Ethereum/";
const VERSION_PREFIX: &'static str = "/v";

let idx = client_id.rfind(VERSION_PREFIX).map(|idx| idx + VERSION_PREFIX.len()).unwrap_or(client_id.len());
let splitted = if client_id.starts_with(LEGACY_CLIENT_ID_PREFIX) || client_id.starts_with(PARITY_CLIENT_ID_PREFIX) {
client_id[idx..].split('.')
} else {
return false;
};

let ver: Vec<u32> = splitted
.take(2)
.filter_map(|s| s.parse().ok())
.collect();
ver.len() == 2 && (ver[0] > SERVICE_TRANSACTIONS_VERSION.0 || (ver[0] == SERVICE_TRANSACTIONS_VERSION.0 && ver[1] >= SERVICE_TRANSACTIONS_VERSION.1))
}

/// The Chain Sync Propagator: propagates data to peers
pub struct SyncPropagator;
Expand Down Expand Up @@ -146,7 +125,7 @@ impl SyncPropagator {
// most of times service_transactions will be empty
// => there's no need to merge packets
if !service_transactions.is_empty() {
let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| accepts_service_transaction(&io.peer_info(*peer_id)));
let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| io.peer_version(*peer_id).accepts_service_transaction());
let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(
sync, io, service_transactions_peers, service_transactions, &mut should_continue
);
Expand Down Expand Up @@ -451,6 +430,7 @@ mod tests {
snapshot_hash: None,
asking_snapshot_data: None,
block_set: None,
client_version: ClientVersion::from(""),
});
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
Expand Down Expand Up @@ -598,20 +578,17 @@ mod tests {
io.peers_info.insert(1, "Geth".to_owned());
// and peer#2 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 2, block_hash);
io.peers_info.insert(2, "Parity-Ethereum/v2.6".to_owned());
// and peer#3 is Parity, discarding service transactions
io.peers_info.insert(2, "Parity-Ethereum/v2.6.0/linux/rustc".to_owned());
// and peer#3 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 3, block_hash);
io.peers_info.insert(3, "Parity/v1.5".to_owned());
// and peer#4 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 4, block_hash);
io.peers_info.insert(4, "Parity-Ethereum/ABCDEFGH/v2.7.3".to_owned());
io.peers_info.insert(3, "Parity-Ethereum/ABCDEFGH/v2.7.3/linux/rustc".to_owned());

// and new service transaction is propagated to peers
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);

// peer#2 && peer#4 are receiving service transaction
// peer#2 && peer#3 are receiving service transaction
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 4)); // TRANSACTIONS_PACKET
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 3)); // TRANSACTIONS_PACKET
assert_eq!(io.packets.len(), 2);
}

Expand All @@ -628,7 +605,7 @@ mod tests {

// when peer#1 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 1, block_hash);
io.peers_info.insert(1, "Parity-Ethereum/v2.6".to_owned());
io.peers_info.insert(1, "Parity-Ethereum/v2.6.0/linux/rustc".to_owned());

// and service + non-service transactions are propagated to peers
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
Expand Down
1 change: 1 addition & 0 deletions ethcore/sync/src/chain/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl SyncRequester {
for h in &hashes {
rlp.append(&h.clone());
}

SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out());
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
peer.asking_blocks = hashes;
Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/supplier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl SyncSupplier {
// Packets that require the peer to be confirmed
_ => {
if !sync.read().peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer));
return;
}
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
Expand Down
9 changes: 5 additions & 4 deletions ethcore/sync/src/sync_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::collections::HashMap;
use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId};
use network::client_version::ClientVersion;
use bytes::Bytes;
use ethcore::client::BlockChainClient;
use types::BlockNumber;
Expand All @@ -40,9 +41,9 @@ pub trait SyncIo {
fn chain(&self) -> &BlockChainClient;
/// Get the snapshot service.
fn snapshot_service(&self) -> &SnapshotService;
/// Returns peer identifier string
fn peer_info(&self, peer_id: PeerId) -> String {
peer_id.to_string()
/// Returns peer version identifier
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
ClientVersion::from(peer_id.to_string())
}
/// Returns information on p2p session
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
Expand Down Expand Up @@ -134,7 +135,7 @@ impl<'s> SyncIo for NetSyncIo<'s> {
self.network.protocol_version(*protocol, peer_id).unwrap_or(0)
}

fn peer_info(&self, peer_id: PeerId) -> String {
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
self.network.peer_client_version(peer_id)
}

Expand Down
9 changes: 6 additions & 3 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use ethereum_types::H256;
use parking_lot::{RwLock, Mutex};
use bytes::Bytes;
use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
use network::client_version::ClientVersion;
use tests::snapshot::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient,
ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage};
Expand Down Expand Up @@ -118,10 +119,12 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
&*self.chain
}

fn peer_info(&self, peer_id: PeerId) -> String {
self.peers_info.get(&peer_id)
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
let client_id = self.peers_info.get(&peer_id)
.cloned()
.unwrap_or_else(|| peer_id.to_string())
.unwrap_or_else(|| peer_id.to_string());

ClientVersion::from(client_id)
}

fn snapshot_service(&self) -> &SnapshotService {
Expand Down
1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ethcore-io = { path = "../util/io" }
ethcore-light = { path = "../ethcore/light" }
ethcore-logger = { path = "../parity/logger" }
ethcore-miner = { path = "../miner" }
ethcore-network = { path = "../util/network" }
ethcore-private-tx = { path = "../ethcore/private-tx" }
ethcore-sync = { path = "../ethcore/sync" }
ethereum-types = "0.4"
Expand Down
1 change: 1 addition & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ extern crate ethcore_io as io;
extern crate ethcore_light as light;
extern crate ethcore_logger;
extern crate ethcore_miner as miner;
extern crate ethcore_network as network;
extern crate ethcore_private_tx;
extern crate ethcore_sync as sync;
extern crate ethereum_types;
Expand Down
5 changes: 3 additions & 2 deletions rpc/src/v1/tests/helpers/sync_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::BTreeMap;
use ethereum_types::H256;
use parking_lot::RwLock;
use sync::{SyncProvider, EthProtocolInfo, SyncStatus, SyncState, PeerInfo, TransactionStats};
use network::client_version::ClientVersion;

/// TestSyncProvider config.
pub struct Config {
Expand Down Expand Up @@ -75,7 +76,7 @@ impl SyncProvider for TestSyncProvider {
vec![
PeerInfo {
id: Some("node1".to_owned()),
client_version: "Parity-Ethereum/1".to_owned(),
client_version: ClientVersion::from("Parity-Ethereum/1/v2.4.0/linux/rustc"),
capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()],
remote_address: "127.0.0.1:7777".to_owned(),
local_address: "127.0.0.1:8888".to_owned(),
Expand All @@ -88,7 +89,7 @@ impl SyncProvider for TestSyncProvider {
},
PeerInfo {
id: None,
client_version: "Parity-Ethereum/2".to_owned(),
client_version: ClientVersion::from("Parity-Ethereum/2/v2.4.0/linux/rustc"),
capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()],
remote_address: "Handshake".to_owned(),
local_address: "127.0.0.1:3333".to_owned(),
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/tests/mocked/parity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ fn rpc_parity_net_peers() {
let io = deps.default_client();

let request = r#"{"jsonrpc": "2.0", "method": "parity_netPeers", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"active":0,"connected":120,"max":50,"peers":[{"caps":["eth/62","eth/63"],"id":"node1","name":"Parity-Ethereum/1","network":{"localAddress":"127.0.0.1:8888","remoteAddress":"127.0.0.1:7777"},"protocols":{"eth":{"difficulty":"0x28","head":"0000000000000000000000000000000000000000000000000000000000000032","version":62},"pip":null}},{"caps":["eth/63","eth/64"],"id":null,"name":"Parity-Ethereum/2","network":{"localAddress":"127.0.0.1:3333","remoteAddress":"Handshake"},"protocols":{"eth":{"difficulty":null,"head":"000000000000000000000000000000000000000000000000000000000000003c","version":64},"pip":null}}]},"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"active":0,"connected":120,"max":50,"peers":[{"caps":["eth/62","eth/63"],"id":"node1","name":{"ParityClient":{"can_handle_large_requests":true,"compiler":"rustc","identity":"1","name":"Parity-Ethereum","os":"linux","semver":"2.4.0"}},"network":{"localAddress":"127.0.0.1:8888","remoteAddress":"127.0.0.1:7777"},"protocols":{"eth":{"difficulty":"0x28","head":"0000000000000000000000000000000000000000000000000000000000000032","version":62},"pip":null}},{"caps":["eth/63","eth/64"],"id":null,"name":{"ParityClient":{"can_handle_large_requests":true,"compiler":"rustc","identity":"2","name":"Parity-Ethereum","os":"linux","semver":"2.4.0"}},"network":{"localAddress":"127.0.0.1:3333","remoteAddress":"Handshake"},"protocols":{"eth":{"difficulty":null,"head":"000000000000000000000000000000000000000000000000000000000000003c","version":64},"pip":null}}]},"id":1}"#;

assert_eq!(io.handle_request_sync(request), Some(response.to_owned()));
}
Expand Down
Loading

0 comments on commit b7e8621

Please sign in to comment.