Skip to content

Commit

Permalink
Merge #2169 #2184
Browse files Browse the repository at this point in the history
2169: small tx-pool refactoring  r=quake,doitian a=zhangsoledad

proposal changes:
* rename ContextualTransactionVerifier -> TimeRelativeTransactionVerifier
* split NonContextualTransactionVerifier from TransactionVerifier
* check syntactic correctness first before 
* refactory tx-pool rejection error
* re-broadcast when duplicated tx submit

2184: feat: tx_pool_info include tip hash r=quake,doitian a=keroro520

The state of the transaction pool is related to the chain tip. So attaching `tip_hash` and `tip_number` into `TxPoolInfo` which returns by RPC `tx_pool_info` makes it more rigorous.

Co-authored-by: zhangsoledad <787953403@qq.com>
Co-authored-by: keroro <keroroxx520@gmail.com>
  • Loading branch information
3 people committed Jul 30, 2020
3 parents 7cf407d + e9d186a + 08a823d commit 6c86698
Show file tree
Hide file tree
Showing 24 changed files with 312 additions and 154 deletions.
6 changes: 0 additions & 6 deletions error/src/internal.rs
Expand Up @@ -13,12 +13,6 @@ pub enum InternalErrorKind {
/// e.g. `Capacity::safe_add`
CapacityOverflow,

/// The transaction_pool is already full
TransactionPoolFull,

/// The transaction already exist in transaction_pool
PoolTransactionDuplicated,

/// Persistent data had corrupted
DataCorrupted,

Expand Down
2 changes: 2 additions & 0 deletions rpc/README.md
Expand Up @@ -2151,6 +2151,8 @@ http://localhost:8114
"orphan": "0x0",
"pending": "0x1",
"proposed": "0x0",
"tip_hash": "0xa5f5c85987a15de25661e5a214f2c1449cd803f071acc7999820f25246471f40",
"tip_number": "0x400",
"total_tx_cycles": "0x219",
"total_tx_size": "0x112"
}
Expand Down
48 changes: 25 additions & 23 deletions rpc/json/rpc.json
Expand Up @@ -342,8 +342,8 @@
{
"addresses": [
{
"score": "0x64",
"address": "/ip6/::ffff:18.185.102.19/tcp/8115/p2p/QmXwUgF48ULy6hkgfqrEwEfuHW7WyWyWauueRDAYQHNDfN"
"address": "/ip6/::ffff:18.185.102.19/tcp/8115/p2p/QmXwUgF48ULy6hkgfqrEwEfuHW7WyWyWauueRDAYQHNDfN",
"score": "0x64"
},
{
"address": "/ip4/18.185.102.19/tcp/8115/p2p/QmXwUgF48ULy6hkgfqrEwEfuHW7WyWyWauueRDAYQHNDfN",
Expand All @@ -355,15 +355,15 @@
"version": "0.31.0 (4231360 2020-04-20)"
},
{
"version": "0.29.0 (a6733e6 2020-02-26)",
"addresses": [
{
"address": "/ip4/174.80.182.60/tcp/52965/p2p/QmVTMd7SEXfxS5p4EEM5ykTe1DwWWVewEM3NwjLY242vr2",
"score": "0x1"
}
],
"is_outbound": false,
"node_id": "QmVTMd7SEXfxS5p4EEM5ykTe1DwWWVewEM3NwjLY242vr2",
"is_outbound": false
"version": "0.29.0 (a6733e6 2020-02-26)"
}
],
"skip": true
Expand Down Expand Up @@ -843,6 +843,8 @@
"orphan": "0x0",
"pending": "0x1",
"proposed": "0x0",
"tip_hash": "0xa5f5c85987a15de25661e5a214f2c1449cd803f071acc7999820f25246471f40",
"tip_number": "0x400",
"total_tx_cycles": "0x219",
"total_tx_size": "0x112"
}
Expand Down Expand Up @@ -1392,15 +1394,10 @@
"0x4ceaa32f692948413e213ce6f3a83337145bde6e11fd8cb94377ce2637dcc412"
],
"result": {
"block_number": "0x400",
"capacity": "0xb00fb84df292",
"cells_count": "0x3f5",
"block_number": "0x400"
"cells_count": "0x3f5"
},
"types": [
{
"lock_hash": "Cell lock script hash"
}
],
"returns": [
{
"capacity": "Total capacity"
Expand All @@ -1411,6 +1408,11 @@
{
"block_number": "At which block capacity was calculated"
}
],
"types": [
{
"lock_hash": "Cell lock script hash"
}
]
},
{
Expand Down Expand Up @@ -1606,17 +1608,17 @@
"new_tip_header"
],
"result": "0x2a",
"types": [
{
"topic": "Subscription topic (enum: new_tip_header | new_tip_block)"
}
],
"returns": [
{
"id": "Subscription id"
}
],
"skip": true
"skip": true,
"types": [
{
"topic": "Subscription topic (enum: new_tip_header | new_tip_block)"
}
]
},
{
"description": "unsubscribe from a subscribed topic",
Expand All @@ -1626,16 +1628,16 @@
"0x2a"
],
"result": true,
"types": [
{
"id": "Subscription id"
}
],
"returns": [
{
"result": "Unsubscribe result"
}
],
"skip": true
"skip": true,
"types": [
{
"id": "Subscription id"
}
]
}
]
82 changes: 49 additions & 33 deletions rpc/src/error.rs
@@ -1,5 +1,5 @@
use ckb_error::{Error as CKBError, InternalError, InternalErrorKind};
use ckb_tx_pool::error::SubmitTxError;
use ckb_tx_pool::error::Reject;
use jsonrpc_core::{Error, ErrorCode, Value};
use std::fmt::{Debug, Display};

