Skip to content

Commit

Permalink
In-memory Engine: integrate hybrid engine with TiKV (#16132)
Browse files Browse the repository at this point in the history
ref #16141

Integrate hybrid engine with TiKV. User can choose to use hybrid engine by set `memory_engine_enabled` in TiKV config.

Signed-off-by: SpadeA-Tang <u6748471@anu.edu.au>
  • Loading branch information
SpadeA-Tang committed Dec 13, 2023
1 parent 820b220 commit 8e8c6ab
Show file tree
Hide file tree
Showing 20 changed files with 566 additions and 71 deletions.
25 changes: 24 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ raftstore = { workspace = true, features = ["engine_rocks"] }
raftstore-v2 = { workspace = true }
rand = "0.7.3"
regex = "1.3"
region_cache_memory_engine = { workspace = true }
resource_control = { workspace = true }
resource_metering = { workspace = true }
rev_lines = "0.2.1"
Expand Down Expand Up @@ -321,6 +322,7 @@ encryption_export = { path = "components/encryption/export" }
engine_panic = { path = "components/engine_panic" }
engine_rocks = { path = "components/engine_rocks" }
hybrid_engine = { path = "components/hybrid_engine" }
region_cache_memory_engine = { path = "components/region_cache_memory_engine" }
engine_rocks_helper = { path = "components/engine_rocks_helper" }
engine_test = { path = "components/engine_test", default-features = false }
engine_traits = { path = "components/engine_traits" }
Expand Down
5 changes: 5 additions & 0 deletions components/engine_panic/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,9 @@ impl MiscExt for PanicEngine {
fn get_accumulated_flush_count_cf(cf: &str) -> Result<u64> {
panic!()
}

type DiskEngine = PanicEngine;
fn get_disk_engine(&self) -> &Self::DiskEngine {
panic!()
}
}
5 changes: 5 additions & 0 deletions components/engine_rocks/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ impl MiscExt for RocksEngine {
.get();
Ok(n)
}

type DiskEngine = RocksEngine;
fn get_disk_engine(&self) -> &Self::DiskEngine {
self
}
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions components/engine_traits/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,7 @@ pub trait MiscExt: CfNamesExt + FlowControlFactorsExt + WriteBatchExt {
}
Ok(n)
}

type DiskEngine;
fn get_disk_engine(&self) -> &Self::DiskEngine;
}
3 changes: 2 additions & 1 deletion components/hybrid_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ testexport = []
[dependencies]
engine_traits = { workspace = true }
txn_types = { workspace = true }
tikv_util = { workspace = true }
tikv_util = { workspace = true }
engine_rocks = { workspace = true }
13 changes: 13 additions & 0 deletions components/hybrid_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ where
}
}

impl<EK, EC> HybridEngine<EK, EC>
where
EK: KvEngine,
EC: RegionCacheEngine,
{
pub fn new(disk_engine: EK, region_cache_engine: EC) -> Self {
Self {
disk_engine,
region_cache_engine,
}
}
}

// todo: implement KvEngine methods as well as it's super traits.
impl<EK, EC> KvEngine for HybridEngine<EK, EC>
where
Expand Down
2 changes: 2 additions & 0 deletions components/hybrid_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ mod sst;
mod table_properties;
mod ttl_properties;
mod write_batch;

pub use engine::HybridEngine;
5 changes: 5 additions & 0 deletions components/hybrid_engine/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,9 @@ where
fn get_accumulated_flush_count_cf(cf: &str) -> Result<u64> {
unimplemented!()
}

type DiskEngine = EK::DiskEngine;
fn get_disk_engine(&self) -> &Self::DiskEngine {
self.disk_engine().get_disk_engine()
}
}
18 changes: 13 additions & 5 deletions components/raftstore/src/compacted_event_sender.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
use std::sync::Mutex;

use engine_rocks::{CompactedEventSender, RocksCompactedEvent, RocksEngine};
use engine_traits::RaftEngine;
use engine_rocks::{CompactedEventSender, RocksCompactedEvent};
use engine_traits::{KvEngine, RaftEngine};
use tikv_util::error_unknown;

use crate::store::{fsm::store::RaftRouter, StoreMsg};

// raftstore v1's implementation
pub struct RaftRouterCompactedEventSender<ER: RaftEngine> {
pub router: Mutex<RaftRouter<RocksEngine, ER>>,
pub struct RaftRouterCompactedEventSender<EK, ER>
where
EK: KvEngine,
ER: RaftEngine,
{
pub router: Mutex<RaftRouter<EK, ER>>,
}

impl<ER: RaftEngine> CompactedEventSender for RaftRouterCompactedEventSender<ER> {
impl<EK, ER> CompactedEventSender for RaftRouterCompactedEventSender<EK, ER>
where
EK: KvEngine<CompactedEvent = RocksCompactedEvent>,
ER: RaftEngine,
{
fn send(&self, event: RocksCompactedEvent) {
let router = self.router.lock().unwrap();
let event = StoreMsg::CompactedEvent(event);
Expand Down
13 changes: 13 additions & 0 deletions components/region_cache_memory_engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "region_cache_memory_engine"
version = "0.0.1"
edition = "2021"
publish = false

[features]
testexport = []

[dependencies]
engine_traits = { workspace = true }
collections = { workspace = true }
skiplist-rs = { git = "https://github.com/tikv/skiplist-rs.git", branch = "main" }

0 comments on commit 8e8c6ab

Please sign in to comment.