Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Increase number of requested block bodies in chain sync #10247

Merged
merged 22 commits into from
Feb 7, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use devp2p::NetworkService;
use network::{NetworkProtocolHandler, NetworkContext, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, Error, ErrorKind,
ConnectionFilter};
use network::client_version::ClientVersion;

use types::pruning_info::PruningInfo;
use ethereum_types::{H256, H512, U256};
Expand Down Expand Up @@ -158,7 +159,7 @@ pub struct PeerInfo {
/// Public node id
pub id: Option<String>,
/// Node client ID
pub client_version: String,
pub client_version: ClientVersion,
/// Capabilities
pub capabilities: Vec<String>,
/// Remote endpoint address
Expand Down
21 changes: 16 additions & 5 deletions ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError, Error as Ethco
use sync_io::SyncIo;
use blocks::{BlockCollection, SyncBody, SyncHeader};
use chain::BlockSet;
use network::PeerId;
use network::client_version::ClientCapabilities;

const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 32;
const MAX_RECEPITS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST_LARGE: usize = 128;
const MAX_BODIES_TO_REQUEST_SMALL: usize = 32; // Size request for parity clients prior to 2.3.0
elferdo marked this conversation as resolved.
Show resolved Hide resolved
const MAX_RECEPITS_TO_REQUEST: usize = 256;
const SUBCHAIN_SIZE: u64 = 256;
const MAX_ROUND_PARENTS: usize = 16;
const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
Expand Down Expand Up @@ -464,12 +467,12 @@ impl BlockDownloader {
}

/// Find some headers or blocks to download for a peer.
pub fn request_blocks(&mut self, io: &mut SyncIo, num_active_peers: usize) -> Option<BlockRequest> {
pub fn request_blocks(&mut self, peer_id: PeerId, io: &mut SyncIo, num_active_peers: usize) -> Option<BlockRequest> {
match self.state {
State::Idle => {
self.start_sync_round(io);
if self.state == State::ChainHead {
return self.request_blocks(io, num_active_peers);
return self.request_blocks(peer_id, io, num_active_peers);
}
},
State::ChainHead => {
Expand All @@ -487,7 +490,15 @@ impl BlockDownloader {
},
State::Blocks => {
// check to see if we need to download any block bodies first
let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, false);
let client_version = io.peer_version(peer_id);

let number_of_bodies_to_request = if client_version.can_handle_large_requests() {
MAX_BODIES_TO_REQUEST_LARGE
} else {
MAX_BODIES_TO_REQUEST_SMALL
};

let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request, false);
if !needed_bodies.is_empty() {
return Some(BlockRequest::Bodies {
hashes: needed_bodies,
Expand Down
12 changes: 7 additions & 5 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use ethcore::verification::queue::kind::blocks::Unverified;
use ethereum_types::{H256, U256};
use hash::keccak;
use network::PeerId;
use network::client_version::ClientVersion;
use rlp::Rlp;
use snapshot::ChunkType;
use std::time::Instant;
Expand Down Expand Up @@ -107,7 +108,7 @@ impl SyncHandler {

/// Called by peer when it is disconnecting
pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_info(peer_id));
trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_version(peer_id));
sync.handshaking_peers.remove(&peer_id);
if sync.peers.contains_key(&peer_id) {
debug!(target: "sync", "Disconnected {}", peer_id);
Expand All @@ -133,7 +134,7 @@ impl SyncHandler {

/// Called when a new peer is connected
pub fn on_peer_connected(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_info(peer));
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_version(peer));
if let Err(e) = sync.send_status(io, peer) {
debug!(target:"sync", "Error sending status request: {:?}", e);
io.disconnect_peer(peer);
Expand Down Expand Up @@ -579,6 +580,7 @@ impl SyncHandler {
snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None },
block_set: None,
private_tx_enabled: if private_tx_protocol { r.val_at(7).unwrap_or(false) } else { false },
client_version: ClientVersion::from(io.peer_version(peer_id)),
};

trace!(target: "sync", "New peer {} (\
Expand All @@ -599,12 +601,12 @@ impl SyncHandler {
peer.private_tx_enabled
);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_version(peer_id));
return Ok(());
}

if sync.peers.contains_key(&peer_id) {
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_version(peer_id));
return Ok(());
}
let chain_info = io.chain().chain_info();
Expand Down Expand Up @@ -633,7 +635,7 @@ impl SyncHandler {
// Don't activate peer immediatelly when searching for common block.
// Let the current sync round complete first.
sync.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_version(peer_id));

if let Some((fork_block, _)) = sync.fork_block {
SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
Expand Down
8 changes: 6 additions & 2 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use bytes::Bytes;
use rlp::{RlpStream, DecoderError};
use network::{self, PeerId, PacketId};
use network::client_version::ClientVersion;
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
use ethcore::snapshot::{RestorationStatus};
use sync_io::SyncIo;
Expand Down Expand Up @@ -342,6 +343,8 @@ pub struct PeerInfo {
snapshot_number: Option<BlockNumber>,
/// Block set requested
block_set: Option<BlockSet>,
/// Version of the software the peer is running
client_version: ClientVersion
elferdo marked this conversation as resolved.
Show resolved Hide resolved
}

impl PeerInfo {
Expand Down Expand Up @@ -964,7 +967,7 @@ impl ChainSync {
if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) {
// check if got new blocks to download
trace!(target: "sync", "Syncing with peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) {
if let Some(request) = self.new_blocks.request_blocks(peer_id, io, num_active_peers) {
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::NewBlocks);
if self.state == SyncState::Idle {
self.state = SyncState::Blocks;
Expand All @@ -977,7 +980,7 @@ impl ChainSync {
let equal_or_higher_difficulty = peer_difficulty.map_or(false, |pd| pd >= syncing_difficulty);

if force || equal_or_higher_difficulty {
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(peer_id, io, num_active_peers)) {
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
return;
}
Expand Down Expand Up @@ -1459,6 +1462,7 @@ pub mod tests {
snapshot_hash: None,
asking_snapshot_data: None,
block_set: None,
client_version: ClientVersion::from(""),
});

}
Expand Down
41 changes: 9 additions & 32 deletions ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bytes::Bytes;
use ethereum_types::H256;
use fastmap::H256FastSet;
use network::{PeerId, PacketId};
use network::client_version::ClientCapabilities;
use rand::Rng;
use rlp::{Encodable, RlpStream};
use sync_io::SyncIo;
Expand All @@ -41,28 +42,6 @@ use super::{
TRANSACTIONS_PACKET,
};

/// Checks if peer is able to process service transactions
fn accepts_service_transaction(client_id: &str) -> bool {
// Parity versions starting from this will accept service-transactions
const SERVICE_TRANSACTIONS_VERSION: (u32, u32) = (1u32, 6u32);
// Parity client string prefix
const LEGACY_CLIENT_ID_PREFIX: &'static str = "Parity/";
const PARITY_CLIENT_ID_PREFIX: &'static str = "Parity-Ethereum/";
const VERSION_PREFIX: &'static str = "/v";

let idx = client_id.rfind(VERSION_PREFIX).map(|idx| idx + VERSION_PREFIX.len()).unwrap_or(client_id.len());
let splitted = if client_id.starts_with(LEGACY_CLIENT_ID_PREFIX) || client_id.starts_with(PARITY_CLIENT_ID_PREFIX) {
client_id[idx..].split('.')
} else {
return false;
};

let ver: Vec<u32> = splitted
.take(2)
.filter_map(|s| s.parse().ok())
.collect();
ver.len() == 2 && (ver[0] > SERVICE_TRANSACTIONS_VERSION.0 || (ver[0] == SERVICE_TRANSACTIONS_VERSION.0 && ver[1] >= SERVICE_TRANSACTIONS_VERSION.1))
}

/// The Chain Sync Propagator: propagates data to peers
pub struct SyncPropagator;
Expand Down Expand Up @@ -146,7 +125,7 @@ impl SyncPropagator {
// most of times service_transactions will be empty
// => there's no need to merge packets
if !service_transactions.is_empty() {
let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| accepts_service_transaction(&io.peer_info(*peer_id)));
let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| io.peer_version(*peer_id).accepts_service_transaction());
let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(
sync, io, service_transactions_peers, service_transactions, &mut should_continue
);
Expand Down Expand Up @@ -451,6 +430,7 @@ mod tests {
snapshot_hash: None,
asking_snapshot_data: None,
block_set: None,
client_version: ClientVersion::from(""),
});
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
Expand Down Expand Up @@ -598,20 +578,17 @@ mod tests {
io.peers_info.insert(1, "Geth".to_owned());
// and peer#2 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 2, block_hash);
io.peers_info.insert(2, "Parity-Ethereum/v2.6".to_owned());
// and peer#3 is Parity, discarding service transactions
io.peers_info.insert(2, "Parity-Ethereum/v2.6.0/linux/rustc".to_owned());
// and peer#3 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 3, block_hash);
io.peers_info.insert(3, "Parity/v1.5".to_owned());
// and peer#4 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 4, block_hash);
io.peers_info.insert(4, "Parity-Ethereum/ABCDEFGH/v2.7.3".to_owned());
io.peers_info.insert(3, "Parity-Ethereum/ABCDEFGH/v2.7.3/linux/rustc".to_owned());

// and new service transaction is propagated to peers
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);

// peer#2 && peer#4 are receiving service transaction
// peer#2 && peer#3 are receiving service transaction
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 4)); // TRANSACTIONS_PACKET
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 3)); // TRANSACTIONS_PACKET
assert_eq!(io.packets.len(), 2);
}

Expand All @@ -628,7 +605,7 @@ mod tests {

// when peer#1 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 1, block_hash);
io.peers_info.insert(1, "Parity-Ethereum/v2.6".to_owned());
io.peers_info.insert(1, "Parity-Ethereum/v2.6.0/linux/rustc".to_owned());

// and service + non-service transactions are propagated to peers
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
Expand Down
1 change: 1 addition & 0 deletions ethcore/sync/src/chain/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl SyncRequester {
for h in &hashes {
rlp.append(&h.clone());
}

SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out());
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
peer.asking_blocks = hashes;
Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/supplier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl SyncSupplier {
// Packets that require the peer to be confirmed
_ => {
if !sync.read().peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer));
return;
}
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
Expand Down
9 changes: 5 additions & 4 deletions ethcore/sync/src/sync_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::collections::HashMap;
use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId};
use network::client_version::ClientVersion;
use bytes::Bytes;
use ethcore::client::BlockChainClient;
use types::BlockNumber;
Expand All @@ -40,9 +41,9 @@ pub trait SyncIo {
fn chain(&self) -> &BlockChainClient;
/// Get the snapshot service.
fn snapshot_service(&self) -> &SnapshotService;
/// Returns peer identifier string
fn peer_info(&self, peer_id: PeerId) -> String {
peer_id.to_string()
/// Returns peer version identifier
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
ClientVersion::from(peer_id.to_string())
}
/// Returns information on p2p session
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
Expand Down Expand Up @@ -134,7 +135,7 @@ impl<'s> SyncIo for NetSyncIo<'s> {
self.network.protocol_version(*protocol, peer_id).unwrap_or(0)
}

fn peer_info(&self, peer_id: PeerId) -> String {
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
self.network.peer_client_version(peer_id)
}

Expand Down
9 changes: 6 additions & 3 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use ethereum_types::H256;
use parking_lot::{RwLock, Mutex};
use bytes::Bytes;
use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
use network::client_version::ClientVersion;
use tests::snapshot::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient,
ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage};
Expand Down Expand Up @@ -119,10 +120,12 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
&*self.chain
}

