Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In-Memory Engine: WriteBatch with Skiplist Engine #16433

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/hybrid_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ txn_types = { workspace = true }
tikv_util = { workspace = true }
engine_rocks = { workspace = true }
region_cache_memory_engine = { workspace = true }
tempfile = "3.0"

[dev-dependencies]
tempfile = "3.0"
41 changes: 30 additions & 11 deletions components/hybrid_engine/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{
KvEngine, Peekable, RangeCacheEngine, ReadOptions, Result, SnapshotContext, SnapshotMiscExt,
SyncMutable, WriteBatchExt,
KvEngine, Mutable, Peekable, RangeCacheEngine, ReadOptions, Result, SnapshotContext,
SnapshotMiscExt, SyncMutable, WriteBatch, WriteBatchExt,
};

use crate::snapshot::HybridEngineSnapshot;
Expand Down Expand Up @@ -122,29 +122,48 @@ impl<EK, EC> SyncMutable for HybridEngine<EK, EC>
where
EK: KvEngine,
EC: RangeCacheEngine,
HybridEngine<EK, EC>: WriteBatchExt,
{
fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.put(key, value)?;
let _ = batch.write()?;
Ok(())
}

fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.put_cf(cf, key, value)?;
let _ = batch.write()?;
Ok(())
}

fn delete(&self, key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete(key)?;
let _ = batch.write()?;
Ok(())
}

fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete_cf(cf, key)?;
let _ = batch.write()?;
Ok(())
}

fn delete_range(&self, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete_range(begin_key, end_key)?;
let _ = batch.write()?;
Ok(())
}

fn delete_range_cf(&self, cf: &str, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
unimplemented!()
let mut batch = self.write_batch();
batch.delete_range_cf(cf, begin_key, end_key)?;
let _ = batch.write()?;
Ok(())
}
}

Expand All @@ -171,7 +190,7 @@ mod tests {
let range = CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}
Expand All @@ -188,14 +207,14 @@ mod tests {
assert!(s.region_cache_snapshot_available());

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, false);
}
let s = hybrid_engine.snapshot(Some(snap_ctx.clone()));
assert!(!s.region_cache_snapshot_available());

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
}
snap_ctx.read_ts = 5;
Expand Down
1 change: 1 addition & 0 deletions components/hybrid_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod snapshot;
mod sst;
mod table_properties;
mod ttl_properties;
pub mod util;
mod write_batch;

pub use engine::HybridEngine;
Expand Down
59 changes: 53 additions & 6 deletions components/hybrid_engine/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt::{self, Debug, Formatter};
use std::{
fmt::{self, Debug, Formatter},
ops::Deref,
};

use engine_traits::{
CfNamesExt, IterOptions, Iterable, KvEngine, Peekable, RangeCacheEngine, ReadOptions, Result,
Snapshot, SnapshotMiscExt,
CfNamesExt, DbVector, IterOptions, Iterable, KvEngine, Peekable, RangeCacheEngine, ReadOptions,
Result, Snapshot, SnapshotMiscExt, CF_DEFAULT,
};

use crate::engine_iterator::HybridEngineIterator;
Expand Down Expand Up @@ -33,6 +36,14 @@ where
pub fn region_cache_snapshot_available(&self) -> bool {
self.region_cache_snap.is_some()
}

pub fn region_cache_snap(&self) -> Option<&EC::Snapshot> {
self.region_cache_snap.as_ref()
}

pub fn disk_snap(&self) -> &EK::Snapshot {
&self.disk_snap
}
}

impl<EK, EC> Snapshot for HybridEngineSnapshot<EK, EC>
Expand Down Expand Up @@ -64,15 +75,40 @@ where
}
}

/// TODO: May be possible to replace this with an Either.
pub struct HybridDbVector(Box<dyn DbVector>);
afeinberg marked this conversation as resolved.
Show resolved Hide resolved

impl DbVector for HybridDbVector {}

impl Deref for HybridDbVector {
type Target = [u8];

fn deref(&self) -> &[u8] {
&self.0
}
}

impl Debug for HybridDbVector {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
write!(formatter, "{:?}", &**self)
}
}

