Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft_engine: remove redundant methods #13900

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 4 additions & 16 deletions components/engine_panic/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,6 @@ impl RaftEngine for PanicEngine {
panic!()
}

fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
panic!()
}

fn put_raft_state(&self, raft_group_id: u64, state: &RaftLocalState) -> Result<()> {
panic!()
}

fn gc(&self, raft_group_id: u64, mut from: u64, to: u64) -> Result<usize> {
panic!()
}
Expand Down Expand Up @@ -148,21 +140,13 @@ impl RaftEngine for PanicEngine {
panic!()
}

fn put_store_ident(&self, ident: &StoreIdent) -> Result<()> {
panic!()
}

fn for_each_raft_group<E, F>(&self, f: &mut F) -> std::result::Result<(), E>
where
F: FnMut(u64) -> std::result::Result<(), E>,
E: From<Error>,
{
panic!()
}

fn put_recover_state(&self, state: &StoreRecoverState) -> Result<()> {
panic!()
}
}

impl RaftLogBatch for PanicWriteBatch {
Expand Down Expand Up @@ -209,4 +193,8 @@ impl RaftLogBatch for PanicWriteBatch {
fn put_apply_state(&mut self, raft_group_id: u64, state: &RaftApplyState) -> Result<()> {
panic!()
}

fn put_recover_state(&mut self, state: &StoreRecoverState) -> Result<()> {
panic!()
}
}
27 changes: 6 additions & 21 deletions components/engine_rocks/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// #[PerformanceCriticalPath]
use engine_traits::{
Error, Iterable, KvEngine, MiscExt, Mutable, Peekable, RaftEngine, RaftEngineDebug,
RaftEngineReadOnly, RaftLogBatch, RaftLogGcTask, Result, SyncMutable, WriteBatch,
WriteBatchExt, WriteOptions, CF_DEFAULT, RAFT_LOG_MULTI_GET_CNT,
RaftEngineReadOnly, RaftLogBatch, RaftLogGcTask, Result, WriteBatch, WriteBatchExt,
WriteOptions, CF_DEFAULT, RAFT_LOG_MULTI_GET_CNT,
};
use kvproto::{
metapb::Region,
Expand Down Expand Up @@ -286,17 +286,6 @@ impl RaftEngine for RocksEngine {
Ok(())
}

fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
let mut wb = self.write_batch();
let buf = Vec::with_capacity(1024);
wb.append_impl(raft_group_id, &entries, buf)?;
self.consume(&mut wb, false)
}

fn put_raft_state(&self, raft_group_id: u64, state: &RaftLocalState) -> Result<()> {
self.put_msg(&keys::raft_state_key(raft_group_id), state)
}

fn batch_gc(&self, groups: Vec<RaftLogGcTask>) -> Result<usize> {
let mut total = 0;
let mut raft_wb = self.write_batch_with_cap(4 * 1024);
Expand Down Expand Up @@ -343,10 +332,6 @@ impl RaftEngine for RocksEngine {
self.as_inner().path()
}

fn put_store_ident(&self, ident: &StoreIdent) -> Result<()> {
self.put_msg(keys::STORE_IDENT_KEY, ident)
}

fn for_each_raft_group<E, F>(&self, f: &mut F) -> std::result::Result<(), E>
where
F: FnMut(u64) -> std::result::Result<(), E>,
Expand Down Expand Up @@ -374,10 +359,6 @@ impl RaftEngine for RocksEngine {
Some(e) => Err(e),
}
}

fn put_recover_state(&self, state: &StoreRecoverState) -> Result<()> {
self.put_msg(keys::RECOVER_STATE_KEY, state)
}
}

