
raftstore: Implement coprocessor observer pre_persist #12957

Merged
merged 16 commits on Sep 15, 2022
20 changes: 20 additions & 0 deletions components/raftstore/src/coprocessor/dispatcher.rs
@@ -635,6 +635,26 @@ impl<E: KvEngine> CoprocessorHost<E> {
);
}

/// `pre_commit` is called when we want to persist data or meta for a region.
/// For example, `finish_for` and `commit` call `pre_commit` with
/// `is_finished` set to true and false, respectively.
/// By returning false, an observer rejects this persistence.
pub fn pre_commit(
[Review comment, Member] Better to give it a name that doesn't leak the implementation details. It will be changed in v2.

&self,
region: &Region,
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
let mut ctx = ObserverContext::new(region);
for observer in &self.registry.region_change_observers {
let observer = observer.observer.inner();
if !observer.pre_commit(&mut ctx, is_finished, cmd) {
return false;
}
}
true
}

pub fn on_flush_applied_cmd_batch(
&self,
max_level: ObserveLevel,
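To illustrate the calling convention described in the doc comment above, here is a minimal caller-side sketch. It assumes only the `pre_commit` signature added in this hunk; `try_persist` and the `persist` closure are hypothetical stand-ins for the real write path in apply.rs (shown further down), and the import paths assume the usual raftstore/kvproto crates.

use engine_traits::KvEngine;
use kvproto::metapb::Region;
use raftstore::coprocessor::CoprocessorHost;

// Hypothetical helper: persist only if no registered observer vetoes it.
fn try_persist<E: KvEngine>(
    host: &CoprocessorHost<E>,
    region: &Region,
    is_finished: bool,
    mut persist: impl FnMut(),
) {
    // Per the doc comment above, `finish_for` passes is_finished = true and
    // the per-command `commit` path passes false.
    if host.pre_commit(region, is_finished, None) {
        // Every observer agreed, so it is safe to persist data/meta now.
        persist();
    }
    // Otherwise skip this round; a later call may be allowed to persist.
}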
11 changes: 11 additions & 0 deletions components/raftstore/src/coprocessor/mod.rs
@@ -310,6 +310,17 @@ pub enum RegionChangeEvent {
pub trait RegionChangeObserver: Coprocessor {
/// Hook to call when a region changed on this TiKV
fn on_region_changed(&self, _: &mut ObserverContext<'_>, _: RegionChangeEvent, _: StateRole) {}

/// Should be called every time before we commit a WriteBatch in
/// ApplyDelegate. Returns false if we can't commit at this time.
fn pre_commit(
&self,
_: &mut ObserverContext<'_>,
_is_finished: bool,
_cmd: Option<&RaftCmdRequest>,
) -> bool {
true
}
}

#[derive(Clone, Debug, Default)]
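On the observer side, implementing the new hook is just a matter of overriding the default method. A minimal sketch follows; the `DeferPersist` type and its atomic flag are hypothetical and not part of this PR (the test-only `ApplyObserver` below does essentially the same thing), and the import paths are assumed.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use kvproto::raft_cmdpb::RaftCmdRequest;
use raftstore::coprocessor::{Coprocessor, ObserverContext, RegionChangeObserver};

// Hypothetical observer: while `defer` is set, veto every persistence attempt.
#[derive(Clone, Default)]
struct DeferPersist {
    defer: Arc<AtomicBool>,
}

impl Coprocessor for DeferPersist {}

impl RegionChangeObserver for DeferPersist {
    fn pre_commit(
        &self,
        _: &mut ObserverContext<'_>,
        _is_finished: bool,
        _cmd: Option<&RaftCmdRequest>,
    ) -> bool {
        // Returning false rejects this persistence; the apply worker simply
        // tries again on a later finish_for/commit round.
        !self.defer.load(Ordering::SeqCst)
    }
}

Such an observer would be registered through `register_region_change_observer`, as the test changes in apply.rs below do.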
58 changes: 52 additions & 6 deletions components/raftstore/src/store/fsm/apply.rs
@@ -584,10 +584,17 @@ where
delegate: &mut ApplyDelegate<EK>,
results: VecDeque<ExecResult<EK::Snapshot>>,
) {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
if self.host.pre_commit(&delegate.region, true, None) {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, false);
} else {
debug!("do not persist when finish_for";
"region" => ?delegate.region,
"tag" => &delegate.tag,
);
}
self.commit_opt(delegate, false);
self.apply_res.push(ApplyRes {
region_id: delegate.region_id(),
apply_state: delegate.apply_state.clone(),
@@ -1073,8 +1080,9 @@ where
}
let mut has_unflushed_data =
self.last_flush_applied_index != self.apply_state.get_applied_index();
if has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine()
if (has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine())
&& apply_ctx.host.pre_commit(&self.region, false, Some(&cmd))
{
apply_ctx.commit(self);
if let Some(start) = self.handle_start.as_ref() {
@@ -4972,6 +4980,7 @@ mod tests {
cmd_sink: Option<Arc<Mutex<Sender<CmdBatch>>>>,
filter_compact_log: Arc<AtomicBool>,
filter_consistency_check: Arc<AtomicBool>,
skip_persist_when_pre_commit: Arc<AtomicBool>,
delay_remove_ssts: Arc<AtomicBool>,
last_delete_sst_count: Arc<AtomicU64>,
last_pending_delete_sst_count: Arc<AtomicU64>,
@@ -5095,6 +5104,17 @@ mod tests {
fn on_applied_current_term(&self, _: raft::StateRole, _: &Region) {}
}

impl RegionChangeObserver for ApplyObserver {
fn pre_commit(
&self,
_: &mut ObserverContext<'_>,
_is_finished: bool,
_cmd: Option<&RaftCmdRequest>,
) -> bool {
!self.skip_persist_when_pre_commit.load(Ordering::SeqCst)
}
}

#[test]
fn test_handle_raft_committed_entries() {
let (_path, engine) = create_tmp_engine("test-delegate");
@@ -5716,6 +5736,8 @@ mod tests {
let obs = ApplyObserver::default();
host.registry
.register_admin_observer(1, BoxAdminObserver::new(obs.clone()));
host.registry
.register_region_change_observer(1, BoxRegionChangeObserver::new(obs.clone()));
host.registry
.register_query_observer(1, BoxQueryObserver::new(obs.clone()));

@@ -5751,6 +5773,8 @@ mod tests {
reg.region.mut_region_epoch().set_version(3);
router.schedule_task(1, Msg::Registration(reg));

obs.skip_persist_when_pre_commit
.store(true, Ordering::SeqCst);
let mut index_id = 1;
let put_entry = EntryBuilder::new(index_id, 1)
.put(b"k1", b"v1")
@@ -5759,7 +5783,19 @@
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![put_entry], vec![])));
fetch_apply_res(&rx);
let apply_res = fetch_apply_res(&rx);

// We don't persist at `finish_for`, since `pre_commit` returned false.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index() + 1
);
obs.skip_persist_when_pre_commit
.store(false, Ordering::SeqCst);

// Phase 1: we test if pre_exec will filter execution of commands correctly.
index_id += 1;
@@ -5781,6 +5817,16 @@ mod tests {
assert_eq!(apply_res.exec_res.len(), 0);
assert_eq!(apply_res.apply_state.get_truncated_state().get_index(), 0);

// We persist at `finish_for`, since `pre_commit` returns true again.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index()
);

index_id += 1;
// Don't filter CompactLog
obs.filter_compact_log.store(false, Ordering::SeqCst);
1 change: 1 addition & 0 deletions components/test_raftstore/src/node.rs
@@ -269,6 +269,7 @@ impl Simulator for NodeCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
[Review comment, Member] Why the change?

[Reply, Member Author] Because test cases like test_server_huge_snapshot_multi_files may not check the expected behavior if enable_multi_snapshot_files is set to false. These tests are meant to verify that snapshots work correctly when multi-file snapshots are enabled; if the switch is off, we may never generate a multi-file snapshot, so they would actually exercise only single-file snapshots.

.build(tmp.path().to_str().unwrap());
(snap_mgr, Some(tmp))
} else {
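As a rough sketch of the configuration the reply above refers to (assumptions: `SnapManagerBuilder::default()` as the entry point, the `raftstore::store` re-export paths, and a made-up per-file size; only the two size-related setters from this diff are shown):

use raftstore::store::{SnapManager, SnapManagerBuilder};

fn build_snap_mgr(path: &str) -> SnapManager {
    // Without `enable_multi_snapshot_files(true)`, even a huge snapshot is
    // written as a single file, so the multi-file code paths under test are
    // never exercised.
    SnapManagerBuilder::default()
        .max_per_file_size(1024 * 1024) // hypothetical 1 MiB per-file cap
        .enable_multi_snapshot_files(true)
        .build(path)
}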
1 change: 1 addition & 0 deletions components/test_raftstore/src/server.rs
@@ -454,6 +454,7 @@ impl ServerCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
.build(tmp_str);
self.snap_mgrs.insert(node_id, snap_mgr.clone());
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));