Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Check whether the primary matches when handling check_txn_status requests (#14637) #14659

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ txn_types = { path = "components/txn_types" }
grpcio = { version = "0.10.4", default-features = false, features = ["openssl-vendored", "protobuf-codec", "nightly"] }
grpcio-health = { version = "0.10.4", default-features = false, features = ["protobuf-codec"] }
tipb = { git = "https://github.com/pingcap/tipb.git" }
kvproto = { git = "https://github.com/pingcap/kvproto.git" }
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-7.1" }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }
tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hotfix" }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
Expand Down
2 changes: 2 additions & 0 deletions components/error_code/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ define_error_codes!(
ASSERTION_FAILED => ("AssertionFailed", "", ""),
LOCK_IF_EXISTS_FAILED => ("LockIfExistsFailed", "", ""),

PRIMARY_MISMATCH => ("PrimaryMismatch", "", ""),

UNKNOWN => ("Unknown", "", "")
);
5 changes: 5 additions & 0 deletions etc/error_code.toml
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,11 @@ error = '''
KV:Storage:LockIfExistsFailed
'''

["KV:Storage:PrimaryMismatch"]
error = '''
KV:Storage:PrimaryMismatch
'''

["KV:Storage:Unknown"]
error = '''
KV:Storage:Unknown
Expand Down
7 changes: 7 additions & 0 deletions src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ pub fn extract_key_error(err: &Error) -> kvrpcpb::KeyError {
assertion_failed.set_existing_commit_ts(existing_commit_ts.into_inner());
key_error.set_assertion_failed(assertion_failed);
}
Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::PrimaryMismatch(lock_info),
))))) => {
let mut primary_mismatch = kvrpcpb::PrimaryMismatch::default();
primary_mismatch.set_lock_info(lock_info.clone());
key_error.set_primary_mismatch(primary_mismatch);
}
_ => {
error!(?*err; "txn aborts");
key_error.set_abort(format!("{:?}", err));
Expand Down
10 changes: 9 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7910,6 +7910,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_fail_callback(tx.clone(), 0, |e| match e {
Expand All @@ -7936,6 +7937,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, LockNotExist),
Expand Down Expand Up @@ -7993,6 +7995,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(
Expand Down Expand Up @@ -8038,6 +8041,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, committed(ts(20, 0))),
Expand All @@ -8049,7 +8053,7 @@ mod tests {
.sched_txn_command(
commands::Prewrite::with_lock_ttl(
vec![Mutation::make_put(k.clone(), v)],
k.as_encoded().to_vec(),
k.to_raw().unwrap(),
ts(25, 0),
100,
),
Expand All @@ -8069,6 +8073,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, TtlExpire),
Expand Down Expand Up @@ -9411,6 +9416,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_value_callback(
Expand Down Expand Up @@ -9447,6 +9453,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, TxnStatus::TtlExpire),
Expand Down Expand Up @@ -9840,6 +9847,7 @@ mod tests {
true,
false,
false,
true,
Default::default(),
),
expect_ok_callback(tx.clone(), 0),
Expand Down
5 changes: 5 additions & 0 deletions src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ pub enum ErrorInner {
)]
LockIfExistsFailed { start_ts: TimeStamp, key: Vec<u8> },

#[error("check_txn_status sent to secondary lock, current lock: {0:?}")]
PrimaryMismatch(kvproto::kvrpcpb::LockInfo),

#[error("{0:?}")]
Other(#[from] Box<dyn error::Error + Sync + Send>),
}
Expand Down Expand Up @@ -300,6 +303,7 @@ impl ErrorInner {
key: key.clone(),
})
}
ErrorInner::PrimaryMismatch(l) => Some(ErrorInner::PrimaryMismatch(l.clone())),
ErrorInner::Io(_) | ErrorInner::Other(_) => None,
}
}
Expand Down Expand Up @@ -402,6 +406,7 @@ impl ErrorCodeExt for Error {
ErrorInner::CommitTsTooLarge { .. } => error_code::storage::COMMIT_TS_TOO_LARGE,
ErrorInner::AssertionFailed { .. } => error_code::storage::ASSERTION_FAILED,
ErrorInner::LockIfExistsFailed { .. } => error_code::storage::LOCK_IF_EXISTS_FAILED,
ErrorInner::PrimaryMismatch(_) => error_code::storage::PRIMARY_MISMATCH,
ErrorInner::Other(_) => error_code::storage::UNKNOWN,
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/storage/txn/actions/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ pub fn check_txn_status_lock_exists(
caller_start_ts: TimeStamp,
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
verify_is_primary: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) {
// Return the current lock info to tell the client what the actual primary is.
return Err(
ErrorInner::PrimaryMismatch(lock.into_lock_info(primary_key.into_raw()?)).into(),
);
}

// Never rollback or push forward min_commit_ts in check_txn_status if it's
// using async commit. Rollback of async-commit locks are done during
// ResolveLock.
Expand Down
88 changes: 72 additions & 16 deletions src/storage/txn/commands/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ command! {
// lock, the transaction status could not be decided if the primary lock is pessimistic too and
// it's still uncertain.
resolving_pessimistic_lock: bool,
// Whether it's needed to check wheter the lock on the key (if any) is the primary lock.
// This is for handling some corner cases when pessimistic transactions changes its primary
// (see https://github.com/pingcap/tidb/issues/42937 for details).
// Must be set to true, unless the client is old version that doesn't support this behavior.
verify_is_primary: bool,
}
}

Expand Down Expand Up @@ -107,6 +112,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
self.caller_start_ts,
self.force_sync_commit,
self.resolving_pessimistic_lock,
self.verify_is_primary,
)?,
l => (
check_txn_status_missing_lock(
Expand Down Expand Up @@ -145,16 +151,18 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
#[cfg(test)]
pub mod tests {
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::{Context, PrewriteRequestPessimisticAction::*};
use kvproto::kvrpcpb::{self, Context, LockInfo, PrewriteRequestPessimisticAction::*};
use tikv_util::deadline::Deadline;
use txn_types::{Key, WriteType};

use super::{TxnStatus::*, *};
use crate::storage::{
kv::Engine,
lock_manager::MockLockManager,
mvcc,
mvcc::tests::*,
txn::{
self,
commands::{pessimistic_rollback, WriteCommand, WriteContext},
scheduler::DEFAULT_EXECUTION_DURATION_LIMIT,
tests::*,
Expand Down Expand Up @@ -188,6 +196,7 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
verify_is_primary: true,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let result = command
Expand Down Expand Up @@ -220,7 +229,7 @@ pub mod tests {
rollback_if_not_exist: bool,
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
) {
) -> txn::Error {
let ctx = Context::default();
let snapshot = engine.snapshot(Default::default()).unwrap();
let current_ts = current_ts.into();
Expand All @@ -235,23 +244,28 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
verify_is_primary: true,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
assert!(
command
.process_write(
snapshot,
WriteContext {
lock_mgr: &MockLockManager::new(),
concurrency_manager: cm,
extra_op: Default::default(),
statistics: &mut Default::default(),
async_apply_prewrite: false,
raw_ext: None,
},
command
.process_write(
snapshot,
WriteContext {
lock_mgr: &MockLockManager::new(),
concurrency_manager: cm,
extra_op: Default::default(),
statistics: &mut Default::default(),
async_apply_prewrite: false,
raw_ext: None,
},
)
.map(|r| {
panic!(
"expected check_txn_status fail but succeeded with result: {:?}",
r.pr
)
.is_err()
);
})
.unwrap_err()
}

fn committed(commit_ts: impl Into<TimeStamp>) -> impl FnOnce(TxnStatus) -> bool {
Expand Down Expand Up @@ -1188,4 +1202,46 @@ pub mod tests {
assert!(rollback.last_change_ts.is_zero());
assert_eq!(rollback.versions_to_last_change, 0);
}

#[test]
fn test_verify_is_primary() {
let mut engine = TestEngineBuilder::new().build().unwrap();

let check_lock = |l: LockInfo, key: &'_ [u8], primary: &'_ [u8], lock_type| {
assert_eq!(&l.key, key);
assert_eq!(l.lock_type, lock_type);
assert_eq!(&l.primary_lock, primary);
};

let check_error = |e, key: &'_ [u8], primary: &'_ [u8], lock_type| match e {
txn::Error(box txn::ErrorInner::Mvcc(mvcc::Error(
box mvcc::ErrorInner::PrimaryMismatch(lock_info),
))) => {
check_lock(lock_info, key, primary, lock_type);
}
e => panic!("unexpected error: {:?}", e),
};

must_acquire_pessimistic_lock(&mut engine, b"k1", b"k2", 1, 1);
let e = must_err(&mut engine, b"k1", 1, 1, 0, true, false, true);
check_error(e, b"k1", b"k2", kvrpcpb::Op::PessimisticLock);
let lock = must_pessimistic_locked(&mut engine, b"k1", 1, 1);
check_lock(
lock.into_lock_info(b"k1".to_vec()),
b"k1",
b"k2",
kvrpcpb::Op::PessimisticLock,
);

must_pessimistic_prewrite_put(&mut engine, b"k1", b"v1", b"k2", 1, 1, DoPessimisticCheck);
let e = must_err(&mut engine, b"k1", 1, 1, 0, true, false, true);
check_error(e, b"k1", b"k2", kvrpcpb::Op::Put);
let lock = must_locked(&mut engine, b"k1", 1);
check_lock(
lock.into_lock_info(b"k1".to_vec()),
b"k1",
b"k2",
kvrpcpb::Op::Put,
);
}
}
1 change: 1 addition & 0 deletions src/storage/txn/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ impl From<CheckTxnStatusRequest> for TypedCommand<TxnStatus> {
req.get_rollback_if_not_exist(),
req.get_force_sync_commit(),
req.get_resolving_pessimistic_lock(),
req.get_verify_is_primary(),
req.take_context(),
)
}
Expand Down