-
Notifications
You must be signed in to change notification settings - Fork 219
/
block_fetcher.rs
206 lines (180 loc) · 7.19 KB
/
block_fetcher.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
use crate::synchronizer::{BlockStatus, Synchronizer};
use crate::types::HeaderView;
use crate::{
BLOCK_DOWNLOAD_TIMEOUT, BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER,
PER_FETCH_BLOCK_LIMIT,
};
use ckb_core::header::Header;
use ckb_network::PeerIndex;
use ckb_shared::index::ChainIndex;
use ckb_traits::ChainProvider;
use ckb_util::try_option;
use faketime::unix_time_as_millis;
use log::{debug, trace};
use numext_fixed_hash::H256;
use numext_fixed_uint::U256;
use std::cmp;
pub struct BlockFetcher<CI: ChainIndex> {
synchronizer: Synchronizer<CI>,
peer: PeerIndex,
tip_header: Header,
total_difficulty: U256,
}
impl<CI> BlockFetcher<CI>
where
CI: ChainIndex,
{
pub fn new(synchronizer: Synchronizer<CI>, peer: PeerIndex) -> Self {
let (tip_header, total_difficulty) = {
let chain_state = synchronizer.shared.chain_state().lock();
(
chain_state.tip_header().clone(),
chain_state.total_difficulty().clone(),
)
};
BlockFetcher {
peer,
synchronizer,
tip_header,
total_difficulty,
}
}
pub fn initial_and_check_inflight(&self) -> bool {
let mut blocks_inflight = self.synchronizer.peers.blocks_inflight.write();
let inflight = blocks_inflight
.entry(self.peer)
.or_insert_with(Default::default);
if inflight.timestamp < unix_time_as_millis().saturating_sub(BLOCK_DOWNLOAD_TIMEOUT) {
trace!(target: "sync", "[block downloader] inflight block download timeout");
inflight.clear();
}
// current peer block blocks_inflight reach limit
if MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight.len()) == 0 {
debug!(target: "sync", "[block downloader] inflight count reach limit");
true
} else {
false
}
}
pub fn is_better_chain(&self, header: &HeaderView) -> bool {
*header.total_difficulty() >= self.total_difficulty
}
pub fn peer_best_known_header(&self) -> Option<HeaderView> {
self.synchronizer
.peers
.best_known_headers
.read()
.get(&self.peer)
.cloned()
}
pub fn last_common_header(&self, best: &HeaderView) -> Option<Header> {
let mut guard = self.synchronizer.peers.last_common_headers.write();
let last_common_header = try_option!(guard.get(&self.peer).cloned().or_else(|| {
if best.number() < self.tip_header.number() {
let last_common_hash = self.synchronizer.shared.block_hash(best.number())?;
self.synchronizer.shared.block_header(&last_common_hash)
} else {
Some(self.tip_header.clone())
}
}));
let fixed_last_common_header = self
.synchronizer
.last_common_ancestor(&last_common_header, &best.inner())?;
if fixed_last_common_header != last_common_header {
guard
.entry(self.peer)
.and_modify(|last_common_header| {
*last_common_header = fixed_last_common_header.clone()
})
.or_insert_with(|| fixed_last_common_header.clone());
}
Some(fixed_last_common_header)
}
// this peer's tip is wherethe the ancestor of global_best_known_header
pub fn is_known_best(&self, header: &HeaderView) -> bool {
let global_best_known_header = { self.synchronizer.best_known_header.read().clone() };
if let Some(ancestor) = self
.synchronizer
.get_ancestor(&global_best_known_header.hash(), header.number())
{
if ancestor != header.inner().clone() {
debug!(
target: "sync",
"[block downloader] peer best_known_header is not ancestor of global_best_known_header"
);
return false;
}
} else {
return false;
}
true
}
pub fn fetch(self) -> Option<Vec<H256>> {
trace!(target: "sync", "[block downloader] BlockFetcher process");
if self.initial_and_check_inflight() {
debug!(target: "sync", "[block downloader] inflight count reach limit");
return None;
}
let best_known_header = match self.peer_best_known_header() {
Some(best_known_header) => best_known_header,
_ => {
trace!(target: "sync", "[block downloader] peer_best_known_header not found peer={}", self.peer);
return None;
}
};
// This peer has nothing interesting.
if !self.is_better_chain(&best_known_header) {
debug!(
target: "sync",
"[block downloader] best_known_header {} chain {}",
best_known_header.total_difficulty(),
self.total_difficulty
);
return None;
}
if !self.is_known_best(&best_known_header) {
return None;
}
// If the peer reorganized, our previous last_common_header may not be an ancestor
// of its current best_known_header. Go back enough to fix that.
let fixed_last_common_header = try_option!(self.last_common_header(&best_known_header));
if fixed_last_common_header == best_known_header.inner().clone() {
debug!(target: "sync", "[block downloader] fixed_last_common_header == best_known_header");
return None;
}
debug!(
target: "sync",
"[block downloader] fixed_last_common_header = {} best_known_header = {}",
fixed_last_common_header.number(),
best_known_header.number()
);
debug_assert!(best_known_header.number() > fixed_last_common_header.number());
let window_end = fixed_last_common_header.number() + BLOCK_DOWNLOAD_WINDOW;
let max_height = cmp::min(window_end + 1, best_known_header.number());
let mut n_height = fixed_last_common_header.number();
let mut v_fetch = Vec::with_capacity(PER_FETCH_BLOCK_LIMIT);
{
let mut guard = self.synchronizer.peers.blocks_inflight.write();
let inflight = guard.get_mut(&self.peer).expect("inflight already init");
while n_height < max_height && v_fetch.len() < PER_FETCH_BLOCK_LIMIT {
n_height += 1;
let to_fetch = try_option!(self
.synchronizer
.get_ancestor(&best_known_header.hash(), n_height));
let to_fetch_hash = to_fetch.hash();
let block_status = self.synchronizer.get_block_status(&to_fetch_hash);
if block_status == BlockStatus::VALID_MASK
&& inflight.insert(to_fetch_hash.clone().clone())
{
debug!(
target: "sync", "[Synchronizer] inflight insert {:?}------------{:x}",
to_fetch.number(),
to_fetch_hash
);
v_fetch.push(to_fetch_hash.clone());
}
}
}
Some(v_fetch)
}
}