Skip to content

Commit

Permalink
Merge branch 'master' into upgrade_grpcio
Browse files Browse the repository at this point in the history
  • Loading branch information
LykxSassinator committed Dec 26, 2023
2 parents 1b6b75b + 9313afa commit 84721d2
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/region_cache_memory_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ collections = { workspace = true }
skiplist-rs = { git = "https://github.com/tikv/skiplist-rs.git", branch = "main" }
bytes = "1.0"
tikv_util = { workspace = true }
engine_rocks = { workspace = true }
112 changes: 101 additions & 11 deletions components/region_cache_memory_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{

use bytes::Bytes;
use collections::HashMap;
use engine_rocks::{raw::SliceTransform, util::FixedSuffixSliceTransform};
use engine_traits::{
CfNamesExt, DbVector, Error, IterOptions, Iterable, Iterator, Mutable, Peekable, ReadOptions,
RegionCacheEngine, Result, Snapshot, SnapshotMiscExt, WriteBatch, WriteBatchExt, WriteOptions,
Expand Down Expand Up @@ -216,8 +217,6 @@ enum Direction {
pub struct RegionCacheIterator {
cf: String,
valid: bool,
prefix_same_as_start: bool,
prefix: Option<Vec<u8>>,
iter: IterRef<Skiplist<InternalKeyComparator>, InternalKeyComparator>,
// The lower bound is inclusive while the upper bound is exclusive if set
// Note: bounds (region boundaries) have no mvcc versions
Expand All @@ -232,6 +231,11 @@ pub struct RegionCacheIterator {
// `iter`
saved_value: Option<Bytes>,

// Not None means we are performing prefix seek
// Note: prefix_seek doesn't support seek_to_first and seek_to_last.
prefix_extractor: Option<FixedSuffixSliceTransform>,
prefix: Option<Vec<u8>>,

direction: Direction,
}

Expand Down Expand Up @@ -260,9 +264,11 @@ impl RegionCacheIterator {
break;
}

if self.prefix_same_as_start {
// todo(SpadeA): support prefix seek
unimplemented!()
if let Some(ref prefix) = self.prefix {
if prefix != self.prefix_extractor.as_mut().unwrap().transform(user_key) {
// stop iterating due to unmatched prefix
break;
}
}

if self.is_visible(sequence) {
Expand Down Expand Up @@ -323,9 +329,11 @@ impl RegionCacheIterator {
break;
}

if self.prefix_same_as_start {
// todo(SpadeA): support prefix seek
unimplemented!()
if let Some(ref prefix) = self.prefix {
if prefix != self.prefix_extractor.as_mut().unwrap().transform(user_key) {
// stop iterating due to unmatched prefix
break;
}
}

if !self.find_value_for_current_key() {
Expand Down Expand Up @@ -429,6 +437,11 @@ impl Iterator for RegionCacheIterator {

fn seek(&mut self, key: &[u8]) -> Result<bool> {
self.direction = Direction::Forward;
if let Some(ref mut extractor) = self.prefix_extractor {
assert!(key.len() >= 8);
self.prefix = Some(extractor.transform(key).to_vec())
}

let seek_key = if key < self.lower_bound.as_slice() {
self.lower_bound.as_slice()
} else {
Expand All @@ -441,6 +454,11 @@ impl Iterator for RegionCacheIterator {

fn seek_for_prev(&mut self, key: &[u8]) -> Result<bool> {
self.direction = Direction::Backward;
if let Some(ref mut extractor) = self.prefix_extractor {
assert!(key.len() >= 8);
self.prefix = Some(extractor.transform(key).to_vec())
}

let seek_key = if key > self.upper_bound.as_slice() {
encode_seek_key(
self.upper_bound.as_slice(),
Expand All @@ -455,13 +473,15 @@ impl Iterator for RegionCacheIterator {
}

fn seek_to_first(&mut self) -> Result<bool> {
assert!(self.prefix_extractor.is_none());
self.direction = Direction::Forward;
let seek_key =
encode_seek_key(&self.lower_bound, self.sequence_number, VALUE_TYPE_FOR_SEEK);
self.seek_internal(&seek_key)
}

fn seek_to_last(&mut self) -> Result<bool> {
assert!(self.prefix_extractor.is_none());
self.direction = Direction::Backward;
let seek_key = encode_seek_key(&self.upper_bound, u64::MAX, VALUE_TYPE_FOR_SEEK_FOR_PREV);
self.seek_for_prev_internal(&seek_key)
Expand Down Expand Up @@ -596,16 +616,21 @@ impl Iterable for RegionCacheSnapshot {

fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result<Self::Iterator> {
let iter = self.region_memory_engine.data[cf_to_id(cf)].iter();
let prefix_same_as_start = opts.prefix_same_as_start();
let prefix_extractor = if opts.prefix_same_as_start() {
Some(FixedSuffixSliceTransform::new(8))
} else {
None
};

let (lower_bound, upper_bound) = opts.build_bounds();
// only support with lower/upper bound set
if lower_bound.is_none() || upper_bound.is_none() {
return Err(Error::BoundaryNotSet);
}

Ok(RegionCacheIterator {
cf: String::from(cf),
valid: false,
prefix_same_as_start,
prefix: None,
lower_bound: lower_bound.unwrap(),
upper_bound: upper_bound.unwrap(),
Expand All @@ -614,6 +639,7 @@ impl Iterable for RegionCacheSnapshot {
saved_user_key: vec![],
saved_value: None,
direction: Direction::Uninit,
prefix_extractor,
})
}
}
Expand Down Expand Up @@ -1351,7 +1377,6 @@ mod tests {
fn test_seq_visibility_backward() {
let engine = RegionCacheMemoryEngine::default();
engine.new_region(1);
let step: i32 = 2;

{
let mut core = engine.core.lock().unwrap();
Expand Down Expand Up @@ -1574,4 +1599,69 @@ mod tests {
}
}
}

#[test]
fn test_prefix_seek() {
let engine = RegionCacheMemoryEngine::default();
engine.new_region(1);

{
let mut core = engine.core.lock().unwrap();
core.region_metas.get_mut(&1).unwrap().can_read = true;
core.region_metas.get_mut(&1).unwrap().safe_ts = 5;
let sl = core.engine.get_mut(&1).unwrap().data[cf_to_id("write")].clone();

for i in 1..5 {
for mvcc in 10..20 {
let user_key = construct_key(i, mvcc);
let internal_key = encode_key(&user_key, 10, ValueType::Value);
let v = format!("v{:02}{:02}", i, mvcc);
sl.put(internal_key, v);
}
}
}

let mut iter_opt = IterOptions::default();
let lower_bound = construct_user_key(1);
let upper_bound = construct_user_key(5);
iter_opt.set_upper_bound(&upper_bound, 0);
iter_opt.set_lower_bound(&lower_bound, 0);
iter_opt.set_prefix_same_as_start(true);
let snapshot = engine.snapshot(1, u64::MAX, u64::MAX).unwrap();
let mut iter = snapshot.iterator_opt("write", iter_opt.clone()).unwrap();

// prefix seek, forward
for i in 1..5 {
let seek_key = construct_key(i, 100);
assert!(iter.seek(&seek_key).unwrap());
let mut start = 19;
while iter.valid().unwrap() {
let user_key = iter.key();
let mvcc = !u64::from_be_bytes(user_key[user_key.len() - 8..].try_into().unwrap());
assert_eq!(mvcc, start);
let v = format!("v{:02}{:02}", i, start);
assert_eq!(v.as_bytes(), iter.value());
start -= 1;
iter.next().unwrap();
}
assert_eq!(start, 9);
}

// prefix seek, backward
for i in 1..5 {
let seek_key = construct_key(i, 0);
assert!(iter.seek_for_prev(&seek_key).unwrap());
let mut start = 10;
while iter.valid().unwrap() {
let user_key = iter.key();
let mvcc = !u64::from_be_bytes(user_key[user_key.len() - 8..].try_into().unwrap());
assert_eq!(mvcc, start);
let v = format!("v{:02}{:02}", i, start);
assert_eq!(v.as_bytes(), iter.value());
start += 1;
iter.prev().unwrap();
}
assert_eq!(start, 20);
}
}
}

0 comments on commit 84721d2

Please sign in to comment.