Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Add batch-resumed mode for acquire_pessimistic_lock storage command #13687

Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/txn_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub use lock::{Lock, LockType, PessimisticLock};
use thiserror::Error;
pub use timestamp::{TimeStamp, TsSet, TSO_PHYSICAL_SHIFT_BITS};
pub use types::{
is_short_value, Key, KvPair, Mutation, MutationType, OldValue, OldValues, TxnExtra,
TxnExtraScheduler, Value, WriteBatchFlags, SHORT_VALUE_MAX_LEN,
insert_old_value_if_resolved, is_short_value, Key, KvPair, Mutation, MutationType, OldValue,
OldValues, TxnExtra, TxnExtraScheduler, Value, WriteBatchFlags, SHORT_VALUE_MAX_LEN,
};
pub use write::{Write, WriteRef, WriteType};

Expand Down
13 changes: 13 additions & 0 deletions components/txn_types/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,19 @@ impl OldValue {
// MutationType is the type of mutation of the current write.
pub type OldValues = HashMap<Key, (OldValue, Option<MutationType>)>;

pub fn insert_old_value_if_resolved(
old_values: &mut OldValues,
key: Key,
start_ts: TimeStamp,
old_value: OldValue,
mutation_type: Option<MutationType>,
) {
if old_value.resolved() {
let key = key.append_ts(start_ts);
old_values.insert(key, (old_value, mutation_type));
}
}

// Extra data fields filled by kvrpcpb::ExtraOp.
#[derive(Default, Debug, Clone)]
pub struct TxnExtra {
Expand Down
1 change: 1 addition & 0 deletions src/storage/lock_manager/lock_wait_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ mod tests {
for_update_ts: 1.into(),
..Default::default()
},
should_not_exist: false,
lock_wait_token: token,
legacy_wake_up_index: None,
key_cb: None,
Expand Down
5 changes: 5 additions & 0 deletions src/storage/lock_manager/lock_waiting_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ pub struct LockWaitEntry {
pub key: Key,
pub lock_hash: u64,
pub parameters: PessimisticLockParameters,
// `parameters` provides parameter for a request, but `should_not_exist` is specified key-wise.
// Put it in a separated field.
pub should_not_exist: bool,
pub lock_wait_token: LockWaitToken,
pub legacy_wake_up_index: Option<usize>,
pub key_cb: Option<SyncWrapper<PessimisticLockKeyCallback>>,
Expand Down Expand Up @@ -687,6 +690,7 @@ mod tests {
min_commit_ts: 0.into(),
check_existence: false,
is_first_lock: false,
lock_only_if_exists: false,
allow_lock_with_conflict: false,
};

Expand All @@ -697,6 +701,7 @@ mod tests {
key,
lock_hash,
parameters,
should_not_exist: false,
lock_wait_token: token,
legacy_wake_up_index: None,
key_cb: Some(SyncWrapper::new(Box::new(move |res| tx.send(res).unwrap()))),
Expand Down
1 change: 1 addition & 0 deletions src/storage/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ make_auto_flush_static_metric! {
batch_get_command,
prewrite,
acquire_pessimistic_lock,
acquire_pessimistic_lock_resumed,
commit,
cleanup,
rollback,
Expand Down
30 changes: 21 additions & 9 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ use tikv_util::{
use tracker::{
clear_tls_tracker_token, set_tls_tracker_token, with_tls_tracker, TrackedFuture, TrackerToken,
};
use txn_types::{Key, KvPair, Lock, LockType, OldValues, TimeStamp, TsSet, Value};
use txn_types::{Key, KvPair, Lock, LockType, TimeStamp, TsSet, Value};

pub use self::{
errors::{get_error_kind_from_header, get_tag_from_header, Error, ErrorHeaderKind, ErrorInner},
Expand Down Expand Up @@ -1416,7 +1416,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
callback: Callback<T>,
) -> Result<()> {
use crate::storage::txn::commands::{
AcquirePessimisticLock, Prewrite, PrewritePessimistic,
AcquirePessimisticLock, AcquirePessimisticLockResumed, Prewrite, PrewritePessimistic,
};

let cmd: Command = cmd.into();
Expand Down Expand Up @@ -1452,6 +1452,18 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
)?;
check_key_size!(keys, self.max_key_size, callback);
}
Command::AcquirePessimisticLockResumed(AcquirePessimisticLockResumed {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
items, ..
}) => {
let keys = items.iter().map(|item| item.key.as_encoded());
Self::check_api_version(
self.api_version,
cmd.ctx().api_version,
CommandKind::acquire_pessimistic_lock_resumed,
keys.clone(),
)?;
check_key_size!(keys, self.max_key_size, callback);
}
_ => {}
}
with_tls_tracker(|tracker| {
Expand Down Expand Up @@ -3341,9 +3353,9 @@ pub mod test_util {
Some(WaitTimeout::Default),
return_values,
for_update_ts.next(),
OldValues::default(),
check_existence,
false,
false,
Context::default(),
)
}
Expand Down Expand Up @@ -8193,7 +8205,7 @@ mod tests {
Some(WaitTimeout::Millis(100)),
false,
21.into(),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand Down Expand Up @@ -8285,7 +8297,7 @@ mod tests {
Some(WaitTimeout::Millis(5000)),
false,
(lock_ts + 1).into(),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand Down Expand Up @@ -8870,7 +8882,7 @@ mod tests {
None,
false,
0.into(),
OldValues::default(),
false,
false,
false,
Default::default(),
Expand All @@ -8893,7 +8905,7 @@ mod tests {
None,
false,
0.into(),
OldValues::default(),
false,
false,
false,
Default::default(),
Expand Down Expand Up @@ -9123,7 +9135,7 @@ mod tests {
None,
false,
TimeStamp::new(12),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand All @@ -9149,7 +9161,7 @@ mod tests {
None,
false,
TimeStamp::new(12),
OldValues::default(),
false,
false,
false,
Context::default(),
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ impl MvccTxn {
self.write_size
}

pub fn is_empty(&self) -> bool {
self.modifies.len() == 0 && self.locks_for_1pc.len() == 0
}

pub(crate) fn put_lock(&mut self, key: Key, lock: &Lock) {
let write = Modify::Put(CF_LOCK, key, lock.to_bytes());
self.write_size += write.size();
Expand Down