forked from nervosnetwork/ckb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction_process.rs
108 lines (98 loc) · 4.1 KB
/
transaction_process.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use crate::relayer::Relayer;
use crate::relayer::MAX_RELAY_PEERS;
use ckb_core::{transaction::Transaction, Cycle};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_protocol::{RelayMessage, ValidTransaction as FbsValidTransaction};
use ckb_shared::index::ChainIndex;
use ckb_shared::tx_pool::types::PoolError;
use ckb_traits::chain_provider::ChainProvider;
use ckb_verification::TransactionError;
use failure::Error as FailureError;
use flatbuffers::FlatBufferBuilder;
use log::{debug, warn};
use numext_fixed_hash::H256;
use std::convert::TryInto;
use std::time::Duration;
/// Ban duration passed to `ban_peer` when a peer relays an invalid transaction
/// or reports wrong cycles: 3600 * 24 * 3 seconds, i.e. three days.
const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
/// Processes a single transaction message relayed by a peer.
///
/// Built with `new` and consumed by `execute`, which tries to add the
/// transaction to the pool and then either relays it further, ignores it, or
/// bans the sending peer.
pub struct TransactionProcess<'a, CI> {
/// The received message; converted into a `(Transaction, Cycle)` pair in `execute`.
message: &'a FbsValidTransaction<'a>,
/// Relayer whose shared chain state, known-transaction table and transaction
/// filter are consulted while processing.
relayer: &'a Relayer<CI>,
/// The peer this message is attributed to; it is excluded from re-broadcast
/// and is the one banned on misbehaviour.
peer: PeerIndex,
/// Protocol context used to list connected peers, send messages and ban peers.
nc: &'a mut CKBProtocolContext,
}
impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {
    /// Bundles a received transaction message with everything needed to
    /// process it: the relayer state, the peer the message is attributed to,
    /// and the protocol context used to talk to the network.
    pub fn new(
        message: &'a FbsValidTransaction,
        relayer: &'a Relayer<CI>,
        peer: PeerIndex,
        nc: &'a mut CKBProtocolContext,
    ) -> Self {
        TransactionProcess {
            message,
            nc,
            relayer,
            peer,
        }
    }

    /// Validates the relayed transaction and reacts to the outcome.
    ///
    /// * Already seen (per the relayer's transaction filter): dropped silently.
    /// * Accepted by the pool with the cycle count the peer claimed: relayed to
    ///   at most `MAX_RELAY_PEERS` connected peers that are not yet recorded as
    ///   knowing it (the sending peer is never selected).
    /// * Rejected for a reason that may only reflect a different chain tip
    ///   (unknown input, conflict, duplicate, immature): ignored.
    /// * Accepted with a different cycle count, or rejected for any other
    ///   reason: the sending peer is banned for `DEFAULT_BAN_TIME`.
    ///
    /// # Errors
    ///
    /// Returns an error only when the message cannot be converted into a
    /// `(Transaction, Cycle)` pair; every other outcome yields `Ok(())`.
    pub fn execute(self) -> Result<(), FailureError> {
        let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).try_into()?;
        let tx_hash = tx.hash();

        if self.already_known(tx_hash.clone()) {
            debug!(target: "relay", "discarding already known transaction {:#x}", tx_hash);
            return Ok(());
        }

        // Keep the chain-state lock only for the duration of the pool insertion.
        let tx_result = {
            let chain_state = self.relayer.shared.chain_state().lock();
            let max_block_cycles = self.relayer.shared.consensus().max_block_cycles();
            chain_state.add_tx_to_pool(tx.clone(), max_block_cycles)
        };

        // The peer is banned when the cycles it reported do not match ours or
        // when the transaction is invalid; see the individual arms below.
        match tx_result {
            Ok(cycles) if cycles == relay_cycles => {
                // broadcast tx
                let fbb = &mut FlatBufferBuilder::new();
                let message = RelayMessage::build_transaction(fbb, &tx, cycles);
                fbb.finish(message, None);

                // Hold the `known_txs` lock only while choosing the targets so
                // that it is released before any network send happens.
                let selected_peers: Vec<PeerIndex> = {
                    let mut known_txs = self.relayer.peers.known_txs.lock();
                    self.nc
                        .connected_peers()
                        .into_iter()
                        .filter(|peer_index| {
                            // `insert` runs first on purpose: the sending peer
                            // is recorded as knowing the tx as well, and is
                            // then excluded from the broadcast.
                            known_txs.insert(*peer_index, tx_hash.clone())
                                && (self.peer != *peer_index)
                        })
                        .take(MAX_RELAY_PEERS)
                        .collect()
                };

                for peer in selected_peers {
                    let ret = self.nc.send(peer, fbb.finished_data().to_vec());
                    if ret.is_err() {
                        warn!(target: "relay", "relay Transaction error {:?}", ret);
                    }
                }
            }
            Err(PoolError::InvalidTx(TransactionError::UnknownInput))
            | Err(PoolError::InvalidTx(TransactionError::Conflict))
            | Err(PoolError::Duplicate)
            | Err(PoolError::InvalidTx(TransactionError::Immature)) => {
                // These errors may occur when the peer's tip differs from ours;
                // we cannot prove the peer is misbehaving, so just ignore them.
                debug!(target: "relay", "peer {} relay a conflict or missing input tx: {:?}", self.peer, tx);
            }
            Ok(cycles) => {
                debug!(target: "relay", "peer {} relay wrong cycles tx: {:?} real cycles {} wrong cycles {}", self.peer, tx, cycles, relay_cycles);
                // TODO use report score interface
                self.nc.ban_peer(self.peer, DEFAULT_BAN_TIME);
            }
            Err(err) => {
                debug!(target: "relay", "peer {} relay a invalid tx: {:?}, error: {:?}", self.peer, tx, err);
                // TODO use report score interface
                self.nc.ban_peer(self.peer, DEFAULT_BAN_TIME);
            }
        }

        Ok(())
    }

    /// Records `hash` in the relayer's transaction filter and reports whether
    /// `insert` found an existing entry for it, i.e. whether this transaction
    /// had already been seen. Note that the hash is recorded as a side effect
    /// even when this returns `false`.
    fn already_known(&self, hash: H256) -> bool {
        let mut tx_filter = self.relayer.state.tx_filter.lock();
        tx_filter.insert(hash, ()).is_some()
    }
}