Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency issue for RBF #4258

Merged
merged 4 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions sync/src/relayer/tests/reconstruct_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::helper::{build_chain, new_transaction};
use crate::relayer::packed::{CellInput, OutPoint};
use crate::relayer::ReconstructionResult;
use crate::StatusCode;
use ckb_tx_pool::{PlugTarget, TxEntry};
Expand All @@ -8,7 +9,6 @@ use ckb_types::{
packed::{self, CompactBlockBuilder},
};
use std::collections::HashSet;

// There are more test cases in block_transactions_process and compact_block_process.rs
#[test]
fn test_missing_txs() {
Expand Down Expand Up @@ -64,9 +64,23 @@ fn test_missing_txs() {
#[test]
fn test_reconstruct_transactions_and_uncles() {
let (relayer, always_success_out_point) = build_chain(5);
let prepare: Vec<TransactionView> = (0..20)
.map(|i| new_transaction(&relayer, i, &always_success_out_point))
.collect();
let parent = new_transaction(&relayer, 0, &always_success_out_point);

// create a chain of transactions as prepare
let mut prepare = vec![parent];
while prepare.len() <= 20 {
let parent = prepare.last().unwrap();
let child = parent
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
.previous_output(OutPoint::new(parent.hash(), 0))
.build()
}])
.set_outputs(vec![parent.output(0).unwrap()])
.build();
prepare.push(child);
}
let uncle = BlockBuilder::default().build();

