Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: protect primary locks of pessimistic transactions from being collapsed #5575

Merged
merged 33 commits into from Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
28d09f7
protect cleanup result from being collapsed
sticnarf Oct 4, 2019
b8afa01
Merge branch 'master' into protect-rollback
sticnarf Oct 4, 2019
93b60fa
Add cleanup and collapse rollback tests
sticnarf Oct 7, 2019
ee4a286
Add tests for protected field
sticnarf Oct 7, 2019
e844c96
Merge branch 'master' into protect-rollback
sticnarf Oct 7, 2019
c89b367
Set protected to false by default and add more comments
sticnarf Oct 8, 2019
71aeae3
Reuse short value field for compatibility
sticnarf Oct 8, 2019
857f78f
Merge branch 'master' into protect-rollback
sticnarf Oct 8, 2019
fa6ad05
Add unit test for is_protected
sticnarf Oct 8, 2019
47eb1fa
Fix test_cleanup
sticnarf Oct 8, 2019
a72cde0
util: compare raw and encoded keys without explicit decoding
sticnarf Oct 10, 2019
3944db5
Merge branch 'master' into compare-key
sticnarf Oct 10, 2019
1953617
Merge remote-tracking branch 'upstream/master' into compare-key
sticnarf Oct 10, 2019
ac24c25
Add negative cases
sticnarf Oct 10, 2019
d8820fe
Merge branch 'master' into compare-key
sticnarf Oct 10, 2019
fd5784c
address comments
sticnarf Oct 12, 2019
29dc972
Merge remote-tracking branch 'upstream/master' into compare-key
sticnarf Oct 12, 2019
4215ba4
address comments
sticnarf Oct 12, 2019
5741c61
don't panic when encoded is invalid
sticnarf Oct 13, 2019
83d0e43
make clippy happy
sticnarf Oct 13, 2019
e161686
Merge branch 'compare-key' into protect-rollback
sticnarf Oct 14, 2019
802cc5d
determine protected by checking if the key is primary
sticnarf Oct 14, 2019
04819fb
fix comments for Key::is_encoded_from
sticnarf Oct 14, 2019
935874c
Merge branch 'compare-key' into protect-rollback
sticnarf Oct 14, 2019
70d7b1e
Merge remote-tracking branch 'upstream/master' into protect-rollback
sticnarf Oct 14, 2019
fbc77f3
remove protect param in cleanup
sticnarf Oct 14, 2019
b78d5fd
Merge remote-tracking branch 'upstream/master' into protect-rollback
sticnarf Oct 14, 2019
7ea4c03
remove comments that no long hold
sticnarf Oct 14, 2019
b12eebb
fix incorrect comments in tests
sticnarf Oct 16, 2019
e5d2dc8
Merge remote-tracking branch 'upstream/master' into protect-rollback
sticnarf Oct 16, 2019
26db127
update comments
sticnarf Oct 16, 2019
47db423
Merge branch 'master' into protect-rollback
sticnarf Oct 16, 2019
5040354
Merge branch 'master' into protect-rollback
AndreMouche Oct 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/storage/mvcc/mod.rs
Expand Up @@ -788,6 +788,24 @@ pub mod tests {
assert_eq!(ret, None);
}

pub fn must_get_rollback_protected<E: Engine>(
engine: &E,
key: &[u8],
start_ts: u64,
protected: bool,
) {
let snapshot = engine.snapshot(&Context::default()).unwrap();
let mut reader = MvccReader::new(snapshot, None, true, None, None, IsolationLevel::Si);

let (ts, write) = reader
.seek_write(&Key::from_raw(key), start_ts)
.unwrap()
.unwrap();
assert_eq!(ts, start_ts);
assert_eq!(write.write_type, WriteType::Rollback);
assert_eq!(write.is_protected(), protected);
}

pub fn must_scan_keys<E: Engine>(
engine: &E,
start: Option<&[u8]>,
Expand Down
95 changes: 80 additions & 15 deletions src/storage/mvcc/txn.rs
Expand Up @@ -170,12 +170,15 @@ impl<S: Snapshot> MvccTxn<S> {
}
}

