Skip to content

Commit

Permalink
In-Memory Engine: WriteBatch with Skiplist Engine (#16433)
Browse files Browse the repository at this point in the history
ref #16323

Update WriteBatch to assume a single skiplist and use RangeManager::contains.
Implement and test `get_value_cf_opt` for `HybridEngineSnapshot`.
Integrate single WriteBatch with HybridEngine.

Signed-off-by: Alex Feinberg <alex@strlen.net>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
afeinberg and ti-chi-bot[bot] committed Jan 31, 2024
1 parent b67dd09 commit 87d9a97
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 133 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/hybrid_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ txn_types = { workspace = true }
tikv_util = { workspace = true }
engine_rocks = { workspace = true }
region_cache_memory_engine = { workspace = true }
tempfile = "3.0"

[dev-dependencies]
tempfile = "3.0"
41 changes: 30 additions & 11 deletions components/hybrid_engine/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{
KvEngine, Peekable, RangeCacheEngine, ReadOptions, Result, SnapshotContext, SnapshotMiscExt,
SyncMutable, WriteBatchExt,
KvEngine, Mutable, Peekable, RangeCacheEngine, ReadOptions, Result, SnapshotContext,
SnapshotMiscExt, SyncMutable, WriteBatch, WriteBatchExt,
};

use crate::snapshot::HybridEngineSnapshot;
Expand Down Expand Up @@ -122,29 +122,48 @@ impl<EK, EC> SyncMutable for HybridEngine<EK, EC>
where
EK: KvEngine,
EC: RangeCacheEngine,
HybridEngine<EK, EC>: WriteBatchExt,
{
fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.put(key, value)?;
let _ = batch.write()?;
Ok(())
}

fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.put_cf(cf, key, value)?;
let _ = batch.write()?;
Ok(())
}

fn delete(&self, key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete(key)?;
let _ = batch.write()?;
Ok(())
}

fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete_cf(cf, key)?;
let _ = batch.write()?;
Ok(())
}

fn delete_range(&self, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete_range(begin_key, end_key)?;
let _ = batch.write()?;
Ok(())
}

fn delete_range_cf(&self, cf: &str, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete_range_cf(cf, begin_key, end_key)?;
let _ = batch.write()?;
Ok(())
}
}

Expand All @@ -171,7 +190,7 @@ mod tests {
let range = CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}
Expand All @@ -188,14 +207,14 @@ mod tests {
assert!(s.region_cache_snapshot_available());

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, false);
}
let s = hybrid_engine.snapshot(Some(snap_ctx.clone()));
assert!(!s.region_cache_snapshot_available());

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
}
snap_ctx.read_ts = 5;
Expand Down
1 change: 1 addition & 0 deletions components/hybrid_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod snapshot;
mod sst;
mod table_properties;
mod ttl_properties;
pub mod util;
mod write_batch;

pub use engine::HybridEngine;
Expand Down
59 changes: 53 additions & 6 deletions components/hybrid_engine/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt::{self, Debug, Formatter};
use std::{
fmt::{self, Debug, Formatter},
ops::Deref,
};

use engine_traits::{
CfNamesExt, IterOptions, Iterable, KvEngine, Peekable, RangeCacheEngine, ReadOptions, Result,
Snapshot, SnapshotMiscExt,
CfNamesExt, DbVector, IterOptions, Iterable, KvEngine, Peekable, RangeCacheEngine, ReadOptions,
Result, Snapshot, SnapshotMiscExt, CF_DEFAULT,
};

use crate::engine_iterator::HybridEngineIterator;
Expand Down Expand Up @@ -33,6 +36,14 @@ where
pub fn region_cache_snapshot_available(&self) -> bool {
self.region_cache_snap.is_some()
}

pub fn region_cache_snap(&self) -> Option<&EC::Snapshot> {
self.region_cache_snap.as_ref()
}

pub fn disk_snap(&self) -> &EK::Snapshot {
&self.disk_snap
}
}

impl<EK, EC> Snapshot for HybridEngineSnapshot<EK, EC>
Expand Down Expand Up @@ -64,15 +75,40 @@ where
}
}

