Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Add batch-resumed mode for acquire_pessimistic_lock storage command #13687

Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/storage/lock_manager/lock_wait_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ mod tests {
for_update_ts: 1.into(),
..Default::default()
},
should_not_exist: false,
lock_wait_token: token,
legacy_wake_up_index: None,
key_cb: None,
Expand Down
5 changes: 5 additions & 0 deletions src/storage/lock_manager/lock_waiting_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ pub struct LockWaitEntry {
pub key: Key,
pub lock_hash: u64,
pub parameters: PessimisticLockParameters,
// `parameters` provides parameters for a request, but `should_not_exist` is specified key-wise.
// Put it in a separate field.
pub should_not_exist: bool,
pub lock_wait_token: LockWaitToken,
pub legacy_wake_up_index: Option<usize>,
pub key_cb: Option<SyncWrapper<PessimisticLockKeyCallback>>,
Expand Down Expand Up @@ -687,6 +690,7 @@ mod tests {
min_commit_ts: 0.into(),
check_existence: false,
is_first_lock: false,
lock_only_if_exists: false,
allow_lock_with_conflict: false,
};

Expand All @@ -697,6 +701,7 @@ mod tests {
key,
lock_hash,
parameters,
should_not_exist: false,
lock_wait_token: token,
legacy_wake_up_index: None,
key_cb: Some(SyncWrapper::new(Box::new(move |res| tx.send(res).unwrap()))),
Expand Down
1 change: 1 addition & 0 deletions src/storage/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ make_auto_flush_static_metric! {
batch_get_command,
prewrite,
acquire_pessimistic_lock,
acquire_pessimistic_lock_resumed,
commit,
cleanup,
rollback,
Expand Down
30 changes: 21 additions & 9 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ use tikv_util::{
use tracker::{
clear_tls_tracker_token, set_tls_tracker_token, with_tls_tracker, TrackedFuture, TrackerToken,
};
use txn_types::{Key, KvPair, Lock, LockType, OldValues, TimeStamp, TsSet, Value};
use txn_types::{Key, KvPair, Lock, LockType, TimeStamp, TsSet, Value};

pub use self::{
errors::{get_error_kind_from_header, get_tag_from_header, Error, ErrorHeaderKind, ErrorInner},
Expand Down Expand Up @@ -1416,7 +1416,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
callback: Callback<T>,
) -> Result<()> {
use crate::storage::txn::commands::{
AcquirePessimisticLock, Prewrite, PrewritePessimistic,
AcquirePessimisticLock, AcquirePessimisticLockResumed, Prewrite, PrewritePessimistic,
};

let cmd: Command = cmd.into();
Expand Down Expand Up @@ -1452,6 +1452,18 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
)?;
check_key_size!(keys, self.max_key_size, callback);
}
Command::AcquirePessimisticLockResumed(AcquirePessimisticLockResumed {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
items, ..
}) => {
let keys = items.iter().map(|item| item.key.as_encoded());
Self::check_api_version(
self.api_version,
cmd.ctx().api_version,
CommandKind::acquire_pessimistic_lock_resumed,
keys.clone(),
)?;
check_key_size!(keys, self.max_key_size, callback);
}
_ => {}
}
with_tls_tracker(|tracker| {
Expand Down Expand Up @@ -3341,9 +3353,9 @@ pub mod test_util {
Some(WaitTimeout::Default),
return_values,
for_update_ts.next(),
OldValues::default(),
check_existence,
false,
false,
Context::default(),
)
}
Expand Down Expand Up @@ -8111,7 +8123,7 @@ mod tests {
Some(WaitTimeout::Millis(100)),
false,
21.into(),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand Down Expand Up @@ -8203,7 +8215,7 @@ mod tests {
Some(WaitTimeout::Millis(5000)),
false,
(lock_ts + 1).into(),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand Down Expand Up @@ -8788,7 +8800,7 @@ mod tests {
None,
false,
0.into(),
OldValues::default(),
false,
false,
false,
Default::default(),
Expand All @@ -8811,7 +8823,7 @@ mod tests {
None,
false,
0.into(),
OldValues::default(),
false,
false,
false,
Default::default(),
Expand Down Expand Up @@ -9041,7 +9053,7 @@ mod tests {
None,
false,
TimeStamp::new(12),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand All @@ -9067,7 +9079,7 @@ mod tests {
None,
false,
TimeStamp::new(12),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ impl MvccTxn {
self.write_size
}

/// Returns `true` if this transaction has accumulated no writes: neither
/// pending modifications nor locks recorded for 1PC.
///
/// Used by callers to decide whether there is anything to flush to the
/// engine for this transaction.
pub fn is_empty(&self) -> bool {
    // Idiomatic emptiness checks (clippy::len_zero) instead of `len() == 0`.
    self.modifies.is_empty() && self.locks_for_1pc.is_empty()
}

pub(crate) fn put_lock(&mut self, key: Key, lock: &Lock) {
let write = Modify::Put(CF_LOCK, key, lock.to_bytes());
self.write_size += write.size();
Expand Down
162 changes: 79 additions & 83 deletions src/storage/txn/commands/acquire_pessimistic_lock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

// #[PerformanceCriticalPath]
use kvproto::kvrpcpb::{ExtraOp, LockInfo};
use kvproto::kvrpcpb::ExtraOp;
use txn_types::{Key, OldValues, TimeStamp, TxnExtra};

use crate::storage::{
Expand All @@ -17,7 +17,7 @@ use crate::storage::{
Error, ErrorInner, Result,
},
types::{PessimisticLockParameters, PessimisticLockResults},
Error as StorageError, ErrorInner as StorageErrorInner, ProcessResult, Result as StorageResult,
Error as StorageError, PessimisticLockKeyResult, ProcessResult, Result as StorageResult,
Snapshot,
};

Expand Down Expand Up @@ -46,9 +46,9 @@ command! {
/// later read in the same transaction.
return_values: bool,
min_commit_ts: TimeStamp,
old_values: OldValues,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removed the field old_values? Do we still support this feature of old_value?
um..., I know little about old_value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the old_value is never assigned as any valid value outside the command, and its only purpose is to hold the results produced during the pessimistic lock command and return it after finishing the command, which seems can be done by a local variable.

check_existence: bool,
lock_only_if_exists: bool,
allow_lock_with_conflict: bool,
}
}

Expand All @@ -69,27 +69,28 @@ impl CommandExt for AcquirePessimisticLock {
gen_lock!(keys: multiple(|x| &x.0));
}

/// Extracts the protobuf `LockInfo` out of a storage result that is known to
/// hold a `KeyIsLocked` MVCC error.
///
/// # Panics
///
/// Panics with "unexpected mvcc error" if `res` is `Ok` or is any error other
/// than `Txn(Mvcc(KeyIsLocked(_)))` — callers must only pass results produced
/// by a failed pessimistic-lock attempt that hit an existing lock.
fn extract_lock_info_from_result<T>(res: &StorageResult<T>) -> &LockInfo {
    match res {
        // Peel the nested error wrappers (Storage -> Txn -> Mvcc) via box
        // patterns and borrow the inner `LockInfo`.
        Err(StorageError(box StorageErrorInner::Txn(Error(box ErrorInner::Mvcc(MvccError(
            box MvccErrorInner::KeyIsLocked(info),
        )))))) => info,
        _ => panic!("unexpected mvcc error"),
    }
}

impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for AcquirePessimisticLock {
fn process_write(mut self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
if self.allow_lock_with_conflict && self.keys.len() > 1 {
// Currently multiple keys with `allow_lock_with_conflict` set is not supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support a single request which has multiple keys and allow_lock_with_conflict set finally ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we'll support it in the future. It depends.

return Err(Error::from(ErrorInner::Other(box_err!(
"multiple keys in a single request with allowed_lock_with_conflict set is not allowed"
))));
}

let (start_ts, ctx, keys) = (self.start_ts, self.ctx, self.keys);
let mut txn = MvccTxn::new(start_ts, context.concurrency_manager);
let mut reader = ReaderWithStats::new(
SnapshotReader::new_with_ctx(start_ts, snapshot, &ctx),
context.statistics,
);

let rows = keys.len();
let mut res = Ok(PessimisticLockResults::with_capacity(rows));
let mut written_rows = 0;
let total_keys = keys.len();
let mut res = PessimisticLockResults::with_capacity(total_keys);
let mut encountered_locks = vec![];
let need_old_value = context.extra_op == ExtraOp::ReadOldValue;
let mut old_values = OldValues::default();
for (k, should_not_exist) in keys {
match acquire_pessimistic_lock(
&mut txn,
Expand All @@ -104,101 +105,96 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for AcquirePessimisticLock
self.min_commit_ts,
need_old_value,
self.lock_only_if_exists,
false,
self.allow_lock_with_conflict,
) {
Ok((key_res, old_value)) => {
res.as_mut().unwrap().push(key_res);
res.push(key_res);
if old_value.resolved() {
let key = k.append_ts(txn.start_ts);
// MutationType is unknown in AcquirePessimisticLock stage.
let mutation_type = None;
self.old_values.insert(key, (old_value, mutation_type));
old_values.insert(key, (old_value, mutation_type));
}
written_rows += 1;
}
Err(e @ MvccError(box MvccErrorInner::KeyIsLocked { .. })) => {
res = Err(e).map_err(Error::from).map_err(StorageError::from);
Err(MvccError(box MvccErrorInner::KeyIsLocked(lock_info))) => {
let request_parameters = PessimisticLockParameters {
pb_ctx: ctx.clone(),
primary: self.primary.clone(),
start_ts,
lock_ttl: self.lock_ttl,
for_update_ts: self.for_update_ts,
wait_timeout: self.wait_timeout,
return_values: self.return_values,
min_commit_ts: self.min_commit_ts,
check_existence: self.check_existence,
is_first_lock: self.is_first_lock,
lock_only_if_exists: self.lock_only_if_exists,
allow_lock_with_conflict: self.allow_lock_with_conflict,
};
let lock_info = WriteResultLockInfo::new(
lock_info,
request_parameters,
k,
should_not_exist,
);
encountered_locks.push(lock_info);
// Do not lock previously succeeded keys.
txn.clear();
res.0.clear();
written_rows = 0;
res.push(PessimisticLockKeyResult::Waiting);
break;
}
Err(e) => return Err(Error::from(e)),
}
}

// no conflict
let (pr, to_be_write, rows, ctx, lock_info) = if res.is_ok() {
let pr = ProcessResult::PessimisticLockRes { res };
let modifies = txn.into_modifies();

let mut res = Ok(res);

// If encountered lock and `wait_timeout` is `None` (which means no wait),
// return error directly here.
if !encountered_locks.is_empty() && self.wait_timeout.is_none() {
// Mind the difference of the protocols of legacy requests and resumable
// requests. For resumable requests (allow_lock_with_conflict ==
// true), key errors are considered key by key instead of for the
// whole request.
let lock_info = encountered_locks.drain(..).next().unwrap().lock_info_pb;
let err = StorageError::from(Error::from(MvccError::from(
MvccErrorInner::KeyIsLocked(lock_info),
)));
if self.allow_lock_with_conflict {
res.as_mut().unwrap().0[0] = PessimisticLockKeyResult::Failed(err.into())
} else {
res = Err(err)
}
}

let pr = ProcessResult::PessimisticLockRes { res };

let to_be_write = if written_rows > 0 {
let extra = TxnExtra {
old_values: self.old_values,
// One pc status is unkown AcquirePessimisticLock stage.
old_values,
// One pc status is unknown in AcquirePessimisticLock stage.
one_pc: false,
for_flashback: false,
};
let write_data = WriteData::new(txn.into_modifies(), extra);
(pr, write_data, rows, ctx, None)
WriteData::new(modifies, extra)
} else {
let request_parameters = PessimisticLockParameters {
pb_ctx: ctx.clone(),
primary: self.primary.clone(),
start_ts: self.start_ts,
lock_ttl: self.lock_ttl,
for_update_ts: self.for_update_ts,
wait_timeout: self.wait_timeout,
return_values: self.return_values,
min_commit_ts: self.min_commit_ts,
check_existence: self.check_existence,
is_first_lock: self.is_first_lock,
allow_lock_with_conflict: false,
};
let lock_info_pb = extract_lock_info_from_result(&res);
let lock_info = WriteResultLockInfo::new(lock_info_pb.clone(), request_parameters);
let pr = ProcessResult::PessimisticLockRes { res };
// Wait for lock released
(pr, WriteData::default(), 0, ctx, Some(lock_info))
WriteData::default()
};

Ok(WriteResult {
ctx,
to_be_write,
rows,
rows: written_rows,
pr,
lock_info,
lock_info: encountered_locks,
released_locks: ReleasedLocks::new(),
lock_guards: vec![],
response_policy: ResponsePolicy::OnProposed,
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_gen_lock_info_from_result() {
let raw_key = b"key".to_vec();
let key = Key::from_raw(&raw_key);
let ts = 100;
let is_first_lock = true;
let wait_timeout = WaitTimeout::from_encoded(200);

let mut info = LockInfo::default();
info.set_key(raw_key.clone());
info.set_lock_version(ts);
info.set_lock_ttl(100);
let case = StorageError::from(StorageErrorInner::Txn(Error::from(ErrorInner::Mvcc(
MvccError::from(MvccErrorInner::KeyIsLocked(info.clone())),
))));
let lock_info = WriteResultLockInfo::new(
extract_lock_info_from_result::<()>(&Err(case)).clone(),
PessimisticLockParameters {
is_first_lock,
wait_timeout,
..Default::default()
},
);
assert_eq!(lock_info.lock_digest.ts, ts.into());
assert_eq!(lock_info.lock_digest.hash, key.gen_hash());
assert_eq!(lock_info.key.into_raw().unwrap(), raw_key);
assert_eq!(lock_info.parameters.is_first_lock, is_first_lock);
assert_eq!(lock_info.parameters.wait_timeout, wait_timeout);
assert_eq!(lock_info.lock_info_pb, info);
}
}