impl RaftLogBatch for RocksWriteBatchVec {
Expand Down Expand Up @@ -431,6 +412,10 @@ impl RaftLogBatch for RocksWriteBatchVec {
fn put_apply_state(&mut self, raft_group_id: u64, state: &RaftApplyState) -> Result<()> {
self.put_msg(&keys::apply_state_key(raft_group_id), state)
}

fn put_recover_state(&mut self, state: &StoreRecoverState) -> Result<()> {
self.put_msg(keys::RECOVER_STATE_KEY, state)
}
}

impl RocksWriteBatchVec {
Expand Down
21 changes: 6 additions & 15 deletions components/engine_traits/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,6 @@ pub trait RaftEngine: RaftEngineReadOnly + PerfContextExt + Clone + Sync + Send
batch: &mut Self::LogBatch,
) -> Result<()>;

/// Append some log entries and return written bytes.
///
/// Note: `RaftLocalState` won't be updated in this call.
fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize>;

fn put_store_ident(&self, ident: &StoreIdent) -> Result<()>;

fn put_raft_state(&self, raft_group_id: u64, state: &RaftLocalState) -> Result<()>;

/// Like `cut_logs` but the range could be very large. Return the deleted
/// count. Generally, `from` can be passed in `0`.
fn gc(&self, raft_group_id: u64, from: u64, to: u64) -> Result<usize>;
Expand Down Expand Up @@ -151,12 +142,6 @@ pub trait RaftEngine: RaftEngineReadOnly + PerfContextExt + Clone + Sync + Send
where
F: FnMut(u64) -> std::result::Result<(), E>,
E: From<Error>;

/// Indicate whether region states should be recovered from raftdb and
/// replay raft logs.
/// When kvdb's write-ahead-log is disabled, the sequence number of the last
/// boot time is saved.
fn put_recover_state(&self, state: &StoreRecoverState) -> Result<()>;
}

