Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pessimistic-txn: solve non-pessimistic-lock conflict #4801

Merged
Merged
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ad2c0b7
txn: replace is_pessimistic_lock to for_update_ts in Lock
youjiali1995 May 28, 2019
928015e
pessimistic-txn: overwrite optimistic lock in pessimistic_prewrite if
youjiali1995 May 29, 2019
9dc44b4
modify comment
youjiali1995 May 29, 2019
0c97e0a
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
5881075
address comment
youjiali1995 May 29, 2019
0a88057
Merge branch 'master' of github.com:tikv/tikv into solve-pessimistic-…
youjiali1995 May 29, 2019
8ffe875
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
e5895b6
address comment
youjiali1995 May 29, 2019
b7d935c
address comment
youjiali1995 May 29, 2019
5fef132
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 29, 2019
bf054bc
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
cc0490a
address comment
youjiali1995 May 29, 2019
9a242fc
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 29, 2019
cdd9a21
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 29, 2019
1666be9
add comment
youjiali1995 May 30, 2019
1d4b0ff
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 30, 2019
4d5e7fb
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 30, 2019
7d2f655
return Error let TiDB to resolve lock
youjiali1995 May 30, 2019
14f6be9
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 30, 2019
a3e9dd5
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 May 31, 2019
098241a
address comment
youjiali1995 May 31, 2019
ac1b250
Merge branch 'solve-pessimistic-prewrite-conflict' of github.com:youj…
youjiali1995 May 31, 2019
f28cac1
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 Jun 4, 2019
b095cc6
address comment
youjiali1995 Jun 5, 2019
e3459f0
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 Jun 5, 2019
b7c1a74
Merge branch 'master' into solve-pessimistic-prewrite-conflict
youjiali1995 Jun 6, 2019
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Some generated files are not rendered by default. Learn more.

