Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Avoid broadcasting transactions to peers that send them #3796

Merged
merged 9 commits into from
Dec 12, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use ipc::IpcConfig;
use util::H256;
use util::{H256, H512};

/// Represents what has to be handled by actor listening to chain events
#[ipc]
Expand All @@ -40,6 +40,15 @@ pub trait ChainNotify : Send + Sync {
fn stop(&self) {
// does nothing by default
}

/// fires when new transactions are imported
fn transactions_imported(&self,
_hashes: Vec<H256>,
_peer_id: Option<H512>,
_block_num: u64,
) {
// does nothing by default
}
}

impl IpcConfig for ChainNotify { }
15 changes: 10 additions & 5 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use time::precise_time_ns;
use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock, Hashable};
use util::{journaldb, TrieFactory, Trie};
use util::trie::TrieSpec;
use util::{U256, H256, Address, H2048, Uint, FixedHash};
use util::{U256, H256, H512, Address, H2048, Uint, FixedHash};
use util::kvdb::*;

// other
Expand Down Expand Up @@ -559,11 +559,16 @@ impl Client {
}

/// Import transactions from the IO queue
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: Option<H512>) -> usize {
trace!(target: "external_tx", "Importing queued");
let _timer = PerfTimer::new("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
let txs: Vec<SignedTransaction> = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect();
let block_number = self.chain_info().best_block_number;
self.notify(|notify| {
notify.transactions_imported(hashes.clone(), peer_id.clone(), block_number);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not it happen only after successful import?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the event is misleading, even if the transaction is already in queue we want to note the the peer has propagated the transaction again, so we don't really care if it was imported or not.

Will rename to transactions_received.

});
let results = self.miner.import_external_transactions(self, txs);
results.len()
}
Expand Down Expand Up @@ -1264,14 +1269,14 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(self.chain.read().best_block_hash())).clone()
}

fn queue_transactions(&self, transactions: Vec<Bytes>) {
fn queue_transactions(&self, transactions: Vec<Bytes>, node_id: Option<H512>) {
let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed);
trace!(target: "external_tx", "Queue size: {}", queue_size);
if queue_size > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len());
} else {
let len = transactions.len();
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) {
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, node_id)) {
Ok(_) => {
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}

fn queue_transactions(&self, transactions: Vec<Bytes>) {
fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: Option<H512>) {
// import right here
let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);
Expand Down
8 changes: 4 additions & 4 deletions ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeMap;
use util::{U256, Address, H256, H2048, Bytes, Itertools};
use util::{U256, Address, H256, H512, H2048, Bytes, Itertools};
use util::stats::Histogram;
use blockchain::TreeRoute;
use verification::queue::QueueInfo as BlockQueueInfo;
Expand Down Expand Up @@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send {
fn last_hashes(&self) -> LastHashes;

/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>);
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: Option<H512>);

/// list all transactions
fn pending_transactions(&self) -> Vec<SignedTransaction>;
Expand Down Expand Up @@ -294,9 +294,9 @@ pub trait ProvingBlockChainClient: BlockChainClient {
/// The key is the keccak hash of the account's address.
/// Returns a vector of raw trie nodes (in order from the root) proving the query.
/// Nodes after `from_level` may be omitted.
/// An empty vector indicates unservable query.
/// An empty vector indicates unservable query.
fn prove_account(&self, key1: H256, from_level: u32, id: BlockId) -> Vec<Bytes>;

/// Get code by address hash.
fn code_by_hash(&self, account_key: H256, id: BlockId) -> Bytes;
}
}
6 changes: 4 additions & 2 deletions ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub enum ClientIoMessage {
/// A block is ready
BlockVerified,
/// New transaction RLPs are ready to be imported
NewTransactions(Vec<Bytes>),
NewTransactions(Vec<Bytes>, Option<H512>),
/// Begin snapshot restoration
BeginRestoration(ManifestData),
/// Feed a state chunk to the snapshot service
Expand Down Expand Up @@ -196,7 +196,9 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {

match *net_message {
ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); }
ClientIoMessage::NewTransactions(ref transactions, ref peer_id) => {
self.client.import_queued_transactions(transactions, peer_id.clone());
}
ClientIoMessage::BeginRestoration(ref manifest) => {
if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
warn!("Failed to initialize snapshot restoration: {}", e);
Expand Down
26 changes: 23 additions & 3 deletions js/src/dapps/localtx/Transaction/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class BaseTransaction extends Component {
<IdentityIcon
address={ transaction.from }
/>
0x{ transaction.nonce.toString(16) }
</div>
);
}
Expand Down Expand Up @@ -87,6 +86,17 @@ class BaseTransaction extends Component {
</span>
);
}

