*: support start with raftkv2 #13981

Merged
merged 7 commits on Dec 23, 2022
1 change: 1 addition & 0 deletions Cargo.lock


10 changes: 8 additions & 2 deletions cmd/tikv-server/src/main.rs
@@ -7,7 +7,10 @@ use std::{path::Path, process};
use clap::{crate_authors, App, Arg};
use serde_json::{Map, Value};
use server::setup::{ensure_no_unrecognized_config, validate_and_persist_config};
use tikv::config::{to_flatten_config_info, TikvConfig};
use tikv::{
config::{to_flatten_config_info, TikvConfig},
storage::config::EngineType,
};

fn main() {
let build_timestamp = option_env!("TIKV_BUILD_TIME");
@@ -207,5 +210,8 @@ fn main() {
process::exit(0);
}

server::server::run_tikv(config);
match config.storage.engine {
EngineType::RaftKv => server::server::run_tikv(config),
EngineType::RaftKv2 => server::server2::run_tikv(config),
}
}
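
The new entry point selects the raftstore implementation purely from configuration: `storage.engine` deserializes into an `EngineType`, and `main` dispatches to `server::server::run_tikv` or `server::server2::run_tikv`. Below is a minimal, self-contained sketch of that shape; the serde rename values and the trimmed-down `StorageConfig` are illustrative assumptions, not the actual TiKV definitions.

```rust
// Sketch only: stand-in config types, assumed rename values ("raft-kv"/"raft-kv2").
use serde::Deserialize;

#[derive(Clone, Copy, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "kebab-case")]
enum EngineType {
    RaftKv,
    RaftKv2,
}

#[derive(Deserialize)]
struct StorageConfig {
    engine: EngineType,
}

fn main() {
    // e.g. `engine = "raft-kv2"` under the [storage] section of the TOML config.
    let cfg: StorageConfig = toml::from_str(r#"engine = "raft-kv2""#).unwrap();
    match cfg.engine {
        // The PR calls server::server::run_tikv / server::server2::run_tikv here.
        EngineType::RaftKv => println!("start raftstore v1"),
        EngineType::RaftKv2 => println!("start raftstore v2"),
    }
}
```
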
40 changes: 19 additions & 21 deletions components/raftstore-v2/src/batch/store.rs
@@ -35,7 +35,7 @@ use tikv_util::{
sys::SysQuota,
time::Instant as TiInstant,
timer::SteadyTimer,
worker::{Scheduler, Worker},
worker::{LazyWorker, Scheduler, Worker},
yatp_pool::{DefaultTicker, FuturePool, YatpPoolBuilder},
Either,
};
@@ -373,18 +373,18 @@ pub struct Schedulers<EK: KvEngine, ER: RaftEngine> {
struct Workers<EK: KvEngine, ER: RaftEngine> {
/// Worker for fetching raft logs asynchronously
async_read: Worker,
pd: Worker,
pd: LazyWorker<pd::Task>,
async_write: StoreWriters<EK, ER>,

// Following is not maintained by raftstore itself.
background: Worker,
}

impl<EK: KvEngine, ER: RaftEngine> Workers<EK, ER> {
fn new(background: Worker) -> Self {
fn new(background: Worker, pd: LazyWorker<pd::Task>) -> Self {
Self {
async_read: Worker::new("async-read-worker"),
pd: Worker::new("pd-worker"),
pd,
async_write: StoreWriters::default(),
background,
}
@@ -415,6 +415,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
coprocessor_host: CoprocessorHost<EK>,
background: Worker,
pd_worker: LazyWorker<pd::Task>,
) -> Result<()>
where
T: Transport + 'static,
@@ -428,7 +429,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
.broadcast_normal(|| PeerMsg::Tick(PeerTick::PdHeartbeat));
});

let mut workers = Workers::new(background);
let mut workers = Workers::new(background, pd_worker);
workers
.async_write
.spawn(store_id, raft_engine.clone(), None, router, &trans, &cfg)?;
@@ -437,21 +438,18 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
read_runner.set_snap_mgr(snap_mgr.clone());
let read_scheduler = workers.async_read.start("async-read-worker", read_runner);

let pd_scheduler = workers.pd.start(
"pd-worker",
pd::Runner::new(
store_id,
pd_client,
raft_engine.clone(),
tablet_registry.clone(),
router.clone(),
workers.pd.remote(),
concurrency_manager,
causal_ts_provider,
self.logger.clone(),
self.shutdown.clone(),
),
);
workers.pd.start(pd::Runner::new(
store_id,
pd_client,
raft_engine.clone(),
tablet_registry.clone(),
router.clone(),
workers.pd.remote(),
concurrency_manager,
causal_ts_provider,
self.logger.clone(),
self.shutdown.clone(),
));

let split_check_scheduler = workers.background.start(
"split-check",
@@ -464,7 +462,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {

let schedulers = Schedulers {
read: read_scheduler,
pd: pd_scheduler,
pd: workers.pd.scheduler(),
write: workers.async_write.senders(),
split_check: split_check_scheduler,
};
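
The pd worker changes from a `Worker` created inside `Workers::new` to a `LazyWorker` supplied by the caller: `pd::Runner::new` needs a handle to its own worker's thread pool (`workers.pd.remote()`), and a lazy worker can hand out its remote and scheduler before it is started with the runner. A self-contained, std-only sketch of that ordering (a toy analogue, not the `tikv_util` implementation):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;

// Toy stand-in for tikv_util's LazyWorker: the task sender exists before the
// background loop starts, so a runner can capture a handle to "its own" worker
// while it is being constructed.
struct LazyWorker<T> {
    tx: Sender<T>,
    rx: Option<Receiver<T>>,
}

impl<T: Send + 'static> LazyWorker<T> {
    fn new() -> Self {
        let (tx, rx) = channel();
        LazyWorker { tx, rx: Some(rx) }
    }

    // Usable before start(), mirroring scheduler()/remote() in the diff.
    fn scheduler(&self) -> Sender<T> {
        self.tx.clone()
    }

    // Start the background loop once the runner has been built.
    fn start(&mut self, mut run: impl FnMut(T) + Send + 'static) {
        let rx = self.rx.take().expect("worker already started");
        thread::spawn(move || {
            for task in rx {
                run(task);
            }
        });
    }
}

fn main() {
    let mut pd = LazyWorker::new();
    // Build the "runner" first, letting it capture a handle to its own worker,
    // like pd::Runner::new(..., workers.pd.remote(), ...) in this hunk.
    let self_scheduler = pd.scheduler();
    pd.start(move |task: String| {
        println!("pd runner handling: {task}");
        let _ = &self_scheduler; // could re-schedule follow-up tasks here
    });
    pd.scheduler().send("store heartbeat".to_owned()).unwrap();
    thread::sleep(Duration::from_millis(50));
}
```
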
1 change: 1 addition & 0 deletions components/raftstore-v2/src/lib.rs
@@ -41,3 +41,4 @@ pub use bootstrap::Bootstrap;
pub use fsm::StoreMeta;
pub use operation::{SimpleWriteBinary, SimpleWriteEncoder, StateStorage};
pub use raftstore::{store::Config, Error, Result};
pub use worker::pd::{FlowReporter, Task as PdTask};
30 changes: 28 additions & 2 deletions components/raftstore-v2/src/worker/pd/mod.rs
@@ -11,9 +11,12 @@ use concurrency_manager::ConcurrencyManager;
use engine_traits::{KvEngine, RaftEngine, TabletRegistry};
use kvproto::{metapb, pdpb};
use pd_client::PdClient;
use raftstore::store::{util::KeysInfoFormatter, TxnExt};
use raftstore::store::{util::KeysInfoFormatter, FlowStatsReporter, ReadStats, TxnExt, WriteStats};
use slog::{error, info, Logger};
use tikv_util::{time::UnixSecs, worker::Runnable};
use tikv_util::{
time::UnixSecs,
worker::{Runnable, Scheduler},
};
use yatp::{task::future::TaskCell, Remote};

use crate::{
@@ -206,6 +209,29 @@ where
}
}

#[derive(Clone)]
pub struct FlowReporter {
_scheduler: Scheduler<Task>,
}

impl FlowReporter {
pub fn new(scheduler: Scheduler<Task>) -> Self {
FlowReporter {
_scheduler: scheduler,
}
}
}

impl FlowStatsReporter for FlowReporter {
fn report_read_stats(&self, _read_stats: ReadStats) {
// TODO
}

fn report_write_stats(&self, _write_stats: WriteStats) {
// TODO
}
}

mod requests {
use kvproto::raft_cmdpb::{
AdminCmdType, AdminRequest, ChangePeerRequest, ChangePeerV2Request, RaftCmdRequest,
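
`FlowReporter` is a thin adapter so the v2 pd worker can plug into the existing `FlowStatsReporter` plumbing; both report methods are still TODO stubs, so read/write statistics are dropped for now. A plausible wiring via the new `lib.rs` re-exports — the exact call site (presumably in `server2.rs`) is not shown in this diff and is an assumption:

```rust
// Hypothetical call site; assumes the raftstore_v2::{FlowReporter, PdTask}
// re-exports added by this PR and a pd LazyWorker like the one in the tests.
use raftstore_v2::{FlowReporter, PdTask};
use tikv_util::worker::LazyWorker;

fn build_flow_reporter(pd_worker: &LazyWorker<PdTask>) -> FlowReporter {
    // The reporter only keeps a clone of the scheduler for now.
    FlowReporter::new(pd_worker.scheduler())
}
```
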
@@ -7,7 +7,6 @@ use std::{

use causal_ts::CausalTsProvider;
use engine_traits::{KvEngine, RaftEngine};
use fail::fail_point;
use futures::{compat::Future01CompatExt, FutureExt};
use pd_client::PdClient;
use raftstore::{store::TxnExt, Result};
@@ -96,7 +95,7 @@ where

#[cfg(feature = "failpoints")]
let delay = (|| {
fail_point!("delay_update_max_ts", |_| true);
fail::fail_point!("delay_update_max_ts", |_| true);
false
})();
#[cfg(not(feature = "failpoints"))]
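
With the `use fail::fail_point;` import removed, the macro is invoked by its full path inside the `#[cfg(feature = "failpoints")]` block, presumably so the import is not reported as unused when the feature is off. A sketch of how a failpoints-enabled test could drive this path — the test itself is hypothetical, but `fail::cfg`/`fail::remove` are the standard `fail` crate API:

```rust
// Hypothetical test; requires building with the "failpoints" feature.
#[cfg(feature = "failpoints")]
#[test]
fn delay_update_max_ts_is_honored() {
    // Make the fail::fail_point!("delay_update_max_ts", |_| true) call site
    // take its early-return branch.
    fail::cfg("delay_update_max_ts", "return").unwrap();

    // ... drive the code path that updates max ts here ...

    fail::remove("delay_update_max_ts");
}
```
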
4 changes: 3 additions & 1 deletion components/raftstore-v2/tests/integrations/cluster.rs
@@ -47,7 +47,7 @@ use test_pd::mocker::Service;
use tikv_util::{
config::{ReadableDuration, VersionTrack},
store::new_peer,
worker::Worker,
worker::{LazyWorker, Worker},
};
use txn_types::WriteBatchFlags;

@@ -286,6 +286,7 @@ impl RunningState {
raftstore::coprocessor::Config::default(),
);
let background = Worker::new("background");
let pd_worker = LazyWorker::new("pd-worker");
system
.start(
store_id,
@@ -301,6 +302,7 @@
causal_ts_provider,
coprocessor_host,
background.clone(),
pd_worker,
)
.unwrap();

29 changes: 29 additions & 0 deletions components/raftstore/src/store/snap.rs
@@ -1984,6 +1984,35 @@ impl TabletSnapManager {
true
}
}

pub fn total_snap_size(&self) -> Result<u64> {
let mut total_size = 0;
for entry in file_system::read_dir(&self.base)? {
let entry = match entry {
Ok(e) => e,
Err(e) if e.kind() == ErrorKind::NotFound => continue,
Err(e) => return Err(Error::from(e)),
};

let path = entry.path();
// Generated snapshots are just checkpoints; only count received snapshots.
if !path
.file_name()
.and_then(|n| n.to_str())
.map_or(true, |n| n.starts_with(SNAP_REV_PREFIX))
{
continue;
}
for e in file_system::read_dir(path)? {
match e.and_then(|e| e.metadata()) {
Ok(m) => total_size += m.len(),
Err(e) if e.kind() == ErrorKind::NotFound => continue,
Err(e) => return Err(Error::from(e)),
}
}
}
Ok(total_size)
}
}

#[cfg(test)]
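
`total_snap_size` walks the snapshot base directory, skips generated snapshots (which are just checkpoints), and sums the file sizes under directories whose names start with `SNAP_REV_PREFIX`, i.e. received snapshots, while tolerating entries removed concurrently (`ErrorKind::NotFound`). A hypothetical caller; the import path and the use as a capacity metric are assumptions, not taken from this diff:

```rust
// Hypothetical usage of the new API.
use raftstore::store::snap::TabletSnapManager;

fn report_received_snapshot_usage(snap_mgr: &TabletSnapManager) {
    match snap_mgr.total_snap_size() {
        // e.g. feed a store heartbeat or a disk-usage metric.
        Ok(bytes) => println!("received snapshots occupy {} bytes", bytes),
        Err(e) => println!("failed to stat snapshot dir: {:?}", e),
    }
}
```
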
1 change: 1 addition & 0 deletions components/server/Cargo.toml
@@ -66,6 +66,7 @@ protobuf = { version = "2.8", features = ["bytes"] }
raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] }
raft_log_engine = { workspace = true }
raftstore = { workspace = true, features = ["engine_rocks"] }
raftstore-v2 = { workspace = true }
rand = "0.8"
resolved_ts = { workspace = true }
resource_metering = { workspace = true }
1 change: 1 addition & 0 deletions components/server/src/lib.rs
@@ -12,4 +12,5 @@ pub mod setup;
pub mod memory;
pub mod raft_engine_switch;
pub mod server;
pub mod server2;
pub mod signal_handler;
12 changes: 6 additions & 6 deletions components/server/src/server.rs
@@ -123,6 +123,7 @@ use tikv_util::{
thread_group::GroupProperties,
time::{Instant, Monitor},
worker::{Builder as WorkerBuilder, LazyWorker, Scheduler, Worker},
Either,
};
use tokio::runtime::Builder;

@@ -959,9 +960,9 @@
),
coprocessor_v2::Endpoint::new(&self.config.coprocessor_v2),
self.resolver.clone().unwrap(),
snap_mgr.clone(),
Either::Left(snap_mgr.clone()),
gc_worker.clone(),
check_leader_scheduler,
Some(check_leader_scheduler),
self.env.clone(),
unified_read_pool,
debug_thread_pool,
@@ -1649,7 +1650,7 @@ where
self.config.server.status_thread_pool_size,
self.cfg_controller.take().unwrap(),
Arc::new(self.config.security.clone()),
self.router.clone(),
self.engines.as_ref().unwrap().engine.raft_extension(),
self.store_path.clone(),
) {
Ok(status_server) => Box::new(status_server),
@@ -1951,13 +1952,12 @@ fn get_lock_dir() -> String {

/// A small trait for components which can be trivially stopped. Lets us keep
/// a list of these in `TiKV`, rather than storing each component individually.
trait Stop {
pub(crate) trait Stop {
fn stop(self: Box<Self>);
}

impl<E, R> Stop for StatusServer<E, R>
impl<R> Stop for StatusServer<R>
where
E: 'static,
R: 'static + Send,
{
fn stop(self: Box<Self>) {
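
On the server side, the shared `Server` setup now takes the snapshot manager as a `tikv_util::Either` (this v1 path passes `Either::Left(snap_mgr)`, leaving `Right` for the v2 tablet snapshot manager), the check-leader scheduler becomes optional, and the `Stop` trait is made `pub(crate)` (presumably for reuse from `server2`). A self-contained sketch of the `Either` dispatch shape, with stand-in types rather than the real TiKV ones:

```rust
// Stand-in types; the real code uses tikv_util::Either with raftstore's
// SnapManager (v1) and TabletSnapManager (v2).
enum Either<L, R> {
    Left(L),
    Right(R),
}

struct SnapManager;       // stand-in for the v1 snapshot manager
struct TabletSnapManager; // stand-in for the v2 tablet snapshot manager

fn configure_snap_service(mgr: Either<SnapManager, TabletSnapManager>) {
    match mgr {
        Either::Left(_v1) => println!("serving raftstore v1 snapshots"),
        Either::Right(_v2) => println!("serving raftstore v2 tablet snapshots"),
    }
}

fn main() {
    // server.rs (this file) keeps the v1 path:
    configure_snap_service(Either::Left(SnapManager));
    // a v2 entry point (server2.rs) would pass the Right variant:
    configure_snap_service(Either::Right(TabletSnapManager));
}
```
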