@@ -1725,7 +1725,7 @@ mod tests {
for &(prefix, tp, value, version) in &cf_lock_data {
let encoded_key = Key::from_raw(prefix);
let key = keys::data_key(encoded_key.as_encoded().as_slice());
let lock = Lock::new(tp, value.to_vec(), version, 0, None, false, 0);
let lock = Lock::new(tp, value.to_vec(), version, 0, None, 0, 0);
let value = lock.to_bytes();
engine
.put_cf(lock_cf, key.as_slice(), value.as_slice())
@@ -2083,7 +2083,7 @@ mod tests {
} else {
None
};
let lock = Lock::new(tp, vec![], ts, 0, v, false, 0);
let lock = Lock::new(tp, vec![], ts, 0, v, 0, 0);
kv.push((CF_LOCK, Key::from_raw(key), lock.to_bytes(), expect));
}
for (key, start_ts, commit_ts, tp, short_value, expect) in write {
@@ -1322,6 +1322,7 @@ fn future_prewrite<E: Engine>(
let mut options = Options::default();
options.lock_ttl = req.get_lock_ttl();
options.skip_constraint_check = req.get_skip_constraint_check();
options.for_update_ts = req.get_for_update_ts();

This comment has been minimized.

Copy link
@AndreMouche

AndreMouche May 30, 2019

Member

Why we need for_update_ts in prewrite, seems we already have it by acquire_pessimistic_lock?

This comment has been minimized.

Copy link
@AndreMouche

AndreMouche May 30, 2019

Member

By the way, why the for_update_ts is the same for all key in the prewrite command?

This comment has been minimized.

Copy link
@youjiali1995

youjiali1995 May 30, 2019

Author Contributor

for_update_ts is used to resolve non-pessimistic-lock conflict.

This comment has been minimized.

Copy link
@youjiali1995

youjiali1995 May 30, 2019

Author Contributor

TiDB uses the last for_update_ts as prewrite's.

options.is_pessimistic_lock = req.take_is_pessimistic_lock();
options.txn_size = req.get_txn_size();

@@ -1364,14 +1365,14 @@ fn future_acquire_pessimistic_lock<E: Engine>(
let mut options = Options::default();
options.lock_ttl = req.get_lock_ttl();
options.is_first_lock = req.get_is_first_lock();
options.for_update_ts = req.get_for_update_ts();

let (cb, f) = paired_future_callback();
let res = storage.async_acquire_pessimistic_lock(
req.take_context(),
keys,
req.take_primary_lock(),
req.get_start_version(),
req.get_for_update_ts(),
options,
cb,
);
@@ -483,7 +483,7 @@ mod tests {
let pr = ProcessResult::Res;
waiter_mgr_scheduler.wait_for(0, StorageCb::Boolean(cb), pr, Lock { ts: 0, hash: 0 }, true);
assert_eq!(
rx.recv_timeout(Duration::from_millis(1100))
rx.recv_timeout(Duration::from_millis(2000))
.unwrap()
.unwrap(),
()
@@ -503,7 +503,7 @@ mod tests {
);
waiter_mgr_scheduler.wake_up(0, vec![3, 1, 2], 1);
assert!(rx
.recv_timeout(Duration::from_millis(100))
.recv_timeout(Duration::from_millis(500))
.unwrap()
.is_err());
}
@@ -49,7 +49,7 @@ pub type Callback<T> = Box<dyn FnOnce(Result<T>) + Send>;
// Short value max len must <= 255.
pub const SHORT_VALUE_MAX_LEN: usize = 64;
pub const SHORT_VALUE_PREFIX: u8 = b'v';
pub const PESSIMISTIC_TXN: u8 = b'p';
pub const FOR_UPDATE_TS_PREFIX: u8 = b'f';

This comment has been minimized.

Copy link
@AndreMouche

AndreMouche May 30, 2019

Member

I think it's not a good idea to change the prefix .. we will have two different kinds of prefix between versions.

This comment has been minimized.

Copy link
@youjiali1995

youjiali1995 May 30, 2019

Author Contributor

Even if the prefix is the same, different versions won't be compatible because the old version doesn't record for_update_ts after the prefix.

This comment has been minimized.

Copy link
@zhangjinpeng1987

zhangjinpeng1987 Jun 5, 2019

Member

I think it is ok since this feature is in developing.

pub const TXN_SIZE_PREFIX: u8 = b't';

use engine::{CfName, ALL_CFS, CF_DEFAULT, CF_LOCK, CF_WRITE, DATA_CFS};
@@ -99,7 +99,6 @@ pub enum Command {
keys: Vec<(Key, bool)>,
primary: Vec<u8>,
start_ts: u64,
for_update_ts: u64,
options: Options,
},
Commit {
@@ -176,14 +175,12 @@ impl Display for Command {
ref ctx,
ref keys,
start_ts,
for_update_ts,
..
} => write!(
f,
"kv::command::acquirepessimisticlock keys({}) @ {},{} | {:?}",
"kv::command::acquirepessimisticlock keys({}) @ {} | {:?}",

This comment has been minimized.

Copy link
@zhangjinpeng1987

zhangjinpeng1987 Jun 5, 2019

Member

Better to display for_update_ts, it is useful for debugging.

This comment has been minimized.

Copy link
@youjiali1995

youjiali1995 Jun 5, 2019

Author Contributor

Done

keys.len(),
start_ts,
for_update_ts,
ctx
),
Command::Commit {
@@ -436,6 +433,7 @@ pub struct Options {
pub key_only: bool,
pub reverse_scan: bool,
pub is_first_lock: bool,
pub for_update_ts: u64,
pub is_pessimistic_lock: Vec<bool>,
// How many keys this transaction involved.
pub txn_size: u64,
@@ -449,6 +447,7 @@ impl Options {
key_only,
reverse_scan: false,
is_first_lock: false,
for_update_ts: 0,
is_pessimistic_lock: vec![],
txn_size: 0,
}
@@ -938,7 +937,6 @@ impl<E: Engine> Storage<E> {
keys: Vec<(Key, bool)>,
primary: Vec<u8>,
start_ts: u64,
for_update_ts: u64,
options: Options,
callback: Callback<Vec<Result<()>>>,
) -> Result<()> {
@@ -959,7 +957,6 @@ impl<E: Engine> Storage<E> {
keys,
primary,
start_ts,
for_update_ts,
options,
};
self.schedule(cmd, StorageCb::Booleans(callback))?;
@@ -3,7 +3,7 @@
use super::super::types::Value;
use super::{Error, Result};
use crate::storage::{
Mutation, PESSIMISTIC_TXN, SHORT_VALUE_MAX_LEN, SHORT_VALUE_PREFIX, TXN_SIZE_PREFIX,
Mutation, FOR_UPDATE_TS_PREFIX, SHORT_VALUE_MAX_LEN, SHORT_VALUE_PREFIX, TXN_SIZE_PREFIX,
};
use byteorder::ReadBytesExt;
use tikv_util::codec::bytes::{self, BytesEncoder};
@@ -58,7 +58,8 @@ pub struct Lock {
pub ts: u64,
pub ttl: u64,
pub short_value: Option<Value>,
pub is_pessimistic_txn: bool,
// If for_update_ts != 0, this lock belongs to a pessimistic transaction
pub for_update_ts: u64,
pub txn_size: u64,
}

@@ -69,7 +70,7 @@ impl Lock {
ts: u64,
ttl: u64,
short_value: Option<Value>,
is_pessimistic_txn: bool,
for_update_ts: u64,
txn_size: u64,
) -> Lock {
Lock {
@@ -78,7 +79,7 @@ impl Lock {
ts,
ttl,
short_value,
is_pessimistic_txn,
for_update_ts,
txn_size,
}
}
@@ -96,8 +97,9 @@ impl Lock {
b.push(v.len() as u8);
b.extend_from_slice(v);
}
if self.is_pessimistic_txn {
b.push(PESSIMISTIC_TXN);
if self.for_update_ts > 0 {
b.push(FOR_UPDATE_TS_PREFIX);
b.encode_u64(self.for_update_ts).unwrap();
}
if self.txn_size > 0 {
b.push(TXN_SIZE_PREFIX);
@@ -120,11 +122,11 @@ impl Lock {
};

if b.is_empty() {
return Ok(Lock::new(lock_type, primary, ts, ttl, None, false, 0));
return Ok(Lock::new(lock_type, primary, ts, ttl, None, 0, 0));
}

let mut short_value = None;
let mut is_pessimistic_txn: bool = false;
let mut for_update_ts = 0;
let mut txn_size: u64 = 0;
while !b.is_empty() {
match b.read_u8()? {
@@ -140,7 +142,7 @@ impl Lock {
short_value = Some(b[..len as usize].to_vec());
b = &b[len as usize..];
}
PESSIMISTIC_TXN => is_pessimistic_txn = true,
FOR_UPDATE_TS_PREFIX => for_update_ts = number::decode_u64(&mut b)?,
TXN_SIZE_PREFIX => txn_size = number::decode_u64(&mut b)?,
flag => panic!("invalid flag [{:?}] in lock", flag),
}
@@ -151,7 +153,7 @@ impl Lock {
ts,
ttl,
short_value,
is_pessimistic_txn,
for_update_ts,
txn_size,
))
}
@@ -208,26 +210,46 @@ mod tests {
fn test_lock() {
// Test `Lock::to_bytes()` and `Lock::parse()` works as a pair.
let mut locks = vec![
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, false, 0),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 0, 0),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
false,
0,
0,
),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, true, 0),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 10, 0),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
true,
10,
0,
),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 0, 16),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
0,
16,
),
Lock::new(LockType::Put, b"pk".to_vec(), 1, 10, None, 10, 16),
Lock::new(
LockType::Delete,
b"pk".to_vec(),
1,
10,
Some(b"short_value".to_vec()),
10,
0,
),
];
for (i, lock) in locks.drain(..).enumerate() {
let v = lock.to_bytes();
@@ -244,7 +266,7 @@ mod tests {
1,
10,
Some(b"short_value".to_vec()),
false,
0,
0,
);
let v = lock.to_bytes();
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.