fn rollback_lock(&mut self, key: Key, lock: &Lock) -> Result<()> {
fn rollback_lock(&mut self, key: Key, lock: &Lock, is_pessimistic_txn: bool) -> Result<()> {
// If prewrite type is DEL or LOCK or PESSIMISTIC, it is no need to delete value.
if lock.short_value.is_none() && lock.lock_type == LockType::Put {
self.delete_value(key.clone(), lock.ts);
}
let write = Write::new(WriteType::Rollback, self.start_ts, None);

// Only the primary key of a pessimistic transaction needs to be protected.
let protected: bool = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
let write = Write::new_rollback(self.start_ts, protected);
self.put_write(key.clone(), self.start_ts, write.to_bytes());
self.unlock_key(key.clone());
if self.collapse_rollback {
Expand Down Expand Up @@ -504,7 +507,8 @@ impl<S: Snapshot> MvccTxn<S> {
}

/// Cleanup the lock if it's TTL has expired, comparing with `current_ts`. If `current_ts` is 0,
/// cleanup the lock without checking TTL.
/// cleanup the lock without checking TTL. If the lock is the primary lock of a pessimistic
/// transaction, the rollback record is protected from being collapsed.
///
/// Returns whether the lock is a pessimistic lock. Returns error if the key has already been
/// committed.
Expand All @@ -529,7 +533,7 @@ impl<S: Snapshot> MvccTxn<S> {
}

let is_pessimistic_txn = lock.for_update_ts != 0;
self.rollback_lock(key, lock)?;
self.rollback_lock(key, lock, is_pessimistic_txn)?;
Ok(is_pessimistic_txn)
}
_ => {
Expand Down Expand Up @@ -558,8 +562,14 @@ impl<S: Snapshot> MvccTxn<S> {
self.collapse_prev_rollback(key.clone())?;
}

// insert a Rollback to WriteCF when receives Rollback before Prewrite
let write = Write::new(WriteType::Rollback, ts, None);
// Insert a Rollback to Write CF in case that a stale prewrite command
// is received after a cleanup command.
// Pessimistic transactions prewrite successfully only if all its
// pessimistic locks exist. So collapsing the rollback of a pessimistic
// lock is safe. After a pessimistic transaction acquires all its locks,
// it is impossible that neither a lock nor a write record is found.
// Therefore, we don't need to protect the rollback here.
let write = Write::new_rollback(ts, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm in doubt about the correctness to set this non-protected. Rest LGTM.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is ok because if is no Lock exist here, then the corresponding transaction must not a pessimistic transaction's primary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've update the comments here to:

Pessimistic transactions prewrite successfully only if all its pessimistic locks exist. So collapsing the rollback of a pessimistic lock is safe. After a pessimistic transaction acquires all its locks, it is impossible that neither a lock nor a write record is found. Therefore, we don't need to protect the rollback here.

Do you think it's OK? @MyonKeminta

self.put_write(key, ts, write.to_bytes());
Ok(false)
}
Expand All @@ -583,7 +593,7 @@ impl<S: Snapshot> MvccTxn<S> {

fn collapse_prev_rollback(&mut self, key: Key) -> Result<()> {
if let Some((commit_ts, write)) = self.reader.seek_write(&key, self.start_ts)? {
if write.write_type == WriteType::Rollback {
if write.write_type == WriteType::Rollback && !write.is_protected() {
self.delete_write(key, commit_ts);
}
}
Expand Down Expand Up @@ -652,7 +662,7 @@ impl<S: Snapshot> MvccTxn<S> {

if extract_physical(lock.ts) + lock.ttl < extract_physical(current_ts) {
// If the lock is expired, clean it up.
self.rollback_lock(primary_key, lock)?;
self.rollback_lock(primary_key, lock, is_pessimistic_txn)?;
MVCC_CHECK_TXN_STATUS_COUNTER_VEC.rollback.inc();
return Ok((0, 0, is_pessimistic_txn));
}
Expand Down Expand Up @@ -912,7 +922,7 @@ mod tests {
}

#[test]
fn test_rollback_lock() {
fn test_rollback_lock_optimistic() {
let engine = TestEngineBuilder::new().build().unwrap();

let (k, v) = (b"k1", b"v1");
Expand All @@ -925,6 +935,35 @@ mod tests {

// Rollback lock
must_rollback(&engine, k, 15);
// Rollbacks of optimistic transactions needn't be protected
must_get_rollback_protected(&engine, k, 15, false);
}

#[test]
fn test_rollback_lock_pessimistic() {
let engine = TestEngineBuilder::new().build().unwrap();

let (k1, k2, v) = (b"k1", b"k2", b"v1");

must_acquire_pessimistic_lock(&engine, k1, k1, 5, 5);
must_acquire_pessimistic_lock(&engine, k2, k1, 5, 7);
must_rollback(&engine, k1, 5);
must_rollback(&engine, k2, 5);
// The rollback of the primary key should be protected
must_get_rollback_protected(&engine, k1, 5, true);
// The rollback of the secondary key needn't be protected
must_get_rollback_protected(&engine, k2, 5, false);

must_acquire_pessimistic_lock(&engine, k1, k1, 15, 15);
must_acquire_pessimistic_lock(&engine, k2, k1, 15, 17);
must_pessimistic_prewrite_put(&engine, k1, v, k1, 15, 17, true);
must_pessimistic_prewrite_put(&engine, k2, v, k1, 15, 17, true);
must_rollback(&engine, k1, 15);
must_rollback(&engine, k2, 15);
// The rollback of the primary key should be protected
must_get_rollback_protected(&engine, k1, 15, true);
// The rollback of the secondary key needn't be protected
must_get_rollback_protected(&engine, k2, 15, false);
}

#[test]
Expand Down Expand Up @@ -966,12 +1005,27 @@ mod tests {

// Try to cleanup another transaction's lock. Does nothing.
must_cleanup(&engine, k, ts(10, 1), ts(120, 0));
// If there is no exisiting lock when cleanup, it cannot be a pessimistic transaction,
// so the rollback needn't be protected.
must_get_rollback_protected(&engine, k, ts(10, 1), false);
must_locked(&engine, k, ts(10, 0));

// TTL expired. The lock should be removed.
must_cleanup(&engine, k, ts(10, 0), ts(120, 0));
must_unlocked(&engine, k);
// Rollbacks of optimistic transactions needn't be protected
must_get_rollback_protected(&engine, k, ts(10, 0), false);
must_get_rollback_ts(&engine, k, ts(10, 0));

// Rollbacks of primary keys in pessimistic transactions should be protected
must_acquire_pessimistic_lock(&engine, k, k, ts(11, 1), ts(12, 1));
must_cleanup(&engine, k, ts(11, 1), ts(120, 0));
must_get_rollback_protected(&engine, k, ts(11, 1), true);

must_acquire_pessimistic_lock(&engine, k, k, ts(13, 1), ts(14, 1));
must_pessimistic_prewrite_put(&engine, k, v, k, ts(13, 1), ts(14, 1), true);
must_cleanup(&engine, k, ts(13, 1), ts(120, 0));
must_get_rollback_protected(&engine, k, ts(13, 1), true);
}

#[test]
Expand Down Expand Up @@ -1450,12 +1504,12 @@ mod tests {
// Lock conflict
must_prewrite_put(&engine, k, v, k, 3);
must_acquire_pessimistic_lock_err(&engine, k, k, 4, 4);
must_rollback(&engine, k, 3);
must_cleanup(&engine, k, 3, 0);
must_unlocked(&engine, k);
must_acquire_pessimistic_lock(&engine, k, k, 5, 5);
must_prewrite_lock_err(&engine, k, k, 6);
must_acquire_pessimistic_lock_err(&engine, k, k, 6, 6);
must_rollback(&engine, k, 5);
must_cleanup(&engine, k, 5, 0);
must_unlocked(&engine, k);

// Data conflict
Expand All @@ -1472,7 +1526,7 @@ mod tests {
// Rollback
must_acquire_pessimistic_lock(&engine, k, k, 11, 11);
must_pessimistic_locked(&engine, k, 11, 11);
must_rollback(&engine, k, 11);
must_cleanup(&engine, k, 11, 0);
must_acquire_pessimistic_lock_err(&engine, k, k, 11, 11);
must_pessimistic_prewrite_put_err(&engine, k, v, k, 11, 11, true);
must_prewrite_lock_err(&engine, k, k, 11);
Expand All @@ -1481,7 +1535,7 @@ mod tests {
must_acquire_pessimistic_lock(&engine, k, k, 12, 12);
must_pessimistic_prewrite_put(&engine, k, v, k, 12, 12, true);
must_locked(&engine, k, 12);
must_rollback(&engine, k, 12);
must_cleanup(&engine, k, 12, 0);
must_acquire_pessimistic_lock_err(&engine, k, k, 12, 12);
must_pessimistic_prewrite_put_err(&engine, k, v, k, 12, 12, true);
must_prewrite_lock_err(&engine, k, k, 12);
Expand Down Expand Up @@ -1525,7 +1579,7 @@ mod tests {
must_prewrite_put(&engine, k, v, k, 23);
must_locked(&engine, k, 23);
must_acquire_pessimistic_lock_err(&engine, k, k, 23, 23);
must_rollback(&engine, k, 23);
must_cleanup(&engine, k, 23, 0);
must_acquire_pessimistic_lock(&engine, k, k, 24, 24);
must_pessimistic_locked(&engine, k, 24, 24);
must_commit_err(&engine, k, 24, 25);
Expand Down Expand Up @@ -1626,9 +1680,20 @@ mod tests {
// there won't be conflicts. So this case is also not checked, and prewrite will succeeed.
must_pessimistic_prewrite_put(&engine, k, v, k, 47, 48, false);
must_locked(&engine, k, 47);
must_rollback(&engine, k, 47);
must_cleanup(&engine, k, 47, 0);
must_unlocked(&engine, k);

// The rollback of the primary key in a pessimistic transaction should be protected from
// being collapsed.
must_acquire_pessimistic_lock(&engine, k, k, 49, 60);
must_pessimistic_prewrite_put(&engine, k, v, k, 49, 60, true);
must_locked(&engine, k, 49);
must_cleanup(&engine, k, 49, 0);
must_get_rollback_protected(&engine, k, 49, true);
must_prewrite_put(&engine, k, v, k, 51);
must_rollback_collapsed(&engine, k, 51);
must_acquire_pessimistic_lock_err(&engine, k, k, 49, 60);

// start_ts and commit_ts interlacing
for start_ts in &[140, 150, 160] {
let for_update_ts = start_ts + 48;
Expand Down
45 changes: 43 additions & 2 deletions src/storage/mvcc/write.rs
Expand Up @@ -20,6 +20,9 @@ const FLAG_DELETE: u8 = b'D';
const FLAG_LOCK: u8 = b'L';
const FLAG_ROLLBACK: u8 = b'R';

/// The short value for rollback records which are protected from being collapsed.
const PROTECTED_ROLLBACK_SHORT_VALUE: &[u8] = b"p";

impl WriteType {
pub fn from_lock_type(tp: LockType) -> Option<WriteType> {
match tp {
Expand Down Expand Up @@ -75,6 +78,7 @@ impl std::fmt::Debug for Write {
}

impl Write {
/// Creates a new `Write` record.
pub fn new(write_type: WriteType, start_ts: u64, short_value: Option<Value>) -> Write {
Write {
write_type,
Expand All @@ -83,6 +87,20 @@ impl Write {
}
}

pub fn new_rollback(start_ts: u64, protected: bool) -> Write {
let short_value = if protected {
Some(PROTECTED_ROLLBACK_SHORT_VALUE.to_vec())
} else {
None
};

Write {
write_type: WriteType::Rollback,
start_ts,
short_value,
}
}

pub fn to_bytes(&self) -> Vec<u8> {
let mut b = Vec::with_capacity(1 + MAX_VAR_U64_LEN + SHORT_VALUE_MAX_LEN + 2);
b.push(self.write_type.to_u8());
Expand Down Expand Up @@ -122,6 +140,15 @@ impl Write {
pub fn parse_type(mut b: &[u8]) -> Result<WriteType> {
WriteType::from_u8(b.read_u8()?).ok_or(Error::BadFormatWrite)
}

pub fn is_protected(&self) -> bool {
self.write_type == WriteType::Rollback
&& self
.short_value
.as_ref()
.map(|v| *v == PROTECTED_ROLLBACK_SHORT_VALUE)
.unwrap_or_default()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -165,8 +192,10 @@ mod tests {
fn test_write() {
// Test `Write::to_bytes()` and `Write::parse()` works as a pair.
let mut writes = vec![
Write::new(WriteType::Put, 0, None),
Write::new(WriteType::Delete, 0, Some(b"short_value".to_vec())),
Write::new(WriteType::Put, 0, Some(b"short_value".to_vec())),
Write::new(WriteType::Delete, 1 << 20, None),
Write::new_rollback(1 << 40, true),
Write::new(WriteType::Rollback, 1 << 41, None),
];
for (i, write) in writes.drain(..).enumerate() {
let v = write.to_bytes();
Expand All @@ -183,4 +212,16 @@ mod tests {
assert!(Write::parse(&v[..1]).is_err());
assert_eq!(Write::parse_type(&v).unwrap(), lock.write_type);
}

#[test]
fn test_is_protected() {
assert!(Write::new_rollback(1, true).is_protected());
assert!(!Write::new_rollback(2, false).is_protected());
assert!(!Write::new(
WriteType::Put,
3,
Some(PROTECTED_ROLLBACK_SHORT_VALUE.to_vec())
)
.is_protected());
}
}