Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Improve block and transaction propagation (#9954)
Browse files Browse the repository at this point in the history
* Refactor sync to add priority tasks.

* Send priority tasks notifications.

* Propagate blocks, optimize transactions.

* Implement transaction propagation. Use sync_channel.

* Tone down info.

* Prevent deadlock by not waiting forever for sync lock.

* Fix lock order.

* Don't use sync_channel to prevent deadlocks.

* Fix tests.
  • Loading branch information
tomusdrw authored and niklasad1 committed Dec 16, 2018
1 parent a373f4b commit 86529c9
Show file tree
Hide file tree
Showing 18 changed files with 626 additions and 295 deletions.
8 changes: 7 additions & 1 deletion ethcore/service/src/service.rs
Expand Up @@ -106,7 +106,13 @@ impl ClientService {
info!("Configured for {} using {} engine", Colour::White.bold().paint(spec.name.clone()), Colour::Yellow.bold().paint(spec.engine.name()));

let pruning = config.pruning;
let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?;
let client = Client::new(
config,
&spec,
blockchain_db.clone(),
miner.clone(),
io_service.channel(),
)?;
miner.set_io_channel(io_service.channel());
miner.set_in_chain_checker(&client.clone());

Expand Down
12 changes: 10 additions & 2 deletions ethcore/src/client/chain_notify.rs
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use bytes::Bytes;
use ethereum_types::H256;
use ethereum_types::{H256, U256};
use transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
Expand Down Expand Up @@ -141,7 +141,15 @@ pub trait ChainNotify : Send + Sync {
}

/// fires when chain broadcasts a message
fn broadcast(&self, _message_type: ChainMessageType) {}
fn broadcast(&self, _message_type: ChainMessageType) {
// does nothing by default
}

/// fires when new block is about to be imported
/// implementations should be light
fn block_pre_import(&self, _bytes: &Bytes, _hash: &H256, _difficulty: &U256) {
// does nothing by default
}

/// fires when new transactions are received from a peer
fn transactions_received(&self,
Expand Down
36 changes: 28 additions & 8 deletions ethcore/src/client/client.rs
Expand Up @@ -881,7 +881,7 @@ impl Client {
/// Flush the block import queue.
pub fn flush_queue(&self) {
self.importer.block_queue.flush();
while !self.importer.block_queue.queue_info().is_empty() {
while !self.importer.block_queue.is_empty() {
self.import_verified_blocks();
}
}
Expand Down Expand Up @@ -1423,8 +1423,21 @@ impl ImportBlock for Client {
bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(unverified.parent_hash())));
}

let raw = if self.importer.block_queue.is_empty() {
Some((
unverified.bytes.clone(),
unverified.header.hash(),
*unverified.header.difficulty(),
))
} else { None };

match self.importer.block_queue.import(unverified) {
Ok(res) => Ok(res),
Ok(hash) => {
if let Some((raw, hash, difficulty)) = raw {
self.notify(move |n| n.block_pre_import(&raw, &hash, &difficulty));
}
Ok(hash)
},
// we only care about block errors (not import errors)
Err((block, EthcoreError(EthcoreErrorKind::Block(err), _))) => {
self.importer.bad_blocks.report(block.bytes, format!("{:?}", err));
Expand Down Expand Up @@ -1878,6 +1891,10 @@ impl BlockChainClient for Client {
self.importer.block_queue.queue_info()
}

fn is_queue_empty(&self) -> bool {
self.importer.block_queue.is_empty()
}

fn clear_queue(&self) {
self.importer.block_queue.clear();
}
Expand Down Expand Up @@ -2288,7 +2305,11 @@ impl ScheduleInfo for Client {
impl ImportSealedBlock for Client {
fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult<H256> {
let start = Instant::now();
let raw = block.rlp_bytes();
let header = block.header().clone();
let hash = header.hash();
self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));

let route = {
// Do a super duper basic verification to detect potential bugs
if let Err(e) = self.engine.verify_block_basic(&header) {
Expand All @@ -2306,32 +2327,31 @@ impl ImportSealedBlock for Client {
let block_data = block.rlp_bytes();

let route = self.importer.commit_block(block, &header, encoded::Block::new(block_data), self);
trace!(target: "client", "Imported sealed block #{} ({})", header.number(), header.hash());
trace!(target: "client", "Imported sealed block #{} ({})", header.number(), hash);
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route
};
let h = header.hash();
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(
self,
&[h],
&[hash],
&[],
route.enacted(),
route.retracted(),
self.engine.seals_internally().is_some(),
);
self.notify(|notify| {
notify.new_blocks(
vec![h],
vec![hash],
vec![],
route.clone(),
vec![h],
vec![hash],
vec![],
start.elapsed(),
);
});
self.db.read().key_value().flush().expect("DB flush failed.");
Ok(h)
Ok(hash)
}
}

Expand Down
5 changes: 5 additions & 0 deletions ethcore/src/client/traits.rs
Expand Up @@ -300,6 +300,11 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get block queue information.
fn queue_info(&self) -> BlockQueueInfo;

/// Returns true if block queue is empty.
fn is_queue_empty(&self) -> bool {
self.queue_info().is_empty()
}

/// Clear block queue and abort all import activity.
fn clear_queue(&self);

Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/miner/miner.rs
Expand Up @@ -576,7 +576,7 @@ impl Miner {
trace!(target: "miner", "requires_reseal: sealing enabled");

// Disable sealing if there were no requests for SEALING_TIMEOUT_IN_BLOCKS
let had_requests = sealing.last_request.map(|last_request|
let had_requests = sealing.last_request.map(|last_request|
best_block.saturating_sub(last_request) <= SEALING_TIMEOUT_IN_BLOCKS
).unwrap_or(false);

Expand Down
7 changes: 7 additions & 0 deletions ethcore/src/verification/queue/mod.rs
Expand Up @@ -583,6 +583,13 @@ impl<K: Kind> VerificationQueue<K> {
result
}

/// Returns true if there is nothing currently in the queue.
/// TODO [ToDr] Optimize to avoid locking
pub fn is_empty(&self) -> bool {
let v = &self.verification;
v.unverified.lock().is_empty() && v.verifying.lock().is_empty() && v.verified.lock().is_empty()
}

/// Get queue status.
pub fn queue_info(&self) -> QueueInfo {
use std::mem::size_of;
Expand Down
95 changes: 76 additions & 19 deletions ethcore/sync/src/api.rs
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::sync::{Arc, mpsc, atomic};
use std::collections::{HashMap, BTreeMap};
use std::io;
use std::ops::Range;
Expand All @@ -33,10 +33,10 @@ use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageTyp
use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber;
use sync_io::NetSyncIo;
use chain::{ChainSync, SyncStatus as EthSyncStatus};
use chain::{ChainSyncApi, SyncStatus as EthSyncStatus};
use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::RwLock;
use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
Expand Down Expand Up @@ -228,6 +228,37 @@ impl AttachedProtocol {
}
}

/// A prioritized tasks run in a specialised timer.
/// Every task should be completed within a hard deadline,
/// if it's not it's either cancelled or split into multiple tasks.
/// NOTE These tasks might not complete at all, so anything
/// that happens here should work even if the task is cancelled.
#[derive(Debug)]
pub enum PriorityTask {
/// Propagate given block
PropagateBlock {
/// When the task was initiated
started: ::std::time::Instant,
/// Raw block RLP to propagate
block: Bytes,
/// Block hash
hash: H256,
/// Blocks difficulty
difficulty: U256,
},
/// Propagate a list of transactions
PropagateTransactions(::std::time::Instant, Arc<atomic::AtomicBool>),
}
impl PriorityTask {
/// Mark the task as being processed, right after it's retrieved from the queue.
pub fn starting(&self) {
match *self {
PriorityTask::PropagateTransactions(_, ref is_ready) => is_ready.store(true, atomic::Ordering::SeqCst),
_ => {},
}
}
}

/// EthSync initialization parameters.
pub struct Params {
/// Configuration.
Expand Down Expand Up @@ -260,6 +291,8 @@ pub struct EthSync {
subprotocol_name: [u8; 3],
/// Light subprotocol name.
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
}

fn light_params(
Expand Down Expand Up @@ -312,13 +345,19 @@ impl EthSync {
})
};

let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone());
let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel();
let sync = ChainSyncApi::new(
params.config,
&*params.chain,
params.private_tx_handler.clone(),
priority_tasks_rx,
);
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;

let sync = Arc::new(EthSync {
network: service,
eth_handler: Arc::new(SyncProtocolHandler {
sync: RwLock::new(chain_sync),
sync,
chain: params.chain,
snapshot_service: params.snapshot_service,
overlay: RwLock::new(HashMap::new()),
Expand All @@ -327,26 +366,32 @@ impl EthSync {
subprotocol_name: params.config.subprotocol_name,
light_subprotocol_name: params.config.light_subprotocol_name,
attached_protos: params.attached_protos,
priority_tasks: Mutex::new(priority_tasks_tx),
});

Ok(sync)
}

/// Priority tasks producer
pub fn priority_tasks(&self) -> mpsc::Sender<PriorityTask> {
self.priority_tasks.lock().clone()
}
}

impl SyncProvider for EthSync {
/// Get sync status
fn status(&self) -> EthSyncStatus {
self.eth_handler.sync.read().status()
self.eth_handler.sync.status()
}

/// Get sync peers
fn peers(&self) -> Vec<PeerInfo> {
self.network.with_context_eval(self.subprotocol_name, |ctx| {
let peer_ids = self.network.connected_peers();
let eth_sync = self.eth_handler.sync.read();
let light_proto = self.light_proto.as_ref();

peer_ids.into_iter().filter_map(|peer_id| {
let peer_info = self.eth_handler.sync.peer_info(&peer_ids);
peer_ids.into_iter().zip(peer_info).filter_map(|(peer_id, peer_info)| {
let session_info = match ctx.session_info(peer_id) {
None => return None,
Some(info) => info,
Expand All @@ -358,7 +403,7 @@ impl SyncProvider for EthSync {
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
eth_info: eth_sync.peer_info(&peer_id),
eth_info: peer_info,
pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(peer_id)).map(Into::into),
})
}).collect()
Expand All @@ -370,25 +415,24 @@ impl SyncProvider for EthSync {
}

fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
let sync = self.eth_handler.sync.read();
sync.transactions_stats()
.iter()
.map(|(hash, stats)| (*hash, stats.into()))
.collect()
self.eth_handler.sync.transactions_stats()
}
}

const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;
const PRIORITY_TIMER: TimerToken = 3;

pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);

struct SyncProtocolHandler {
/// Shared blockchain client.
chain: Arc<BlockChainClient>,
/// Shared snapshot service.
snapshot_service: Arc<SnapshotService>,
/// Sync strategy
sync: RwLock<ChainSync>,
sync: ChainSyncApi,
/// Chain overlay used to cache data such as fork block.
overlay: RwLock<HashMap<BlockNumber, Bytes>>,
}
Expand All @@ -399,11 +443,13 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");

io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
}
}

fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}

fn connected(&self, io: &NetworkContext, peer: &PeerId) {
Expand All @@ -429,15 +475,26 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
TX_TIMER => {
self.sync.write().propagate_new_transactions(&mut io);
},
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}

impl ChainNotify for EthSync {
fn block_pre_import(&self, bytes: &Bytes, hash: &H256, difficulty: &U256) {
let task = PriorityTask::PropagateBlock {
started: ::std::time::Instant::now(),
block: bytes.clone(),
hash: *hash,
difficulty: *difficulty,
};
if let Err(e) = self.priority_tasks.lock().send(task) {
warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e);
}
}

fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
Expand Down

0 comments on commit 86529c9

Please sign in to comment.