Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add some logs, fix reorg import. (#4250)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomusdrw authored and gavofyork committed Dec 1, 2019
1 parent 64177da commit 77324be
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 19 deletions.
14 changes: 7 additions & 7 deletions client/transaction-pool/graph/src/base_pool.rs
Expand Up @@ -341,7 +341,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
if removed.iter().any(|tx| tx.hash == hash) {
// We still need to remove all transactions that we promoted
// since they depend on each other and will never get to the best iterator.
self.ready.remove_invalid(&promoted);
self.ready.remove_subtree(&promoted);

debug!(target: "txpool", "[{:?}] Cycle detected, bailing.", hash);
return Err(error::Error::CycleDetected)
Expand Down Expand Up @@ -403,7 +403,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_invalid(&[minimal.transaction.hash.clone()]))
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
} else {
break;
}
Expand All @@ -423,7 +423,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_invalid(&[minimal.transaction.hash.clone()]))
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
} else {
break;
}
Expand All @@ -440,8 +440,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
/// they were part of a chain, you may attempt to re-import them later.
/// NOTE If you want to remove ready transactions that were already used
/// and you don't want them to be stored in the pool use `prune_tags` method.
pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = self.ready.remove_invalid(hashes);
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = self.ready.remove_subtree(hashes);
removed.extend(self.future.remove(hashes));
removed
}
Expand All @@ -454,7 +454,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
/// Prunes transactions that provide given list of tags.
///
/// This will cause all transactions that provide these tags to be removed from the pool,
/// but unlike `remove_invalid`, dependent transactions are not touched.
/// but unlike `remove_subtree`, dependent transactions are not touched.
/// Additional transactions from future queue might be promoted to ready if you satisfy tags
/// that the pool didn't previously know about.
pub fn prune_tags(&mut self, tags: impl IntoIterator<Item=Tag>) -> PruneStatus<Hash, Ex> {
Expand Down Expand Up @@ -905,7 +905,7 @@ mod tests {
assert_eq!(pool.future.len(), 1);

// when
pool.remove_invalid(&[6, 1]);
pool.remove_subtree(&[6, 1]);

// then
assert_eq!(pool.ready().count(), 1);
Expand Down
6 changes: 3 additions & 3 deletions client/transaction-pool/graph/src/pool.rs
Expand Up @@ -283,7 +283,7 @@ impl<B: ChainApi> Pool<B> {
tags: impl IntoIterator<Item=Tag>,
known_imported_hashes: impl IntoIterator<Item=ExHash<B>> + Clone,
) -> impl Future<Output=Result<(), B::Error>> {
log::trace!(target: "txpool", "Pruning at {:?}", at);
log::debug!(target: "txpool", "Pruning at {:?}", at);
// Prune all transactions that provide given tags
let prune_status = match self.validated_pool.prune_tags(tags) {
Ok(prune_status) => prune_status,
Expand Down Expand Up @@ -317,7 +317,7 @@ impl<B: ChainApi> Pool<B> {
}

/// Return an event stream of notifications for when transactions are imported to the pool.
///
///
/// Consumers of this stream should use the `ready` method to actually get the
/// pending transactions in the right order.
pub fn import_notification_stream(&self) -> EventStream {
Expand All @@ -329,7 +329,7 @@ impl<B: ChainApi> Pool<B> {
self.validated_pool.on_broadcasted(propagated)
}

/// Remove from the pool.
/// Remove invalid transactions from the pool.
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
self.validated_pool.remove_invalid(hashes)
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction-pool/graph/src/ready.rs
Expand Up @@ -230,12 +230,12 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
}).collect()
}

/// Removes invalid transactions from the ready pool.
/// Removes a subtree of transactions from the ready pool.
///
/// NOTE removing a transaction will also cause a removal of all transactions that depend on that one
/// (i.e. the entire subgraph that this transaction is a start of will be removed).
/// All removed transactions are returned.
pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut to_remove = hashes.iter().cloned().collect::<Vec<_>>();

Expand Down
18 changes: 14 additions & 4 deletions client/transaction-pool/graph/src/validated_pool.rs
Expand Up @@ -215,9 +215,9 @@ impl<B: ChainApi> ValidatedPool<B> {
let hash = updated_transactions.keys().next().cloned().expect("transactions is not empty; qed");

// note we are not considering tx with hash invalid here - we just want
// to remove it along with dependent transactions and `remove_invalid()`
// to remove it along with dependent transactions and `remove_subtree()`
// does exactly what we need
let removed = pool.remove_invalid(&[hash.clone()]);
let removed = pool.remove_subtree(&[hash.clone()]);
for removed_tx in removed {
let removed_hash = removed_tx.hash.clone();
let updated_transaction = updated_transactions.remove(&removed_hash);
Expand Down Expand Up @@ -451,13 +451,23 @@ impl<B: ChainApi> ValidatedPool<B> {
}
}

/// Remove from the pool.
/// Remove a subtree of transactions from the pool and mark them invalid.
///
/// The transactions passed as an argument will be additionally banned
/// to prevent them from entering the pool right away.
/// Note this is not the case for the dependent transactions - those may
/// still be valid so we want to be able to re-import them.
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
// early exit in case there is no invalid transactions.
if hashes.is_empty() {
return vec![]
}

// temporarily ban invalid transactions
debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes);
self.rotator.ban(&time::Instant::now(), hashes.iter().cloned());

let invalid = self.pool.write().remove_invalid(hashes);
let invalid = self.pool.write().remove_subtree(hashes);

let mut listener = self.listener.write();
for tx in &invalid {
Expand Down
7 changes: 4 additions & 3 deletions client/transaction-pool/src/maintainer.rs
Expand Up @@ -23,7 +23,7 @@ use futures::{
Future, FutureExt,
future::{Either, join, ready},
};
use log::warn;
use log::{warn, debug};
use parking_lot::Mutex;

use client_api::{
Expand Down Expand Up @@ -79,13 +79,14 @@ where
let retracted_transactions = retracted.to_vec().into_iter()
.filter_map(move |hash| client_copy.block_body(&BlockId::hash(hash)).ok().unwrap_or(None))
.flat_map(|block| block.into_iter())
.filter(|tx| tx.is_signed().unwrap_or(false));
// if signed information is not present, attempt to resubmit anyway.
.filter(|tx| tx.is_signed().unwrap_or(true));
let resubmit_future = self.pool
.submit_at(id, retracted_transactions, true)
.then(|resubmit_result| ready(match resubmit_result {
Ok(_) => (),
Err(e) => {
warn!("Error re-submitting transactions: {:?}", e);
debug!(target: "txpool", "Error re-submitting transactions: {:?}", e);
()
}
}));
Expand Down

0 comments on commit 77324be

Please sign in to comment.