Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async-io: make v2 a log batch #13935

Merged
merged 2 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 15 additions & 8 deletions components/raftstore-v2/src/operation/life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ use std::cmp;

use batch_system::BasicMailbox;
use crossbeam::channel::{SendError, TrySendError};
use engine_traits::{KvEngine, RaftEngine};
use engine_traits::{KvEngine, RaftEngine, RaftLogBatch};
use kvproto::{
metapb::Region,
raft_serverpb::{PeerState, RaftMessage},
};
use raftstore::store::{util, ExtraStates, WriteTask};
use raftstore::store::{util, WriteTask};
use slog::{debug, error, info, warn};
use tikv_util::store::find_peer;

Expand Down Expand Up @@ -175,7 +175,7 @@ impl Store {
return;
}
let from_epoch = msg.get_region_epoch();
let local_state = match ctx.engine.get_region_state(region_id, 0) {
let local_state = match ctx.engine.get_region_state(region_id, u64::MAX) {
Ok(s) => s,
Err(e) => {
error!(self.logger(), "failed to get region state"; "region_id" => region_id, "err" => ?e);
Expand Down Expand Up @@ -304,13 +304,20 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
Some((f, l)) => Some((cmp::min(first_index, f), cmp::max(last_index, l))),
};
}
let mut extra_states = ExtraStates::new(entry_storage.apply_state().clone());
let raft_engine = self.entry_storage().raft_engine();
let mut region_state = self.storage().region_state().clone();
let region_id = region_state.get_region().get_id();
let lb = write_task
.extra_write
.ensure_v2(|| raft_engine.log_batch(2));
// We only use raft-log-engine for v2, first index is not important.
let raft_state = self.entry_storage().raft_state();
raft_engine.clean(region_id, 0, raft_state, lb).unwrap();
// Write worker will do the clean up when meeting tombstone state.
region_state.set_state(PeerState::Tombstone);
extra_states.set_region_state(region_state);
extra_states.set_raft_state(entry_storage.raft_state().clone());
write_task.extra_write.set_v2(extra_states);
let applied_index = self.entry_storage().applied_index();
lb.put_region_state(region_id, applied_index, &region_state)
.unwrap();
self.destroy_progress_mut().start();
}

Expand All @@ -325,6 +332,6 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
// new peer. Ignore error as it's just a best effort.
let _ = ctx.router.send_raft_message(msg);
}
// TODO: close apply mailbox.
self.clear_apply_scheduler();
}
}
16 changes: 11 additions & 5 deletions components/raftstore-v2/src/operation/ready/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ mod snapshot;

use std::{cmp, time::Instant};

use engine_traits::{KvEngine, RaftEngine};
use engine_traits::{KvEngine, RaftEngine, RaftLogBatch};
use error_code::ErrorCodeExt;
use kvproto::{raft_cmdpb::AdminCmdType, raft_serverpb::RaftMessage};
use protobuf::Message as _;
use raft::{eraftpb, prelude::MessageType, Ready, StateRole, INVALID_ID};
use raftstore::store::{util, ExtraStates, FetchedLogs, ReadProgress, Transport, WriteTask};
use raftstore::store::{util, FetchedLogs, ReadProgress, Transport, WriteTask};
use slog::{debug, error, trace, warn};
use tikv_util::time::{duration_to_sec, monotonic_raw_now};

Expand Down Expand Up @@ -555,9 +555,15 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
write_task.raft_state = Some(entry_storage.raft_state().clone());
}
if !ever_persisted {
let mut extra_states = ExtraStates::new(self.apply_state().clone());
extra_states.set_region_state(self.region_state().clone());
write_task.extra_write.set_v2(extra_states);
let region_id = self.region().get_id();
let raft_engine = self.entry_storage().raft_engine();
let lb = write_task
.extra_write
.ensure_v2(|| raft_engine.log_batch(3));
lb.put_apply_state(region_id, 0, self.apply_state())
.unwrap();
lb.put_region_state(region_id, 0, self.region_state())
.unwrap();
self.set_ever_persisted();
}
}
Expand Down
5 changes: 5 additions & 0 deletions components/raftstore-v2/src/raft/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,11 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
self.apply_scheduler = Some(apply_scheduler);
}

#[inline]
pub fn clear_apply_scheduler(&mut self) {
self.apply_scheduler.take();
}

/// Whether the snapshot is handling.
/// See the comments of `check_snap_status` for more details.
#[inline]
Expand Down
6 changes: 3 additions & 3 deletions components/raftstore-v2/src/raft/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
read_scheduler: Scheduler<ReadTask<EK>>,
logger: &Logger,
) -> Result<Option<Storage<EK, ER>>> {
let region_state = match engine.get_region_state(region_id, 0) {
let region_state = match engine.get_region_state(region_id, u64::MAX) {
Ok(Some(s)) => s,
res => {
return Err(box_err!(
Expand All @@ -180,7 +180,7 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
}
};

let apply_state = match engine.get_apply_state(region_id, 0) {
let apply_state = match engine.get_apply_state(region_id, u64::MAX) {
Ok(Some(s)) => s,
res => {
return Err(box_err!("failed to get apply state: {:?}", res));
Expand Down Expand Up @@ -450,7 +450,7 @@ mod tests {
assert_eq!(hs.get_term(), RAFT_INIT_LOG_TERM);
assert_eq!(hs.get_commit(), RAFT_INIT_LOG_INDEX);

let apply_state = raft_engine.get_apply_state(4, 0).unwrap().unwrap();
let apply_state = raft_engine.get_apply_state(4, u64::MAX).unwrap().unwrap();
assert_eq!(apply_state.get_applied_index(), RAFT_INIT_LOG_INDEX);
let ts = apply_state.get_truncated_state();
assert_eq!(ts.get_index(), RAFT_INIT_LOG_INDEX);
Expand Down
7 changes: 5 additions & 2 deletions components/raftstore-v2/tests/integrations/test_life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ fn assert_tombstone(raft_engine: &impl RaftEngine, region_id: u64, peer: &metapb
raft_engine.get_all_entries_to(region_id, &mut buf).unwrap();
assert!(buf.is_empty(), "{:?}", buf);
assert_matches!(raft_engine.get_raft_state(region_id), Ok(None));
assert_matches!(raft_engine.get_apply_state(region_id, 0), Ok(None));
let region_state = raft_engine.get_region_state(region_id, 0).unwrap().unwrap();
assert_matches!(raft_engine.get_apply_state(region_id, u64::MAX), Ok(None));
let region_state = raft_engine
.get_region_state(region_id, u64::MAX)
.unwrap()
.unwrap();
assert_matches!(region_state.get_state(), PeerState::Tombstone);
assert!(
region_state.get_region().get_peers().contains(peer),
Expand Down