impl<'a> PartialEq<&'a [u8]> for HybridDbVector {
fn eq(&self, rhs: &&[u8]) -> bool {
**rhs == **self
}
}

impl<EK, EC> Peekable for HybridEngineSnapshot<EK, EC>
where
EK: KvEngine,
EC: RangeCacheEngine,
{
type DbVector = EK::DbVector;
type DbVector = HybridDbVector;

fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result<Option<Self::DbVector>> {
unimplemented!()
self.get_value_cf_opt(opts, CF_DEFAULT, key)
}

fn get_value_cf_opt(
Expand All @@ -81,7 +117,18 @@ where
cf: &str,
key: &[u8],
) -> Result<Option<Self::DbVector>> {
unimplemented!()
self.region_cache_snap.as_ref().map_or_else(
|| {
self.disk_snap
.get_value_cf_opt(opts, cf, key)
.map(|r| r.map(|e| HybridDbVector(Box::new(e))))
},
|cache_snapshot| {
cache_snapshot
.get_value_cf_opt(opts, cf, key)
.map(|r| r.map(|e| HybridDbVector(Box::new(e))))
},
)
}
}

Expand Down
46 changes: 46 additions & 0 deletions components/hybrid_engine/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use engine_rocks::{util::new_engine, RocksEngine};
use engine_traits::{Result, CF_DEFAULT, CF_LOCK, CF_WRITE};
use region_cache_memory_engine::RangeCacheMemoryEngine;
use tempfile::{Builder, TempDir};

use crate::HybridEngine;

