Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix reimporting transactions from retracted blocks. #4250

Merged
merged 1 commit into from Dec 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions client/transaction-pool/graph/src/base_pool.rs
Expand Up @@ -341,7 +341,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
if removed.iter().any(|tx| tx.hash == hash) {
// We still need to remove all transactions that we promoted
// since they depend on each other and will never get to the best iterator.
self.ready.remove_invalid(&promoted);
self.ready.remove_subtree(&promoted);

debug!(target: "txpool", "[{:?}] Cycle detected, bailing.", hash);
return Err(error::Error::CycleDetected)
Expand Down Expand Up @@ -403,7 +403,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_invalid(&[minimal.transaction.hash.clone()]))
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
} else {
break;
}
Expand All @@ -423,7 +423,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_invalid(&[minimal.transaction.hash.clone()]))
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
} else {
break;
}
Expand All @@ -440,8 +440,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
/// they were part of a chain, you may attempt to re-import them later.
/// NOTE If you want to remove ready transactions that were already used
/// and you don't want them to be stored in the pool use `prune_tags` method.
pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = self.ready.remove_invalid(hashes);
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = self.ready.remove_subtree(hashes);
removed.extend(self.future.remove(hashes));
removed
}
Expand All @@ -454,7 +454,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
/// Prunes transactions that provide given list of tags.
///
/// This will cause all transactions that provide these tags to be removed from the pool,
/// but unlike `remove_invalid`, dependent transactions are not touched.
/// but unlike `remove_subtree`, dependent transactions are not touched.
/// Additional transactions from future queue might be promoted to ready if you satisfy tags
/// that the pool didn't previously know about.
pub fn prune_tags(&mut self, tags: impl IntoIterator<Item=Tag>) -> PruneStatus<Hash, Ex> {
Expand Down Expand Up @@ -905,7 +905,7 @@ mod tests {
assert_eq!(pool.future.len(), 1);

// when
pool.remove_invalid(&[6, 1]);
pool.remove_subtree(&[6, 1]);

// then
assert_eq!(pool.ready().count(), 1);
Expand Down
6 changes: 3 additions & 3 deletions client/transaction-pool/graph/src/pool.rs
Expand Up @@ -283,7 +283,7 @@ impl<B: ChainApi> Pool<B> {
tags: impl IntoIterator<Item=Tag>,
known_imported_hashes: impl IntoIterator<Item=ExHash<B>> + Clone,
) -> impl Future<Output=Result<(), B::Error>> {
log::trace!(target: "txpool", "Pruning at {:?}", at);
log::debug!(target: "txpool", "Pruning at {:?}", at);
// Prune all transactions that provide given tags
let prune_status = match self.validated_pool.prune_tags(tags) {
Ok(prune_status) => prune_status,
Expand Down Expand Up @@ -317,7 +317,7 @@ impl<B: ChainApi> Pool<B> {
}

/// Return an event stream of notifications for when transactions are imported to the pool.
///
///
/// Consumers of this stream should use the `ready` method to actually get the
/// pending transactions in the right order.
pub fn import_notification_stream(&self) -> EventStream {
Expand All @@ -329,7 +329,7 @@ impl<B: ChainApi> Pool<B> {
self.validated_pool.on_broadcasted(propagated)
}

/// Remove from the pool.
/// Remove invalid transactions from the pool.
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
self.validated_pool.remove_invalid(hashes)
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction-pool/graph/src/ready.rs
Expand Up @@ -230,12 +230,12 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
}).collect()
}

/// Removes invalid transactions from the ready pool.
/// Removes a subtree of transactions from the ready pool.
///
/// NOTE removing a transaction will also cause a removal of all transactions that depend on that one
/// (i.e. the entire subgraph that this transaction is a start of will be removed).
/// All removed transactions are returned.
pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut to_remove = hashes.iter().cloned().collect::<Vec<_>>();

Expand Down
18 changes: 14 additions & 4 deletions client/transaction-pool/graph/src/validated_pool.rs
Expand Up @@ -215,9 +215,9 @@ impl<B: ChainApi> ValidatedPool<B> {
let hash = updated_transactions.keys().next().cloned().expect("transactions is not empty; qed");

// note we are not considering tx with hash invalid here - we just want
// to remove it along with dependent transactions and `remove_invalid()`
// to remove it along with dependent transactions and `remove_subtree()`
// does exactly what we need
let removed = pool.remove_invalid(&[hash.clone()]);
let removed = pool.remove_subtree(&[hash.clone()]);
for removed_tx in removed {
let removed_hash = removed_tx.hash.clone();
let updated_transaction = updated_transactions.remove(&removed_hash);
Expand Down Expand Up @@ -451,13 +451,23 @@ impl<B: ChainApi> ValidatedPool<B> {
}
}

/// Remove from the pool.
/// Remove a subtree of transactions from the pool and mark them invalid.
///
/// The transactions passed as an argument will be additionally banned
/// to prevent them from entering the pool right away.
/// Note this is not the case for the dependent transactions - those may
/// still be valid so we want to be able to re-import them.
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
// early exit in case there is no invalid transactions.
if hashes.is_empty() {
return vec![]
}

// temporarily ban invalid transactions
debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes);
self.rotator.ban(&time::Instant::now(), hashes.iter().cloned());

let invalid = self.pool.write().remove_invalid(hashes);
let invalid = self.pool.write().remove_subtree(hashes);

let mut listener = self.listener.write();
for tx in &invalid {
Expand Down
7 changes: 4 additions & 3 deletions client/transaction-pool/src/maintainer.rs
Expand Up @@ -23,7 +23,7 @@ use futures::{
Future, FutureExt,
future::{Either, join, ready},
};
use log::warn;
use log::{warn, debug};
use parking_lot::Mutex;

use client_api::{
Expand Down Expand Up @@ -79,13 +79,14 @@ where
let retracted_transactions = retracted.to_vec().into_iter()
.filter_map(move |hash| client_copy.block_body(&BlockId::hash(hash)).ok().unwrap_or(None))
.flat_map(|block| block.into_iter())
.filter(|tx| tx.is_signed().unwrap_or(false));
// if signed information is not present, attempt to resubmit anyway.
.filter(|tx| tx.is_signed().unwrap_or(true));
let resubmit_future = self.pool
.submit_at(id, retracted_transactions, true)
.then(|resubmit_result| ready(match resubmit_result {
Ok(_) => (),
Err(e) => {
warn!("Error re-submitting transactions: {:?}", e);
debug!(target: "txpool", "Error re-submitting transactions: {:?}", e);
()
}
}));
Expand Down