Skip to content

Commit

Permalink
*: support start with raftkv2 (#13981)
Browse files Browse the repository at this point in the history
ref #12842

Not all functionality are supported, this is just a naive pure
KV system with transaction support.

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
BusyJay and ti-chi-bot committed Dec 23, 2022
1 parent a499caf commit 90505f5
Show file tree
Hide file tree
Showing 19 changed files with 1,980 additions and 133 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions cmd/tikv-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{path::Path, process};
use clap::{crate_authors, App, Arg};
use serde_json::{Map, Value};
use server::setup::{ensure_no_unrecognized_config, validate_and_persist_config};
use tikv::config::{to_flatten_config_info, TikvConfig};
use tikv::{
config::{to_flatten_config_info, TikvConfig},
storage::config::EngineType,
};

fn main() {
let build_timestamp = option_env!("TIKV_BUILD_TIME");
Expand Down Expand Up @@ -207,5 +210,8 @@ fn main() {
process::exit(0);
}

server::server::run_tikv(config);
match config.storage.engine {
EngineType::RaftKv => server::server::run_tikv(config),
EngineType::RaftKv2 => server::server2::run_tikv(config),
}
}
40 changes: 19 additions & 21 deletions components/raftstore-v2/src/batch/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tikv_util::{
sys::SysQuota,
time::Instant as TiInstant,
timer::SteadyTimer,
worker::{Scheduler, Worker},
worker::{LazyWorker, Scheduler, Worker},
yatp_pool::{DefaultTicker, FuturePool, YatpPoolBuilder},
Either,
};
Expand Down Expand Up @@ -373,18 +373,18 @@ pub struct Schedulers<EK: KvEngine, ER: RaftEngine> {
struct Workers<EK: KvEngine, ER: RaftEngine> {
/// Worker for fetching raft logs asynchronously
async_read: Worker,
pd: Worker,
pd: LazyWorker<pd::Task>,
async_write: StoreWriters<EK, ER>,

// Following is not maintained by raftstore itself.
background: Worker,
}

impl<EK: KvEngine, ER: RaftEngine> Workers<EK, ER> {
fn new(background: Worker) -> Self {
fn new(background: Worker, pd: LazyWorker<pd::Task>) -> Self {
Self {
async_read: Worker::new("async-read-worker"),
pd: Worker::new("pd-worker"),
pd,
async_write: StoreWriters::default(),
background,
}
Expand Down Expand Up @@ -415,6 +415,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
coprocessor_host: CoprocessorHost<EK>,
background: Worker,
pd_worker: LazyWorker<pd::Task>,
) -> Result<()>
where
T: Transport + 'static,
Expand All @@ -428,7 +429,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
.broadcast_normal(|| PeerMsg::Tick(PeerTick::PdHeartbeat));
});

let mut workers = Workers::new(background);
let mut workers = Workers::new(background, pd_worker);
workers
.async_write
.spawn(store_id, raft_engine.clone(), None, router, &trans, &cfg)?;
Expand All @@ -437,21 +438,18 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
read_runner.set_snap_mgr(snap_mgr.clone());
let read_scheduler = workers.async_read.start("async-read-worker", read_runner);

let pd_scheduler = workers.pd.start(
"pd-worker",
pd::Runner::new(
store_id,
pd_client,
raft_engine.clone(),
tablet_registry.clone(),
router.clone(),
workers.pd.remote(),
concurrency_manager,
causal_ts_provider,
self.logger.clone(),
self.shutdown.clone(),
),
);
workers.pd.start(pd::Runner::new(
store_id,
pd_client,
raft_engine.clone(),
tablet_registry.clone(),
router.clone(),
workers.pd.remote(),
concurrency_manager,
causal_ts_provider,
self.logger.clone(),
self.shutdown.clone(),
));

let split_check_scheduler = workers.background.start(
"split-check",
Expand All @@ -464,7 +462,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {

let schedulers = Schedulers {
read: read_scheduler,
pd: pd_scheduler,
pd: workers.pd.scheduler(),
write: workers.async_write.senders(),
split_check: split_check_scheduler,
};
Expand Down
1 change: 1 addition & 0 deletions components/raftstore-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ pub use bootstrap::Bootstrap;
pub use fsm::StoreMeta;
pub use operation::{SimpleWriteBinary, SimpleWriteEncoder, StateStorage};
pub use raftstore::{store::Config, Error, Result};
pub use worker::pd::{FlowReporter, Task as PdTask};
30 changes: 28 additions & 2 deletions components/raftstore-v2/src/worker/pd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ use concurrency_manager::ConcurrencyManager;
use engine_traits::{KvEngine, RaftEngine, TabletRegistry};
use kvproto::{metapb, pdpb};
use pd_client::PdClient;
use raftstore::store::{util::KeysInfoFormatter, TxnExt};
use raftstore::store::{util::KeysInfoFormatter, FlowStatsReporter, ReadStats, TxnExt, WriteStats};
use slog::{error, info, Logger};
use tikv_util::{time::UnixSecs, worker::Runnable};
use tikv_util::{
time::UnixSecs,
worker::{Runnable, Scheduler},
};
use yatp::{task::future::TaskCell, Remote};

use crate::{
Expand Down Expand Up @@ -206,6 +209,29 @@ where
}
}

#[derive(Clone)]
pub struct FlowReporter {
_scheduler: Scheduler<Task>,
}

impl FlowReporter {
pub fn new(scheduler: Scheduler<Task>) -> Self {
FlowReporter {
_scheduler: scheduler,
}
}
}

impl FlowStatsReporter for FlowReporter {
fn report_read_stats(&self, _read_stats: ReadStats) {
// TODO
}

fn report_write_stats(&self, _write_stats: WriteStats) {
// TODO
}
}

mod requests {
use kvproto::raft_cmdpb::{
AdminCmdType, AdminRequest, ChangePeerRequest, ChangePeerV2Request, RaftCmdRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{

use causal_ts::CausalTsProvider;
use engine_traits::{KvEngine, RaftEngine};
use fail::fail_point;
use futures::{compat::Future01CompatExt, FutureExt};
use pd_client::PdClient;
use raftstore::{store::TxnExt, Result};
Expand Down Expand Up @@ -96,7 +95,7 @@ where

#[cfg(feature = "failpoints")]
let delay = (|| {
fail_point!("delay_update_max_ts", |_| true);
fail::fail_point!("delay_update_max_ts", |_| true);
false
})();
#[cfg(not(feature = "failpoints"))]
Expand Down
4 changes: 3 additions & 1 deletion components/raftstore-v2/tests/integrations/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use test_pd::mocker::Service;
use tikv_util::{
config::{ReadableDuration, VersionTrack},
store::new_peer,
worker::Worker,
worker::{LazyWorker, Worker},
};
use txn_types::WriteBatchFlags;

Expand Down Expand Up @@ -286,6 +286,7 @@ impl RunningState {
raftstore::coprocessor::Config::default(),
);
let background = Worker::new("background");
let pd_worker = LazyWorker::new("pd-worker");
system
.start(
store_id,
Expand All @@ -301,6 +302,7 @@ impl RunningState {
causal_ts_provider,
coprocessor_host,
background.clone(),
pd_worker,
)
.unwrap();

Expand Down
29 changes: 29 additions & 0 deletions components/raftstore/src/store/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,35 @@ impl TabletSnapManager {
true
}
}

pub fn total_snap_size(&self) -> Result<u64> {
let mut total_size = 0;
for entry in file_system::read_dir(&self.base)? {
let entry = match entry {
Ok(e) => e,
Err(e) if e.kind() == ErrorKind::NotFound => continue,
Err(e) => return Err(Error::from(e)),
};

let path = entry.path();
// Generated snapshots are just checkpoints, only counts received snapshots.
if !path
.file_name()
.and_then(|n| n.to_str())
.map_or(true, |n| n.starts_with(SNAP_REV_PREFIX))
{
continue;
}
for e in file_system::read_dir(path)? {
match e.and_then(|e| e.metadata()) {
Ok(m) => total_size += m.len(),
Err(e) if e.kind() == ErrorKind::NotFound => continue,
Err(e) => return Err(Error::from(e)),
}
}
}
Ok(total_size)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions components/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ protobuf = { version = "2.8", features = ["bytes"] }
raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] }
raft_log_engine = { workspace = true }
raftstore = { workspace = true, features = ["engine_rocks"] }
raftstore-v2 = { workspace = true }
rand = "0.8"
resolved_ts = { workspace = true }
resource_metering = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions components/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ pub mod setup;
pub mod memory;
pub mod raft_engine_switch;
pub mod server;
pub mod server2;
pub mod signal_handler;
12 changes: 6 additions & 6 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ use tikv_util::{
thread_group::GroupProperties,
time::{Instant, Monitor},
worker::{Builder as WorkerBuilder, LazyWorker, Scheduler, Worker},
Either,
};
use tokio::runtime::Builder;

Expand Down Expand Up @@ -959,9 +960,9 @@ where
),
coprocessor_v2::Endpoint::new(&self.config.coprocessor_v2),
self.resolver.clone().unwrap(),
snap_mgr.clone(),
Either::Left(snap_mgr.clone()),
gc_worker.clone(),
check_leader_scheduler,
Some(check_leader_scheduler),
self.env.clone(),
unified_read_pool,
debug_thread_pool,
Expand Down Expand Up @@ -1649,7 +1650,7 @@ where
self.config.server.status_thread_pool_size,
self.cfg_controller.take().unwrap(),
Arc::new(self.config.security.clone()),
self.router.clone(),
self.engines.as_ref().unwrap().engine.raft_extension(),
self.store_path.clone(),
) {
Ok(status_server) => Box::new(status_server),
Expand Down Expand Up @@ -1951,13 +1952,12 @@ fn get_lock_dir() -> String {

/// A small trait for components which can be trivially stopped. Lets us keep
/// a list of these in `TiKV`, rather than storing each component individually.
trait Stop {
pub(crate) trait Stop {
fn stop(self: Box<Self>);
}

impl<E, R> Stop for StatusServer<E, R>
impl<R> Stop for StatusServer<R>
where
E: 'static,
R: 'static + Send,
{
fn stop(self: Box<Self>) {
Expand Down

0 comments on commit 90505f5

Please sign in to comment.