/// TODO: May be possible to replace this with an Either.
pub struct HybridDbVector(Box<dyn DbVector>);

impl DbVector for HybridDbVector {}

impl Deref for HybridDbVector {
type Target = [u8];

fn deref(&self) -> &[u8] {
&self.0
}
}

impl Debug for HybridDbVector {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
write!(formatter, "{:?}", &**self)
}
}

impl<'a> PartialEq<&'a [u8]> for HybridDbVector {
fn eq(&self, rhs: &&[u8]) -> bool {
**rhs == **self
}
}

impl<EK, EC> Peekable for HybridEngineSnapshot<EK, EC>
where
EK: KvEngine,
EC: RangeCacheEngine,
{
type DbVector = EK::DbVector;
type DbVector = HybridDbVector;

fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result<Option<Self::DbVector>> {
unimplemented!()
self.get_value_cf_opt(opts, CF_DEFAULT, key)
}

fn get_value_cf_opt(
Expand All @@ -81,7 +117,18 @@ where
cf: &str,
key: &[u8],
) -> Result<Option<Self::DbVector>> {
unimplemented!()
self.region_cache_snap.as_ref().map_or_else(
|| {
self.disk_snap
.get_value_cf_opt(opts, cf, key)
.map(|r| r.map(|e| HybridDbVector(Box::new(e))))
},
|cache_snapshot| {
cache_snapshot
.get_value_cf_opt(opts, cf, key)
.map(|r| r.map(|e| HybridDbVector(Box::new(e))))
},
)
}
}

Expand Down
46 changes: 46 additions & 0 deletions components/hybrid_engine/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use engine_rocks::{util::new_engine, RocksEngine};
use engine_traits::{Result, CF_DEFAULT, CF_LOCK, CF_WRITE};
use region_cache_memory_engine::RangeCacheMemoryEngine;
use tempfile::{Builder, TempDir};

use crate::HybridEngine;