Expand Down Expand Up @@ -33,6 +33,7 @@ pub enum RPCError {
PoolRejectedTransactionByMaxAncestorsCountLimit = -1105,
PoolIsFull = -1106,
PoolRejectedDuplicatedTransaction = -1107,
PoolRejectedMalformedTransaction = -1108,
}

impl RPCError {
Expand Down Expand Up @@ -72,6 +73,27 @@ impl RPCError {
}
}

pub fn from_submit_transaction_reject(reject: &Reject) -> Error {
let code = match reject {
Reject::LowFeeRate(_, _) => RPCError::PoolRejectedTransactionByMinFeeRate,
Reject::ExceededMaximumAncestorsCount => {
RPCError::PoolRejectedTransactionByMaxAncestorsCountLimit
}
Reject::Full(_, _) => RPCError::PoolIsFull,
Reject::Duplicated(_) => RPCError::PoolRejectedDuplicatedTransaction,
Reject::Malformed(_) => RPCError::PoolRejectedMalformedTransaction,
};
RPCError::custom_with_error(code, reject)
}

pub fn downcast_submit_transaction_reject(err: &CKBError) -> Option<&Reject> {
use ckb_error::ErrorKind::SubmitTransaction;
match err.kind() {
SubmitTransaction => err.downcast_ref::<Reject>(),
_ => None,
}
}

