broadcast_duplicates_run.rs
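
// Test-only `BroadcastRun` implementation: it deliberately builds a second,
// duplicate version of each slot and broadcasts it to a stake-weighted subset
// of the cluster so that duplicate-slot handling can be exercised.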
use {
    super::{broadcast_utils::ReceiveResults, *},
    crate::cluster_nodes::ClusterNodesCache,
    solana_ledger::{
        entry::{create_ticks, Entry, EntrySlice},
        shred::Shredder,
    },
    solana_runtime::blockhash_queue::BlockhashQueue,
    solana_sdk::{
        fee_calculator::FeeCalculator,
        hash::Hash,
        signature::{Keypair, Signer},
        transaction::Transaction,
    },
    std::collections::VecDeque,
};

// Queue which facilitates delivering shreds with a delay
type DelayedQueue = VecDeque<(Option<Pubkey>, Option<Vec<Shred>>)>;
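
// `run()` produces both real and duplicate shreds on the broadcast thread;
// `transmit()` drains them on the transmit thread. `duplicate_queue` tracks the
// blockhashes valid on the duplicate fork, and `duplicate_entries_buffer` holds
// duplicate entries until the slot is complete.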
#[derive(Clone)]
pub(super) struct BroadcastDuplicatesRun {
    config: BroadcastDuplicatesConfig,
    // Local queue for broadcast to track which duplicate blockhashes we've sent
    duplicate_queue: BlockhashQueue,
    // Shared queue between broadcast and transmit threads
    delayed_queue: Arc<Mutex<DelayedQueue>>,
    // Buffer for duplicate entries
    duplicate_entries_buffer: Vec<Entry>,
    last_duplicate_entry_hash: Hash,
    last_broadcast_slot: Slot,
    next_shred_index: u32,
    shred_version: u16,
    keypair: Arc<Keypair>,
    cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
}

impl BroadcastDuplicatesRun {
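    // `delayed_queue` is pre-filled with `duplicate_send_delay` empty (None, None)
    // slots; `transmit()` only pops the front once the queue grows past that
    // length, so a pushed entry is delivered roughly `duplicate_send_delay`
    // completed slots later. Illustrative example: with a delay of 2 the queue
    // starts as [empty, empty]; pushing A, B, C pops the two pre-filled empties
    // first, so A is only delivered along with the third push.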
    pub(super) fn new(
        keypair: Arc<Keypair>,
        shred_version: u16,
        config: BroadcastDuplicatesConfig,
    ) -> Self {
        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
            CLUSTER_NODES_CACHE_TTL,
        ));
        let mut delayed_queue = DelayedQueue::new();
        delayed_queue.resize(config.duplicate_send_delay, (None, None));
        Self {
            config,
            delayed_queue: Arc::new(Mutex::new(delayed_queue)),
            duplicate_queue: BlockhashQueue::default(),
            duplicate_entries_buffer: vec![],
            next_shred_index: u32::MAX,
            last_broadcast_slot: 0,
            last_duplicate_entry_hash: Hash::default(),
            shred_version,
            keypair,
            cluster_nodes_cache,
        }
    }
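
    // Returns the duplicate entries to shred along with the shred index to start
    // from. Past `MINIMUM_DUPLICATE_SLOT`, entries are buffered and an empty vec
    // is returned until the slot is complete.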
    fn queue_or_create_duplicate_entries(
        &mut self,
        bank: &Arc<Bank>,
        receive_results: &ReceiveResults,
    ) -> (Vec<Entry>, u32) {
        // If the last entry hash is default, grab the last blockhash from the parent bank
        if self.last_duplicate_entry_hash == Hash::default() {
            self.last_duplicate_entry_hash = bank.last_blockhash();
        }

        // Create duplicate entries by:
        //  1) rearranging real entries so that all transaction entries are moved to
        //     the front and tick entries are moved to the back.
        //  2) setting all transaction entries to zero hashes and all tick entries to `hashes_per_tick`.
        //  3) removing any transactions which reference blockhashes which aren't in the
        //     duplicate blockhash queue.
        let (duplicate_entries, next_shred_index) = if bank.slot() > MINIMUM_DUPLICATE_SLOT {
            let mut tx_entries: Vec<Entry> = receive_results
                .entries
                .iter()
                .filter_map(|entry| {
                    if entry.is_tick() {
                        return None;
                    }
                    let transactions: Vec<Transaction> = entry
                        .transactions
                        .iter()
                        .filter(|tx| {
                            self.duplicate_queue
                                .get_hash_age(&tx.message.recent_blockhash)
                                .is_some()
                        })
                        .cloned()
                        .collect();
                    if !transactions.is_empty() {
                        Some(Entry::new_mut(
                            &mut self.last_duplicate_entry_hash,
                            &mut 0,
                            transactions,
                        ))
                    } else {
                        None
                    }
                })
                .collect();
            let mut tick_entries = create_ticks(
                receive_results.entries.tick_count(),
                bank.hashes_per_tick().unwrap_or_default(),
                self.last_duplicate_entry_hash,
            );
            self.duplicate_entries_buffer.append(&mut tx_entries);
            self.duplicate_entries_buffer.append(&mut tick_entries);

            // Only send out duplicate entries when the block is finished, otherwise the
            // recipient will start repairing for shreds they haven't received yet and
            // hit duplicate slot issues before we want them to.
            let entries = if receive_results.last_tick_height == bank.max_tick_height() {
                self.duplicate_entries_buffer.drain(..).collect()
            } else {
                vec![]
            };

            // Set next shred index to 0 since we are sending the full slot
            (entries, 0)
        } else {
            // Send real entries until we hit min duplicate slot
            (receive_results.entries.clone(), self.next_shred_index)
        };

        // Save last duplicate entry hash to avoid invalid entry hash errors
        if let Some(last_duplicate_entry) = duplicate_entries.last() {
            self.last_duplicate_entry_hash = last_duplicate_entry.hash;
        }

        (duplicate_entries, next_shred_index)
    }
}

/// Duplicate slots should only be sent once all validators have started.
/// This constant is intended to be used as a buffer so that all validators
/// are live before sending duplicate slots.
pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;

impl BroadcastRun for BroadcastDuplicatesRun {
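    // Builds both the real and the duplicate shreds for the working bank, splits
    // the cluster into duplicate/real recipients by cumulative stake, and stashes
    // the last duplicate and real data shreds in `delayed_queue` for delayed
    // delivery to the highest-staked node.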
    fn run(
        &mut self,
        blockstore: &Arc<Blockstore>,
        receiver: &Receiver<WorkingBankEntry>,
        socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
        blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
    ) -> Result<()> {
        // 1) Pull entries from banking stage
        let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
        let bank = receive_results.bank.clone();
        let last_tick_height = receive_results.last_tick_height;

        if self.next_shred_index == u32::MAX {
            self.next_shred_index = blockstore
                .meta(bank.slot())
                .expect("Database error")
                .map(|meta| meta.consumed)
                .unwrap_or(0) as u32;
        }

        // We were not the leader, but just became leader again
        if bank.slot() > self.last_broadcast_slot + 1 {
            self.last_duplicate_entry_hash = Hash::default();
        }
        self.last_broadcast_slot = bank.slot();

        let shredder = Shredder::new(
            bank.slot(),
            bank.parent().unwrap().slot(),
            self.keypair.clone(),
            (bank.tick_height() % bank.ticks_per_slot()) as u8,
            self.shred_version,
        )
        .expect("Expected to create a new shredder");

        let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds(
            &receive_results.entries,
            last_tick_height == bank.max_tick_height(),
            self.next_shred_index,
        );

        let (duplicate_entries, next_duplicate_shred_index) =
            self.queue_or_create_duplicate_entries(&bank, &receive_results);

        let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() {
            shredder.entries_to_shreds(
                &duplicate_entries,
                last_tick_height == bank.max_tick_height(),
                next_duplicate_shred_index,
            )
        } else {
            (vec![], vec![], 0)
        };

        // Manually track the shred index because relying on slot meta consumed is racy
        if last_tick_height == bank.max_tick_height() {
            self.next_shred_index = 0;
            self.duplicate_queue
                .register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default());
        } else {
            self.next_shred_index = last_shred_index;
        }

        // Partition network with duplicate and real shreds based on stake
        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
        let mut duplicate_recipients = HashMap::new();
        let mut real_recipients = HashMap::new();

        let mut stakes: Vec<(Pubkey, u64)> = bank
            .epoch_staked_nodes(bank_epoch)
            .unwrap()
            .iter()
            .filter(|(pubkey, _)| **pubkey != self.keypair.pubkey())
            .map(|(pubkey, stake)| (*pubkey, *stake))
            .collect();
        stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
            if r_stake == l_stake {
                l_key.cmp(r_key)
            } else {
                r_stake.cmp(l_stake)
            }
        });

        let highest_staked_node = stakes.first().cloned().map(|x| x.0);
        let stake_total: u64 = stakes.iter().map(|(_, stake)| *stake).sum();
        let mut cumulative_stake: u64 = 0;
        for (pubkey, stake) in stakes.into_iter().rev() {
            cumulative_stake += stake;
            if (100 * cumulative_stake / stake_total) as u8 <= self.config.stake_partition {
                duplicate_recipients.insert(pubkey, stake);
            } else {
                real_recipients.insert(pubkey, stake);
            }
        }
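
        // Worked example (illustrative numbers): with stakes [40, 30, 20, 10] and
        // `stake_partition = 30`, iterating from the smallest stake up gives
        // cumulative percentages 10, 30, 60, 100; the two smallest nodes (stakes
        // 10 and 20) end up in `duplicate_recipients`, the rest in `real_recipients`.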

        if let Some(highest_staked_node) = highest_staked_node {
            if bank.slot() > MINIMUM_DUPLICATE_SLOT && last_tick_height == bank.max_tick_height() {
                warn!(
                    "{} sent duplicate slot {} to nodes: {:?}",
                    self.keypair.pubkey(),
                    bank.slot(),
                    &duplicate_recipients,
                );
                warn!(
                    "Duplicate shreds for slot {} will be broadcast in {} slot(s)",
                    bank.slot(),
                    self.config.duplicate_send_delay
                );

                let delayed_shreds: Option<Vec<Shred>> = vec![
                    duplicate_data_shreds.last().cloned(),
                    data_shreds.last().cloned(),
                ]
                .into_iter()
                .collect();
                self.delayed_queue
                    .lock()
                    .unwrap()
                    .push_back((Some(highest_staked_node), delayed_shreds));
            }
        }

        let _duplicate_recipients = Arc::new(duplicate_recipients);
        let _real_recipients = Arc::new(real_recipients);

        let data_shreds = Arc::new(data_shreds);
        blockstore_sender.send((data_shreds.clone(), None))?;

        // 3) Start broadcast step
        socket_sender.send((Arc::new(duplicate_data_shreds), None))?;
        socket_sender.send((Arc::new(duplicate_coding_shreds), None))?;
        socket_sender.send((data_shreds, None))?;
        socket_sender.send((Arc::new(coding_shreds), None))?;

        Ok(())
    }
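
    // Drains one batch of shreds from the broadcast step and sends every shred
    // directly to each staked peer's TVU-forwards address, then, once the delay
    // queue has grown past `duplicate_send_delay`, sends the stashed duplicate
    // shreds to the single delayed recipient's TVU address.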
    fn transmit(
        &mut self,
        receiver: &Arc<Mutex<TransmitReceiver>>,
        cluster_info: &ClusterInfo,
        sock: &UdpSocket,
        bank_forks: &Arc<RwLock<BankForks>>,
    ) -> Result<()> {
        // Check the delay queue for shreds that are ready to be sent
        let (delayed_recipient, delayed_shreds) = {
            let mut delayed_deque = self.delayed_queue.lock().unwrap();
            if delayed_deque.len() > self.config.duplicate_send_delay {
                delayed_deque.pop_front().unwrap()
            } else {
                (None, None)
            }
        };

        let (shreds, _) = receiver.lock().unwrap().recv()?;
        if shreds.is_empty() {
            return Ok(());
        }
        let slot = shreds.first().unwrap().slot();
        assert!(shreds.iter().all(|shred| shred.slot() == slot));

        let root_bank = bank_forks.read().unwrap().root_bank();
        let epoch = root_bank.get_leader_schedule_epoch(slot);
        let stakes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
        let socket_addr_space = cluster_info.socket_addr_space();
        for peer in cluster_info.tvu_peers() {
            // Forward shreds to circumvent gossip
            if stakes.get(&peer.id).is_some() {
                shreds.iter().for_each(|shred| {
                    if socket_addr_space.check(&peer.tvu_forwards) {
                        sock.send_to(&shred.payload, &peer.tvu_forwards).unwrap();
                    }
                });
            }

            // After a delay, broadcast duplicate shreds to a single node
            if let Some(shreds) = delayed_shreds.as_ref() {
                if Some(peer.id) == delayed_recipient {
                    shreds.iter().for_each(|shred| {
                        if socket_addr_space.check(&peer.tvu) {
                            sock.send_to(&shred.payload, &peer.tvu).unwrap();
                        }
                    });
                }
            }
        }

        Ok(())
    }
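
    // Persists the real data shreds to the local blockstore; the duplicate shreds
    // are only transmitted and never stored locally.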
    fn record(
        &mut self,
        receiver: &Arc<Mutex<RecordReceiver>>,
        blockstore: &Arc<Blockstore>,
    ) -> Result<()> {
        let (data_shreds, _) = receiver.lock().unwrap().recv()?;
        blockstore.insert_shreds(data_shreds.to_vec(), None, true)?;
        Ok(())
    }
}