/// Create a [`HybridEngine`] using temporary storage in `prefix`.
/// Once the memory engine is created, runs `configure_memory_engine_fn`.
/// Returns the handle to temporary directory and HybridEngine.
/// # Example
///
/// ```
/// use hybrid_engine::util::hybrid_engine_for_tests;
/// let (_path, _hybrid_engine) = hybrid_engine_for_tests("temp", |memory_engine| {
/// let range = engine_traits::CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
/// memory_engine.new_range(range.clone());
/// {
/// let mut core = memory_engine.core().write().unwrap();
/// core.mut_range_manager().set_range_readable(&range, true);
/// core.mut_range_manager().set_safe_ts(&range, 10);
/// }
/// })
/// .unwrap();
/// ```
pub fn hybrid_engine_for_tests<F>(
prefix: &str,
configure_memory_engine_fn: F,
) -> Result<(TempDir, HybridEngine<RocksEngine, RangeCacheMemoryEngine>)>
where
F: FnOnce(&RangeCacheMemoryEngine),
{
let path = Builder::new().prefix(prefix).tempdir()?;
let disk_engine = new_engine(
path.path().to_str().unwrap(),
&[CF_DEFAULT, CF_LOCK, CF_WRITE],
)?;
let memory_engine = RangeCacheMemoryEngine::new(Arc::default());
configure_memory_engine_fn(&memory_engine);
let hybrid_engine = HybridEngine::new(disk_engine, memory_engine);
Ok((path, hybrid_engine))
}
73 changes: 51 additions & 22 deletions components/hybrid_engine/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,34 +124,63 @@ impl<EK: KvEngine> Mutable for HybridEngineWriteBatch<EK> {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use engine_traits::{
CacheRange, KvEngine, Mutable, Peekable, SnapshotContext, WriteBatch, WriteBatchExt,
};

use engine_rocks::util::new_engine;
use engine_traits::{CacheRange, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_WRITE};
use region_cache_memory_engine::RangeCacheMemoryEngine;
use tempfile::Builder;
use crate::util::hybrid_engine_for_tests;

use crate::HybridEngine;
#[test]
fn test_write_to_both_engines() {
let range = CacheRange::new(b"".to_vec(), b"z".to_vec());
let range_clone = range.clone();
let (_path, hybrid_engine) = hybrid_engine_for_tests("temp", move |memory_engine| {
memory_engine.new_range(range_clone.clone());
{
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager()
.set_range_readable(&range_clone, true);
core.mut_range_manager().set_safe_ts(&range_clone, 5);
}
})
.unwrap();
let mut write_batch = hybrid_engine.write_batch();
write_batch.put(b"hello", b"world").unwrap();
let seq = write_batch.write().unwrap();
assert!(seq > 0);
let actual: &[u8] = &hybrid_engine.get_value(b"hello").unwrap().unwrap();
assert_eq!(b"world", &actual);
let ctx = SnapshotContext {
range: Some(range.clone()),
read_ts: 10,
};
let snap = hybrid_engine.snapshot(Some(ctx));
let actual: &[u8] = &snap.get_value(b"hello").unwrap().unwrap();
assert_eq!(b"world", &actual);
let actual: &[u8] = &snap.disk_snap().get_value(b"hello").unwrap().unwrap();
assert_eq!(b"world", &actual);
let actual: &[u8] = &snap
.region_cache_snap()
.unwrap()
.get_value(b"hello")
.unwrap()
.unwrap();
assert_eq!(b"world", &actual);
}

#[test]
fn test_region_cache_memory_engine() {
let path = Builder::new().prefix("temp").tempdir().unwrap();
let disk_engine = new_engine(
path.path().to_str().unwrap(),
&[CF_DEFAULT, CF_LOCK, CF_WRITE],
)
fn test_range_cache_memory_engine() {
let (_path, hybrid_engine) = hybrid_engine_for_tests("temp", |memory_engine| {
let range = CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}
})
.unwrap();
let memory_engine = RangeCacheMemoryEngine::new(Arc::default());
let range = CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().lock().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}

let hybrid_engine =
HybridEngine::<_, RangeCacheMemoryEngine>::new(disk_engine, memory_engine.clone());
let mut write_batch = hybrid_engine.write_batch();
write_batch
.cache_write_batch
Expand Down
19 changes: 14 additions & 5 deletions components/raftstore/src/store/worker/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,7 @@ mod tests {
use engine_test::kv::{KvTestEngine, KvTestSnapshot};
use engine_traits::{CacheRange, MiscExt, Peekable, SyncMutable, ALL_CFS};
use hybrid_engine::{HybridEngine, HybridEngineSnapshot};
use keys::DATA_PREFIX;
use kvproto::{metapb::RegionEpoch, raft_cmdpb::*};
use region_cache_memory_engine::RangeCacheMemoryEngine;
use tempfile::{Builder, TempDir};
Expand Down Expand Up @@ -2533,6 +2534,15 @@ mod tests {
};
let leader2 = prs[0].clone();
region1.set_region_epoch(epoch13.clone());
let range = CacheRange::from_region(&region1);
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 1);
}
let kv = (&[DATA_PREFIX, b'a'], b"b");
reader.kv_engine.put(kv.0, kv.1).unwrap();
let term6 = 6;
let mut lease = Lease::new(Duration::seconds(1), Duration::milliseconds(250)); // 1s is long enough.
let read_progress = Arc::new(RegionReadProgress::new(&region1, 1, 1, 1));
Expand Down Expand Up @@ -2574,10 +2584,8 @@ mod tests {
let s = get_snapshot(None, &mut reader, cmd.clone(), &rx);
assert!(!s.region_cache_snapshot_available());

let range = CacheRange::from_region(&region1);
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}
Expand All @@ -2589,16 +2597,17 @@ mod tests {

let s = get_snapshot(Some(snap_ctx.clone()), &mut reader, cmd.clone(), &rx);
assert!(s.region_cache_snapshot_available());
assert_eq!(s.get_value(kv.0).unwrap().unwrap(), kv.1);

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, false);
}
let s = get_snapshot(Some(snap_ctx.clone()), &mut reader, cmd.clone(), &rx);
assert!(!s.region_cache_snapshot_available());

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
}
snap_ctx.read_ts = 5;
Expand Down

0 comments on commit 87d9a97

Please sign in to comment.