pub fn from_ckb_error(err: CKBError) -> Error {
use ckb_error::ErrorKind::*;
match err.kind() {
Expand All @@ -81,23 +103,6 @@ impl RPCError {
RPCError::TransactionFailedToVerify,
err.unwrap_cause_or_self(),
),
SubmitTransaction => {
let submit_tx_err = match err.downcast_ref::<SubmitTxError>() {
Some(err) => err,
None => return Self::ckb_internal_error(err),
};

let kind = match *submit_tx_err {
SubmitTxError::LowFeeRate(_, _) => {
RPCError::PoolRejectedTransactionByMinFeeRate
}
SubmitTxError::ExceededMaximumAncestorsCount => {
RPCError::PoolRejectedTransactionByMaxAncestorsCountLimit
}
};

RPCError::custom_with_error(kind, submit_tx_err)
}
Internal => {
let internal_err = match err.downcast_ref::<InternalError>() {
Some(err) => err,
Expand All @@ -106,10 +111,6 @@ impl RPCError {

let kind = match internal_err.kind() {
InternalErrorKind::CapacityOverflow => RPCError::IntegerOverflow,
InternalErrorKind::TransactionPoolFull => RPCError::PoolIsFull,
InternalErrorKind::PoolTransactionDuplicated => {
RPCError::PoolRejectedDuplicatedTransaction
}
InternalErrorKind::DataCorrupted => RPCError::DatabaseIsCorrupt,
InternalErrorKind::Database => RPCError::DatabaseError,
InternalErrorKind::Config => RPCError::ConfigError,
Expand Down Expand Up @@ -161,26 +162,41 @@ mod tests {
}

#[test]
fn test_submit_tx_error_from_ckb_error() {
let err: CKBError = SubmitTxError::LowFeeRate(100, 50).into();
fn test_submit_transaction_error() {
let err: CKBError = Reject::LowFeeRate(100, 50).into();
assert_eq!(
"PoolRejectedTransactionByMinFeeRate: Transaction fee rate must >= 100 shannons/KB, got: 50",
RPCError::from_ckb_error(err).message
RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message
);

let err: CKBError = SubmitTxError::ExceededMaximumAncestorsCount.into();
let err: CKBError = Reject::ExceededMaximumAncestorsCount.into();
assert_eq!(
"PoolRejectedTransactionByMaxAncestorsCountLimit: Transaction exceeded maximum ancestors count limit, try send it later",
RPCError::from_ckb_error(err).message
RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message
);
}

#[test]
fn test_internal_error_from_ckb_error() {
let err: CKBError = InternalErrorKind::TransactionPoolFull.into();
let err: CKBError = Reject::Full("size".to_owned(), 10).into();
assert_eq!(
"PoolIsFull: TransactionPoolFull",
RPCError::from_ckb_error(err).message
"PoolIsFull: Transaction pool exceeded maximum size limit(10), try send it later",
RPCError::from_submit_transaction_reject(
RPCError::downcast_submit_transaction_reject(&err).unwrap()
)
.message
);

let err: CKBError = Reject::Duplicated(Byte32::new([0; 32])).into();
assert_eq!(
"PoolRejectedDuplicatedTransaction: Transaction(Byte32(0x0000000000000000000000000000000000000000000000000000000000000000)) already exist in transaction_pool",
RPCError::from_submit_transaction_reject(RPCError::downcast_submit_transaction_reject(&err).unwrap()).message
);

let err: CKBError = Reject::Malformed("cellbase like".to_owned()).into();
assert_eq!(
"PoolRejectedMalformedTransaction: Malformed cellbase like transaction",
RPCError::from_submit_transaction_reject(
RPCError::downcast_submit_transaction_reject(&err).unwrap()
)
.message
);
}

Expand Down
39 changes: 27 additions & 12 deletions rpc/src/module/pool.rs
Expand Up @@ -7,6 +7,7 @@ use ckb_network::PeerIndex;
use ckb_script::IllTransactionChecker;
use ckb_shared::shared::Shared;
use ckb_sync::SyncShared;
use ckb_tx_pool::error::Reject;
use ckb_types::{core, packed, prelude::*, H256};
use ckb_verification::{Since, SinceMetric};
use jsonrpc_core::Result;
Expand Down Expand Up @@ -100,20 +101,31 @@ impl PoolRpc for PoolRpcImpl {
return Err(RPCError::ckb_internal_error(e));
}

let broadcast = |tx_hash: packed::Byte32| {
// workaround: we are using `PeerIndex(usize::max)` to indicate that tx hash source is itself.
let peer_index = PeerIndex::new(usize::max_value());
self.sync_shared
.state()
.tx_hashes()
.entry(peer_index)
.or_default()
.insert(tx_hash);
};
let tx_hash = tx.hash();
match submit_txs.unwrap() {
Ok(_) => {
// workaround: we are using `PeerIndex(usize::max)` to indicate that tx hash source is itself.
let peer_index = PeerIndex::new(usize::max_value());
let hash = tx.hash();
self.sync_shared
.state()
.tx_hashes()
.entry(peer_index)
.or_default()
.insert(hash.clone());
Ok(hash.unpack())
broadcast(tx_hash.clone());
Ok(tx_hash.unpack())
}
Err(e) => Err(RPCError::from_ckb_error(e)),
Err(e) => match RPCError::downcast_submit_transaction_reject(&e) {
Some(reject) => {
if let Reject::Duplicated(_) = reject {
broadcast(tx_hash);
}
Err(RPCError::from_submit_transaction_reject(reject))
}
None => Err(RPCError::ckb_internal_error(e)),
},
}
}

Expand All @@ -128,6 +140,8 @@ impl PoolRpc for PoolRpcImpl {
let tx_pool_info = get_tx_pool_info.unwrap();

Ok(TxPoolInfo {
tip_hash: tx_pool_info.tip_hash.unpack(),
tip_number: tx_pool_info.tip_number.into(),
pending: (tx_pool_info.pending_size as u64).into(),
proposed: (tx_pool_info.proposed_size as u64).into(),
orphan: (tx_pool_info.orphan_size as u64).into(),
Expand All @@ -139,9 +153,10 @@ impl PoolRpc for PoolRpcImpl {
}

fn clear_tx_pool(&self) -> Result<()> {
let snapshot = Arc::clone(&self.shared.snapshot());
let tx_pool = self.shared.tx_pool_controller();
tx_pool
.clear_pool()
.clear_pool(snapshot)
.map_err(|err| RPCError::custom(RPCError::Invalid, err.to_string()))?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion rpc/src/module/test.rs
Expand Up @@ -111,9 +111,10 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl {
.map_err(|err| RPCError::custom(RPCError::Invalid, err.to_string()))?;

// Clear the tx_pool
let new_snapshot = Arc::clone(&self.shared.snapshot());
let tx_pool = self.shared.tx_pool_controller();
tx_pool
.clear_pool()
.clear_pool(new_snapshot)
.map_err(|err| RPCError::custom(RPCError::Invalid, err.to_string()))?;

Ok(())
Expand Down

0 comments on commit 6c86698

Please sign in to comment.