Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pessimistic-txn: solve non-pessimistic-lock conflict #4801

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ad2c0b7
txn: replace is_pessimistic_lock to for_update_ts in Lock
youjiali1995 May 28, 2019
928015e
pessimistic-txn: overwrite optimistic lock in pessimistic_prewrite if
youjiali1995 May 29, 2019
9dc44b4
modify comment
youjiali1995 May 29, 2019
0c97e0a
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
5881075
address comment
youjiali1995 May 29, 2019
0a88057
Merge branch 'master' of github.com:tikv/tikv into solve-pessimistic-…
youjiali1995 May 29, 2019
8ffe875
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
e5895b6
address comment
youjiali1995 May 29, 2019
b7d935c
address comment
youjiali1995 May 29, 2019
5fef132
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 29, 2019
bf054bc
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
cc0490a
address comment
youjiali1995 May 29, 2019
9a242fc
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 29, 2019
cdd9a21
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
1666be9
add comment
youjiali1995 May 30, 2019
1d4b0ff
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 30, 2019
4d5e7fb
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 30, 2019
7d2f655
return Error let TiDB to resolve lock
youjiali1995 May 30, 2019
14f6be9
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 30, 2019
a3e9dd5
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 31, 2019
098241a
address comment
youjiali1995 May 31, 2019
ac1b250
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 31, 2019
f28cac1
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 Jun 4, 2019
b095cc6
address comment
youjiali1995 Jun 5, 2019
e3459f0
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 Jun 5, 2019
b7c1a74
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 Jun 6, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/server/debug.rs
Expand Up @@ -1725,7 +1725,7 @@ mod tests {
for &(prefix, tp, value, version) in &cf_lock_data {
let encoded_key = Key::from_raw(prefix);
let key = keys::data_key(encoded_key.as_encoded().as_slice());
let lock = Lock::new(tp, value.to_vec(), version, 0, None, false, 0);
let lock = Lock::new(tp, value.to_vec(), version, 0, None, 0, 0);
let value = lock.to_bytes();
engine
.put_cf(lock_cf, key.as_slice(), value.as_slice())
Expand Down Expand Up @@ -2083,7 +2083,7 @@ mod tests {
} else {
None
};
let lock = Lock::new(tp, vec![], ts, 0, v, false, 0);
let lock = Lock::new(tp, vec![], ts, 0, v, 0, 0);
kv.push((CF_LOCK, Key::from_raw(key), lock.to_bytes(), expect));
}
for (key, start_ts, commit_ts, tp, short_value, expect) in write {
Expand Down
3 changes: 2 additions & 1 deletion src/server/service/kv.rs
Expand Up @@ -1322,6 +1322,7 @@ fn future_prewrite<E: Engine>(
let mut options = Options::default();
options.lock_ttl = req.get_lock_ttl();
options.skip_constraint_check = req.get_skip_constraint_check();
options.for_update_ts = req.get_for_update_ts();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need for_update_ts in prewrite, seems we already have it by acquire_pessimistic_lock?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, why the for_update_ts is the same for all key in the prewrite command?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for_update_ts is used to resolve non-pessimistic-lock conflict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TiDB uses the last for_update_ts as prewrite's.

options.is_pessimistic_lock = req.take_is_pessimistic_lock();
options.txn_size = req.get_txn_size();

Expand Down Expand Up @@ -1364,14 +1365,14 @@ fn future_acquire_pessimistic_lock<E: Engine>(
let mut options = Options::default();
options.lock_ttl = req.get_lock_ttl();
options.is_first_lock = req.get_is_first_lock();
options.for_update_ts = req.get_for_update_ts();

let (cb, f) = paired_future_callback();
let res = storage.async_acquire_pessimistic_lock(
req.take_context(),
keys,
req.take_primary_lock(),
req.get_start_version(),
req.get_for_update_ts(),
options,
cb,
);
Expand Down
4 changes: 2 additions & 2 deletions src/storage/lock_manager/waiter_manager.rs
Expand Up @@ -483,7 +483,7 @@ mod tests {
let pr = ProcessResult::Res;
waiter_mgr_scheduler.wait_for(0, StorageCb::Boolean(cb), pr, Lock { ts: 0, hash: 0 }, true);
assert_eq!(
rx.recv_timeout(Duration::from_millis(1100))
rx.recv_timeout(Duration::from_millis(2000))
.unwrap()
.unwrap(),
()
Expand All @@ -503,7 +503,7 @@ mod tests {
);
waiter_mgr_scheduler.wake_up(0, vec![3, 1, 2], 1);
assert!(rx
.recv_timeout(Duration::from_millis(100))
.recv_timeout(Duration::from_millis(500))
.unwrap()
.is_err());
}
Expand Down
11 changes: 4 additions & 7 deletions src/storage/mod.rs
Expand Up @@ -49,7 +49,7 @@ pub type Callback<T> = Box<dyn FnOnce(Result<T>) + Send>;
// Short value max len must <= 255.
pub const SHORT_VALUE_MAX_LEN: usize = 64;
pub const SHORT_VALUE_PREFIX: u8 = b'v';
pub const PESSIMISTIC_TXN: u8 = b'p';
pub const FOR_UPDATE_TS_PREFIX: u8 = b'f';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not a good idea to change the prefix .. we will have two different kinds of prefix between versions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if the prefix is the same, different versions won't be compatible because the old version doesn't record for_update_ts after the prefix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok since this feature is in developing.

pub const TXN_SIZE_PREFIX: u8 = b't';

use engine::{CfName, ALL_CFS, CF_DEFAULT, CF_LOCK, CF_WRITE, DATA_CFS};
Expand Down Expand Up @@ -99,7 +99,6 @@ pub enum Command {
keys: Vec<(Key, bool)>,
primary: Vec<u8>,
start_ts: u64,
for_update_ts: u64,
options: Options,
},
Commit {
Expand Down Expand Up @@ -176,14 +175,12 @@ impl Display for Command {
ref ctx,
ref keys,
start_ts,
for_update_ts,
..
} => write!(
f,
"kv::command::acquirepessimisticlock keys({}) @ {},{} | {:?}",
"kv::command::acquirepessimisticlock keys({}) @ {} | {:?}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to display for_update_ts, it is useful for debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

keys.len(),
start_ts,
for_update_ts,
ctx
),
Command::Commit {
Expand Down Expand Up @@ -436,6 +433,7 @@ pub struct Options {
pub key_only: bool,
pub reverse_scan: bool,
pub is_first_lock: bool,
pub for_update_ts: u64,
pub is_pessimistic_lock: Vec<bool>,
// How many keys this transaction involved.
pub txn_size: u64,
Expand All @@ -449,6 +447,7 @@ impl Options {
key_only,
reverse_scan: false,
is_first_lock: false,
for_update_ts: 0,
is_pessimistic_lock: vec![],
txn_size: 0,
}
Expand Down Expand Up @@ -938,7 +937,6 @@ impl<E: Engine> Storage<E> {
keys: Vec<(Key, bool)>,
primary: Vec<u8>,
start_ts: u64,
for_update_ts: u64,
options: Options,
callback: Callback<Vec<Result<()>>>,
) -> Result<()> {
Expand All @@ -959,7 +957,6 @@ impl<E: Engine> Storage<E> {
keys,
primary,
start_ts,
for_update_ts,
options,
};
self.schedule(cmd, StorageCb::Booleans(callback))?;
Expand Down
52 changes: 37 additions & 15 deletions src/storage/mvcc/lock.rs
Expand Up @@ -3,7 +3,7 @@
use super::super::types::Value;
use super::{Error, Result};
use crate::storage::{
Mutation, PESSIMISTIC_TXN, SHORT_VALUE_MAX_LEN, SHORT_VALUE_PREFIX, TXN_SIZE_PREFIX,
Mutation, FOR_UPDATE_TS_PREFIX, SHORT_VALUE_MAX_LEN, SHORT_VALUE_PREFIX, TXN_SIZE_PREFIX,
};
use byteorder::ReadBytesExt;
use tikv_util::codec::bytes::{self, BytesEncoder};
Expand Down Expand Up @@ -58,7 +58,8 @@ pub struct Lock {
pub ts: u64,
pub ttl: u64,
pub short_value: Option<Value>,
pub is_pessimistic_txn: bool,
// If for_update_ts != 0, this lock belongs to a pessimistic transaction
pub for_update_ts: u64,
pub txn_size: u64,
}

Expand All @@ -69,7 +70,7 @@ impl Lock {
ts: u64,
ttl: u64,
short_value: Option<Value>,
is_pessimistic_txn: bool,
for_update_ts: u64,
txn_size: u64,
) -> Lock {
Lock {
Expand All @@ -78,7 +79,7 @@ impl Lock {
ts,
ttl,
short_value,
is_pessimistic_txn,
for_update_ts,
txn_size,
}
}
Expand All @@ -96,8 +97,9 @@ impl Lock {
b.push(v.len() as u8);
b.extend_from_slice(v);
}
if self.is_pessimistic_txn {
b.push(PESSIMISTIC_TXN);
if self.for_update_ts > 0 {
b.push(FOR_UPDATE_TS_PREFIX);
b.encode_u64(self.for_update_ts).unwrap();
}
if self.txn_size > 0 {
b.push(TXN_SIZE_PREFIX);
Expand All @@ -120,11 +122,11 @@ impl Lock {
};

if b.is_empty() {
return Ok(Lock::new(lock_type, primary, ts, ttl, None, false, 0));
return Ok(Lock::new(lock_type, primary, ts, ttl, None, 0, 0));
}

let mut short_value = None;
let mut is_pessimistic_txn: bool = false;
let mut for_update_ts = 0;
let mut txn_size: u64 = 0;
while !b.is_empty() {
match b.read_u8()? {
Expand All @@ -140,7 +142,7 @@ impl Lock {
short_value = Some(b[..len as usize].to_vec());
b = &b[len as usize..];
}
PESSIMISTIC_TXN => is_pessimistic_txn = true,
FOR_UPDATE_TS_PREFIX => for_update_ts = number::decode_u64(&mut b)?,
TXN_SIZE_PREFIX => txn_size = number::decode_u64(&mut b)?,
flag => panic!("invalid flag [{:?}] in lock", flag),
}
Expand All @@ -151,7 +153,7 @@ impl Lock {
ts,
ttl,
short_value,
is_pessimistic_txn,
for_update_ts,
txn_size,
))
}
Expand Down Expand Up @@ -208,26 +210,46 @@ mod tests {
fn test_lock() {
// Test `Lock::to_bytes()` and `Lock::parse()` works as a pair.
let mut locks = vec![
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, false, 0),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 0, 0),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
false,
0,
0,
),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, true, 0),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 10, 0),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
true,
10,
0,
),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 0, 16),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
0,
16,
),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 10, 16),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
10,
0,
),
];
for (i, lock) in locks.drain(..).enumerate() {
let v = lock.to_bytes();
Expand All @@ -244,7 +266,7 @@ mod tests {
1,
10,
Some(b"short_value".to_vec()),
false,
0,
0,
);
let v = lock.to_bytes();
Expand Down