let block = BlockBuilder::default()
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(RbfContainInvalidCells),
Box::new(RbfRejectReplaceProposed),
Box::new(RbfReplaceProposedSuccess),
Box::new(RbfConcurrency),
Box::new(CompactBlockEmpty),
Box::new(CompactBlockEmptyParentUnknown),
Box::new(CompactBlockPrefilled),
Expand Down
10 changes: 8 additions & 2 deletions test/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct Node {
consensus: Consensus,
p2p_listen: String,
rpc_client: RpcClient,
rpc_listen: String,

node_id: Option<String>, // initialize when starts node
guard: Option<ProcessGuard>, // initialize when starts node
Expand Down Expand Up @@ -134,8 +135,8 @@ impl Node {
};

let p2p_listen = app_config.network.listen_addresses[0].to_string();
let rpc_address = app_config.rpc.listen_address;
let rpc_client = RpcClient::new(&format!("http://{rpc_address}/"));
let rpc_listen = format!("http://{}/", app_config.rpc.listen_address);
let rpc_client = RpcClient::new(&rpc_listen);
let consensus = {
// Ensure the data path is available because chain_spec.build_consensus() needs to access the
// system-cell data.
Expand All @@ -154,6 +155,7 @@ impl Node {
consensus,
p2p_listen,
rpc_client,
rpc_listen,
node_id: None,
guard: None,
}
Expand Down Expand Up @@ -184,6 +186,10 @@ impl Node {
self.p2p_listen.clone()
}

pub fn rpc_listen(&self) -> String {
self.rpc_listen.clone()
}

pub fn p2p_address(&self) -> String {
format!("{}/p2p/{}", self.p2p_listen(), self.node_id())
}
Expand Down
3 changes: 2 additions & 1 deletion test/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ impl RpcClient {
}
}

jsonrpc!(pub struct Inner {
jsonrpc!(
pub struct Inner {
pub fn get_block(&self, _hash: H256) -> Option<BlockView>;
pub fn get_fork_block(&self, _hash: H256) -> Option<BlockView>;
pub fn get_block_by_number(&self, _number: BlockNumber) -> Option<BlockView>;
Expand Down
112 changes: 80 additions & 32 deletions test/src/specs/tx_pool/replace.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::{utils::wait_until, Node, Spec};
use crate::{
rpc::RpcClient,
util::{cell::gen_spendable, transaction::always_success_transactions},
utils::wait_until,
Node, Spec,
};
use ckb_jsonrpc_types::Status;
use ckb_logger::info;
use ckb_types::{
Expand Down Expand Up @@ -441,40 +446,23 @@ impl Spec for RbfContainInvalidCells {

node0.mine_until_out_bootstrap_period();

// build txs chain
let tx0 = node0.new_transaction_spend_tip_cellbase();
let mut txs = vec![tx0];
let max_count = 5;
while txs.len() <= max_count {
let parent = txs.last().unwrap();
let child = parent
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
.previous_output(OutPoint::new(parent.hash(), 0))
.build()
}])
.set_outputs(vec![parent.output(0).unwrap()])
.build();
txs.push(child);
}
assert_eq!(txs.len(), max_count + 1);
// send Tx chain
for tx in txs[..=max_count - 1].iter() {
let cells = gen_spendable(node0, 3);
let txs = always_success_transactions(node0, &cells);
for tx in txs.iter() {
let ret = node0.rpc_client().send_transaction_result(tx.data().into());
assert!(ret.is_ok());
}

let clone_tx = txs[2].clone();

let cell = CellDep::new_builder()
.out_point(OutPoint::new(txs[1].hash(), 0))
.build();

// Set tx2 fee to a higher value
let output2 = CellOutputBuilder::default()
.capacity(capacity_bytes!(70).pack())
.build();

// build a cell from conflicts txs's output
let cell = CellDep::new_builder()
.out_point(OutPoint::new(txs[2].hash(), 0))
.build();
let tx2 = clone_tx
.as_advanced_builder()
.set_inputs(vec![{
Expand All @@ -490,11 +478,6 @@ impl Spec for RbfContainInvalidCells {
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_err(), "tx2 should be rejected");
assert!(res
.err()
.unwrap()
.to_string()
.contains("new Tx contains cell deps from conflicts"));
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -687,7 +670,7 @@ impl Spec for RbfReplaceProposedSuccess {
let tx2_status = node0.rpc_client().get_transaction(tx2.hash()).tx_status;
assert_eq!(tx2_status.status, Status::Pending);

// submit a black block
// submit a blank block
let example = node0.new_block(None, None, None);
let blank_block = example
.as_advanced_builder()
Expand Down Expand Up @@ -730,3 +713,68 @@ impl Spec for RbfReplaceProposedSuccess {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}

pub struct RbfConcurrency;
impl Spec for RbfConcurrency {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];

node0.mine_until_out_bootstrap_period();
node0.new_block_with_blocking(|template| template.number.value() != 13);
let tx_hash_0 = node0.generate_transaction();
info!("Generate 4 txs with same input");
let tx1 = node0.new_transaction(tx_hash_0.clone());

let mut conflicts = vec![tx1];
// tx1 capacity is 100, set other txs to higer fee
let fees = vec![
capacity_bytes!(83),
capacity_bytes!(82),
capacity_bytes!(81),
capacity_bytes!(80),
];
for fee in fees.iter() {
let tx2_temp = node0.new_transaction(tx_hash_0.clone());
let output = CellOutputBuilder::default().capacity(fee.pack()).build();

let tx2 = tx2_temp
.as_advanced_builder()
.set_outputs(vec![output])
.build();
conflicts.push(tx2);
}

// make 5 threads to set_transaction concurrently
let mut handles = vec![];
for tx in &conflicts {
let cur_tx = tx.clone();
let rpc_address = node0.rpc_listen();
let handle = std::thread::spawn(move || {
let rpc_client = RpcClient::new(&rpc_address);
let _ = rpc_client.send_transaction_result(cur_tx.data().into());
});
handles.push(handle);
}
for handle in handles {
let _ = handle.join();
}

let status: Vec<_> = conflicts
.iter()
.map(|tx| {
let res = node0.rpc_client().get_transaction(tx.hash());
res.tx_status.status
})
.collect();

// the last tx should be in Pending(with the highest fee), others should be in Rejected
assert_eq!(status[4], Status::Pending);
for s in status.iter().take(4) {
assert_eq!(*s, Status::Rejected);
}
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}
14 changes: 4 additions & 10 deletions tx-pool/src/chunk_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ impl ChunkProcess {
let tx_hash = tx.hash();

let (ret, snapshot) = self.service.pre_check(&tx).await;
let (tip_hash, rtx, status, fee, tx_size, conflicts) =
try_or_return_with_snapshot!(ret, snapshot);
let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

let cached = self.service.fetch_tx_verify_cache(&tx_hash).await;

Expand All @@ -244,10 +243,8 @@ impl ChunkProcess {
let completed = try_or_return_with_snapshot!(ret, snapshot);

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, submit_snapshot) = self
.service
.submit_entry(tip_hash, entry, status, conflicts)
.await;
let (ret, submit_snapshot) =
self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, submit_snapshot);
self.service
.after_process(tx, remote, &submit_snapshot, &Ok(completed))
Expand Down Expand Up @@ -325,10 +322,7 @@ impl ChunkProcess {
}

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, submit_snapshot) = self
.service
.submit_entry(tip_hash, entry, status, conflicts)
.await;
let (ret, submit_snapshot) = self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, snapshot);

self.service.notify_block_assembler(status).await;
Expand Down
27 changes: 24 additions & 3 deletions tx-pool/src/component/edges.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use ckb_types::packed::{Byte32, OutPoint, ProposalShortId};
use ckb_types::{
core::tx_pool::Reject,
packed::{Byte32, OutPoint, ProposalShortId},
};
use std::collections::{hash_map::Entry, HashMap, HashSet};

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -27,8 +30,26 @@ impl Edges {
self.deps.len()
}

pub(crate) fn insert_input(&mut self, out_point: OutPoint, txid: ProposalShortId) {
self.inputs.insert(out_point, txid);
pub(crate) fn insert_input(
&mut self,
out_point: OutPoint,
txid: ProposalShortId,
) -> Result<(), Reject> {
// inputs is occupied means double speanding happened here
match self.inputs.entry(out_point.clone()) {
chenyukang marked this conversation as resolved.
Show resolved Hide resolved
Entry::Occupied(occupied) => {
let msg =
format!(
"txpool unexpected double-spending out_point: {:?} old_tx: {:?} new_tx: {:?}",
out_point, occupied.get(), txid
);
Err(Reject::RBFRejected(msg))
}
Entry::Vacant(vacant) => {
vacant.insert(txid);
Ok(())
}
}
}

pub(crate) fn remove_input(&mut self, out_point: &OutPoint) -> Option<ProposalShortId> {
Expand Down
7 changes: 4 additions & 3 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ impl PoolMap {
}
trace!("pool_map.add_{:?} {}", status, entry.transaction().hash());
self.check_and_record_ancestors(&mut entry)?;
self.record_entry_edges(&entry)?;
self.insert_entry(&entry, status);
self.record_entry_edges(&entry);
self.record_entry_descendants(&entry);
self.track_entry_statics();
Ok(true)
Expand Down Expand Up @@ -389,7 +389,7 @@ impl PoolMap {
}
}

fn record_entry_edges(&mut self, entry: &TxEntry) {
fn record_entry_edges(&mut self, entry: &TxEntry) -> Result<(), Reject> {
let tx_short_id: ProposalShortId = entry.proposal_short_id();
let header_deps = entry.transaction().header_deps();
let related_dep_out_points: Vec<_> = entry.related_dep_out_points().cloned().collect();
Expand All @@ -398,7 +398,7 @@ impl PoolMap {
// if input reference a in-pool output, connect it
// otherwise, record input for conflict check
for i in inputs {
self.edges.insert_input(i.to_owned(), tx_short_id.clone());
self.edges.insert_input(i.to_owned(), tx_short_id.clone())?;
}

// record dep-txid
Expand All @@ -411,6 +411,7 @@ impl PoolMap {
.header_deps
.insert(tx_short_id, header_deps.into_iter().collect());
}
Ok(())
}

fn record_entry_descendants(&mut self, entry: &TxEntry) {
Expand Down
Loading
Loading