/// Create a [`HybridEngine`] using temporary storage in `prefix`.
/// Once the memory engine is created, runs `configure_memory_engine_fn`.
/// Returns the handle to temporary directory and HybridEngine.
/// # Example
///
/// ```
/// use hybrid_engine::util::hybrid_engine_for_tests;
/// let (_path, _hybrid_engine) = hybrid_engine_for_tests("temp", |memory_engine| {
/// let range = engine_traits::CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
/// memory_engine.new_range(range.clone());
/// {
/// let mut core = memory_engine.core().write().unwrap();
/// core.mut_range_manager().set_range_readable(&range, true);
/// core.mut_range_manager().set_safe_ts(&range, 10);
/// }
/// })
/// .unwrap();
/// ```
pub fn hybrid_engine_for_tests<F>(
prefix: &str,
configure_memory_engine_fn: F,
) -> Result<(TempDir, HybridEngine<RocksEngine, RangeCacheMemoryEngine>)>
where
F: FnOnce(&RangeCacheMemoryEngine),
{
let path = Builder::new().prefix(prefix).tempdir()?;
let disk_engine = new_engine(
path.path().to_str().unwrap(),
&[CF_DEFAULT, CF_LOCK, CF_WRITE],
)?;
let memory_engine = RangeCacheMemoryEngine::new(Arc::default());
configure_memory_engine_fn(&memory_engine);
let hybrid_engine = HybridEngine::new(disk_engine, memory_engine);
Ok((path, hybrid_engine))
}
73 changes: 51 additions & 22 deletions components/hybrid_engine/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,34 +124,63 @@ impl<EK: KvEngine> Mutable for HybridEngineWriteBatch<EK> {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use engine_traits::{
CacheRange, KvEngine, Mutable, Peekable, SnapshotContext, WriteBatch, WriteBatchExt,
};

use engine_rocks::util::new_engine;
use engine_traits::{CacheRange, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_WRITE};
use region_cache_memory_engine::RangeCacheMemoryEngine;
use tempfile::Builder;
use crate::util::hybrid_engine_for_tests;

use crate::HybridEngine;
#[test]
fn test_write_to_both_engines() {
let range = CacheRange::new(b"".to_vec(), b"z".to_vec());
let range_clone = range.clone();
let (_path, hybrid_engine) = hybrid_engine_for_tests("temp", move |memory_engine| {
memory_engine.new_range(range_clone.clone());
{
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager()
.set_range_readable(&range_clone, true);
core.mut_range_manager().set_safe_ts(&range_clone, 5);
}
})
.unwrap();
let mut write_batch = hybrid_engine.write_batch();
write_batch.put(b"hello", b"world").unwrap();
let seq = write_batch.write().unwrap();
assert!(seq > 0);
let actual: &[u8] = &hybrid_engine.get_value(b"hello").unwrap().unwrap();
assert_eq!(b"world", &actual);
let ctx = SnapshotContext {
range: Some(range.clone()),
read_ts: 10,
};
let snap = hybrid_engine.snapshot(Some(ctx));
let actual: &[u8] = &snap.get_value(b"hello").unwrap().unwrap();
assert_eq!(b"world", &actual);
let actual: &[u8] = &snap.disk_snap().get_value(b"hello").unwrap().unwrap();
assert_eq!(b"world", &actual);
let actual: &[u8] = &snap
.region_cache_snap()
.unwrap()
.get_value(b"hello")
.unwrap()
.unwrap();
assert_eq!(b"world", &actual);
}

#[test]
fn test_region_cache_memory_engine() {
let path = Builder::new().prefix("temp").tempdir().unwrap();
let disk_engine = new_engine(
path.path().to_str().unwrap(),
&[CF_DEFAULT, CF_LOCK, CF_WRITE],
)
fn test_range_cache_memory_engine() {
let (_path, hybrid_engine) = hybrid_engine_for_tests("temp", |memory_engine| {
let range = CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}
})
.unwrap();
let memory_engine = RangeCacheMemoryEngine::new(Arc::default());
let range = CacheRange::new(b"k00".to_vec(), b"k10".to_vec());
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().lock().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}

let hybrid_engine =
HybridEngine::<_, RangeCacheMemoryEngine>::new(disk_engine, memory_engine.clone());
let mut write_batch = hybrid_engine.write_batch();
write_batch
.cache_write_batch
Expand Down
19 changes: 14 additions & 5 deletions components/raftstore/src/store/worker/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,7 @@ mod tests {
use engine_test::kv::{KvTestEngine, KvTestSnapshot};
use engine_traits::{CacheRange, MiscExt, Peekable, SyncMutable, ALL_CFS};
use hybrid_engine::{HybridEngine, HybridEngineSnapshot};
use keys::DATA_PREFIX;
use kvproto::{metapb::RegionEpoch, raft_cmdpb::*};
use region_cache_memory_engine::RangeCacheMemoryEngine;
use tempfile::{Builder, TempDir};
Expand Down Expand Up @@ -2533,6 +2534,15 @@ mod tests {
};
let leader2 = prs[0].clone();
region1.set_region_epoch(epoch13.clone());
let range = CacheRange::from_region(&region1);
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 1);
}
let kv = (&[DATA_PREFIX, b'a'], b"b");
reader.kv_engine.put(kv.0, kv.1).unwrap();
let term6 = 6;
let mut lease = Lease::new(Duration::seconds(1), Duration::milliseconds(250)); // 1s is long enough.
let read_progress = Arc::new(RegionReadProgress::new(&region1, 1, 1, 1));
Expand Down Expand Up @@ -2574,10 +2584,8 @@ mod tests {
let s = get_snapshot(None, &mut reader, cmd.clone(), &rx);
assert!(!s.region_cache_snapshot_available());

let range = CacheRange::from_region(&region1);
memory_engine.new_range(range.clone());
{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
core.mut_range_manager().set_safe_ts(&range, 10);
}
Expand All @@ -2589,16 +2597,17 @@ mod tests {

let s = get_snapshot(Some(snap_ctx.clone()), &mut reader, cmd.clone(), &rx);
assert!(s.region_cache_snapshot_available());
assert_eq!(s.get_value(kv.0).unwrap().unwrap(), kv.1);

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, false);
}
let s = get_snapshot(Some(snap_ctx.clone()), &mut reader, cmd.clone(), &rx);
assert!(!s.region_cache_snapshot_available());

{
let mut core = memory_engine.core().lock().unwrap();
let mut core = memory_engine.core().write().unwrap();
core.mut_range_manager().set_range_readable(&range, true);
}
snap_ctx.read_ts = 5;
Expand Down