Skip to content

Commit

Permalink
Fix PointGetter performance issue when there are concurrent write (ti…
Browse files Browse the repository at this point in the history
…kv#5469)

Signed-off-by: Breezewish <breezewish@pingcap.com>
  • Loading branch information
breezewish authored and sre-bot committed Sep 17, 2019
1 parent 14e4d6a commit 8524c80
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 151 deletions.
165 changes: 14 additions & 151 deletions src/storage/mvcc/reader/point_getter.rs
Expand Up @@ -82,7 +82,6 @@ impl<S: Snapshot> PointGetterBuilder<S> {
Ok(PointGetter {
snapshot: self.snapshot,
multi: self.multi,
fill_cache: self.fill_cache,
omit_value: self.omit_value,
isolation_level: self.isolation_level,
ts: self.ts,
Expand All @@ -91,9 +90,6 @@ impl<S: Snapshot> PointGetterBuilder<S> {

write_cursor,
write_cursor_valid: true,
lock_cursor: None,
lock_cursor_valid: false,
default_cursor: None,

drained: false,
})
Expand All @@ -107,7 +103,6 @@ impl<S: Snapshot> PointGetterBuilder<S> {
pub struct PointGetter<S: Snapshot> {
snapshot: S,
multi: bool,
fill_cache: bool,
omit_value: bool,
isolation_level: IsolationLevel,
ts: u64,
Expand All @@ -116,10 +111,6 @@ pub struct PointGetter<S: Snapshot> {

write_cursor: Cursor<S::Iter>,
write_cursor_valid: bool,
/// Lock cursor and default cursor will be built only when necessary.
lock_cursor: Option<Cursor<S::Iter>>,
lock_cursor_valid: bool,
default_cursor: Option<Cursor<S::Iter>>,

/// Indicating whether or not this structure can serve more requests. It is meaningful only
/// when `multi == false`, to protect from producing undefined values when trying to get
Expand All @@ -131,7 +122,7 @@ impl<S: Snapshot> PointGetter<S> {
/// Take out and reset the statistics collected so far.
#[inline]
pub fn take_statistics(&mut self) -> Statistics {
::std::mem::replace(&mut self.statistics, Statistics::default())
std::mem::replace(&mut self.statistics, Statistics::default())
}

/// Get the value of a user key.
Expand Down Expand Up @@ -169,21 +160,11 @@ impl<S: Snapshot> PointGetter<S> {
/// Get a lock of a user key in the lock CF. If lock exists, it will be checked to
/// see whether it conflicts with the given `ts`. If there is no conflict or no lock,
/// the safe `ts` will be returned.
#[inline]
///
/// In the common case we expect to find nothing in the lock CF. Using `get_cf` instead of
/// `seek` is faster in such cases, because RocksDB does not need to keep moving forward and
/// skipping deleted entries until it finds a user key.
fn load_and_check_lock(&mut self, user_key: &Key, ts: u64) -> Result<CheckLockResult> {
if self.multi {
self.load_and_check_lock_multi_get(user_key, ts)
} else {
self.load_and_check_lock_single_get(user_key, ts)
}
}

/// If only one `get()` will be called, we can use `snapshot.get_cf()` to get lock directly.
fn load_and_check_lock_single_get(
&mut self,
user_key: &Key,
ts: u64,
) -> Result<CheckLockResult> {
self.statistics.lock.get += 1;
let lock_value = self.snapshot.get_cf(CF_LOCK, user_key)?;

Expand All @@ -196,63 +177,6 @@ impl<S: Snapshot> PointGetter<S> {
}
}

/// Reads the lock of `user_key` through the lock cursor and checks it against `ts`.
///
/// If multiple `get()` will be called, we need to use a cursor to read the lock.
///
/// Returns `CheckLockResult::NotLocked` when the lock cursor has been marked invalid, when
/// the seek finds nothing, or when the entry the cursor lands on is not exactly `user_key`.
/// Otherwise the stored lock is parsed and the result of `super::util::check_lock` is
/// returned. Errors from building the cursor, seeking, or parsing the lock are propagated.
fn load_and_check_lock_multi_get(
&mut self,
user_key: &Key,
ts: u64,
) -> Result<CheckLockResult> {
// Build the lock cursor on first use; after this call `lock_cursor` is `Some`.
self.ensure_lock_cursor()?;
let lock_cursor = self.lock_cursor.as_mut().unwrap();
// An earlier seek already found nothing, so do not touch the cursor again.
if !self.lock_cursor_valid {
return Ok(CheckLockResult::NotLocked);
}
if !lock_cursor.near_seek(user_key, &mut self.statistics.lock)? {
// If we seek and get nothing, `lock_cursor` becomes invalid. The next call to
// `near_seek` would then make the cursor jump back if the given key is smaller than
// the current key. To keep the cursor constantly moving in the forward direction, we
// record this state. Additionally, this protects us from
// https://github.com/tikv/tikv/issues/3378.
self.lock_cursor_valid = false;
return Ok(CheckLockResult::NotLocked);
}
// The seek may land on a different key; only an exact match on the encoded key is a
// lock for `user_key`.
if lock_cursor.key(&mut self.statistics.lock) == user_key.as_encoded().as_slice() {
self.statistics.lock.processed += 1;
let lock_value = lock_cursor.value(&mut self.statistics.lock);
let lock = Lock::parse(lock_value)?;
super::util::check_lock(user_key, ts, &lock)
} else {
Ok(CheckLockResult::NotLocked)
}
}

/// Lazily builds the cursor over the lock CF.
///
/// Does nothing when the cursor already exists. On first creation the cursor is stored in
/// `lock_cursor` and `lock_cursor_valid` is set to `true`. This function will only be called
/// when `multi == true`.
fn ensure_lock_cursor(&mut self) -> Result<()> {
    if self.lock_cursor.is_none() {
        // Keys will be given in non-descending order, so forward mode cursor is fine.
        let builder = CursorBuilder::new(&self.snapshot, CF_LOCK).fill_cache(self.fill_cache);
        let new_cursor = builder.build()?;
        self.lock_cursor = Some(new_cursor);
        self.lock_cursor_valid = true;
    }
    Ok(())
}

/// Lazily builds the cursor over the default CF.
///
/// Does nothing when the cursor already exists; otherwise the newly built cursor is stored
/// in `default_cursor`. This function will only be called when `multi == true`.
fn ensure_default_cursor(&mut self) -> Result<()> {
    if self.default_cursor.is_none() {
        let builder = CursorBuilder::new(&self.snapshot, CF_DEFAULT).fill_cache(self.fill_cache);
        let new_cursor = builder.build()?;
        self.default_cursor = Some(new_cursor);
    }
    Ok(())
}

/// Load the value.
///
/// First, a correct version info in the Write CF will be sought. Then, value will be loaded
Expand All @@ -262,7 +186,7 @@ impl<S: Snapshot> PointGetter<S> {
return Ok(None);
}

// Seek to `${user_key}_${ts}`.
// Seek to `${user_key}_${ts}`. TODO: We can avoid this clone.
if !self
.write_cursor
.near_seek(&user_key.clone().append_ts(ts), &mut self.statistics.write)?
Expand Down Expand Up @@ -321,24 +245,19 @@ impl<S: Snapshot> PointGetter<S> {
// Value is carried in `write`.
Ok(value)
}
None => {
if self.multi {
self.load_data_from_default_cf_multi_get(write, user_key)
} else {
self.load_data_from_default_cf_single_get(write, user_key)
}
}
None => self.load_data_from_default_cf(write, user_key),
}
}

/// Load the value from default CF. Use `snapshot.get_cf()` directly.
fn load_data_from_default_cf_single_get(
&mut self,
write: Write,
user_key: &Key,
) -> Result<Value> {
/// Load the value from default CF.
///
/// We assume that the keys passed to a batch get are mostly not very close to each other.
/// In that scenario `near_seek` will likely fall back to `seek`, which takes 2x the time of
/// `get_cf`. Thus we use `get_cf` directly here.
fn load_data_from_default_cf(&mut self, write: Write, user_key: &Key) -> Result<Value> {
// TODO: Not necessary to receive a `Write`.
self.statistics.data.get += 1;
// TODO: We can avoid this clone.
let value = self
.snapshot
.get_cf(CF_DEFAULT, &user_key.clone().append_ts(write.start_ts))?;
Expand All @@ -354,22 +273,6 @@ impl<S: Snapshot> PointGetter<S> {
))
}
}

/// Loads the value referenced by `write` from the default CF through the default cursor.
///
/// The default cursor is created on first use. The actual lookup is delegated to
/// `super::util::near_load_data_by_write`, and any error it reports is propagated.
fn load_data_from_default_cf_multi_get(
    &mut self,
    write: Write,
    user_key: &Key,
) -> Result<Value> {
    self.ensure_default_cursor()?;
    // `ensure_default_cursor` has just succeeded, so the cursor is present.
    let cursor = self.default_cursor.as_mut().unwrap();
    Ok(super::util::near_load_data_by_write(
        cursor,
        user_key,
        write,
        &mut self.statistics,
    )?)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -548,56 +451,40 @@ mod tests {
// Get a deleted key
must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 1, 0, 0);
assert_seek_next_prev(&s.write, 1, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);
// Get again
must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

// Get a key that exists
must_get_value(&mut getter, b"foo2", b"foo2v");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
// We have to check every version, so there are 42 nexts and 0 seeks.
assert_seek_next_prev(&s.write, 0, 42, 0);
assert_seek_next_prev(&s.data, 1, 0, 0);
// Get again
must_get_value(&mut getter, b"foo2", b"foo2v");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

// Get a smaller key
must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

// Get a key that does not exist
must_get_none(&mut getter, b"z");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 2, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

// Get a key that exists
must_get_value(&mut getter, b"zz", b"zzv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 0, 0);
assert_seek_next_prev(&s.data, 0, 1, 0);
// Get again
must_get_value(&mut getter, b"zz", b"zzv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);
}

/// Some ts larger than get ts
Expand All @@ -609,39 +496,27 @@ mod tests {

must_get_value(&mut getter, b"bar", b"barv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 1, 0, 0);
assert_seek_next_prev(&s.write, 1, 0, 0);
assert_seek_next_prev(&s.data, 1, 0, 0);

must_get_value(&mut getter, b"bar", b"barv");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

must_get_none(&mut getter, b"bo");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 1, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

must_get_none(&mut getter, b"box");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 1, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

must_get_value(&mut getter, b"foo1", b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 1, 0);
assert_seek_next_prev(&s.data, 0, 2, 0);

must_get_none(&mut getter, b"zz");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 1, SEEK_BOUND as usize, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);
}

/// All ts larger than get ts
Expand All @@ -653,23 +528,17 @@ mod tests {

must_get_none(&mut getter, b"foo1");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 1, 0, 0);
assert_seek_next_prev(&s.write, 1, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

must_get_none(&mut getter, b"non_exist");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 1, SEEK_BOUND as usize, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

// Cursor never move back.
must_get_none(&mut getter, b"foo1");
must_get_none(&mut getter, b"foo0");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 0, 0, 0);
assert_seek_next_prev(&s.write, 0, 0, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);
}

/// There are some locks in the Lock CF.
Expand All @@ -683,9 +552,7 @@ mod tests {
must_get_none(&mut getter, b"foo1");
must_get_none(&mut getter, b"foo2");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 1, 1, 0);
assert_seek_next_prev(&s.write, 1, 2, 0);
assert_seek_next_prev(&s.data, 0, 0, 0);

let mut getter = new_multi_point_getter(&engine, 3);
must_get_none(&mut getter, b"a");
Expand All @@ -696,9 +563,7 @@ mod tests {
must_get_none(&mut getter, b"foo2");
must_get_none(&mut getter, b"foo2");
let s = getter.take_statistics();
assert_seek_next_prev(&s.lock, 1, 1, 0);
assert_seek_next_prev(&s.write, 1, 2, 0);
assert_seek_next_prev(&s.data, 1, 0, 0);

let mut getter = new_multi_point_getter(&engine, 4);
must_get_none(&mut getter, b"a");
Expand All @@ -707,9 +572,7 @@ mod tests {
must_get_value(&mut getter, b"foo1", b"foo1v");
must_get_err(&mut getter, b"foo2");
must_get_none(&mut getter, b"zz");
assert_seek_next_prev(&s.lock, 1, 1, 0);
assert_seek_next_prev(&s.write, 1, 2, 0);
assert_seek_next_prev(&s.data, 1, 0, 0);
}

/// Single Point Getter can only get once.
Expand Down
4 changes: 4 additions & 0 deletions src/storage/txn/store.rs
Expand Up @@ -166,6 +166,10 @@ impl<S: Snapshot> Store for SnapshotStore<S> {
use std::mem::{self, MaybeUninit};
type Element = Result<Option<Value>>;

if keys.len() == 1 {
return Ok(vec![self.get(&keys[0], statistics)]);
}

let mut order_and_keys: Vec<_> = keys.iter().enumerate().collect();
order_and_keys.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

Expand Down

0 comments on commit 8524c80

Please sign in to comment.