/
apply.rs
5103 lines (4698 loc) · 180 KB
/
apply.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.
use std::borrow::Cow;
use std::cmp::{Ord, Ordering as CmpOrdering};
use std::collections::VecDeque;
use std::fmt::{self, Debug, Formatter};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
#[cfg(test)]
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::vec::Drain;
use std::{cmp, usize};
use batch_system::{BasicMailbox, BatchRouter, BatchSystem, Fsm, HandlerBuilder, PollHandler};
use collections::{HashMap, HashMapEntry, HashSet};
use crossbeam::channel::{TryRecvError, TrySendError};
use engine_traits::PerfContext;
use engine_traits::PerfContextKind;
use engine_traits::{
DeleteStrategy, KvEngine, RaftEngine, Range as EngineRange, Snapshot, WriteBatch,
};
use engine_traits::{SSTMetaInfo, ALL_CFS, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE};
use fail::fail_point;
use kvproto::import_sstpb::SstMeta;
use kvproto::kvrpcpb::ExtraOp as TxnExtraOp;
use kvproto::metapb::{PeerRole, Region, RegionEpoch};
use kvproto::raft_cmdpb::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request, Response,
};
use kvproto::raft_serverpb::{
MergeState, PeerState, RaftApplyState, RaftTruncatedState, RegionLocalState,
};
use raft::eraftpb::{
ConfChange, ConfChangeType, ConfChangeV2, Entry, EntryType, Snapshot as RaftSnapshot,
};
use raft_proto::ConfChangeI;
use sst_importer::SSTImporter;
use tikv_util::config::{Tracker, VersionTrack};
use tikv_util::mpsc::{loose_bounded, LooseBoundedSender, Receiver};
use tikv_util::time::{duration_to_sec, Instant};
use tikv_util::worker::Scheduler;
use tikv_util::{box_err, box_try, debug, error, info, safe_panic, slow_log, warn};
use tikv_util::{Either, MustConsumeVec};
use time::Timespec;
use uuid::Builder as UuidBuilder;
use crate::coprocessor::{Cmd, CoprocessorHost};
use crate::store::fsm::RaftPollerBuilder;
use crate::store::metrics::*;
use crate::store::msg::{Callback, PeerMsg, ReadResponse, SignificantMsg};
use crate::store::peer::Peer;
use crate::store::peer_storage::{
self, write_initial_apply_state, write_peer_state, ENTRY_MEM_SIZE,
};
use crate::store::util::{
check_region_epoch, compare_region_epoch, is_learner, ChangePeerI, ConfChangeKind,
KeysInfoFormatter, ADMIN_CMD_EPOCH_MAP,
};
use crate::store::{cmd_resp, util, Config, RegionSnapshot, RegionTask};
use crate::{Error, Result};
use super::metrics::*;
/// Initial capacity (in bytes) allocated for the apply write batch.
const DEFAULT_APPLY_WB_SIZE: usize = 4 * 1024;
/// If the write batch data size exceeds this after a write, the batch is
/// recreated rather than cleared, to release the memory it holds
/// (see `ApplyContext::write_to_db`).
const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024;
/// When the pending-command queue has grown past this capacity but holds
/// fewer elements than this, it is shrunk back to fit
/// (see `PendingCmdQueue::pop_normal`).
const SHRINK_PENDING_CMD_QUEUE_CAP: usize = 64;
/// A proposed command at a raft log position whose callback is still waiting
/// to be invoked.
///
/// The callback must be taken (and invoked) exactly once before the command
/// is dropped; the `Drop` impl raises `safe_panic!` otherwise.
pub struct PendingCmd<S>
where
    S: Snapshot,
{
    /// Raft log index of the proposed command.
    pub index: u64,
    /// Raft term at which the command was proposed.
    pub term: u64,
    /// Callback to resolve with the command's response; `None` once consumed.
    pub cb: Option<Callback<S>>,
}
impl<S> PendingCmd<S>
where
    S: Snapshot,
{
    /// Wraps `cb` as a pending command for the entry at `index`/`term`.
    fn new(index: u64, term: u64, cb: Callback<S>) -> PendingCmd<S> {
        let cb = Some(cb);
        PendingCmd { index, term, cb }
    }
}
impl<S> Drop for PendingCmd<S>
where
    S: Snapshot,
{
    // Guards against leaking a client callback: dropping a command whose
    // callback was never taken means some caller will wait forever, so this
    // is reported loudly via `safe_panic!`.
    fn drop(&mut self) {
        if self.cb.is_some() {
            safe_panic!(
                "callback of pending command at [index: {}, term: {}] is leak",
                self.index,
                self.term
            );
        }
    }
}
// Manual `Debug`: the callback itself is not printable, so only report
// whether one is still attached.
impl<S> Debug for PendingCmd<S>
where
    S: Snapshot,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "PendingCmd [index: {}, term: {}, has_cb: {}]",
            self.index,
            self.term,
            self.cb.is_some()
        )
    }
}
/// Commands waiting to be committed and applied.
#[derive(Debug)]
pub struct PendingCmdQueue<S>
where
    S: Snapshot,
{
    /// Ordinary write/admin commands, in proposal order.
    normals: VecDeque<PendingCmd<S>>,
    /// At most one pending conf-change command, kept separate from normals.
    conf_change: Option<PendingCmd<S>>,
}
impl<S> PendingCmdQueue<S>
where
    S: Snapshot,
{
    /// Creates an empty queue.
    fn new() -> PendingCmdQueue<S> {
        PendingCmdQueue {
            conf_change: None,
            normals: VecDeque::new(),
        }
    }

    /// Pops the front command if it was proposed at or before (`term`, `index`);
    /// otherwise leaves the queue untouched and returns `None`.
    fn pop_normal(&mut self, index: u64, term: u64) -> Option<PendingCmd<S>> {
        let head = self.normals.pop_front()?;
        // Release excess capacity once the queue has drained well below the
        // threshold it previously grew past.
        if self.normals.capacity() > SHRINK_PENDING_CMD_QUEUE_CAP
            && self.normals.len() < SHRINK_PENDING_CMD_QUEUE_CAP
        {
            self.normals.shrink_to_fit();
        }
        if (head.term, head.index) > (term, index) {
            // Proposed later than the requested position: put it back.
            self.normals.push_front(head);
            None
        } else {
            Some(head)
        }
    }

    /// Enqueues a normal command at the back.
    fn append_normal(&mut self, cmd: PendingCmd<S>) {
        self.normals.push_back(cmd);
    }

    /// Takes the pending conf change, if any.
    fn take_conf_change(&mut self) -> Option<PendingCmd<S>> {
        // A conf change is unaffected by switching between follower and
        // leader, so no term check is needed here.
        self.conf_change.take()
    }

    // TODO: seems we don't need to separate conf change from normal entries.
    /// Records `cmd` as the pending conf change, replacing any previous one.
    fn set_conf_change(&mut self, cmd: PendingCmd<S>) {
        self.conf_change = Some(cmd);
    }
}
/// Outcome of applying a conf-change entry, reported back to raftstore.
#[derive(Default, Debug)]
pub struct ChangePeer {
    /// Raft log index of the conf-change entry.
    pub index: u64,
    // The proposed ConfChangeV2 or (legacy) ConfChange
    // ConfChange (if it is) will convert to ConfChangeV2
    pub conf_change: ConfChangeV2,
    // The change peer requests come along with ConfChangeV2
    // or (legacy) ConfChange, for ConfChange, it only contains
    // one element
    pub changes: Vec<ChangePeerRequest>,
    /// The region state after the membership change was applied.
    pub region: Region,
}
/// A key range within a single column family, used by delete-range results.
pub struct Range {
    /// Column family name.
    pub cf: String,
    /// Range start key (inclusive, presumably — confirm against callers).
    pub start_key: Vec<u8>,
    /// Range end key (exclusive, presumably — confirm against callers).
    pub end_key: Vec<u8>,
}
// Manual `Debug`: keys may contain sensitive user data, so they are wrapped
// with `log_wrappers::Value::key` for redaction-aware formatting.
impl Debug for Range {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(
            f,
            "{{ cf: {:?}, start_key: {:?}, end_key: {:?} }}",
            self.cf,
            log_wrappers::Value::key(&self.start_key),
            log_wrappers::Value::key(&self.end_key)
        )
    }
}
impl Range {
    /// Builds a `Range` spanning `start_key` to `end_key` in column family `cf`.
    fn new(cf: String, start_key: Vec<u8>, end_key: Vec<u8>) -> Range {
        Range {
            end_key,
            start_key,
            cf,
        }
    }
}
/// Side effects of executing an admin/special command that raftstore must
/// act upon after the apply worker persists the write batch.
#[derive(Debug)]
pub enum ExecResult<S> {
    /// Membership change was applied; see [`ChangePeer`].
    ChangePeer(ChangePeer),
    /// Raft log compaction was requested up to the truncated state.
    CompactLog {
        state: RaftTruncatedState,
        first_index: u64,
    },
    /// The region was split; `derived` keeps the original region id.
    SplitRegion {
        regions: Vec<Region>,
        derived: Region,
        new_split_regions: HashMap<u64, NewSplitPeer>,
    },
    /// The region entered the pre-merge state.
    PrepareMerge {
        region: Region,
        state: MergeState,
    },
    /// The source region was merged into this (target) region.
    CommitMerge {
        region: Region,
        source: Region,
    },
    /// A previously prepared merge was rolled back at `commit`.
    RollbackMerge {
        region: Region,
        commit: u64,
    },
    /// A consistency-check hash should be computed over `snap`.
    ComputeHash {
        region: Region,
        index: u64,
        context: Vec<u8>,
        snap: S,
    },
    /// A previously computed hash should be verified.
    VerifyHash {
        index: u64,
        context: Vec<u8>,
        hash: Vec<u8>,
    },
    /// Key ranges were deleted via the delete-range API.
    DeleteRange {
        ranges: Vec<Range>,
    },
    /// SST files were ingested into the engine.
    IngestSst {
        ssts: Vec<SSTMetaInfo>,
    },
}
/// The possible returned value when applying logs.
pub enum ApplyResult<S> {
    /// Entry applied with no extra result to report.
    None,
    /// Applying should pause and resume later (e.g. time-slice exhausted).
    Yield,
    /// Additional result that needs to be sent back to raftstore.
    Res(ExecResult<S>),
    /// It is unable to apply the `CommitMerge` until the source peer
    /// has applied to the required position. The source peer signals
    /// readiness by storing its region id into the shared `AtomicU64`
    /// (see `WaitSourceMergeState::logs_up_to_date`).
    WaitMergeSource(Arc<AtomicU64>),
}
/// Per-entry execution state: the apply state being mutated plus the raft
/// position of the entry currently being executed.
struct ExecContext {
    apply_state: RaftApplyState,
    index: u64,
    term: u64,
}
impl ExecContext {
    /// Captures `apply_state` together with the raft position (`index`,
    /// `term`) of the entry about to be executed.
    pub fn new(apply_state: RaftApplyState, index: u64, term: u64) -> ExecContext {
        ExecContext {
            term,
            index,
            apply_state,
        }
    }
}
/// Callbacks (and their commands) accumulated for one region during a round
/// of applying; they are fired together after the write batch is persisted.
struct ApplyCallback<EK>
where
    EK: KvEngine,
{
    /// The region these commands belong to.
    region: Region,
    /// Queued `(callback, command)` pairs; the callback may be absent.
    cbs: Vec<(Option<Callback<EK::Snapshot>>, Cmd)>,
}
impl<EK> ApplyCallback<EK>
where
    EK: KvEngine,
{
    /// Creates an empty callback batch for `region`.
    fn new(region: Region) -> Self {
        ApplyCallback {
            region,
            cbs: Vec::new(),
        }
    }

    /// Runs the coprocessor post-apply hook for every recorded command, then
    /// fires its callback (when present) with the command's response.
    fn invoke_all(self, host: &CoprocessorHost<EK>) {
        for (cb, mut cmd) in self.cbs {
            host.post_apply(&self.region, &mut cmd);
            if let Some(cb) = cb {
                cb.invoke_with_response(cmd.response)
            }
        }
    }

    /// Queues a command (and its optional callback) to be resolved later.
    fn push(&mut self, cb: Option<Callback<EK::Snapshot>>, cmd: Cmd) {
        self.cbs.push((cb, cmd));
    }
}
/// Channel back to raftstore: delivers apply results and per-region messages
/// from the apply worker. Must be cloneable (`clone_box`) and `Send` so each
/// apply context can own its own boxed copy.
pub trait Notifier<EK: KvEngine>: Send {
    /// Delivers a batch of apply results to raftstore.
    fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>);
    /// Sends a single message targeted at one region's peer.
    fn notify_one(&self, region_id: u64, msg: PeerMsg<EK>);
    /// Clones this notifier behind a box (object-safe clone).
    fn clone_box(&self) -> Box<dyn Notifier<EK>>;
}
/// Shared, per-apply-worker state: the write batch being built, callbacks to
/// fire after persistence, and handles to the engine and raftstore.
struct ApplyContext<EK, W>
where
    EK: KvEngine,
    W: WriteBatch<EK>,
{
    /// Logging tag for this apply worker.
    tag: String,
    /// Start time of the current round; `flush` is a no-op while unset.
    timer: Option<Instant>,
    /// Coprocessor hooks invoked around apply and on flush.
    host: CoprocessorHost<EK>,
    /// SST importer used to clean up ingested files after persistence.
    importer: Arc<SSTImporter>,
    /// Scheduler for region (snapshot) background tasks.
    region_scheduler: Scheduler<RegionTask<EK::Snapshot>>,
    /// Router for sending messages to other apply FSMs.
    router: ApplyRouter<EK>,
    /// Channel back to raftstore for apply results.
    notifier: Box<dyn Notifier<EK>>,
    /// KV engine the write batch is committed to.
    engine: EK,
    /// Per-delegate callback batches; fired in `write_to_db` and must all be
    /// consumed (enforced by `MustConsumeVec`).
    cbs: MustConsumeVec<ApplyCallback<EK>>,
    /// Apply results accumulated since the last `flush`.
    apply_res: Vec<ApplyRes<EK::Snapshot>>,
    /// State of the entry currently being executed, if any.
    exec_ctx: Option<ExecContext>,
    /// Pending KV mutations, persisted in `write_to_db`.
    kv_wb: W,
    /// `kv_wb` data size at the last commit, for delta metrics.
    kv_wb_last_bytes: u64,
    /// `kv_wb` key count at the last commit, for delta metrics.
    kv_wb_last_keys: u64,
    /// Applied index of the current delegate when `prepare_for` ran.
    last_applied_index: u64,
    /// Number of committed entries taken on since the last `flush`.
    committed_count: usize,
    // Whether synchronize WAL is preferred.
    sync_log_hint: bool,
    // Whether to use the delete range API instead of deleting one by one.
    use_delete_range: bool,
    /// Engine perf context used to report apply-path write metrics.
    perf_context: EK::PerfContext,
    /// Max duration a delegate may run before yielding the worker.
    yield_duration: Duration,
    /// ID of the store this worker belongs to.
    store_id: u64,
    /// region_id -> (peer_id, is_splitting)
    /// Used for handling race between splitting and creating new peer.
    /// An uninitialized peer can be replaced to the one from splitting iff they are exactly the same peer.
    pending_create_peers: Arc<Mutex<HashMap<u64, (u64, bool)>>>,
    /// We must delete the ingested file before calling `callback` so that any ingest-request reaching this
    /// peer could see this update if leader had changed. We must also delete them after the applied-index
    /// has been persisted to kvdb because this entry may replay because of panic or power-off, which
    /// happened before `WriteBatch::write` and after `SSTImporter::delete`. We shall make sure that
    /// this entry will never apply again at first, then we can delete the ssts files.
    delete_ssts: Vec<SSTMetaInfo>,
}
impl<EK, W> ApplyContext<EK, W>
where
    EK: KvEngine,
    W: WriteBatch<EK>,
{
    /// Creates a context bound to one apply worker, wiring in the engine,
    /// coprocessor host, importer, routers and configuration knobs.
    pub fn new(
        tag: String,
        host: CoprocessorHost<EK>,
        importer: Arc<SSTImporter>,
        region_scheduler: Scheduler<RegionTask<EK::Snapshot>>,
        engine: EK,
        router: ApplyRouter<EK>,
        notifier: Box<dyn Notifier<EK>>,
        cfg: &Config,
        store_id: u64,
        pending_create_peers: Arc<Mutex<HashMap<u64, (u64, bool)>>>,
    ) -> ApplyContext<EK, W> {
        // If `enable_multi_batch_write` was set true, we create `RocksWriteBatchVec`.
        // Otherwise create `RocksWriteBatch`.
        let kv_wb = W::with_capacity(&engine, DEFAULT_APPLY_WB_SIZE);
        ApplyContext {
            tag,
            timer: None,
            host,
            importer,
            region_scheduler,
            engine: engine.clone(),
            router,
            notifier,
            kv_wb,
            cbs: MustConsumeVec::new("callback of apply context"),
            apply_res: vec![],
            kv_wb_last_bytes: 0,
            kv_wb_last_keys: 0,
            last_applied_index: 0,
            committed_count: 0,
            sync_log_hint: false,
            exec_ctx: None,
            use_delete_range: cfg.use_delete_range,
            perf_context: engine.get_perf_context(cfg.perf_level, PerfContextKind::RaftstoreApply),
            yield_duration: cfg.apply_yield_duration.0,
            delete_ssts: vec![],
            store_id,
            pending_create_peers,
        }
    }
    /// Prepares for applying entries for `delegate`.
    ///
    /// A general apply progress for a delegate is:
    /// `prepare_for` -> `commit` [-> `commit` ...] -> `finish_for`.
    /// After all delegates are handled, `write_to_db` method should be called.
    pub fn prepare_for(&mut self, delegate: &mut ApplyDelegate<EK>) {
        // Open a fresh callback batch for this delegate's region and record
        // its applied index so `commit` can tell whether it advanced.
        self.cbs.push(ApplyCallback::new(delegate.region.clone()));
        self.last_applied_index = delegate.apply_state.get_applied_index();
        if let Some(observe_cmd) = &delegate.observe_cmd {
            let region_id = delegate.region_id();
            // TODO: skip this step when we do not need to observe cmds.
            self.host.prepare_for_apply(observe_cmd.id, region_id);
        }
    }
    /// Commits all changes made so far for the delegate and persists them.
    ///
    /// This call is valid only when it's between a `prepare_for` and `finish_for`.
    pub fn commit(&mut self, delegate: &mut ApplyDelegate<EK>) {
        // Only write the apply state if the applied index actually advanced
        // since `prepare_for`.
        if self.last_applied_index < delegate.apply_state.get_applied_index() {
            delegate.write_apply_state(self.kv_wb_mut());
        }
        // last_applied_index doesn't need to be updated here: passing
        // persistent=true makes `commit_opt` call `prepare_for` again, which
        // refreshes it.
        self.commit_opt(delegate, true);
    }
    /// Flushes delta metrics into the delegate and, when `persistent` is set,
    /// writes the batch to the engine and re-opens a round via `prepare_for`.
    fn commit_opt(&mut self, delegate: &mut ApplyDelegate<EK>, persistent: bool) {
        delegate.update_metrics(self);
        if persistent {
            self.write_to_db();
            self.prepare_for(delegate);
        }
        // Snapshot the batch size/count so later `delta_bytes`/`delta_keys`
        // measure only work done after this commit.
        self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
        self.kv_wb_last_keys = self.kv_wb().count() as u64;
    }
    /// Writes all the changes into RocksDB.
    /// If it returns true, all pending writes are persisted in engines.
    pub fn write_to_db(&mut self) -> bool {
        let need_sync = self.sync_log_hint;
        if !self.kv_wb_mut().is_empty() {
            let mut write_opts = engine_traits::WriteOptions::new();
            write_opts.set_sync(need_sync);
            // A failed engine write is unrecoverable here; panic rather than
            // continue with unpersisted state.
            self.kv_wb().write_opt(&write_opts).unwrap_or_else(|e| {
                panic!("failed to write to engine: {:?}", e);
            });
            self.perf_context.report_metrics();
            self.sync_log_hint = false;
            let data_size = self.kv_wb().data_size();
            if data_size > APPLY_WB_SHRINK_SIZE {
                // Control the memory usage for the WriteBatch. Whether it's `RocksWriteBatch` or
                // `RocksWriteBatchVec` depends on the `enable_multi_batch_write` configuration.
                self.kv_wb = W::with_capacity(&self.engine, DEFAULT_APPLY_WB_SIZE);
            } else {
                // Clear data, reuse the WriteBatch, this can reduce memory allocations and deallocations.
                self.kv_wb_mut().clear();
            }
            self.kv_wb_last_bytes = 0;
            self.kv_wb_last_keys = 0;
        }
        // Ingested files are deleted only after the applied index is
        // persisted; see the `delete_ssts` field doc for the ordering rules.
        if !self.delete_ssts.is_empty() {
            let tag = self.tag.clone();
            for sst in self.delete_ssts.drain(..) {
                self.importer.delete(&sst.meta).unwrap_or_else(|e| {
                    panic!("{} cleanup ingested file {:?}: {:?}", tag, sst, e);
                });
            }
        }
        // Call it before invoking callback for preventing Commit is executed before Prewrite is observed.
        self.host.on_flush_apply(self.engine.clone());
        // Now that data is persisted, it is safe to answer the clients.
        for cbs in self.cbs.drain(..) {
            cbs.invoke_all(&self.host);
        }
        need_sync
    }
    /// Finishes `Apply`s for the delegate.
    pub fn finish_for(
        &mut self,
        delegate: &mut ApplyDelegate<EK>,
        results: VecDeque<ExecResult<EK::Snapshot>>,
    ) {
        // A delegate marked for removal must not persist a new apply state.
        if !delegate.pending_remove {
            delegate.write_apply_state(self.kv_wb_mut());
        }
        self.commit_opt(delegate, false);
        self.apply_res.push(ApplyRes {
            region_id: delegate.region_id(),
            apply_state: delegate.apply_state.clone(),
            exec_res: results,
            metrics: delegate.metrics.clone(),
            applied_index_term: delegate.applied_index_term,
        });
    }
    /// Bytes written into the batch since the last commit.
    pub fn delta_bytes(&self) -> u64 {
        self.kv_wb().data_size() as u64 - self.kv_wb_last_bytes
    }
    /// Keys written into the batch since the last commit.
    pub fn delta_keys(&self) -> u64 {
        self.kv_wb().count() as u64 - self.kv_wb_last_keys
    }
    #[inline]
    pub fn kv_wb(&self) -> &W {
        &self.kv_wb
    }
    #[inline]
    pub fn kv_wb_mut(&mut self) -> &mut W {
        &mut self.kv_wb
    }
    /// Flush all pending writes to engines.
    /// If it returns true, all pending writes are persisted in engines.
    pub fn flush(&mut self) -> bool {
        // TODO: this check is too hacky, need to be more verbose and less buggy.
        // No timer means nothing was started this round; skip entirely.
        let t = match self.timer.take() {
            Some(t) => t,
            None => return false,
        };
        // Write to engine
        // raftstore.sync-log = true means we need prevent data loss when power failure.
        // take raft log gc for example, we write kv WAL first, then write raft WAL,
        // if power failure happen, raft WAL may synced to disk, but kv WAL may not.
        // so we use sync-log flag here.
        let is_synced = self.write_to_db();
        if !self.apply_res.is_empty() {
            let apply_res = std::mem::replace(&mut self.apply_res, vec![]);
            self.notifier.notify(apply_res);
        }
        let elapsed = t.elapsed();
        STORE_APPLY_LOG_HISTOGRAM.observe(duration_to_sec(elapsed) as f64);
        slow_log!(
            elapsed,
            "{} handle ready {} committed entries",
            self.tag,
            self.committed_count
        );
        self.committed_count = 0;
        is_synced
    }
}
/// Resolves `cmd` with a `RegionNotFound` error because its region is gone.
fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd<impl Snapshot>) {
    debug!(
        "region is removed, notify commands";
        "region_id" => region_id,
        "peer_id" => peer_id,
        "index" => cmd.index,
        "term" => cmd.term
    );
    let cb = cmd.cb.take().unwrap();
    notify_req_region_removed(region_id, cb);
}
pub fn notify_req_region_removed(region_id: u64, cb: Callback<impl Snapshot>) {
let region_not_found = Error::RegionNotFound(region_id);
let resp = cmd_resp::new_error(region_not_found);
cb.invoke_with_response(resp);
}
/// Resolves `cmd` with a `StaleCommand` error because it can no longer be
/// processed (e.g. superseded by a newer term).
fn notify_stale_command(
    region_id: u64,
    peer_id: u64,
    term: u64,
    mut cmd: PendingCmd<impl Snapshot>,
) {
    info!(
        "command is stale, skip";
        "region_id" => region_id,
        "peer_id" => peer_id,
        "index" => cmd.index,
        "term" => cmd.term
    );
    let cb = cmd.cb.take().unwrap();
    notify_stale_req(term, cb);
}
pub fn notify_stale_req(term: u64, cb: Callback<impl Snapshot>) {
let resp = cmd_resp::err_resp(Error::StaleCommand, term);
cb.invoke_with_response(resp);
}
/// Checks if a write is needed to be issued before handling the command.
fn should_write_to_engine(cmd: &RaftCmdRequest) -> bool {
    // ComputeHash requires an up-to-date snapshot; merge commands need the
    // latest apply index. All three therefore force a flush first.
    if cmd.has_admin_request()
        && matches!(
            cmd.get_admin_request().get_cmd_type(),
            AdminCmdType::ComputeHash | AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge
        )
    {
        return true;
    }
    // Some commands may modify keys covered by the current write batch, so we
    // must write the current write batch to the engine first.
    cmd.get_requests()
        .iter()
        .any(|req| req.has_delete_range() || req.has_ingest_sst())
}
/// Checks if a write is needed to be issued after handling the command.
fn should_sync_log(cmd: &RaftCmdRequest) -> bool {
    if cmd.has_admin_request() {
        // CompactLog does not need a WAL sync: it only sends a message to the
        // raft_gc_log thread, which deletes entries before the index there
        // instead of deleting them in the apply thread directly. Every other
        // admin command must be synced.
        return cmd.get_admin_request().get_cmd_type() != AdminCmdType::CompactLog;
    }
    // After ingest sst, sst files are deleted quickly. As a result,
    // an ingest sst command can not be handled again and must be synced.
    // See more in Cleanup worker.
    cmd.get_requests().iter().any(|req| req.has_ingest_sst())
}
/// A struct that stores the state related to Merge.
///
/// When executing a `CommitMerge`, the source peer may have not applied
/// to the required index, so the target peer has to abort current execution
/// and wait for it asynchronously.
///
/// When rolling the stack, all states required to recover are stored in
/// this struct.
/// TODO: check whether generator/coroutine is a good choice in this case.
struct WaitSourceMergeState {
    /// A flag that indicates whether the source peer has applied to the required
    /// index. If the source peer is ready, this flag should be set to the region id
    /// of source peer.
    logs_up_to_date: Arc<AtomicU64>,
}
/// Work saved when a delegate yields mid-apply, to be resumed later.
struct YieldState<EK>
where
    EK: KvEngine,
{
    /// All of the entries that need to continue to be applied after
    /// the source peer has applied its logs.
    pending_entries: Vec<Entry>,
    /// All of messages that need to continue to be handled after
    /// the source peer has applied its logs and pending entries
    /// are all handled.
    pending_msgs: Vec<Msg<EK>>,
}
// Manual `Debug`: entries/messages may be large, so only their counts are
// printed.
impl<EK> Debug for YieldState<EK>
where
    EK: KvEngine,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("YieldState")
            .field("pending_entries", &self.pending_entries.len())
            .field("pending_msgs", &self.pending_msgs.len())
            .finish()
    }
}
// Manual `Debug`: prints the shared readiness flag by its Debug repr.
impl Debug for WaitSourceMergeState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WaitSourceMergeState")
            .field("logs_up_to_date", &self.logs_up_to_date)
            .finish()
    }
}
/// Creation status of a peer produced by a region split.
#[derive(Debug, Clone)]
pub struct NewSplitPeer {
    /// ID of the newly created peer.
    pub peer_id: u64,
    // `None` => success,
    // `Some(s)` => fail due to `s`.
    pub result: Option<String>,
}
/// The apply delegate of a Region which is responsible for handling committed
/// raft log entries of a Region.
///
/// `Apply` is a term of Raft, which means executing the actual commands.
/// In Raft, once some log entries are committed, for every peer of the Raft
/// group will apply the logs one by one. For write commands, it does write or
/// delete to local engine; for admin commands, it does some meta change of the
/// Raft group.
///
/// `Delegate` is just a structure to congregate all apply related fields of a
/// Region. The apply worker receives all the apply tasks of different Regions
/// located at this store, and it will get the corresponding apply delegate to
/// handle the apply task to make the code logic more clear.
#[derive(Debug)]
pub struct ApplyDelegate<EK>
where
    EK: KvEngine,
{
    /// The ID of the peer.
    id: u64,
    /// The term of the Region (the peer's current raft term, per
    /// `Registration::term` — confirm against registration site).
    term: u64,
    /// The Region information of the peer.
    region: Region,
    /// Peer tag for logging, "[region region_id] peer_id".
    tag: String,
    /// If the delegate should be stopped from polling.
    /// A delegate can be stopped in conf change, merge or requested by destroy message.
    stopped: bool,
    /// The start time of the current round to execute commands.
    handle_start: Option<Instant>,
    /// Set to true when removing itself because of `ConfChangeType::RemoveNode`, and then
    /// any following committed logs in same Ready should be applied failed.
    pending_remove: bool,
    /// The commands waiting to be committed and applied
    pending_cmds: PendingCmdQueue<EK::Snapshot>,
    /// The counter of pending request snapshots. See more in `Peer`.
    pending_request_snapshot_count: Arc<AtomicUsize>,
    /// Indicates the peer is in merging, if that compact log won't be performed.
    is_merging: bool,
    /// Records the epoch version after the last merge.
    last_merge_version: u64,
    /// Saved progress when this delegate yields mid-apply; see `YieldState`.
    yield_state: Option<YieldState<EK>>,
    /// A temporary state that keeps track of the progress of the source peer state when
    /// CommitMerge is unable to be executed.
    wait_merge_state: Option<WaitSourceMergeState>,
    // ID of last region that reports ready.
    ready_source_region_id: u64,
    /// TiKV writes apply_state to KV RocksDB, in one write batch together with kv data.
    ///
    /// If we write it to Raft RocksDB, apply_state and kv data (Put, Delete) are in
    /// separate WAL file. When power failure, for current raft log, apply_index may synced
    /// to file, but KV data may not synced to file, so we will lose data.
    apply_state: RaftApplyState,
    /// The term of the raft log at applied index.
    applied_index_term: u64,
    /// The latest synced apply index.
    last_sync_apply_index: u64,
    /// Info about cmd observer.
    observe_cmd: Option<ObserveCmd>,
    /// The local metrics, and it will be flushed periodically.
    metrics: ApplyMetrics,
}
impl<EK> ApplyDelegate<EK>
where
EK: KvEngine,
{
fn from_registration(reg: Registration) -> ApplyDelegate<EK> {
ApplyDelegate {
id: reg.id,
tag: format!("[region {}] {}", reg.region.get_id(), reg.id),
region: reg.region,
pending_remove: false,
last_sync_apply_index: reg.apply_state.get_applied_index(),
apply_state: reg.apply_state,
applied_index_term: reg.applied_index_term,
term: reg.term,
stopped: false,
handle_start: None,
ready_source_region_id: 0,
yield_state: None,
wait_merge_state: None,
is_merging: reg.is_merging,
pending_cmds: PendingCmdQueue::new(),
metrics: Default::default(),
last_merge_version: 0,
pending_request_snapshot_count: reg.pending_request_snapshot_count,
observe_cmd: None,
}
}
pub fn region_id(&self) -> u64 {
self.region.get_id()
}
pub fn id(&self) -> u64 {
self.id
}
/// Handles all the committed_entries, namely, applies the committed entries.
fn handle_raft_committed_entries<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
mut committed_entries_drainer: Drain<Entry>,
) {
if committed_entries_drainer.len() == 0 {
return;
}
apply_ctx.prepare_for(self);
// If we send multiple ConfChange commands, only first one will be proposed correctly,
// others will be saved as a normal entry with no data, so we must re-propose these
// commands again.
apply_ctx.committed_count += committed_entries_drainer.len();
let mut results = VecDeque::new();
while let Some(entry) = committed_entries_drainer.next() {
if self.pending_remove {
// This peer is about to be destroyed, skip everything.
break;
}
let expect_index = self.apply_state.get_applied_index() + 1;
if expect_index != entry.get_index() {
panic!(
"{} expect index {}, but got {}",
self.tag,
expect_index,
entry.get_index()
);
}
// NOTE: before v5.0, `EntryType::EntryConfChangeV2` entry is handled by `unimplemented!()`,
// which can break compatibility (i.e. old version tikv running on data written by new version tikv),
// but PD will reject old version tikv join the cluster, so this should not happen.
let res = match entry.get_entry_type() {
EntryType::EntryNormal => self.handle_raft_entry_normal(apply_ctx, &entry),
EntryType::EntryConfChange | EntryType::EntryConfChangeV2 => {
self.handle_raft_entry_conf_change(apply_ctx, &entry)
}
};
match res {
ApplyResult::None => {}
ApplyResult::Res(res) => results.push_back(res),
ApplyResult::Yield | ApplyResult::WaitMergeSource(_) => {
// Both cancel and merge will yield current processing.
apply_ctx.committed_count -= committed_entries_drainer.len() + 1;
let mut pending_entries =
Vec::with_capacity(committed_entries_drainer.len() + 1);
// Note that current entry is skipped when yield.
pending_entries.push(entry);
pending_entries.extend(committed_entries_drainer);
apply_ctx.finish_for(self, results);
self.yield_state = Some(YieldState {
pending_entries,
pending_msgs: Vec::default(),
});
if let ApplyResult::WaitMergeSource(logs_up_to_date) = res {
self.wait_merge_state = Some(WaitSourceMergeState { logs_up_to_date });
}
return;
}
}
}
apply_ctx.finish_for(self, results);
if self.pending_remove {
self.destroy(apply_ctx);
}
}
fn update_metrics<W: WriteBatch<EK>>(&mut self, apply_ctx: &ApplyContext<EK, W>) {
self.metrics.written_bytes += apply_ctx.delta_bytes();
self.metrics.written_keys += apply_ctx.delta_keys();
}
fn write_apply_state<W: WriteBatch<EK>>(&self, wb: &mut W) {
wb.put_msg_cf(
CF_RAFT,
&keys::apply_state_key(self.region.get_id()),
&self.apply_state,
)
.unwrap_or_else(|e| {
panic!(
"{} failed to save apply state to write batch, error: {:?}",
self.tag, e
);
});
}
fn handle_raft_entry_normal<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
entry: &Entry,
) -> ApplyResult<EK::Snapshot> {
fail_point!("yield_apply_1000", self.region_id() == 1000, |_| {
ApplyResult::Yield
});
let index = entry.get_index();
let term = entry.get_term();
let data = entry.get_data();
if !data.is_empty() {
let cmd = util::parse_data_at(data, index, &self.tag);
if should_write_to_engine(&cmd) || apply_ctx.kv_wb().should_write_to_engine() {
apply_ctx.commit(self);
if let Some(start) = self.handle_start.as_ref() {
if start.elapsed() >= apply_ctx.yield_duration {
return ApplyResult::Yield;
}
}
}
return self.process_raft_cmd(apply_ctx, index, term, cmd);
}
// TOOD(cdc): should we observe empty cmd, aka leader change?
self.apply_state.set_applied_index(index);
self.applied_index_term = term;
assert!(term > 0);
// 1. When a peer become leader, it will send an empty entry.
// 2. When a leader tries to read index during transferring leader,
// it will also propose an empty entry. But that entry will not contain
// any associated callback. So no need to clear callback.
while let Some(mut cmd) = self.pending_cmds.pop_normal(std::u64::MAX, term - 1) {
apply_ctx.cbs.last_mut().unwrap().push(
cmd.cb.take(),
Cmd::new(
cmd.index,
RaftCmdRequest::default(),
cmd_resp::err_resp(Error::StaleCommand, term),
),
);
}
ApplyResult::None
}
fn handle_raft_entry_conf_change<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
entry: &Entry,
) -> ApplyResult<EK::Snapshot> {
// Although conf change can't yield in normal case, it is convenient to
// simulate yield before applying a conf change log.
fail_point!("yield_apply_conf_change_3", self.id() == 3, |_| {
ApplyResult::Yield
});
let (index, term) = (entry.get_index(), entry.get_term());
let conf_change: ConfChangeV2 = match entry.get_entry_type() {
EntryType::EntryConfChange => {
let conf_change: ConfChange =
util::parse_data_at(entry.get_data(), index, &self.tag);
conf_change.into_v2()
}
EntryType::EntryConfChangeV2 => util::parse_data_at(entry.get_data(), index, &self.tag),
_ => unreachable!(),
};
let cmd = util::parse_data_at(conf_change.get_context(), index, &self.tag);
match self.process_raft_cmd(apply_ctx, index, term, cmd) {
ApplyResult::None => {
// If failed, tell Raft that the `ConfChange` was aborted.
ApplyResult::Res(ExecResult::ChangePeer(Default::default()))
}
ApplyResult::Res(mut res) => {
if let ExecResult::ChangePeer(ref mut cp) = res {
cp.conf_change = conf_change;
} else {
panic!(
"{} unexpected result {:?} for conf change {:?} at {}",
self.tag, res, conf_change, index