pub trait RaftLogBatch: Send {
Expand All @@ -175,6 +160,12 @@ pub trait RaftLogBatch: Send {
fn put_region_state(&mut self, raft_group_id: u64, state: &RegionLocalState) -> Result<()>;
fn put_apply_state(&mut self, raft_group_id: u64, state: &RaftApplyState) -> Result<()>;

/// Indicate whether region states should be recovered from raftdb and
/// replay raft logs.
/// When kvdb's write-ahead-log is disabled, the sequence number of the last
/// boot time is saved.
fn put_recover_state(&mut self, state: &StoreRecoverState) -> Result<()>;

/// The data size of this RaftLogBatch.
fn persist_size(&self) -> usize;

Expand Down
36 changes: 6 additions & 30 deletions components/raft_log_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ impl RaftLogBatchTrait for RaftLogBatch {
.put_message(raft_group_id, APPLY_STATE_KEY.to_vec(), state)
.map_err(transfer_error)
}

fn put_recover_state(&mut self, state: &StoreRecoverState) -> Result<()> {
self.0
.put_message(STORE_STATE_ID, RECOVER_STATE_KEY.to_vec(), state)
.map_err(transfer_error)
}
}

impl RaftEngineReadOnly for RaftLogEngine {
Expand Down Expand Up @@ -538,26 +544,6 @@ impl RaftEngine for RaftLogEngine {
Ok(())
}

fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
let mut batch = Self::LogBatch::default();
batch.append(raft_group_id, entries)?;
self.consume(&mut batch, false)
}

fn put_store_ident(&self, ident: &StoreIdent) -> Result<()> {
let mut batch = Self::LogBatch::default();
batch.put_store_ident(ident)?;
self.consume(&mut batch, true)?;
Ok(())
}

fn put_raft_state(&self, raft_group_id: u64, state: &RaftLocalState) -> Result<()> {
let mut batch = Self::LogBatch::default();
batch.put_raft_state(raft_group_id, state)?;
self.consume(&mut batch, false)?;
Ok(())
}

fn gc(&self, raft_group_id: u64, from: u64, to: u64) -> Result<usize> {
self.batch_gc(vec![RaftLogGcTask {
raft_group_id,
Expand Down Expand Up @@ -626,16 +612,6 @@ impl RaftEngine for RaftLogEngine {
}
Ok(())
}

fn put_recover_state(&self, state: &StoreRecoverState) -> Result<()> {
let mut batch = Self::LogBatch::default();
batch
.0
.put_message(STORE_STATE_ID, RECOVER_STATE_KEY.to_vec(), state)
.map_err(transfer_error)?;
self.0.write(&mut batch.0, true).map_err(transfer_error)?;
Ok(())
}
}

fn transfer_error(e: RaftEngineError) -> engine_traits::Error {
Expand Down
5 changes: 3 additions & 2 deletions components/raftstore-v2/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ impl<'a, ER: RaftEngine> Bootstrap<'a, ER> {
let mut ident = StoreIdent::default();
ident.set_cluster_id(self.cluster_id);
ident.set_store_id(id);
self.engine.put_store_ident(&ident)?;
self.engine.sync()?;
let mut lb = self.engine.log_batch(1);
lb.put_store_ident(&ident)?;
self.engine.consume(&mut lb, true)?;
fail_point!("node_after_bootstrap_store", |_| Err(box_err!(
"injected error: node_after_bootstrap_store"
)));
Expand Down
41 changes: 24 additions & 17 deletions components/raftstore/src/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ fn init_raft_state<EK: KvEngine, ER: RaftEngine>(
raft_state.last_index = RAFT_INIT_LOG_INDEX;
raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM);
raft_state.mut_hard_state().set_commit(RAFT_INIT_LOG_INDEX);
engines.raft.put_raft_state(region.get_id(), &raft_state)?;
let mut lb = engines.raft.log_batch(0);
lb.put_raft_state(region.get_id(), &raft_state)?;
engines.raft.consume(&mut lb, true)?;
}
Ok(raft_state)
}
Expand Down Expand Up @@ -2077,32 +2079,35 @@ pub mod tests {
let initial_state = s.initial_state().unwrap();
assert_eq!(initial_state.hard_state, *raft_state.get_hard_state());

let mut lb = engines.raft.log_batch(4096);
// last_index < commit_index is invalid.
raft_state.set_last_index(11);
engines
.raft
.append(1, vec![new_entry(11, RAFT_INIT_LOG_TERM)])
lb.append(1, vec![new_entry(11, RAFT_INIT_LOG_TERM)])
.unwrap();
raft_state.mut_hard_state().set_commit(12);
engines.raft.put_raft_state(1, &raft_state).unwrap();
lb.put_raft_state(1, &raft_state).unwrap();
engines.raft.consume(&mut lb, false).unwrap();
assert!(build_storage().is_err());

raft_state.set_last_index(20);
let entries = (12..=20)
.map(|index| new_entry(index, RAFT_INIT_LOG_TERM))
.collect();
engines.raft.append(1, entries).unwrap();
engines.raft.put_raft_state(1, &raft_state).unwrap();
lb.append(1, entries).unwrap();
lb.put_raft_state(1, &raft_state).unwrap();
engines.raft.consume(&mut lb, false).unwrap();
s = build_storage().unwrap();
let initial_state = s.initial_state().unwrap();
assert_eq!(initial_state.hard_state, *raft_state.get_hard_state());

// Missing last log is invalid.
raft_state.set_last_index(21);
engines.raft.put_raft_state(1, &raft_state).unwrap();
lb.put_raft_state(1, &raft_state).unwrap();
engines.raft.consume(&mut lb, false).unwrap();
assert!(build_storage().is_err());
raft_state.set_last_index(20);
engines.raft.put_raft_state(1, &raft_state).unwrap();
lb.put_raft_state(1, &raft_state).unwrap();
engines.raft.consume(&mut lb, false).unwrap();

// applied_index > commit_index is invalid.
let mut apply_state = RaftApplyState::default();
Expand Down Expand Up @@ -2132,7 +2137,8 @@ pub mod tests {
.map(|index| new_entry(index, RAFT_INIT_LOG_TERM))
.collect();
engines.raft.gc(1, 0, 21).unwrap();
engines.raft.append(1, entries).unwrap();
lb.append(1, entries).unwrap();
engines.raft.consume(&mut lb, false).unwrap();
raft_state.mut_hard_state().set_commit(14);
s = build_storage().unwrap();
let initial_state = s.initial_state().unwrap();
Expand All @@ -2143,27 +2149,28 @@ pub mod tests {
.map(|index| new_entry(index, RAFT_INIT_LOG_TERM))
.collect();
entries[0].set_term(RAFT_INIT_LOG_TERM - 1);
engines.raft.append(1, entries).unwrap();
lb.append(1, entries).unwrap();
engines.raft.consume(&mut lb, false).unwrap();
assert!(build_storage().is_err());

// hard state term miss match is invalid.
let entries = (14..=20)
.map(|index| new_entry(index, RAFT_INIT_LOG_TERM))
.collect();
engines.raft.append(1, entries).unwrap();
lb.append(1, entries).unwrap();
raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM - 1);
engines.raft.put_raft_state(1, &raft_state).unwrap();
lb.put_raft_state(1, &raft_state).unwrap();
engines.raft.consume(&mut lb, false).unwrap();
assert!(build_storage().is_err());

// last index < recorded_commit_index is invalid.
engines.raft.gc(1, 0, 21).unwrap();
raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM);
raft_state.set_last_index(13);
engines
.raft
.append(1, vec![new_entry(13, RAFT_INIT_LOG_TERM)])
lb.append(1, vec![new_entry(13, RAFT_INIT_LOG_TERM)])
.unwrap();
engines.raft.put_raft_state(1, &raft_state).unwrap();
lb.put_raft_state(1, &raft_state).unwrap();
engines.raft.consume(&mut lb, false).unwrap();
assert!(build_storage().is_err());
}

Expand Down
9 changes: 6 additions & 3 deletions components/raftstore/src/store/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2006,8 +2006,9 @@ pub mod tests {
raft::RaftTestEngine,
};
use engine_traits::{
Engines, ExternalSstFileInfo, KvEngine, RaftEngine, Snapshot as EngineSnapshot, SstExt,
SstWriter, SstWriterBuilder, SyncMutable, ALL_CFS, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
Engines, ExternalSstFileInfo, KvEngine, RaftEngine, RaftLogBatch,
Snapshot as EngineSnapshot, SstExt, SstWriter, SstWriterBuilder, SyncMutable, ALL_CFS,
CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use kvproto::{
encryptionpb::EncryptionMethod,
Expand Down Expand Up @@ -2114,6 +2115,7 @@ pub mod tests {
let kv: KvTestEngine = open_test_db(p.join("kv").as_path(), kv_db_opt, kv_cf_opts)?;
let raft: RaftTestEngine =
engine_test::raft::new_engine(p.join("raft").to_str().unwrap(), raft_db_opt)?;
let mut lb = raft.log_batch(regions.len() * 128);
for &region_id in regions {
// Put apply state into kv engine.
let mut apply_state = RaftApplyState::default();
Expand All @@ -2123,14 +2125,15 @@ pub mod tests {
apply_entry.set_term(0);
apply_state.mut_truncated_state().set_index(10);
kv.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?;
raft.append(region_id, vec![apply_entry])?;
lb.append(region_id, vec![apply_entry])?;

// Put region info into kv engine.
let region = gen_test_region(region_id, 1, 1);
let mut region_state = RegionLocalState::default();
region_state.set_region(region);
kv.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &region_state)?;
}
raft.consume(&mut lb, false).unwrap();
Ok(Engines::new(kv, raft))
}

Expand Down
7 changes: 5 additions & 2 deletions components/server/src/raft_engine_switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ fn run_dump_raft_engine_worker(
new_engine: &RocksEngine,
count_size: &Arc<AtomicUsize>,
) {
let mut batch = new_engine.log_batch(0);
while let Ok(id) = rx.recv() {
let state = old_engine.get_raft_state(id).unwrap().unwrap();
new_engine.put_raft_state(id, &state).unwrap();
batch.put_raft_state(id, &state).unwrap();
if let Some(last_index) = old_engine.last_index(id) {
let mut batch = new_engine.log_batch(0);
let mut begin = old_engine.first_index(id).unwrap();
while begin <= last_index {
let end = std::cmp::min(begin + 1024, last_index + 1);
Expand All @@ -210,6 +210,9 @@ fn run_dump_raft_engine_worker(
count_size.fetch_add(size, Ordering::Relaxed);
}
}
if !batch.is_empty() {
new_engine.consume(&mut batch, false).unwrap();
}
}
}

Expand Down