Skip to content

Commit

Permalink
Merge branch 'master' into introduce-apply-trace
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Dec 17, 2022
2 parents 261c162 + 87b0eaf commit 561b5ef
Show file tree
Hide file tree
Showing 31 changed files with 416 additions and 31 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tikv"
version = "6.5.0-alpha"
version = "6.6.0-alpha"
authors = ["The TiKV Authors"]
description = "A distributed transactional key-value database powered by Rust and Raft"
license = "Apache-2.0"
Expand Down
1 change: 1 addition & 0 deletions components/raftstore-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fail = "0.5"
file_system = { workspace = true }
fs2 = "0.4"
futures = { version = "0.3", features = ["compat"] }
itertools = "0.10"
keys = { workspace = true }
kvproto = { workspace = true }
log_wrappers = { workspace = true }
Expand Down
100 changes: 97 additions & 3 deletions components/raftstore-v2/src/operation/command/admin/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use collections::HashSet;
use crossbeam::channel::SendError;
use engine_traits::{Checkpointer, KvEngine, RaftEngine, RaftLogBatch, TabletContext};
use fail::fail_point;
use itertools::Itertools;
use kvproto::{
metapb::{self, Region},
raft_cmdpb::{AdminRequest, AdminResponse, RaftCmdRequest, SplitRequest},
Expand All @@ -39,6 +40,7 @@ use kvproto::{
use protobuf::Message;
use raft::{prelude::Snapshot, INVALID_ID};
use raftstore::{
coprocessor::split_observer::{is_valid_split_key, strip_timestamp_if_exists},
store::{
fsm::apply::validate_batch_split,
metrics::PEER_ADMIN_CMD_COUNTER,
Expand All @@ -48,7 +50,8 @@ use raftstore::{
},
Result,
};
use slog::info;
use slog::{error, info, warn, Logger};
use tikv_util::box_err;

use crate::{
batch::StoreContext,
Expand Down Expand Up @@ -99,13 +102,66 @@ impl SplitInit {
}
}

// validate split request and strip ts from split keys if needed
fn pre_propose_split(logger: &Logger, req: &mut AdminRequest, region: &Region) -> Result<()> {
if !req.has_splits() {
return Err(box_err!(
"cmd_type is BatchSplit but it doesn't have splits request, message maybe \
corrupted!"
.to_owned()
));
}

let mut requests: Vec<SplitRequest> = req.mut_splits().take_requests().into();
let ajusted_splits = std::mem::take(&mut requests)
.into_iter()
.enumerate()
.filter_map(|(i, mut split)| {
let key = split.take_split_key();
let key = strip_timestamp_if_exists(key);
if is_valid_split_key(&key, i, region) {
split.split_key = key;
Some(split)
} else {
None
}
})
.coalesce(|prev, curr| {
// Make sure that the split keys are sorted and unique.
if prev.split_key < curr.split_key {
Err((prev, curr))
} else {
warn!(
logger,
"skip invalid split key: key should not be larger than the previous.";
"key" => log_wrappers::Value::key(&curr.split_key),
"previous" => log_wrappers::Value::key(&prev.split_key),
);
Ok(prev)
}
})
.collect::<Vec<_>>();

if ajusted_splits.is_empty() {
error!(
logger,
"failed to handle split req, no valid key found for split";
);
Err(box_err!("no valid key found for split.".to_owned()))
} else {
// Rewrite the splits.
req.mut_splits().set_requests(ajusted_splits.into());
Ok(())
}
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn propose_split<T>(
&mut self,
store_ctx: &mut StoreContext<EK, ER, T>,
req: RaftCmdRequest,
mut req: RaftCmdRequest,
) -> Result<u64> {
validate_batch_split(req.get_admin_request(), self.region())?;
pre_propose_split(&self.logger, req.mut_admin_request(), self.region())?;
// We rely on ConflictChecker to detect conflicts, so no need to set proposal
// context.
let data = req.write_to_bytes().unwrap();
Expand Down Expand Up @@ -509,6 +565,7 @@ mod test {
store::{new_learner_peer, new_peer},
worker::dummy_scheduler,
};
use txn_types::Key;

use super::*;
use crate::{fsm::ApplyResReporter, raft::Apply, router::ApplyRes};
Expand Down Expand Up @@ -612,6 +669,43 @@ mod test {
}
}

#[test]
fn test_propose() {
let logger = slog_global::borrow_global().new(o!());

let mut region = Region::default();
region.set_end_key(b"k10".to_vec());

let mut req = AdminRequest::default();
let err = pre_propose_split(&logger, &mut req, &region).unwrap_err();
assert!(
err.to_string()
.contains("cmd_type is BatchSplit but it doesn't have splits")
);

let mut splits = BatchSplitRequest::default();
req.set_splits(splits.clone());
let err = pre_propose_split(&logger, &mut req, &region).unwrap_err();
assert!(err.to_string().contains("no valid key found"));

splits.mut_requests().push(new_split_req(b"", 0, vec![]));
splits.mut_requests().push(new_split_req(b"k03", 0, vec![]));
splits.mut_requests().push(new_split_req(b"k02", 0, vec![]));
splits.mut_requests().push(new_split_req(b"k11", 0, vec![]));
let split_key = Key::from_raw(b"k06");
let split_key_with_ts = split_key.clone().append_ts(10.into());
splits
.mut_requests()
.push(new_split_req(split_key_with_ts.as_encoded(), 0, vec![]));

req.set_splits(splits);
pre_propose_split(&logger, &mut req, &region).unwrap();
let split_reqs = req.get_splits().get_requests();
assert_eq!(split_reqs.len(), 2);
assert_eq!(split_reqs[0].get_split_key(), b"k03");
assert_eq!(split_reqs[1].get_split_key(), split_key.as_encoded());
}

#[test]
fn test_split() {
let store_id = 2;
Expand Down
12 changes: 7 additions & 5 deletions src/server/lock_manager/waiter_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ impl WaitTable {
Some(waiter)
}

fn update_waiter(&mut self, update_event: &UpdateWaitForEvent) -> Option<KeyLockWaitInfo> {
fn update_waiter(
&mut self,
update_event: &UpdateWaitForEvent,
) -> Option<(KeyLockWaitInfo, DiagnosticContext)> {
let waiter = self.waiter_pool.get_mut(&update_event.token)?;

assert_eq!(waiter.wait_info.key, update_event.wait_info.key);
Expand All @@ -351,9 +354,8 @@ impl WaitTable {
}

let result = std::mem::replace(&mut waiter.wait_info, update_event.wait_info.clone());
waiter.diag_ctx = update_event.diag_ctx.clone();

Some(result)
Some((result, waiter.diag_ctx.clone()))
}

fn take_waiter_by_lock_digest(
Expand Down Expand Up @@ -542,11 +544,11 @@ impl WaiterManager {
continue;
}

if let Some(previous_wait_info) = previous_wait_info {
if let Some((previous_wait_info, diag_ctx)) = previous_wait_info {
self.detector_scheduler
.clean_up_wait_for(event.start_ts, previous_wait_info);
self.detector_scheduler
.detect(event.start_ts, event.wait_info, event.diag_ctx);
.detect(event.start_ts, event.wait_info, diag_ctx);
}
}
}
Expand Down
72 changes: 71 additions & 1 deletion src/storage/lock_manager/lock_waiting_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use txn_types::{Key, TimeStamp};
use crate::storage::{
lock_manager::{
lock_wait_context::{LockWaitContextSharedState, PessimisticLockKeyCallback},
LockManager, LockWaitToken,
KeyLockWaitInfo, LockDigest, LockManager, LockWaitToken, UpdateWaitForEvent,
},
metrics::*,
mvcc::{Error as MvccError, ErrorInner as MvccErrorInner},
Expand Down Expand Up @@ -599,6 +599,36 @@ impl<L: LockManager> LockWaitQueues<L> {
result
}

pub fn update_lock_wait(&self, lock_info: Vec<kvrpcpb::LockInfo>) {
let mut update_wait_for_events = vec![];
for lock_info in lock_info {
let key = Key::from_raw(lock_info.get_key());
if let Some(mut key_state) = self.inner.queue_map.get_mut(&key) {
key_state.current_lock = lock_info;
update_wait_for_events.reserve(key_state.queue.len());
for (&token, entry) in key_state.queue.iter() {
let event = UpdateWaitForEvent {
token,
start_ts: entry.parameters.start_ts,
is_first_lock: entry.parameters.is_first_lock,
wait_info: KeyLockWaitInfo {
key: key.clone(),
lock_digest: LockDigest {
ts: key_state.current_lock.lock_version.into(),
hash: entry.lock_hash,
},
lock_info: key_state.current_lock.clone(),
},
};
update_wait_for_events.push(event);
}
}
}
if !update_wait_for_events.is_empty() {
self.inner.lock_mgr.update_wait_for(update_wait_for_events);
}
}

/// Gets the count of entries currently waiting in queues.
///
/// Mind that the contents of the queues may be changed concurrently.
Expand Down Expand Up @@ -1205,4 +1235,44 @@ mod tests {
queues.must_not_contain_key(b"k1");
assert_eq!(queues.entry_count(), 0);
}

#[bench]
fn bench_update_lock_wait_empty(b: &mut test::Bencher) {
let queues = LockWaitQueues::new(MockLockManager::new());
queues.mock_lock_wait(b"k1", 5, 6, false);

let mut lock_info = kvrpcpb::LockInfo::default();
let key = b"t\x00\x00\x00\x00\x00\x00\x00\x01_r\x00\x00\x00\x00\x00\x00\x00\x01";
lock_info.set_key(key.to_vec());
lock_info.set_primary_lock(key.to_vec());
lock_info.set_lock_version(10);
lock_info.set_lock_for_update_ts(10);
let lock_info = vec![lock_info];

b.iter(|| {
queues.update_lock_wait(lock_info.clone());
});
}

#[bench]
fn bench_update_lock_wait_queue_len_512(b: &mut test::Bencher) {
let queues = LockWaitQueues::new(MockLockManager::new());

let key = b"t\x00\x00\x00\x00\x00\x00\x00\x01_r\x00\x00\x00\x00\x00\x00\x00\x01";

for i in 0..512 {
queues.mock_lock_wait(key, 15 + i, 10, true);
}

let mut lock_info = kvrpcpb::LockInfo::default();
lock_info.set_key(key.to_vec());
lock_info.set_primary_lock(key.to_vec());
lock_info.set_lock_version(10);
lock_info.set_lock_for_update_ts(10);
let lock_info = vec![lock_info];

b.iter(|| {
queues.update_lock_wait(lock_info.clone());
});
}
}
1 change: 0 additions & 1 deletion src/storage/lock_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ pub struct UpdateWaitForEvent {
pub start_ts: TimeStamp,
pub is_first_lock: bool,
pub wait_info: KeyLockWaitInfo,
pub diag_ctx: DiagnosticContext,
}

/// `LockManager` manages transactions waiting for locks held by other
Expand Down

0 comments on commit 561b5ef

Please sign in to comment.