Skip to content

Commit

Permalink
In-memory engine: support changing the direction of iteration (tikv#1…
Browse files Browse the repository at this point in the history
…7129)

ref tikv#16141, close tikv#17079

support reverse direction when iterating

Signed-off-by: SpadeA-Tang <u6748471@anu.edu.au>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and overvenus committed Jun 17, 2024
1 parent f32c559 commit fb87c7a
Showing 1 changed file with 299 additions and 5 deletions.
304 changes: 299 additions & 5 deletions components/region_cache_memory_engine/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use crate::{
RangeCacheMemoryEngine,
};

pub const MAX_SEQUENCE_NUMBER: u64 = (1 << 56) - 1;

#[derive(PartialEq)]
enum Direction {
Uninit,
Expand Down Expand Up @@ -431,6 +433,27 @@ impl RangeCacheIterator {
self.iter.prev(guard);
}
}

fn reverse_to_backward(&mut self, guard: &epoch::Guard) {
self.direction = Direction::Backward;
self.find_user_key_before_saved(guard);
}

fn reverse_to_forward(&mut self, guard: &epoch::Guard) {
if self.prefix_extractor.is_some() || !self.iter.valid() {
let seek_key = encode_seek_key(&self.saved_user_key, MAX_SEQUENCE_NUMBER);
self.iter.seek(&seek_key, guard);
}

self.direction = Direction::Forward;
while self.iter.valid() {
let InternalKey { user_key, .. } = decode_key(self.iter.key().as_slice());
if user_key >= self.saved_user_key.as_slice() {
return;
}
self.iter.next(guard);
}
}
}

impl Iterator for RangeCacheIterator {
Expand All @@ -441,17 +464,21 @@ impl Iterator for RangeCacheIterator {

fn value(&self) -> &[u8] {
assert!(self.valid);
if let Some(saved_value) = self.saved_value.as_ref() {
saved_value.as_slice()
if self.direction == Direction::Backward {
self.saved_value.as_ref().unwrap().as_slice()
} else {
self.iter.value().as_slice()
}
}

fn next(&mut self) -> Result<bool> {
assert!(self.valid);
assert!(self.direction == Direction::Forward);
let guard = &epoch::pin();

if self.direction == Direction::Backward {
self.reverse_to_forward(guard);
}

self.iter.next(guard);

perf_counter_add!(internal_key_skipped_count, 1);
Expand All @@ -473,8 +500,12 @@ impl Iterator for RangeCacheIterator {

fn prev(&mut self) -> Result<bool> {
assert!(self.valid);
assert!(self.direction == Direction::Backward);
let guard = &epoch::pin();

if self.direction == Direction::Forward {
self.reverse_to_backward(guard);
}

self.prev_internal(guard);

self.local_stats.number_db_prev += 1;
Expand Down Expand Up @@ -630,7 +661,7 @@ mod tests {
use tempfile::Builder;
use tikv_util::config::VersionTrack;

use super::RangeCacheIterator;
use super::{RangeCacheIterator, RangeCacheSnapshot};
use crate::{
engine::{cf_to_id, SkiplistEngine},
keys::{
Expand All @@ -640,6 +671,7 @@ mod tests {
perf_context::PERF_CONTEXT,
statistics::Tickers,
RangeCacheEngineConfig, RangeCacheEngineContext, RangeCacheMemoryEngine,
RangeCacheWriteBatch,
};

#[test]
Expand Down Expand Up @@ -2014,4 +2046,266 @@ mod tests {
assert_eq!(3, statistics.get_ticker_count(Tickers::NumberDbPrev));
assert_eq!(3, statistics.get_ticker_count(Tickers::NumberDbPrevFound));
}

fn set_up_for_iteator<F>(
wb_sequence: u64,
snap_sequence: u64,
put_entries: F,
) -> (
RangeCacheMemoryEngine,
RangeCacheSnapshot,
RangeCacheIterator,
)
where
F: FnOnce(&mut RangeCacheWriteBatch),
{
let engine = RangeCacheMemoryEngine::new(RangeCacheEngineContext::new(Arc::new(
VersionTrack::new(RangeCacheEngineConfig::config_for_test()),
)));
let range = CacheRange::new(b"".to_vec(), b"z".to_vec());
engine.new_range(range.clone());

let mut wb = engine.write_batch();
wb.prepare_for_range(range.clone());
put_entries(&mut wb);
wb.set_sequence_number(wb_sequence).unwrap();
wb.write().unwrap();

let snap = engine.snapshot(range.clone(), 100, snap_sequence).unwrap();
let mut iter_opt = IterOptions::default();
iter_opt.set_upper_bound(&range.end, 0);
iter_opt.set_lower_bound(&range.start, 0);

let iter = snap.iterator_opt("default", iter_opt).unwrap();
(engine, snap, iter)
}

// copied from RocksDB TEST_F(DBIteratorTest, DBIterator10)
#[test]
fn test_iterator() {
let (.., mut iter) = set_up_for_iteator(100, 200, |wb| {
wb.put(b"a", b"1").unwrap();
wb.put(b"b", b"2").unwrap();
wb.put(b"c", b"3").unwrap();
wb.put(b"d", b"4").unwrap();
});

iter.seek(b"c").unwrap();
assert!(iter.valid().unwrap());
iter.prev().unwrap();
assert!(iter.valid().unwrap());
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"2");

iter.next().unwrap();
assert!(iter.valid().unwrap());
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"3");

iter.seek_for_prev(b"c").unwrap();
assert!(iter.valid().unwrap());
iter.next().unwrap();
assert!(iter.valid().unwrap());
assert_eq!(iter.key(), b"d");
assert_eq!(iter.value(), b"4");

iter.prev().unwrap();
assert!(iter.valid().unwrap());
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"3");
}

// copied from RocksDB TEST_P(DBIteratorTest, IterNextWithNewerSeq) and
// TEST_P(DBIteratorTest, IterPrevWithNewerSeq)
#[test]
fn test_next_with_newer_seq() {
let (engine, _, mut iter) = set_up_for_iteator(100, 110, |wb| {
wb.put(b"0", b"0").unwrap();
wb.put(b"a", b"b").unwrap();
wb.put(b"c", b"d").unwrap();
wb.put(b"d", b"e").unwrap();
});

let mut wb = engine.write_batch();
wb.prepare_for_range(CacheRange::new(b"".to_vec(), b"z".to_vec()));
wb.put(b"b", b"f").unwrap();
wb.set_sequence_number(200).unwrap();

iter.seek(b"a").unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"b");

iter.next().unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"d");

iter.seek_for_prev(b"b").unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"b");

iter.next().unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"d");

iter.seek(b"d").unwrap();
assert_eq!(iter.key(), b"d");
assert_eq!(iter.value(), b"e");

iter.prev().unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"d");

iter.prev().unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"b");

iter.prev().unwrap();
iter.seek_for_prev(b"d").unwrap();
assert_eq!(iter.key(), b"d");
assert_eq!(iter.value(), b"e");

iter.prev().unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"d");