renderReceived (stats) {
const noOfPeers = Object.keys(stats.receivedFrom).length;
const noOfPropagations = Object.values(stats.receivedFrom).reduce((sum, val) => sum + val, 0);

return (
<span className={ styles.nowrap }>
{ noOfPropagations } ({ noOfPeers } peers)
</span>
);
}
}

export class Transaction extends BaseTransaction {
Expand All @@ -103,7 +113,8 @@ export class Transaction extends BaseTransaction {
isLocal: false,
stats: {
firstSeen: 0,
propagatedTo: {}
propagatedTo: {},
receivedFrom: {}
}
};

Expand All @@ -129,6 +140,9 @@ export class Transaction extends BaseTransaction {
<th>
# Propagated
</th>
<th>
# Received
</th>
<th />
</tr>
);
Expand Down Expand Up @@ -165,6 +179,9 @@ export class Transaction extends BaseTransaction {
<td>
{ this.renderPropagation(stats) }
</td>
<td>
{ this.renderReceived(stats) }
</td>
</tr>
);
}
Expand Down Expand Up @@ -193,7 +210,8 @@ export class LocalTransaction extends BaseTransaction {

static defaultProps = {
stats: {
propagatedTo: {}
propagatedTo: {},
receivedFrom: {}
}
};

Expand Down Expand Up @@ -317,6 +335,8 @@ export class LocalTransaction extends BaseTransaction {
{ this.renderStatus() }
<br />
{ status === 'pending' ? this.renderPropagation(stats) : null }
<br />
{ status === 'pending' ? this.renderReceived(stats) : null }
</td>
</tr>
);
Expand Down
2 changes: 1 addition & 1 deletion js/src/dapps/localtx/Transaction/transaction.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('dapps/localtx/Transaction', () => {
it('renders without crashing', () => {
const transaction = {
hash: '0x1234567890',
nonce: 15,
nonce: new BigNumber(15),
gasPrice: new BigNumber(10),
gas: new BigNumber(10)
};
Expand Down
10 changes: 8 additions & 2 deletions rpc/src/v1/tests/helpers/sync_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,19 @@ impl SyncProvider for TestSyncProvider {
first_seen: 10,
propagated_to: map![
128.into() => 16
]
],
received_from: map![
1.into() => 10
],
},
5.into() => TransactionStats {
first_seen: 16,
propagated_to: map![
16.into() => 1
]
],
received_from: map![
256.into() => 2
],
}
]
}
Expand Down
16 changes: 13 additions & 3 deletions rpc/src/v1/types/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ pub struct TransactionStats {
/// Peers this transaction was propagated to with count.
#[serde(rename="propagatedTo")]
pub propagated_to: BTreeMap<H512, usize>,
/// Peers that propagated this transaction back.
#[serde(rename="receivedFrom")]
pub received_from: BTreeMap<H512, usize>,
}

impl From<SyncPeerInfo> for PeerInfo {
Expand Down Expand Up @@ -157,7 +160,11 @@ impl From<SyncTransactionStats> for TransactionStats {
propagated_to: s.propagated_to
.into_iter()
.map(|(id, count)| (id.into(), count))
.collect()
.collect(),
received_from: s.received_from
.into_iter()
.map(|(id, count)| (id.into(), count))
.collect(),
}
}
}
Expand Down Expand Up @@ -208,10 +215,13 @@ mod tests {
first_seen: 100,
propagated_to: map![
10.into() => 50
]
],
received_from: map![
1.into() => 1000
],
};

