This repository has been archived by the owner on May 24, 2022. It is now read-only.

Very few peer connections #661

Open · wants to merge 6 commits into base: dev
20 changes: 15 additions & 5 deletions bin/oe/run.rs
@@ -459,6 +459,21 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<RunningClient
let tx = ::parking_lot::Mutex::new(priority_tasks);
let is_ready = Arc::new(atomic::AtomicBool::new(true));
miner.add_transactions_listener(Box::new(move |hashes| {
// a transaction has been added to the queue
for hash in hashes {
match new_transaction_hashes.try_send(hash.clone()) {
Err(err) => {
trace!(
target: "tx_listener",
"New transaction {} has not been sent into the channel: {}. The remaining transactions will be discarded",
hash, err
);
// if any error was encountered it does not make sense to continue further
break
}
Ok(_) => ()
}
}
// we want to have only one PendingTransactions task in the queue.
if is_ready
.compare_exchange(
@@ -469,11 +484,6 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<RunningClient
)
.is_ok()
{
for hash in hashes {
new_transaction_hashes
.send(hash.clone())
.expect("new_transaction_hashes receiving side is disconnected");
}
let task =
crate::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone());
// we ignore error cause it means that we are closing
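A minimal sketch of the hand-off pattern the listener above relies on, assuming crossbeam_channel; the `Miner`/`add_listener` names here are illustrative stand-ins, not the project API. The closure captures the bounded sender and forwards hashes with `try_send`, so the miner thread never blocks; on the first failure the rest of the batch is dropped.

```rust
use crossbeam_channel::bounded;

type Hash = u64;

struct Miner {
    listeners: Vec<Box<dyn Fn(&[Hash])>>,
}

impl Miner {
    fn add_listener(&mut self, l: Box<dyn Fn(&[Hash])>) {
        self.listeners.push(l);
    }
    fn notify(&self, hashes: &[Hash]) {
        for l in &self.listeners {
            l(hashes);
        }
    }
}

fn main() {
    // small capacity so the back-pressure path is visible
    let (new_transaction_hashes, rx) = bounded::<Hash>(2);

    let mut miner = Miner { listeners: Vec::new() };
    miner.add_listener(Box::new(move |hashes| {
        for hash in hashes {
            if let Err(err) = new_transaction_hashes.try_send(*hash) {
                // channel full or closed: give up on the rest of this batch
                eprintln!("hash {} not sent: {}", hash, err);
                break;
            }
        }
    }));

    miner.notify(&[1, 2, 3, 4]); // only the first two fit into the capacity-2 channel
    assert_eq!(rx.len(), 2);
}
```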
19 changes: 19 additions & 0 deletions crates/concensus/miner/src/pool/queue.rs
@@ -661,6 +661,25 @@ impl TransactionQueue {
self.pool.read().find(hash)
}

/// Retrieves multiple transactions from the pool.
///
/// Given an iterator of transaction hashes, looks those transactions up in the
/// pool and returns a vector of shared pointers to the ones that are present.
///
/// Optimizes lookups for several transactions by acquiring the read lock only once.
pub fn find_mul(
&self,
hashes: impl Iterator<Item = H256>,
) -> Vec<Arc<pool::VerifiedTransaction>> {
let pool = self.pool.read();
let mut res = Vec::new();
for hash in hashes {
if let Some(tx) = pool.find(&hash) { res.push(tx); }
}
res
}

/// Remove a set of transactions from the pool.
///
/// Given an iterator of transaction hashes
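For intuition, a simplified sketch of what `find_mul` buys over repeated `find` calls, with toy stand-in types rather than the real pool: the batched path acquires the read lock once for the whole set of hashes instead of once per hash.

```rust
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};

type Hash = u64;

struct Pool(HashMap<Hash, Arc<String>>);

impl Pool {
    fn find(&self, hash: &Hash) -> Option<Arc<String>> {
        self.0.get(hash).cloned()
    }
}

struct Queue {
    pool: RwLock<Pool>,
}

impl Queue {
    // per-hash lookup: takes the read lock once per call
    fn find(&self, hash: &Hash) -> Option<Arc<String>> {
        self.pool.read().find(hash)
    }

    // batched lookup: takes the read lock once for the whole iterator,
    // mirroring what `find_mul` does for the real pool
    fn find_mul(&self, hashes: impl Iterator<Item = Hash>) -> Vec<Arc<String>> {
        let pool = self.pool.read();
        hashes.filter_map(|h| pool.find(&h)).collect()
    }
}

fn main() {
    let pool = Pool(HashMap::from([
        (1, Arc::new("tx1".to_string())),
        (2, Arc::new("tx2".to_string())),
    ]));
    let queue = Queue { pool: RwLock::new(pool) };

    assert!(queue.find(&1).is_some());
    // hash 3 is unknown and is simply skipped by the batched lookup
    assert_eq!(queue.find_mul(vec![1, 3, 2].into_iter()).len(), 2);
}
```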
6 changes: 5 additions & 1 deletion crates/ethcore/src/client/client.rs
@@ -90,7 +90,7 @@ use snapshot::{self, io as snapshot_io, SnapshotClient};
use spec::Spec;
use state::{self, State};
use state_db::StateDB;
use stats::{PrometheusMetrics, PrometheusRegistry};
use stats::{Corpus, PrometheusMetrics, PrometheusRegistry};
use trace::{
self, Database as TraceDatabase, ImportRequest as TraceImportRequest, LocalizedTrace, TraceDB,
};
@@ -2749,6 +2749,10 @@ impl BlockChainClient for Client {
self.importer.miner.transaction(tx_hash)
}

fn transactions(&self, hashes: Vec<H256>) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.transactions(hashes)
}

fn signing_chain_id(&self) -> Option<u64> {
self.engine.signing_chain_id(&self.latest_env_info())
}
6 changes: 5 additions & 1 deletion crates/ethcore/src/client/test_client.rs
@@ -75,7 +75,7 @@ use miner::{self, Miner, MinerService};
use spec::Spec;
use state::StateInfo;
use state_db::StateDB;
use stats::{PrometheusMetrics, PrometheusRegistry};
use stats::{Corpus, PrometheusMetrics, PrometheusRegistry};
use trace::LocalizedTrace;
use verification::queue::{kind::blocks::Unverified, QueueInfo};

@@ -1123,6 +1123,10 @@ impl BlockChainClient for TestBlockChainClient {
fn transaction(&self, tx_hash: &H256) -> Option<Arc<VerifiedTransaction>> {
self.miner.transaction(tx_hash)
}

fn transactions(&self, hashes: Vec<H256>) -> Vec<Arc<VerifiedTransaction>> {
self.miner.transactions(hashes)
}
}

impl IoClient for TestBlockChainClient {
2 changes: 2 additions & 0 deletions crates/ethcore/src/client/traits.rs
@@ -387,6 +387,8 @@ pub trait BlockChainClient:
/// Get verified transaction with specified transaction hash.
fn transaction(&self, tx_hash: &H256) -> Option<Arc<VerifiedTransaction>>;

/// Get verified transactions for the specified transaction hashes.
fn transactions(&self, hashes: Vec<H256>) -> Vec<Arc<VerifiedTransaction>>;

/// Sorted list of transaction gas prices from at least last sample_size blocks.
fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {
let mut h = self.chain_info().best_block_hash;
4 changes: 4 additions & 0 deletions crates/ethcore/src/miner/miner.rs
@@ -1258,6 +1258,10 @@ impl miner::MinerService for Miner {
self.transaction_queue.find(hash)
}

fn transactions(&self, hashes: Vec<H256>) -> Vec<Arc<VerifiedTransaction>> {
self.transaction_queue.find_mul(hashes.into_iter())
}

fn remove_transaction(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>> {
self.transaction_queue
.remove(::std::iter::once(hash), false)
5 changes: 5 additions & 0 deletions crates/ethcore/src/miner/mod.rs
@@ -196,6 +196,11 @@ pub trait MinerService: Send + Sync {
/// Query transaction from the pool given its hash.
fn transaction(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>>;

/// Query transactions from the pool given their hashes.
/// Added to allow the underlying implementation to optimize
/// lookups of several transactions.
fn transactions(&self, hashes: Vec<H256>) -> Vec<Arc<VerifiedTransaction>>;

/// Returns next valid nonce for given address.
///
/// This includes nonces of all transactions from this address in the pending queue
6 changes: 4 additions & 2 deletions crates/ethcore/sync/src/api.rs
@@ -33,7 +33,8 @@ use std::{
use chain::{
fork_filter::ForkFilterApi, ChainSyncApi, SyncState, SyncStatus as EthSyncStatus,
ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65,
ETH_PROTOCOL_VERSION_66, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2,
ETH_PROTOCOL_VERSION_66, MAX_NEW_TRANSACTIONS_TO_PROCESS, PAR_PROTOCOL_VERSION_1,
PAR_PROTOCOL_VERSION_2,
};
use ethcore::{
client::{BlockChainClient, ChainMessageType, ChainNotify, NewBlocks},
@@ -254,7 +255,8 @@ impl EthSync {
connection_filter: Option<Arc<dyn ConnectionFilter>>,
) -> Result<Arc<EthSync>, Error> {
let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel();
let (new_transaction_hashes_tx, new_transaction_hashes_rx) = crossbeam_channel::unbounded();
let (new_transaction_hashes_tx, new_transaction_hashes_rx) =
crossbeam_channel::bounded(MAX_NEW_TRANSACTIONS_TO_PROCESS);
let fork_filter = ForkFilterApi::new(&*params.chain, params.forks);

let sync = ChainSyncApi::new(
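A small sketch, assuming crossbeam_channel, of why the switch from an unbounded to a bounded channel matters here: the unbounded variant grows without limit if the sync side stalls, while the bounded one caps buffered hashes at its capacity and makes `try_send` fail fast. The capacity value below is an illustrative stand-in for MAX_NEW_TRANSACTIONS_TO_PROCESS, not the PR code.

```rust
use crossbeam_channel::{bounded, unbounded, TrySendError};

fn main() {
    // unbounded: send never fails, memory is the only limit
    let (unbounded_tx, unbounded_rx) = unbounded();
    for i in 0..10_000u32 {
        unbounded_tx.send(i).unwrap();
    }
    assert_eq!(unbounded_rx.len(), 10_000);

    // bounded: buffered items are capped and try_send reports Full
    let capacity = 4; // stand-in for MAX_NEW_TRANSACTIONS_TO_PROCESS
    let (bounded_tx, bounded_rx) = bounded(capacity);
    let mut accepted = 0;
    for i in 0..10u32 {
        match bounded_tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break, // back-pressure kicks in here
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    assert_eq!(accepted, capacity);
    assert_eq!(bounded_rx.len(), capacity);
}
```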
31 changes: 14 additions & 17 deletions crates/ethcore/sync/src/chain/mod.rs
@@ -115,6 +115,7 @@ use rlp::{DecoderError, RlpStream};
use snapshot::Snapshot;
use std::{
cmp,
cmp::max,
collections::{BTreeMap, HashMap, HashSet},
sync::mpsc,
time::{Duration, Instant},
@@ -167,6 +168,8 @@ pub const PAR_PROTOCOL_VERSION_1: (u8, u8) = (1, 0x15);
/// 2 version of OpenEthereum protocol (consensus messages added).
pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16);

/// Maximum number of new transaction hashes to buffer and process per propagation round.
pub const MAX_NEW_TRANSACTIONS_TO_PROCESS: usize = 4096;

pub const MAX_BODIES_TO_SEND: usize = 256;
pub const MAX_HEADERS_TO_SEND: usize = 512;
pub const MAX_NODE_DATA_TO_SEND: usize = 1024;
@@ -558,7 +561,7 @@ impl ChainSyncApi {
debug!(target: "sync", "Finished block propagation, took {}ms", as_ms(started));
}
PriorityTask::PropagateTransactions(time, _) => {
let hashes = sync.new_transaction_hashes(None);
let hashes = sync.new_transaction_hashes(Some(MAX_NEW_TRANSACTIONS_TO_PROCESS));
SyncPropagator::propagate_new_transactions(&mut sync, io, hashes, || {
check_deadline(deadline).is_some()
});
@@ -879,28 +882,22 @@ impl ChainSync {
/// Get transaction hashes that were imported but not yet processed,
/// but no more than `max_len` if provided.
pub fn new_transaction_hashes(&self, max_len: Option<usize>) -> Vec<H256> {
let size = std::cmp::min(
self.new_transaction_hashes.len(),
max_len.unwrap_or(usize::MAX),
);
let mut hashes = Vec::with_capacity(size);
for _ in 0..size {
let max_len = max_len.unwrap_or(usize::MAX);
// used only to pre-allocate the result vector
let expected_size = cmp::min(self.new_transaction_hashes.len(), max_len);
let mut hashes = Vec::with_capacity(expected_size);
for _ in 0..max_len {
match self.new_transaction_hashes.try_recv() {
Ok(hash) => hashes.push(hash),
Err(err) => {
// In general that should not be the case as the `size`
// must not be greater than number of messages in the channel.
// However if any error occurs we just log it for further analysis and break the loop.
debug!(target: "sync", "Error while receiving new transaction hashes: {}", err);
Err(crossbeam_channel::TryRecvError::Empty) => break, // we just collected all available hashes
Err(crossbeam_channel::TryRecvError::Disconnected) => {
// The sending side has disconnected for some reason. Log it for further analysis.
debug!(target: "sync", "New transaction hashes channel is empty and disconnected");
break;
}
}
}
trace!(
target: "sync",
"New transaction hashes received for processing. Expected: {}. Actual: {}",
size, hashes.len()
);
trace!(target: "sync", "{} new transaction hashes received for processing", hashes.len());
hashes
}

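A standalone sketch of the draining loop above, with illustrative names rather than the PR code: pull at most `max_len` hashes, stop as soon as the channel is empty, and treat a disconnected sender as a terminal condition.

```rust
use crossbeam_channel::{unbounded, TryRecvError};

fn drain(rx: &crossbeam_channel::Receiver<u64>, max_len: usize) -> Vec<u64> {
    // pre-allocate for the smaller of "what is buffered" and "what we may take"
    let mut hashes = Vec::with_capacity(std::cmp::min(rx.len(), max_len));
    for _ in 0..max_len {
        match rx.try_recv() {
            Ok(hash) => hashes.push(hash),
            Err(TryRecvError::Empty) => break, // collected everything currently available
            Err(TryRecvError::Disconnected) => {
                eprintln!("sender side disconnected"); // nothing more will ever arrive
                break;
            }
        }
    }
    hashes
}

fn main() {
    let (tx, rx) = unbounded();
    for h in 0..10u64 {
        tx.send(h).unwrap();
    }
    assert_eq!(drain(&rx, 4).len(), 4); // capped by max_len
    assert_eq!(drain(&rx, 100).len(), 6); // capped by what is left in the channel
}
```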
52 changes: 20 additions & 32 deletions crates/ethcore/sync/src/chain/propagator.rs
@@ -106,22 +106,17 @@ impl SyncPropagator {
tx_hashes: Vec<H256>,
should_continue: F,
) -> usize {
let transactions = move |io: &dyn SyncIo| {
tx_hashes
.iter()
.filter_map(|hash| io.chain().transaction(hash))
.collect()
};
SyncPropagator::propagate_transactions(sync, io, transactions, true, should_continue)
let get_transactions = |io: &dyn SyncIo| io.chain().transactions(tx_hashes);
SyncPropagator::propagate_transactions(sync, io, get_transactions, true, should_continue)
}

pub fn propagate_ready_transactions<F: FnMut() -> bool>(
sync: &mut ChainSync,
io: &mut dyn SyncIo,
should_continue: F,
) -> usize {
let transactions = |io: &dyn SyncIo| io.chain().transactions_to_propagate();
SyncPropagator::propagate_transactions(sync, io, transactions, false, should_continue)
let get_transactions = |io: &dyn SyncIo| io.chain().transactions_to_propagate();
SyncPropagator::propagate_transactions(sync, io, get_transactions, false, should_continue)
}

fn propagate_transactions_to_peers<F: FnMut() -> bool>(
@@ -327,28 +322,21 @@
}
}

fn select_peers_for_transactions<F>(sync: &ChainSync, filter: F, are_new: bool) -> Vec<PeerId>
fn select_peers_for_transactions<F>(sync: &ChainSync, filter: F) -> Vec<PeerId>
where
F: Fn(&PeerId) -> bool,
{
let fraction_filter: Box<dyn FnMut(&PeerId) -> bool> = if are_new {
// We propagate new transactions to all peers initially.
Box::new(|_| true)
} else {
// Otherwise, we propagate transactions only to the square root of all peers.
let mut random = random::new();
// sqrt(x)/x scaled to max u32
let fraction =
((sync.peers.len() as f64).powf(-0.5) * (u32::max_value() as f64).round()) as u32;
let small = sync.peers.len() < MIN_PEERS_PROPAGATION;
Box::new(move |_| small || random.next_u32() < fraction)
};
// sqrt(x)/x scaled to max u32
let fraction =
((sync.peers.len() as f64).powf(-0.5) * (u32::max_value() as f64).round()) as u32;
let small = sync.peers.len() < MIN_PEERS_PROPAGATION;

let mut random = random::new();
sync.peers
.keys()
.cloned()
.filter(filter)
.filter(fraction_filter)
.filter(|_| small || random.next_u32() < fraction)
.take(MAX_PEERS_PROPAGATION)
.collect()
}
@@ -376,7 +364,7 @@ impl SyncPropagator {
) -> usize
where
F: FnMut() -> bool,
G: Fn(&dyn SyncIo) -> Vec<Arc<VerifiedTransaction>>,
G: FnOnce(&dyn SyncIo) -> Vec<Arc<VerifiedTransaction>>,
{
// Early out if nobody to send to.
if sync.peers.is_empty() {
@@ -400,7 +388,7 @@
// usual transactions could be propagated to all peers
let mut affected_peers = HashSet::new();
if !transactions.is_empty() {
let peers = SyncPropagator::select_peers_for_transactions(sync, |_| true, are_new);
let peers = SyncPropagator::select_peers_for_transactions(sync, |_| true);
affected_peers = SyncPropagator::propagate_transactions_to_peers(
sync,
io,
Expand All @@ -414,11 +402,10 @@ impl SyncPropagator {
// most of the time service_transactions will be empty
// => there's no need to merge packets
if !service_transactions.is_empty() {
let service_transactions_peers = SyncPropagator::select_peers_for_transactions(
sync,
|peer_id| io.peer_version(*peer_id).accepts_service_transaction(),
are_new,
);
let service_transactions_peers =
SyncPropagator::select_peers_for_transactions(sync, |peer_id| {
io.peer_version(*peer_id).accepts_service_transaction()
});
let service_transactions_affected_peers =
SyncPropagator::propagate_transactions_to_peers(
sync,
@@ -608,7 +595,7 @@ mod tests {
}

#[test]
fn propagates_new_transactions_to_all_peers() {
fn propagates_new_transactions_to_subset_of_peers() {
let (new_transaction_hashes_tx, new_transaction_hashes_rx) = crossbeam_channel::unbounded();

let mut client = TestBlockChainClient::new();
@@ -625,7 +612,8 @@
let peer_count =
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, vec![tx_hash], || true);

assert_eq!(25, peer_count);
// The random implementation used in tests currently selects 8 peers.
assert_eq!(8, peer_count);
}

#[test]
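For intuition on the updated test expectation: new transactions now go through the same square-root peer sampling as ready transactions, so each peer passes the filter with probability 1/sqrt(n) and about sqrt(n) peers are selected on average (with 25 peers, 5 on average; the fixed test RNG happens to pick 8). A back-of-the-envelope sketch of that expectation, not project code:

```rust
// each peer is kept with probability 1/sqrt(n), unless the peer count is
// below MIN_PEERS_PROPAGATION, in which case everyone is kept
fn expected_selected(peer_count: usize) -> f64 {
    let n = peer_count as f64;
    n * n.powf(-0.5) // n * (1/sqrt(n)) == sqrt(n)
}

fn main() {
    assert!((expected_selected(25) - 5.0).abs() < 1e-9); // the test's 25 peers -> ~5 on average
    assert!((expected_selected(100) - 10.0).abs() < 1e-9);
}
```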
4 changes: 4 additions & 0 deletions crates/rpc/src/v1/tests/helpers/miner_service.rs
@@ -250,6 +250,10 @@ impl MinerService for TestMinerService {
.map(|tx| Arc::new(VerifiedTransaction::from_pending_block_transaction(tx)))
}

fn transactions(&self, _hashes: Vec<H256>) -> Vec<Arc<VerifiedTransaction>> {
unimplemented!()
}

fn remove_transaction(&self, hash: &H256) -> Option<Arc<VerifiedTransaction>> {
self.pending_transactions
.lock()