Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#14637
Browse files Browse the repository at this point in the history
close tikv#14636, ref pingcap/tidb#42937

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
MyonKeminta authored and ti-chi-bot committed Apr 26, 2023
1 parent a80fcf4 commit 3eadec2
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 4 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions components/error_code/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ define_error_codes!(

ASSERTION_FAILED => ("AssertionFailed", "", ""),

PRIMARY_MISMATCH => ("PrimaryMismatch", "", ""),

UNKNOWN => ("Unknown", "", "")
);
13 changes: 13 additions & 0 deletions etc/error_code.toml
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,19 @@ error = '''
KV:Storage:AssertionFailed
'''

<<<<<<< HEAD
=======
["KV:Storage:LockIfExistsFailed"]
error = '''
KV:Storage:LockIfExistsFailed
'''

["KV:Storage:PrimaryMismatch"]
error = '''
KV:Storage:PrimaryMismatch
'''

>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
["KV:Storage:Unknown"]
error = '''
KV:Storage:Unknown
Expand Down
7 changes: 7 additions & 0 deletions src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ pub fn extract_key_error(err: &Error) -> kvrpcpb::KeyError {
assertion_failed.set_existing_commit_ts(existing_commit_ts.into_inner());
key_error.set_assertion_failed(assertion_failed);
}
Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::PrimaryMismatch(lock_info),
))))) => {
let mut primary_mismatch = kvrpcpb::PrimaryMismatch::default();
primary_mismatch.set_lock_info(lock_info.clone());
key_error.set_primary_mismatch(primary_mismatch);
}
_ => {
error!(?*err; "txn aborts");
key_error.set_abort(format!("{:?}", err));
Expand Down
10 changes: 9 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6648,6 +6648,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_fail_callback(tx.clone(), 0, |e| match e {
Expand All @@ -6674,6 +6675,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, LockNotExist),
Expand Down Expand Up @@ -6731,6 +6733,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(
Expand Down Expand Up @@ -6776,6 +6779,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, committed(ts(20, 0))),
Expand All @@ -6787,7 +6791,7 @@ mod tests {
.sched_txn_command(
commands::Prewrite::with_lock_ttl(
vec![Mutation::make_put(k.clone(), v)],
k.as_encoded().to_vec(),
k.to_raw().unwrap(),
ts(25, 0),
100,
),
Expand All @@ -6807,6 +6811,7 @@ mod tests {
true,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, TtlExpire),
Expand Down Expand Up @@ -7615,6 +7620,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_value_callback(
Expand Down Expand Up @@ -7651,6 +7657,7 @@ mod tests {
false,
false,
false,
true,
Context::default(),
),
expect_value_callback(tx.clone(), 0, TxnStatus::TtlExpire),
Expand Down Expand Up @@ -8028,6 +8035,7 @@ mod tests {
true,
false,
false,
true,
Default::default(),
),
expect_ok_callback(tx.clone(), 0),
Expand Down
27 changes: 27 additions & 0 deletions src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ pub enum ErrorInner {
existing_commit_ts: TimeStamp,
},

<<<<<<< HEAD
=======
#[error(
"Lock_only_if_exists of a pessimistic lock request is set to true, but return_value is not, start_ts:{}, key:{}",
.start_ts, log_wrappers::Value::key(.key)
)]
LockIfExistsFailed { start_ts: TimeStamp, key: Vec<u8> },

#[error("check_txn_status sent to secondary lock, current lock: {0:?}")]
PrimaryMismatch(kvproto::kvrpcpb::LockInfo),

>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
#[error("{0:?}")]
Other(#[from] Box<dyn error::Error + Sync + Send>),
}
Expand Down Expand Up @@ -276,6 +288,16 @@ impl ErrorInner {
existing_start_ts: *existing_start_ts,
existing_commit_ts: *existing_commit_ts,
}),
<<<<<<< HEAD
=======
ErrorInner::LockIfExistsFailed { start_ts, key } => {
Some(ErrorInner::LockIfExistsFailed {
start_ts: *start_ts,
key: key.clone(),
})
}
ErrorInner::PrimaryMismatch(l) => Some(ErrorInner::PrimaryMismatch(l.clone())),
>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
ErrorInner::Io(_) | ErrorInner::Other(_) => None,
}
}
Expand Down Expand Up @@ -375,6 +397,11 @@ impl ErrorCodeExt for Error {
}
ErrorInner::CommitTsTooLarge { .. } => error_code::storage::COMMIT_TS_TOO_LARGE,
ErrorInner::AssertionFailed { .. } => error_code::storage::ASSERTION_FAILED,
<<<<<<< HEAD
=======
ErrorInner::LockIfExistsFailed { .. } => error_code::storage::LOCK_IF_EXISTS_FAILED,
ErrorInner::PrimaryMismatch(_) => error_code::storage::PRIMARY_MISMATCH,
>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
ErrorInner::Other(_) => error_code::storage::UNKNOWN,
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/storage/txn/actions/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,23 @@ pub fn check_txn_status_lock_exists(
caller_start_ts: TimeStamp,
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
verify_is_primary: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
<<<<<<< HEAD
// Never rollback or push forward min_commit_ts in check_txn_status if it's using async commit.
// Rollback of async-commit locks are done during ResolveLock.
=======
if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) {
// Return the current lock info to tell the client what the actual primary is.
return Err(
ErrorInner::PrimaryMismatch(lock.into_lock_info(primary_key.into_raw()?)).into(),
);
}

// Never rollback or push forward min_commit_ts in check_txn_status if it's
// using async commit. Rollback of async-commit locks are done during
// ResolveLock.
>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
if lock.use_async_commit {
if force_sync_commit {
info!(
Expand Down
85 changes: 82 additions & 3 deletions src/storage/txn/commands/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ command! {
// lock, the transaction status could not be decided if the primary lock is pessimistic too and
// it's still uncertain.
resolving_pessimistic_lock: bool,
// Whether it's needed to check wheter the lock on the key (if any) is the primary lock.
// This is for handling some corner cases when pessimistic transactions changes its primary
// (see https://github.com/pingcap/tidb/issues/42937 for details).
// Must be set to true, unless the client is old version that doesn't support this behavior.
verify_is_primary: bool,
}
}

Expand Down Expand Up @@ -103,6 +108,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
self.caller_start_ts,
self.force_sync_commit,
self.resolving_pessimistic_lock,
self.verify_is_primary,
)?,
l => (
check_txn_status_missing_lock(
Expand Down Expand Up @@ -142,16 +148,26 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
#[cfg(test)]
pub mod tests {
use concurrency_manager::ConcurrencyManager;
<<<<<<< HEAD
use kvproto::kvrpcpb::Context;
=======
use kvproto::kvrpcpb::{self, Context, LockInfo, PrewriteRequestPessimisticAction::*};
>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
use tikv_util::deadline::Deadline;
use txn_types::{Key, WriteType};

use super::{TxnStatus::*, *};
use crate::storage::{
kv::Engine,
<<<<<<< HEAD
lock_manager::DummyLockManager,
=======
lock_manager::MockLockManager,
mvcc,
>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
mvcc::tests::*,
txn::{
self,
commands::{pessimistic_rollback, WriteCommand, WriteContext},
scheduler::DEFAULT_EXECUTION_DURATION_LIMIT,
tests::*,
Expand Down Expand Up @@ -185,6 +201,7 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
verify_is_primary: true,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let result = command
Expand Down Expand Up @@ -216,7 +233,7 @@ pub mod tests {
rollback_if_not_exist: bool,
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
) {
) -> txn::Error {
let ctx = Context::default();
let snapshot = engine.snapshot(Default::default()).unwrap();
let current_ts = current_ts.into();
Expand All @@ -231,8 +248,10 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
verify_is_primary: true,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
<<<<<<< HEAD
assert!(
command
.process_write(
Expand All @@ -244,9 +263,27 @@ pub mod tests {
statistics: &mut Default::default(),
async_apply_prewrite: false,
},
=======
command
.process_write(
snapshot,
WriteContext {
lock_mgr: &MockLockManager::new(),
concurrency_manager: cm,
extra_op: Default::default(),
statistics: &mut Default::default(),
async_apply_prewrite: false,
raw_ext: None,
},
)
.map(|r| {
panic!(
"expected check_txn_status fail but succeeded with result: {:?}",
r.pr
>>>>>>> 8656623b8b (txn: Check whether the primary matches when handling check_txn_status requests (#14637))
)
.is_err()
);
})
.unwrap_err()
}

fn committed(commit_ts: impl Into<TimeStamp>) -> impl FnOnce(TxnStatus) -> bool {
Expand Down Expand Up @@ -1109,4 +1146,46 @@ pub mod tests {
must_unlocked(&engine, k);
must_get_rollback_ts(&engine, k, ts(50, 0));
}

#[test]
fn test_verify_is_primary() {
let mut engine = TestEngineBuilder::new().build().unwrap();

let check_lock = |l: LockInfo, key: &'_ [u8], primary: &'_ [u8], lock_type| {
assert_eq!(&l.key, key);
assert_eq!(l.lock_type, lock_type);
assert_eq!(&l.primary_lock, primary);
};

let check_error = |e, key: &'_ [u8], primary: &'_ [u8], lock_type| match e {
txn::Error(box txn::ErrorInner::Mvcc(mvcc::Error(
box mvcc::ErrorInner::PrimaryMismatch(lock_info),
))) => {
check_lock(lock_info, key, primary, lock_type);
}
e => panic!("unexpected error: {:?}", e),
};

must_acquire_pessimistic_lock(&mut engine, b"k1", b"k2", 1, 1);
let e = must_err(&mut engine, b"k1", 1, 1, 0, true, false, true);
check_error(e, b"k1", b"k2", kvrpcpb::Op::PessimisticLock);
let lock = must_pessimistic_locked(&mut engine, b"k1", 1, 1);
check_lock(
lock.into_lock_info(b"k1".to_vec()),
b"k1",
b"k2",
kvrpcpb::Op::PessimisticLock,
);

must_pessimistic_prewrite_put(&mut engine, b"k1", b"v1", b"k2", 1, 1, DoPessimisticCheck);
let e = must_err(&mut engine, b"k1", 1, 1, 0, true, false, true);
check_error(e, b"k1", b"k2", kvrpcpb::Op::Put);
let lock = must_locked(&mut engine, b"k1", 1);
check_lock(
lock.into_lock_info(b"k1".to_vec()),
b"k1",
b"k2",
kvrpcpb::Op::Put,
);
}
}
1 change: 1 addition & 0 deletions src/storage/txn/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ impl From<CheckTxnStatusRequest> for TypedCommand<TxnStatus> {
req.get_rollback_if_not_exist(),
req.get_force_sync_commit(),
req.get_resolving_pessimistic_lock(),
req.get_verify_is_primary(),
req.take_context(),
)
}
Expand Down

0 comments on commit 3eadec2

Please sign in to comment.