From 7e28edf98244208269c5ec3219d3ec8c23728975 Mon Sep 17 00:00:00 2001 From: keroro Date: Fri, 24 Jul 2020 15:49:32 +0800 Subject: [PATCH 1/6] feat(rpc): attach tip_hash and tip_number within TxPoolInfo * This PR adds `tip_hash` and `tip_number` into `TxPoolInfo` which returns by RPC `tx_pool_info`. --- rpc/README.md | 2 ++ rpc/json/rpc.json | 48 ++++++++++++++++++---------------- rpc/src/module/pool.rs | 2 ++ tx-pool/src/pool.rs | 6 +++++ util/jsonrpc-types/src/pool.rs | 5 +++- 5 files changed, 39 insertions(+), 24 deletions(-) diff --git a/rpc/README.md b/rpc/README.md index 00a4ac2c55..d8af6bbe43 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -2104,6 +2104,8 @@ http://localhost:8114 "orphan": "0x0", "pending": "0x1", "proposed": "0x0", + "tip_hash": "0xa5f5c85987a15de25661e5a214f2c1449cd803f071acc7999820f25246471f40", + "tip_number": "0x400", "total_tx_cycles": "0x219", "total_tx_size": "0x112" } diff --git a/rpc/json/rpc.json b/rpc/json/rpc.json index 72a7bfb6bd..cb9fa5671e 100644 --- a/rpc/json/rpc.json +++ b/rpc/json/rpc.json @@ -342,8 +342,8 @@ { "addresses": [ { - "score": "0x64", - "address": "/ip6/::ffff:18.185.102.19/tcp/8115/p2p/QmXwUgF48ULy6hkgfqrEwEfuHW7WyWyWauueRDAYQHNDfN" + "address": "/ip6/::ffff:18.185.102.19/tcp/8115/p2p/QmXwUgF48ULy6hkgfqrEwEfuHW7WyWyWauueRDAYQHNDfN", + "score": "0x64" }, { "address": "/ip4/18.185.102.19/tcp/8115/p2p/QmXwUgF48ULy6hkgfqrEwEfuHW7WyWyWauueRDAYQHNDfN", @@ -355,15 +355,15 @@ "version": "0.31.0 (4231360 2020-04-20)" }, { - "version": "0.29.0 (a6733e6 2020-02-26)", "addresses": [ { "address": "/ip4/174.80.182.60/tcp/52965/p2p/QmVTMd7SEXfxS5p4EEM5ykTe1DwWWVewEM3NwjLY242vr2", "score": "0x1" } ], + "is_outbound": false, "node_id": "QmVTMd7SEXfxS5p4EEM5ykTe1DwWWVewEM3NwjLY242vr2", - "is_outbound": false + "version": "0.29.0 (a6733e6 2020-02-26)" } ], "skip": true @@ -800,6 +800,8 @@ "orphan": "0x0", "pending": "0x1", "proposed": "0x0", + "tip_hash": "0xa5f5c85987a15de25661e5a214f2c1449cd803f071acc7999820f25246471f40", + "tip_number": "0x400", "total_tx_cycles": "0x219", "total_tx_size": "0x112" } @@ -1349,15 +1351,10 @@ "0x4ceaa32f692948413e213ce6f3a83337145bde6e11fd8cb94377ce2637dcc412" ], "result": { + "block_number": "0x400", "capacity": "0xb00fb84df292", - "cells_count": "0x3f5", - "block_number": "0x400" + "cells_count": "0x3f5" }, - "types": [ - { - "lock_hash": "Cell lock script hash" - } - ], "returns": [ { "capacity": "Total capacity" @@ -1368,6 +1365,11 @@ { "block_number": "At which block capacity was calculated" } + ], + "types": [ + { + "lock_hash": "Cell lock script hash" + } ] }, { @@ -1563,17 +1565,17 @@ "new_tip_header" ], "result": "0x2a", - "types": [ - { - "topic": "Subscription topic (enum: new_tip_header | new_tip_block)" - } - ], "returns": [ { "id": "Subscription id" } ], - "skip": true + "skip": true, + "types": [ + { + "topic": "Subscription topic (enum: new_tip_header | new_tip_block)" + } + ] }, { "description": "unsubscribe from a subscribed topic", @@ -1583,16 +1585,16 @@ "0x2a" ], "result": true, - "types": [ - { - "id": "Subscription id" - } - ], "returns": [ { "result": "Unsubscribe result" } ], - "skip": true + "skip": true, + "types": [ + { + "id": "Subscription id" + } + ] } ] diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index d526733c02..2a260416c0 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -128,6 +128,8 @@ impl PoolRpc for PoolRpcImpl { let tx_pool_info = get_tx_pool_info.unwrap(); Ok(TxPoolInfo { + tip_hash: tx_pool_info.tip_hash.unpack(), + tip_number: tx_pool_info.tip_number.into(), pending: (tx_pool_info.pending_size as u64).into(), proposed: (tx_pool_info.proposed_size as u64).into(), orphan: (tx_pool_info.orphan_size as u64).into(), diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 06362938a4..a7f9db12c7 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -10,6 +10,7 @@ use ckb_error::{Error, ErrorKind, InternalErrorKind}; use ckb_logger::{debug, error, trace}; use ckb_snapshot::Snapshot; use ckb_store::ChainStore; +use ckb_types::core::BlockNumber; use ckb_types::{ core::{ cell::{resolve_transaction, OverlayCellProvider, ResolvedTransaction}, @@ -55,6 +56,8 @@ pub struct TxPool { #[derive(Clone, Debug)] pub struct TxPoolInfo { + pub tip_hash: Byte32, + pub tip_number: BlockNumber, pub pending_size: usize, pub proposed_size: usize, pub orphan_size: usize, @@ -96,7 +99,10 @@ impl TxPool { } pub fn info(&self) -> TxPoolInfo { + let tip_header = self.snapshot.tip_header(); TxPoolInfo { + tip_hash: tip_header.hash(), + tip_number: tip_header.number(), pending_size: self.pending.size() + self.gap.size(), proposed_size: self.proposed.size(), orphan_size: self.orphan.size(), diff --git a/util/jsonrpc-types/src/pool.rs b/util/jsonrpc-types/src/pool.rs index 26b8c7df8e..3f12971673 100644 --- a/util/jsonrpc-types/src/pool.rs +++ b/util/jsonrpc-types/src/pool.rs @@ -1,9 +1,12 @@ -use crate::{Timestamp, Uint64}; +use crate::{BlockNumber, Timestamp, Uint64}; +use ckb_types::H256; use serde::{Deserialize, Serialize}; use serde_json; #[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] pub struct TxPoolInfo { + pub tip_hash: H256, + pub tip_number: BlockNumber, pub pending: Uint64, pub proposed: Uint64, pub orphan: Uint64, From 083e995492ff67b74a138f2ad564f33e17335217 Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Thu, 16 Jul 2020 15:59:04 +0800 Subject: [PATCH 2/6] refactor: rename ContextualTransactionVerifier -> TimeRelativeTransactionVerifier --- tx-pool/src/pool.rs | 4 ++-- tx-pool/src/process.rs | 4 ++-- verification/src/contextual_block_verifier.rs | 6 +++--- verification/src/lib.rs | 2 +- verification/src/transaction_verifier.rs | 6 +++--- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 06362938a4..3cacb542f4 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -19,7 +19,7 @@ use ckb_types::{ packed::{Byte32, OutPoint, ProposalShortId}, }; use ckb_verification::cache::CacheEntry; -use ckb_verification::{ContextualTransactionVerifier, TransactionVerifier}; +use ckb_verification::{TimeRelativeTransactionVerifier, TransactionVerifier}; use faketime::unix_time_as_millis; use lru_cache::LruCache; use std::collections::HashMap; @@ -341,7 +341,7 @@ impl TxPool { match cache_entry { Some(cache_entry) => { - ContextualTransactionVerifier::new( + TimeRelativeTransactionVerifier::new( &rtx, snapshot, tip_number + 1, diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 79f3513a85..3b1350e879 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -24,7 +24,7 @@ use ckb_types::{ prelude::*, }; use ckb_util::LinkedHashSet; -use ckb_verification::{cache::CacheEntry, ContextualTransactionVerifier, TransactionVerifier}; +use ckb_verification::{cache::CacheEntry, TimeRelativeTransactionVerifier, TransactionVerifier}; use failure::Error as FailureError; use faketime::unix_time_as_millis; use std::collections::HashSet; @@ -593,7 +593,7 @@ fn verify_rtxs( .map(|tx| { let tx_hash = tx.transaction.hash(); if let Some(cache_entry) = txs_verify_cache.get(&tx_hash) { - ContextualTransactionVerifier::new( + TimeRelativeTransactionVerifier::new( &tx, snapshot, tip_number + 1, diff --git a/verification/src/contextual_block_verifier.rs b/verification/src/contextual_block_verifier.rs index 2ff6e94cda..c0ac17c5d8 100644 --- a/verification/src/contextual_block_verifier.rs +++ b/verification/src/contextual_block_verifier.rs @@ -2,8 +2,8 @@ use crate::cache::{CacheEntry, TxVerifyCache}; use crate::error::{BlockTransactionsError, EpochError}; use crate::uncles_verifier::{UncleProvider, UnclesVerifier}; use crate::{ - BlockErrorKind, CellbaseError, CommitError, ContextualTransactionVerifier, TransactionVerifier, - UnknownParentError, + BlockErrorKind, CellbaseError, CommitError, TimeRelativeTransactionVerifier, + TransactionVerifier, UnknownParentError, }; use ckb_async_runtime::Handle; use ckb_chain_spec::consensus::Consensus; @@ -392,7 +392,7 @@ impl<'a, CS: ChainStore<'a>> BlockTxsVerifier<'a, CS> { .map(|(index, tx)| { let tx_hash = tx.transaction.hash(); if let Some(cache_entry) = fetched_cache.get(&tx_hash) { - ContextualTransactionVerifier::new( + TimeRelativeTransactionVerifier::new( &tx, self.context, self.block_number, diff --git a/verification/src/lib.rs b/verification/src/lib.rs index 6e5884341c..0c74981cfd 100644 --- a/verification/src/lib.rs +++ b/verification/src/lib.rs @@ -24,7 +24,7 @@ pub use crate::error::{ pub use crate::genesis_verifier::GenesisVerifier; pub use crate::header_verifier::{HeaderResolver, HeaderVerifier}; pub use crate::transaction_verifier::{ - ContextualTransactionVerifier, ScriptVerifier, Since, SinceMetric, TransactionVerifier, + ScriptVerifier, Since, SinceMetric, TimeRelativeTransactionVerifier, TransactionVerifier, }; pub const ALLOWED_FUTURE_BLOCKTIME: u64 = 15 * 1000; // 15 Second diff --git a/verification/src/transaction_verifier.rs b/verification/src/transaction_verifier.rs index 524395c4ae..0d7062880d 100644 --- a/verification/src/transaction_verifier.rs +++ b/verification/src/transaction_verifier.rs @@ -20,12 +20,12 @@ use lru_cache::LruCache; use std::cell::RefCell; use std::collections::HashSet; -pub struct ContextualTransactionVerifier<'a, M> { +pub struct TimeRelativeTransactionVerifier<'a, M> { pub maturity: MaturityVerifier<'a>, pub since: SinceVerifier<'a, M>, } -impl<'a, M> ContextualTransactionVerifier<'a, M> +impl<'a, M> TimeRelativeTransactionVerifier<'a, M> where M: BlockMedianTimeContext, { @@ -37,7 +37,7 @@ where parent_hash: Byte32, consensus: &'a Consensus, ) -> Self { - ContextualTransactionVerifier { + TimeRelativeTransactionVerifier { maturity: MaturityVerifier::new( &rtx, epoch_number_with_fraction, From 392511c06d674649f27e770a2a73187306ad244d Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Fri, 17 Jul 2020 04:30:50 +0800 Subject: [PATCH 3/6] refactor: split TransactionVerifier --- error/src/internal.rs | 6 -- rpc/src/error.rs | 45 +++++++------ test/src/specs/tx_pool/limit.rs | 4 +- tx-pool/src/component/container.rs | 6 +- tx-pool/src/component/pending.rs | 4 +- tx-pool/src/component/proposed.rs | 4 +- tx-pool/src/error.rs | 20 +++++- tx-pool/src/pool.rs | 18 ++--- tx-pool/src/process.rs | 32 +++++++-- tx-pool/src/service.rs | 6 ++ util/snapshot/src/lib.rs | 4 ++ verification/src/lib.rs | 3 +- verification/src/transaction_verifier.rs | 86 +++++++++++++++++++----- 13 files changed, 166 insertions(+), 72 deletions(-) diff --git a/error/src/internal.rs b/error/src/internal.rs index bf05383bab..17c10791ef 100644 --- a/error/src/internal.rs +++ b/error/src/internal.rs @@ -13,12 +13,6 @@ pub enum InternalErrorKind { /// e.g. `Capacity::safe_add` CapacityOverflow, - /// The transaction_pool is already full - TransactionPoolFull, - - /// The transaction already exist in transaction_pool - PoolTransactionDuplicated, - /// Persistent data had corrupted DataCorrupted, diff --git a/rpc/src/error.rs b/rpc/src/error.rs index fc407e2094..55f1a8c8eb 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -1,5 +1,5 @@ use ckb_error::{Error as CKBError, InternalError, InternalErrorKind}; -use ckb_tx_pool::error::SubmitTxError; +use ckb_tx_pool::error::Reject; use jsonrpc_core::{Error, ErrorCode, Value}; use std::fmt::{Debug, Display}; @@ -33,6 +33,7 @@ pub enum RPCError { PoolRejectedTransactionByMaxAncestorsCountLimit = -1105, PoolIsFull = -1106, PoolRejectedDuplicatedTransaction = -1107, + PoolRejectedMalformedTransaction = -1108, } impl RPCError { @@ -82,21 +83,22 @@ impl RPCError { err.unwrap_cause_or_self(), ), SubmitTransaction => { - let submit_tx_err = match err.downcast_ref::() { + let reject = match err.downcast_ref::() { Some(err) => err, None => return Self::ckb_internal_error(err), }; - let kind = match *submit_tx_err { - SubmitTxError::LowFeeRate(_, _) => { - RPCError::PoolRejectedTransactionByMinFeeRate - } - SubmitTxError::ExceededMaximumAncestorsCount => { + let kind = match *reject { + Reject::LowFeeRate(_, _) => RPCError::PoolRejectedTransactionByMinFeeRate, + Reject::ExceededMaximumAncestorsCount => { RPCError::PoolRejectedTransactionByMaxAncestorsCountLimit } + Reject::Full(_, _) => RPCError::PoolIsFull, + Reject::Duplicated(_) => RPCError::PoolRejectedDuplicatedTransaction, + Reject::Malformed(_) => RPCError::PoolRejectedMalformedTransaction, }; - RPCError::custom_with_error(kind, submit_tx_err) + RPCError::custom_with_error(kind, reject) } Internal => { let internal_err = match err.downcast_ref::() { @@ -106,10 +108,6 @@ impl RPCError { let kind = match internal_err.kind() { InternalErrorKind::CapacityOverflow => RPCError::IntegerOverflow, - InternalErrorKind::TransactionPoolFull => RPCError::PoolIsFull, - InternalErrorKind::PoolTransactionDuplicated => { - RPCError::PoolRejectedDuplicatedTransaction - } InternalErrorKind::DataCorrupted => RPCError::DatabaseIsCorrupt, InternalErrorKind::Database => RPCError::DatabaseError, InternalErrorKind::Config => RPCError::ConfigError, @@ -162,24 +160,33 @@ mod tests { #[test] fn test_submit_tx_error_from_ckb_error() { - let err: CKBError = SubmitTxError::LowFeeRate(100, 50).into(); + let err: CKBError = Reject::LowFeeRate(100, 50).into(); assert_eq!( "PoolRejectedTransactionByMinFeeRate: Transaction fee rate must >= 100 shannons/KB, got: 50", RPCError::from_ckb_error(err).message ); - let err: CKBError = SubmitTxError::ExceededMaximumAncestorsCount.into(); + let err: CKBError = Reject::ExceededMaximumAncestorsCount.into(); assert_eq!( "PoolRejectedTransactionByMaxAncestorsCountLimit: Transaction exceeded maximum ancestors count limit, try send it later", RPCError::from_ckb_error(err).message ); - } - #[test] - fn test_internal_error_from_ckb_error() { - let err: CKBError = InternalErrorKind::TransactionPoolFull.into(); + let err: CKBError = Reject::Full("size".to_owned(), 10).into(); + assert_eq!( + "PoolIsFull: Transaction pool exceeded maximum size limit(10), try send it later", + RPCError::from_ckb_error(err).message + ); + + let err: CKBError = Reject::Duplicated(Byte32::new([0; 32])).into(); + assert_eq!( + "PoolRejectedDuplicatedTransaction: Transaction(Byte32(0x0000000000000000000000000000000000000000000000000000000000000000)) already exist in transaction_pool", + RPCError::from_ckb_error(err).message + ); + + let err: CKBError = Reject::Malformed("cellbase like".to_owned()).into(); assert_eq!( - "PoolIsFull: TransactionPoolFull", + "PoolRejectedMalformedTransaction: Malformed cellbase like transaction", RPCError::from_ckb_error(err).message ); } diff --git a/test/src/specs/tx_pool/limit.rs b/test/src/specs/tx_pool/limit.rs index df2e2f3649..15264cd2c9 100644 --- a/test/src/specs/tx_pool/limit.rs +++ b/test/src/specs/tx_pool/limit.rs @@ -48,7 +48,7 @@ impl Spec for SizeLimit { info!("The next tx reach size limit"); let tx = node.new_transaction(hash); - assert_send_transaction_fail(node, &tx, "TransactionPoolFull"); + assert_send_transaction_fail(node, &tx, "Transaction pool exceeded maximum size limit"); node.assert_tx_pool_serialized_size(max_tx_num * one_tx_size); (0..DEFAULT_TX_PROPOSAL_WINDOW.0).for_each(|_| { @@ -111,7 +111,7 @@ impl Spec for CyclesLimit { info!("The next tx reach cycles limit"); let tx = node.new_transaction(hash); - assert_send_transaction_fail(node, &tx, "TransactionPoolFull"); + assert_send_transaction_fail(node, &tx, "Transaction pool exceeded maximum cycles limit"); node.assert_tx_pool_cycles(max_tx_num * one_tx_cycles); (0..DEFAULT_TX_PROPOSAL_WINDOW.0).for_each(|_| { diff --git a/tx-pool/src/component/container.rs b/tx-pool/src/component/container.rs index d20d2a6369..9d7d41086c 100644 --- a/tx-pool/src/component/container.rs +++ b/tx-pool/src/component/container.rs @@ -1,7 +1,7 @@ //! The primary module containing the implementations of the transaction pool //! and its top-level members. -use crate::{component::entry::TxEntry, error::SubmitTxError}; +use crate::{component::entry::TxEntry, error::Reject}; use ckb_types::{core::Capacity, packed::ProposalShortId}; use std::cmp::Ordering; use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; @@ -161,7 +161,7 @@ impl SortedTxMap { } } - pub fn add_entry(&mut self, mut entry: TxEntry) -> Result, SubmitTxError> { + pub fn add_entry(&mut self, mut entry: TxEntry) -> Result, Reject> { let short_id = entry.transaction.proposal_short_id(); // find in pool parents @@ -185,7 +185,7 @@ impl SortedTxMap { self.update_ancestors_stat_for_entry(&mut entry, &parents); if entry.ancestors_count > self.max_ancestors_count { - return Err(SubmitTxError::ExceededMaximumAncestorsCount); + return Err(Reject::ExceededMaximumAncestorsCount); } // check duplicate tx diff --git a/tx-pool/src/component/pending.rs b/tx-pool/src/component/pending.rs index 1ec181cea1..8a1f7f2b88 100644 --- a/tx-pool/src/component/pending.rs +++ b/tx-pool/src/component/pending.rs @@ -1,6 +1,6 @@ use crate::component::container::{AncestorsScoreSortKey, SortedTxMap}; use crate::component::entry::TxEntry; -use crate::error::SubmitTxError; +use crate::error::Reject; use ckb_fee_estimator::FeeRate; use ckb_types::{ core::{ @@ -28,7 +28,7 @@ impl PendingQueue { self.inner.size() } - pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result, SubmitTxError> { + pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result, Reject> { self.inner.add_entry(entry) } diff --git a/tx-pool/src/component/proposed.rs b/tx-pool/src/component/proposed.rs index 306e55163b..582cdee582 100644 --- a/tx-pool/src/component/proposed.rs +++ b/tx-pool/src/component/proposed.rs @@ -1,6 +1,6 @@ use crate::component::container::SortedTxMap; use crate::component::entry::TxEntry; -use crate::error::SubmitTxError; +use crate::error::Reject; use ckb_types::{ bytes::Bytes, core::{ @@ -196,7 +196,7 @@ impl ProposedPool { removed } - pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result, SubmitTxError> { + pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result, Reject> { let inputs = entry.transaction.input_pts_iter(); let outputs = entry.transaction.output_pts(); diff --git a/tx-pool/src/error.rs b/tx-pool/src/error.rs index ea681dd167..182e6f7909 100644 --- a/tx-pool/src/error.rs +++ b/tx-pool/src/error.rs @@ -1,21 +1,35 @@ use ckb_error::{Error, ErrorKind}; +use ckb_types::packed::Byte32; use failure::Fail; use tokio::sync::mpsc::error::TrySendError as TokioTrySendError; #[derive(Debug, PartialEq, Clone, Eq, Fail)] -pub enum SubmitTxError { +pub enum Reject { /// The fee rate of transaction is lower than min fee rate #[fail( display = "Transaction fee rate must >= {} shannons/KB, got: {}", _0, _1 )] LowFeeRate(u64, u64), + #[fail(display = "Transaction exceeded maximum ancestors count limit, try send it later")] ExceededMaximumAncestorsCount, + + #[fail( + display = "Transaction pool exceeded maximum {} limit({}), try send it later", + _0, _1 + )] + Full(String, u64), + + #[fail(display = "Transaction({}) already exist in transaction_pool", _0)] + Duplicated(Byte32), + + #[fail(display = "Malformed {} transaction", _0)] + Malformed(String), } -impl From for Error { - fn from(error: SubmitTxError) -> Self { +impl From for Error { + fn from(error: Reject) -> Self { error.context(ErrorKind::SubmitTransaction).into() } } diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 3cacb542f4..4ae877e868 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -3,10 +3,10 @@ use super::component::{DefectEntry, TxEntry}; use crate::component::orphan::OrphanPool; use crate::component::pending::PendingQueue; use crate::component::proposed::ProposedPool; -use crate::error::SubmitTxError; +use crate::error::Reject; use ckb_app_config::TxPoolConfig; use ckb_dao::DaoCalculator; -use ckb_error::{Error, ErrorKind, InternalErrorKind}; +use ckb_error::{Error, ErrorKind}; use ckb_logger::{debug, error, trace}; use ckb_snapshot::Snapshot; use ckb_store::ChainStore; @@ -140,7 +140,7 @@ impl TxPool { } // If did have this value present, false is returned. - pub fn add_pending(&mut self, entry: TxEntry) -> Result { + pub fn add_pending(&mut self, entry: TxEntry) -> Result { if self .gap .contains_key(&entry.transaction.proposal_short_id()) @@ -152,12 +152,12 @@ impl TxPool { } // add_gap inserts proposed but still uncommittable transaction. - pub fn add_gap(&mut self, entry: TxEntry) -> Result { + pub fn add_gap(&mut self, entry: TxEntry) -> Result { trace!("add_gap {}", entry.transaction.hash()); self.gap.add_entry(entry).map(|entry| entry.is_none()) } - pub fn add_proposed(&mut self, entry: TxEntry) -> Result { + pub fn add_proposed(&mut self, entry: TxEntry) -> Result { trace!("add_proposed {}", entry.transaction.hash()); self.touch_last_txs_updated_at(); self.proposed.add_entry(entry).map(|entry| entry.is_none()) @@ -549,9 +549,7 @@ impl TxPool { if tx_pool.add_gap(entry)? { Ok(()) } else { - Err(InternalErrorKind::PoolTransactionDuplicated - .reason(tx_hash) - .into()) + Err(Reject::Duplicated(tx_hash).into()) } }, ) @@ -609,9 +607,7 @@ impl TxPool { if tx_pool.add_pending(entry)? { Ok(()) } else { - Err(InternalErrorKind::PoolTransactionDuplicated - .reason(tx_hash) - .into()) + Err(Reject::Duplicated(tx_hash).into()) } }, ) diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 3b1350e879..b9a26079cf 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -1,7 +1,7 @@ use crate::block_assembler::{BlockAssembler, BlockTemplateCacheKey, TemplateCache}; use crate::component::commit_txs_scanner::CommitTxsScanner; use crate::component::entry::TxEntry; -use crate::error::{BlockAssemblerError, SubmitTxError}; +use crate::error::{BlockAssemblerError, Reject}; use crate::pool::TxPool; use crate::service::TxPoolService; use ckb_app_config::BlockAssemblerConfig; @@ -24,7 +24,10 @@ use ckb_types::{ prelude::*, }; use ckb_util::LinkedHashSet; -use ckb_verification::{cache::CacheEntry, TimeRelativeTransactionVerifier, TransactionVerifier}; +use ckb_verification::{ + cache::CacheEntry, ContextualTransactionVerifier, NonContextualTransactionVerifier, + TimeRelativeTransactionVerifier, +}; use failure::Error as FailureError; use faketime::unix_time_as_millis; use std::collections::HashSet; @@ -395,13 +398,13 @@ impl TxPoolService { for ((rtx, cache_entry), (tx_size, fee, status)) in txs.into_iter().zip(status.into_iter()) { if tx_pool.reach_cycles_limit(cache_entry.cycles) { - return Err(InternalErrorKind::TransactionPoolFull.into()); + return Err(Reject::Full("cycles".to_owned(), tx_pool.config.max_cycles).into()); } let min_fee = tx_pool.config.min_fee_rate.fee(tx_size); // reject txs which fee lower than min fee rate if fee < min_fee { - return Err(SubmitTxError::LowFeeRate(min_fee.as_u64(), fee.as_u64()).into()); + return Err(Reject::LowFeeRate(min_fee.as_u64(), fee.as_u64()).into()); } let related_dep_out_points = rtx.related_dep_out_points(); @@ -424,10 +427,25 @@ impl TxPoolService { Ok(()) } + fn non_contextual_verify(&self, txs: &[TransactionView]) -> Result<(), Error> { + for tx in txs { + NonContextualTransactionVerifier::new(tx, &self.consensus).verify()?; + + // cellbase is only valid in a block, not as a loose transaction + if tx.is_cellbase() { + return Err(Reject::Malformed("cellbase like".to_owned()).into()); + } + } + Ok(()) + } + pub(crate) async fn process_txs( &self, txs: Vec, ) -> Result, Error> { + // non contextual verify first + self.non_contextual_verify(&txs)?; + let max_tx_verify_cycles = self.tx_pool_config.max_tx_verify_cycles; let (tip_hash, snapshot, rtxs, status) = self.pre_resolve_txs(&txs).await?; let fetched_cache = self.fetch_txs_verify_cache(txs.iter()).await; @@ -517,7 +535,7 @@ fn check_transaction_hash_collision( for tx in txs { let short_id = tx.proposal_short_id(); if tx_pool.contains_proposal_id(&short_id) { - return Err(InternalErrorKind::PoolTransactionDuplicated.into()); + return Err(Reject::Duplicated(tx.hash()).into()); } } Ok(()) @@ -531,7 +549,7 @@ fn resolve_tx<'a>( ) -> ResolveResult { let tx_size = tx.data().serialized_size_in_block(); if tx_pool.reach_size_limit(tx_size) { - return Err(InternalErrorKind::TransactionPoolFull.into()); + return Err(Reject::Full("size".to_owned(), tx_pool.config.max_mem_size as u64).into()); } let short_id = tx.proposal_short_id(); @@ -604,7 +622,7 @@ fn verify_rtxs( .verify() .map(|_| (tx, *cache_entry)) } else { - TransactionVerifier::new( + ContextualTransactionVerifier::new( &tx, snapshot, tip_number + 1, diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index ad1b52c23c..d00527c4d6 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -5,6 +5,7 @@ use crate::pool::{TxPool, TxPoolInfo}; use crate::process::PlugTarget; use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig}; use ckb_async_runtime::{new_runtime, Handle}; +use ckb_chain_spec::consensus::Consensus; use ckb_error::Error; use ckb_jsonrpc_types::BlockTemplate; use ckb_logger::error; @@ -279,12 +280,14 @@ impl TxPoolServiceBuilder { snapshot_mgr: Arc, ) -> TxPoolServiceBuilder { let last_txs_updated_at = Arc::new(AtomicU64::new(0)); + let consensus = snapshot.cloned_consensus(); let tx_pool = TxPool::new(tx_pool_config, snapshot, Arc::clone(&last_txs_updated_at)); let block_assembler = block_assembler_config.map(BlockAssembler::new); TxPoolServiceBuilder { service: Some(TxPoolService::new( tx_pool, + consensus, block_assembler, txs_verify_cache, last_txs_updated_at, @@ -323,6 +326,7 @@ impl TxPoolServiceBuilder { #[derive(Clone)] pub struct TxPoolService { pub(crate) tx_pool: Arc>, + pub(crate) consensus: Arc, pub(crate) tx_pool_config: Arc, pub(crate) block_assembler: Option, pub(crate) txs_verify_cache: Arc>, @@ -333,6 +337,7 @@ pub struct TxPoolService { impl TxPoolService { pub fn new( tx_pool: TxPool, + consensus: Arc, block_assembler: Option, txs_verify_cache: Arc>, last_txs_updated_at: Arc, @@ -341,6 +346,7 @@ impl TxPoolService { let tx_pool_config = Arc::new(tx_pool.config); Self { tx_pool: Arc::new(RwLock::new(tx_pool)), + consensus, tx_pool_config, block_assembler, txs_verify_cache, diff --git a/util/snapshot/src/lib.rs b/util/snapshot/src/lib.rs index 85962f2967..b3e27cdf37 100644 --- a/util/snapshot/src/lib.rs +++ b/util/snapshot/src/lib.rs @@ -110,6 +110,10 @@ impl Snapshot { &self.consensus } + pub fn cloned_consensus(&self) -> Arc { + Arc::clone(&self.consensus) + } + pub fn proposals(&self) -> &ProposalView { &self.proposals } diff --git a/verification/src/lib.rs b/verification/src/lib.rs index 0c74981cfd..6575731acb 100644 --- a/verification/src/lib.rs +++ b/verification/src/lib.rs @@ -24,7 +24,8 @@ pub use crate::error::{ pub use crate::genesis_verifier::GenesisVerifier; pub use crate::header_verifier::{HeaderResolver, HeaderVerifier}; pub use crate::transaction_verifier::{ - ScriptVerifier, Since, SinceMetric, TimeRelativeTransactionVerifier, TransactionVerifier, + ContextualTransactionVerifier, NonContextualTransactionVerifier, ScriptVerifier, Since, + SinceMetric, TimeRelativeTransactionVerifier, TransactionVerifier, }; pub const ALLOWED_FUTURE_BLOCKTIME: u64 = 15 * 1000; // 15 Second diff --git a/verification/src/transaction_verifier.rs b/verification/src/transaction_verifier.rs index 0d7062880d..d2d58682b4 100644 --- a/verification/src/transaction_verifier.rs +++ b/verification/src/transaction_verifier.rs @@ -60,20 +60,44 @@ where } } -pub struct TransactionVerifier<'a, M, CS> { +pub struct NonContextualTransactionVerifier<'a> { pub version: VersionVerifier<'a>, pub size: SizeVerifier<'a>, pub empty: EmptyVerifier<'a>, - pub maturity: MaturityVerifier<'a>, - pub capacity: CapacityVerifier<'a>, pub duplicate_deps: DuplicateDepsVerifier<'a>, pub outputs_data_verifier: OutputsDataVerifier<'a>, - pub script: ScriptVerifier<'a, CS>, +} + +impl<'a> NonContextualTransactionVerifier<'a> { + pub fn new(tx: &'a TransactionView, consensus: &'a Consensus) -> Self { + NonContextualTransactionVerifier { + version: VersionVerifier::new(tx, consensus.tx_version()), + size: SizeVerifier::new(tx, consensus.max_block_bytes()), + empty: EmptyVerifier::new(tx), + duplicate_deps: DuplicateDepsVerifier::new(tx), + outputs_data_verifier: OutputsDataVerifier::new(tx), + } + } + + pub fn verify(&self) -> Result<(), Error> { + self.version.verify()?; + self.size.verify()?; + self.empty.verify()?; + self.duplicate_deps.verify()?; + self.outputs_data_verifier.verify()?; + Ok(()) + } +} + +pub struct ContextualTransactionVerifier<'a, M, CS> { + pub maturity: MaturityVerifier<'a>, pub since: SinceVerifier<'a, M>, + pub capacity: CapacityVerifier<'a>, + pub script: ScriptVerifier<'a, CS>, pub fee_calculator: FeeCalculator<'a, CS>, } -impl<'a, M, CS> TransactionVerifier<'a, M, CS> +impl<'a, M, CS> ContextualTransactionVerifier<'a, M, CS> where M: BlockMedianTimeContext, CS: ChainStore<'a>, @@ -88,17 +112,12 @@ where consensus: &'a Consensus, chain_store: &'a CS, ) -> Self { - TransactionVerifier { - version: VersionVerifier::new(&rtx.transaction, consensus.tx_version()), - size: SizeVerifier::new(&rtx.transaction, consensus.max_block_bytes()), - empty: EmptyVerifier::new(&rtx.transaction), + ContextualTransactionVerifier { maturity: MaturityVerifier::new( &rtx, epoch_number_with_fraction, consensus.cellbase_maturity(), ), - duplicate_deps: DuplicateDepsVerifier::new(&rtx.transaction), - outputs_data_verifier: OutputsDataVerifier::new(&rtx.transaction), script: ScriptVerifier::new(rtx, chain_store), capacity: CapacityVerifier::new(rtx, consensus.dao_type_hash()), since: SinceVerifier::new( @@ -113,13 +132,8 @@ where } pub fn verify(&self, max_cycles: Cycle) -> Result { - self.version.verify()?; - self.size.verify()?; - self.empty.verify()?; self.maturity.verify()?; self.capacity.verify()?; - self.duplicate_deps.verify()?; - self.outputs_data_verifier.verify()?; self.since.verify()?; let cycles = self.script.verify(max_cycles)?; let fee = self.fee_calculator.transaction_fee()?; @@ -127,6 +141,46 @@ where } } +pub struct TransactionVerifier<'a, M, CS> { + pub non_contextual: NonContextualTransactionVerifier<'a>, + pub contextual: ContextualTransactionVerifier<'a, M, CS>, +} + +impl<'a, M, CS> TransactionVerifier<'a, M, CS> +where + M: BlockMedianTimeContext, + CS: ChainStore<'a>, +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + rtx: &'a ResolvedTransaction, + median_time_context: &'a M, + block_number: BlockNumber, + epoch_number_with_fraction: EpochNumberWithFraction, + parent_hash: Byte32, + consensus: &'a Consensus, + chain_store: &'a CS, + ) -> Self { + TransactionVerifier { + non_contextual: NonContextualTransactionVerifier::new(&rtx.transaction, consensus), + contextual: ContextualTransactionVerifier::new( + rtx, + median_time_context, + block_number, + epoch_number_with_fraction, + parent_hash, + consensus, + chain_store, + ), + } + } + + pub fn verify(&self, max_cycles: Cycle) -> Result { + self.non_contextual.verify()?; + self.contextual.verify(max_cycles) + } +} + pub struct FeeCalculator<'a, CS> { transaction: &'a ResolvedTransaction, consensus: &'a Consensus, From e9d186a0c8a6926dc2ee88e6a6a5ebd1d4c4c72a Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Fri, 17 Jul 2020 05:41:09 +0800 Subject: [PATCH 4/6] feat: re-broadcast when duplicated transaction submit --- rpc/src/error.rs | 57 ++++++++++++++++++++++++------------------ rpc/src/module/pool.rs | 34 +++++++++++++++++-------- 2 files changed, 56 insertions(+), 35 deletions(-) diff --git a/rpc/src/error.rs b/rpc/src/error.rs index 55f1a8c8eb..821c19197b 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -73,6 +73,27 @@ impl RPCError { } } + pub fn from_submit_transaction_reject(reject: &Reject) -> Error { + let code = match reject { + Reject::LowFeeRate(_, _) => RPCError::PoolRejectedTransactionByMinFeeRate, + Reject::ExceededMaximumAncestorsCount => { + RPCError::PoolRejectedTransactionByMaxAncestorsCountLimit + } + Reject::Full(_, _) => RPCError::PoolIsFull, + Reject::Duplicated(_) => RPCError::PoolRejectedDuplicatedTransaction, + Reject::Malformed(_) => RPCError::PoolRejectedMalformedTransaction, + }; + RPCError::custom_with_error(code, reject) + } + + pub fn downcast_submit_transaction_reject(err: &CKBError) -> Option<&Reject> { + use ckb_error::ErrorKind::SubmitTransaction; + match err.kind() { + SubmitTransaction => err.downcast_ref::(), + _ => None, + } + } + pub fn from_ckb_error(err: CKBError) -> Error { use ckb_error::ErrorKind::*; match err.kind() { @@ -82,24 +103,6 @@ impl RPCError { RPCError::TransactionFailedToVerify, err.unwrap_cause_or_self(), ), - SubmitTransaction => { - let reject = match err.downcast_ref::() { - Some(err) => err, - None => return Self::ckb_internal_error(err), - }; - - let kind = match *reject { - Reject::LowFeeRate(_, _) => RPCError::PoolRejectedTransactionByMinFeeRate, - Reject::ExceededMaximumAncestorsCount => { - RPCError::PoolRejectedTransactionByMaxAncestorsCountLimit - } - Reject::Full(_, _) => RPCError::PoolIsFull, - Reject::Duplicated(_) => RPCError::PoolRejectedDuplicatedTransaction, - Reject::Malformed(_) => RPCError::PoolRejectedMalformedTransaction, - }; - - RPCError::custom_with_error(kind, reject) - } Internal => { let internal_err = match err.downcast_ref::() { Some(err) => err, @@ -159,35 +162,41 @@ mod tests { } #[test] - fn test_submit_tx_error_from_ckb_error() { + fn test_submit_transaction_error() { let err: CKBError = Reject::LowFeeRate(100, 50).into(); assert_eq!( "PoolRejectedTransactionByMinFeeRate: Transaction fee rate must >= 100 shannons/KB, got: 50", - RPCError::from_ckb_error(err).message + RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message ); let err: CKBError = Reject::ExceededMaximumAncestorsCount.into(); assert_eq!( "PoolRejectedTransactionByMaxAncestorsCountLimit: Transaction exceeded maximum ancestors count limit, try send it later", - RPCError::from_ckb_error(err).message + RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message ); let err: CKBError = Reject::Full("size".to_owned(), 10).into(); assert_eq!( "PoolIsFull: Transaction pool exceeded maximum size limit(10), try send it later", - RPCError::from_ckb_error(err).message + RPCError::from_submit_transaction_reject( + RPCError::downcast_submit_transaction_reject(&err).unwrap() + ) + .message ); let err: CKBError = Reject::Duplicated(Byte32::new([0; 32])).into(); assert_eq!( "PoolRejectedDuplicatedTransaction: Transaction(Byte32(0x0000000000000000000000000000000000000000000000000000000000000000)) already exist in transaction_pool", - RPCError::from_ckb_error(err).message + RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message ); let err: CKBError = Reject::Malformed("cellbase like".to_owned()).into(); assert_eq!( "PoolRejectedMalformedTransaction: Malformed cellbase like transaction", - RPCError::from_ckb_error(err).message + RPCError::from_submit_transaction_reject( + RPCError::downcast_submit_transaction_reject(&err).unwrap() + ) + .message ); } diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index d526733c02..3546458877 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -7,6 +7,7 @@ use ckb_network::PeerIndex; use ckb_script::IllTransactionChecker; use ckb_shared::shared::Shared; use ckb_sync::SyncShared; +use ckb_tx_pool::error::Reject; use ckb_types::{core, packed, prelude::*, H256}; use ckb_verification::{Since, SinceMetric}; use jsonrpc_core::Result; @@ -100,20 +101,31 @@ impl PoolRpc for PoolRpcImpl { return Err(RPCError::ckb_internal_error(e)); } + let broadcast = |tx_hash: packed::Byte32| { + // workaround: we are using `PeerIndex(usize::max)` to indicate that tx hash source is itself. + let peer_index = PeerIndex::new(usize::max_value()); + self.sync_shared + .state() + .tx_hashes() + .entry(peer_index) + .or_default() + .insert(tx_hash); + }; + let tx_hash = tx.hash(); match submit_txs.unwrap() { Ok(_) => { - // workaround: we are using `PeerIndex(usize::max)` to indicate that tx hash source is itself. - let peer_index = PeerIndex::new(usize::max_value()); - let hash = tx.hash(); - self.sync_shared - .state() - .tx_hashes() - .entry(peer_index) - .or_default() - .insert(hash.clone()); - Ok(hash.unpack()) + broadcast(tx_hash.clone()); + Ok(tx_hash.unpack()) } - Err(e) => Err(RPCError::from_ckb_error(e)), + Err(e) => match RPCError::downcast_submit_transaction_reject(&e) { + Some(reject) => { + if let Reject::Duplicated(_) = reject { + broadcast(tx_hash); + } + Err(RPCError::from_submit_transaction_reject(reject)) + } + None => Err(RPCError::ckb_internal_error(e)), + }, } } From 32cc6100030c130fb877993c6813193c7a29a1d2 Mon Sep 17 00:00:00 2001 From: keroro Date: Fri, 24 Jul 2020 16:18:56 +0800 Subject: [PATCH 5/6] test: replace get_tip_tx_pool_info --- test/src/node.rs | 32 ++++++++++++++++++++--- test/src/specs/indexer/basic.rs | 6 ++--- test/src/specs/rpc/truncate.rs | 4 +-- test/src/specs/tx_pool/limit.rs | 4 +-- test/src/specs/tx_pool/pool_resurrect.rs | 2 +- test/src/specs/tx_pool/txs_relay_order.rs | 4 +-- 6 files changed, 38 insertions(+), 14 deletions(-) diff --git a/test/src/node.rs b/test/src/node.rs index 93773b0820..14b42ca189 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -4,6 +4,7 @@ use crate::SYSTEM_CELL_ALWAYS_SUCCESS_INDEX; use ckb_app_config::{BlockAssemblerConfig, CKBAppConfig}; use ckb_chain_spec::consensus::Consensus; use ckb_chain_spec::ChainSpec; +use ckb_jsonrpc_types::TxPoolInfo; use ckb_types::{ core::{ self, capacity_bytes, BlockBuilder, BlockNumber, BlockView, Capacity, HeaderView, @@ -13,10 +14,12 @@ use ckb_types::{ prelude::*, }; use failure::Error; +use failure::_core::time::Duration; use std::convert::Into; use std::fs; use std::path::Path; use std::process::{self, Child, Command, Stdio}; +use std::time::Instant; pub struct Node { binary: String, @@ -361,6 +364,27 @@ impl Node { .into() } + /// The states of chain and txpool are updated asynchronously. Which means that the chain has + /// updated to the newest tip but txpool not. + /// get_tip_tx_pool_info wait to ensure the txpool update to the newest tip as well. + pub fn get_tip_tx_pool_info(&self) -> TxPoolInfo { + let tip_header = self.rpc_client().get_tip_header(); + let tip_hash = &tip_header.hash; + let instant = Instant::now(); + let mut recent = TxPoolInfo::default(); + while instant.elapsed() < Duration::from_secs(10) { + let tx_pool_info = self.rpc_client().tx_pool_info(); + if &tx_pool_info.tip_hash == tip_hash { + return tx_pool_info; + } + recent = tx_pool_info; + } + panic!( + "timeout to get_tip_tx_pool_info, tip_header={:?}, tx_pool_info: {:?}", + tip_header, recent + ); + } + pub fn new_block( &self, bytes_limit: Option, @@ -552,24 +576,24 @@ impl Node { } pub fn assert_tx_pool_size(&self, pending_size: u64, proposed_size: u64) { - let tx_pool_info = self.rpc_client().tx_pool_info(); + let tx_pool_info = self.get_tip_tx_pool_info(); assert_eq!(tx_pool_info.pending.value(), pending_size); assert_eq!(tx_pool_info.proposed.value(), proposed_size); } pub fn assert_tx_pool_statics(&self, total_tx_size: u64, total_tx_cycles: u64) { - let tx_pool_info = self.rpc_client().tx_pool_info(); + let tx_pool_info = self.get_tip_tx_pool_info(); assert_eq!(tx_pool_info.total_tx_size.value(), total_tx_size); assert_eq!(tx_pool_info.total_tx_cycles.value(), total_tx_cycles); } pub fn assert_tx_pool_cycles(&self, total_tx_cycles: u64) { - let tx_pool_info = self.rpc_client().tx_pool_info(); + let tx_pool_info = self.get_tip_tx_pool_info(); assert_eq!(tx_pool_info.total_tx_cycles.value(), total_tx_cycles); } pub fn assert_tx_pool_serialized_size(&self, total_tx_size: u64) { - let tx_pool_info = self.rpc_client().tx_pool_info(); + let tx_pool_info = self.get_tip_tx_pool_info(); assert_eq!(tx_pool_info.total_tx_size.value(), total_tx_size); } } diff --git a/test/src/specs/indexer/basic.rs b/test/src/specs/indexer/basic.rs index 3cd5031662..bc4383f776 100644 --- a/test/src/specs/indexer/basic.rs +++ b/test/src/specs/indexer/basic.rs @@ -57,16 +57,16 @@ impl Spec for IndexerBasic { }); info!("Generate 3 more blocks on node0 to commit 6 txs"); - let tx_pool_info = node0.rpc_client().tx_pool_info(); + let tx_pool_info = node0.get_tip_tx_pool_info(); assert_eq!(6, tx_pool_info.pending.value() as u64); node0.generate_blocks(1); - let tx_pool_info = node0.rpc_client().tx_pool_info(); + let tx_pool_info = node0.get_tip_tx_pool_info(); // in gap assert_eq!(6, tx_pool_info.pending.value() as u64); node0.generate_blocks(1); - let tx_pool_info = node0.rpc_client().tx_pool_info(); + let tx_pool_info = node0.get_tip_tx_pool_info(); assert_eq!(6, tx_pool_info.proposed.value() as u64); node0.generate_blocks(1); diff --git a/test/src/specs/rpc/truncate.rs b/test/src/specs/rpc/truncate.rs index 78ef6a83d9..1304ea30b5 100644 --- a/test/src/specs/rpc/truncate.rs +++ b/test/src/specs/rpc/truncate.rs @@ -29,7 +29,7 @@ impl Spec for RpcTruncate { .get_live_cell(tx1.inputs().get(0).unwrap().previous_output().into(), false); assert_eq!(cell1.status, "unknown", "cell1 was spent within tx1"); - let tx_pool_info = node.rpc_client().tx_pool_info(); + let tx_pool_info = node.get_tip_tx_pool_info(); assert!(tx_pool_info.total_tx_size.value() > 0, "tx-pool holds tx2"); // Truncate from `to_truncate` @@ -56,7 +56,7 @@ impl Spec for RpcTruncate { .get_live_cell(tx1.inputs().get(0).unwrap().previous_output().into(), false); assert_eq!(cell1.status, "live", "cell1 is alive after roll-backing"); - let tx_pool_info = node.rpc_client().tx_pool_info(); + let tx_pool_info = node.get_tip_tx_pool_info(); assert_eq!(tx_pool_info.orphan.value(), 0, "tx-pool was cleared"); assert_eq!(tx_pool_info.pending.value(), 0, "tx-pool was cleared"); assert_eq!(tx_pool_info.proposed.value(), 0, "tx-pool was cleared"); diff --git a/test/src/specs/tx_pool/limit.rs b/test/src/specs/tx_pool/limit.rs index df2e2f3649..ec7f320583 100644 --- a/test/src/specs/tx_pool/limit.rs +++ b/test/src/specs/tx_pool/limit.rs @@ -24,7 +24,7 @@ impl Spec for SizeLimit { let mut hash = node.submit_transaction(&tx); txs_hash.push(hash.clone()); - let tx_pool_info = node.rpc_client().tx_pool_info(); + let tx_pool_info = node.get_tip_tx_pool_info(); let one_tx_size = tx_pool_info.total_tx_size.value(); let one_tx_cycles = tx_pool_info.total_tx_cycles.value(); @@ -87,7 +87,7 @@ impl Spec for CyclesLimit { let mut hash = node.submit_transaction(&tx); txs_hash.push(hash.clone()); - let tx_pool_info = node.rpc_client().tx_pool_info(); + let tx_pool_info = node.get_tip_tx_pool_info(); let one_tx_cycles = tx_pool_info.total_tx_cycles.value(); let one_tx_size = tx.data().serialized_size_in_block(); diff --git a/test/src/specs/tx_pool/pool_resurrect.rs b/test/src/specs/tx_pool/pool_resurrect.rs index 66b0d7c703..d2142ba12d 100644 --- a/test/src/specs/tx_pool/pool_resurrect.rs +++ b/test/src/specs/tx_pool/pool_resurrect.rs @@ -30,7 +30,7 @@ impl Spec for PoolResurrect { node0.generate_blocks(3); info!("Pool should be empty"); - let tx_pool_info = node0.rpc_client().tx_pool_info(); + let tx_pool_info = node0.get_tip_tx_pool_info(); assert_eq!(tx_pool_info.pending.value(), 0); info!("Generate 5 blocks on node1"); diff --git a/test/src/specs/tx_pool/txs_relay_order.rs b/test/src/specs/tx_pool/txs_relay_order.rs index 69fec8d6ea..aa0c8b3e0a 100644 --- a/test/src/specs/tx_pool/txs_relay_order.rs +++ b/test/src/specs/tx_pool/txs_relay_order.rs @@ -40,13 +40,13 @@ impl Spec for TxsRelayOrder { for tx in txs.iter() { node0.rpc_client().send_transaction(tx.data().into()); } - let tx_pool_info = node0.rpc_client().tx_pool_info(); + let tx_pool_info = node0.get_tip_tx_pool_info(); assert_eq!(COUNT as u64, tx_pool_info.pending.value()); assert_eq!(0, tx_pool_info.orphan.value()); // node1 should receive all txs sleep(10); - let tx_pool_info = node1.rpc_client().tx_pool_info(); + let tx_pool_info = node1.get_tip_tx_pool_info(); assert_eq!( COUNT as u64, tx_pool_info.pending.value() + tx_pool_info.orphan.value() From 08a823d94fe8bcc9eb5ed1ebcdb7519c78c7f74d Mon Sep 17 00:00:00 2001 From: keroro Date: Sat, 25 Jul 2020 22:23:34 +0800 Subject: [PATCH 6/6] fix(rpc): the updated txpool is inconsistent with snapshot --- rpc/src/module/pool.rs | 3 ++- rpc/src/module/test.rs | 3 ++- tx-pool/src/process.rs | 5 ++--- tx-pool/src/service.rs | 13 ++++++++----- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index 2a260416c0..8e767cba43 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -141,9 +141,10 @@ impl PoolRpc for PoolRpcImpl { } fn clear_tx_pool(&self) -> Result<()> { + let snapshot = Arc::clone(&self.shared.snapshot()); let tx_pool = self.shared.tx_pool_controller(); tx_pool - .clear_pool() + .clear_pool(snapshot) .map_err(|err| RPCError::custom(RPCError::Invalid, err.to_string()))?; Ok(()) diff --git a/rpc/src/module/test.rs b/rpc/src/module/test.rs index 3c949f513b..40acf92e92 100644 --- a/rpc/src/module/test.rs +++ b/rpc/src/module/test.rs @@ -103,9 +103,10 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { .map_err(|err| RPCError::custom(RPCError::Invalid, err.to_string()))?; // Clear the tx_pool + let new_snapshot = Arc::clone(&self.shared.snapshot()); let tx_pool = self.shared.tx_pool_controller(); tx_pool - .clear_pool() + .clear_pool(new_snapshot) .map_err(|err| RPCError::custom(RPCError::Invalid, err.to_string()))?; Ok(()) diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 79f3513a85..ade2e65dd5 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -492,12 +492,11 @@ impl TxPoolService { }); } - pub(crate) async fn clear_pool(&self) { + pub(crate) async fn clear_pool(&self, new_snapshot: Arc) { let mut tx_pool = self.tx_pool.write().await; let config = tx_pool.config; - let snapshot = Arc::clone(&tx_pool.snapshot); let last_txs_updated_at = Arc::new(AtomicU64::new(0)); - *tx_pool = TxPool::new(config, snapshot, last_txs_updated_at); + *tx_pool = TxPool::new(config, new_snapshot, last_txs_updated_at); } } diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index ad1b52c23c..169712d905 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -77,7 +77,7 @@ pub enum Message { FetchTxRPC(Request>), NewUncle(Notify), PlugEntry(Request<(Vec, PlugTarget), ()>), - ClearPool(Request<(), ()>), + ClearPool(Request, ()>), } #[derive(Clone)] @@ -254,10 +254,10 @@ impl TxPoolController { response.recv().map_err(Into::into) } - pub fn clear_pool(&self) -> Result<(), FailureError> { + pub fn clear_pool(&self, new_snapshot: Arc) -> Result<(), FailureError> { let mut sender = self.sender.clone(); let (responder, response) = crossbeam_channel::bounded(1); - let request = Request::call((), responder); + let request = Request::call(new_snapshot, responder); sender.try_send(Message::ClearPool(request)).map_err(|e| { let (_m, e) = handle_try_send_error(e); e @@ -497,8 +497,11 @@ async fn process(service: TxPoolService, message: Message) { error!("responder send plug_entry failed {:?}", e); }; } - Message::ClearPool(Request { responder, .. }) => { - service.clear_pool().await; + Message::ClearPool(Request { + responder, + arguments: new_snapshot, + }) => { + service.clear_pool(new_snapshot).await; if let Err(e) = responder.send(()) { error!("responder send clear_pool failed {:?}", e) };