Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: fix reverse scan check memory locks #11447

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,8 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
}

/// Scan keys in [`start_key`, `end_key`) up to `limit` keys from the snapshot.
///
/// If `end_key` is `None`, it means the upper bound is unbounded.
/// If `reverse_scan` is true, it scans [`end_key`, `start_key`) in descending order.
/// If `end_key` is `None`, it means the upper bound or the lower bound if reverse scan is unbounded.
///
/// Only writes committed before `start_ts` are visible.
pub fn scan(
Expand Down Expand Up @@ -829,6 +829,10 @@ impl<E: Engine, L: LockManager> Storage<E, L> {

// TODO: check_api_version

let (mut start_key, mut end_key) = (Some(start_key), end_key);
if reverse_scan {
std::mem::swap(&mut start_key, &mut end_key);
}
let command_duration = tikv_util::time::Instant::now_coarse();

let bypass_locks = TsSet::from_u64s(ctx.take_resolved_locks());
Expand All @@ -840,7 +844,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
if ctx.get_isolation_level() == IsolationLevel::Si {
let begin_instant = Instant::now();
concurrency_manager
.read_range_check(Some(&start_key), end_key.as_ref(), |key, lock| {
.read_range_check(start_key.as_ref(), end_key.as_ref(), |key, lock| {
Lock::check_ts_conflict(
Cow::Borrowed(lock),
key,
Expand Down Expand Up @@ -868,7 +872,9 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
};
if need_check_locks_in_replica_read(&ctx) {
let mut key_range = KeyRange::default();
key_range.set_start_key(start_key.as_encoded().to_vec());
if let Some(start_key) = &start_key {
key_range.set_start_key(start_key.as_encoded().to_vec());
}
if let Some(end_key) = &end_key {
key_range.set_end_key(end_key.as_encoded().to_vec());
}
Expand All @@ -890,14 +896,8 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
false,
);

let mut scanner;
if !reverse_scan {
scanner =
snap_store.scanner(false, key_only, false, Some(start_key), end_key)?;
} else {
scanner =
snap_store.scanner(true, key_only, false, end_key, Some(start_key))?;
};
let mut scanner =
snap_store.scanner(reverse_scan, key_only, false, start_key, end_key)?;
let res = scanner.scan(limit, sample_step);

let statistics = scanner.take_statistics();
Expand Down Expand Up @@ -6549,20 +6549,22 @@ mod tests {
);
assert_eq!(key_error.get_locked().get_key(), b"key");

// Test scan
let key_error = extract_key_error(
&block_on(storage.scan(
let scan = |start_key, end_key, reverse| {
block_on(storage.scan(
ctx.clone(),
Key::from_raw(b"a"),
None,
start_key,
end_key,
10,
0,
100.into(),
false,
false,
reverse,
))
.unwrap_err(),
);
.unwrap_err()
};
let key_error = extract_key_error(&scan(Key::from_raw(b"a"), None, false));
assert_eq!(key_error.get_locked().get_key(), b"key");
let key_error = extract_key_error(&scan(Key::from_raw(b"\xff"), None, true));
assert_eq!(key_error.get_locked().get_key(), b"key");

// Test batch_get_command
Expand Down