fn peer_info(&self, peer_id: PeerId) -> String {
self.peers_info.get(&peer_id)
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
let client_id = self.peers_info.get(&peer_id)
.cloned()
.unwrap_or_else(|| peer_id.to_string())
.unwrap_or_else(|| peer_id.to_string());

ClientVersion::from(client_id)
}

fn snapshot_service(&self) -> &SnapshotService {
Expand Down
2 changes: 1 addition & 1 deletion rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ethcore-io = { path = "../util/io" }
ethcore-light = { path = "../ethcore/light" }
ethcore-logger = { path = "../parity/logger" }
ethcore-miner = { path = "../miner" }
ethcore-network = { path = "../util/network" }
ethcore-private-tx = { path = "../ethcore/private-tx" }
ethcore-sync = { path = "../ethcore/sync" }
ethereum-types = "0.4"
Expand All @@ -70,7 +71,6 @@ fake-hardware-wallet = { path = "../accounts/fake-hardware-wallet" }

[dev-dependencies]
ethcore = { path = "../ethcore", features = ["test-helpers"] }
ethcore-network = { path = "../util/network" }
fake-fetch = { path = "../util/fake-fetch" }
kvdb-memorydb = "0.1"
macros = { path = "../util/macros" }
Expand Down
1 change: 1 addition & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ extern crate ethcore_io as io;
extern crate ethcore_light as light;
extern crate ethcore_logger;
extern crate ethcore_miner as miner;
extern crate ethcore_network as network;
extern crate ethcore_private_tx;
extern crate ethcore_sync as sync;
extern crate ethereum_types;
Expand Down
5 changes: 3 additions & 2 deletions rpc/src/v1/tests/helpers/sync_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::BTreeMap;
use ethereum_types::H256;
use parking_lot::RwLock;
use sync::{SyncProvider, EthProtocolInfo, SyncStatus, SyncState, PeerInfo, TransactionStats};
use network::client_version::ClientVersion;

/// TestSyncProvider config.
pub struct Config {
Expand Down Expand Up @@ -75,7 +76,7 @@ impl SyncProvider for TestSyncProvider {
vec![
PeerInfo {
id: Some("node1".to_owned()),
client_version: "Parity-Ethereum/1".to_owned(),
client_version: ClientVersion::from("Parity-Ethereum/1/v2.3.0/linux/rustc"),
capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()],
remote_address: "127.0.0.1:7777".to_owned(),
local_address: "127.0.0.1:8888".to_owned(),
Expand All @@ -88,7 +89,7 @@ impl SyncProvider for TestSyncProvider {
},
PeerInfo {
id: None,
client_version: "Parity-Ethereum/2".to_owned(),
client_version: ClientVersion::from("Parity-Ethereum/2/v2.3.0/linux/rustc"),
capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()],
remote_address: "Handshake".to_owned(),
local_address: "127.0.0.1:3333".to_owned(),
Expand Down
Loading