let serialized = serde_json::to_string(&stats).unwrap();
assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#)
assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":1000}}"#)
}
}
20 changes: 14 additions & 6 deletions sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub struct TransactionStats {
pub first_seen: u64,
/// Peers it was propagated to.
pub propagated_to: BTreeMap<H512, usize>,
/// Peers that propagated the transaction back.
pub received_from: BTreeMap<H512, usize>,
}

/// Peer connection information
Expand Down Expand Up @@ -144,7 +146,7 @@ pub struct EthSync {
network: NetworkService,
/// Main (eth/par) protocol handler
sync_handler: Arc<SyncProtocolHandler>,
/// Light (les) protocol handler
/// Light (les) protocol handler
light_proto: Option<Arc<LightProtocol>>,
/// The main subprotocol name
subprotocol_name: [u8; 3],
Expand All @@ -155,7 +157,7 @@ pub struct EthSync {
impl EthSync {
/// Creates and register protocol with the network service
pub fn new(params: Params) -> Result<Arc<EthSync>, NetworkError> {
let pruning_info = params.chain.pruning_info();
let pruning_info = params.chain.pruning_info();
let light_proto = match params.config.serve_light {
false => None,
true => Some({
Expand Down Expand Up @@ -297,7 +299,7 @@ impl ChainNotify for EthSync {
Some(lp) => lp,
None => return,
};

let chain_info = self.sync_handler.chain.chain_info();
light_proto.make_announcement(context, Announcement {
head_hash: chain_info.best_block_hash,
Expand All @@ -323,7 +325,7 @@ impl ChainNotify for EthSync {
// register the warp sync subprotocol
self.network.register_protocol(self.sync_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8])
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));

// register the light protocol.
if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
Expand All @@ -335,6 +337,11 @@ impl ChainNotify for EthSync {
self.sync_handler.snapshot_service.abort_restore();
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
}

fn transactions_imported(&self, hashes: Vec<H256>, peer_id: Option<H512>, block_number: u64) {
let mut sync = self.sync_handler.sync.write();
sync.transactions_imported(hashes, peer_id, block_number);
}
}

/// LES event handler.
Expand All @@ -344,7 +351,8 @@ struct TxRelay(Arc<BlockChainClient>);
impl LightHandler for TxRelay {
fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) {
trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect())
// TODO [ToDr] Can we get a peer enode somehow?
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx.persistent_peer_id(&ctx.peer()) once the method I mentioned has been added.

}
}

Expand Down Expand Up @@ -547,4 +555,4 @@ pub struct ServiceConfiguration {
pub net: NetworkConfiguration,
/// IPC path.
pub io_path: String,
}
}
10 changes: 9 additions & 1 deletion sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,13 @@ impl ChainSync {
self.transactions_stats.stats()
}

/// Updates statistics for imported transactions.
pub fn transactions_imported(&mut self, hashes: Vec<H256>, peer_id: Option<H512>, block_number: u64) {
for hash in hashes {
self.transactions_stats.received(hash, peer_id, block_number);
}
}

/// Abort all sync activity
pub fn abort(&mut self, io: &mut SyncIo) {
self.reset_and_continue(io);
Expand Down Expand Up @@ -1409,7 +1416,8 @@ impl ChainSync {
let tx = rlp.as_raw().to_vec();
transactions.push(tx);
}
io.chain().queue_transactions(transactions);
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
io.chain().queue_transactions(transactions, id);
Ok(())
}

Expand Down
Loading