-
Notifications
You must be signed in to change notification settings - Fork 221
/
synchronizer.rs
124 lines (109 loc) · 4.13 KB
/
synchronizer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::synchronizer::{BLOCK_FETCH_TOKEN, SEND_GET_HEADERS_TOKEN, TIMEOUT_EVICTION_TOKEN};
use crate::tests::TestNode;
use crate::{Config, NetworkProtocol, Synchronizer};
use ckb_chain::chain::ChainBuilder;
use ckb_chain_spec::consensus::Consensus;
use ckb_core::block::BlockBuilder;
use ckb_core::header::HeaderBuilder;
use ckb_core::transaction::{CellInput, CellOutput, TransactionBuilder};
use ckb_db::memorydb::MemoryKeyValueDB;
use ckb_network::ProtocolId;
use ckb_notify::NotifyService;
use ckb_protocol::SyncMessage;
use ckb_shared::shared::{Shared, SharedBuilder};
use ckb_shared::store::ChainKVStore;
use ckb_traits::ChainProvider;
use faketime::{self, unix_time_as_millis};
use flatbuffers::get_root;
use numext_fixed_uint::U256;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
/// Checks that a node with a shorter chain catches up to a connected peer.
///
/// Two nodes are created through `setup_node`: one with 1 block on top of
/// genesis and one with 3. They are connected over the SYNC protocol and each
/// is driven on its own thread. The test blocks until the first node's channel
/// yields (or is closed), then asserts that the first node's tip number is 3
/// and equals the second node's tip number.
#[test]
fn basic_sync() {
    // Create and enable a faketime file initialised with 0. The spawned
    // threads are named `FAKETIME=<path>`.
    // NOTE(review): presumably the faketime crate picks the file up from the
    // thread name — confirm against the faketime documentation.
    let clock_file = faketime::millis_tempfile(0).expect("create faketime file");
    faketime::enable(&clock_file);
    let worker_name = format!("FAKETIME={}", clock_file.display());

    let (mut lagging_node, lagging_shared) = setup_node(&worker_name, 1);
    let (mut leading_node, leading_shared) = setup_node(&worker_name, 3);
    lagging_node.connect(&mut leading_node, NetworkProtocol::SYNC as ProtocolId);

    let (synced_tx, synced_rx) = channel();
    thread::Builder::new()
        .name(worker_name.clone())
        .spawn(move || {
            lagging_node.start(&synced_tx, |data| {
                // The predicate is true only for a block message whose header
                // number is 3; every other payload yields false.
                // NOTE(review): `TestNode::start` is expected to signal and
                // stop once the predicate holds — confirm in TestNode.
                let message = get_root::<SyncMessage>(data);
                message
                    .payload_as_block()
                    .map_or(false, |block| block.header().unwrap().number() == 3)
            });
        })
        .expect("thread spawn");

    // The receiving half is discarded on purpose: this node's predicate is
    // always false, so nothing here waits on its channel.
    let (idle_tx, _) = channel();
    thread::Builder::new()
        .name(worker_name)
        .spawn(move || {
            leading_node.start(&idle_tx, |_| false);
        })
        .expect("thread spawn");

    // Block until the lagging node reports in (the result itself is ignored).
    let _ = synced_rx.recv();

    assert_eq!(lagging_shared.chain_state().lock().tip_number(), 3);
    assert_eq!(
        lagging_shared.chain_state().lock().tip_number(),
        leading_shared.chain_state().lock().tip_number()
    );
}
/// Builds a `TestNode` running the synchronizer on top of a freshly mined chain.
///
/// * `thread_name` - name handed to the notify service when it is started.
/// * `height` - number of blocks to append on top of the genesis block.
///
/// Returns the node together with the `Shared` handle backing its chain, so
/// the caller can inspect chain state afterwards.
///
/// Panics if the chain service rejects any of the generated blocks or if the
/// difficulty for a new block cannot be calculated.
fn setup_node(
    thread_name: &str,
    height: u64,
) -> (TestNode, Shared<ChainKVStore<MemoryKeyValueDB>>) {
    // Genesis block: current (possibly faked) time and a fixed difficulty.
    let genesis_header = HeaderBuilder::default()
        .timestamp(unix_time_as_millis())
        .difficulty(U256::from(1000u64));
    let mut tip = BlockBuilder::default().with_header_builder(genesis_header);

    let consensus = Consensus::default().set_genesis_block(tip.clone());
    let shared = SharedBuilder::<MemoryKeyValueDB>::new()
        .consensus(consensus)
        .build();
    let notify = NotifyService::default().start(Some(thread_name));
    // The chain service is built with verification switched off.
    let chain_service = ChainBuilder::new(shared.clone(), notify)
        .verification(false)
        .build();
    let chain_controller = chain_service.start::<&str>(None);

    // Extend the chain one block at a time; each block carries a single
    // cellbase transaction and points at the previous tip.
    for _ in 0..height {
        let parent = tip.header();
        let number = parent.number() + 1;
        let next_header = HeaderBuilder::default()
            .parent_hash(parent.hash().clone())
            .number(number)
            .timestamp(parent.timestamp() + 1)
            .difficulty(shared.calculate_difficulty(&parent).unwrap());
        let cellbase = TransactionBuilder::default()
            .input(CellInput::new_cellbase_input(number))
            .output(CellOutput::default())
            .build();

        tip = BlockBuilder::default()
            .commit_transaction(cellbase)
            .with_header_builder(next_header);
        chain_controller
            .process_block(Arc::new(tip.clone()))
            .expect("process block should be OK");
    }

    let synchronizer = Synchronizer::new(chain_controller, shared.clone(), Config::default());
    let protocol = Arc::new(synchronizer) as Arc<_>;

    // Register the synchronizer under the SYNC protocol id together with the
    // three tokens it is driven by.
    let mut node = TestNode::default();
    node.add_protocol(
        NetworkProtocol::SYNC as ProtocolId,
        &protocol,
        &[
            SEND_GET_HEADERS_TOKEN,
            BLOCK_FETCH_TOKEN,
            TIMEOUT_EVICTION_TOKEN,
        ],
    );

    (node, shared)
}