Skip to content

Commit

Permalink
Merge #2169
Browse files Browse the repository at this point in the history
2169: small tx-pool refactoring  r=quake,doitian a=zhangsoledad

proposal changes:
* rename ContextualTransactionVerifier -> TimeRelativeTransactionVerifier
* split NonContextualTransactionVerifier from TransactionVerifier
* check syntactic correctness first before 
* refactory tx-pool rejection error
* re-broadcast when duplicated tx submit

Co-authored-by: zhangsoledad <787953403@qq.com>
  • Loading branch information
bors[bot] and zhangsoledad committed Aug 3, 2020
2 parents 719d937 + 2701205 commit 5d93fbd
Show file tree
Hide file tree
Showing 16 changed files with 223 additions and 108 deletions.
6 changes: 0 additions & 6 deletions error/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ pub enum InternalErrorKind {
/// e.g. `Capacity::safe_add`
CapacityOverflow,

/// The transaction_pool is already full
TransactionPoolFull,

/// The transaction already exist in transaction_pool
PoolTransactionDuplicated,

/// Persistent data had corrupted
DataCorrupted,

Expand Down
82 changes: 49 additions & 33 deletions rpc/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ckb_error::{Error as CKBError, InternalError, InternalErrorKind};
use ckb_tx_pool::error::SubmitTxError;
use ckb_tx_pool::error::Reject;
use jsonrpc_core::{Error, ErrorCode, Value};
use std::fmt::{Debug, Display};

Expand Down Expand Up @@ -33,6 +33,7 @@ pub enum RPCError {
PoolRejectedTransactionByMaxAncestorsCountLimit = -1105,
PoolIsFull = -1106,
PoolRejectedDuplicatedTransaction = -1107,
PoolRejectedMalformedTransaction = -1108,
}

impl RPCError {
Expand Down Expand Up @@ -72,6 +73,27 @@ impl RPCError {
}
}

pub fn from_submit_transaction_reject(reject: &Reject) -> Error {
let code = match reject {
Reject::LowFeeRate(_, _) => RPCError::PoolRejectedTransactionByMinFeeRate,
Reject::ExceededMaximumAncestorsCount => {
RPCError::PoolRejectedTransactionByMaxAncestorsCountLimit
}
Reject::Full(_, _) => RPCError::PoolIsFull,
Reject::Duplicated(_) => RPCError::PoolRejectedDuplicatedTransaction,
Reject::Malformed(_) => RPCError::PoolRejectedMalformedTransaction,
};
RPCError::custom_with_error(code, reject)
}

pub fn downcast_submit_transaction_reject(err: &CKBError) -> Option<&Reject> {
use ckb_error::ErrorKind::SubmitTransaction;
match err.kind() {
SubmitTransaction => err.downcast_ref::<Reject>(),
_ => None,
}
}

pub fn from_ckb_error(err: CKBError) -> Error {
use ckb_error::ErrorKind::*;
match err.kind() {
Expand All @@ -81,23 +103,6 @@ impl RPCError {
RPCError::TransactionFailedToVerify,
err.unwrap_cause_or_self(),
),
SubmitTransaction => {
let submit_tx_err = match err.downcast_ref::<SubmitTxError>() {
Some(err) => err,
None => return Self::ckb_internal_error(err),
};

let kind = match *submit_tx_err {
SubmitTxError::LowFeeRate(_, _) => {
RPCError::PoolRejectedTransactionByMinFeeRate
}
SubmitTxError::ExceededMaximumAncestorsCount => {
RPCError::PoolRejectedTransactionByMaxAncestorsCountLimit
}
};

RPCError::custom_with_error(kind, submit_tx_err)
}
Internal => {
let internal_err = match err.downcast_ref::<InternalError>() {
Some(err) => err,
Expand All @@ -106,10 +111,6 @@ impl RPCError {

let kind = match internal_err.kind() {
InternalErrorKind::CapacityOverflow => RPCError::IntegerOverflow,
InternalErrorKind::TransactionPoolFull => RPCError::PoolIsFull,
InternalErrorKind::PoolTransactionDuplicated => {
RPCError::PoolRejectedDuplicatedTransaction
}
InternalErrorKind::DataCorrupted => RPCError::DatabaseIsCorrupt,
InternalErrorKind::Database => RPCError::DatabaseError,
InternalErrorKind::Config => RPCError::ConfigError,
Expand Down Expand Up @@ -161,26 +162,41 @@ mod tests {
}

#[test]
fn test_submit_tx_error_from_ckb_error() {
let err: CKBError = SubmitTxError::LowFeeRate(100, 50).into();
fn test_submit_transaction_error() {
let err: CKBError = Reject::LowFeeRate(100, 50).into();
assert_eq!(
"PoolRejectedTransactionByMinFeeRate: Transaction fee rate must >= 100 shannons/KB, got: 50",
RPCError::from_ckb_error(err).message
RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message
);

let err: CKBError = SubmitTxError::ExceededMaximumAncestorsCount.into();
let err: CKBError = Reject::ExceededMaximumAncestorsCount.into();
assert_eq!(
"PoolRejectedTransactionByMaxAncestorsCountLimit: Transaction exceeded maximum ancestors count limit, try send it later",
RPCError::from_ckb_error(err).message
RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message
);
}

#[test]
fn test_internal_error_from_ckb_error() {
let err: CKBError = InternalErrorKind::TransactionPoolFull.into();
let err: CKBError = Reject::Full("size".to_owned(), 10).into();
assert_eq!(
"PoolIsFull: TransactionPoolFull",
RPCError::from_ckb_error(err).message
"PoolIsFull: Transaction pool exceeded maximum size limit(10), try send it later",
RPCError::from_submit_transaction_reject(
RPCError::downcast_submit_transaction_reject(&err).unwrap()
)
.message
);

let err: CKBError = Reject::Duplicated(Byte32::new([0; 32])).into();
assert_eq!(
"PoolRejectedDuplicatedTransaction: Transaction(Byte32(0x0000000000000000000000000000000000000000000000000000000000000000)) already exist in transaction_pool",
RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message
);

let err: CKBError = Reject::Malformed("cellbase like".to_owned()).into();
assert_eq!(
"PoolRejectedMalformedTransaction: Malformed cellbase like transaction",
RPCError::from_submit_transaction_reject(
RPCError::downcast_submit_transaction_reject(&err).unwrap()
)
.message
);
}

Expand Down
34 changes: 23 additions & 11 deletions rpc/src/module/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ckb_network::PeerIndex;
use ckb_script::IllTransactionChecker;
use ckb_shared::shared::Shared;
use ckb_sync::SyncShared;
use ckb_tx_pool::error::Reject;
use ckb_types::{core, packed, prelude::*, H256};
use ckb_verification::{Since, SinceMetric};
use jsonrpc_core::Result;
Expand Down Expand Up @@ -100,20 +101,31 @@ impl PoolRpc for PoolRpcImpl {
return Err(RPCError::ckb_internal_error(e));
}

let broadcast = |tx_hash: packed::Byte32| {
// workaround: we are using `PeerIndex(usize::max)` to indicate that tx hash source is itself.
let peer_index = PeerIndex::new(usize::max_value());
self.sync_shared
.state()
.tx_hashes()
.entry(peer_index)
.or_default()
.insert(tx_hash);
};
let tx_hash = tx.hash();
match submit_txs.unwrap() {
Ok(_) => {
// workaround: we are using `PeerIndex(usize::max)` to indicate that tx hash source is itself.
let peer_index = PeerIndex::new(usize::max_value());
let hash = tx.hash();
self.sync_shared
.state()
.tx_hashes()
.entry(peer_index)
.or_default()
.insert(hash.clone());
Ok(hash.unpack())
broadcast(tx_hash.clone());
Ok(tx_hash.unpack())
}
Err(e) => Err(RPCError::from_ckb_error(e)),
Err(e) => match RPCError::downcast_submit_transaction_reject(&e) {
Some(reject) => {
if let Reject::Duplicated(_) = reject {
broadcast(tx_hash);
}
Err(RPCError::from_submit_transaction_reject(reject))
}
None => Err(RPCError::from_ckb_error(e)),
},
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/src/specs/tx_pool/collision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Spec for TransactionHashCollisionDifferentWitnessHashes {
.err()
.unwrap()
.to_string()
.contains("PoolTransactionDuplicated"));
.contains("PoolRejectedDuplicatedTransaction"));
}
}

Expand All @@ -54,7 +54,7 @@ impl Spec for DuplicatedTransaction {
.err()
.unwrap()
.to_string()
.contains("PoolTransactionDuplicated"));
.contains("PoolRejectedDuplicatedTransaction"));
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/src/specs/tx_pool/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Spec for SizeLimit {

info!("The next tx reach size limit");
let tx = node.new_transaction(hash);
assert_send_transaction_fail(node, &tx, "TransactionPoolFull");
assert_send_transaction_fail(node, &tx, "Transaction pool exceeded maximum size limit");

node.assert_tx_pool_serialized_size(max_tx_num * one_tx_size);
(0..DEFAULT_TX_PROPOSAL_WINDOW.0).for_each(|_| {
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Spec for CyclesLimit {

info!("The next tx reach cycles limit");
let tx = node.new_transaction(hash);
assert_send_transaction_fail(node, &tx, "TransactionPoolFull");
assert_send_transaction_fail(node, &tx, "Transaction pool exceeded maximum cycles limit");

node.assert_tx_pool_cycles(max_tx_num * one_tx_cycles);
(0..DEFAULT_TX_PROPOSAL_WINDOW.0).for_each(|_| {
Expand Down
6 changes: 3 additions & 3 deletions tx-pool/src/component/container.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The primary module containing the implementations of the transaction pool
//! and its top-level members.

use crate::{component::entry::TxEntry, error::SubmitTxError};
use crate::{component::entry::TxEntry, error::Reject};
use ckb_types::{core::Capacity, packed::ProposalShortId};
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -161,7 +161,7 @@ impl SortedTxMap {
}
}

pub fn add_entry(&mut self, mut entry: TxEntry) -> Result<Option<TxEntry>, SubmitTxError> {
pub fn add_entry(&mut self, mut entry: TxEntry) -> Result<Option<TxEntry>, Reject> {
let short_id = entry.transaction.proposal_short_id();

// find in pool parents
Expand All @@ -185,7 +185,7 @@ impl SortedTxMap {
self.update_ancestors_stat_for_entry(&mut entry, &parents);

if entry.ancestors_count > self.max_ancestors_count {
return Err(SubmitTxError::ExceededMaximumAncestorsCount);
return Err(Reject::ExceededMaximumAncestorsCount);
}

// check duplicate tx
Expand Down
4 changes: 2 additions & 2 deletions tx-pool/src/component/pending.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::component::container::{AncestorsScoreSortKey, SortedTxMap};
use crate::component::entry::TxEntry;
use crate::error::SubmitTxError;
use crate::error::Reject;
use ckb_fee_estimator::FeeRate;
use ckb_types::{
core::{
Expand Down Expand Up @@ -28,7 +28,7 @@ impl PendingQueue {
self.inner.size()
}

pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result<Option<TxEntry>, SubmitTxError> {
pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result<Option<TxEntry>, Reject> {
self.inner.add_entry(entry)
}

Expand Down
4 changes: 2 additions & 2 deletions tx-pool/src/component/proposed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::component::container::SortedTxMap;
use crate::component::entry::TxEntry;
use crate::error::SubmitTxError;
use crate::error::Reject;
use ckb_types::{
bytes::Bytes,
core::{
Expand Down Expand Up @@ -196,7 +196,7 @@ impl ProposedPool {
removed
}

pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result<Option<TxEntry>, SubmitTxError> {
pub(crate) fn add_entry(&mut self, entry: TxEntry) -> Result<Option<TxEntry>, Reject> {
let inputs = entry.transaction.input_pts_iter();
let outputs = entry.transaction.output_pts();

Expand Down
20 changes: 17 additions & 3 deletions tx-pool/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
use ckb_error::{Error, ErrorKind};
use ckb_types::packed::Byte32;
use failure::Fail;
use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;

#[derive(Debug, PartialEq, Clone, Eq, Fail)]
pub enum SubmitTxError {
pub enum Reject {
/// The fee rate of transaction is lower than min fee rate
#[fail(
display = "Transaction fee rate must >= {} shannons/KB, got: {}",
_0, _1
)]
LowFeeRate(u64, u64),

#[fail(display = "Transaction exceeded maximum ancestors count limit, try send it later")]
ExceededMaximumAncestorsCount,

#[fail(
display = "Transaction pool exceeded maximum {} limit({}), try send it later",
_0, _1
)]
Full(String, u64),

#[fail(display = "Transaction({}) already exist in transaction_pool", _0)]
Duplicated(Byte32),

#[fail(display = "Malformed {} transaction", _0)]
Malformed(String),
}

impl From<SubmitTxError> for Error {
fn from(error: SubmitTxError) -> Self {
impl From<Reject> for Error {
fn from(error: Reject) -> Self {
error.context(ErrorKind::SubmitTransaction).into()
}
}
Expand Down
Loading

0 comments on commit 5d93fbd

Please sign in to comment.