Merge branch 'master' into pitr-config-update
joccau committed Mar 17, 2023
2 parents e0054f1 + fd2db9a commit 5dae2d9
Showing 22 changed files with 484 additions and 401 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -245,7 +245,8 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
             if entry.get_data().is_empty() {
                 continue;
             }
-            let cmd: RaftCmdRequest = util::parse_data_at(entry.get_data(), entry.get_index());
+            let cmd: RaftCmdRequest =
+                util::parse_data_at(entry.get_data(), entry.get_index(), "tag");
             if !cmd.has_admin_request() {
                 continue;
             }
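
For context, a hedged sketch of how a parse helper can thread a caller-supplied tag into its panic message. This mirrors what `util::parse_data_at` appears to do given the new argument, but the body below is an assumption, not the commit's code:

// Sketch only: a protobuf decode helper whose panic message names the
// caller via `tag`, so a corrupted log entry identifies its source.
// Assumes the rust-protobuf `Message` trait; TiKV's real helper may differ.
use protobuf::Message;

pub fn parse_data_at<T: Message + Default>(data: &[u8], index: u64, tag: &str) -> T {
    let mut result = T::default();
    result.merge_from_bytes(data).unwrap_or_else(|e| {
        panic!("{} data is corrupted at {}: {:?}", tag, index, e);
    });
    result
}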
20 changes: 19 additions & 1 deletion components/raftstore-v2/src/operation/command/admin/split.rs
@@ -391,6 +391,11 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
         req: &AdminRequest,
         log_index: u64,
     ) -> Result<(AdminResponse, AdminCmdResult)> {
+        fail_point!(
+            "on_apply_batch_split",
+            self.peer().get_store_id() == 3,
+            |_| { unreachable!() }
+        );
         PEER_ADMIN_CMD_COUNTER.batch_split.all.inc();
 
         let region = self.region();
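
This is the conditional form of the `fail` crate's `fail_point!` macro: the closure can only fire when the middle expression is true (store 3 in this hunk) and the point has been armed. A minimal illustration, with hypothetical test wiring in the trailing comments:

// Sketch: a conditional fail point, as used above. The point is inert
// unless compiled with failpoints support and armed via fail::cfg.
fn handle_batch_split(store_id: u64) {
    fail::fail_point!("on_apply_batch_split", store_id == 3, |_| {
        // Reached only when the condition holds and the point is armed.
        unreachable!()
    });
    // ... normal batch-split handling would continue here ...
}

// Hypothetical test wiring:
//   fail::cfg("on_apply_batch_split", "return").unwrap(); // arm the point
//   handle_batch_split(3); // panics at unreachable!(), proving the path fired
//   fail::remove("on_apply_batch_split");                 // disarm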
@@ -692,7 +697,20 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         mut split_init: Box<SplitInit>,
     ) {
         let region_id = split_init.region.id;
-        if self.storage().is_initialized() && self.persisted_index() >= RAFT_INIT_LOG_INDEX {
+        let peer_id = split_init
+            .region
+            .get_peers()
+            .iter()
+            .find(|p| p.get_store_id() == self.peer().get_store_id())
+            .unwrap()
+            .get_id();
+
+        // If the peer_id in `split_init` is less than the current peer_id, a conf
+        // change for the peer must have already happened, so we just report finish
+        // to the source region of this out-of-date peer initialization.
+        if self.storage().is_initialized() && self.persisted_index() >= RAFT_INIT_LOG_INDEX
+            || peer_id < self.peer().get_id()
+        {
             // Race with split operation. The tablet created by split will eventually be
             // deleted. We don't trim it.
             report_split_init_finish(store_ctx, split_init.derived_region_id, region_id, true);
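
The staleness test introduced above can be restated on its own. A sketch under the assumption of `kvproto::metapb` types, relying on peer ids being allocated monotonically, so a smaller recorded id means an older incarnation:

// Sketch: a split-init message records a peer id for each store; if the
// id recorded for this store is smaller than the id the store currently
// runs, a conf change already replaced that peer and the init is stale.
use kvproto::metapb::Region;

fn is_stale_split_init(region: &Region, store_id: u64, current_peer_id: u64) -> bool {
    region
        .get_peers()
        .iter()
        .find(|p| p.get_store_id() == store_id)
        .map_or(false, |p| p.get_id() < current_peer_id)
}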
21 changes: 17 additions & 4 deletions components/raftstore-v2/src/operation/ready/mod.rs
@@ -255,11 +255,24 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         let pre_committed_index = self.raft_group().raft.raft_log.committed;
         if msg.get_message().get_msg_type() == MessageType::MsgTransferLeader {
             self.on_transfer_leader_msg(ctx, msg.get_message(), msg.disk_usage)
-        } else if let Err(e) = self.raft_group_mut().step(msg.take_message()) {
-            error!(self.logger, "raft step error"; "err" => ?e);
         } else {
-            let committed_index = self.raft_group().raft.raft_log.committed;
-            self.report_commit_log_duration(ctx, pre_committed_index, committed_index);
+            // This can be a message sent while the peer was still a follower. Nevertheless,
+            // it's meaningless to continue handling the request, as its callbacks are cleared.
+            if msg.get_message().get_msg_type() == MessageType::MsgReadIndex
+                && self.is_leader()
+                && (msg.get_message().get_from() == raft::INVALID_ID
+                    || msg.get_message().get_from() == self.peer_id())
+            {
+                ctx.raft_metrics.message_dropped.stale_msg.inc();
+                return;
+            }
+
+            if let Err(e) = self.raft_group_mut().step(msg.take_message()) {
+                error!(self.logger, "raft step error"; "err" => ?e);
+            } else {
+                let committed_index = self.raft_group().raft.raft_log.committed;
+                self.report_commit_log_duration(ctx, pre_committed_index, committed_index);
+            }
         }
 
         self.set_has_ready();
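
The drop condition added above reads as a predicate. A sketch using the `raft` crate's types; the reasoning is that a self-originated MsgReadIndex left over from follower time is useless once its callbacks have been cleared:

// Sketch: locally originated MsgReadIndex messages (from == INVALID_ID,
// or from == this peer) that surface while the peer is leader are stale
// leftovers from its follower period and are dropped rather than stepped.
use raft::eraftpb::{Message, MessageType};

fn is_stale_read_index(msg: &Message, is_leader: bool, self_id: u64) -> bool {
    msg.get_msg_type() == MessageType::MsgReadIndex
        && is_leader
        && (msg.get_from() == raft::INVALID_ID || msg.get_from() == self_id)
}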
32 changes: 11 additions & 21 deletions components/raftstore/src/store/entry_storage.rs
@@ -30,11 +30,7 @@ use super::{
     metrics::*, peer_storage::storage_error, WriteTask, MEMTRACE_ENTRY_CACHE, RAFT_INIT_LOG_INDEX,
     RAFT_INIT_LOG_TERM,
 };
-use crate::{
-    bytes_capacity,
-    store::{util::ParsedEntry, ReadTask},
-    Result,
-};
+use crate::{bytes_capacity, store::ReadTask, Result};
 
 const MAX_ASYNC_FETCH_TRY_CNT: usize = 3;
 const SHRINK_CACHE_CAPACITY: usize = 64;
@@ -58,7 +54,7 @@ pub fn last_index(state: &RaftLocalState) -> u64 {
 pub struct CachedEntries {
     pub range: Range<u64>,
     // Entries and dangle size for them. `dangle` means not in entry cache.
-    entries: Arc<Mutex<(Vec<ParsedEntry>, usize)>>,
+    entries: Arc<Mutex<(Vec<Entry>, usize)>>,
 }
 
 impl CachedEntries {
@@ -68,24 +64,21 @@ impl CachedEntries {
         let end = entries.last().map(|x| x.index).unwrap() + 1;
         let range = Range { start, end };
         CachedEntries {
-            entries: Arc::new(Mutex::new((
-                entries.into_iter().map(|e| ParsedEntry::new(e)).collect(),
-                0,
-            ))),
+            entries: Arc::new(Mutex::new((entries, 0))),
             range,
         }
     }
 
-    pub fn iter_entries_mut(&self, mut f: impl FnMut(&mut ParsedEntry)) {
-        let mut entries = self.entries.lock().unwrap();
-        for entry in &mut entries.0 {
+    pub fn iter_entries(&self, mut f: impl FnMut(&Entry)) {
+        let entries = self.entries.lock().unwrap();
+        for entry in &entries.0 {
             f(entry);
         }
     }
 
     /// Take cached entries and dangle size for them. `dangle` means not in
     /// entry cache.
-    pub fn take_entries(&self) -> (Vec<ParsedEntry>, usize) {
+    pub fn take_entries(&self) -> (Vec<Entry>, usize) {
         mem::take(&mut *self.entries.lock().unwrap())
     }
 }
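
A short usage sketch of the reverted API. Hedged: `CachedEntries::new` is assumed from the hunk's elided context, and `Entry` is raft's `eraftpb::Entry`:

// Hypothetical caller of the reverted API: entries are stored as plain
// raft Entry values again, visited immutably, and drained in one take.
use raft::eraftpb::Entry;

fn total_entry_bytes(entries: Vec<Entry>) -> usize {
    let cached = CachedEntries::new(entries); // constructor assumed
    let mut total = 0;
    cached.iter_entries(|e| total += e.data.len()); // no ParsedEntry wrapper
    let (_entries, _dangle_size) = cached.take_entries(); // leaves an empty Vec behind
    total
}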
@@ -332,19 +325,16 @@ impl EntryCache {
         let dangle_size = {
             let mut guard = entries.entries.lock().unwrap();
 
-            let last_idx = guard.0.last().map(|e| e.get_index()).unwrap();
-            let cache_front = match self.cache.front().map(|e| e.get_index()) {
+            let last_idx = guard.0.last().map(|e| e.index).unwrap();
+            let cache_front = match self.cache.front().map(|e| e.index) {
                 Some(i) => i,
                 None => u64::MAX,
             };
 
             let dangle_range = if last_idx < cache_front {
                 // All entries are not in entry cache.
                 0..guard.0.len()
-            } else if let Ok(i) = guard
-                .0
-                .binary_search_by(|e| e.get_index().cmp(&cache_front))
-            {
+            } else if let Ok(i) = guard.0.binary_search_by(|e| e.index.cmp(&cache_front)) {
                 // Some entries are in entry cache.
                 0..i
             } else {
@@ -354,7 +344,7 @@
 
             let mut size = 0;
             for e in &guard.0[dangle_range] {
-                size += e.bytes_capacity();
+                size += bytes_capacity(&e.data) + bytes_capacity(&e.context);
             }
             guard.1 = size;
             size
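
The dangle computation above reduces to counting how many of the index-contiguous, ascending entries sit below the cache front; for such a slice the binary-search insertion point gives that count directly. A simplified model, assuming contiguous indexes:

// Sketch: number of "dangling" entries, i.e. entries whose index is below
// the entry cache's front and whose buffers are therefore tracked apart.
// Assumes `indexes` is ascending and contiguous, as raft log entries are.
fn dangle_count(indexes: &[u64], cache_front: Option<u64>) -> usize {
    let front = cache_front.unwrap_or(u64::MAX); // empty cache: everything dangles
    match indexes.binary_search(&front) {
        Ok(i) => i,  // front found: entries before it dangle
        Err(i) => i, // front absent: insertion point counts entries below it
    }
}

// e.g. dangle_count(&[5, 6, 7, 8], Some(7)) == 2 (entries 5 and 6 dangle)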