iter.prev().unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"b");
}

#[test]
fn test_reverse_direction() {
let (engine, ..) = set_up_for_iteator(100, 100, |wb| {
wb.put(b"a", b"val_a1").unwrap(); // seq 100
wb.put(b"b", b"val_b1").unwrap(); // seq 101
wb.put(b"c", b"val_c1").unwrap(); // seq 102

wb.put(b"a", b"val_a2").unwrap(); // seq 103
wb.put(b"b", b"val_b2").unwrap(); // seq 104

wb.put(b"c", b"val_c2").unwrap(); // seq 105
wb.put(b"a", b"val_a3").unwrap(); // seq 106
wb.put(b"b", b"val_b3").unwrap(); // seq 107
wb.put(b"c", b"val_c3").unwrap(); // seq 108
});

// For sequence number 102
let range = CacheRange::new(b"".to_vec(), b"z".to_vec());
let snap = engine.snapshot(range.clone(), 100, 102).unwrap();
let mut iter_opt = IterOptions::default();
iter_opt.set_upper_bound(&range.end, 0);
iter_opt.set_lower_bound(&range.start, 0);

let mut iter = snap.iterator_opt("default", iter_opt.clone()).unwrap();
iter.seek(b"c").unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"val_c1");

iter.prev().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b1");

iter.seek(b"b").unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b1");

iter.prev().unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"val_a1");

iter.next().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b1");

iter.seek_for_prev(b"a").unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"val_a1");

iter.next().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b1");

iter.next().unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"val_c1");

iter.next().unwrap();
assert!(!iter.valid().unwrap());

// For sequence number 104
let snap = engine.snapshot(range.clone(), 100, 104).unwrap();
let mut iter = snap.iterator_opt("default", iter_opt.clone()).unwrap();
iter.seek(b"c").unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"val_c1");

iter.prev().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b2");

iter.seek(b"b").unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b2");

iter.prev().unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"val_a2");

iter.next().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b2");

iter.seek_for_prev(b"a").unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"val_a2");

iter.next().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b2");

iter.next().unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"val_c1");

iter.next().unwrap();
assert!(!iter.valid().unwrap());

// For sequence number 108
let snap = engine.snapshot(range.clone(), 100, 108).unwrap();
let mut iter = snap.iterator_opt("default", iter_opt.clone()).unwrap();
iter.seek(b"c").unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"val_c3");

iter.prev().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b3");

iter.seek(b"b").unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b3");

iter.prev().unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"val_a3");

iter.next().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b3");

iter.seek_for_prev(b"a").unwrap();
assert_eq!(iter.key(), b"a");
assert_eq!(iter.value(), b"val_a3");

iter.next().unwrap();
assert_eq!(iter.key(), b"b");
assert_eq!(iter.value(), b"val_b3");

iter.next().unwrap();
assert_eq!(iter.key(), b"c");
assert_eq!(iter.value(), b"val_c3");

iter.next().unwrap();
assert!(!iter.valid().unwrap());
}
}

0 comments on commit fb87c7a

Please sign in to comment.